大数据
流式处理
Kafka
源码解析

Kafka系列 09 - 消费者源码分析 03:更新Offset、重置Position、自动任务和消息拉取

简介:讲解OffsetFetchRequest、ListOffsetRequest、HeartbeatRequest、OffsetCommitRequest和FetchRequest的处理

1. 更新偏移量

在前面的讲解中,KafkaConsumer的pollOnce(long timeout)方法中已经分析了GroupCoordinator的定位及Rebalance操作的具体流程,此时KafkaConsumer们已经与GroupCoordinator成功建立连接,同时也知道了分配给自己的主题及分区,接下来首先需要根据服务端存储的偏移量信息来更新自己所订阅的分区的偏移量并重置Position值,这个操作的入口如下:

  • // KafkaConsumer类
  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • ...
  • // 前面定位GroupCoordinator、Rebalance等操作已完成
  • // fetch positions if we have partitions we're subscribed to that we
  • // don't know the offset for
  • /**
  • * 恢复SubscriptionState中对应的TopicPartitionState状态
  • * 主要是committed字段和position字段
  • */
  • if (!subscriptions.hasAllFetchPositions())
  • updateFetchPositions(this.subscriptions.missingFetchPositions());
  • ...
  • }
  • // SubscriptionState类
  • // 是否已经明确分配给自己的所有分区下一次将要拉取消息的位置
  • public boolean hasAllFetchPositions() {
  • for (TopicPartitionState state : assignment.values())
  • if (!state.hasValidPosition())
  • return false;
  • return true;
  • }
  • // position是否合法
  • public boolean hasValidPosition() {
  • return position != null;
  • }
  • // 筛选position未知的所有分区
  • public Set<TopicPartition> missingFetchPositions() {
  • Set<TopicPartition> missing = new HashSet<>();
  • for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
  • if (!entry.getValue().hasValidPosition())
  • missing.add(entry.getKey());
  • return missing;
  • }

SubscriptionState对象会从自己存储的分区信息中先筛选出对应的position信息为空的分区,这部分信息存储在TopicPartitionState中。然后把这些分区交给KafkaConsumer类的updateFetchPositions(Set<TopicPartition> partitions)方法处理,从服务端获取对应的偏移量及Position信息:

  • // KafkaConsumer类
  • // 更新分区的position
  • private void updateFetchPositions(Set<TopicPartition> partitions) {
  • // 如有需要,重置所有分区的偏移量
  • coordinator.refreshCommittedOffsetsIfNeeded();
  • // 获取指定分区的position
  • fetcher.updateFetchPositions(partitions);
  • }

这个方法会进行两个操作:更新偏移量及重置Position。重置Position会在后面讲解,我们先看更新偏移量部分:

  • // ConsumerCoordinator类
  • // 发送OffsetFetchRequest请求,从服务端拉取最近提交的offset集合,并更新到subscriptions集合s
  • public void refreshCommittedOffsetsIfNeeded() {
  • // 根据needsFetchCommittedOffsets字段判断是否需要更新
  • if (subscriptions.refreshCommitsNeeded()) {
  • // 发送OffsetFetchRequest请求,并处理响应数据,得到新的offset信息
  • Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
  • // 遍历得到的offset信息并进行更新
  • for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
  • TopicPartition tp = entry.getKey();
  • // verify assignment is still active
  • // 判断对应的TopicPartition在subscriptions是否有记录
  • if (subscriptions.isAssigned(tp))
  • // 如果有则更新偏移量
  • this.subscriptions.committed(tp, entry.getValue());
  • }
  • // 更新完毕,将needsFetchCommittedOffsets置为false
  • this.subscriptions.commitsRefreshed();
  • }
  • }
  • // SubscriptionState类
  • /**
  • * 是否需要从GroupCoordinator获取最近提交的offset,
  • * 当出现异步提交offset操作或者Rebalance操作刚完成时会将其设置为true,
  • * 成功获取最近提交的offset之后会设置为false
  • */
  • private boolean needsFetchCommittedOffsets;
  • public boolean refreshCommitsNeeded() {
  • return this.needsFetchCommittedOffsets;
  • }

此处subscriptions.refreshCommitsNeeded()返回的needsFetchCommittedOffsets字段已经在Rebalance操作成功之后被置为true了,读者可以回顾前面的源码,因此这里是会进行偏移量更新的。

1.1. OffsetFetchRequest请求发送

具体操作位于fetchCommittedOffsets(Set<TopicPartition> partitions)方法,通过发送OffsetFetchRequest请求来实现:

  • // ConsumerCoordinator类
  • // 发送OffsetFetchRequest并处理响应
  • public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
  • while (true) {
  • // 保证GroupCoordinator是就绪的
  • ensureCoordinatorReady();
  • // contact coordinator to fetch committed offsets
  • // 构造OffsetFetchRequest,暂存到unsent中,等待发送
  • RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
  • // 发送请求,会阻塞
  • client.poll(future);
  • // 判断是否发送成功
  • if (future.succeeded())
  • // 返回从服务端得到的offset
  • return future.value();
  • // 如果失败,判断是否可以重试
  • if (!future.isRetriable())
  • // 不可重试,抛出异常
  • throw future.exception();
  • // 可以重试,退避一段时间后再重试
  • time.sleep(retryBackoffMs);
  • }
  • }
  • private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
  • // 检查GroupCoordinator是可用的
  • if (coordinatorUnknown())
  • return RequestFuture.coordinatorNotAvailable();
  • log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
  • // construct the request
  • // 构造OffsetFetchRequest请求对象
  • OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
  • // send the request with a callback
  • // 暂存请求到unsent,等待发送
  • return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
  • // 适配响应处理器
  • .compose(new OffsetFetchResponseHandler());
  • }

有了前面处理GroupCoordinatorRequest和SyncGroupRequest的理解之后,这里发送OffsetFetchRequest请求的原理是大同小异的。OffsetFetchRequest内部包含了Group ID及需要更新偏移量的分区等信息。最终该请求会返回OffsetFetchResponse响应对象,类似的,这个响应对象由OffsetFetchResponseHandler处理。

1.2. OffsetFetchResponse响应处理

我们查看OffsetFetchResponseHandler的源码:

  • // ConsumerCoordinator类
  • // 处理OffsetFetchResponse响应
  • private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
  • @Override
  • public OffsetFetchResponse parse(ClientResponse response) {
  • return new OffsetFetchResponse(response.responseBody());
  • }
  • @Override
  • public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
  • // 根据响应数据构造相应大小的字典用于存放处理后的offset
  • Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
  • // 遍历响应结果
  • for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
  • // 得到TopicPartition
  • TopicPartition tp = entry.getKey();
  • // 得到对应的offset信息数据
  • OffsetFetchResponse.PartitionData data = entry.getValue();
  • // 判断是否有异常
  • if (data.hasError()) {
  • Errors error = Errors.forCode(data.errorCode);
  • log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
  • if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
  • // Group正在加载中
  • // just retry
  • future.raise(error);
  • } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
  • // 对于Group没有对应的Coordinator
  • // re-discover the coordinator and retry
  • coordinatorDead();
  • future.raise(error);
  • } else if (error == Errors.UNKNOWN_MEMBER_ID
  • || error == Errors.ILLEGAL_GENERATION) {
  • // 未知的MemberID
  • // 不合法的年代信息
  • // need to re-join group
  • subscriptions.needReassignment();
  • future.raise(error);
  • } else {
  • // 其他异常
  • future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
  • }
  • return;
  • } else if (data.offset >= 0) {
  • // 没有异常,且返回的offset数据量大于0,将其添加到offsets字典中
  • // record the position with the offset (-1 indicates no committed offset to fetch)
  • offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
  • } else {
  • log.debug("Group {} has no committed offset for partition {}", groupId, tp);
  • }
  • }
  • // 传播offset集合,最终通过fetchCommittedOffset()方法返回
  • future.complete(offsets);
  • }
  • }

响应处理过程中,会从响应信息解析offset等信息,然后存入一个类型为Map的字典offsets中,最终该字典会逐级返回,在refreshCommittedOffsetsIfNeeded()被接收:

  • // ConsumerCoordinator类
  • // 发送OffsetFetchRequest请求,从服务端拉取最近提交的offset集合,并更新到subscriptions集合s
  • public void refreshCommittedOffsetsIfNeeded() {
  • // 根据needsFetchCommittedOffsets字段判断是否需要更新
  • if (subscriptions.refreshCommitsNeeded()) {
  • // 发送OffsetFetchRequest请求,并处理响应数据,得到新的offset信息
  • Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
  • // 遍历得到的offset信息并进行更新
  • for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
  • TopicPartition tp = entry.getKey();
  • // verify assignment is still active
  • // 判断对应的TopicPartition在subscriptions是否有记录
  • if (subscriptions.isAssigned(tp))
  • // 如果有则更新偏移量
  • this.subscriptions.committed(tp, entry.getValue());
  • }
  • // 更新完毕,将needsFetchCommittedOffsets置为false
  • this.subscriptions.commitsRefreshed();
  • }
  • }

该方法接收这些数据之后,会传递给SubscriptionState对象进行更新,更新完后SubscriptionState对象的needsFetchCommittedOffsets字段会被置为false。

2. 重置Position

