大数据
流式处理
Kafka
源码解析

Kafka系列 05 - 生产者源码分析 01:元数据更新

简介:KafkaProducer对元数据的管理和操作,是先行于数据发送环节的。元数据的完整、正确与否决定了是数据发送先决条件。KafkaProducer对元数据的更新操作与消息数据的发送操作虽然都需要经过网络I/O,但二者的实现略有差异

1. 简介

本文将以KafkaProducer发送消息的过程为例,介绍KafkaProducer的工作原理和源码构成。

KafkaProducer发送消息的大致流程是:处理拦截器操作 -> 等待更新集群元数据 -> 序列化键和值 -> 分区 -> 将消息添加到RecordAccumulator缓冲 -> 唤醒Sender线程发送缓冲中的消息。可以发现,其中元数据的更新是先行于真正的消息发送的,因此本文将先介绍元数据更新的详细流程。

2. 集群数据的组织

KafkaProducer如果需要发送消息数据到集群中,首先需要清楚集群的基本情况,包括某个Topic中有哪几个分区,每个分区的Leader副本分配在哪个节点上,Follower副本分配在哪些节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口等信息。在org.apache.kafka.common包中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka集群的相关元数据。这些元数据保存在了Cluster这个类中,并按照不同的映射方式进行存放,方便查询。KafkaProducer通过更新机制得到的Metadata元数据记录了大量集群相关的信息,都以这几个类进行组织和存储。我们先了解一下这些类,便于后面的分析。

2.1. Node类

Node类表示集群中的一个节点,Node记录这个节点的hostipport等信息。该类的关键属性如下:

org.apache.kafka.common.Node
  • // 无Node表示
  • private static final Node NO_NODE = new Node(-1, "", -1);
  • // broker节点id
  • private final int id;
  • // broker节点id的字符串形式
  • private final String idString;
  • // broker节点的地址
  • private final String host;
  • // broker节点的端口
  • private final int port;
  • // broker节点的机架
  • private final String rack;

2.2. TopicPartition类

TopicPartition类表示某Topic的一个分区,其中的topic字段是Topic的名称,partition字段则是此分区在Topic中的分区编号(ID)。该类的关键属性如下:

org.apache.kafka.common.TopicPartition
  • // hash值,用于hashCode()方法缓存
  • private int hash = 0;
  • // 分区编号
  • private final int partition;
  • // 主题名称
  • private final String topic;
  • ...
  • public int hashCode() {
  • if (hash != 0)
  • return hash;
  • final int prime = 31;
  • int result = 1;
  • result = prime * result + partition;
  • result = prime * result + ((topic == null) ? 0 : topic.hashCode());
  • this.hash = result;
  • return result;
  • }

2.3. PartitionInfo类

PartitionInfo表示一个分区的详细信息。其中topic字段和partition字段的含义与TopicPartition中的相同,除此之外,leader字段记录了Leader副本所在的节点,replica字段记录了全部副本所在的节点,inSyncReplicas字段记录了In-Sync副本集合(即ISR集合)中所有副本所在的节点。该类的关键属性如下:

org.apache.kafka.common.PartitionInfo
  • // 主题
  • private final String topic;
  • // 分区编号
  • private final int partition;
  • // 分区leader副本信息
  • private final Node leader;
  • // 全部副本信息
  • private final Node[] replicas;
  • // ISR副本信息
  • private final Node[] inSyncReplicas;

2.4. Cluster类

Cluster类统一保存了上面的三类信息,同时根据相应的关系做好了映射,并且提供了大量便捷操作方法。

值得注意的一点是,Node、TopicPartition、PartitionInfo、Cluster的所有字段都是private final修饰的,且只提供了查询方法,并未提供任何修改方法,这就保证了这四个类的对象都是不可变的线程安全的对象:

org.apache.kafka.common.Cluster
  • // Kafka集群中的broker节点集合
  • private final List<Node> nodes;
  • // 未授权的主题集合
  • private final Set<String> unauthorizedTopics;
  • private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
  • // topic对应的partition信息字典,键为topic名称,值为partition信息集合;存放的partition不一定有Leader副本
  • private final Map<String, List<PartitionInfo>> partitionsByTopic;
  • // topic中可用的partition信息字典,键为topic名称,值为可用的partition信息集合;存放的partition必须是有Leader副本的Partition
  • private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
  • // broker对应的partition信息字典,键为broker的id,值为partition信息集合
  • private final Map<Integer, List<PartitionInfo>> partitionsByNode;
  • // broker对应的Node信息字典,键为broker的id,值为表示该节点的Node实例
  • private final Map<Integer, Node> nodesById;
  • // 添加partition信息
  • public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
  • Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
  • combinedPartitions.putAll(partitions);
  • return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics));
  • }
  • // 根据Node ID获取Node
  • public Node nodeById(int id) {
  • return this.nodesById.get(id);
  • }
  • // 根据TopicPartition获取partition的leader分区所在的节点
  • public Node leaderFor(TopicPartition topicPartition) {
  • PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
  • if (info == null)
  • return null;
  • else
  • return info.leader();
  • }
  • // 根据TopicPartition获取PartitionInfo
  • public PartitionInfo partition(TopicPartition topicPartition) {
  • return partitionsByTopicPartition.get(topicPartition);
  • }
  • // 获取主题的分区列表
  • public List<PartitionInfo> partitionsForTopic(String topic) {
  • return this.partitionsByTopic.get(topic);
  • }
  • // 获取主题的可用分区列表
  • public List<PartitionInfo> availablePartitionsForTopic(String topic) {
  • return this.availablePartitionsByTopic.get(topic);
  • }
  • // 根据Node ID获取该Node上的分区列表
  • public List<PartitionInfo> partitionsForNode(int nodeId) {
  • return this.partitionsByNode.get(nodeId);
  • }
  • // 根据主题获取该主题的分区数
  • public Integer partitionCountForTopic(String topic) {
  • List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
  • return partitionInfos == null ? null : partitionInfos.size();
  • }

2.5. Metadata类

Metadata类在内部引用了一个Cluster实例,并记录了大量与更新原数操作相关的属性:

org.apache.kafka.clients.Metadata
  • // 两次刷新元数据退避时间,避免频繁刷新导致性能消耗
  • private final long refreshBackoffMs;
  • // 每隔多久更新一次,默认是300秒
  • private final long metadataExpireMs;
  • // 集群元数据版本号,元数据更新成功一次,版本号就自增1
  • private int version;
  • // 上一次更新元数据的时间戳
  • private long lastRefreshMs;
  • /**
  • * 上一次成功更新元数据的时间戳,如果每次更新都成功,
  • * lastSuccessfulRefreshMs应该与lastRefreshMs相同,否则lastRefreshMs > lastSuccessfulRefreshMs
  • */
  • private long lastSuccessfulRefreshMs;
  • // 记录kafka集群的元数据
  • private Cluster cluster;
  • // 表示是否强制更新Cluster
  • private boolean needUpdate;
  • // 记录当前已知的所有的主题
  • private final Set<String> topics;
  • // 监听器集合,用于监听Metadata更新
  • private final List<Listener> listeners;
  • // 是否需要更新全部主题的元数据
  • private boolean needMetadataForAllTopics;

3. KafkaProducer的构建过程

