大数据
流式处理
Kafka
源码解析

Kafka系列 08 - 消费者源码分析 02:Rebalance操作

简介:讲解GroupCoordinatorRequest、JoinGroupRequest和SyncGroupRequest请求的发送

1. 轮询操作

确定了订阅主题之后,我们调用了KafkaConsumer消费者对象的poll(long timeout)拉取消息数据。poll()方法是用于消息轮询的消费者核心API,通过while无限循环以及poll(long timeout)方法所构建的的持续轮询向服务器请求数据。轮询操作会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要关注对所获取数据的处理业务即可。示例代码如下:

  • // 创建消费者
  • KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • }
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }

消费者必须持续对Kafka进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给poll(long timeout)方法的参数是一个超时时间,用于控制poll(long timeout)方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为0,poll(long timeout)会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据。我们查看poll(long timeout)方法的源码:

  • // kafkaConsumer类
  • public ConsumerRecords<K, V> poll(long timeout) {
  • // 检查是否有并发操作
  • acquire();
  • try {
  • // 检查超时参数
  • if (timeout < 0)
  • throw new IllegalArgumentException("Timeout must not be negative");
  • // poll for new data until the timeout expires
  • // 记录当前时间用于计算是否超时
  • long start = time.milliseconds();
  • long remaining = timeout;
  • do {
  • // 调用pollOnce()方法拉取消息
  • Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
  • if (!records.isEmpty()) {
  • // 检查是否有消息返回
  • // before returning the fetched records, we can send off the next round of fetches
  • // and avoid block waiting for their responses to enable pipelining while the user
  • // is handling the fetched records.
  • //
  • // NOTE: since the consumed position has already been updated, we must not allow
  • // wakeups or any other errors to be triggered prior to returning the fetched records.
  • // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
  • // 处理拉取返回的消息之前,先发送一次FetchRequest,使消息响应处理与请求的网络传输并行处理,以提高性能
  • fetcher.sendFetches();
  • // 将FetchRequest发送出去,不会阻塞,不能被中断,不会执行定时任务
  • client.pollNoWakeup();
  • // 处理拦截器操作
  • if (this.interceptors == null)
  • // 没有拦截器,直接返回ConsumerRecords对象
  • return new ConsumerRecords<>(records);
  • else
  • // 使用拦截器处理后返回
  • return this.interceptors.onConsume(new ConsumerRecords<>(records));
  • }
  • // 计算已用时间
  • long elapsed = time.milliseconds() - start;
  • // 计算剩余时间
  • remaining = timeout - elapsed;
  • } while (remaining > 0);
  • // 没有拉取到消息,返回空记录
  • return ConsumerRecords.empty();
  • } finally {
  • // 释放重入次数
  • release();
  • }
  • }

poll(long timeout)方法的入口处及finally块中分别调用了acquire()release()这两个前面讲过的方法,避免多线程并发调用。中间的代码是处理轮询操作的主要业务,其中使用了while循环实现超时控制,而核心的轮询代码则是pollOnce(remaining)的调用,我们先看看pollOnce(long timeout)的源码:

  • // KafkaConsumer类
  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
  • // 确保GroupCoordinator已就绪,如果没有就绪会一直阻塞
  • coordinator.ensureCoordinatorReady();
  • // 如果是AUTO_TOPICS或AUTO_PATTERN订阅模式
  • // ensure we have partitions assigned if we expect to
  • if (subscriptions.partitionsAutoAssigned())
  • // 完成Rebalance操作
  • coordinator.ensurePartitionAssignment();
  • // fetch positions if we have partitions we're subscribed to that we
  • // don't know the offset for
  • /**
  • * 恢复SubscriptionState中对应的TopicPartitionState状态
  • * 主要是committed字段和position字段
  • */
  • if (!subscriptions.hasAllFetchPositions())
  • updateFetchPositions(this.subscriptions.missingFetchPositions());
  • long now = time.milliseconds();
  • // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
  • // 执行HeartbeatTask和AutoCommitTask定时任务
  • client.executeDelayedTasks(now);
  • // init any new fetches (won't resend pending fetches)
  • // 尝试从completedFetches缓存中解析消息
  • Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
  • // if data is available already, e.g. from a previous network client poll() call to commit,
  • // then just return it immediately
  • // 判断缓存中是否有消息
  • if (!records.isEmpty())
  • return records;
  • // 创建并缓存FetchRequest请求
  • fetcher.sendFetches();
  • // 发送FetchRequest请求
  • client.poll(timeout, now);
  • // 从completedFetches缓存中解析消息
  • return fetcher.fetchedRecords();
  • }

这个方法的功能非常多,包括定位GroupCoordinator、完成分区Rebalance操作、重置偏移量、执行心跳及自动提交Offset定时任务(如果有)、解析消息数据、发送拉取请求等,几乎囊括了KafkaConsumer的所有主流功能,接下来我们将一一进行讲解。

2. 定位GroupCoordinator

pollOnce(long timeout)方法的第一行代码coordinator.ensureCoordinatorReady()用于确保服务端的GroupCoordinator已经准备就绪,一旦调用则会阻塞直到GroupCoordinator已就绪且可以接受请求;该方法来自于ConsumerCoordinator的父类AbstractCoordinator,源码如下:

  • // AbstractCoordinator类
  • /**
  • * Block until the coordinator for this group is known and is ready to receive requests.
  • * 该方法会阻塞,直到GroupCoordinator已就绪且可以接受请求
  • * 如果GroupCoordinator不正常,会发送GroupCoordinatorRequest请求
  • */
  • public void ensureCoordinatorReady() {
  • // 检测是否需要重新查找GroupCoordinator
  • while (coordinatorUnknown()) {
  • // 需要查找GroupCoordinator
  • // 查找负载最低的Node节点,创建GroupCoordinatorRequest请求,会阻塞
  • RequestFuture<Void> future = sendGroupCoordinatorRequest();
  • // 发送GroupCoordinatorRequest请求,该方法会阻塞,直到接收到GroupCoordinatorResponse响应
  • client.poll(future);
  • // 检测future的状态,查看是否有异常
  • if (future.failed()) {
  • // 出现异常
  • if (future.isRetriable())
  • // 异常是RetriableException,则阻塞更新Metadata元数据
  • client.awaitMetadataUpdate();
  • else
  • // 异常不是RetriableException,抛出
  • throw future.exception();
  • } else if (coordinator != null && client.connectionFailed(coordinator)) {
  • // we found the coordinator, but the connection has failed, so mark
  • // it dead and backoff before retrying discovery
  • // 连接不到GroupCoordinator,退避一段时间后重试
  • coordinatorDead();
  • time.sleep(retryBackoffMs);
  • }
  • }
  • }
  • // 检测是否需要重新查找GroupCoordinator
  • public boolean coordinatorUnknown() {
  • // coordinator是否为null
  • if (coordinator == null)
  • return true;
  • // 检测与GroupCoordinator的连接是否正常
  • if (client.connectionFailed(coordinator)) {
  • // 如果不正常,标记GroupCoordinator已死
  • coordinatorDead();
  • return true;
  • }
  • return false;
  • }
  • // 发送GroupCoordinatorRequest请求
  • private RequestFuture<Void> sendGroupCoordinatorRequest() {
  • // initiate the group metadata request
  • // find a node to ask about the coordinator
  • // 查找负载最低的Node节点
  • Node node = this.client.leastLoadedNode();
  • if (node == null) {
  • // TODO: If there are no brokers left, perhaps we should use the bootstrap set
  • // from configuration?
  • // 无节点可用,返回包含NoAvailableBrokersException的RequestFuture
  • return RequestFuture.noBrokersAvailable();
  • } else {
  • // create a group metadata request
  • log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
  • // 创建GroupCoordinatorRequest请求
  • GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
  • // 使用ConsumerNetworkClient发送请求,返回经过compose()适配的RequestFuture<Void>对象
  • return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
  • .compose(new RequestFutureAdapter<ClientResponse, Void>() {
  • @Override
  • public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
  • // 处理GroupMetadataResponse的入口方法
  • handleGroupMetadataResponse(response, future);
  • }
  • });
  • }
  • }

当初次进行轮询操作时,因为此时没有确定GroupCoordinator所在的broker节点,因此coordinatorUnknown()的返回值必然是true,此时就会触发sendGroupCoordinatorRequest()方法发送GroupCoordinatorRequest请求,但是由于此时KafkaConsumer对服务端的元数据都是未知的,因此在sendGroupCoordinatorRequest()方法中查找负载最低的Node节点这一操作的返回值是null,然后向上返回了RequestFuture.noBrokersAvailable()得到的RequestFuture对象,该方法源码如下:

  • // RequestFuture类
  • public static <T> RequestFuture<T> noBrokersAvailable() {
  • return failure(new NoAvailableBrokersException());
  • }
  • public static <T> RequestFuture<T> failure(RuntimeException e) {
  • RequestFuture<T> future = new RequestFuture<T>();
  • future.raise(e);
  • return future;
  • }
  • public void raise(RuntimeException e) {
  • if (isDone)
  • throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
  • this.exception = e;
  • this.isDone = true;
  • fireFailure();
  • }

注意,此时返回的RequestFuture对象的isDone是标记为true的,同时内部包含了NoAvailableBrokersException异常,这个异常的继承链很重要,它继承自InvalidMetadataException,而InvalidMetadataException由继承自RetriableException,因此NoAvailableBrokersException类其实是RetriableException的子类。