position值标识了下一次拉取分区数据时应该从哪里开始拉取;在某些场景下,例如首次消费某个Topic的分区,服务端的内部Offsets Topic中并没有记录当前消费者在此分区上的消费位置,所以消费者无法从服务端获取最近提交的Offset。此时如果用户手动指定消费的起始Offset,则可以从指定Offset开始消费,否则就需要重置position字段,通过特定重置策略来决定开始消费的Offset;当前版本Kafka有三种重置策略:LATEST(从最晚开始消费)、EARLIEST(从最早开始消费)或NONE(从提交的偏移量开始继续消费)。重置Position的操作在更新偏移量之后,通过Fetcher类从服务端获取。Fetcher是KafkaConsumer用来拉取消息的主要类,此处的Position重置以及后面要讲解的消息的拉取操作,都是通过该类完成的:

  • // KafkaConsumer类
  • // 更新分区的position
  • private void updateFetchPositions(Set<TopicPartition> partitions) {
  • // 如有需要,重置所有分区的偏移量
  • coordinator.refreshCommittedOffsetsIfNeeded();
  • // 获取指定分区的position
  • fetcher.updateFetchPositions(partitions);
  • }
  • // Fetcher类
  • public void updateFetchPositions(Set<TopicPartition> partitions) {
  • // reset the fetch position to the committed position
  • for (TopicPartition tp : partitions) {
  • /**
  • * 检测是否需要重置操作
  • * 1. subscriptions.isAssigned(tp):当前消费者是否被指定了tp分区
  • * 2. subscriptions.isFetchable(tp)
  • * isAssigned(tp) :当前消费者是否被指定了tp分区
  • * assignedState(tp).isFetchable():当前分区是否是可以被拉取的
  • * !paused:当前分区是否是非暂停拉取状态的
  • * hasValidPosition():position是否为null
  • * 结果:
  • * - 如果当前消费者没有被指定消费tp分区,那么不需要重置
  • * - 如果当前消费者被指定消费tp分区,且tp分区是非暂停拉取状态,同时position值不为null,那么不需要重置
  • */
  • if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
  • continue;
  • // TODO: If there are several offsets to reset, we could submit offset requests in parallel
  • // 走到这里说明需要重置position
  • // 判断是否指定了重置策略
  • if (subscriptions.isOffsetResetNeeded(tp)) {
  • // 指定了重置策略,按照指定策略进行更新
  • resetOffset(tp);
  • } else if (subscriptions.committed(tp) == null) {
  • // 最近一次该分区的提交为空,按照默认的defaultResetStrategy策略进行重置
  • // there's no committed position, so we need to reset with the default strategy
  • subscriptions.needOffsetReset(tp);
  • resetOffset(tp);
  • } else {
  • // 最近一次该分区的提交不为空,则将position重置为最近一次提交的offset
  • long committed = subscriptions.committed(tp).offset();
  • log.debug("Resetting offset for partition {} to the committed offset {}", tp, committed);
  • subscriptions.seek(tp, committed);
  • }
  • }
  • }

该方法会先判断分区是否需要重置Position,如果分区被分配给当前Consumer了,且分区没有处于非暂停拉取状态,就表明需要重置Position;接下来就是决定使用哪种方式重置Position,一共有三种情况:

  1. 指定了重置策略,使用指定策略重置。
  2. 未指定重置策略,且该分区没有最近提交的Offset记录(即前面更新偏移量时没有获得该分区之前的Offset提交记录),那么就按默认的策略重置,即LATEST。
  3. 未指定重置策略,但该分区有最近提交的Offset记录(即前面更新偏移量时获得了该分区之前的Offset提交记录),就将Position重置为该偏移量的值。

前两种情况使用resetOffset(TopicPartition partition)方法进行重置,后一种则直接使用SubscriptionState的seek(TopicPartition tp, long offset)方法进行重置。

我们先看前两种情况,resetOffset(TopicPartition partition)方法源码如下:

  • // Fetcher类
  • // 根据重置策略更新分区的position
  • private void resetOffset(TopicPartition partition) {
  • // 获取分区对应的重置策略
  • OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
  • final long timestamp;
  • // 根据重置策略选择时间戳
  • if (strategy == OffsetResetStrategy.EARLIEST)
  • // 最早策略
  • timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; // -2
  • else if (strategy == OffsetResetStrategy.LATEST)
  • // 最晚策略
  • timestamp = ListOffsetRequest.LATEST_TIMESTAMP; // -1
  • else
  • throw new NoOffsetForPartitionException(partition);
  • log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
  • // 向分区的Leader副本所在的节点发送OffsetsRequest请求,得到对应的offset,会阻塞
  • long offset = listOffset(partition, timestamp);
  • // we might lose the assignment while fetching the offset, so check it is still active
  • // 更新分区的position为得到的offset
  • if (subscriptions.isAssigned(partition))
  • this.subscriptions.seek(partition, offset);
  • }

该方法中对重置策略进行了合法检查,如果是EARLIEST和LATEST之外的策略会抛出NoOffsetForPartitionException异常。EARLIEST策略下,timestamp被置为ListOffsetRequest.EARLIEST_TIMESTAMP(值为-2),LATEST策略下,timestamp被置为ListOffsetRequest.EARLIEST_TIMESTAMP(值为-1),最终会使用TopicPartition分区对象和timestamp构造ListOffsetRequest请求并发送给Kafka服务端以获取Position值,这部分操作在listOffset(TopicPartition partition, long timestamp)方法中实现。

2.1. ListOffsetRequest请求发送

listOffset(TopicPartition partition, long timestamp)方法的源码如下:

  • // Fetcher类
  • private long listOffset(TopicPartition partition, long timestamp) {
  • while (true) {
  • // 构造OffsetsRequest请求到unsent,等待发送
  • RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
  • // 发送请求
  • client.poll(future);
  • // 请求成功
  • if (future.succeeded())
  • // 返回响应结果
  • return future.value();
  • if (!future.isRetriable())
  • throw future.exception();
  • if (future.exception() instanceof InvalidMetadataException)
  • // 元数据无效异常,更新元数据
  • client.awaitMetadataUpdate();
  • else
  • // 退避一段时间后再次请求
  • time.sleep(retryBackoffMs);
  • }
  • }
  • // 发送OffsetsRequest请求
  • private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
  • // 构造字典
  • Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
  • partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
  • // 从元数据中获取对应分区的信息
  • PartitionInfo info = metadata.fetch().partition(topicPartition);
  • if (info == null) {
  • // 分区信息为null,说明元数据过期了
  • metadata.add(topicPartition.topic());
  • log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
  • // 元数据过期,stale:陈腐的; 不新鲜的; 走了味的;
  • return RequestFuture.staleMetadata();
  • } else if (info.leader() == null) {
  • // 分区leader为null
  • log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
  • return RequestFuture.leaderNotAvailable();
  • } else {
  • // 得到分区的Leader节点
  • Node node = info.leader();
  • // 构造ListOffsetRequest请求
  • ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
  • // 暂存请求到unsent集合,等待发送,注意,这里的请求是发送给需要获取Position的分区所在节点的Leader节点
  • return client.send(node, ApiKeys.LIST_OFFSETS, request)
  • // 适配响应处理器
  • .compose(new RequestFutureAdapter<ClientResponse, Long>() {
  • @Override
  • public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
  • // 处理响应数据
  • handleListOffsetResponse(topicPartition, response, future);
  • }
  • });
  • }
  • }

需要注意的是,ListOffsetRequest请求不是发送给GroupCoordinator的,而是发送给需要获取Position的分区所在节点的Leader节点,这一点源码中是有体现的。

发送操作与前面的几类请求的发送是类似的,同样的,响应的处理也类似,交给了handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> future)方法。

2.2. ListOffsetResponse响应处理

Fetcher类的handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> future)方法源码如下:

  • // Fetcher类
  • // 处理根据特定策略请求更新offset的响应
  • private void handleListOffsetResponse(TopicPartition topicPartition,
  • ClientResponse clientResponse,
  • RequestFuture<Long> future) {
  • // 从响应数据构造ListOffsetResponse对象
  • ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
  • // 获取错误码
  • short errorCode = lor.responseData().get(topicPartition).errorCode;
  • if (errorCode == Errors.NONE.code()) {
  • // 无错误码
  • // 获取响应数据中相应分区的offset数据
  • List<Long> offsets = lor.responseData().get(topicPartition).offsets;
  • if (offsets.size() != 1)
  • throw new IllegalStateException("This should not happen.");
  • // 获取其中的第一个offset
  • long offset = offsets.get(0);
  • log.debug("Fetched offset {} for partition {}", offset, topicPartition);
  • // 将得到的offset传递出去
  • future.complete(offset);
  • } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
  • || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
  • // 分区无Leader异常,未知的主题或分区异常
  • log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
  • topicPartition);
  • future.raise(Errors.forCode(errorCode));
  • } else {
  • // 其他异常
  • log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
  • topicPartition, Errors.forCode(errorCode).message());
  • future.raise(new StaleMetadataException());
  • }
  • }

ListOffsetResponse中会使用一个Map装载offset结果数据,Map的键是TopicPartition分区对象,值是装有Long型offset值的List集合,处理时会取出第一个,然后通过RequestFuture回调对象的complete(T value)方法返回。offset值会逐级返回,最终由Fetcher类的resetOffset(TopicPartition partition)接收:

  • // Fetcher类
  • // 根据重置策略更新分区的position
  • private void resetOffset(TopicPartition partition) {
  • // 获取分区对应的重置策略
  • OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
  • final long timestamp;
  • // 根据重置策略选择时间戳
  • if (strategy == OffsetResetStrategy.EARLIEST)
  • // 最早策略
  • timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; // -2
  • else if (strategy == OffsetResetStrategy.LATEST)
  • // 最晚策略
  • timestamp = ListOffsetRequest.LATEST_TIMESTAMP; // -1
  • else
  • throw new NoOffsetForPartitionException(partition);
  • log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
  • // 向分区的Leader副本所在的节点发送OffsetsRequest请求,得到对应的offset,会阻塞
  • long offset = listOffset(partition, timestamp);
  • // we might lose the assignment while fetching the offset, so check it is still active
  • // 更新分区的position为得到的offset
  • if (subscriptions.isAssigned(partition))
  • this.subscriptions.seek(partition, offset);
  • }