下面是构建一个简单的KafkaProducer的通用方式:

  • // 生产者需要的配置信息
  • Properties properties = new Properties();
  • // broker列表
  • properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 键序列化器
  • properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • // 值序列化器
  • properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • // 根据配置信息创建生产者
  • KafkaProducer<String, String> kafkaProducer = kafkaProducer = new KafkaProducer<>(properties);

这份代码中只为KafkaProducer指定了bootstrap.serverskey.serializervalue.serializer三个必要的配置。我们看一下这里用到的KafkaProducer的构造方法:

org.apache.kafka.clients.producer.KafkaProducer
  • public KafkaProducer(Properties properties) {
  • this(new ProducerConfig(properties), null, null);
  • }
  • private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  • try {
  • // 从原始配置拷贝一份副本
  • Map<String, Object> userProvidedConfigs = config.originals();
  • this.producerConfig = config;
  • this.time = new SystemTime();
  • // 获取clientId()
  • clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
  • if (clientId.length() <= 0)
  • // 如果clientId没指定,则使用"producer-序列化"的形式表示
  • clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
  • ...
  • // 获取配置的分区器,如果没有指定就使用默认的DefaultPartitioner分区器(client.id)
  • this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
  • // (retry.backoff.ms)
  • long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
  • // 创建Metadata集群元数据对象(metadata.max.age.ms)
  • this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
  • // 获取配置的最大请求大小(max.request.size)
  • this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
  • // 获取配置的总缓冲大小(buffer.memory)
  • this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
  • // 获取配置的压缩器(compression.type)
  • this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
  • ...
  • /**
  • * 创建RecordAccumulator,它是一个发送消息数据的记录缓冲器,用于批量发送消息数据
  • * batch.size单位是字节,用于指定达到多少字节批量发送一次
  • */
  • this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), //(batch.size)
  • this.totalMemorySize,
  • this.compressionType,
  • config.getLong(ProducerConfig.LINGER_MS_CONFIG), //(linger.ms)
  • retryBackoffMs,
  • metrics,
  • time);
  • // 验证并过滤配置的bootstrap.servers项中的地址是否可用(bootstrap.servers)
  • List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  • /**
  • * 更新Kafka集群的元数据
  • * 这一步会将创建KafkaProducer时配置的broker列表传入
  • * 该update()方法同时会更新Metadata对象的一些属性,
  • * 并通知所有MetadataUpdate Listener监听器,自己要开始更新数据了
  • * 同时唤醒等待Metadata更新完成的线程
  • */
  • this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
  • ...
  • // 创建NetworkClient网络I/O核心
  • NetworkClient client = new NetworkClient(
  • new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), //(connections.max.idle.ms)
  • this.metadata,
  • clientId,
  • config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), //(max.in.flight.requests.per.connection)
  • config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), //(reconnect.backoff.ms)
  • config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), //(send.buffer.bytes)
  • config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), //(receive.buffer.bytes)
  • this.requestTimeoutMs, time);
  • // 创建Sender
  • this.sender = new Sender(client,
  • this.metadata,
  • this.accumulator,
  • config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, //(max.in.flight.requests.per.connection)
  • config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), //(max.request.size)
  • (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), //(acks)
  • config.getInt(ProducerConfig.RETRIES_CONFIG), //(retries)
  • this.metrics,
  • new SystemTime(),
  • clientId,
  • this.requestTimeoutMs);
  • // 指定Sender线程的名称
  • String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
  • // 启动Sender对应的线程
  • this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  • this.ioThread.start();
  • ...
  • // 值和键的序列化
  • if (keySerializer == null) {
  • this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); //(key.serializer.class)
  • this.keySerializer.configure(config.originals(), true);
  • } else {
  • config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); //(key.serializer.class)
  • this.keySerializer = keySerializer;
  • }
  • if (valueSerializer == null) {
  • this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); //(value.serializer.class)
  • this.valueSerializer.configure(config.originals(), false);
  • } else {
  • config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); //(value.serializer.class)
  • this.valueSerializer = valueSerializer;
  • }
  • ...
  • // 获取配置的拦截器
  • List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); //(interceptor.classes)
  • this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
  • ...
  • } catch (Throwable t) {
  • // call close methods if internal objects are already constructed
  • // this is to prevent resource leak. see KAFKA-2121
  • close(0, TimeUnit.MILLISECONDS, true);
  • // now propagate the exception
  • throw new KafkaException("Failed to construct kafka producer", t);
  • }
  • }

从源码我们可以得知,在创建KafkaProducer时会读取大量开发者指定的配置(虽然我们只指定了三项),如果开发者没有提供就使用默认的配置。同时,有以下几个关键的点需要注意:

  1. 如果开发者没有指定生产者Client的名称,将会使用producer-自增序列
  2. 主要需要注意的对象有四个:Metadata对象、RecordAccumulator对象、NetworkClient对象和Sender对象在创建KafkaProducer的过程中被创建了。
  3. Sender是一个实现了Runnable接口的类,它会单独运行于ioThread线程中。

4. Sender线程类

Sender线程主要负责统筹数据发送业务,但真正实现通信过程的类是NetworkClient;我们先分析一下创建和启动Sender线程的代码:

org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer
  • ...
  • // 创建Sender
  • this.sender = new Sender(client,
  • this.metadata,
  • this.accumulator,
  • config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, //(max.in.flight.requests.per.connection)
  • config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), //(max.request.size)
  • (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), //(acks)
  • config.getInt(ProducerConfig.RETRIES_CONFIG), //(retries)
  • this.metrics,
  • new SystemTime(),
  • clientId,
  • this.requestTimeoutMs);
  • // 指定Sender线程的名称
  • String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
  • // 启动Sender对应的线程
  • this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  • this.ioThread.start();

对于Sender线程来说,start()方法启动线程后会调用它的run()方法,源码如下:

org.apache.kafka.clients.producer.internals.Sender
  • // Sender
  • public void run() {
  • ...
  • while (running) {
  • try {
  • run(time.milliseconds());
  • } catch (Exception e) {
  • log.error("Uncaught error in kafka producer I/O thread: ", e);
  • }
  • }
  • ...
  • }
  • void run(long now) {
  • // 获取Cluster集群元数据信息
  • Cluster cluster = metadata.fetch();
  • // get the list of partitions with data ready to send
  • // 获取当前集群中符合发送消息条件的数据集
  • RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • // if there are any partitions whose leaders are not known yet, force metadata update
  • // 如果ReadyCheckResult中标识有unknownLeadersExist,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息
  • if (result.unknownLeadersExist)
  • this.metadata.requestUpdate();
  • ...
  • for (ClientRequest request : requests)
  • // 使用NetworkClient将ClientRequest对象写入KafkaChannel的send字段
  • client.send(request, now);
  • // if some partitions are already ready to be sent, the select time would be 0;
  • // otherwise if some partition already has some data accumulated but not ready yet,
  • // the select time will be the time difference between now and its linger expiry time;
  • // otherwise the select time will be the time difference between now and the metadata expiry time;
  • // 使用NetworkClient将KafkaChannel的send字段中保存的ClientRequest发送出去
  • this.client.poll(pollTimeout, now);
  • }
  • // Metadata
  • public synchronized int requestUpdate() {
  • this.needUpdate = true; // 设置为需要更新
  • return this.version; // 返回当前集群元数据的版本号
  • }

我们需要关注的是最后部分的代码,client.send(request, now)负责使用client客户端(此处使用的是NetworkClient,后面会讲解)将请求发送出去,这一点我们后面再讲解;最后的this.client.poll(pollTimeout, now)会调用NetworkClient以NIO方式来处理数据的收发工作,它的代码如下:

org.apache.kafka.clients.NetworkClient
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • // process completed actions
  • long updatedNow = this.time.milliseconds();
  • // 响应队列
  • List<ClientResponse> responses = new ArrayList<>();
  • // 处理completeSends队列
  • handleCompletedSends(responses, updatedNow);
  • // 处理completeReceives队列
  • handleCompletedReceives(responses, updatedNow);
  • // 处理disconnected列表
  • handleDisconnections(responses, updatedNow);
  • // 处理connected列表
  • handleConnections();
  • // 处理InFlightRequest中超时请求
  • handleTimedOutRequests(responses, updatedNow);
  • // invoke callbacks
  • // 遍历ClientResponse
  • for (ClientResponse response : responses) {
  • // 如果对应的ClientRequest有回调就执行回调
  • if (response.request().hasCallback()) {
  • try {
  • response.request().callback().onComplete(response);
  • } catch (Exception e) {
  • log.error("Uncaught error in request completion:", e);
  • }
  • }
  • }
  • return responses;
  • }

在每次poll操作时,都会调用metadataUpdater.maybeUpdate(now)操作判断是否需要更新元数据,如果有需要会构建更新请求。但需要注意的是,在初始创建KafkaProducer时,这里的maybeUpdate(long)内部判断并不能满足更新元数据的条件。

5. 元数据更新

元数据更新是在KafkaProducer发送数据时进行的,由于它的过程的原理与消息发送某些细节比较相似,我们这里先对该过程进行详细介绍。在KafkaProducer的doSend(ProducerRecord<K, V>, Callback)方法中,真正发送消息之前会进行元数据更新,源码如下:

org.apache.kafka.clients.producer.KafkaProducer#doSend
  • private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
  • TopicPartition tp = null;
  • try {
  • // first make sure the metadata for the topic is available
  • // 获取Kafka集群信息,会唤醒Sender线程更新Kafka集群的元数据
  • long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
  • ...
  • } catch (ApiException e) {
  • ...
  • }
  • }

KafkaProducer的waitOnMetadata(String topic, long maxWaitMs)方法主要负责进行元数据更新,它的源码如下:

org.apache.kafka.clients.producer.KafkaProducer#doSend
  • private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
  • // add topic to metadata topic list if it is not there already.
  • // 如果Metadata对象的topics集合中没有该topic,就将其加入
  • if (!this.metadata.containsTopic(topic))
  • this.metadata.add(topic);
  • // fetch()返回的是Cluster对象,判断Cluster对象中是否有该topic的分区信息,如果有就可以直接返回
  • if (metadata.fetch().partitionsForTopic(topic) != null)
  • return 0;
  • long begin = time.milliseconds();
  • long remainingWaitMs = maxWaitMs;
  • // 如果没有拉取到相应主题的元数据,将会重复拉取
  • while (metadata.fetch().partitionsForTopic(topic) == null) {
  • log.trace("Requesting metadata update for topic {}.", topic);
  • // 设置Metadata的needUpdate字段,并得到当前的元数据版本号
  • int version = metadata.requestUpdate();
  • // 唤醒Sender线程
  • sender.wakeup();
  • // 等待更新,会阻塞
  • metadata.awaitUpdate(version, remainingWaitMs);
  • long elapsed = time.milliseconds() - begin;
  • if (elapsed >= maxWaitMs)
  • // 等待超过最大超时时间,直接抛出异常
  • throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
  • if (metadata.fetch().unauthorizedTopics().contains(topic))
  • // 检查主题的授权信息,如果未授权,直接抛出异常
  • throw new TopicAuthorizationException(topic);
  • // 更新超时时间
  • remainingWaitMs = maxWaitMs - elapsed;
  • }
  • return time.milliseconds() - begin;
  • }

从上面的源码可知,当在Metadata对象引用的Cluster对象中无法获取到指定Topic的分区信息时,会进入while循环体;while循环体内首先会唤醒Sender线程,然后调用Metadata对象的awaitUpdate(final int lastVersion, final long maxWaitMs)方法更新元数据,需要注意的是此时传入的lastVersion参数是Metadata对象保存的当前的版本号;该方法源码如下:

org.apache.kafka.clients.Metadata#awaitUpdate
  • public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
  • if (maxWaitMs < 0) {
  • throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
  • }
  • long begin = System.currentTimeMillis();
  • long remainingWaitMs = maxWaitMs;
  • // 比较版本号
  • while (this.version <= lastVersion) {
  • if (remainingWaitMs != 0)
  • // 进入wait阻塞,等待更新成功被notify或超时,进入下一次while循环
  • wait(remainingWaitMs);
  • long elapsed = System.currentTimeMillis() - begin;
  • if (elapsed >= maxWaitMs)
  • throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
  • remainingWaitMs = maxWaitMs - elapsed;
  • }
  • }

从上面代码可以看出,当满足下面2个条件时KafkaProducer主线程就会一直wait等待直到超时:

  • while (metadata.fetch().partitionsForTopic(topic) == null):元数据中无法得到指定主题的分区信息;
  • while (this.version <= lastVersion):Metadata的版本号没有被更新。

KafkaProducer主线程进入wait等待状态后,notify唤醒操作会在Sender线程更新Metadata后执行。

还记得前面我们讲解过的Sender线程吗?最终它的run(long)方法会调用到NetworkClient的poll(long, long)方法,在该方法中首先会调用metadataUpdater.maybeUpdate(now)检测是否符合更新元数据的条件,此时这个方法内部会进行元数据更新的请求。

5.1. 更新请求的封装

NetworkClient的poll(long, long)方法中使用的metadataUpdater对象类型是MetadataUpdater,NetworkClient的构造方法中默认使用DefaultMetadataUpdater类型构建它,该类是NetworkClient内部类,它的maybeUpdate(long)方法源码如下:

org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater
  • public long maybeUpdate(long now) {
  • /**
  • * 调用Metadata.timeToNextUpdate()方法,其中会检测needUpdate的值、退避时间、是否长时间未更新
  • * 最终得到一个下次更新集群元数据的时间戳
  • */
  • // should we update our metadata?
  • long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
  • // 获取下次尝试重新连接服务端的时间戳
  • long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
  • // 检测metadataFetchInProgress字段,判断是否已经发送了请求
  • long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
  • // if there is no node available to connect, back off refreshing metadata
  • // 计算当前距离下次可以发送MetadataRequest请求的时间差
  • long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
  • waitForMetadataFetch);
  • if (metadataTimeout == 0) {
  • // 允许发送MetadataRequest请求
  • // Beware that the behavior of this method and the computation of timeouts for poll() are
  • // highly dependent on the behavior of leastLoadedNode.
  • // 找到负载最小的Node,若没有可用就返回null
  • Node node = leastLoadedNode(now);
  • // 将更新Metadata的请求发送给这个Node;这里只会创建并缓存MetadataRequest,等待下次poll()方法发送请求
  • maybeUpdate(now, node);
  • }
  • return metadataTimeout;
  • }
  • // 该方法是个重载方法
  • private void maybeUpdate(long now, Node node) {
  • // 如果没有可用Node,将直接返回
  • if (node == null) {
  • log.debug("Give up sending metadata request since no node is available");
  • // mark the timestamp for no node available to connect
  • // 记录lastNoNodeAvailableMs字段
  • this.lastNoNodeAvailableMs = now;
  • return;
  • }
  • String nodeConnectionId = node.idString();
  • // 检测是否允许向该Node发送请求
  • if (canSendRequest(nodeConnectionId)) {
  • this.metadataFetchInProgress = true;
  • MetadataRequest metadataRequest;
  • // 指定需要更新元数据的Topic
  • if (metadata.needMetadataForAllTopics())
  • // 更新全部主题的元数据
  • metadataRequest = MetadataRequest.allTopics();
  • else
  • // 更新部分主题的元数据
  • metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
  • // 将请求更新Metadata的MetadataRequest封装成ClientRequest
  • // 注意:ClientRequest内部其实会包装一个RequestSend,而该RequestSend的destination字段会被置为nodeConnectionId,即NodeId
  • ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
  • log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
  • // 缓存请求,在下次poll()操作中会将其发出,返回的response会在handleCompleteReceives()中处理
  • doSend(clientRequest, now);
  • } else if (connectionStates.canConnect(nodeConnectionId, now)) {
  • // we don't have a connection to this node right now, make one
  • log.debug("Initialize connection to node {} for sending metadata request", node.id());
  • // 初始化连接
  • initiateConnect(node, now);
  • // If initiateConnect failed immediately, this node will be put into blackout and we
  • // should allow immediately retrying in case there is another candidate node. If it
  • // is still connecting, the worst case is that we end up setting a longer timeout
  • // on the next round and then wait for the response.
  • } else { // connected, but can't send more OR connecting
  • // In either case, we just need to wait for a network event to let us know the selected
  • // connection might be usable again.
  • // 已成功连接到指定节点,但不能发送请求,则更新lastNoNodeAvailableMs
  • this.lastNoNodeAvailableMs = now;
  • }
  • }

其中,查找负载最小的Node的方法leastLoadedNode(long)由NetworkClient实现:

org.apache.kafka.clients.NetworkClient#leastLoadedNode
  • // 选择负载最小的Node节点
  • public Node leastLoadedNode(long now) {
  • // 获取存放在Cluster中的Node节点列表
  • List<Node> nodes = this.metadataUpdater.fetchNodes();
  • int inflight = Integer.MAX_VALUE;
  • Node found = null;
  • // 获得随机偏移
  • int offset = this.randOffset.nextInt(nodes.size());
  • for (int i = 0; i < nodes.size(); i++) {
  • // 根据随机偏移随机选一个Node
  • int idx = (offset + i) % nodes.size();
  • Node node = nodes.get(idx);
  • // 获得该Node节点中请求队列的中的请求数量
  • int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
  • if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
  • // 如果该Node节点的请求队列中没有请求,且该Node节点是连接的,就选该节点
  • // if we find an established connection with no in-flight requests we can stop right away
  • return node;
  • } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
  • // otherwise if this is the best we have found so far, record that
  • // 如果不符合,则重新选择
  • inflight = currInflight;
  • found = node;
  • }
  • }
  • return found;
  • }

maybeUpdate(long)方法中会根据传入的时间参数判断是否需要更新元数据,如果满足更新条件,则会从Cluster对象的所有Node节点对象中选取一个负载最小的节点,将更新元数据的请求发送给该节点;在真正发送更新元数据请求的过程中,因为目前并没有与任何一个broker连接,所以会首先调用initiateConnect(node, now)尝试连接,然后在下一次的poll过程中才会真正地发送Metadata更新请求。

更新Metadata元数据的请求是一个ClientRequest对象,内部通过将MetadataRequest的结构转换为RequestSend对象对MetadataRequest进行封装,最终通过NetworkClient的doSend(clientRequest, now)方法将ClientRequest对象添加到inFlightRequests队列中,然后等待下次poll操作中会将其发出;需要注意的是,此处同时会将此待发送的ClientRequest对象内部的RequestSend对象使用KafkaChannel的send字段进行记录:

org.apache.kafka.clients.NetworkClient#doSend
  • private void doSend(ClientRequest request, long now) {
  • request.setSendTimeMs(now);
  • // 将请求添加到inFlightRequest队列中等待响应
  • this.inFlightRequests.add(request);
  • // 将请求交由KafkaChannel处理
  • selector.send(request.request());
  • }

doSend(ClientRequest, long)使用this.inFlightRequests的是InFlightRequests类,它的add(ClientRequest)方法如下:

org.apache.kafka.clients.InFlightRequests#add
  • public void add(ClientRequest request) {
  • /**
  • * 以ClientRequest对象中封装的RequestSend的destination为键从requests中获取对应的Deque
  • * 这个destination的值其实是当时发送请求的目的Node节点的id,可以回顾下面的方法:
  • * {@link NetworkClient.DefaultMetadataUpdater#maybeUpdate}
  • * requests的结构是Map<String, Deque<ClientRequest>>
  • */
  • Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
  • // 如果没有获取到就创建一个
  • if (reqs == null) {
  • reqs = new ArrayDeque<>();
  • this.requests.put(request.request().destination(), reqs);
  • }
  • // 将ClientRequest对象添加到对应的Deque中
  • reqs.addFirst(request);
  • }

doSend(ClientRequest, long)使用selector的是org.apache.kafka.common.network.Selector类,它的send(Send)方法如下:

org.apache.kafka.common.network.Selector#send
  • // 内部封装了java.nio.channels.Selector
  • public void send(Send send) {
  • KafkaChannel channel = channelOrFail(send.destination());
  • try {
  • /**
  • * 将send对象缓存到KafkaChannel的send字段中,同时添加OP_WRITE事件的关注
  • * send对象实际类型是RequestSend对象,其中封装了具体的请求数据,包括请求头和请求体
  • * 这里只是将RequestSend对象用KafkaChannel的send字段记录下来
  • * 具体的发送会在Selector.poll()方法中进行
  • * KafkaChannel每次只会发送一个RequestSend对象
  • */
  • channel.setSend(send);
  • } catch (CancelledKeyException e) {
  • this.failedSends.add(send.destination());
  • close(channel);
  • }
  • }

上述方法使用了KafkaChannel的setSend(Send)方法

org.apache.kafka.common.network.KafkaChannel#setSend
  • // 注意:该方法会将RequestSend对象与KafkaChannel关联起来,使用KafkaChannel的send属性进行记录
  • public void setSend(Send send) {
  • if (this.send != null)
  • throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
  • // 记录send
  • this.send = send;
  • // 添加OP_WRITE事件关注
  • this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
  • }

inFlightRequests队列的主要作用是缓存了已经发出去但没收到响应的ClientRequest,它的类型是InFlightRequests,其底层是通过一个Map>对象实现的,键是NodeId,值是发送到对应Node的ClientRequest对象集合。

InFlightRequests类还提供了canSendMore()方法用于判断是否可以向指定Node发送请求的条件之一,底层也是通过NodeId来取得相应的Deque队列,然后基于队列进行各种判断,其代码如下:

org.apache.kafka.clients.InFlightRequests#canSendMore
  • public boolean canSendMore (String node) {
  • DequeClientRequest queue = requests.get(node);
  • return queue == null // 队列未创建
  • || queue.isEmpty() // 队列为空
  • || (queue.peekFirst().request().completed() // 队首请求已完成
  • && queue.size() this.maxInFlightRequestsPerConnection); // 队列长度小于maxInFlightRequestsPerConnection
  • }

5.2. 更新请求的发送