ensureCoordinatorReady()方法中,无论sendGroupCoordinatorRequest()方法返回的是怎样的RequestFuture对象,它都会调用ConsumerNetworkClient的poll(RequestFuture<?> future)方法处理该RequestFuture对象,我们回顾ensureCoordinatorReady()的源码:

  • // AbstractCoordinator类
  • public void ensureCoordinatorReady() {
  • // 检测是否需要重新查找GroupCoordinator
  • while (coordinatorUnknown()) {
  • // 需要查找GroupCoordinator
  • // 查找负载最低的Node节点,创建GroupCoordinatorRequest请求
  • RequestFuture<Void> future = sendGroupCoordinatorRequest();
  • // 发送GroupCoordinatorRequest请求,该方法会阻塞,直到接收到GroupCoordinatorResponse响应
  • client.poll(future);
  • // 检测future的状态,查看是否有异常
  • if (future.failed()) {
  • // 出现异常
  • if (future.isRetriable())
  • // 异常是RetriableException,则阻塞更新Metadata元数据
  • client.awaitMetadataUpdate();
  • else
  • // 异常不是RetriableException,抛出
  • throw future.exception();
  • } else if (coordinator != null && client.connectionFailed(coordinator)) {
  • // we found the coordinator, but the connection has failed, so mark
  • // it dead and backoff before retrying discovery
  • // 连接不到GroupCoordinator,退避一段时间后重试
  • coordinatorDead();
  • time.sleep(retryBackoffMs);
  • }
  • }
  • }
  • // ConsumerNetworkClient
  • public void poll(RequestFuture<?> future) {
  • // 循环检测future是否完成,如果没有完成就执行poll()操作
  • while (!future.isDone())
  • poll(Long.MAX_VALUE);
  • }
  • // RequestFuture
  • public boolean isDone() {
  • return isDone;
  • }

还记得ConsumerNetworkClient类么?它内部封装了NetworkClient,负责主要的网络请求发送和响应处理业主,但此处我们不聊这一部分,仅仅关注它的poll(RequestFuture<?> future)方法,可见,当传入该方法的RequestFuture实例的idDone参数为true时,该方法会结束循环,因此ensureCoordinatorReady()会继续往下执行对future的结果进行处理;这里用到了RequestFuture的failed()isRetriable()两个方法进行判断:

  • public boolean failed() {
  • return isDone && exception != null;
  • }
  • public boolean isRetriable() {
  • return exception instanceof RetriableException;
  • }

这两个方法的实现也比较简单,其中failed()就不多说了,而isRetriable()的实现是判断该RequestFuture对象所包含的exception是否是RetriableException类型的,而前面我们讲到过,在本地消费者对集群元数据未知的情况下,sendGroupCoordinatorRequest()方法返回的RequestFuture对象内部包含的NoAvailableBrokersException异常就是RetriableException的子类,因此isRetriable()返回值是true,此时会调用client.awaitMetadataUpdate()进行集群元数据更新,这部分的原理与KafkaProducer是一样的,就不多赘述了。

2.1. GroupCoordinatorRequest请求发送

在元数据更新之后sendGroupCoordinatorRequest()方法就可以成功找到负载最低的Node节点,并发送GroupCoordinatorRequest请求了,我们回顾一下发送请求部分的代码:

  • // AbstractCoordinator类
  • // 发送GroupCoordinatorRequest请求
  • private RequestFuture<Void> sendGroupCoordinatorRequest() {
  • // initiate the group metadata request
  • // find a node to ask about the coordinator
  • // 查找负载最低的Node节点
  • Node node = this.client.leastLoadedNode();
  • if (node == null) {
  • // TODO: If there are no brokers left, perhaps we should use the bootstrap set
  • // from configuration?
  • // 无节点可用,返回包含NoAvailableBrokersException的RequestFuture
  • return RequestFuture.noBrokersAvailable();
  • } else {
  • // create a group metadata request
  • log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
  • // 创建GroupCoordinatorRequest请求
  • GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
  • // 使用ConsumerNetworkClient发送请求,返回经过compose()适配的RequestFuture<Void>对象
  • return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
  • .compose(new RequestFutureAdapter<ClientResponse, Void>() {
  • @Override
  • public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
  • // 处理GroupMetadataResponse的入口方法
  • handleGroupMetadataResponse(response, future);
  • }
  • });
  • }
  • }

GroupCoordinatorRequest的构造非常简单,就是传入了指定的groupId;这里依旧会将GroupCoordinatorRequest对象通过ConsumerNetworkClient暂存起来,然后等待后面真正的发送处理。ConsumerNetworkClient的send(Node node, ApiKeys api, AbstractRequest request)方法源码如下:

  • // ConsumerNetworkClient类
  • // 封装请求为ClientRequest,保存到unsent集合中等待发送
  • public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
  • long now = time.milliseconds();
  • RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
  • // 获取请求头
  • RequestHeader header = client.nextRequestHeader(api);
  • // 创建RequestSend对象
  • RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
  • // 添加到unsent中,等待发送
  • put(node, new ClientRequest(now, true, send, future));
  • return future;
  • }

该方法的实现似曾相识,与KafkaProducer中发送请求的方式类似,将请求包装为RequestSend对象,然后再次包装RequestSend为ClientRequest对象,经由put(Node node, ClientRequest request)方法添加到ConsumerNetworkClient的unsent字典中:

  • // ConsumerNetworkClient类
  • // 缓存队列,key是Node节点,value是发往该Node节点的ClientRequest集合
  • private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
  • private void put(Node node, ClientRequest request) {
  • // 先从unsent中获取有没有对应node的List<ClientRequest>集合
  • List<ClientRequest> nodeUnsent = unsent.get(node);
  • if (nodeUnsent == null) {
  • // 如果没有则创建
  • nodeUnsent = new ArrayList<>();
  • unsent.put(node, nodeUnsent);
  • }
  • // 添加ClientRequest到对应的List<ClientRequest>集合
  • nodeUnsent.add(request);
  • }

unsent字典的类型是Map>,键是一个Node节点,值是发往该Node节点的ClientRequest集合,具体发送请求的操作依旧是ConsumerNetworkClient的poll操作进行,这个我们后面再讨论。

我们回到前面发送GroupCoordinatorRequest请求的代码,这里将核心代码都归纳在一起方便分析:

  • // AbstractCoordinator类
  • // 发送GroupCoordinatorRequest请求
  • private RequestFuture<Void> sendGroupCoordinatorRequest() {
  • ...
  • GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
  • // 使用ConsumerNetworkClient发送请求,返回经过compose()适配的RequestFuture<Void>对象
  • return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
  • .compose(new RequestFutureAdapter<ClientResponse, Void>() {
  • @Override
  • public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
  • // 处理GroupMetadataResponse的入口方法
  • handleGroupMetadataResponse(response, future);
  • }
  • });
  • ...
  • }
  • // ConsumerNetworkClient类
  • // 封装请求为ClientRequest,保存到unsent集合中等待发送
  • public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
  • long now = time.milliseconds();
  • RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
  • // 获取请求头
  • RequestHeader header = client.nextRequestHeader(api);
  • // 创建RequestSend对象
  • RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
  • // 添加到unsent中,等待发送
  • put(node, new ClientRequest(now, true, send, future));
  • return future;
  • }
  • // RequestFuture
  • // 将RequestFuture<T>适配为RequestFuture<S>
  • public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
  • // 适配结果
  • final RequestFuture<S> adapted = new RequestFuture<S>();
  • // 在当前的RequestFuture上添加监听器
  • addListener(new RequestFutureListener<T>() {
  • @Override
  • public void onSuccess(T value) {
  • // 此处的onSuccess方法由抽象类RequestFutureAdapter的具体实现类重写
  • adapter.onSuccess(value, adapted);
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // 此处的onFailure()方法已被抽象类RequestFutureAdapter默认实现
  • adapter.onFailure(e, adapted);
  • }
  • });
  • return adapted;
  • }
  • // 为当前RequestFuture对象添加监听器
  • public void addListener(RequestFutureListener<T> listener) {
  • // 当前请求是否已经完成
  • if (isDone) {
  • // 已完成
  • if (exception != null)
  • // 如果有异常,直接调用listener的onFailure()
  • listener.onFailure(exception);
  • else
  • // 无异常,调用listener的onSuccess()
  • listener.onSuccess(value);
  • } else {
  • this.listeners.add(listener);
  • }
  • }
  • // RequestFutureAdapter类
  • public abstract class RequestFutureAdapter<F, T> {
  • public abstract void onSuccess(F value, RequestFuture<T> future);
  • // onFailure已经实现了
  • public void onFailure(RuntimeException e, RequestFuture<T> future) {
  • future.raise(e);
  • }
  • }

send(...)方法返回的是一个RequestFutureCompletionHandler对象,这个对象被包装在了ClientRequest对象中,便于在请求得到的响应中回调;它的源码如下:

  • public static class RequestFutureCompletionHandler
  • extends RequestFuture<ClientResponse>
  • implements RequestCompletionHandler {
  • /**
  • * 该方法重写了RequestCompletionHandler接口的
  • * RequestCompletionHandler接口只声明了这一个方法
  • */
  • @Override
  • public void onComplete(ClientResponse response) {
  • if (response.wasDisconnected()) {
  • // 如果是因为连接断开而产生的响应
  • ClientRequest request = response.request();
  • RequestSend send = request.request();
  • ApiKeys api = ApiKeys.forId(send.header().apiKey());
  • int correlation = send.header().correlationId();
  • log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
  • api, request, correlation, send.destination());
  • // 调用RequestFuture的raise()方法,传递DisconnectException异常
  • raise(DisconnectException.INSTANCE);
  • } else {
  • // 否则正常完成回调,该方法来自RequestFuture类
  • complete(response);
  • }
  • }
  • }