接收到的offset值,依旧是使用SubscriptionState对象的seek(TopicPartition tp, long offset)方法进行定向更新到TopicPartition对应的TopicPartitionState对象上。这样就完成了Position的重置。

3. 自动任务

介绍完偏移量的更新及Position的重置,我们回到KafkaConsumer的pollOnce(long timeout)方法继续分析。接下来会调用ConsumerNetworkClient的executeDelayedTasks(long now)方法执行定时任务,包括心跳和offset的自动提交(如果配置了):

  • // KafkaConsumer类
  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • // 前面已经完成了GroupCoordinator的定位、分区分配、更新偏移量、重置Position等操作
  • long now = time.milliseconds();
  • // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
  • // 执行HeartbeatTask和AutoCommitTask定时任务
  • client.executeDelayedTasks(now);
  • ...
  • }
  • // ConsumerNetworkClient类
  • /**
  • * 定时任务队列,底层使用JDK的PriorityQueue实现,
  • * PriorityQueue是一个非线程安全、无界、具有优先级功能的优先队列
  • * 实现原理是小顶堆,底层基于数组
  • * PriorityBlockingQueue是PriorityQueue的线程安全实现
  • */
  • private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
  • public void executeDelayedTasks(long now) {
  • delayedTasks.poll(now);
  • maybeTriggerWakeup();
  • }

delayedTasks我们前面提到过,它内部维护了一个PriorityQueue优先队列task,而在构造KafkaConsumer过程中创建ConsumerCoordinator实例时,会顺便创建HeartbeatTask和AutoCommitTask两个定时任务,回顾源码:

  • // ConsumerCoordinator类
  • public ConsumerCoordinator(ConsumerNetworkClient client,
  • String groupId,
  • int sessionTimeoutMs,
  • int heartbeatIntervalMs,
  • List<PartitionAssignor> assignors,
  • Metadata metadata,
  • SubscriptionState subscriptions,
  • Metrics metrics,
  • String metricGrpPrefix,
  • Time time,
  • long retryBackoffMs,
  • OffsetCommitCallback defaultOffsetCommitCallback,
  • boolean autoCommitEnabled,
  • long autoCommitIntervalMs,
  • ConsumerInterceptors<?, ?> interceptors,
  • boolean excludeInternalTopics) {
  • // 调用父类AbstractCoordinator的构造器
  • super(client,
  • groupId,
  • sessionTimeoutMs,
  • heartbeatIntervalMs,
  • metrics,
  • metricGrpPrefix,
  • time,
  • retryBackoffMs);
  • ...
  • if (autoCommitEnabled) {
  • // 如果设置了自动提交offset,根据配置提交间隔时间,创建AutoCommitTask任务
  • this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
  • // 启动AutoCommitTask
  • this.autoCommitTask.reschedule();
  • } else {
  • this.autoCommitTask = null;
  • }
  • ...
  • }
  • // AbstractCoordinator类
  • public AbstractCoordinator(ConsumerNetworkClient client,
  • String groupId,
  • int sessionTimeoutMs,
  • int heartbeatIntervalMs,
  • Metrics metrics,
  • String metricGrpPrefix,
  • Time time,
  • long retryBackoffMs) {
  • ...
  • // 会话超时时间
  • this.sessionTimeoutMs = sessionTimeoutMs;
  • // 心跳操作辅助对象
  • this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
  • // 心跳任务
  • this.heartbeatTask = new HeartbeatTask();
  • ...
  • }

从创建这两个定时任务的代码可知,如果开启了自动提交,AutoCommitTask在创建后就直接启动了,而HeartbeatTask则不一样,它会在多个位置进行重置启动。这里我们总结一下两个任务的启动时间点:

  1. AutoCommitTask,前提是设置了自动提交,配置项enable.auto.commit
  • 创建ConsumerCoordinator时,会调用reschedule()启动,也即是上面那段代码。
  • 在成功处理SyncGroupResponse中的到的分区分配结果后,会调用reschedule()重新启动:
  • // 处理从SyncGroupResponse中的到的分区分配结果
  • protected void onJoinComplete(int generation,
  • String memberId,
  • String assignmentStrategy,
  • ByteBuffer assignmentBuffer) {
  • ...
  • // 如果是自动提交offset,重新规划自动提交周期
  • if (autoCommitEnabled)
  • autoCommitTask.reschedule();
  • ...
  • }
  1. HeartbeatTask
  • 在找到GroupCoordinator并与之成功建立连接之后,并得到了合法的年代信息,会调用reset()重置HeartbeatTask任务:
  • // 处理GroupMetadataResponse的入口方法
  • private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
  • ...
  • // 尝试与coordinator建立连接
  • client.tryConnect(coordinator);
  • // start sending heartbeats only if we have a valid generation
  • // 启动定时心跳任务
  • if (generation > 0)
  • heartbeatTask.reset();
  • ...
  • }
  • 在JoinGroupRequest请求成功响应之后,会调用reset()重置HeartbeatTask任务,注意,此时SyncGroupRequest请求也已经收到响应了:
  • public void ensureActiveGroup() {
  • ...
  • // 创建JoinGroupRequest
  • RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
  • // 添加RequestFutureListener
  • future.addListener(new RequestFutureListener<ByteBuffer>() {
  • @Override
  • public void onSuccess(ByteBuffer value) {
  • // handle join completion in the callback so that the callback will be invoked
  • // even if the consumer is woken up before finishing the rebalance
  • onJoinComplete(generation, memberId, protocol, value);
  • needsJoinPrepare = true;
  • // 重启心跳定时任务
  • heartbeatTask.reset();
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // we handle failures below after the request finishes. if the join completes
  • // after having been woken up, the exception is ignored and we will rejoin
  • }
  • });
  • ...
  • }

我们不妨看看AutoCommitTask和HeartbeatTask的reschedule()reset()方法都做了什么:

  • // AutoCommitTask类
  • private class AutoCommitTask implements DelayedTask {
  • // 提交间隔时间
  • private final long interval;
  • ...
  • private void reschedule() {
  • client.schedule(this, time.milliseconds() + interval);
  • }
  • private void reschedule(long at) {
  • client.schedule(this, at);
  • }
  • ...
  • }
  • // 定时任务,负责定时发送HeartBeatRequest并处理其响应
  • private class HeartbeatTask implements DelayedTask {
  • private boolean requestInFlight = false;
  • public void reset() {
  • // start or restart the heartbeat task to be executed at the next chance
  • // 获取当前时间
  • long now = time.milliseconds();
  • // 重置最后一次心跳时间lastSessionReset为当前时间
  • heartbeat.resetSessionTimeout(now);
  • // 将当前HeartbeatTask任务对象从delayedTasks队列中移除
  • client.unschedule(this);
  • // 没有正在发送的心跳请求时
  • if (!requestInFlight)
  • // 使用ConsumerNetworkClient重新调度心跳任务
  • client.schedule(this, now);
  • }
  • ...
  • }

它们内部调用了ConsumerNetworkClient的unschedule(DelayedTask task)schedule(DelayedTask task, long at)两个方法:

  • // ConsumerNetworkClient类
  • private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
  • public void schedule(DelayedTask task, long at) {
  • // 向delayedTasks添加任务
  • delayedTasks.add(task, at);
  • }
  • public void unschedule(DelayedTask task) {
  • delayedTasks.remove(task);
  • }

其实对应源码非常简单,schedule(DelayedTask task, long at)负责将传入的DelayedTask任务添加到delayedTasks队列,而unschedule(DelayedTask task)负责从delayedTasks队列移除指定的任务。

在每次向delayedTasks队列添加任务时都会传入当前时间戳,而delayedTasks队列是优先队列,使用小顶堆的方式对内部元素进行排序,排序的依据就是传入的时间戳,因此添加的任务会依次从头至尾排队:

  • // DelayedTaskQueue类
  • public void add(DelayedTask task, long at) {
  • // tasks是PriorityQueue类型
  • tasks.add(new Entry(task, at));
  • }
  • // PriorityQueue类
  • public boolean add(E e) {
  • return offer(e);
  • }
  • public boolean offer(E e) {
  • if (e == null)
  • throw new NullPointerException();
  • modCount++;
  • int i = size;
  • if (i >= queue.length)
  • // 扩容操作
  • grow(i + 1);
  • size = i + 1;
  • if (i == 0)
  • queue[0] = e;
  • else
  • // 上浮调整操作
  • siftUp(i, e);
  • return true;
  • }

下面我们先介绍自动心跳任务的执行。

3.1. 自动心跳任务

任务都添加到delayedTasks队列等待执行,executeDelayedTasks(long now)方法调用了队列的poll(long now)尝试执行定时任务,该方法的源码很简单:

  • // DelayedTaskQueue类
  • public void poll(long now) {
  • /**
  • * 队列不为空,查看队首元素timeout是否小于或等于当前时间
  • * peek(),poll()方法都是非阻塞的,没有话就返回null
  • * 这里由于首先判空了,因此tasks.peek().timeout不会抛出空指针异常
  • */
  • while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
  • // 取出队首元素
  • Entry entry = tasks.poll();
  • // 判断并执行
  • entry.task.run(now);
  • }
  • }