做完了对Metadata元数据更新的请求封装之后,在NetworkClient类的poll(long timeout, long now)方法中会使用org.apache.kafka.common.network.Selector对象的poll(long)方法来处理数据的发送,回顾这部分代码:

org.apache.kafka.clients.NetworkClient#poll
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • ...
  • }

Kafka内进行网络IO通信使用的其实是JDK NIO包,但Kafka对其进行了封装,提供了org.apache.kafka.common.network.Selector,内部拥有一个java.nio.channels.Selector类型的成员属性nioSelector。上面用到Selector的poll(long)方法源码如下:

org.apache.kafka.common.network.Selector
  • public void poll(long timeout) throws IOException {
  • if (timeout < 0)
  • throw new IllegalArgumentException("timeout should be >= 0");
  • // 将上一次poll()方法的结果全部清除掉
  • clear();
  • if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
  • timeout = 0;
  • /* check ready keys */
  • long startSelect = time.nanoseconds();
  • // 调用select()方法
  • int readyKeys = select(timeout);
  • long endSelect = time.nanoseconds();
  • currentTimeNanos = endSelect;
  • this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
  • if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
  • // 处理IO事件
  • pollSelectionKeys(this.nioSelector.selectedKeys(), false);
  • pollSelectionKeys(immediatelyConnectedKeys, true);
  • }
  • // 将statedReceives复制到completedReceives集合中
  • addToCompletedReceives();
  • long endIo = time.nanoseconds();
  • this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
  • // 关闭长期空闲的连接
  • maybeCloseOldestConnection();
  • }
  • // 处理IO操作
  • private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
  • // 遍历关注的事件SelectionKey
  • Iterator<SelectionKey> iterator = selectionKeys.iterator();
  • while (iterator.hasNext()) {
  • // 获取SelectionKey并从集合中移除
  • SelectionKey key = iterator.next();
  • iterator.remove();
  • // 获取与SelectionKey绑定的KafkaChannel
  • KafkaChannel channel = channel(key);
  • // register all per-connection metrics at once
  • sensors.maybeRegisterConnectionMetrics(channel.id());
  • // 更新LRU信息
  • lruConnections.put(channel.id(), currentTimeNanos);
  • try {
  • /* complete any connections that have finished their handshake (either normally or immediately) */
  • // connect()方法返回true或OP_CONNECTION事件的处理
  • if (isImmediatelyConnected || key.isConnectable()) {
  • // finishConnect方法会先检测sockChannel是否建立完成,建立后,会取消对OP_CONNECT事件关注,开始关注OP_READ事件
  • if (channel.finishConnect()) {
  • // 将连接的ID添加到connected集合中
  • this.connected.add(channel.id());
  • this.sensors.connectionCreated.record();
  • } else
  • // 连接未完成,跳过对该Channel的后续处理
  • continue;
  • }
  • /* if channel is not ready finish prepare */
  • if (channel.isConnected() && !channel.ready())
  • // 身份认证
  • channel.prepare();
  • /* if channel is ready read from any connections that have readable data */
  • if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
  • // Channel可读,处理OP_READ事件
  • NetworkReceive networkReceive;
  • while ((networkReceive = channel.read()) != null)
  • /**
  • * 读取信息并将读到的NetworkReceive添加到stagedReceives集合中保存
  • * 若读取到一个完整的NetworkReceive,则将其添加到stagedReceives集合中保存
  • * 若读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时,
  • * 继续读取,知道读取到一个完整的NetworkReceive
  • */
  • addToStagedReceives(channel, networkReceive);
  • }
  • /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
  • if (channel.ready() && key.isWritable()) {
  • // Channel可写,处理OP_WRITE事件
  • Send send = channel.write();
  • /**
  • * channel.write()将KafkaChannel中保存的send字段发送出去,
  • * 如果发送成功就会返回send,然后将其添加到completedSends集合,等待后续处理
  • * 如果发送未完成会返回null
  • */
  • if (send != null) {
  • this.completedSends.add(send);
  • this.sensors.recordBytesSent(channel.id(), send.size());
  • }
  • }
  • /* cancel any defunct sockets */
  • // 关注的键无效,关闭对应的KafkaChannel,并将对应的NodeId添加到disconnected集合中
  • if (!key.isValid()) {
  • close(channel);
  • this.disconnected.add(channel.id());
  • }
  • } catch (Exception e) {
  • String desc = channel.socketDescription();
  • if (e instanceof IOException)
  • log.debug("Connection with {} disconnected", desc, e);
  • else
  • log.warn("Unexpected error from {}; closing connection", desc, e);
  • // 抛出异常后关闭KafkaChannel,并将对应的NodeId添加到disconnected集合中
  • close(channel);
  • this.disconnected.add(channel.id());
  • }
  • }
  • }

上面的代码是典型的通过NIO Selector方式进行网络IO操作的过程,我们先关注发送数据的部分:

org.apache.kafka.common.network.Selector#pollSelectionKeys
  • if (channel.ready() && key.isWritable()) {
  • // Channel可写,处理OP_WRITE事件
  • Send send = channel.write();
  • /**
  • * channel.write()将KafkaChannel中保存的send字段发送出去,
  • * 如果发送成功就会返回send,然后将其添加到completedSends集合,等待后续处理
  • * 如果发送未完成会返回null,然后在下一次处理中继续发送
  • */
  • if (send != null) {
  • this.completedSends.add(send);
  • this.sensors.recordBytesSent(channel.id(), send.size());
  • }
  • }

其中使用了KafkaChannel的write()方法:

org.apache.kafka.common.network.KafkaChannel
  • // KafkaChannel类
  • public Send write() throws IOException {
  • Send result = null;
  • // send()方法发送时,如果发送完成会返回true,否则返回false
  • if (send != null && send(send)) {
  • // 完成后使用result记录send,用于返回,并将send置为null,用于下一次发送
  • result = send;
  • send = null;
  • }
  • return result;
  • }
  • private boolean send(Send send) throws IOException {
  • // 发送send,此处的send对象是RequestSend类型
  • send.writeTo(transportLayer);
  • // 检查是否完成
  • if (send.completed())
  • // 如果完成就移除对OP_WRITE事件的关注
  • transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
  • return send.completed();
  • }

这里调用的send.writeTo(transportLayer)其实是ByteBufferSend的writeTo(GatheringByteChannel)方法:

```java shownum=”0” toolsbar=’{“title”: “org.apache.kafka.common.network.ByteBufferSend#writeTo”, “global”: true}
// 需要注意的继承关系:RequestSend -继承-> NetworkSend -继承-> ByteBufferSend -实现-> Send
// 因此上面的RequestSend调用的writeTo()方法其实继承自ByteBufferSend类,
// 传入的channel是TransportLayer类型对象,这个后面会讲解
public long writeTo(GatheringByteChannel channel) throws IOException {
// 使用Channel写出数据
long written = channel.write(buffers);
if (written < 0)
throw new EOFException(“Wrote negative bytes to channel. This shouldn’t happen.”);
remaining -= written;
// This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
// Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
// GatheringByteChannel or ScatteringByteChannel.
if (channel instanceof TransportLayer)
pending = ((TransportLayer) channel).hasPendingWrites();

  • return written;

}
`` 该部分代码的作用是:当select操作得到的SelectionKey代表的是可写事件,使用KafkaChannel对象的write()`方法将之前关联在KafkaChannel对象上的RequestSend对象写出到网络Channel中;需要注意的是,这里底层用到通道其实是TransportLayer类型的,它封装了SocketChannel和SelectionKey,根据网络协议的不同提供不同的子类,对KafkaChannel提供统一的接口,具体实现类有PlaintextTransportLayer和SslTransportLayer,这是策略模式的运用。