可以看出,RequestFutureCompletionHandler类继承自RequestFuture,并实现了RequestCompletionHandler接口,而实现的onComplete(ClientResponse response)方法就来自于该接口。

compose(final RequestFutureAdapter<T, S> adapter)方法返回的RequestFutureCompletionHandler实例调用了自己的compose(...)方法,该方法需要传入一个RequestFutureAdapter类型的适配器对象,在内部通过添加RequestFutureListener监听器的方式对这个适配器进行了装饰,并返回了一个新构造的RequestFuture对象作为最终返回值。这是典型的适配器模式的设计,将RequestFutureCompletionHandler对象适配为了一个新的RequestFuture对象,同时在适配的过程中添加了对响应数据的处理逻辑。

当ClientRequest请求后,得到响应时会回调ClientRequest内部包装的RequestFutureCompletionHandler对象的相应的onComplete(...)方法,这里的事件传递流是:RequestFutureCompletionHandler的onComplete(...)方法 –调用–> RequestFutureCompletionHandler的complete(...)raise(...)方法 –调用–> RequestFutureCompletionHandler的fireSuccess(...)fireFailure(...)方法 –调用–> RequestFutureListener的onSuccess(...)onFailure(...)方法 –调用–> RequestFutureAdapter的onSuccess(...)onFailure(...)方法 –调用–> handleGroupMetadataResponse(...)处理方法 –调用–> 适配的RequestFuture的complete(...)raise(...)方法。其中handleGroupMetadataResponse(...)处理方法用于处理GroupMetadataResponse响应对象,稍后会详细讨论。

sendGroupCoordinatorRequest()内只是将构建好的GroupMetadataRequest包装为ClientRequest对象放入ConsumerNetworkClient的unsent字典中,真正的发送操作还是由ConsumerNetworkClient的poll(RequestFuture<?> future)方法执行的,回顾源码:

  • // AbstractCoordinator类
  • public void ensureCoordinatorReady() {
  • // 检测是否需要重新查找GroupCoordinator
  • while (coordinatorUnknown()) {
  • // 需要查找GroupCoordinator
  • // 查找负载最低的Node节点,创建GroupCoordinatorRequest请求
  • RequestFuture<Void> future = sendGroupCoordinatorRequest();
  • // 发送GroupCoordinatorRequest请求,该方法会阻塞,直到接收到GroupCoordinatorResponse响应
  • client.poll(future);
  • // 检测future的状态,查看是否有异常
  • if (future.failed()) {
  • // 出现异常
  • if (future.isRetriable())
  • // 异常是RetriableException,则阻塞更新Metadata元数据
  • client.awaitMetadataUpdate();
  • else
  • // 异常不是RetriableException,抛出
  • throw future.exception();
  • } else if (coordinator != null && client.connectionFailed(coordinator)) {
  • // we found the coordinator, but the connection has failed, so mark
  • // it dead and backoff before retrying discovery
  • // 连接不到GroupCoordinator,退避一段时间后重试
  • coordinatorDead();
  • time.sleep(retryBackoffMs);
  • }
  • }
  • }

ConsumerNetworkClient的poll(RequestFuture<?> future)方法源码如下:

  • // ConsumerNetworkClient
  • public void poll(RequestFuture<?> future) {
  • // 循环检测future是否完成,如果没有完成就执行poll()操作
  • while (!future.isDone())
  • poll(Long.MAX_VALUE);
  • }
  • public void poll(long timeout) {
  • poll(timeout, time.milliseconds(), true);
  • }
  • private void poll(long timeout, long now, boolean executeDelayedTasks) {
  • // send all the requests we can send now
  • // 检测Node节点发送条件,循环处理unsent中缓存的请求,将发送请求绑定到KafkaChannel的send上,等待发送
  • trySend(now);
  • // ensure we don't poll any longer than the deadline for
  • // the next scheduled task
  • // 计算超时时间,取超时时间和delayedTasks队列中最近要执行的定时任务的时间的较小值,这里后面会讲解
  • timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
  • // 使用NetworkClient处理消息发送
  • clientPoll(timeout, now);
  • now = time.milliseconds();
  • // handle any disconnects by failing the active requests. note that disconnects must
  • // be checked immediately following poll since any subsequent call to client.ready()
  • // will reset the disconnect status
  • // 检测消费者和每个Node之间的连接状态
  • checkDisconnects(now);
  • // execute scheduled tasks
  • // 根据executeDelayedTasks决定是否要处理delayedTasks队列中超时的定时任务
  • if (executeDelayedTasks)
  • // 处理超时的定时任务
  • delayedTasks.poll(now);
  • // try again to send requests since buffer space may have been
  • // cleared or a connect finished in the poll
  • // 再次调用trySend循环处理unsent中缓存的请求
  • trySend(now);
  • // fail requests that couldn't be sent if they have expired
  • // 处理unsent中超时的请求
  • failExpiredRequests(now);
  • }
  • // 发送unsent缓存中的ClientRequest
  • private boolean trySend(long now) {
  • // send any requests that can be sent now
  • boolean requestsSent = false;
  • // 遍历每个<Node, List<ClientRequest>>键值对
  • for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
  • // 获取对应的Node节点
  • Node node = requestEntry.getKey();
  • // 获取对应的ClientRequest集合迭代器
  • Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
  • // 遍历ClientRequest集合
  • while (iterator.hasNext()) {
  • ClientRequest request = iterator.next();
  • // 检查节点是否可用
  • if (client.ready(node, now)) {
  • // 将请求绑定到KafkaChannel上
  • client.send(request, now);
  • // 从集合中移除对应的ClientRequest
  • iterator.remove();
  • requestsSent = true;
  • }
  • }
  • }
  • return requestsSent;
  • }
  • private void clientPoll(long timeout, long now) {
  • // 调用NetworkClient的poll进行消息发送
  • client.poll(timeout, now);
  • // 检测wakeup和wakeupDisabledCount,查看是否有其他线程中断
  • maybeTriggerWakeup();
  • }

从源码可以发现,ConsumerNetworkClient请求发送的流程与前面KafkaProducer中讲解的一模一样:在trySend(long now)方法中,遍历了unsent字典,先检查请求发往的Node节点是否可用,如果可用就通过父类NetworkClient的send(ClientRequest request, long now)将ClientRequest对象绑定在KafkaChannel的send属性上,最终调用clientPoll(long timeout, long now)方法使用NetworkClient的poll(long timeout, long now)方法使用Selector发送请求,这个方法在前面的KafkaProducer中已经讲解得非常详细了,这里就不再赘述了,只回顾处理请求得到的响应部分:

  • // NetworkClient类
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • // process completed actions
  • long updatedNow = this.time.milliseconds();
  • // 响应队列
  • List<ClientResponse> responses = new ArrayList<>();
  • // 处理completeSends队列
  • handleCompletedSends(responses, updatedNow);
  • // 处理completeReceives队列
  • handleCompletedReceives(responses, updatedNow);
  • // 处理disconnected列表
  • handleDisconnections(responses, updatedNow);
  • // 处理connected列表
  • handleConnections();
  • // 处理InFlightRequest中超时请求
  • handleTimedOutRequests(responses, updatedNow);
  • // invoke callbacks
  • // 遍历ClientResponse
  • for (ClientResponse response : responses) {
  • // 如果对应的ClientRequest有回调就执行回调
  • if (response.request().hasCallback()) {
  • try {
  • response.request().callback().onComplete(response);
  • } catch (Exception e) {
  • log.error("Uncaught error in request completion:", e);
  • }
  • }
  • }
  • return responses;
  • }

从上面源代码最后的部分我们可以看到,对于最终的ClientResponse响应对象的处理,会调用其对应的ClientRequest内部的callback回调的onComplete(ClientResponse response)方法,这个callback是RequestCompletionHandler类型的,也就是前面在使用ConsumerNetworkClient包装GroupCoordinatorRequest时创建的RequestFutureCompletionHandler回调,回顾源码:

  • // AbstractCoordinator类
  • private RequestFuture<Void> sendGroupCoordinatorRequest() {
  • ...
  • // 创建GroupCoordinatorRequest请求
  • GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
  • // 使用ConsumerNetworkClient发送请求,返回经过compose()适配的RequestFuture<Void>对象
  • /**
  • * 注意,此处的send()方法内部会构建一个RequestFutureCompletionHandler回调处理器
  • * 然后使用ClientRequest实例将该回调处理器会与GroupCoordinatorRequest对象绑定在一起,
  • * 以便在处理ClientResponse请求响应时通过该回调处理器处理回调事件
  • */
  • return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
  • /**
  • * send()方法构建的RequestFutureCompletionHandler回调处理器会在compose()方法中进行适配
  • * 通过给RequestFutureCompletionHandler回调处理器添加监听器的方式,
  • * 产生了一个新的RequestFuture回调对象
  • */
  • .compose(new RequestFutureAdapter<ClientResponse, Void>() {
  • @Override
  • public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
  • // 处理GroupMetadataResponse的入口方法
  • handleGroupMetadataResponse(response, future);
  • }
  • });
  • ...
  • }
  • // 封装请求为ClientRequest,保存到unsent集合中等待发送
  • public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
  • long now = time.milliseconds();
  • RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
  • // 获取请求头
  • RequestHeader header = client.nextRequestHeader(api);
  • // 创建RequestSend对象
  • RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
  • // 添加到unsent中,等待发送
  • put(node, new ClientRequest(now, true, send, future));
  • return future;
  • }