它会循环从tasks队列中查看队首任务的timeout字段是否小于或等于当前时间,如果是就将其取出,并执行内部taskrun(long now)方法。tasks内的元素都是Entry类型的,而Entry类型元素的task属性则是DelayedTask类型的,也即是HeartbeatTask和AutoCommitTask的父类。最终会执行taskrun(final long now)方法,该方法在HeartbeatTask中实现如下:

  • // HeartbeatTask类
  • public void run(final long now) {
  • /**
  • * 检查是否需要发送HeartbeatRequest
  • * 1. GroupCoordinator已确定且已连接
  • * 2. 不处于正在等待Partition分配结果的状态
  • * 3. 之前的HeartbeatRequest请求正常收到响应且没有过期
  • */
  • if (generation < 0 || needRejoin() || coordinatorUnknown()) {
  • // no need to send the heartbeat we're not using auto-assignment or if we are
  • // awaiting a rebalance
  • return;
  • }
  • // 检测HeartbeatResponse是否超时,若超时则认为GroupCoordinator宕机
  • if (heartbeat.sessionTimeoutExpired(now)) {
  • // we haven't received a successful heartbeat in one session interval
  • // so mark the coordinator dead
  • // 清空unsent集合中该GroupCoordinator所在Node对应的请求队列并将这些请求标记为异常
  • coordinatorDead();
  • return;
  • }
  • // 检测HeartbeatTask是否到期
  • if (!heartbeat.shouldHeartbeat(now)) {
  • /**
  • * 如果未到期,更新到期时间,将HeartbeatTask对象重新添加到DelayedTaskQueue中
  • * 注意,此时的时间已经更新为now + heartbeat.timeToNextHeartbeat(now)
  • */
  • // we don't need to heartbeat now, so reschedule for when we do
  • client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
  • } else {
  • // 已到期,更新最近发送HeartbeatRequest请求的时间,即将lastHeartbeatSend更新为当前时间
  • heartbeat.sentHeartbeat(now);
  • // 更新该字段,表示有HeartbeatRequest请求正在发送,还未收到响应,防止重复发送
  • requestInFlight = true;
  • // 发送心跳请求
  • RequestFuture<Void> future = sendHeartbeatRequest();
  • // 在返回的RequestFuture上添加RequestFutureListener监听器
  • future.addListener(new RequestFutureListener<Void>() {
  • @Override
  • public void onSuccess(Void value) {
  • // 心跳响应成功处理
  • requestInFlight = false;
  • long now = time.milliseconds();
  • // 更新最后一次收到心跳响应的时间
  • heartbeat.receiveHeartbeat(now);
  • // 计算下一次执行心跳任务的时间
  • long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
  • // 根据下一次执行心跳任务的时间重新添加心跳任务
  • client.schedule(HeartbeatTask.this, nextHeartbeatTime);
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // 心跳响应处理失败
  • requestInFlight = false;
  • // 重新规划心跳任务,执行时间为等待退避时间段之后
  • client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
  • }
  • });
  • }
  • }
  • // Heartbeat类
  • // 检测是否过期
  • public boolean sessionTimeoutExpired(long now) {
  • // 距离上次会话重置或上次收到心跳响应的时间间隔已经大于超时时间了,则判定为会话超时
  • return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
  • }
  • public boolean shouldHeartbeat(long now) {
  • return timeToNextHeartbeat(now) == 0;
  • }
  • // 计算下次发送心跳的时间
  • public long timeToNextHeartbeat(long now) {
  • // 当前距离上次发送心跳的时间
  • long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
  • if (timeSinceLastHeartbeat > interval)
  • // 如果间隔时间大于设置的心跳间隔时间,说明时间到了,要发送心跳了,返回0
  • return 0;
  • else
  • // 否则计算还需要等待的时间
  • return interval - timeSinceLastHeartbeat;
  • }

run(long now)方法整体逻辑比较简单,首先根据几个条件判断是否需要发送心跳,同时根据最近重置心跳会话的时间、上次收到心跳响应的时间以及超时时间来判断心跳会话是否超时,如果超时则认为可能是GroupCoordinator宕机了;如果一切正常,就会判断是否已经到了发送心跳的时间,根据判断结果有两种处理方式:

  1. 没有到发送时间,通过将自己重新添加到delayedTasks任务队列来重新开启定时任务,此时会更新at时间戳参数为当前时间 + 还需等待的时间
  2. 已经到了发送时间,将会发送HeartbeatRequest请求完成心跳。

3.1.1. HeartbeatRequest请求发送

心跳任务的发送由sendHeartbeatRequest()方法负责处理,源码如下:

  • // AbstractCoordinator类
  • public RequestFuture<Void> sendHeartbeatRequest() {
  • // 创建心跳对象
  • HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
  • // 使用ConsumerNetworkClient发送心跳,此时会将请求暂存到unsent集合,等待ConsumerNetworkClient的poll发送
  • return client.send(coordinator, ApiKeys.HEARTBEAT, req)
  • // 同时使用HeartbeatCompletionHandler将RequestFuture<ClientResponse>适配成RequestFuture<Void>
  • .compose(new HeartbeatCompletionHandler());
  • }

HeartbeatRequest对象即是心跳请求对象,内部携带了Group ID、年代、Member ID等信息,具体的发送操作与前面的几类请求大同小异,这里不再赘述。最终HeartbeatResponse响应由HeartbeatCompletionHandler处理。

3.1.2. HeartbeatResponse响应处理

  • // AbstractCoordinator类的内部类
  • private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
  • /**
  • * 具体业务线由父类CoordinatorResponseHandler处理
  • * 父类会调用该方法解析ClientResponse
  • * 而HeartbeatCompletionHandler子类实现中才真正对ClientResponse进行解析
  • * 最终解析为一个HeartbeatResponse对象
  • * @param response 未解析的响应
  • * @return 解析后的得到的响应
  • */
  • @Override
  • public HeartbeatResponse parse(ClientResponse response) {
  • return new HeartbeatResponse(response.responseBody());
  • }
  • /**
  • * 具体业务线由父类CoordinatorResponseHandler处理
  • * 父类会调用该方法解析ClientResponse
  • * 而HeartbeatCompletionHandler子类实现中才真正对ClientResponse进行解析
  • * 最终解析为一个HeartbeatResponse对象
  • * @param heartbeatResponse 已解析的响应
  • * @param future
  • */
  • @Override
  • public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
  • sensors.heartbeatLatency.record(response.requestLatencyMs());
  • // 获取响应错误
  • Errors error = Errors.forCode(heartbeatResponse.errorCode());
  • if (error == Errors.NONE) {
  • // 没有响应错误
  • log.debug("Received successful heartbeat response for group {}", groupId);
  • future.complete(null);
  • } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
  • || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
  • // GroupCoordinator不可用,或没有Group对应的Coordinator
  • log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
  • groupId, coordinator);
  • // 表示GroupCoordinator已失效
  • coordinatorDead();
  • // 抛出异常
  • future.raise(error);
  • } else if (error == Errors.REBALANCE_IN_PROGRESS) {
  • // 正在Rebalance过程中
  • log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
  • // 标记rejoinNeeded为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group
  • AbstractCoordinator.this.rejoinNeeded = true;
  • // 抛出异常
  • future.raise(Errors.REBALANCE_IN_PROGRESS);
  • } else if (error == Errors.ILLEGAL_GENERATION) {
  • // 表示HeartbeatRequest携带的generationId过期,说明可能进行了一次Rebalance操作
  • log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
  • // 标记rejoinNeeded为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group
  • AbstractCoordinator.this.rejoinNeeded = true;
  • // 抛出异常
  • future.raise(Errors.ILLEGAL_GENERATION);
  • } else if (error == Errors.UNKNOWN_MEMBER_ID) {
  • // GroupCoordinator无法识别该Consumer
  • log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
  • // 清空memberId
  • memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
  • // 标记rejoinNeeded为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group
  • AbstractCoordinator.this.rejoinNeeded = true;
  • future.raise(Errors.UNKNOWN_MEMBER_ID);
  • } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  • // GroupCoordinator授权失败
  • future.raise(new GroupAuthorizationException(groupId));
  • } else {
  • future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
  • }
  • }
  • }

响应的处理中,如果没有携带异常,直接通过future.complete(null)完成回调,而如果存在异常,则会根据异常来做出一些记录和调整的。心跳操作并非是简单的保持GroupCoordinator对ConsumerCoordinator的活跃性检测,还包含了ConsumerCoordinator对GroupCoordinator的情况检测。我们可以观察几类错误类型:

  1. GROUP_COORDINATOR_NOT_AVAILABLE(GroupCoordinator不可用)、NOT_COORDINATOR_FOR_GROUP(找不到Group对应的GroupCoordinator):ConsumerCoordinator会认为GroupCoordinator已宕机,调用coordinatorDead()方法进行后续处理,同时抛出异常。
  2. REBALANCE_IN_PROGRESS(正在Rebalance)、ILLEGAL_GENERATION(非法的年代信息)、UNKNOWN_MEMBER_ID(未知的Member ID):ConsumerCoordinator会认为正处于Rebalance操作中,会将AbstractCoordinator.this.rejoinNeeded置为true,然后抛出异常。
  3. GROUP_AUTHORIZATION_FAILED(Group未授权):此时不做任何处理,直接抛出异常。