5.3. 更新请求的响应处理

请求的响应数据首先会在Selector的pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected)方法中接收并处理:

java shownum="0" toolsbar='{"title": "org.apache.kafka.common.network.Selector", "global": true}' /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // Channel可读,处理OP_READ事件 NetworkReceive networkReceive; // 循环接收,直到1个response完全接收到,才会从while循环退出 while ((networkReceive = channel.read()) != null) /** * 读取信息并将读到的NetworkReceive添加到stagedReceives集合中保存 * 若读取到一个完整的NetworkReceive,则将其添加到stagedReceives集合中保存 * 若读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时, * 继续读取,知道读取到一个完整的NetworkReceive */ addToStagedReceives(channel, networkReceive); } private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) { // 如果stagedReceives中还没有与指定KafkaChannel对应的ArrayDeque,就创建一个新的 if (!stagedReceives.containsKey(channel)) stagedReceives.put(channel, new ArrayDeque<NetworkReceive>()); // 获取stagedReceives中指定KafkaChannel对应的Deque Deque<NetworkReceive> deque = stagedReceives.get(channel); // 将接收的NetworkReceive对象添加到该Deque中 deque.add(receive); }

这里我们需要注意的是,在接收响应数据时,可能需要读取多次才能接收完毕,这里的做法是将每个KafkaChannel实例读取的数据所构成的NetworkReceive对象存放在一个Deque类型的队列中,然后以KafkaChannel实例为键、队列为值的形式存放在一个Map>类型的字段属性stagedReceives中。在完成一次poll操作后,会通过addToCompletedReceives()方法将statedReceives中的NetworkReceives数据复制到completedReceives集合中,回顾源码:

org.apache.kafka.common.network.Selector
  • // Selector类
  • public void poll(long timeout) throws IOException {
  • ...
  • // 调用select()方法
  • int readyKeys = select(timeout);
  • ...
  • if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
  • // 处理IO事件
  • pollSelectionKeys(this.nioSelector.selectedKeys(), false);
  • pollSelectionKeys(immediatelyConnectedKeys, true);
  • }
  • // 将statedReceives复制到completedReceives集合中
  • addToCompletedReceives();
  • ...
  • }
  • private void addToCompletedReceives() {
  • if (!this.stagedReceives.isEmpty()) {
  • // 如果stagedReceives集合不为空,则遍历该集合
  • Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
  • while (iter.hasNext()) {
  • // 取出对应的键值对
  • Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
  • // 获取KafkaChannel
  • KafkaChannel channel = entry.getKey();
  • if (!channel.isMute()) {
  • // 判断KafkaChannel是否是mute状态,如果不是才表示此时KafkaChannel已经完成了读写操作
  • Deque<NetworkReceive> deque = entry.getValue();
  • // 获取队首networkReceive并添加到completedReceives
  • NetworkReceive networkReceive = deque.poll();
  • this.completedReceives.add(networkReceive);
  • this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
  • // 如果队列空了,移除键值对
  • if (deque.isEmpty())
  • iter.remove();
  • }
  • }
  • }
  • }

而在NetworkClient的poll(long timeout, long now)方法后面的代码则会在每次poll过程中尝试解析返回的响应数据:

org.apache.kafka.clients.NetworkClient
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • // process completed actions
  • long updatedNow = this.time.milliseconds();
  • // 尝试解析各类数据
  • // 响应队列
  • List<ClientResponse> responses = new ArrayList<>();
  • // 处理completeSends队列
  • handleCompletedSends(responses, updatedNow);
  • // 处理completeReceives队列
  • handleCompletedReceives(responses, updatedNow);
  • // 处理disconnected列表
  • handleDisconnections(responses, updatedNow);
  • // 处理connected列表
  • handleConnections();
  • // 处理InFlightRequest中超时请求
  • handleTimedOutRequests(responses, updatedNow);
  • // invoke callbacks
  • // 遍历ClientResponse
  • for (ClientResponse response : responses) {
  • // 如果对应的ClientRequest有回调就执行回调
  • if (response.request().hasCallback()) {
  • try {
  • response.request().callback().onComplete(response);
  • } catch (Exception e) {
  • log.error("Uncaught error in request completion:", e);
  • }
  • }
  • }
  • return responses;
  • }
  • private void handleCompletedReceives(List<ClientResponse> responses, long now) {
  • // 遍历completeReceives中的NetworkReceive
  • for (NetworkReceive receive : this.selector.completedReceives()) {
  • // 获取返回响应的NodeId,这里的source对应于前年发送数据时,RequestSend的destination字段
  • String source = receive.source();
  • // 从inFlightRequests中取出对应的ClientRequest
  • ClientRequest req = inFlightRequests.completeNext(source);
  • // 解析响应
  • Struct body = parseResponse(receive.payload(), req.request().header());
  • /**
  • * 调用metadataUpdater.maybeHandleCompletedReceive()方法检查是否是Metadata更新的响应
  • * 如果是,就处理MetadataResponse,会更新Metadata中的记录的集群元数据,并唤醒所有等待Metadata更新完成的线程
  • * 如果不是会返回false
  • */
  • if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
  • // 如果不是MetadataResponse,则创建ClientResponse并添加到responses集合
  • responses.add(new ClientResponse(req, now, false, body));
  • }
  • }

上面的handleCompletedReceives(List<ClientResponse>, long)中用到了NetworkClient内部类DefaultMetadataUpdater的maybeHandleCompletedReceive(ClientRequest, long, Struct)方法:

  • // NetworkClient$DefaultMetadataUpdater类
  • public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
  • short apiKey = req.request().header().apiKey();
  • // 检测是否是MetadataRequest请求的响应
  • if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
  • // 处理请求响应
  • handleResponse(req.request().header(), body, now);
  • return true;
  • }
  • return false;
  • }

这里我们需要注意的是handleCompletedReceives(List<ClientResponse> responses, long now)方法中解析响应的过程,下面是源码:

org.apache.kafka.clients.NetworkClient#handleCompletedReceives
  • private void handleCompletedReceives(List<ClientResponse> responses, long now) {
  • // 遍历completeReceives中的NetworkReceive
  • for (NetworkReceive receive : this.selector.completedReceives()) {
  • // 获取返回响应的NodeId,这里的source对应于前年发送数据时,RequestSend的destination字段
  • String source = receive.source();
  • // 从inFlightRequests中取出对应的ClientRequest
  • ClientRequest req = inFlightRequests.completeNext(source);
  • // 解析响应
  • Struct body = parseResponse(receive.payload(), req.request().header());
  • /**
  • * 调用metadataUpdater.maybeHandleCompletedReceive()方法检查是否是Metadata更新的响应
  • * 如果是,就处理MetadataResponse,会更新Metadata中的记录的集群元数据,并唤醒所有等待Metadata更新完成的线程
  • * 如果不是会返回false
  • */
  • if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
  • // 如果不是MetadataResponse,则创建ClientResponse并添加到responses集合
  • responses.add(new ClientResponse(req, now, false, body));
  • }
  • }