因此,当ClientRequest请求后,得到响应时会回调ClientRequest内部包装的RequestFutureCompletionHandler对象的相应的onComplete(...)方法,这里的事件传递流是:RequestFutureCompletionHandler的onComplete(...)方法 –调用–> RequestFutureCompletionHandler的complete(...)raise(...)方法 –调用–> RequestFutureCompletionHandler的fireSuccess(...)fireFailure(...)方法 –调用–> RequestFutureListener的onSuccess(...)onFailure(...)方法 –调用–> RequestFutureAdapter的onSuccess(...)onFailure(...)方法 –调用–> handleGroupMetadataResponse(...)处理方法 –调用–> 适配的RequestFuture的complete(...)raise(...)方法。

2.2. GroupCoordinatorResponse响应处理

在上述事件传递流中,handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future)方法用于处理GroupMetadataResponse响应对象,源码如下:

  • // AbstractCoordinator类
  • // 处理GroupMetadataResponse的入口方法
  • private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
  • log.debug("Received group coordinator response {}", resp);
  • /**
  • * 检测是否已经找到GroupCoordinator且成功连接
  • * 这是由于在发送GroupCoordinatorRequest的时候并没有防止重发
  • * 因此可能会有多个GroupCoordinatorResponse
  • */
  • if (!coordinatorUnknown()) {
  • // We already found the coordinator, so ignore the request
  • // 如果是,则忽略该GroupCoordinatorResponse
  • future.complete(null);
  • } else {
  • // 将响应体封装为GroupCoordinatorResponse对象
  • GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
  • // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
  • // for the coordinator in the underlying network client layer
  • // TODO: this needs to be better handled in KAFKA-1935
  • // 获取响应中的异常信息
  • Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
  • if (error == Errors.NONE) {
  • // 无异常,根据响应信息构建一个Node节点对象赋值给coordinator
  • this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
  • groupCoordinatorResponse.node().host(),
  • groupCoordinatorResponse.node().port());
  • log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
  • // 尝试与该GroupCoordinator节点建立连接
  • client.tryConnect(coordinator);
  • // start sending heartbeats only if we have a valid generation
  • // 启动定时心跳任务
  • if (generation > 0)
  • heartbeatTask.reset();
  • // 将正常收到的GroupCoordinatorResponse事件传播出去
  • future.complete(null);
  • } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  • // GroupCoordinator未授权异常,传播异常
  • future.raise(new GroupAuthorizationException(groupId));
  • } else {
  • // 传播其他异常
  • future.raise(error);
  • }
  • }
  • }

该方法的源码比较简单,唯一需要注意的地方在于开头if判断的第一个分支,因为在发送GroupCoordinatorRequest的时候并没有防止重发,因此可能会有多个GroupCoordinatorResponse返回,在处理响应时先使用coordinatorUnknown()判断了当前是否已经获知GroupCoordinator的信息,如果是的话说明已经有其他的GroupCoordinatorRequest请求先得到了有效的响应并解析了,当前就不需要处理了直接future.complete(null)结束掉。

当成功解析了GroupCoordinatorResponse,就会根据GroupCoordinatorResponse中记录的GroupCoordinator所在broker节点的ID、Host、Port等信息构建一个Node节点使用成员变量coordinator记录,然后调用client.tryConnect(coordinator)尝试与该GroupCoordinator节点建立连接,该方法定义在ConsumerNetworkClient中,它的实现就比较简单了,使用的是之前讲过的NetworkClient父类的相关方法:

  • // ConsumerNetworkClient类
  • public void tryConnect(Node node) {
  • // 检查Node是否准备好,如果准备好了就尝试连接
  • client.ready(node, time.milliseconds());
  • }
  • // NetworkClient类
  • public boolean ready(Node node, long now) {
  • if (node.isEmpty())
  • throw new IllegalArgumentException("Cannot connect to empty node " + node);
  • // 检查是否可以向一个Node发送请求
  • if (isReady(node, now))
  • return true;
  • // 如果不能发送请求,则尝试建立连接
  • if (connectionStates.canConnect(node.idString(), now))
  • // if we are interested in sending to a node and we don't have a connection to it, initiate one
  • // 发起连接
  • initiateConnect(node, now);
  • return false;
  • }

通过发送GroupCoordinatorRequest请求获取到GroupCoordinator节点所在的位置并成功连接后,ensureCoordinatorReady()方法就会跳出循环了,此时KafkaConsumer的pollOnce(long timeout)就会继续往下执行,回顾源码:

  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
  • // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
  • // 确保GroupCoordinator已就绪,如果没有就绪会一直阻塞
  • coordinator.ensureCoordinatorReady();
  • // 如果是AUTO_TOPICS或AUTO_PATTERN订阅模式
  • // ensure we have partitions assigned if we expect to
  • if (subscriptions.partitionsAutoAssigned())
  • // 完成Rebalance操作
  • coordinator.ensurePartitionAssignment();
  • ...
  • }

而接下来的操作,则是发送JoinGroupRequest请求并完成Rebalance操作了。

3. Rebalance操作

在KafkaConsumer的pollOnce(long timeout)方法中,确定GroupCoordinator节点并成功连接之后,会判断当前订阅模式是否是AUTO_TOPICS或AUTO_PATTERN,如果是则需要进行Rebalance操作分配分区,这部分的业务在ConsumerCoordinator的ensurePartitionAssignment()中进行:

  • // ConsumerCoordinator
  • public void ensurePartitionAssignment() {
  • // 订阅模式是否是AUTO_TOPICS或AUTO_PATTERN
  • if (subscriptions.partitionsAutoAssigned()) {
  • // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
  • // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
  • // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
  • // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
  • // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
  • // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
  • // 是否是AUTO_PATTERN
  • if (subscriptions.hasPatternSubscription())
  • // 如有需要,更新元数据
  • client.ensureFreshMetadata();
  • // 主要的流程代码
  • ensureActiveGroup();
  • }
  • }

该方法代码非常少,仅仅做了两个判断,以确保只有在AUTO_TOPICS或AUTO_PATTERN才进行Rebalance操作,同时如果有需要,在AUTO_PATTERN模式下还会阻塞更新集群元数据;最终调用的ensureActiveGroup()方法才是Rebalance操作的主要实现:

  • // AbstractCoordinator类
  • // 是否需要执行发送JoinGroupRequest请求前的准备操作
  • private boolean needsJoinPrepare = true;
  • // 是否重新发送JoinGroupRequest请求的条件之一
  • private boolean rejoinNeeded = true;
  • public void ensureActiveGroup() {
  • /**
  • * 检测是否使用了AUTO_TOPICS或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值
  • * 该方法由子类 {@link ConsumerCoordinator#needRejoin} 重写了
  • * 如果不需要重新发送JoinGroupRequest,就直接返回
  • */
  • if (!needRejoin())
  • return;
  • if (needsJoinPrepare) {
  • onJoinPrepare(generation, memberId);
  • needsJoinPrepare = false;
  • }
  • while (needRejoin()) {
  • // 检测GroupCoordinator是否就绪
  • ensureCoordinatorReady();
  • // ensure that there are no pending requests to the coordinator. This is important
  • // in particular to avoid resending a pending JoinGroup request.
  • // 查看是否还有发往GroupCoordinator所在Node的请求
  • if (client.pendingRequestCount(this.coordinator) > 0) {
  • // 等待正在发送的请求发送完成并收到响应,避免重复发送JoinGroupRequest
  • client.awaitPendingRequests(this.coordinator);
  • continue;
  • }
  • // 创建JoinGroupRequest
  • RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
  • // 添加RequestFutureListener
  • future.addListener(new RequestFutureListener<ByteBuffer>() {
  • @Override
  • public void onSuccess(ByteBuffer value) {
  • // handle join completion in the callback so that the callback will be invoked
  • // even if the consumer is woken up before finishing the rebalance
  • onJoinComplete(generation, memberId, protocol, value);
  • needsJoinPrepare = true;
  • // 重启心跳定时任务
  • heartbeatTask.reset();
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // we handle failures below after the request finishes. if the join completes
  • // after having been woken up, the exception is ignored and we will rejoin
  • }
  • });
  • // 使用ConsumerNetworkClient发送JoinGroupRequest,会阻塞直到收到JoinGroupResponse或出现异常
  • client.poll(future);
  • // 检测发送是否失败
  • if (future.failed()) {
  • // 出现异常
  • RuntimeException exception = future.exception();
  • // 当异常是未知Consumer、正在重均衡、GroupCoordinator版本对不上时,直接尝试新的请求
  • if (exception instanceof UnknownMemberIdException ||
  • exception instanceof RebalanceInProgressException ||
  • exception instanceof IllegalGenerationException)
  • continue;
  • // 是否可重试
  • else if (!future.isRetriable())
  • // 不可重试,抛出异常
  • throw exception;
  • // 可以重试,等待退避时间后再次重试
  • time.sleep(retryBackoffMs);
  • }
  • }
  • }
  • protected boolean needRejoin() {
  • return rejoinNeeded;
  • }

可见,rejoinNeeded字段是决定是否进行Rebalance操作的关键,该字段在初始化时默认为true;与之类似的还有needsJoinPrepare字段,默认也为true,用于标识是否需要进行Rebalance之前的准备操作。