coordinatorDead()方法其实在很多地方都出现过,一直没有详细介绍,这里简单看一下它的源码:

  • // AbstractCoordinator类
  • protected void coordinatorDead() {
  • if (this.coordinator != null) {
  • log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
  • client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
  • // 将GroupCoordinator设置为null,表示需要重选GroupCoordinator
  • this.coordinator = null;
  • }
  • }
  • // ConsumerNetworkClient类
  • protected void failUnsentRequests(Node node, RuntimeException e) {
  • // clear unsent requests to node and fail their corresponding futures
  • // 清空该Node对应的ClientRequest
  • List<ClientRequest> unsentRequests = unsent.remove(node);
  • if (unsentRequests != null) {
  • for (ClientRequest request : unsentRequests) {
  • // 调用raise()方法传递异常
  • RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
  • handler.raise(e);
  • }
  • }
  • }

作用比较简单,取消发往GroupCoordinator节点的请求,同时直接使用这些请求的回调抛出GroupCoordinatorNotAvailableException异常,然后将ConsumerCoordinator记录的GroupCoordinator置为null。这样在后面会重新发送GroupCoordinatorRequest、JoinGroupRequest及SyncGroupRequest一系列请求重新完成定位GroupCoordinator、分区分配、更新偏移量、重置Position等操作。

HeartbeatResponse的成功处理最终会调用发送HeartbeatRequest时创建的RequestFuture,完成心跳各类状态的更新,这部分代码在前面讲解的HeartbeatTask类的run(final long now)方法中,非常简单,读者请自行分析。

3.2. 自动提交offset任务

如果配置了自动提交offset(通过enable.auto.commit),AutoCommitTask则负责定时自动异步向服务端提交offset信息;该任务的调度原理与HeartbeatTask几乎一致,都是由DelayedTaskQueue队列的poll(long now)执行,最终依旧会交给AutoCommitTask的run(final long now)方法:

  • // AutoCommitTask类
  • public void run(final long now) {
  • // 判断GroupCoordinator是否可用,如果不可用则跳过提交
  • if (coordinatorUnknown()) {
  • log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
  • reschedule(now + retryBackoffMs);
  • return;
  • }
  • // 判断是否需要Rebalance,如果需要则跳过提交
  • if (needRejoin()) {
  • // skip the commit when we're rejoining since we'll commit offsets synchronously
  • // before the revocation callback is invoked
  • reschedule(now + interval);
  • return;
  • }
  • // 异步提交偏移量
  • commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
  • // 在提交成功的回调中重新规划下一次提交计划
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception == null) {
  • // 重新调度新的任务
  • reschedule(now + interval);
  • } else {
  • log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
  • // 重新调度新的任务
  • reschedule(now + interval);
  • }
  • }
  • });
  • }

该方法与HeartbeatTask的非常类似。提交offset操作由commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)完成,采用异步提交,第一个参数包含了消费分区需要提交的偏移量信息,第二个参数则是一个回调用于获知提交情况,源码如下:

  • // ConsumerCoordinator类
  • public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
  • // 将SubscriptionState的needsFetchCommittedOffset设置为true
  • this.subscriptions.needRefreshCommits();
  • // 创建并缓存OffsetCommitRequest请求,等待发送,响应由OffsetCommitResponseHandler处理
  • RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
  • // 选择回调函数
  • final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
  • // 添加监听器
  • future.addListener(new RequestFutureListener<Void>() {
  • @Override
  • public void onSuccess(Void value) {
  • if (interceptors != null)
  • interceptors.onCommit(offsets);
  • // 调用回调
  • cb.onComplete(offsets, null);
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // 异常处理
  • if (e instanceof RetriableException) {
  • cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
  • } else {
  • cb.onComplete(offsets, e);
  • }
  • }
  • });
  • // ensure the commit has a chance to be transmitted (without blocking on its completion).
  • // Note that commits are treated as heartbeats by the coordinator, so there is no need to
  • // explicitly allow heartbeats through delayed task execution.
  • // 发送OffsetCommitRequest
  • client.pollNoWakeup();
  • }

该方法会使用sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets)发送OffsetCommitRequest请求,最后解析响应数据后,会在回调里处理OffsetCommitCallback的调用。

3.2.1. OffsetCommitRequest请求发送

我们看一下sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets)方法的源码:

  • private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
  • // 检查GroupCoordinator是否就绪
  • if (coordinatorUnknown())
  • return RequestFuture.coordinatorNotAvailable();
  • // 检查offsets参数是否为空,如果为空则不提交
  • if (offsets.isEmpty())
  • return RequestFuture.voidSuccess();
  • // create the offset commit request
  • // 转换offsets参数,转换格式如下
  • // Map<TopicPartition, OffsetAndMetadata> -> Map<TopicPartition, OffsetCommitRequest.PartitionData>
  • Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
  • for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
  • OffsetAndMetadata offsetAndMetadata = entry.getValue();
  • offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
  • offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
  • }
  • // 根据转换得到的offsetData构造OffsetCommitRequest请求对象
  • OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
  • this.generation,
  • this.memberId,
  • OffsetCommitRequest.DEFAULT_RETENTION_TIME,
  • offsetData);
  • log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
  • // 使用ConsumerNetworkClient暂存请求到unsent,等待发送
  • return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
  • // 适配响应处理
  • .compose(new OffsetCommitResponseHandler(offsets));
  • }

该方法会进行GroupCoordinator可用性以及待提交的offset数据的检查,然后将待提交offset数据进行格式转换,最后根据Group ID、年代信息、Member ID、Offset Retention Time(offset保留时间)、待提交offset数据等信息构造OffsetCommitRequest请求进行发送,返回的OffsetCommitRespon响应由OffsetCommitResponseHandler处理。

3.2.2. OffsetCommitRespon响应处理

OffsetCommitResponseHandler负责处理OffsetCommitRespon响应,源码如下:

  • // 处理OffsetCommitResponse响应的方法
  • private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
  • private final Map<TopicPartition, OffsetAndMetadata> offsets;
  • public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
  • this.offsets = offsets;
  • }
  • @Override
  • public OffsetCommitResponse parse(ClientResponse response) {
  • return new OffsetCommitResponse(response.responseBody());
  • }
  • // 处理入口
  • @Override
  • public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
  • sensors.commitLatency.record(response.requestLatencyMs());
  • Set<String> unauthorizedTopics = new HashSet<>();
  • // 遍历获得的OffsetCommitResponse中responseData字典
  • for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
  • // 获取TopicPartition
  • TopicPartition tp = entry.getKey();
  • // 根据TopicPartition获取对应的当时提交的OffsetAndMetadata
  • OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
  • // 获取对应的当时提交的offset
  • long offset = offsetAndMetadata.offset();
  • // 获取错误
  • Errors error = Errors.forCode(entry.getValue());
  • if (error == Errors.NONE) {
  • // 无异常
  • log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
  • // 判断subscriptions的assignment中是否记录了响应的TopicPartition
  • if (subscriptions.isAssigned(tp))
  • // update the local cache only if the partition is still assigned
  • // 如果记录了就更新相应的偏移量信息
  • subscriptions.committed(tp, offsetAndMetadata);
  • } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  • log.error("Not authorized to commit offsets for group {}", groupId);
  • // GroupCoordinator未授权异常
  • future.raise(new GroupAuthorizationException(groupId));
  • return;
  • } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
  • // Topic未授权异常
  • unauthorizedTopics.add(tp.topic());
  • } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
  • || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
  • // 提交的偏移量信息过大异常
  • // 无效的偏移量大小异常
  • // raise the error to the user
  • log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
  • future.raise(error);
  • return;
  • } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
  • // Group正在加载中
  • // just retry
  • log.debug("Offset commit for group {} failed: {}", groupId, error.message());
  • future.raise(error);
  • return;
  • } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
  • || error == Errors.NOT_COORDINATOR_FOR_GROUP
  • || error == Errors.REQUEST_TIMED_OUT) {
  • // GroupCoordinator不可用
  • // 对于相应的Group没有对应的GroupCoordinator
  • log.debug("Offset commit for group {} failed: {}", groupId, error.message());
  • coordinatorDead();
  • future.raise(error);
  • return;
  • } else if (error == Errors.UNKNOWN_MEMBER_ID
  • || error == Errors.ILLEGAL_GENERATION
  • || error == Errors.REBALANCE_IN_PROGRESS) {
  • // 未知的MemberID
  • // 未知的年代信息
  • // 正在Rebalance过程中
  • // need to re-join group
  • log.debug("Offset commit for group {} failed: {}", groupId, error.message());
  • subscriptions.needReassignment();
  • future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
  • "rebalanced and assigned the partitions to another member. This means that the time " +
  • "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
  • "which typically implies that the poll loop is spending too much time message processing. " +
  • "You can address this either by increasing the session timeout or by reducing the maximum " +
  • "size of batches returned in poll() with max.poll.records."));
  • return;
  • } else {
  • // 其他异常
  • log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
  • future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
  • return;
  • }
  • }
  • if (!unauthorizedTopics.isEmpty()) {
  • log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
  • future.raise(new TopicAuthorizationException(unauthorizedTopics));
  • } else {
  • future.complete(null);
  • }
  • }
  • }