在这个过程中会从Selector对象的completeReceives集合中遍历NetworkReceive对象,然后根据NetworkReceive对象的source字段从inFlightRequests取出对应的ClientRequest,哪为什么可以通过source字段来取出对应的ClientRequest呢?我们回顾一下前面发送请求的流程,NetworkClient在将请求对象ClientRequest交给KafkaChannel时,是会将ClientRequest添加到inFlightRequest队列中的,回顾源码:

  • // NetworkClient类
  • private void doSend(ClientRequest request, long now) {
  • request.setSendTimeMs(now);
  • // 将请求添加到inFlightRequest队列中等待响应
  • this.inFlightRequests.add(request);
  • // 将请求交由KafkaChannel处理
  • selector.send(request.request());
  • }
  • // InFlightRequests类
  • public void add(ClientRequest request) {
  • /**
  • * 以ClientRequest对象中封装的RequestSend的destination为键从requests中获取对应的Deque
  • * 这个destination的值其实是当时发送请求的目的Node节点的id,可以回顾下面的方法:
  • * {@link NetworkClient.DefaultMetadataUpdater#maybeUpdate}
  • * requests的结构是Map<String, Deque<ClientRequest>>
  • */
  • Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
  • // 如果没有获取到就创建一个
  • if (reqs == null) {
  • reqs = new ArrayDeque<>();
  • this.requests.put(request.request().destination(), reqs);
  • }
  • // 将ClientRequest对象添加到对应的Deque中
  • reqs.addFirst(request);
  • }

而NetworkReceives中的source字段也标识了响应来源Node的NodeId,因此通过这两个字段的比较,自然能够找到对应的ClientRequest了。

在获取到对应的ClientRequest之后,会使用NetworkReceives的载荷和ClientRequest的请求头信息来解析响应数据,最终通过DefaultMetadataUpdater的maybeHandleCompletedReceive(ClientRequest request, long now, Struct body)来解析响应数据,该方法会判断响应是否是更新集群元数据的响应,如果是响应数据就交给handleResponse(RequestHeader header, Struct body, long now)方法处理,如果不是会放弃处理并返回false,上层的handleCompletedReceives(List<ClientResponse> responses, long now)方法就会将响应添加到一个List类型的集合responses中,等待后面统一处理。

5.4. 集群元数据的解析和更新

集群元数据的响应数据最终的解析和更新最终由handleResponse(RequestHeader header, Struct body, long now)方法和Metadata类的update(Cluster cluster, long now)方法负责完成;我们先查看handleResponse(RequestHeader header, Struct body, long now)方的源码:

org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#handleResponse
  • private void handleResponse(RequestHeader header, Struct body, long now) {
  • // 修改metadataFetchInProgress
  • this.metadataFetchInProgress = false;
  • // 解析MetadataResponse
  • MetadataResponse response = new MetadataResponse(body);
  • // 根据response返回的数据,创建新的Cluster对象
  • Cluster cluster = response.cluster();
  • // check if any topics metadata failed to get updated
  • Map<String, Errors> errors = response.errors();
  • if (!errors.isEmpty())
  • log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
  • // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
  • // created which means we will get errors and no nodes until it exists
  • if (cluster.nodes().size() > 0) {
  • // 首先通知Metadata上的监听器,然后更新cluster字段,最后唤醒等待Metadata更新完成的线程
  • this.metadata.update(cluster, now);
  • } else {
  • log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
  • // 更新Metadata失败,将lastRefreshMs字段更新
  • this.metadata.failedUpdate(now);
  • }
  • }

该方法首先将metadataFetchInProgress字段置为了false,表明此时已经没有元数据更新请求了,然后通过传入请求体参数给MetadataResponse类的构造方法来创建一个MetadataResponse对象,该类的构造方法中有大量的解析响应数据的操作,会从响应数据中读取Broker、Controller、Topic及Partition等信息,最终其cluster()方法会将这些信息统一由一个Cluster对象进行管理。这里另外需要注意的是,最终判断元数据是否更新成功的依据是得到的响应信息中Node节点的数量是否大于0,如果判定更新成功,最后会将Cluster对象交给Metadata类的update(Cluster cluster, long now)方法以更新现有的集群元数据:

org.apache.kafka.clients.Metadata#update
  • public synchronized void update(Cluster cluster, long now) {
  • this.needUpdate = false;
  • this.lastRefreshMs = now;
  • this.lastSuccessfulRefreshMs = now;
  • this.version += 1;
  • // 通知所有的监听器,数据要更新了
  • for (Listener listener: listeners)
  • listener.onMetadataUpdate(cluster);
  • // Do this after notifying listeners as subscribed topics' list can be changed by listeners
  • // 更新Cluster字段
  • this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
  • // 唤醒等待Metadata更新完成的线程
  • notifyAll();
  • log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
  • }

update(Cluster cluster, long now)方法前面已经讲解过了,它会更新Metadata的相关字段、将更新事件和得到的包含了新的集群元数据的Cluster对象通知给所有的监听器,然后以替换的方式更新内部的Cluster对象完成集群元数据的更新。

5.5. 元数据更新机制

上面我们对元数据的更新流程进行了详细的讲解,这个过程是针对于初次发送消息数据时,由于本地还没有任何集群元数据而触发的更新。除了这一种情况,Metadata还会周期性进行更新,同时如果发现元数据失效,也会进行强制更新。

5.5.1. 周期性更新

我们已经知道,在Sender线程每次进行poll操作时,会调用NetworkClient的poll(long timeout, long now)方法,该方法内部会通过DefaultMetadataUpdater对象的maybeUpdate(long)检测是否需要更新,以达到周期更新的效果,回顾一下这部分的源码:

org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate
  • public long maybeUpdate(long now) {
  • /**
  • * 调用Metadata.timeToNextUpdate()方法,其中会检测needUpdate的值、退避时间、是否长时间未更新
  • * 最终得到一个下次更新集群元数据的时间戳
  • */
  • // should we update our metadata?
  • long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
  • // 获取下次尝试重新连接服务端的时间戳
  • long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
  • // 检测metadataFetchInProgress字段,判断是否已经发送了请求
  • long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
  • // if there is no node available to connect, back off refreshing metadata
  • // 计算当前距离下次可以发送MetadataRequest请求的时间差
  • long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
  • waitForMetadataFetch);
  • if (metadataTimeout == 0) {
  • // 允许发送MetadataRequest请求
  • // Beware that the behavior of this method and the computation of timeouts for poll() are
  • // highly dependent on the behavior of leastLoadedNode.
  • // 找到负载最小的Node,若没有可用就返回null
  • Node node = leastLoadedNode(now);
  • // 将跟新Metadata的请求发送给这个Node;这里只会创建并缓存MetadataRequest,等待下次poll()方法发送请求
  • maybeUpdate(now, node);
  • }
  • return metadataTimeout;
  • }

其中第一行的timeToNextMetadataUpdate是由Metadata的timeToNextUpdate(long)获取的:

org.apache.kafka.clients.Metadata#timeToNextUpdate
  • public synchronized long timeToNextUpdate(long nowMs) {
  • /**
  • * 元数据是否过期,判断条件:
  • * 1. needUpdate被置为true
  • * 2. 上次更新时间距离当前时间已经超过了指定的元数据过期时间阈值metadataExpireMs(metadata.max.age.ms),默认是300秒
  • */
  • long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
  • /**
  • * 允许更新的时间点,计算方式:
  • * 上次更新时间 + 退避时间 - 当前时间的间隔
  • * 即要求上次更新时间与当前时间的间隔不能大于退避时间,如果大于则需要等待
  • */
  • long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
  • return Math.max(timeToExpire, timeToAllowUpdate);
  • }