3.1. Rebalance准备

由于Rebalance操作并非只会在KafkaConsumer初次消费数据时才会发生,当新的消费者加入群组、某个消费者被关闭或发生崩溃、主题分区发生变化等情况都会发生Rebalance操作,Rebalance准备阶段提供给消费者们一个机会处理这个突发情况,让消费者们能够及时处理现场,以避免因Rebalance操作引起数据丢失或错乱(主要是处理偏移量相关);这个功能主要由ConsumerRebalanceListener监听器实现。当需要进行Rebalance之前的准备操作时,会调用ConsumerCoordinator类的onJoinPrepare(int generation, String memberId)方法:

  • // ConsumerCoordinator类
  • protected void onJoinPrepare(int generation, String memberId) {
  • // commit offsets prior to rebalance if auto-commit enabled
  • // 如果设置了自动提交offset,则进行一次同步提交offset操作
  • maybeAutoCommitOffsetsSync();
  • // execute the user's callback before rebalance
  • // 调用SubscriptionState中设置的ConsumerRebalanceListener
  • ConsumerRebalanceListener listener = subscriptions.listener();
  • log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
  • try {
  • // 调用ConsumerRebalanceListener的回调方法,告诉监听者当前的分区分配方案已废除
  • Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
  • listener.onPartitionsRevoked(revoked);
  • } catch (WakeupException e) {
  • throw e;
  • } catch (Exception e) {
  • log.error("User provided listener {} for group {} failed on partition revocation",
  • listener.getClass().getName(), groupId, e);
  • }
  • assignmentSnapshot = null;
  • // 将needsPartitionAssignment设置为true
  • subscriptions.needReassignment();
  • }
  • private void maybeAutoCommitOffsetsSync() {
  • // 是否是同步提交offset
  • if (autoCommitEnabled) {
  • try {
  • // 提交offset
  • commitOffsetsSync(subscriptions.allConsumed());
  • } catch (WakeupException e) {
  • // rethrow wakeups since they are triggered by the user
  • throw e;
  • } catch (Exception e) {
  • // consistent with async auto-commit failures, we do not propagate the exception
  • log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage());
  • }
  • }
  • }

该方法的代码是比较简单的,如果消费者设置了自动提交offset,这里会触发一次同步的offset提交,然后通过ConsumerRebalanceListener监听器的onPartitionsRevoked(Collection<TopicPartition> partitions)方法告诉消费者要进行Rebalance操作了,当前的分区分配方案已废除。

3.2. JoinGroupRequest请求发送

Rebalance操作主要通过JoinGroupRequest请求和JoinGroupResponse响应来完成。JoinGroupRequest是KafkaConsumer发往GroupCoordinator节点用于获取分区信息的请求对象,如果有多个KafkaConsumer同时发送JoinGroupRequest请求进行获取,最终返回的JoinGroupResponse响应中,会有一个KafkaConsumer被标识为Group Leader,其他的节点均为Group Follower,详细原理后面会讲解。

在真正发送JoinGroupRequest请求之前,会判断是否已经存在还未处理的发往GroupCoordinator的请求,如果有会进行阻塞等待该请求发送完成并收到响应,避免重复发送JoinGroupRequest。

真正发送JoinGroupRequest请求的代码位于AbstractCoordinator类的sendJoinGroupRequest()方法:

  • // AbstractCoordinator类
  • private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
  • // 检查GroupCoordinator是否就绪
  • if (coordinatorUnknown())
  • return RequestFuture.coordinatorNotAvailable();
  • // send a join group request to the coordinator
  • log.info("(Re-)joining group {}", groupId);
  • // 构造JoinGroupRequest对象
  • JoinGroupRequest request = new JoinGroupRequest(
  • groupId,
  • this.sessionTimeoutMs,
  • this.memberId,
  • protocolType(),
  • metadata()); // metadata()方法标识了消费者自身支持的PartitionAssignor信息
  • log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
  • // 使用ConsumerNetworkClient将请求暂存入unsent,等待发送
  • return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
  • // 适配响应处理器
  • .compose(new JoinGroupResponseHandler());
  • }

有没有发现这部分的代码似曾相识?这里发送JoinGroupRequest的方式与前面发送GroupCoordinatorRequest的方式非常类似,最终会使用JoinGroupResponseHandler处理JoinGroupResponse响应。请求发送这部分就不多赘述了,大家可以对照前面GroupCoordinatorRequest的讲解。

3.3. JoinGroupResponse响应处理

JoinGroupResponse响应的处理由JoinGroupResponseHandler负责,它的源码如下:

  • // AbstractCoordinator类
  • // 处理JoinGroupResponse
  • private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
  • @Override
  • public JoinGroupResponse parse(ClientResponse response) {
  • return new JoinGroupResponse(response.responseBody());
  • }
  • // 处理JoinGroupResponse的流程入口
  • @Override
  • public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
  • // 获取错误并进行错误处理
  • Errors error = Errors.forCode(joinResponse.errorCode());
  • if (error == Errors.NONE) {
  • // 无错误
  • log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
  • AbstractCoordinator.this.memberId = joinResponse.memberId();
  • AbstractCoordinator.this.generation = joinResponse.generationId();
  • AbstractCoordinator.this.rejoinNeeded = false;
  • AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
  • sensors.joinLatency.record(response.requestLatencyMs());
  • // 是否是Leader
  • if (joinResponse.isLeader()) {
  • // 如果是Leader,在该方法中会进行分区分配,并将分配结果反馈给服务端
  • onJoinLeader(joinResponse).chain(future);
  • } else {
  • // 如果是Follower,也会发送进行同步分区分配的请求
  • onJoinFollower().chain(future);
  • }
  • } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
  • log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
  • coordinator);
  • // backoff and retry
  • // 抛出异常,GroupCoordinator正在加载组
  • future.raise(error);
  • } else if (error == Errors.UNKNOWN_MEMBER_ID) {
  • // reset the member id and retry immediately
  • AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
  • log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
  • // 抛出异常,未知Consumer ID
  • future.raise(Errors.UNKNOWN_MEMBER_ID);
  • } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
  • || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
  • // re-discover the coordinator and retry with backoff
  • // GroupCoordinator不可用,没有对应组的GroupCoordinator
  • coordinatorDead();
  • log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
  • // 抛出异常
  • future.raise(error);
  • } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
  • || error == Errors.INVALID_SESSION_TIMEOUT
  • || error == Errors.INVALID_GROUP_ID) {
  • // 协议版本对不上,可能是由于消费者和服务端使用的Kafka版本不同
  • // 会话过期
  • // 无效的组ID
  • // log the error and re-throw the exception
  • log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
  • // 抛出异常
  • future.raise(error);
  • } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  • // 抛出异常,组授权失败
  • future.raise(new GroupAuthorizationException(groupId));
  • } else {
  • // unexpected error, throw the exception
  • // 其他异常
  • future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
  • }
  • }
  • }

这里与前面的RequestFutureAdapter有一点不一样;JoinGroupResponseHandler继承自CoordinatorResponseHandler,该类是AbstractCoordinator的内部类,源码如下:

  • // AbstractCoordinator的内部类
  • protected abstract class CoordinatorResponseHandler<R, T>
  • extends RequestFutureAdapter<ClientResponse, T> {
  • protected ClientResponse response;
  • // 对ClientResponse进行解析
  • public abstract R parse(ClientResponse response);
  • // 对解析后的响应进行处理
  • public abstract void handle(R response, RequestFuture<T> future);
  • @Override
  • public void onFailure(RuntimeException e, RequestFuture<T> future) {
  • // mark the coordinator as dead
  • if (e instanceof DisconnectException)
  • coordinatorDead();
  • future.raise(e);
  • }
  • @Override
  • public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
  • try {
  • this.response = clientResponse;
  • // 解析响应
  • R responseObj = parse(clientResponse);
  • // 处理响应
  • handle(responseObj, future);
  • } catch (RuntimeException e) {
  • if (!future.isDone())
  • future.raise(e);
  • }
  • }
  • }

回顾一下ClientResponse处理的事件传递流:当ClientRequest请求后,得到响应时会回调ClientRequest内部包装的RequestFutureCompletionHandler对象的相应的onComplete(...)方法 –调用–> RequestFutureCompletionHandler的complete(...)raise(...)方法 –调用–> RequestFutureCompletionHandler的fireSuccess(...)fireFailure(...)方法 –调用–> RequestFutureListener的onSuccess(...)onFailure(...)方法 –调用–> RequestFutureAdapter的onSuccess()onFailure()方法;而此处的RequestFutureAdapter即是JoinGroupResponseHandler对象,因此调用的onSuccess(...)onFailure(...)方法来自于它的父类CoordinatorResponseHandler。

CoordinatorResponseHandler的onSuccess(...)方法会调用parse(clientResponse)解析ClientResponse响应,然后调用handle(responseObj, future)处理解析后的响应,这两个方法又在其子类JoinGroupResponseHandler中实现。

JoinGroupResponseHandler的parse(ClientResponse response)操作比较简单,就是根据ClientResponse的数据构造JoinGroupResponse对象,handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future)方法里才是处理JoinGroupResponse对象的主要代码。该方法会从JoinGroupResponse对象中获取Member ID、Generation年代信息以及分区分配协议。同时通过isLeader()来判断自己是否被指派为了Group Leader,分别调用onJoinLeader(joinResponse).chain(future)onJoinFollower().chain(future)进行处理。

3.4. 分区分配