响应处理过程中,会循环遍历OffsetCommitResponse对象中的responseData属性,该属性是一个Map<TopicPartition, Short>类型的字典,键为主题分区对象,值为错误码,会有以下几种异常情况,同时对于不同异常会有不同的处理方式:

  • OFFSET_METADATA_TOO_LARGE(值为12):提交的偏移量信息过大异常。抛出异常。
  • GROUP_LOAD_IN_PROGRESS(值为14):Group正在加载中。抛出异常。
  • GROUP_COORDINATOR_NOT_AVAILABLE(值为15):GroupCoordinator不可用。调用coordinatorDead(),抛出异常。
  • NOT_COORDINATOR_FOR_GROUP(值为16):Group没有对应的GroupCoordinator。调用coordinatorDead(),抛出异常。
  • ILLEGAL_GENERATION(值为22):非法的年代信息。移除SubscriptionState对象的groupSubscription中除subscription集合所有元素之外的元素,并将其needsPartitionAssignment标志为true,抛出异常。
  • UNKNOWN_MEMBER_ID(值为25):未知的Member ID。移除SubscriptionState对象的groupSubscription中除subscription集合所有元素之外的元素,并将其needsPartitionAssignment标志为true,抛出异常。
  • REBALANCE_IN_PROGRESS(值为27):正在Rebalance。移除SubscriptionState对象的groupSubscription中除subscription集合所有元素之外的元素,并将其needsPartitionAssignment标志为true,抛出异常。
  • INVALID_COMMIT_OFFSET_SIZE(值为28):无效的偏移量大小。抛出异常。
  • TOPIC_AUTHORIZATION_FAILED(值为29):主题未授权。抛出异常。
  • GROUP_AUTHORIZATION_FAILED(值为30):Group未授权。抛出异常。
  • REQUEST_TIMED_OUT(值为7):请求超时。调用coordinatorDead(),抛出异常。

如果没有异常,会通过SubscriptionState对象更新对应的分区状态TopicPartitionState对象的committed(TopicPartition tp, OffsetAndMetadata offset)记录,该方法前面讲解过,这里进行回顾:

  • // SubscriptionState
  • public void committed(TopicPartition tp, OffsetAndMetadata offset) {
  • assignedState(tp).committed(offset);
  • }
  • // TopicPartitionState
  • // 设置最近一次提交的offset
  • private void committed(OffsetAndMetadata offset) {
  • this.committed = offset;
  • }

有效正确的响应结果会调用RequestFuture回调对象的complete(T value)方法,将事件往前传递,最终会调用AutoCommitTask类run(final long now)方法里定义的OffsetCommitCallback回调的onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)方法,在该方法中会重新调度AutoCommitTask定时任务;注意,该方法得到的offsets是发送OffsetCommitRequest时准备的,并非是服务端返回的结果(个人觉得这里的处理有一定的风险,因为服务端对提交的offset并不一定会全部处理成功,对于OffsetCommitCallback的回调来说,似乎传入服务端返回的结果更为可靠)。

4. 消息拉取

终于到了讲解最为核心的操作的部分了。在经过一系列的预备操作之后,就可以正式地向Kafka服务端拉取消息了,这个操作依旧在KafkaConsumer的pollOnce(long timeout)方法中:

  • // KafkaConsumer类
  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • ...
  • // 前面已经完成了GroupCoordinator的定位、分区分配、更新偏移量、重置Position、执行心跳或自动提交offset定时任务等操作
  • // init any new fetches (won't resend pending fetches)
  • // 尝试从completedFetches缓存中解析消息
  • Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
  • // if data is available already, e.g. from a previous network client poll() call to commit,
  • // then just return it immediately
  • // 判断缓存中是否有消息
  • if (!records.isEmpty())
  • return records;
  • // 创建并缓存FetchRequest请求
  • fetcher.sendFetches();
  • // 发送FetchRequest请求
  • client.poll(timeout, now);
  • // 从completedFetches缓存中解析消息
  • return fetcher.fetchedRecords();
  • }

拉取消息的过程主要是fetcher.fetchedRecords(),但这一步仅仅是拉取;拉取消息是需要先发送特定请求,由服务端返回响应,从响应中才能获取到消息的。pollOnce(long timeout)是先进行的消息拉取,然后才发送拉取请求的。因此第一次执行fetcher.fetchedRecords()时是会直接返回的空字典的,因此会往下执行fetcher.sendFetches()发送拉取请求,我们先分析这部分的代码。

4.1. FetchRequest请求发送

在正式讲解FetchRequest的发送之前,我们先熟悉一下Fetcher类中几个重要的属性,这些属性与后面FetchRequest和FetchResponse的处理有很大的关系:

  • // 负责网络通信
  • private final ConsumerNetworkClient client;
  • // 在服务端收到FetchRequest之后并不是立即响应,而是当可返回的消息数据积累到至少minBytes个字节时才进行响应。
  • // 每个FetchResponse中包含多条消息,提高网络的有效负载
  • private final int minBytes; //(fetch.min.bytes)
  • // 等待FetchResponse的最长时间,服务端根据此时间决定何时进行响应
  • private final int maxWaitMs; //(fetch.max.wait.ms)
  • // 每次fetch操作的最大字节数
  • private final int fetchSize; //(max.partition.fetch.bytes)
  • // 每次获取Record的最大数量
  • private final int maxPollRecords; //(max.poll.records)
  • // 每个FetchResponse首先会转换为CompletedFetch对象进入此队列缓存
  • private final List<CompletedFetch> completedFetches;
  • // 保存了CompletedFetch解析后的结果集合
  • private PartitionRecords<K, V> nextInLineRecords = null;

KafkaConsumer在向服务端拉取数据时,服务端并不是立即响应当前请求,而是会将请求累积在一起,当消息数据量达到设置的阈值,或等待时间达到了最大时间,才会将多条消息一次性返回。在创建Fetcher对象时,会根据传入参数来初始化这些配置,其中minBytes表示每次拉取消息的最小大小,maxWaitMs表示每次拉取消息最大的等待时间,fetchSize表示每次拉取时每个分区的返回的消息的最大大小,maxPollRecords表示每次拉取消息的最多条数。

同时,当FetcherResponse返回后,会先转换为CompletedFetch对象存储在completedFetches队列中,在后续处理时将对CompletedFetch进行解析然后存储在PartitionRecords对象nextInLineRecords中。

CompletedFetch类的定义如下:

  • // Fetcher内部类
  • private static class CompletedFetch {
  • // 分区
  • private final TopicPartition partition;
  • // 拉取的offset
  • private final long fetchedOffset;
  • // 拉取的分区消息数据
  • private final FetchResponse.PartitionData partitionData;
  • private final FetchResponseMetricAggregator metricAggregator;
  • ...
  • }
  • // FetchResponse内部类
  • public static final class PartitionData {
  • // 错误码
  • public final short errorCode;
  • // 高水位记录
  • public final long highWatermark;
  • // 消息记录集缓冲区
  • public final ByteBuffer recordSet;
  • ...
  • }

CompletedFetch记录的是拉取到的消息的来源分区、拉取的offset以及分区消息数据,分区消息数据以PartitionData对象表示,PartitionData对象中存放了错误码、高水位记录以及消息记录集缓冲区。

PartitionRecords对象内则存放了分区对应的消息合集,同时存储了消息合集中第一条消息的offset,定义如下:

  • private static class PartitionRecords<K, V> {
  • // records中第一条消息的offset
  • private long fetchOffset;
  • // 消息对应的TopicPartition
  • private TopicPartition partition;
  • // 消息集合
  • private List<ConsumerRecord<K, V>> records;
  • ...
  • }

了解了上面的几个属性之后,我们开始分析用于发送拉取请求的Fetcher类的sendFetches()方法,它的源码如下:

  • // Fetcher类
  • // 发送拉取消息的请求
  • public void sendFetches() {
  • // 遍历createFetchRequest()方法得到的待发送请求字典,类型Map<Node, FetchRequest>
  • for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
  • // 得到FetchRequest对象
  • final FetchRequest request = fetchEntry.getValue();
  • // 使用ConsumerNetworkClient发送请求
  • client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
  • // 并注册FetchResponse处理方法
  • .addListener(new RequestFutureListener<ClientResponse>() {
  • // 处理FetchResponse响应数据
  • @Override
  • public void onSuccess(ClientResponse resp) {
  • // 从ClientResponse响应对象数据组装FetchResponse对象
  • FetchResponse response = new FetchResponse(resp.responseBody());
  • /**
  • * 得到响应信息中所有的分区信息
  • * responseData()方法返回值类型为Map<TopicPartition, PartitionData>
  • */
  • Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
  • FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
  • // 遍历responseData()方法返回的响应信息
  • for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
  • // 获取分区
  • TopicPartition partition = entry.getKey();
  • // 从对应的FetchRequest中获取分区对应的offset
  • long fetchOffset = request.fetchData().get(partition).offset;
  • // 获取响应数据
  • FetchResponse.PartitionData fetchData = entry.getValue();
  • // 将响应数据构造为CompletedFetch对象添加到completedFetches队列
  • completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
  • }
  • sensors.fetchLatency.record(resp.requestLatencyMs());
  • sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • log.debug("Fetch failed", e);
  • }
  • });
  • }
  • }