5.5.2. 失效时更新

Metadata提供了一个requestUpdate()方法,用于将Metadata的needUpdate字段置为true,以实现强制更新,当然了,强制更新仅针对于周期更新而言,从上面的timeToNextUpdate(long nowMs)方法可以看出,将needUpdate字段置为true只是省掉了判断上次更新时间距离当前时间是否已经超过了指定的元数据过期时间阈值这个步骤,真正的更新操作依旧需要遵循退避时间、是否已经发送了请求请求等约束。

失效更新里面更重要的细节是,如何检测Metadata是否失效。KafkaProducer将这个操作分散在多种情况下:

  1. Sender线程在发送准备发送数据之前,如果有未知的Leader分区存在,会触发强制更新:
org.apache.kafka.clients.producer.internals.Sender#run(long)
  • void run(long now) {
  • // 获取Cluster集群元数据信息
  • Cluster cluster = metadata.fetch();
  • // get the list of partitions with data ready to send
  • // 获取当前集群中符合发送消息条件的数据集
  • RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • // if there are any partitions whose leaders are not known yet, force metadata update
  • // 如果ReadyCheckResult中标识有unknownLeadersExist,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息
  • if (result.unknownLeadersExist)
  • this.metadata.requestUpdate();
  • ...
  • }
  1. 初始化Node节点连接时,即initiateConnect(Node node, long now)方法,该方法在检测元数据是否可用、检测Node节点是否可用时都有可能被调用;当其尝试连接Node节点时,如果发生异常,就会认为元数据已经失效,进而触发强制更新:
org.apache.kafka.clients.NetworkClient#initiateConnect
  • private void initiateConnect(Node node, long now) {
  • String nodeConnectionId = node.idString();
  • try {
  • log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
  • this.connectionStates.connecting(nodeConnectionId, now);
  • // 发起连接
  • selector.connect(nodeConnectionId,
  • new InetSocketAddress(node.host(), node.port()),
  • this.socketSendBuffer,
  • this.socketReceiveBuffer);
  • } catch (IOException e) {
  • /* attempt failed, we'll try again after the backoff */
  • connectionStates.disconnected(nodeConnectionId, now);
  • /* maybe the problem is our metadata, update it */
  • // 如果发起连接失败,可能是由于集群元数据发生了变化,这里会触发一次Metadata的更新,最终会调用Metadata的requestUpdate()方法
  • metadataUpdater.requestUpdate();
  • log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
  • }
  • }
  1. 在NetworkClient的poll操作中,会处理与Kafka服务端断掉的连接,此时如果发现有连接断掉了,就会认为元数据已经失效,进而触发强制更新:
org.apache.kafka.clients.NetworkClient
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • ...
  • // 处理disconnected列表
  • handleDisconnections(responses, updatedNow);
  • ...
  • }
  • private void handleDisconnections(List<ClientResponse> responses, long now) {
  • // 更新连接状态,并清理掉InFlightRequests中断开连接的Node对应的ClientRequest
  • for (String node : this.selector.disconnected()) {
  • log.debug("Node {} disconnected.", node);
  • processDisconnection(responses, node, now);
  • }
  • // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
  • if (this.selector.disconnected().size() > 0)
  • // 有连接断开了,标识需要更新集群元数据,最终会调用Metadata的requestUpdate()方法
  • metadataUpdater.requestUpdate();
  • }
  1. 同样的,在NetworkClient的poll操作中,会处理超时的连接,此时如果发现有连接超时了,就会认为元数据已经失效,进而触发强制更新:
org.apache.kafka.clients.NetworkClient
  • // NetworkClient类
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • ...
  • // 处理InFlightRequest中超时请求
  • handleTimedOutRequests(responses, updatedNow);
  • ...
  • }
  • private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
  • // 从inFlightRequests中获取请求超时的Node的集合
  • List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
  • // 遍历Node关闭连接并更新状态
  • for (String nodeId : nodeIds) {
  • // close connection to the node
  • this.selector.close(nodeId);
  • log.debug("Disconnecting from node {} due to request timeout.", nodeId);
  • processDisconnection(responses, nodeId, now);
  • }
  • // we disconnected, so we should probably refresh our metadata
  • // 有超时请求,因此需要更新Metadata元数据,最终会调用Metadata的requestUpdate()方法
  • if (nodeIds.size() > 0)
  • metadataUpdater.requestUpdate();
  • }
  1. 当接收到包含InvalidMetadataException错误信息的响应数据时,也会触发强制更新:
org.apache.kafka.clients.producer.internals.Sender
  • private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
  • int correlationId = response.request().request().header().correlationId();
  • if (response.wasDisconnected()) {
  • // 对于连接断开而产生的ClientResponse,会重试发送请求,如果不能重试,则调用其中每条消息的回调
  • log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
  • .request()
  • .destination());
  • // 遍历RecordBatch,调用completeBatch()处理,第二个参数传递的是Errors.NETWORK_EXCEPTION
  • for (RecordBatch batch : batches.values())
  • completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
  • } else {
  • log.trace("Received produce response from node {} with correlation id {}",
  • response.request().request().destination(),
  • correlationId);
  • // if we have a response, parse it
  • if (response.hasResponse()) {
  • // 解析响应体
  • ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
  • for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
  • // 解析各种数据
  • TopicPartition tp = entry.getKey();
  • ProduceResponse.PartitionResponse partResp = entry.getValue();
  • Errors error = Errors.forCode(partResp.errorCode);
  • RecordBatch batch = batches.get(tp);
  • // 调用completeBatch()方法处理
  • completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
  • }
  • this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
  • this.sensors.recordThrottleTime(response.request().request().destination(),
  • produceResponse.getThrottleTime());
  • } else {
  • // this is the acks = 0 case, just complete all requests
  • // 没有响应内容,即acks = 0的情况,直接调用completeBatch()处理
  • for (RecordBatch batch : batches.values())
  • completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
  • }
  • }
  • }
  • private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
  • ...
  • if (error.exception() instanceof InvalidMetadataException)
  • // 标识需要更新Metadata中记录的集群元数据
  • metadata.requestUpdate();
  • // Unmute the completed partition.
  • if (guaranteeMessageOrder)
  • this.accumulator.unmutePartition(batch.topicPartition);
  • }

KafkaProducer对元数据的管理和操作,是先行于数据发送环节的。元数据的完整、正确与否决定了是数据发送先决条件。KafkaProducer对元数据的更新操作与消息数据的发送操作虽然都需要经过网络I/O,但二者的实现略有差异,主要表现在以下几点:

  1. 元数据更新请求被包装为MetadataRequest对象;而消息发送请求会被包装为ProduceRequest对象。不过最终这两个对象都会被ClientRequest再包装一次。
  2. 元数据的发送是直接发送的;而消息数据需要暂存于RecordAccumulator的RecordBatch缓存中,批量发送。
  3. 元数据更新请求的响应是会被转换为MetadataResponse对象,由DefaultMetadataUpdater对象来处理;而消息数据发送的响应会被转换为ClientResponse,由NetworkClient来处理。二者以请求头标记来区分类型。

在后面第二篇分析KafkaProducer的文章中会详细分析消息发送的流程,读者到时候可以注意观察二者之间的差别。