onJoinLeader(joinResponse).chain(future)onJoinFollower().chain(future)两个方法的实现非常类似,它们定义在AbstractCoordinator类中:

  • // AbstractCoordinator类
  • private RequestFuture<ByteBuffer> onJoinFollower() {
  • // send follower's sync group with an empty assignment
  • // 创建follower的同步分区信息,参数groupAssignment空字典
  • SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
  • memberId, Collections.<String, ByteBuffer>emptyMap());
  • log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
  • // 发送同步分区的请求
  • return sendSyncGroupRequest(request);
  • }
  • private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
  • try {
  • // perform the leader synchronization and send back the assignment for the group
  • // 进行分区分配,最终会返回的分配结果,其中键是ConsumerID,值是序列化后的Assignment对象
  • Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
  • joinResponse.members());
  • // 将分组信息包装为SyncGroupRequest
  • SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
  • log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
  • // 发送分组信息,并返回
  • return sendSyncGroupRequest(request);
  • } catch (RuntimeException e) {
  • return RequestFuture.failure(e);
  • }
  • }

这两个方法最终都通过sendSyncGroupRequest(request)向GroupCoordinator节点发送了SyncGroupRequest请求,但Group Leader会额外负责分区分配操作,通过子类ConsumerCoordinator实现的performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions)进行分区分配并得到分配的结果,并通过groupAssignment参数中包装在SyncGroupRequest请求中将其报告给GroupCoordinator,而onJoinFollower()中构造SyncGroupRequest请求时该参数传入的是Collections.emptyMap()空字典。

由于Group Leader和Group Follower的处理方式非常类似,我们这里只分析onJoinLeader(JoinGroupResponse joinResponse)的源码,其内部调用的performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions)方法的源码如下:

  • /**
  • *
  • * @param leaderId The id of the leader (which is this member)
  • * @param assignmentStrategy 分配策略,这个参数传入的是joinResponse.groupProtocol()
  • * @param allSubscriptions 所有的消费者
  • * @return
  • */
  • @Override
  • protected Map<String, ByteBuffer> performAssignment(String leaderId,
  • String assignmentStrategy,
  • Map<String, ByteBuffer> allSubscriptions) {
  • // 根据group_protocol字段指定分区的分配策略,查找对应的PartitionAssignor对象
  • PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
  • if (assignor == null)
  • // PartitionAssignor为空将会抛出IllegalStateException异常
  • throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
  • Set<String> allSubscribedTopics = new HashSet<>();
  • Map<String, Subscription> subscriptions = new HashMap<>();
  • /**
  • * allSubscriptions是GroupCoordinator返回的Group组内的Member信息,遍历该字典,进行数据整理
  • * 键是Member ID,值是序列化后的Subscription对象,其中记录了该Member订阅的主题等信息
  • */
  • for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
  • // 反序列化值,反序列化为Subscription对象,保存了MemberID对应的消费者所订阅的Topic信息
  • Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
  • // 添加到subscriptions字典中进行记录
  • subscriptions.put(subscriptionEntry.getKey(), subscription);
  • // 将该Member订阅的主题添加到allSubscribedTopics集合进行保存
  • allSubscribedTopics.addAll(subscription.topics());
  • }
  • // the leader will begin watching for changes to any of the topics the group is interested in,
  • // which ensures that all metadata changes will eventually be seen
  • /**
  • * 注意,由于本方法是由父类的onJoinLeader()方法内调用的,
  • * 所以此时的this对象即是leader的ConsumerCoordinator对象
  • * 此处是让leader记录所有的订阅主题信息
  • */
  • this.subscriptions.groupSubscribe(allSubscribedTopics);
  • // 更新元数据中的主题信息
  • metadata.setTopics(this.subscriptions.groupSubscription());
  • // update metadata (if needed) and keep track of the metadata used for assignment so that
  • // we can check after rebalance completion whether anything has changed
  • // 更新集群元数据信息
  • client.ensureFreshMetadata();
  • // 存储Metadata快照(通过Metadata的Listener创建的)
  • assignmentSnapshot = metadataSnapshot;
  • log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
  • groupId, assignor.name(), subscriptions);
  • // 进行分区分配
  • Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
  • log.debug("Finished assignment for group {}: {}", groupId, assignment);
  • // 将分区分配的结果进行序列化存入groupAssignment字典
  • Map<String, ByteBuffer> groupAssignment = new HashMap<>();
  • for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
  • ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
  • groupAssignment.put(assignmentEntry.getKey(), buffer);
  • }
  • // 返回序列化后的分区分配结果
  • return groupAssignment;
  • }

该方法首先调用lookupAssignor(String name)通过指定的分区分配策略在本地查找对应的分区器,这里的分区分配策略是从JoinGroupResponse中得到的;lookupAssignor(String name)的源码如下:

  • // ConsumerCoordinator类
  • /**
  • * 在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,
  • * GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。
  • * 此字段的值通过partition.assignment.strategy参数配置,可以配置多个。
  • */
  • private final List<PartitionAssignor> assignors;
  • private PartitionAssignor lookupAssignor(String name) {
  • // 遍历所有的PartitionAssignor
  • for (PartitionAssignor assignor : this.assignors) {
  • // 匹配对应的PartitionAssignor:range或roundrobin
  • if (assignor.name().equals(name))
  • return assignor;
  • }
  • return null;
  • }

该方法的实现比较简单,遍历assignors数组,根据GroupCoordinator指定的分配策略筛选分区分配器。默认情况下,Kafka内置了两个分区分配器类:RangeAssignor(name()返回字符串range)和RoundRobinAssignor(name()返回字符串roundrobin),它们都继承自AbstractPartitionAssignor类,而AbstractPartitionAssignor类则默认实现了PartitionAssignor接口的一些必要的方法。前面的文章里提到过Range和Round Robin这两个分配策略,这两个类就是对应的实现;我们回顾一下两种策略的具体规则:

  • Range:该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1和消费者C2同时订阅了主题T1和主题T2,并且每个主题有3个分区。那么消费者C1有可能分配到这两个主题的分区0和分区1,而消费者C2分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
  • RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin策略来给消费者C1和消费者C2分配分区,那么消费者C1将分到主题T1的分区0和分区2以及主题T2的分区1,消费者C2将分配到主题T1的分区1以及主题T2的分区0和分区2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

确定分区分配器后,会对GroupCoordinator返回的Group Memeber信息进行整理得到Map类型的字典subscriptions对象,这里的Subscription类型对象是为了用户增强对分配结果的控制,会额外在主题订阅信息之外封装一些影响分配的用户自定义信息,例如,用户自定义数据可以是每个消费者的权重。Subscription的声明如下:

  • // PartitionAssignor类的内部类
  • class Subscription {
  • // 订阅的Topic集合
  • private final List<String> topics;
  • // 用户自定义数据
  • private final ByteBuffer userData;
  • ...
  • }

其中,topics集合表示某Member订阅的Topic集合,userData表示用户自定义的数据。

接下来,Metadata中保存的Cluster信息以及subscriptions对象会传入assignor.assign(metadata.fetch(), subscriptions)进行分区分配,这里调用的首先是父类AbstractPartitionAssignor类的assign(Cluster metadata, Map<String, Subscription> subscriptions)方法进行数据预处理:

  • public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
  • Set<String> allSubscribedTopics = new HashSet<>();
  • Map<String, List<String>> topicSubscriptions = new HashMap<>();
  • // 解析subscriptions集合,去除userData信息
  • for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
  • List<String> topics = subscriptionEntry.getValue().topics();
  • // 只取出了topics数据
  • allSubscribedTopics.addAll(topics);
  • topicSubscriptions.put(subscriptionEntry.getKey(), topics);
  • }
  • // 统计每个Topic的分区个数
  • Map<String, Integer> partitionsPerTopic = new HashMap<>();
  • for (String topic : allSubscribedTopics) {
  • // 从集群元数据中获取Topic对应的分区个数
  • Integer numPartitions = metadata.partitionCountForTopic(topic);
  • if (numPartitions != null && numPartitions > 0)
  • // 记录到Map
  • partitionsPerTopic.put(topic, numPartitions);
  • else
  • log.debug("Skipping assignment for topic {} since no metadata is available", topic);
  • }
  • /**
  • * 将分区分配的具体逻辑委托给assign()重载方法,由子类重写实现;传入的参数结构:
  • * partitionsPerTopic:集群元数据中保存的信息,键为Topic名称,值为该Topic拥有的分区数;
  • * topicSubscriptions:Group里每个Member订阅的主题,键为Member ID,值为订阅的主题集合
  • */
  • Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
  • // this class has maintains no user data, so just wrap the results
  • // 整理分区分配结果,即从Map<String, List<TopicPartition>>转换为<String, Assignment>
  • Map<String, Assignment> assignments = new HashMap<>();
  • for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
  • assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
  • return assignments;
  • }