从源码可知,sendFetches()主要处理发送逻辑,而构造FetchRequest的操作在createFetchRequests()方法中,该方法的源码如下:

  • // Fetcher类
  • // 创建FetchRequest请求集合,key是Node,value是发往对应Node的FetchRequest
  • private Map<Node, FetchRequest> createFetchRequests() {
  • // create the fetch info
  • // 获取集群元数据
  • Cluster cluster = metadata.fetch();
  • // 用于暂存需要发送的请求
  • Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
  • /**
  • * 遍历可拉取的分区,这里可拉取的条件如下:
  • * 1. 该分区是分配给当前消费者的;
  • * 2. 该分区未被标记为暂停状态,同时对应的TopicPartitionState的position不为null;
  • * 3. nextInLineRecords中没有来自此分区的消息;
  • * 4. completedFetches队列,诶呦来自此分区的CompletedFetch。
  • * 详见fetchablePartitions()方法
  • */
  • for (TopicPartition partition : fetchablePartitions()) {
  • // 获取分区对应的Node信息
  • Node node = cluster.leaderFor(partition);
  • // 如果Node信息为null,强制更新集群元数据
  • if (node == null) {
  • metadata.requestUpdate();
  • } else if (this.client.pendingRequestCount(node) == 0) {
  • // 如果没有正在发往该Node的请求
  • // if there is a leader and no in-flight requests, issue a new fetch
  • // 从fetchable中查询node对应的Map<TopicPartition, FetchRequest.PartitionData>数据
  • Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
  • // 如果查找的是null,就新建一个
  • if (fetch == null) {
  • fetch = new HashMap<>();
  • fetchable.put(node, fetch);
  • }
  • // 获取该分区当前的position,该position记录了下次要从Kafka服务端获取的消息的offset
  • long position = this.subscriptions.position(partition);
  • // 创建PartitionData对象,添加到对应的fetch字典中
  • fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
  • log.trace("Added fetch request for partition {} at offset {}", partition, position);
  • }
  • }
  • // create the fetches
  • // 创建存储FetchRequest的字典,键为Node,值为发往该Node的FetchRequest
  • Map<Node, FetchRequest> requests = new HashMap<>();
  • // 遍历fetchable字典
  • for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
  • Node node = entry.getKey();
  • // 创建FetchRequest
  • FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
  • requests.put(node, fetch);
  • }
  • return requests;
  • }
  • private Set<TopicPartition> fetchablePartitions() {
  • // 获取所有分配给当前消费者的可拉取的分区
  • Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
  • // 从fetchable移除nextInLineRecords记录中的分区,因为这些分区的响应还未被完全处理
  • if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
  • fetchable.remove(nextInLineRecords.partition);
  • // 从fetchable移除存在于completedFetches中分区,因为这些分区的响应还未被完全处理
  • for (CompletedFetch completedFetch : completedFetches)
  • fetchable.remove(completedFetch.partition);
  • return fetchable;
  • }

createFetchRequests()方法会首先获取可拉取消息的分区,通过SubscriptionState对象的fetchablePartitions()实现。可被拉取的分区有合法的Position记录,同时未被标记为暂停拉取:

  • // SubscriptionState类
  • // 获取分配给当前消费者的可拉取分区的信息
  • public Set<TopicPartition> fetchablePartitions() {
  • Set<TopicPartition> fetchable = new HashSet<>();
  • // 遍历assignment
  • for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
  • /**
  • * 判断是否可以拉取,isFetchable()为true有两个条件
  • * 1. 对应的TopicPartition未被标记为暂停状态;
  • * 2. 对应的TopicPartitionState的position不为null
  • */
  • if (entry.getValue().isFetchable())
  • fetchable.add(entry.getKey());
  • }
  • return fetchable;
  • }
  • // TopicPartitionState类
  • /**
  • * 判断是否可以拉取,isFetchable()为true有两个条件
  • * 1. 对应的TopicPartition未被标记为暂停状态;
  • * 2. 对应的TopicPartitionState的position不为null
  • */
  • private boolean isFetchable() {
  • return !paused && hasValidPosition();
  • }

同时Fetcher类fetchablePartitions()方法在从SubscriptionState对象中获取可拉取的分区集合之后,还会从中剔除存在于暂存响应的两个集合nextInLineRecordscompletedFetches中的分区,因为这些分区的响应还未被完全处理。

得到了可拉取消息的分区集合之后,将对它们进行遍历,查找每个分区对应的Leader节点,如果Leader节点不存在,会强制更新集群元数据,如果存在,就构建一个类型为Map<Node, Map<TopicPartition, FetchRequest.PartitionData>>的字典fetchable,这里解释一下这个字典的结构,其中键为Leader节点Node对象,值为一个Map<TopicPartition, FetchRequest.PartitionData>的字典对象,这个字典的键为分区对象,前面的Leader节点即为该分区的Leader,值是以该分区的position和每次fetch操作的最大字节数fetchSize构造的FetchRequest.PartitionData对象。这个结构比较复杂,其实就是将可拉取消息的分区进行整理,映射上对应的Leader节点以及保存该分区信息的FetchRequest.PartitionData对象。

最终该字典还会进行一次转换,以Leader节点为键,以根据对应的FetchRequest.PartitionData构造的FetchRequest对象为值,最终得到Map类型的字典requests进行返回。

Fetcher类的sendFetches()方法在接收到返回的requests字典后,又会对其进行遍历,将FetchRequest类型的值对象发送到对应的键所标识的Node节点,然后通过给发送请求返回的RequestFuture添加监听器来处理FetchResponse响应;回顾这部分源码:

  • // Fetcher
  • // 发送拉取消息的请求
  • public void sendFetches() {
  • // 遍历createFetchRequest()方法得到的待发送请求字典,类型Map<Node, FetchRequest>
  • for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
  • // 得到FetchRequest对象
  • final FetchRequest request = fetchEntry.getValue();
  • // 使用ConsumerNetworkClient发送请求
  • client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
  • // 并注册FetchResponse处理方法
  • .addListener(new RequestFutureListener<ClientResponse>() {
  • // 处理FetchResponse响应数据
  • @Override
  • public void onSuccess(ClientResponse resp) {
  • // 从ClientResponse响应对象数据组装FetchResponse对象
  • FetchResponse response = new FetchResponse(resp.responseBody());
  • /**
  • * 得到响应信息中所有的分区信息
  • * responseData()方法返回值类型为Map<TopicPartition, PartitionData>
  • */
  • Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
  • FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
  • // 遍历responseData()方法返回的响应信息
  • for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
  • // 获取分区
  • TopicPartition partition = entry.getKey();
  • // 从对应的FetchRequest中获取分区对应的offset
  • long fetchOffset = request.fetchData().get(partition).offset;
  • // 获取响应数据
  • FetchResponse.PartitionData fetchData = entry.getValue();
  • // 将响应数据构造为CompletedFetch对象添加到completedFetches队列
  • completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
  • }
  • sensors.fetchLatency.record(resp.requestLatencyMs());
  • sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • log.debug("Fetch failed", e);
  • }
  • });
  • }
  • }

4.2. FetchResponse响应处理

在上面Fetcher类的sendFetches()方法中,已经贴出了处理FetchResponse响应的代码,逻辑也并不负责:遍历FetchResponse响应的responseData字典,这个字典的结构为Map<TopicPartition, PartitionData>,键为主题分区,值为拉取的该主题分区的消息数据对象PartitionData。最终会将遍历的有用数据构造为一个CompletedFetch对象添加到Fetcher类的completedFetches集合中。

其实这里还没有对响应数据完全处理,在前面提到过,KafkaConsumer的pollOnce(long timeout)方法里会调用fetcher.fetchedRecords()尝试从nextInLineRecordscompletedFetches缓存中解析数据,再次回顾:

  • // KafkaConsumer类
  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • ...
  • // 前面已经完成了GroupCoordinator的定位、分区分配、更新偏移量、重置Position、执行心跳或自动提交offset定时任务等操作
  • // init any new fetches (won't resend pending fetches)
  • // 尝试从nextInLineRecords和completedFetches缓存中解析消息
  • Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
  • // if data is available already, e.g. from a previous network client poll() call to commit,
  • // then just return it immediately
  • // 判断缓存中是否有消息
  • if (!records.isEmpty())
  • return records;
  • // 创建并缓存FetchRequest请求
  • fetcher.sendFetches();
  • // 发送FetchRequest请求
  • client.poll(timeout, now);
  • // 从completedFetches缓存中解析消息
  • return fetcher.fetchedRecords();
  • }

因此其实Fetcher类的fetchedRecords()方法才是最终解析数据的方法,源码如下:

  • // Fetcher类
  • // 解析completedFetches中的消息响应数据
  • public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
  • // needsPartitionAssignment标记为true,表示可能需要重新分区
  • if (this.subscriptions.partitionAssignmentNeeded()) {
  • // 返回空集合
  • return Collections.emptyMap();
  • } else {
  • // 遍历completedFetches,按照TopicPartition进行分类
  • Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
  • // 一次最大的获取Record数量
  • int recordsRemaining = maxPollRecords;
  • // 获取迭代器
  • Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
  • // 当还可以继续获取时
  • while (recordsRemaining > 0) {
  • if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
  • // 当nextInLineRecords为空时,会解析completedFetches,最终保存在nextInLineRecords集合中
  • // completedFetches为空,直接退出
  • if (!completedFetchesIterator.hasNext())
  • break;
  • // 从completedFetches获取CompletedFetch对象
  • CompletedFetch completion = completedFetchesIterator.next();
  • completedFetchesIterator.remove();
  • // 使用parseFetchedData()解析CompletedFetch,赋值给nextInLineRecords
  • nextInLineRecords = parseFetchedData(completion);
  • } else {
  • // nextInLineRecords不为空,将nextInLineRecords中的消息添加到drained中,并维护recordsRemaining记录
  • recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
  • }
  • }
  • return drained;
  • }
  • }