具体的分区分配工作还是会委托给具体的子类中重写的assign(Map<String, Integer> partitionsPerTopic, Map<String, List<String>> subscriptions)方法,这里分开讲解RangeAssignor和RoundRobinAssignor中该方法的典型实现:

  1. RangeAssignor的实现:
  • /**
  • * 分配分区
  • * @param partitionsPerTopic 集群元数据中保存的信息,键为Topic名称,值为该Topic拥有的分区数;
  • * @param subscriptions Group里每个Member订阅的主题,键为Member ID,值为订阅的主题集合
  • * @return 分区分配信息,键为MemberID, 值为分配给该MemberID的分区TopicPartition对象的集合
  • */
  • @Override
  • public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
  • Map<String, List<String>> subscriptions) {
  • /**
  • * 转换Map结构:Map<String MemberID, List 主题集合> -> Map<String 主题名称, List MemberID集合>
  • * 从<MemberID, 该Member订阅的主题名称集合> 转换为<主题名称, 订阅该主题的MemberID集合>
  • */
  • Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
  • // Map<String MemberID, List 分配到的的主题分区集合>
  • Map<String, List<TopicPartition>> assignment = new HashMap<>();
  • // 遍历subscriptions中所有的MemberID,为每个Member创建一个ArrayList
  • for (String memberId : subscriptions.keySet())
  • assignment.put(memberId, new ArrayList<TopicPartition>());
  • // 遍历,每个键值对结构是:<主题名称, 订阅该主题的MemberID集合>
  • for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
  • // 主题名称
  • String topic = topicEntry.getKey();
  • // 订阅该主题的MemberID集合
  • List<String> consumersForTopic = topicEntry.getValue();
  • // 获取主题的分区数
  • Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
  • if (numPartitionsForTopic == null)
  • continue;
  • // 对订阅该主题的所有Member进行排序
  • Collections.sort(consumersForTopic);
  • // 每个Member可以分到的分区数
  • int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
  • // 平均分配后额外多出的分区数
  • int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
  • /**
  • * 根据主题与其分区数得到TopicPartition集合,实现比较简单,例如主题名称为test,分区数为3,最终会得到:
  • * TopicPartition("test", 0), TopicPartition("test", 1), TopicPartition("test", 2) 三个对象组成的集合
  • */
  • List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
  • /**
  • * 根据Member的数量遍历相应的次数,分配当前topic的分区。假设test主题有7个分区,分配给2个Member,则有:
  • * numPartitionsPerConsumer = 7 / 2 = 3
  • * consumersWithExtraPartition = 7 % 2 = 1
  • * 1. i = 0,分配第1个Member的分区
  • * start = 3 * 0 + Math.min(0, 1) = 0
  • * length = 3 + ((0 + 1) > 1 ? 0 : 1) = 4
  • * 因此第1个Member得到: 0, 1, 2, 3 四个分区
  • * 2. i = 1, 分配第2个Member的分区
  • * start = 3 * 1 + Math.min(1, 1) = 4
  • * length = 3 + ((1 + 1) > 1 ? 0 : 1) = 3
  • * 因此第2个Member得到: 4, 5, 6 三个分区
  • */
  • for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
  • // 起始索引
  • int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
  • // 长度
  • int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
  • // 取partitions中范围为[start, start + length)子列表作为该MemberID分配到的分区信息
  • assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
  • }
  • }
  • /**
  • * 订阅多个主题的情况:假设test1主题有7个分区,test2主题有3个分区,分配给2个Member,则有:
  • * 1. 对于test1主题
  • * numPartitionsPerConsumer = 7 / 2 = 3
  • * consumersWithExtraPartition = 7 % 2 = 1
  • * 1. i = 0,分配第1个Member的分区
  • * start = 3 * 0 + Math.min(0, 1) = 0
  • * length = 3 + ((0 + 1) > 1 ? 0 : 1) = 4
  • * 因此第1个Member得到: 0, 1, 2, 3 四个分区
  • * 2. i = 1, 分配第2个Member的分区
  • * start = 3 * 1 + Math.min(1, 1) = 4
  • * length = 3 + ((1 + 1) > 1 ? 0 : 1) = 3
  • * 因此第2个Member得到: 4, 5, 6 三个分区
  • * 2. 对于test2主题
  • * numPartitionsPerConsumer = 3 / 2 = 1
  • * consumersWithExtraPartition = 3 % 2 = 1
  • * 1. i = 0,分配第1个Member的分区
  • * start = 1 * 0 + Math.min(0, 1) = 0
  • * length = 1 + ((0 + 1) > 1 ? 0 : 1) = 2
  • * 因此第1个Member得到: 0, 1 两个分区
  • * 2. i = 1, 分配第2个Member的分区
  • * start = 1 * 1 + Math.min(1, 1) = 2
  • * length = 1 + ((1 + 1) > 1 ? 0 : 1) = 1
  • * 因此第2个Member得到: 2 一个分区
  • * 最终结果:
  • * 第一个Member:test1-[0, 1, 2, 3], test2-[0, 1]
  • * 第二个Member:test1-[4, 5, 6], test2-[2]
  • */
  • return assignment;
  • }

核心思想:将每个Topic的分区按照编号进行排序,按照范围段分配给多个Member。

  1. RoundRobinAssignor的实现:
  • /**
  • * 分配分区
  • * @param partitionsPerTopic 集群元数据中保存的信息,键为Topic名称,值为该Topic拥有的分区数;
  • * @param subscriptions Group里每个Member订阅的主题,键为Member ID,值为订阅的主题集合
  • * @return 分区分配信息,键为MemberID, 值为分配给该MemberID的分区TopicPartition对象的集合
  • */
  • @Override
  • public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
  • Map<String, List<String>> subscriptions) {
  • Map<String, List<TopicPartition>> assignment = new HashMap<>();
  • // 遍历subscriptions中所有的MemberID,为每个Member创建一个ArrayList
  • for (String memberId : subscriptions.keySet())
  • assignment.put(memberId, new ArrayList<TopicPartition>());
  • // 根据所有MemberID创建一个无限循环的迭代器,会对所有MemberID构成的列表进行从头至尾循环迭代
  • CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
  • /**
  • * 订阅多个主题的情况:假设test1主题有7个分区,test2主题有3个分区,分配给2个Member,则有:
  • * 1. 先对主题的分区进行排序,得到:
  • * TP(test1, 0), TP(test1, 1), TP(test1, 2), TP(test1, 3), TP(test1, 4), TP(test1, 5), TP(test1, 6),
  • * TP(test2, 0), TP(test2, 1), TP(test2, 2)
  • * 2. 然后对Member进行排序,得到:Member-1,Member-2
  • * 3. 进行轮询分配:
  • * TP(test1, 0):Member-1
  • * TP(test1, 1):Member-2
  • * TP(test1, 2):Member-1
  • * TP(test1, 3):Member-2
  • * TP(test1, 4):Member-1
  • * TP(test1, 5):Member-2
  • * TP(test1, 6):Member-1
  • * TP(test2, 0):Member-2
  • * TP(test2, 1):Member-1
  • * TP(test2, 2):Member-2
  • * 4. 最终结果:
  • * 第一个Member:test1-[0, 2, 4, 6], test2-[1]
  • * 第二个Member:test1-[1, 3, 5], test2-[0, 2]
  • */
  • for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
  • // 获取主题
  • final String topic = partition.topic();
  • // 获取一个MemberID,查看该Member是否订阅了topic主题,如果没有就后移
  • while (!subscriptions.get(assigner.peek()).contains(topic))
  • assigner.next();
  • // 直到循环迭代遇到订阅了topic的Member,将该TopicPartition分配给这个Member
  • assignment.get(assigner.next()).add(partition);
  • }
  • return assignment;
  • }
  • /**
  • * 对主题及其分区进行排序
  • * @param partitionsPerTopic 集群元数据中保存的信息,键为Topic名称,值为该Topic拥有的分区数;
  • * @param subscriptions Group里每个Member订阅的主题,键为Member ID,值为订阅的主题集合
  • * @return 最终会得到所有订阅的主题及分区的集合,里面都是TopicPartition,且严格按照Topic名称、分区编号进行排序
  • */
  • public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
  • Map<String, List<String>> subscriptions) {
  • // 使用TreeSet对Member订阅的所有主题集合进行排序
  • SortedSet<String> topics = new TreeSet<>();
  • for (List<String> subscription : subscriptions.values())
  • topics.addAll(subscription);
  • // 定义集合
  • List<TopicPartition> allPartitions = new ArrayList<>();
  • // 遍历排序后的主题集合
  • for (String topic : topics) {
  • // 获取主题的分区数
  • Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
  • if (numPartitionsForTopic != null)
  • // 向allPartitions添加主题的所有分区对应的TopicPartition对象
  • allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
  • }
  • /**
  • * 最终allPartitions集合中的元素是TopicPartition,且严格按照Topic名称、分区编号进行排序
  • * 假设有test1,test2两个主题,test1有3个分区,test2有2个分区,得到的结果为:
  • * TP(test1, 0), TP(test1, 1), TP(test1, 2), TP(test2, 0), TP(test1, 1)
  • */
  • return allPartitions;
  • }

核心思想:将分区按照Topic编号和Partition编号进行排序,依次轮转分配给所有Member。

关于这两个分区分配器的分配逻辑上面的源码中的注释已经讲解得很清楚了,读者可以对照源码仔细分析。

3.5. SyncGroupRequest请求发送

继续回到onJoinLeader(JoinGroupResponse joinResponse)方法后续的代码流程。在使用performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions)方法获取了分区结果之后,会构造SyncGroupRequest请求发送给GroupCoordinator节点。身份为Group Leader的Consumer发送时会带有分区结果信息。发送该请求的具体实现在AbstractCoordinator的sendSyncGroupRequest(SyncGroupRequest request)方法中:

  • private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
  • // 检查GroupCoordinator是否就绪
  • if (coordinatorUnknown())
  • return RequestFuture.coordinatorNotAvailable();
  • // 将发送分区分配信息的请求暂存到unsent集合,
  • return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
  • .compose(new SyncGroupResponseHandler());
  • }