这个方法主要是处理nextInLineRecordscompletedFetches两个缓存,处理逻辑如下:

  1. 检查是否需要重分区,如果需要直接返回空字典,否则继续处理。
  2. 准备一个Map<TopicPartition, List<ConsumerRecord<K, V>>>类型的字典drained用于存放处理得到的消息记录。
  3. 获取completedFetches集合的迭代器completedFetchesIterator
  4. 开启一个while循环,循环的条件是recordsRemaining > 0,即以一次最多可返回的消息条数阈值为结束循环的条件。
  5. 判断nextInLineRecords是否为空,如果为空会继续判断completedFetches内是否还有元素,如果二者都没有,说明不存在未处理的数据或暂存的数据都处理完了,直接跳出while循环,返回drained字典即可。
  6. 如果nextInLineRecords为空,但completedFetches中还有元素,则从completedFetches集合内取出CompletedFetch对象,交给parseFetchedData(...)方法处理,得到PartitionRecords对象,赋值给nextInLineRecords
  7. 如果nextInLineRecords不为空,则使用append(...)nextInLineRecords进行处理,处理后得到的消息数据会添加到drained字典中。
  8. 重复第5 ~ 7步,直到recordsRemaining小于或等于0,表示达到一次最多可返回的消息条数阈值,或nextInLineRecordscompletedFetches都为空,结束循环,返回drained字典。

其实这个操作的流程比较简单,总结一下:不断将completedFetches集合中的数据转换为PartitionRecords,交给nextInLineRecords,然后解析nextInLineRecords得到消息数据添加到drained字典,直到达到一次最多可返回的消息条数阈值或nextInLineRecordscompletedFetches都为空,结束并返回drained字典。

这里有两个方法很重要,一个是解析CompletedFetch对象的parseFetchedData(CompletedFetch completedFetch)方法,一个是处理nextInLineRecords对象的append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, PartitionRecords<K, V> partitionRecords, int maxRecords)方法。这两个方法存在于Fetcher类中,先看parseFetchedData(...)方法:

  • // Fetcher类
  • // 解析CompletedFetch对象
  • private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
  • // 获取分区
  • TopicPartition tp = completedFetch.partition;
  • // 获取分区对应的数据
  • FetchResponse.PartitionData partition = completedFetch.partitionData;
  • // 获取拉取消息的offset,这个offset会与前面发送拉取请求时记录的position进行比较,以判断拉取到的消息的偏移量是否正确
  • long fetchOffset = completedFetch.fetchedOffset;
  • int bytes = 0;
  • int recordsCount = 0;
  • PartitionRecords<K, V> parsedRecords = null;
  • try {
  • if (!subscriptions.isFetchable(tp)) {
  • // 分区是不可被拉取的,这种情况可能出现在由于处于Rebalance操作中或分区被标记为停止的情况下
  • // this can happen when a rebalance happened or a partition consumption paused
  • // while fetch is still in-flight
  • log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
  • } else if (partition.errorCode == Errors.NONE.code()) {
  • // 没有错误,解析数据
  • // we are interested in this fetch only if the beginning offset matches the
  • // current consumed position
  • // 从subscriptions中获取TopicPartition对应的下次要从Kafka服务端获取的消息的offset
  • Long position = subscriptions.position(tp);
  • // 检查是否和返回的fetchOffset相同,如果不同表示得到的响应数据与当初希望拉取的数据的偏移量不同,直接返回null
  • if (position == null || position != fetchOffset) {
  • log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
  • "the expected offset {}", tp, fetchOffset, position);
  • return null;
  • }
  • // 得到装载消息数据的ByteBuffer
  • ByteBuffer buffer = partition.recordSet;
  • // 将消息数据转换为MemoryRecords对象,该对象包含了压缩器相关的配置
  • MemoryRecords records = MemoryRecords.readableRecords(buffer);
  • List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
  • boolean skippedRecords = false;
  • // 遍历records
  • for (LogEntry logEntry : records) {
  • // Skip the messages earlier than current position.
  • // 判断是否需要跳过比当前position还要早的消息
  • if (logEntry.offset() >= position) {
  • // 解析消息并添加到parsed集合中
  • parsed.add(parseRecord(tp, logEntry));
  • // 维护已读取的字节数记录
  • bytes += logEntry.size();
  • } else {
  • // 跳过消息,将skippedRecords置为true
  • skippedRecords = true;
  • }
  • }
  • // 消息数量
  • recordsCount = parsed.size();
  • this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
  • if (!parsed.isEmpty()) {
  • // 读取到的消息数量大于0,将读取到的消息数据封装为一个PartitionRecords对象
  • log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
  • parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
  • ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
  • this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
  • } else if (buffer.limit() > 0 && !skippedRecords) {
  • /**
  • * 如果读取到的消息数量为0,则进一步判断
  • * 1. 返回的消息数据的长度是否是大于0的
  • * 2. 是否没有跳过过早的消息
  • * 如果这两条都满足,说明返回的消息数据是有效的,且是新的消息
  • * 如果是这种情况,说明可能是由于消息数据的过大,大于指定的Fetch Size,此时抛出异常
  • */
  • // we did not read a single message from a non-empty buffer
  • // because that message's size is larger than fetch size, in this case
  • // record this exception
  • Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
  • throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
  • + recordTooLargePartitions
  • + " whose size is larger than the fetch size "
  • + this.fetchSize
  • + " and hence cannot be ever returned."
  • + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
  • + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
  • recordTooLargePartitions);
  • }
  • } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
  • || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
  • // 没有找到分区对应的Leader,或者没有找到分区对应的主题,此时本地的集群元数据可能过期了,需要更新
  • this.metadata.requestUpdate();
  • } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
  • // 拉取消息的偏移量超出了允许范围,判断获取到的消息的偏移量是否与请求时保存的position相同
  • if (fetchOffset != subscriptions.position(tp)) {
  • // 不相同,说明拉取的消息不匹配,直接丢弃消息
  • log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
  • "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
  • } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
  • // 相同,说明此时拉取的消息的偏移量确实超出范围了,如果SubscriptionState设置了偏移量重置策略,则标记需要重置偏移量
  • log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
  • subscriptions.needOffsetReset(tp);
  • } else {
  • // 相同,说明此时拉取的消息的偏移量确实超出范围了,但没有设置偏移量重置策略,则抛出异常
  • throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
  • }
  • } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
  • // 主题授权失败异常,抛出异常
  • log.warn("Not authorized to read from topic {}.", tp.topic());
  • throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
  • } else if (partition.errorCode == Errors.UNKNOWN.code()) {
  • // 未知异常
  • log.warn("Unknown error fetching data for topic-partition {}", tp);
  • } else {
  • // 其他异常
  • throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
  • }
  • } finally {
  • completedFetch.metricAggregator.record(tp, bytes, recordsCount);
  • }
  • // 返回封装好的PartitionRecords对象
  • return parsedRecords;
  • }

接下来是append(...)方法:

  • // Fetcher类
  • // 将partitionRecords中的消息数据转换到drained中
  • private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
  • PartitionRecords<K, V> partitionRecords,
  • int maxRecords) {
  • // 如果partitionRecords为空,则直接返回
  • if (partitionRecords.isEmpty())
  • return 0;
  • // 查看当前消费者是否被分配了对应的分区
  • if (!subscriptions.isAssigned(partitionRecords.partition)) {
  • // 当前消费者没有分配到该分区,这种情况一般是由于消息响应返回之前发生了Rebalance操作造成的
  • // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
  • log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
  • } else {
  • // 符合条件
  • // note that the consumed position should always be available as long as the partition is still assigned
  • // 获取当前subscriptions中存储的对应分区下次要从Kafka服务端获取的消息的offset
  • long position = subscriptions.position(partitionRecords.partition);
  • // 判断当前分区是否是可拉取的
  • if (!subscriptions.isFetchable(partitionRecords.partition)) {
  • // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
  • // 当前分区不可被拉取,这种情况一般是由于在消息响应返回之前分区被标记为暂停拉取而造成的
  • log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
  • } else if (partitionRecords.fetchOffset == position) {
  • // 拉取的消息的offset与之前暂存的position是相等的,说明消息被正确拉取了,且拉的是最新的记录
  • // we are ensured to have at least one record since we already checked for emptiness
  • // 从partitionRecords中取出maxRecords条记录,maxRecords指定了一次最多可以获取的消息数量
  • List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
  • // 计算下一次的offset
  • long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
  • log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
  • "position to {}", position, partitionRecords.partition, nextOffset);
  • // 从drained中获取相应分区的<ConsumerRecord<K, V>>集合
  • List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
  • if (records == null) {
  • // 如果为null,就直接将partRecords存入
  • records = partRecords;
  • drained.put(partitionRecords.partition, records);
  • } else {
  • // 如果存在,将partRecords中的元素添加到records中
  • records.addAll(partRecords);
  • }
  • // 更新下次要从Kafka服务端获取的消息的offset
  • subscriptions.position(partitionRecords.partition, nextOffset);
  • // 返回消息记录的大小
  • return partRecords.size();
  • } else {
  • // these records aren't next in line based on the last consumed position, ignore them
  • // they must be from an obsolete request
  • log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
  • partitionRecords.partition, partitionRecords.fetchOffset, position);
  • }
  • }
  • // 发生了Rebalance操作,将partitionRecords中的消息数据直接丢弃
  • partitionRecords.discard();
  • return 0;
  • }