实现非常简单,此处也用到了适配器模式对响应进行处理。

3.6. SyncGroupResponse响应处理

SyncGroupResponseHandler是SyncGroupResponse响应处理器,工作方式与前面讲过的JoinGroupResponseHandler一样,它的源码如下:

  • // AbstractCoordinator类
  • // 处理SyncGroupResponse响应的方法
  • private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
  • @Override
  • public SyncGroupResponse parse(ClientResponse response) {
  • return new SyncGroupResponse(response.responseBody());
  • }
  • // 处理入口
  • @Override
  • public void handle(SyncGroupResponse syncResponse,
  • RequestFuture<ByteBuffer> future) {
  • // 获取异常
  • Errors error = Errors.forCode(syncResponse.errorCode());
  • if (error == Errors.NONE) {
  • // 无异常
  • log.info("Successfully joined group {} with generation {}", groupId, generation);
  • sensors.syncLatency.record(response.requestLatencyMs());
  • // 调用RequestFuture.complete()方法传播分区分配结果
  • future.complete(syncResponse.memberAssignment());
  • } else {
  • // 有异常
  • // 将rejoinNeeded置为true
  • AbstractCoordinator.this.rejoinNeeded = true;
  • if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
  • // 组授权失败
  • future.raise(new GroupAuthorizationException(groupId));
  • } else if (error == Errors.REBALANCE_IN_PROGRESS) {
  • log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
  • // 正在Rebalance过程中
  • future.raise(error);
  • } else if (error == Errors.UNKNOWN_MEMBER_ID
  • || error == Errors.ILLEGAL_GENERATION) {
  • log.debug("SyncGroup for group {} failed due to {}", groupId, error);
  • AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
  • // 无效的Member ID,或无效的年代信息
  • future.raise(error);
  • } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
  • || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
  • log.debug("SyncGroup for group {} failed due to {}", groupId, error);
  • // GroupCoordinator不可以用,没有Group对应的Coordinator
  • coordinatorDead();
  • future.raise(error);
  • } else {
  • // 其他异常
  • future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
  • }
  • }
  • }
  • }
  • // SyncGroupResponse类
  • private final ByteBuffer memberState;
  • public ByteBuffer memberAssignment() {
  • return memberState;
  • }

代码实现比较简单,处理逻辑基本与JoinGroupResponseHandler一样。这里我们关注无异常的情况,此时会调用future.complete(syncResponse.memberAssignment())进行分配结果的传播。注意,此时无论是Group Leader还是Group Follower,得到的SyncGroupResponse中都包含了分区信息。

3.7. 响应传播

我们回顾JoinGroupResponseHandler的部分源码:

  • // 处理JoinGroupResponse
  • private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
  • ...
  • // 处理JoinGroupResponse的流程入口
  • @Override
  • public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
  • // 获取错误并进行错误处理
  • ...
  • // 无错误
  • ...
  • // 是否是Leader
  • if (joinResponse.isLeader()) {
  • // 如果是Leader,在该方法中会进行分区分配,并将分配结果反馈给服务端
  • onJoinLeader(joinResponse).chain(future);
  • } else {
  • // 如果是Follower,也会发送进行同步分区分配的请求
  • onJoinFollower().chain(future);
  • }
  • ...
  • }
  • }

在收到JoinGroupResponse之后,交给JoinGroupResponseHandler处理,此时会分别使用onJoinLeader(JoinGroupResponse joinResponse)onJoinFollower()针对不同身份发送SyncGroupRequest。注意,在调用这两个方法时,还使用了chain(future)将JoinGroupResponseHandler的RequestFuture回调对象绑定到了onJoinLeader(JoinGroupResponse joinResponse)onJoinFollower()的RequestFuture回调对象上了,具体方式是使用RequestFuture的监听器:

  • // RequestFuture类
  • public void chain(final RequestFuture<T> future) {
  • // 添加监听器
  • addListener(new RequestFutureListener<T>() {
  • @Override
  • public void onSuccess(T value) {
  • // 将value传递给下一个RequestFuture
  • future.complete(value);
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // 将异常传递给下一个RequestFuture
  • future.raise(e);
  • }
  • });
  • }

以这种方式可以实现,在SyncGroupResponseHandler调用自己的RequestFuture回调对象的方法时,会通过监听器机制将对应的事件传递给JoinGroupResponseHandler的RequestFuture回调对象的对应方法,这是典型的责任链设计模式的实现。最终将SyncGroupResponse响应对象里的memberState字段传递给了该RequestFuture的complete(T value)方法;此时我们回顾最开始发送JoinGroupRequest请求代码,位于AbstractCoordinator类的ensureActiveGroup()方法中:

  • // AbstractCoordinator类
  • public void ensureActiveGroup() {
  • ...
  • // 发送JoinGroupRequest,返回对应的RequestFuture对象
  • RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
  • // 给RequestFuture对象添加RequestFutureListener
  • future.addListener(new RequestFutureListener<ByteBuffer>() {
  • @Override
  • public void onSuccess(ByteBuffer value) {
  • // handle join completion in the callback so that the callback will be invoked
  • // even if the consumer is woken up before finishing the rebalance
  • onJoinComplete(generation, memberId, protocol, value);
  • needsJoinPrepare = true;
  • heartbeatTask.reset();
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • // we handle failures below after the request finishes. if the join completes
  • // after having been woken up, the exception is ignored and we will rejoin
  • }
  • });
  • ...
  • }

这里的future即是发送JoinGroupRequest时产生的,由JoinGroupResponseHandler连接到SyncGroupResponseHandler中的RequestFuture,而此处还给它添加了一个匿名监听器,也就是说,在该RequestFuture的方法被调用,还会调用监听器对应的方法。其实在SyncGroupResponse响应对象里的memberState字段传递给了该RequestFuture的complete(T value)方法后,最后会被这个匿名监听器的onSuccess(ByteBuffer value)方法处理,它会调用ConsumerCoordinator类的onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer)方法:

  • // ConsumerCoordinator类
  • // 处理从SyncGroupResponse中的到的分区分配结果
  • @Override
  • protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
  • // if we were the assignor, then we need to make sure that there have been no metadata updates
  • // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
  • // 对比记录的Metadata快照和最新的Metadata快照,如果不一致则说明分配过程中出现了Topic增删或分区数量的变化
  • if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
  • // 去除groupSubscription中除subscription集合所有元素之外的元素,将needsPartitionAssignment置为true
  • subscriptions.needReassignment();
  • return;
  • }
  • // 获取指定的分区器
  • PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
  • if (assignor == null)
  • throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
  • // 获取分区分配信息
  • Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
  • // set the flag to refresh last committed offsets
  • // 设置needsFetchCommittedOffsets为true
  • subscriptions.needRefreshCommits();
  • // update partition assignment
  • // 根据新的分区信息更新SubscriptionState的assignment集合
  • subscriptions.assignFromSubscribed(assignment.partitions());
  • // give the assignor a chance to update internal state based on the received assignment
  • // 将assignment传递给onAssignment()方法,让分区分配器有机会更新内部状态
  • // 该方法默认是空实现,用户自定义分区器时可以重写该方法
  • assignor.onAssignment(assignment);
  • // reschedule the auto commit starting from now
  • // 如果是自动提交offset,重新规划自动提交周期
  • if (autoCommitEnabled)
  • autoCommitTask.reschedule();
  • // execute the user's callback after rebalance
  • // 获取Rebalance监听器
  • ConsumerRebalanceListener listener = subscriptions.listener();
  • log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
  • try {
  • Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
  • // 调用监听器
  • listener.onPartitionsAssigned(assigned);
  • } catch (WakeupException e) {
  • throw e;
  • } catch (Exception e) {
  • log.error("User provided listener {} for group {} failed on partition assignment",
  • listener.getClass().getName(), groupId, e);
  • }
  • }
  • // SubscriptionState类
  • public void assignFromSubscribed(Collection<TopicPartition> assignments) {
  • // 遍历传入的assignments,判断当前subscription是否包含指定的主题
  • for (TopicPartition tp : assignments)
  • if (!this.subscription.contains(tp.topic()))
  • // 如果不包含,抛出异常
  • throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
  • // 清空assignment
  • this.assignment.clear();
  • // 遍历assignments,将TopicPartition作为键,新的TopicPartitionState对象作为值,添加到assignment字典中
  • for (TopicPartition tp: assignments)
  • addAssignedPartition(tp);
  • this.needsPartitionAssignment = false;
  • }
  • private void addAssignedPartition(TopicPartition tp) {
  • this.assignment.put(tp, new TopicPartitionState());
  • }

该方法比较简单,就是对一些状态进行更新,分区信息会被存储到SubscriptionState对象的assignment属性中,同时调用ConsumerRebalanceListener监听器的onPartitionsAssigned(Collection<TopicPartition> partitions)方法传入分区信息告诉监听者分区更新了。这样一来,所有的Consumer都知道分配给自己的主题及分区了,Rebalance操作也就完成了。

注:Rebalance本质上是一组协议。GroupCoordinator与ConsumerCoordinator共同使用它来完成Rebalance。目前Kafka提供了5个协议来处理与Consumer Group Coordination相关的问题:
- Heartbeat请求:Consumer需要定期给Coordinator发送心跳来表明自己还活着。
- JoinGroup请求:成员请求加入组。
- SyncGroup请求:Group Leader把分配方案告诉组内所有成员。
- LeaveGroup请求:主动告诉Coordinator我要离开Consumer Group。
- DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用。