Tags: Big Data, Stream Processing, Kafka, Source Code Analysis

Kafka Series 06 - Producer Source Code Analysis 02: Message Sending

Introduction: Kafka offers two ways to send messages, synchronous and asynchronous, but both are ultimately implemented asynchronously. Sending a message involves several stages and relies on NIO's synchronous non-blocking processing under the hood.

1. Message Sending

Let's first introduce the classes involved when KafkaProducer sends a message and what each of them does; they will be discussed throughout the rest of this article:

  1. KafkaProducer: the main class exposed to developers for sending messages. Once a KafkaProducer has been initialized with the configuration supplied by the developer, it can be used to send messages.
  2. ProducerInterceptors: holds a collection of ProducerInterceptor instances, which intercept (and possibly modify) messages.
  3. KeySerializer and ValueSerializer: usually specified by the developer; they serialize the message key and value.
  4. Partitioner: chooses a partition for each message. Kafka ships with DefaultPartitioner, and developers can also plug in their own implementation.
  5. RecordAccumulator: a buffer that collects messages so they can be sent in batches.
  6. RecordBatch: the object inside RecordAccumulator that temporarily holds messages.
  7. MemoryRecords: the object inside RecordBatch that actually stores the message data.
  8. BufferPool: the buffer pool from which RecordAccumulator allocates the memory used by MemoryRecords.
  9. Sender: the Sender thread carries out most of the preparation work before messages are actually sent; it pulls messages from the RecordAccumulator.
  10. RequestSend: the request object wrapping the message data, i.e. the concrete wire representation, including the request header and body.
  11. ClientRequest: wraps a RequestSend object; this is what is actually sent to the server.
  12. NetworkClient: the ClientRequest is handed to NetworkClient in preparation for sending.
  13. Selector: Kafka's own Selector implementation, still built on top of the Selector provided by JDK NIO.
  14. KafkaChannel: NetworkClient places the request into the KafkaChannel's buffer.

The following diagram outlines the KafkaProducer send flow, to give readers an initial picture of the overall process:

[Figure: KafkaProducer message send data flow (1.KafkaProducer消息发送数据流.png)]

The overall send flow is roughly as follows (a minimal usage sketch follows the list):

  1. ProducerInterceptors intercept the message.
  2. The Serializers serialize the message key and value.
  3. The Partitioner chooses a suitable partition for the message.
  4. RecordAccumulator collects messages so they can be sent in batches.
  5. Sender fetches messages from the RecordAccumulator.
  6. A ClientRequest is constructed.
  7. The ClientRequest is handed to NetworkClient in preparation for sending.
  8. NetworkClient places the request into the KafkaChannel's buffer.
  9. Network I/O is performed and the request is sent.
  10. When the response arrives, the ClientRequest's callback is invoked.
  11. The RecordBatch's callback is invoked, which in turn invokes the callback registered on each message.
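
Before walking through the source, the following minimal usage sketch shows the two send styles described above. It is illustrative only: the broker address, topic name, and serializer choices are assumptions made for the example, not anything dictated by the source code analyzed here.

Example (illustrative): minimal KafkaProducer usage
  • import java.util.Properties;
  • import org.apache.kafka.clients.producer.KafkaProducer;
  • import org.apache.kafka.clients.producer.Producer;
  • import org.apache.kafka.clients.producer.ProducerRecord;
  • import org.apache.kafka.clients.producer.RecordMetadata;
  • public class ProducerQuickstart {
  •     public static void main(String[] args) throws Exception {
  •         Properties props = new Properties();
  •         props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
  •         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  •         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  •         try (Producer<String, String> producer = new KafkaProducer<>(props)) {
  •             // "Synchronous" usage: send() itself returns immediately; blocking happens on Future.get()
  •             RecordMetadata meta = producer.send(new ProducerRecord<>("demo-topic", "k", "v")).get();
  •             System.out.println("sync send offset=" + meta.offset());
  •             // Asynchronous usage: the callback runs once the broker acknowledges the record
  •             producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
  •                 if (exception != null)
  •                     exception.printStackTrace(); // the send failed
  •                 else
  •                     System.out.println("async send offset=" + metadata.offset());
  •             });
  •         }
  •     }
  • }

Note that even the "synchronous" style is asynchronous under the hood; the blocking happens in Future.get(), not in send() itself.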

Typically, developers call the KafkaProducer.send(ProducerRecord) method to send a message. This method has several overloads covering the so-called synchronous and asynchronous send modes. Let's look at the implementation of the "synchronous" overload first:

org.apache.kafka.clients.producer.KafkaProducer
  • public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
  • return send(record, null);
  • }
  • public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  • // intercept the record, which can be potentially modified; this method does not throw exceptions
  • // 使用ProducerInterceptor对消息进行拦截或修改
  • ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
  • return doSend(interceptedRecord, callback);
  • }

As the source shows, the "synchronous" send method ends up calling the asynchronous send(ProducerRecord, Callback), which in turn calls doSend(ProducerRecord<K, V>, Callback); doSend() is the method that does the real work of sending a message in KafkaProducer. Before analyzing it, let's first look at how ProducerInterceptor is handled.

1.1. The Interceptor Collection

Before a message is actually sent, KafkaProducer first passes the ProducerRecord through all registered ProducerInterceptor instances. The relevant code is:

org.apache.kafka.clients.producer.KafkaProducer#send
  • // 使用ProducerInterceptor对消息进行拦截或修改
  • ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

The member field interceptors is final and holds a ProducerInterceptors object; ProducerInterceptors internally keeps a List of all ProducerInterceptor instances:

org.apache.kafka.clients.producer.internals.ProducerInterceptors
  • private final List<ProducerInterceptor<K, V>> interceptors;

ProducerInterceptor is an interface that defines the required methods. Its source is as follows:

org.apache.kafka.clients.producer.ProducerInterceptor
  • public interface ProducerInterceptor<K, V> extends Configurable {
  • // 当消息发送时调用
  • public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
  • // 当消息发送到服务端并得到ACK确认时调用,同时当KafkaProducer的send()消息抛出异常时也会调用
  • public void onAcknowledgement(RecordMetadata metadata, Exception exception);
  • // 当拦截器关闭时调用
  • public void close();
  • }

Depending on the situation, ProducerInterceptors iterates over all ProducerInterceptor instances and invokes the corresponding method on each of them:

org.apache.kafka.clients.producer.internals.ProducerInterceptors
  • // 在消息发送时调用,调用时机在键和值序列化之前
  • public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
  • ProducerRecord<K, V> interceptRecord = record;
  • // 遍历所有的拦截器
  • for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
  • try {
  • // 对需要发送的消息进行拦截器处理
  • interceptRecord = interceptor.onSend(interceptRecord);
  • } catch (Exception e) {
  • // do not propagate interceptor exception, log and continue calling other interceptors
  • // be careful not to throw exception from here
  • if (record != null)
  • log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
  • else
  • log.warn("Error executing interceptor onSend callback", e);
  • }
  • }
  • return interceptRecord;
  • }
  • // 当消息发送到服务端并得到ACK确认时调用,同时当KafkaProducer的send()消息抛出异常时也会调用
  • public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  • for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
  • try {
  • interceptor.onAcknowledgement(metadata, exception);
  • } catch (Exception e) {
  • // do not propagate interceptor exceptions, just log
  • log.warn("Error executing interceptor onAcknowledgement callback", e);
  • }
  • }
  • }
  • // 当消息发送失败时调用
  • public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
  • for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
  • try {
  • if (record == null && interceptTopicPartition == null) {
  • interceptor.onAcknowledgement(null, exception);
  • } else {
  • if (interceptTopicPartition == null) {
  • interceptTopicPartition = new TopicPartition(record.topic(),
  • record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
  • }
  • interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
  • exception);
  • }
  • } catch (Exception e) {
  • // do not propagate interceptor exceptions, just log
  • log.warn("Error executing interceptor onAcknowledgement callback", e);
  • }
  • }
  • }
  • // 调用所有ProducerInterceptor的close()方法
  • @Override
  • public void close() {
  • for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
  • try {
  • interceptor.close();
  • } catch (Exception e) {
  • log.error("Failed to close producer interceptor ", e);
  • }
  • }
  • }

As the onSend(ProducerRecord) method of ProducerInterceptors shows, interception is implemented by calling onSend(ProducerRecord<K, V>) on every ProducerInterceptor stored in the interceptors list.
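
To make the contract above concrete, here is a hedged sketch of a custom ProducerInterceptor. The class name and its behavior (tagging the value and counting acknowledgements) are assumptions made for illustration.

Example (illustrative): a custom ProducerInterceptor
  • import java.util.Map;
  • import java.util.concurrent.atomic.AtomicLong;
  • import org.apache.kafka.clients.producer.ProducerInterceptor;
  • import org.apache.kafka.clients.producer.ProducerRecord;
  • import org.apache.kafka.clients.producer.RecordMetadata;
  • public class AuditInterceptor implements ProducerInterceptor<String, String> {
  •     private final AtomicLong acked = new AtomicLong();
  •     private final AtomicLong failed = new AtomicLong();
  •     @Override
  •     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  •         // Called before key/value serialization; may return a modified record
  •         return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
  •                 "audited:" + record.value());
  •     }
  •     @Override
  •     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  •         // Called when the broker acknowledges the record, or when sending fails
  •         if (exception == null) acked.incrementAndGet(); else failed.incrementAndGet();
  •     }
  •     @Override
  •     public void close() {
  •         System.out.println("acked=" + acked.get() + ", failed=" + failed.get());
  •     }
  •     @Override
  •     public void configure(Map<String, ?> configs) { } // required by the Configurable interface
  • }

Such an interceptor is registered through the interceptor.classes producer configuration.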

1.2. Packaging the Message Data

After the interceptors have run, the metadata is updated; we covered this process earlier. While the metadata is being refreshed, the KafkaProducer thread blocks at waitOnMetadata(record.topic(), this.maxBlockTimeMs). Once the Sender thread has successfully updated the metadata, it calls update(Cluster cluster, long now) on the Metadata class, which eventually calls notifyAll() to wake the waiting KafkaProducer thread so that execution continues from that point. The rest of the method looks like this:

org.apache.kafka.clients.producer.KafkaProducer#doSend
  • private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
  • TopicPartition tp = null;
  • try {
  • // first make sure the metadata for the topic is available
  • // 获取Kafka集群信息,会唤醒Sender线程更新Kafka集群的元数据
  • long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
  • long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
  • // 序列化键和值
  • byte[] serializedKey;
  • try {
  • serializedKey = keySerializer.serialize(record.topic(), record.key());
  • } catch (ClassCastException cce) {
  • throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
  • " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
  • " specified in key.serializer");
  • }
  • byte[] serializedValue;
  • try {
  • serializedValue = valueSerializer.serialize(record.topic(), record.value());
  • } catch (ClassCastException cce) {
  • throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
  • " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
  • " specified in value.serializer");
  • }
  • // 选择合适的分区
  • int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
  • /**
  • * 计算消息记录的总大小
  • * Records.LOG_OVERHEAD = SIZE_LENGTH(值为4) + OFFSET_LENGTH(值为8)
  • * Records.LOG_OVERHEAD有SIZE_LENGTH和OFFSET_LENGTH两个字段,分别表示存放消息长度和消息偏移量所需要的字节数
  • */
  • int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
  • // 检查消息长度是否合法,不大于maxRequestSize,不大于totalMemorySize,否则会抛出RecordTooLargeException异常
  • ensureValidRecordSize(serializedSize);
  • // 根据主题和分区创建TopicPartition对象
  • tp = new TopicPartition(record.topic(), partition);
  • ...
  • // producer callback will make sure to call both 'callback' and interceptor callback
  • // 如果有拦截器,使用拦截器包装回调
  • Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
  • // 将消息追加到RecordAccumulator中,调用其append()方法
  • RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
  • // 如果达到批次要求
  • if (result.batchIsFull || result.newBatchCreated) {
  • log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  • // 唤醒Sender进行消息发送
  • this.sender.wakeup();
  • }
  • return result.future;
  • // handling exceptions and record the errors;
  • // for API exceptions return them in the future,
  • // for other exceptions throw directly
  • } catch (ApiException e) {
  • log.debug("Exception occurred during message send:", e);
  • if (callback != null)
  • callback.onCompletion(null, e);
  • this.errors.record();
  • if (this.interceptors != null)
  • this.interceptors.onSendError(record, tp, e);
  • return new FutureFailure(e);
  • } catch (InterruptedException e) {
  • this.errors.record();
  • if (this.interceptors != null)
  • this.interceptors.onSendError(record, tp, e);
  • throw new InterruptException(e);
  • } catch (BufferExhaustedException e) {
  • this.errors.record();
  • this.metrics.sensor("buffer-exhausted-records").record();
  • if (this.interceptors != null)
  • this.interceptors.onSendError(record, tp, e);
  • throw e;
  • } catch (KafkaException e) {
  • this.errors.record();
  • if (this.interceptors != null)
  • this.interceptors.onSendError(record, tp, e);
  • throw e;
  • } catch (Exception e) {
  • // we notify interceptor about all exceptions, since onSend is called before anything else in this method
  • if (this.interceptors != null)
  • this.interceptors.onSendError(record, tp, e);
  • throw e;
  • }
  • }

The remainder of the method first serializes the key and value, then picks a partition with the partitioner and computes the total size of the record, and finally calls append(...) on the RecordAccumulator to add the message to the queues it maintains internally. The RecordAccumulator class is described in detail below.
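
Since the partitioner used by doSend() is pluggable, the following hedged sketch shows what a custom Partitioner might look like and how it could be wired in; the class name and the hashing strategy are illustrative assumptions rather than the logic of Kafka's DefaultPartitioner.

Example (illustrative): a custom Partitioner
  • import java.util.Map;
  • import org.apache.kafka.clients.producer.Partitioner;
  • import org.apache.kafka.common.Cluster;
  • import org.apache.kafka.common.utils.Utils;
  • public class KeyHashPartitioner implements Partitioner {
  •     @Override
  •     public int partition(String topic, Object key, byte[] keyBytes,
  •                          Object value, byte[] valueBytes, Cluster cluster) {
  •         int numPartitions = cluster.partitionsForTopic(topic).size();
  •         if (keyBytes == null)
  •             return 0; // naive choice for keyless records (illustrative)
  •         // mask the sign bit so the result of the hash is non-negative
  •         return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
  •     }
  •     @Override
  •     public void close() { }
  •     @Override
  •     public void configure(Map<String, ?> configs) { }
  • }
  • // Wiring (illustrative): props.put("partitioner.class", KeyHashPartitioner.class.getName());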

1.2.1. The RecordAccumulator Class

KafkaProducer supports both synchronous and asynchronous sending, but the two share the same underlying implementation and are both asynchronous. When the KafkaProducer thread calls KafkaProducer.send(), the message is first placed into the RecordAccumulator and the thread then returns from send(); at that point the message has not actually been sent to Kafka, it is merely buffered in the RecordAccumulator. The KafkaProducer thread keeps appending messages to the RecordAccumulator through KafkaProducer.send(), and once certain conditions are met the Sender thread is woken up to send the messages accumulated there. Let's start with the source of RecordAccumulator's append(...) method:

org.apache.kafka.clients.producer.internals.RecordAccumulator#append
  • public RecordAppendResult append(TopicPartition tp,
  • long timestamp,
  • byte[] key,
  • byte[] value,
  • Callback callback,
  • long maxTimeToBlock) throws InterruptedException {
  • // We keep track of the number of appending thread to make sure we do not miss batches in
  • // abortIncompleteBatches().
  • // 统计正在向RecordAccumulator中追加数据的线程数
  • appendsInProgress.incrementAndGet();
  • try {
  • // check if we have an in-progress batch
  • // 步骤1:查找TopicPartition对应的Deque
  • Deque<RecordBatch> dq = getOrCreateDeque(tp);
  • // 同步操作,以Deque为锁
  • synchronized (dq) {
  • // 检查生产者是否已经关闭了
  • if (closed)
  • throw new IllegalStateException("Cannot send after the producer is closed.");
  • // 使用tryAppend()方法向Deque中最后一个RecordBatch追加Record
  • RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
  • if (appendResult != null)
  • // 返回值不为空说明添加成功,直接将返回值进行return
  • return appendResult;
  • }
  • // we don't have an in-progress record batch try to allocate a new batch
  • // 走到这里,说明可能没有正在接受Record的RecordBatch,需要分配一个新的RecordBatch
  • // 根据消息大小和batchSize,选择一个大的值作为RecordBatch底层ByteBuffer的大小
  • int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
  • log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
  • // 使用BufferPool缓存池分配ByteBuffer空间
  • ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
  • // 以dp为锁
  • synchronized (dq) {
  • // Need to check if producer is closed again after grabbing the dequeue lock.
  • // 检查生产者是否已经关闭了
  • if (closed)
  • throw new IllegalStateException("Cannot send after the producer is closed.");
  • // 再次使用tryAppend()方法向Deque中最后一个RecordBatch追加Record
  • RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
  • if (appendResult != null) {
  • // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
  • // 追加成功,释放buffer给BufferPool
  • free.deallocate(buffer);
  • // 返回
  • return appendResult;
  • }
  • // 走到这里说明追加失败
  • // 创建一个MemoryRecords
  • MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  • // 使用传入的TopicPartition参数和records新创建一个RecordBatch
  • RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
  • // 使用新创建的batch再次尝试
  • FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
  • // 将新创建的RecordBatch添加到dp及incomplete中
  • dq.addLast(batch);
  • incomplete.add(batch);
  • // 返回
  • return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
  • }
  • } finally {
  • // 将记录正在追加消息的线程数的计数器减1
  • appendsInProgress.decrementAndGet();
  • }
  • }
  • // 根据TopicPartition获取对应的Deque
  • private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
  • // 从batches中尝试获取,batches的结构是ConcurrentMap<TopicPartition, Deque<RecordBatch>>
  • Deque<RecordBatch> d = this.batches.get(tp);
  • // 如果获取到就返回
  • if (d != null)
  • return d;
  • // 如果没有获取到,就创建一个新的
  • d = new ArrayDeque<>();
  • // 使用putIfAbsent进行添加,防止并发引起的重复添加
  • // putIfAbsent会先判断是否存在,如果存在就返回且不添加新的,否则返回null并添加新的
  • Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
  • if (previous == null)
  • return d;
  • else
  • return previous;
  • }

As the source shows, RecordAccumulator maintains a Deque for each TopicPartition, and each Deque holds multiple RecordBatch objects. RecordAccumulator's tryAppend(...) method tries to fetch the last RecordBatch from the corresponding Deque; if one exists, it calls that RecordBatch's tryAppend(...) to append the record, otherwise it simply returns null:

org.apache.kafka.clients.producer.internals.RecordAccumulator#tryAppend
  • private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
  • // 获取deque中最后一个RecordBatch
  • RecordBatch last = deque.peekLast();
  • if (last != null) {
  • // 向last中添加消息,获得返回值future
  • FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
  • if (future == null)
  • // 如果返回的future为空,表示该RecordBatch已经没有足够的空间了,将该RecordBatch的MemoryRecords对象关闭
  • last.records.close();
  • else
  • // 如果返回有值,说明添加成功,直接返回包装对象RecordAppendResult
  • return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
  • }
  • // 如果last为空,说明deque为空,直接返回null
  • return null;
  • }

1.2.2. The Structure of RecordBatch

Let's first look at the case where the Deque has no last RecordBatch, so we can see how a RecordBatch is created. Back in RecordAccumulator's append(...) method, since the desired RecordBatch does not exist, the code leaves the first synchronized block and continues: it first derives, from the record size, the size of the ByteBuffer to be used by the MemoryRecords inside the RecordBatch, and then creates a RecordBatch from these parameters:

org.apache.kafka.clients.producer.internals.RecordAccumulator#append
  • public RecordAppendResult append(TopicPartition tp,
  • long timestamp,
  • byte[] key,
  • byte[] value,
  • Callback callback,
  • long maxTimeToBlock) throws InterruptedException {
  • // We keep track of the number of appending thread to make sure we do not miss batches in
  • // abortIncompleteBatches().
  • // 统计正在向RecordAccumulator中追加数据的线程数
  • appendsInProgress.incrementAndGet();
  • try {
  • // check if we have an in-progress batch
  • // 步骤1:查找TopicPartition对应的Deque
  • Deque<RecordBatch> dq = getOrCreateDeque(tp);
  • // 同步操作,以Deque为锁
  • synchronized (dq) {
  • // 检查生产者是否已经关闭了
  • if (closed)
  • throw new IllegalStateException("Cannot send after the producer is closed.");
  • // 使用tryAppend()方法向Deque中最后一个RecordBatch追加Record
  • RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
  • if (appendResult != null)
  • // 返回值不为空说明添加成功,直接将返回值进行return
  • return appendResult;
  • }
  • // we don't have an in-progress record batch try to allocate a new batch
  • // 走到这里,说明可能没有正在接受Record的RecordBatch,需要分配一个新的RecordBatch
  • // 根据消息大小和batchSize,选择一个大的值作为RecordBatch底层ByteBuffer的大小
  • int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
  • log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
  • // 使用BufferPool缓存池分配ByteBuffer空间
  • ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
  • // 以dp为锁
  • synchronized (dq) {
  • // Need to check if producer is closed again after grabbing the dequeue lock.
  • // 检查生产者是否已经关闭了
  • if (closed)
  • throw new IllegalStateException("Cannot send after the producer is closed.");
  • // 再次使用tryAppend()方法向Deque中最后一个RecordBatch追加Record
  • RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
  • if (appendResult != null) {
  • // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
  • // 追加成功,buffer没用了,需要释放buffer给BufferPool
  • free.deallocate(buffer);
  • // 返回
  • return appendResult;
  • }
  • // 走到这里说明追加失败
  • // 创建一个MemoryRecords
  • MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  • // 使用传入的TopicPartition参数和records新创建一个RecordBatch
  • RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
  • // 使用新创建的batch再次尝试添加
  • FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
  • // 将新创建的RecordBatch添加到dp及incomplete中
  • dq.addLast(batch);
  • incomplete.add(batch);
  • // 返回
  • return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
  • }
  • } finally {
  • // 将记录正在追加消息的线程数的计数器减1
  • appendsInProgress.decrementAndGet();
  • }
  • }

The internal structure of RecordBatch deserves a closer look. As the code above shows, RecordBatch keeps a final field named records of type MemoryRecords, whose definition and key member fields are as follows:

org.apache.kafka.common.record.MemoryRecords
  • public class MemoryRecords implements Records {
  • private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
  • // the compressor used for appends-only
  • // 压缩器
  • private final Compressor compressor;
  • // the write limit for writable buffer, which may be smaller than the buffer capacity
  • // 记录buffer最多可以写入的字节数
  • private final int writeLimit;
  • // the capacity of the initial buffer, which is only used for de-allocation of writable records
  • private final int initialCapacity;
  • // the underlying buffer used for read; while the records are still writable it is null
  • // 保存消息数据的NIO ByteBuffer
  • private ByteBuffer buffer;
  • // indicate if the memory records is writable or not (i.e. used for appends or read-only)
  • // 用于标识MemoryRecords是否可写,在MemoryRecord发送前会将该字段置为false只读
  • private boolean writable;
  • ...

Notice that the class has a ByteBuffer member named buffer, together with fields for the buffer's initial capacity, its write limit, whether it is writable, and a compressor. So the structure that RecordAccumulator ultimately uses to buffer data is a ByteBuffer provided by JDK NIO.

As the append(...) code of RecordAccumulator above shows, when the MemoryRecords object is instantiated, this ByteBuffer is supplied by the BufferPool. Below is the relevant fragment of append(...); the field free is the BufferPool instance maintained by RecordAccumulator:

org.apache.kafka.clients.producer.internals.RecordAccumulator#append
  • // 根据消息大小和batchSize,选择一个大的值作为RecordBatch底层ByteBuffer的大小
  • int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
  • ...
  • // 使用BufferPool缓存池分配ByteBuffer空间
  • ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
  • // 走到这里说明追加失败
  • // 创建一个MemoryRecords
  • MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  • // 使用传入的TopicPartition参数和records新创建一个RecordBatch
  • RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
  • // 使用新创建的batch再次尝试
  • FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
  • ...

The BufferPool provides ByteBuffer instances. Its definition and the implementation of its main methods are fairly simple, relying on ReentrantLock and Condition for synchronization. The source is as follows:

org.apache.kafka.clients.producer.internals.BufferPool
  • public final class BufferPool {
  • // 整个Pool的大小
  • private final long totalMemory;
  • private final int poolableSize;
  • // 控制并发访问
  • private final ReentrantLock lock;
  • // 队列,缓存了指定大小的ByteBuffer
  • private final Deque<ByteBuffer> free;
  • // 记录因申请不到足够空间而阻塞的线程所在的Condition条件队列
  • private final Deque<Condition> waiters;
  • // 可用空间大小,totalMemory - free.size() * ByteBuffer大小
  • private long availableMemory;
  • private final Metrics metrics;
  • private final Time time;
  • private final Sensor waitTime;
  • ...
  • /**
  • * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
  • * is configured with blocking mode.
  • *
  • * @param size The buffer size to allocate in bytes
  • * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
  • * @return The buffer
  • * @throws InterruptedException If the thread is interrupted while blocked
  • * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
  • * forever)
  • *
  • * 从缓存池中申请ByteBuffer缓存对象
  • */
  • public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
  • // 如果申请的大小比Pool总大小还大,就抛出异常
  • if (size > this.totalMemory)
  • throw new IllegalArgumentException("Attempt to allocate " + size
  • + " bytes, but there is a hard limit of "
  • + this.totalMemory
  • + " on memory allocations.");
  • // 加锁
  • this.lock.lock();
  • try {
  • // check if we have a free buffer of the right size pooled
  • // 当请求的大小是poolableSize,且free中还有空闲
  • if (size == poolableSize && !this.free.isEmpty())
  • // 从free中poll出队首ByteBuffer返回
  • return this.free.pollFirst();
  • // now check if the request is immediately satisfiable with the
  • // memory on hand or if we need to block
  • // 走到这里说明请求的大小不是poolableSize,或者大小是poolableSize但free是空的
  • // 计算free中空闲的大小
  • int freeListSize = this.free.size() * this.poolableSize;
  • if (this.availableMemory + freeListSize >= size) {
  • // 如果剩余可用内存加上free总空闲内存是大于等于需要的大小
  • // we have enough unallocated or pooled memory to immediately
  • // satisfy the request
  • // 使用freeUp()方法使availableMemory满足需要的大小
  • freeUp(size);
  • // 从availableMemory中减去需要的大小
  • this.availableMemory -= size;
  • lock.unlock();
  • // 返回size大小的ByteBuffer对象
  • return ByteBuffer.allocate(size);
  • } else {
  • // 如果剩余可用内存加上free总空闲内存是小于需要的大小
  • // we are out of memory and will have to block
  • int accumulated = 0;
  • ByteBuffer buffer = null;
  • // 构造新的Condition等待队列
  • Condition moreMemory = this.lock.newCondition();
  • // 最大等待时间
  • long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
  • this.waiters.addLast(moreMemory);
  • // loop over and over until we have a buffer or have reserved
  • // enough memory to allocate one
  • // 当申请到的缓存小于需要的缓存大小
  • while (accumulated < size) {
  • long startWaitNs = time.nanoseconds();
  • long timeNs;
  • boolean waitingTimeElapsed;
  • try {
  • // 等待一定时间,返回结果表示是否等待超时
  • waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
  • } catch (InterruptedException e) {
  • // 如果发生中断,就将等待队列从waiters中移除
  • this.waiters.remove(moreMemory);
  • throw e;
  • } finally {
  • // 计算等待的时间
  • long endWaitNs = time.nanoseconds();
  • timeNs = Math.max(0L, endWaitNs - startWaitNs);
  • // 记录等待时间
  • this.waitTime.record(timeNs, time.milliseconds());
  • }
  • // 判断是否等待超时
  • if (waitingTimeElapsed) {
  • // 如果超时,就将等待队列从waiters中移除,并且抛出异常
  • this.waiters.remove(moreMemory);
  • throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
  • }
  • // 从最大等待时间中减去已经等待的时间
  • remainingTimeToBlockNs -= timeNs;
  • // check if we can satisfy this request from the free list,
  • // otherwise allocate memory
  • // 如果accumulated为0,申请的大小是poolableSize,free不为空
  • if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
  • // just grab a buffer from the free list
  • // 就从free中poll出队首的ByteBuffer对象
  • buffer = this.free.pollFirst();
  • // 将accumulated置为申请的大小
  • accumulated = size;
  • } else {
  • // we'll need to allocate memory, but we may only get
  • // part of what we need on this iteration
  • // 这个分支会尝试先分配一部分大小,然后再下一次while循环中继续等待分配
  • // 尝试使用freeUp()使availableMemory满足size - accumulated,可能是不够的
  • freeUp(size - accumulated);
  • // 查看已经得到的大小
  • int got = (int) Math.min(size - accumulated, this.availableMemory);
  • // 从availableMemory中减去这部分大小
  • this.availableMemory -= got;
  • // 然后将得到的大小添加到accumulated上
  • accumulated += got;
  • }
  • }
  • // remove the condition for this thread to let the next thread
  • // in line start getting memory
  • // 从waiters移除队首的Condition
  • Condition removed = this.waiters.removeFirst();
  • // 判断移除的是否是刚刚添加的
  • if (removed != moreMemory)
  • throw new IllegalStateException("Wrong condition: this shouldn't happen.");
  • // signal any additional waiters if there is more memory left
  • // over for them
  • if (this.availableMemory > 0 || !this.free.isEmpty()) {
  • // 如果availableMemory大于0,free队列不为空
  • if (!this.waiters.isEmpty())
  • // 且waiters不为空,则将waiters队首的Condition唤醒
  • this.waiters.peekFirst().signal();
  • }
  • // unlock and return the buffer
  • // 解锁
  • lock.unlock();
  • // 分配ByteBuffer,返回
  • if (buffer == null)
  • return ByteBuffer.allocate(size);
  • else
  • return buffer;
  • }
  • } finally {
  • // 如果还没有解锁,就解锁
  • if (lock.isHeldByCurrentThread())
  • lock.unlock();
  • }
  • }
  • /**
  • * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
  • * buffers (if needed)
  • */
  • private void freeUp(int size) {
  • // 如果free不为空,且availableMemory小于size,则一直从free中poll
  • // 并将poll出的空间加到availableMemory上
  • while (!this.free.isEmpty() && this.availableMemory < size)
  • this.availableMemory += this.free.pollLast().capacity();
  • }
  • /**
  • * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
  • * memory as free.
  • *
  • * @param buffer The buffer to return
  • * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
  • * since the buffer may re-allocate itself during in-place compression
  • * 将ByteBuffer还给缓冲池
  • * 需要注意的是size参数可能小于buffer.capacity,因为buffer可能在使用过程中重新分配了
  • */
  • public void deallocate(ByteBuffer buffer, int size) {
  • // 上锁
  • lock.lock();
  • try {
  • // size是poolableSize大小,且size与buffer的容量相同
  • if (size == this.poolableSize && size == buffer.capacity()) {
  • // 清空buffer,并将buffer添加到free队列中
  • buffer.clear();
  • this.free.add(buffer);
  • } else {
  • // 否则就不归还buffer到size中,而是将size大小加到availableMemory上
  • this.availableMemory += size;
  • }
  • // 归还之后,获取waiters队首的Condition条件队列,并唤醒该条件队列上等待的线程
  • Condition moreMem = this.waiters.peekFirst();
  • if (moreMem != null)
  • moreMem.signal();
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • ...
  • }

With the analysis above we now understand the internal structure of RecordBatch: it holds a MemoryRecords instance that actually stores the buffered message data; MemoryRecords in turn stores the data in a JDK NIO ByteBuffer, and that ByteBuffer is allocated from the BufferPool.
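
To summarize the allocate/deallocate protocol in a compact form, here is a heavily simplified, hedged sketch of a buffer pool built on ReentrantLock and Condition. It only mirrors the general idea of the real BufferPool (reuse poolable-size buffers, otherwise draw from and block on a shared memory budget) and omits its partial accumulation of memory, waiter ordering, and error handling.

Example (illustrative): a simplified buffer pool, not the real BufferPool
  • import java.nio.ByteBuffer;
  • import java.util.ArrayDeque;
  • import java.util.Deque;
  • import java.util.concurrent.locks.Condition;
  • import java.util.concurrent.locks.ReentrantLock;
  • public class SimpleBufferPool {
  •     private final int poolableSize;
  •     private long availableMemory;                               // unpooled memory still available
  •     private final Deque<ByteBuffer> free = new ArrayDeque<>();  // recycled poolable-size buffers
  •     private final ReentrantLock lock = new ReentrantLock();
  •     private final Condition moreMemory = lock.newCondition();
  •     public SimpleBufferPool(long totalMemory, int poolableSize) {
  •         this.availableMemory = totalMemory;
  •         this.poolableSize = poolableSize;
  •     }
  •     public ByteBuffer allocate(int size) throws InterruptedException {
  •         lock.lock();
  •         try {
  •             while (true) {
  •                 // Reuse a recycled buffer when one of the right size is available
  •                 if (size == poolableSize && !free.isEmpty())
  •                     return free.pollFirst();
  •                 // Otherwise drain recycled buffers into the unpooled budget if needed (like freeUp())
  •                 while (!free.isEmpty() && availableMemory < size)
  •                     availableMemory += free.pollLast().capacity();
  •                 if (availableMemory >= size) {
  •                     availableMemory -= size;
  •                     return ByteBuffer.allocate(size);
  •                 }
  •                 moreMemory.await();  // block until deallocate() returns some memory
  •             }
  •         } finally {
  •             lock.unlock();
  •         }
  •     }
  •     public void deallocate(ByteBuffer buffer) {
  •         lock.lock();
  •         try {
  •             if (buffer.capacity() == poolableSize) {
  •                 buffer.clear();
  •                 free.add(buffer);                            // recycle the buffer itself
  •             } else {
  •                 availableMemory += buffer.capacity();        // only return the memory accounting
  •             }
  •             moreMemory.signal();                             // wake one waiting allocator
  •         } finally {
  •             lock.unlock();
  •         }
  •     }
  • }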

1.2.3. Storing the Message Data

Once the required RecordBatch is ready, the message data is appended to it. This is done mainly through RecordBatch's tryAppend(...) method, whose source is as follows:

org.apache.kafka.clients.producer.internals.RecordBatch#tryAppend
  • public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
  • // 估算剩余空间是否足够
  • if (!this.records.hasRoomFor(key, value)) {
  • return null;
  • } else {
  • // 向MemoryRecords中添加数据,offsetCounter是在RecordBatch中的偏移量
  • long checksum = this.records.append(offsetCounter++, timestamp, key, value);
  • // 记录最大消息大小的字节数,这个值会不断更新,始终记录已添加的消息记录中最大的那条的大小
  • this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
  • // 更新最近添加时间
  • this.lastAppendTime = now;
  • // 将消息构造一个FutureRecordMetadata对象
  • FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
  • timestamp, checksum,
  • key == null ? -1 : key.length,
  • value == null ? -1 : value.length);
  • // 如果callback不会空,就将上面得到的FutureRecordMetadata和该callback包装为一个thunk,放到thunks集合里
  • if (callback != null)
  • thunks.add(new Thunk(callback, future));
  • // 更新保存的记录数量
  • this.recordCount++;
  • // 返回FutureRecordMetadata对象
  • return future;
  • }
  • }

The MemoryRecords methods involved are as follows:

org.apache.kafka.common.record.MemoryRecords
  • public boolean hasRoomFor(byte[] key, byte[] value) {
  • // 检查是否可写,如果不可写直接返回false
  • if (!this.writable)
  • return false;
  • /**
  • * this.compressor.numRecordsWritten():已写入压缩器的记录数
  • * Records.LOG_OVERHEAD:包括SIZE_LENGTH和OFFSET_LENGTH,分别表示记录长度和偏移量
  • * this.initialCapacity:buffer的初始容量
  • * this.compressor.estimatedBytesWritten():估算的压缩器已写入字节数
  • * this.writeLimit:buffer最多还可写入的字节数
  • *
  • * 判断逻辑如下:
  • * 1. 如果写入压缩器的记录数为0,则判断是否 buffer的初始容量 >= SIZE_LENGTH + OFFSET_LENGTH + 记录大小;
  • * 2. 否则判断是否 buffer最多还可写入的字节数 >= 估算的压缩器已写入字节数 + SIZE_LENGTH + OFFSET_LENGTH + 记录大小;
  • */
  • return this.compressor.numRecordsWritten() == 0 ?
  • this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
  • this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
  • }
  • public long append(long offset, long timestamp, byte[] key, byte[] value) {
  • if (!writable)
  • throw new IllegalStateException("Memory records is not writable");
  • // 消息大小
  • int size = Record.recordSize(key, value);
  • // 向压缩器中添加offset记录
  • compressor.putLong(offset);
  • // 向压缩器中添加消息大小记录
  • compressor.putInt(size);
  • // 向压缩器中添加消息数据
  • long crc = compressor.putRecord(timestamp, key, value);
  • // 压缩器记录写入数据的大小
  • compressor.recordWritten(size + Records.LOG_OVERHEAD);
  • return crc;
  • }

As the source above shows, when a record is appended the relevant data is first written into the compressor. Let's take a brief look at MemoryRecords' compressor.

1.2.3.1. The Compressor

The compressor field of MemoryRecords is of type Compressor. KafkaProducer currently supports three compression codecs: GZIP, SNAPPY, and LZ4. The codec is chosen by the compression.type parameter configured when the KafkaProducer is created, and the compression type parsed from that parameter is passed in when RecordAccumulator creates the MemoryRecords instance. Recall this code:

org.apache.kafka.clients.producer.internals.RecordAccumulator
  • public RecordAppendResult append(TopicPartition tp,
  • long timestamp,
  • byte[] key,
  • byte[] value,
  • Callback callback,
  • long maxTimeToBlock) throws InterruptedException {
  • ...
  • // 走到这里说明追加失败
  • // 创建一个MemoryRecords
  • MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  • ...
  • }

MemoryRecords.emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) returns a new MemoryRecords, which internally creates the appropriate compressor based on the compression type:

org.apache.kafka.common.record.MemoryRecords#MemoryRecords
  • private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
  • this.writable = writable;
  • this.writeLimit = writeLimit;
  • this.initialCapacity = buffer.capacity();
  • if (this.writable) {
  • this.buffer = null;
  • // 创建压缩器
  • this.compressor = new Compressor(buffer, type);
  • } else {
  • this.buffer = buffer;
  • this.compressor = null;
  • }
  • }

The constructor of Compressor wraps the ByteBuffer passed down the call chain (the same ByteBuffer used by MemoryRecords; note the earlier code) in a ByteBufferOutputStream to obtain bufferStream, and then wraps bufferStream via wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) to obtain the compressing stream appendStream. The source is as follows:

org.apache.kafka.common.record.Compressor
  • public Compressor(ByteBuffer buffer, CompressionType type) {
  • // 记录压缩类型
  • this.type = type;
  • this.initPos = buffer.position();
  • this.numRecords = 0;
  • this.writtenUncompressed = 0;
  • this.compressionRate = 1;
  • this.maxTimestamp = Record.NO_TIMESTAMP;
  • if (type != CompressionType.NONE) {
  • // for compressed records, leave space for the header and the shallow message metadata
  • // and move the starting position to the value payload offset
  • buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
  • }
  • // create the stream
  • // 创建输出流,通过ByteBufferOutputStream装饰RecordAccumulator中传入的ByteBuffer实例,添加扩容功能
  • bufferStream = new ByteBufferOutputStream(buffer);
  • // 根据压缩类型创建合适的压缩流,通过wrapForOutput()方法中不同的压缩器装饰,添加压缩功能
  • appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
  • }
  • public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
  • try {
  • switch (type) {
  • case NONE:
  • return new DataOutputStream(buffer);
  • case GZIP:
  • // GZIP压缩输出流是JDK自带的
  • return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
  • // Snappy和LZ4不是JDK自带的,所以需要通过反射方式创建
  • case SNAPPY:
  • try {
  • // 利用反射方式创建Snappy压缩输出流
  • OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
  • return new DataOutputStream(stream);
  • } catch (Exception e) {
  • throw new KafkaException(e);
  • }
  • case LZ4:
  • try {
  • // 利用反射方式创建LZ4压缩输出流
  • OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
  • return new DataOutputStream(stream);
  • } catch (Exception e) {
  • throw new KafkaException(e);
  • }
  • default:
  • throw new IllegalArgumentException("Unknown compression type: " + type);
  • }
  • } catch (IOException e) {
  • throw new KafkaException(e);
  • }
  • }
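
For completeness, choosing the codec is purely a configuration matter on the producer side. The snippet below is a hedged illustration of setting compression.type (it extends the earlier quickstart sketch); the chosen codec and the other values are assumptions for the example.

Example (illustrative): enabling compression on the producer
  • Properties props = new Properties();
  • props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
  • props.put("compression.type", "gzip");              // or "snappy", "lz4", "none"
  • props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • // The records in a batch are compressed together, so larger batches generally compress better.
  • Producer<String, String> producer = new KafkaProducer<>(props);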

1.3. Sending the Message Data

Message data is still sent through the cooperation of Sender, NetworkClient, and Selector, with some differences from how metadata update requests are sent. Let's start with the source of the Sender thread's run(long) method:

org.apache.kafka.clients.producer.internals.Sender
  • void run(long now) {
  • // 获取Cluster集群元数据信息
  • Cluster cluster = metadata.fetch();
  • // get the list of partitions with data ready to send
  • // 获取当前集群中符合发送消息条件的数据集
  • RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • // if there are any partitions whose leaders are not known yet, force metadata update
  • // 如果ReadyCheckResult中标识有unknownLeadersExist,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息
  • if (result.unknownLeadersExist)
  • this.metadata.requestUpdate();
  • // remove any nodes we aren't ready to send to
  • Iterator<Node> iter = result.readyNodes.iterator();
  • long notReadyTimeout = Long.MAX_VALUE;
  • while (iter.hasNext()) {
  • Node node = iter.next();
  • // 遍历所有readyNode节点,使用NetWorkClient的ready()方法验证这些节点是否可用
  • if (!this.client.ready(node, now)) {
  • // 如果节点不可用,就将其从readyNodes集合中移除
  • iter.remove();
  • notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
  • }
  • }
  • // create produce requests
  • // key为NodeId,value是待发送的RecordBatch集合
  • Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
  • result.readyNodes,
  • this.maxRequestSize,
  • now);
  • // 禁止这些RecordBatch再次被drain操作
  • if (guaranteeMessageOrder) {
  • // Mute all the partitions drained
  • for (List<RecordBatch> batchList : batches.values()) {
  • for (RecordBatch batch : batchList)
  • this.accumulator.mutePartition(batch.topicPartition);
  • }
  • }
  • /**
  • * 调用RecordAccumulator.abortExpiredBatches()方法处理RecordAccumulator中超时的消息。
  • * 其代码逻辑是,遍历RecordAccumulator中保存的全部RecordBatch,调用RecordBatch.maybeExpire()方法进行处理。
  • * 如果已超时,则调用RecordBatch.done()方法,其中会触发自定义Callback,并将RecordBatch从队列中移除,释放ByteBuffer空间。
  • */
  • List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
  • // update sensors
  • for (RecordBatch expiredBatch : expiredBatches)
  • this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  • sensors.updateProduceRequestMetrics(batches);
  • // 将待发送的消息封装为ClientRequest
  • List<ClientRequest> requests = createProduceRequests(batches, now);
  • // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
  • // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
  • // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
  • // with sendable data that aren't ready to send since they would cause busy looping.
  • long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  • if (result.readyNodes.size() > 0) {
  • log.trace("Nodes with data ready to send: {}", result.readyNodes);
  • log.trace("Created {} produce requests: {}", requests.size(), requests);
  • pollTimeout = 0;
  • }
  • for (ClientRequest request : requests)
  • // 使用NetworkClient将ClientRequest对象写入KafkaChannel的send字段
  • client.send(request, now);
  • // if some partitions are already ready to be sent, the select time would be 0;
  • // otherwise if some partition already has some data accumulated but not ready yet,
  • // the select time will be the time difference between now and its linger expiry time;
  • // otherwise the select time will be the time difference between now and the metadata expiry time;
  • // 使用NetworkClient将KafkaChannel的send字段中保存的ClientRequest发送出去
  • this.client.poll(pollTimeout, now);
  • }

We briefly introduced this method before, but skipped most of the details of how message data is handled. It first calls ready(Cluster cluster, long nowMs) on the RecordAccumulator to work out which data in the current cluster is eligible for sending. Internally, ready() iterates over the batches collection stored in RecordAccumulator, whose type is ConcurrentMap<TopicPartition, Deque<RecordBatch>>, and decides, based on several conditions, whether data can be sent. The conditions are:

  1. The metadata for the leader of the target partition has already been fetched locally (a precondition);
  2. The RecordBatch at the head of the corresponding Deque exists (a precondition);
  3. The Deque contains more than one RecordBatch, or the first RecordBatch is already full, so the data should be sent;
  4. A thread is blocked waiting for the BufferPool to free space, meaning the BufferPool has no space left and data needs to be sent;
  5. The Sender thread is about to close, so any remaining data must be sent immediately;
  6. A thread is waiting for a flush operation to complete, so data needs to be sent.

The source is as follows:

org.apache.kafka.clients.producer.internals.RecordAccumulator#ready
  • public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  • // 记录可以向哪些Node节点发送数据
  • Set<Node> readyNodes = new HashSet<>();
  • // 记录下一次需要调用ready()方法的时间间隔
  • long nextReadyCheckDelayMs = Long.MAX_VALUE;
  • // 根据Metadata元数据中是否有找不到Leader副本的分区
  • boolean unknownLeadersExist = false;
  • // 是否有线程在阻塞等待BufferPool释放空间
  • boolean exhausted = this.free.queued() > 0;
  • // 遍历batches字典
  • for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
  • // 获取键和值
  • TopicPartition part = entry.getKey();
  • Deque<RecordBatch> deque = entry.getValue();
  • // 查找分区的Leader所在的Node
  • Node leader = cluster.leaderFor(part);
  • if (leader == null) {
  • // leader为null,将unknownLeadersExist标记为true,之后会触发Metadata的更新
  • unknownLeadersExist = true;
  • } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
  • // 分区的leader存在,且readyNodes不包含该leader,
  • // 加锁
  • synchronized (deque) {
  • // 查看该RecordBatch队列的第一个RecordBatch
  • RecordBatch batch = deque.peekFirst();
  • if (batch != null) {
  • /**
  • * 频繁多次请求的间隔是否在退避时间限制之内,是否需要退避
  • * 1. batch.attempts > 0:请求次数大于0时;
  • * 2. batch.lastAttemptMs + retryBackoffMs > nowMs:本次请求与上次请求的间隔小于退避时间retryBackoffMs
  • */
  • boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
  • // 本次请求与上次请求的间隔时间
  • long waitedTimeMs = nowMs - batch.lastAttemptMs;
  • /**
  • * 需要等待的时间,这个值根据是否需要退避,来选择退避时间或者延迟时间
  • * retryBackoffMs:退避时间(可配置,retry.backoff.ms)
  • * lingerMs:延迟时间(可配置,linger.ms)
  • * backingOff:是否需要退避
  • * 当需要退避时,需要等待的时间为退避时间,否则为延迟时间
  • */
  • long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
  • // 计算剩余时间,需要等待的时间 - 距离上次发送的间隔时间
  • long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
  • // Deque中有多个RecordBatch或是第一个RecordBatch是否满了
  • boolean full = deque.size() > 1 || batch.records.isFull();
  • // 是否超时,距离上次发送的间隔时间 - 需要等待的时间
  • boolean expired = waitedTimeMs >= timeToWaitMs;
  • // 得到条件
  • boolean sendable = full || expired || exhausted
  • || closed // Sender线程是否准备关闭
  • || flushInProgress(); // 是否有线程正在等待flush操作完成
  • if (sendable && !backingOff) {
  • readyNodes.add(leader);
  • } else {
  • // Note that this results in a conservative estimate since an un-sendable partition may have
  • // a leader that will later be found to have sendable data. However, this is good enough
  • // since we'll just wake up and then sleep again for the remaining time.
  • // 记录下次需要调用ready()方法检查的时间间隔
  • nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
  • }
  • }
  • }
  • }
  • }
  • return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
  • }

Finally, the method wraps the selected Node set in a ReadyCheckResult and returns it. After obtaining the readyNodes set, the caller then validates each node with NetworkClient's ready(...) method; we will cover that part later.

Next, the code uses RecordAccumulator's drain(...) method, together with the cluster metadata and the readyNodes set, to build a Map named batches keyed by NodeId with lists of RecordBatch as values. This Map associates each receiving Node with the data that needs to be sent to it. Expired messages are then removed, and finally the Sender's createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) method wraps the batches map into a collection of ClientRequest objects:

org.apache.kafka.clients.producer.internals.Sender
  • /**
  • * Transfer the record batches into a list of produce requests on a per-node basis
  • * 将NodeId -> List<RecordBatch> 转换为 List<ClientRequest>
  • */
  • private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
  • // 创建结果集合
  • List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
  • // 遍历collated,使用produceRequest()封装为ClientRequest对象
  • for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
  • // 将创建的ClientRequest对象放入requests中
  • requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
  • return requests;
  • }
  • private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
  • Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
  • final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
  • // 遍历batche集合进行分类,分别装入produceRecordsByPartition和recordsByPartition中
  • for (RecordBatch batch : batches) {
  • TopicPartition tp = batch.topicPartition;
  • produceRecordsByPartition.put(tp, batch.records.buffer());
  • recordsByPartition.put(tp, batch);
  • }
  • // 创建ProduceRequest
  • ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
  • // 创建RequestSend
  • RequestSend send = new RequestSend(Integer.toString(destination),
  • this.client.nextRequestHeader(ApiKeys.PRODUCE),
  • request.toStruct());
  • // 创建callback回调
  • RequestCompletionHandler callback = new RequestCompletionHandler() {
  • public void onComplete(ClientResponse response) {
  • handleProduceResponse(response, recordsByPartition, time.milliseconds());
  • }
  • };
  • // 创建ClientRequest对象并进行返回
  • return new ClientRequest(now, acks != 0, send, callback);
  • }

Finally, the collection is iterated and each request is sent through NetworkClient:

org.apache.kafka.clients.producer.internals.Sender#run(long)
  • ...
  • for (ClientRequest request : requests)
  • // 使用NetworkClient将ClientRequest对象写入KafkaChannel的send字段
  • client.send(request, now);
  • // if some partitions are already ready to be sent, the select time would be 0;
  • // otherwise if some partition already has some data accumulated but not ready yet,
  • // the select time will be the time difference between now and its linger expiry time;
  • // otherwise the select time will be the time difference between now and the metadata expiry time;
  • // 使用NetworkClient将KafkaChannel的send字段中保存的ClientRequest发送出去
  • this.client.poll(pollTimeout, now);
  • ...

From here on, the actual send path is the same as for metadata update requests: the JDK NIO Selector inside Kafka's Selector is used to perform the reads and writes.
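
For readers less familiar with that model, the sketch below shows the generic JDK NIO pattern the client builds on: a non-blocking connect plus a select loop that dispatches on readiness events. It is generic illustrative code, not an excerpt from Kafka's Selector, and the address is an assumption.

Example (illustrative): a minimal JDK NIO select loop
  • import java.net.InetSocketAddress;
  • import java.nio.channels.SelectionKey;
  • import java.nio.channels.Selector;
  • import java.nio.channels.SocketChannel;
  • import java.util.Iterator;
  • public class NioClientSketch {
  •     public static void main(String[] args) throws Exception {
  •         Selector selector = Selector.open();
  •         SocketChannel channel = SocketChannel.open();
  •         channel.configureBlocking(false);                       // non-blocking, like Kafka's channels
  •         channel.connect(new InetSocketAddress("localhost", 9092)); // assumed address
  •         channel.register(selector, SelectionKey.OP_CONNECT);
  •         while (true) {
  •             selector.select(1000);                              // wait up to 1s for readiness events
  •             Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  •             while (it.hasNext()) {
  •                 SelectionKey key = it.next();
  •                 it.remove();
  •                 if (key.isConnectable() && channel.finishConnect())
  •                     key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  •                 if (key.isWritable()) {
  •                     // write pending request bytes here; may take several passes (see section 3.1)
  •                 }
  •                 if (key.isReadable()) {
  •                     // read response bytes here; may also take several passes
  •                 }
  •             }
  •         }
  •     }
  • }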

2. Handling Responses to Produce Requests

Handling the responses to produce requests is relatively simple. Still inside NetworkClient's poll(long timeout, long now) method, handleCompletedReceives(List<ClientResponse> responses, long now) is called to process the response data. Since these responses are not responses to metadata update requests, each one is wrapped in a ClientResponse object and recorded in the responses list; finally, the onComplete(ClientResponse) method of the callback stored in the ClientRequest associated with each ClientResponse is invoked, with the response passed as the argument:

org.apache.kafka.clients.NetworkClient
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • ...
  • // 响应队列
  • List<ClientResponse> responses = new ArrayList<>();
  • ...
  • handleCompletedReceives(responses, updatedNow);
  • ...
  • // invoke callbacks
  • // 遍历ClientResponse
  • for (ClientResponse response : responses) {
  • // 如果对应的ClientRequest有回调就执行回调
  • if (response.request().hasCallback()) {
  • try {
  • response.request().callback().onComplete(response);
  • } catch (Exception e) {
  • log.error("Uncaught error in request completion:", e);
  • }
  • }
  • }
  • return responses;
  • }
  • private void handleCompletedReceives(List<ClientResponse> responses, long now) {
  • // 遍历completeReceives中的NetworkReceive,completeReceives中存储了接收完成的封装了响应的NetworkReceives对象
  • for (NetworkReceive receive : this.selector.completedReceives()) {
  • // 获取返回响应的NodeId
  • String source = receive.source();
  • // 从inFlightRequests中取出对应的ClientRequest
  • ClientRequest req = inFlightRequests.completeNext(source);
  • // 解析响应
  • Struct body = parseResponse(receive.payload(), req.request().header());
  • /**
  • * 调用metadataUpdater.maybeHandleCompletedReceive()方法处理MetadataResponse,
  • * 会更新Metadata中的记录的集群元数据,并唤醒所有等待Metadata更新完成的线程
  • */
  • if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
  • // 如果不是MetadataResponse,则创建ClientResponse并添加到responses集合
  • responses.add(new ClientResponse(req, now, false, body));
  • }
  • }

3. Network I/O Details

As the source analysis above shows, KafkaProducer uses the JDK NIO package to send message data to the cluster. Kafka implements many I/O details on top of it, including message chunking, message framing, and ordering guarantees. The following sections discuss these details.

3.1. Message Chunking

The Sender thread wraps the messages to be sent into a complete ClientRequest and calls NetworkClient's send(ClientRequest request, long now) method to hand it to the Selector, which stashes the contained RequestSend into the send field of the KafkaChannel. What is stashed is a packet containing both the request header and the request body. This RequestSend packet is eventually written out by the underlying TransportLayer's write(ByteBuffer b), but the data is not necessarily sent in a single call: write(ByteBuffer b) may need to be invoked several times before an entire RequestSend packet has been flushed out. This is because writes in NIO are non-blocking and return without waiting for everything to be sent.

Kafka therefore writes in multiple passes. Each time the KafkaChannel writes data, the call returns a value of type Send: if a Send object is returned, the data has been fully sent and the Send is added to the completedSends collection; if null is returned, the write is not finished yet, nothing else happens, and writing resumes the next time the KafkaChannel becomes writable, until the write completes. The source is as follows:

org.apache.kafka.common.network.Selector#pollSelectionKeys
  • /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
  • if (channel.ready() && key.isWritable()) {
  • // Channel可写,处理OP_WRITE事件
  • Send send = channel.write();
  • /**
  • * channel.write()将KafkaChannel中保存的send字段发送出去,
  • * 如果发送成功就会返回send,然后将其添加到completedSends集合,等待后续处理
  • * 如果发送未完成会返回null
  • */
  • if (send != null) {
  • this.completedSends.add(send);
  • this.sensors.recordBytesSent(channel.id(), send.size());
  • }
  • }

One detail worth being clear about: each KafkaChannel is associated with at most one Send object at a time, and until that Send has been fully written the KafkaChannel cannot accept a new one. Let's look at KafkaChannel's write() method:

org.apache.kafka.common.network.KafkaChannel#write
  • public Send write() throws IOException {
  • Send result = null;
  • // send()方法发送时,如果发送完成会返回true,否则返回false
  • if (send != null && send(send)) {
  • // 完成后使用result记录send,用于返回,并将send置为null,用于下一次发送
  • result = send;
  • send = null;
  • }
  • return result;
  • }
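
The partial-write behavior described above is inherent to non-blocking NIO. As a generic, hedged illustration (not Kafka code), the sketch below shows how one logical send may need several write() calls on a non-blocking SocketChannel before its ByteBuffer is drained.

Example (illustrative): partial writes on a non-blocking channel
  • import java.io.IOException;
  • import java.nio.ByteBuffer;
  • import java.nio.channels.SocketChannel;
  • public class PendingSend {
  •     private final ByteBuffer payload;
  •     public PendingSend(ByteBuffer payload) {
  •         this.payload = payload;
  •     }
  •     /**
  •      * Write as much as the socket buffer accepts right now.
  •      * Returns true only when the whole payload has been written,
  •      * mirroring how KafkaChannel.write() returns the Send only on completion.
  •      */
  •     public boolean writeTo(SocketChannel channel) throws IOException {
  •         channel.write(payload);          // may write 0..remaining() bytes on a non-blocking channel
  •         return !payload.hasRemaining();  // caller retries on the next OP_WRITE if false
  •     }
  • }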

So how is it guaranteed that a KafkaChannel does not accept a new Send before the current one has finished sending? Let's revisit the relevant fragment of the Sender thread's run(long) method:

org.apache.kafka.clients.producer.internals.Sender#run(long)
  • void run(long now) {
  • ...
  • // get the list of partitions with data ready to send
  • // 获取当前集群中符合发送消息条件的数据集
  • RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • ...
  • // remove any nodes we aren't ready to send to
  • Iterator<Node> iter = result.readyNodes.iterator();
  • long notReadyTimeout = Long.MAX_VALUE;
  • while (iter.hasNext()) {
  • Node node = iter.next();
  • // 遍历所有readyNode节点,使用NetWorkClient的ready()方法验证这些节点是否可用
  • if (!this.client.ready(node, now)) {
  • // 如果节点不可用,就将其从readyNodes集合中移除
  • iter.remove();
  • notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
  • }
  • }
  • ...
  • // create produce requests
  • // key为NodeId,value是待发送的RecordBatch集合
  • Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
  • result.readyNodes,
  • this.maxRequestSize,
  • now);
  • ...
  • // 将待发送的消息封装为ClientRequest
  • List<ClientRequest> requests = createProduceRequests(batches, now);
  • ...
  • for (ClientRequest request : requests)
  • // 使用NetworkClient将ClientRequest对象写入KafkaChannel的send字段
  • client.send(request, now);
  • ...
  • // 使用NetworkClient将KafkaChannel的send字段中保存的ClientRequest发送出去
  • this.client.poll(pollTimeout, now);
  • }

In the while loop of this fragment, all readyNodes obtained from the RecordAccumulator are traversed, and NetworkClient's ready(Node, long) is used to check whether each Node is available; unavailable nodes are removed. Let's look at its source in detail:

org.apache.kafka.clients.NetworkClient
  • public boolean ready(Node node, long now) {
  • if (node.isEmpty())
  • throw new IllegalArgumentException("Cannot connect to empty node " + node);
  • // 检查是否可以向一个Node发送请求
  • if (isReady(node, now))
  • return true;
  • // 如果不能发送请求,则尝试建立连接
  • if (connectionStates.canConnect(node.idString(), now))
  • // if we are interested in sending to a node and we don't have a connection to it, initiate one
  • // 发起连接
  • initiateConnect(node, now);
  • return false;
  • }
  • public boolean isReady(Node node, long now) {
  • // if we need to update our metadata now declare all requests unready to make metadata requests first
  • // priority
  • // Metadata没有处于正在更新或需要更新状态
  • // 已经成功建立连接并连接正常,inFlightRequests还可以发送更多数据
  • return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
  • }
  • private boolean canSendRequest(String node) {
  • // 已成功建立连接,连接正常,nFlightRequests还可以发送更多数据
  • return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
  • }
  • // NetworkClient$DefaultMetadataUpdater类
  • public boolean isUpdateDue(long now) {
  • return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
  • }

The Selector's isChannelReady(...) method involved here:

org.apache.kafka.common.network.Selector#isChannelReady
  • // Selector
  • public boolean isChannelReady(String id) {
  • KafkaChannel channel = this.channels.get(id);
  • return channel != null && channel.ready();
  • }

The InFlightRequests' canSendMore(...) method involved here:

org.apache.kafka.clients.InFlightRequests#canSendMore
  • // InFlightRequests类
  • /**
  • * Can we send more requests to this node?
  • *
  • * 判断是否可以向某个Node发送更多的请求
  • * 主要通过Deque<ClientRequest>队列的情况来判断
  • * 该队列中存放了所有已经发送但没有收到响应的ClientRequest
  • *
  • * @param node Node in question
  • * @return true iff we have no requests still being sent to the given node
  • */
  • public boolean canSendMore(String node) {
  • Deque<ClientRequest> queue = requests.get(node);
  • /**
  • * queue为null,
  • * 或者queue内没有元素
  • * 或者queue的队首元素已经完成了请求,
  • * 或者queue内元素个数没有达到maxInFlightRequestsPerConnection(max.in.flight.requests.per.connection)指定的数量
  • */
  • return queue == null || queue.isEmpty() ||
  • (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
  • }

As you can see, NetworkClient performs a number of checks on a Node with the help of DefaultMetadataUpdater, InFlightRequests, and the Selector instance. The main criteria are:

  1. !metadataUpdater.isUpdateDue(now): the Metadata is neither in the middle of an update nor due for one.
    - !this.metadataFetchInProgress: no metadata update is currently in progress.
    - this.metadata.timeToNextUpdate(now) == 0: the current Metadata is due for an update.
  2. canSendRequest(node.idString()): a connection to the Node has been established and is healthy, and inFlightRequests can accept more requests.
    - connectionStates.isConnected(node): the connection to the Node has been established successfully.
    - selector.isChannelReady(node): the connection channel is ready.
    - inFlightRequests.canSendMore(node): the InFlightRequests queue allows more requests to be sent to this node.

Similarly, when the I/O channel is readable, the KafkaChannel is responsible for reading data; here it reads in a while loop until a complete response has been received:

org.apache.kafka.common.network.Selector#pollSelectionKeys
  • if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
  • // Channel可读,处理OP_READ事件
  • NetworkReceive networkReceive;
  • // 循环接收,直到1个response完全接收到,才会从while循环退出
  • while ((networkReceive = channel.read()) != null)
  • /**
  • * 读取信息并将读到的NetworkReceive添加到stagedReceives集合中保存
  • * 若读取到一个完整的NetworkReceive,则将其添加到stagedReceives集合中保存
  • * 若读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时,
  • * 继续读取,知道读取到一个完整的NetworkReceive
  • */
  • addToStagedReceives(channel, networkReceive);
  • }

The KafkaChannel's read(...) method involved here:

org.apache.kafka.common.network.KafkaChannel
  • // 读取数据,返回NetworkReceive对象
  • public NetworkReceive read() throws IOException {
  • NetworkReceive result = null;
  • if (receive == null) {
  • receive = new NetworkReceive(maxReceiveSize, id);
  • }
  • // 读取数据
  • receive(receive);
  • // 判断是否读取完
  • if (receive.complete()) {
  • receive.payload().rewind();
  • result = receive;
  • // 将receive置为null
  • receive = null;
  • }
  • return result;
  • }
  • private long receive(NetworkReceive receive) throws IOException {
  • // transportLayer是底层的JDK NIO Channel对象
  • return receive.readFrom(transportLayer);
  • }
  • // NetworkReceive类
  • // 从channel中读取数据
  • public long readFrom(ScatteringByteChannel channel) throws IOException {
  • return readFromReadableChannel(channel);
  • }
  • public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
  • int read = 0;
  • if (size.hasRemaining()) {
  • int bytesRead = channel.read(size);
  • if (bytesRead < 0)
  • throw new EOFException();
  • read += bytesRead;
  • if (!size.hasRemaining()) {
  • size.rewind();
  • // 获取消息长度
  • int receiveSize = size.getInt();
  • if (receiveSize < 0)
  • throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
  • if (maxSize != UNLIMITED && receiveSize > maxSize)
  • throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
  • // 根据消息长度分配ByteBuffer
  • this.buffer = ByteBuffer.allocate(receiveSize);
  • }
  • }
  • if (buffer != null) {
  • // 读取消息体
  • int bytesRead = channel.read(buffer);
  • if (bytesRead < 0)
  • throw new EOFException();
  • read += bytesRead;
  • }
  • return read;
  • }

3.2. Message Framing

From the discussion of message chunking above we know how data is organized when sending and reading, but the underlying communication happens over multiple channels, with outgoing and incoming streams flowing concurrently, so stream processing has to deal with message boundaries. Sending is the easy direction, because the size of the data to be sent is known in advance; when receiving, however, the length of each incoming message must be known explicitly in order to determine the boundary.

Let's first look at the definition of the NetworkReceive class mentioned above and its main member fields:

  • public class NetworkReceive implements Receive {
  • public final static String UNKNOWN_SOURCE = "";
  • public final static int UNLIMITED = -1;
  • // 消息来源Node节点的ID
  • private final String source;
  • // 存放每次需要读取的消息的长度的Buffer
  • private final ByteBuffer size;
  • private final int maxSize;
  • // 真正的消息数据
  • private ByteBuffer buffer;
  • ...
  • }

The ByteBuffer named size stores the length of the data to be read each time. Recall the source of readFromReadableChannel(ReadableByteChannel channel) above, which is the concrete implementation of reading:

  • public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
  • int read = 0;
  • if (size.hasRemaining()) {
  • // 读取消息长度数据
  • int bytesRead = channel.read(size);
  • if (bytesRead < 0)
  • throw new EOFException();
  • read += bytesRead;
  • if (!size.hasRemaining()) {
  • size.rewind();
  • // 获取消息长度
  • int receiveSize = size.getInt();
  • if (receiveSize < 0)
  • throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
  • if (maxSize != UNLIMITED && receiveSize > maxSize)
  • throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
  • // 根据消息长度分配ByteBuffer
  • this.buffer = ByteBuffer.allocate(receiveSize);
  • }
  • }
  • if (buffer != null) {
  • // 读取消息体
  • int bytesRead = channel.read(buffer);
  • if (bytesRead < 0)
  • throw new EOFException();
  • read += bytesRead;
  • }
  • return read;
  • }

Notice that the method first reads 4 bytes (representing an int value) into the size buffer, converts them into an int value receiveSize that determines the length of the data to receive, then allocates a ByteBuffer of that size as the actual receive buffer, and finally stores the message body in that ByteBuffer. This is how response data is framed when reading.
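
The same length-prefix framing can be expressed in a few lines of generic code. The sketch below is illustrative: it assumes a blocking channel so it can simply loop until each piece is complete, whereas the real NetworkReceive is driven incrementally by repeated OP_READ events.

Example (illustrative): length-prefixed framing
  • import java.io.IOException;
  • import java.nio.ByteBuffer;
  • import java.nio.channels.ReadableByteChannel;
  • import java.nio.channels.WritableByteChannel;
  • public final class Framing {
  •     // Sender side: prepend a 4-byte length before the payload
  •     public static void writeFrame(WritableByteChannel channel, byte[] payload) throws IOException {
  •         ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
  •         frame.putInt(payload.length);
  •         frame.put(payload);
  •         frame.flip();
  •         while (frame.hasRemaining())
  •             channel.write(frame);
  •     }
  •     // Receiver side: read the 4-byte length first, then exactly that many payload bytes
  •     public static byte[] readFrame(ReadableByteChannel channel) throws IOException {
  •         ByteBuffer size = ByteBuffer.allocate(4);
  •         while (size.hasRemaining())
  •             if (channel.read(size) < 0) throw new IOException("channel closed");
  •         size.flip();
  •         ByteBuffer body = ByteBuffer.allocate(size.getInt());
  •         while (body.hasRemaining())
  •             if (channel.read(body) < 0) throw new IOException("channel closed");
  •         return body.array();
  •     }
  • }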

3.3. Ordering Guarantees

From the send path analyzed earlier, we know that every ClientRequest that has been sent is stored in the requests map inside InFlightRequests:

  • // keyed by the id of the destination Node; the value is the queue of ClientRequests sent to that node
  • private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

When a response comes back, it identifies the Node it originated from, so the corresponding ClientRequest is taken out of the requests map and the two are combined to process the response (the ClientRequest may hold the callback registered at send time). Note that because ClientRequest and ClientResponse must be strictly paired, the ordering of messages must be preserved when sending requests and receiving responses: on a single channel, if requests are sent in the order ClientRequest-0, ClientRequest-1, ClientRequest-2, the responses must come back in the order ClientResponse-0, ClientResponse-1, ClientResponse-2.
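
On the client side, the depth of this per-node in-flight queue is bounded by the max.in.flight.requests.per.connection setting. As a hedged aside (a common tuning choice, not something stated in the source above), limiting it to 1 also keeps records in order even when retries are enabled, because a retried request cannot be overtaken by a later one:

Example (illustrative): limiting in-flight requests per connection
  • Properties props = new Properties();
  • props.put("bootstrap.servers", "localhost:9092");        // assumed broker address
  • props.put("max.in.flight.requests.per.connection", "1"); // at most one unacknowledged request per node
  • props.put("retries", "3");                               // with one in-flight request, retries cannot reorder records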

On the broker side, however, Kafka uses a 1 + N + M threading model: all requests are first placed into a request queue and then processed in parallel by multiple threads. So how does it preserve ordering? Kafka uses a mute/unmute mechanism: whenever a request is received on a channel, that channel is muted, and it is only unmuted after the response has been returned. This guarantees that only one request per connection is being processed at any time. The server-side code is as follows:

Note: the broker's 1 + N + M model means that the server uses one Acceptor thread to listen for and accept connections and hand them to Processor threads; N Processor threads handle request reads and writes concurrently, each reading request data, parsing it into a concrete operation, and handing it to a Handler thread; and M Handler threads execute the operations triggered by the requests, eventually handing their results back to the corresponding Processor, which returns them to the requester. All of this will be covered in detail later.

  • selector.completedReceives.asScala.foreach { receive =>
  • try {
  • val channel = selector.channel(receive.source)
  • val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
  • val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
  • requestChannel.sendRequest(req)
  • } catch {
  • case e @ (_: InvalidRequestException | _: SchemaException) =>
  • // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
  • error("Closing socket for " + receive.source + " because of error", e)
  • close(selector, receive.source)
  • }
  • // 收到请求,Mute这个请求对应的Channel,实际上是移除了Selector上监听的OP_READ事件,即停止从连接读取数据
  • selector.mute(receive.source)
  • }
  • selector.completedSends.asScala.foreach { send =>
  • val resp = inflightResponses.remove(send.destination).getOrElse {
  • throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
  • }
  • resp.request.updateRequestMetrics()
  • // 发送响应之后,Unmute这个响应对应的Channel,实际上是向Selector添加OP_READ事件,即标识可从连接读取数据
  • selector.unmute(send.destination)
  • }
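As the comments above point out, mute and unmute amount to removing and re-adding OP_READ interest on the channel's SelectionKey. A hedged sketch of that idea in plain Java NIO (this is not the Kafka Selector/KafkaChannel implementation, just the underlying mechanism):

  • import java.nio.channels.SelectionKey;
  • // Illustrative only: pause/resume reading from a connection by toggling OP_READ interest.
  • public final class ReadInterest {
  •     private ReadInterest() {}
  •     // Mute: the selector stops reporting read readiness for this channel,
  •     // so no further requests are read from it until it is unmuted.
  •     public static void mute(SelectionKey key) {
  •         key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
  •     }
  •     // Unmute: re-register interest in OP_READ so the next request can be read.
  •     public static void unmute(SelectionKey key) {
  •         key.interestOps(key.interestOps() | SelectionKey.OP_READ);
  •     }
  • }

Because a muted channel produces no new reads, at most one request per connection is in flight through the handler pipeline at a time, so responses necessarily leave in the same order their requests arrived.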

3.4. Connection Failure Detection and Reconnection

In long-lived TCP connections, the link between client and server can break for all kinds of reasons, so detecting the state of the connection is an important part of the overall send path: the client has to react to connection-state changes and deal with the problems they cause.

A connection can be in one of three states: connecting, connected, or disconnected. In the NIO programming model the state can only be inferred indirectly, for example by checking whether the connect operation succeeds, whether reads and writes go through, or whether responses time out. The NetworkClient implementation detects a broken connection in three ways:

  1. By catching the exceptions raised by connect, read, and write operations. This is implemented in Selector's pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) method:
  • // Selector类
  • // 处理IO操作
  • private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
  • // 遍历关注的事件SelectionKey
  • Iterator<SelectionKey> iterator = selectionKeys.iterator();
  • while (iterator.hasNext()) {
  • // 获取SelectionKey并从集合中移除
  • SelectionKey key = iterator.next();
  • iterator.remove();
  • // 获取与SelectionKey绑定的KafkaChannel
  • KafkaChannel channel = channel(key);
  • ...
  • try {
  • /* complete any connections that have finished their handshake (either normally or immediately) */
  • // connect()方法返回true或OP_CONNECT事件的处理
  • if (isImmediatelyConnected || key.isConnectable()) {
  • ...
  • }
  • /* if channel is not ready finish prepare */
  • if (channel.isConnected() && !channel.ready())
  • ...
  • /* if channel is ready read from any connections that have readable data */
  • if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
  • ...
  • }
  • /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
  • if (channel.ready() && key.isWritable()) {
  • ...
  • }
  • ...
  • } catch (Exception e) {
  • String desc = channel.socketDescription();
  • if (e instanceof IOException)
  • log.debug("Connection with {} disconnected", desc, e);
  • else
  • log.warn("Unexpected error from {}; closing connection", desc, e);
  • // 抛出异常后关闭KafkaChannel,并将对应的NodeId添加到disconnected集合中
  • close(channel);
  • // 判断连接断开,将相应的NodeId添加到disconnected数组
  • this.disconnected.add(channel.id());
  • }
  • }
  • }
  2. By checking whether each SelectionKey obtained from a select operation is still valid. This is also implemented in Selector's pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) method:
  • // Selector类
  • // 处理IO操作
  • private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
  • // 遍历关注的事件SelectionKey
  • Iterator<SelectionKey> iterator = selectionKeys.iterator();
  • while (iterator.hasNext()) {
  • // 获取SelectionKey并从集合中移除
  • SelectionKey key = iterator.next();
  • iterator.remove();
  • // 获取与SelectionKey绑定的KafkaChannel
  • KafkaChannel channel = channel(key);
  • ...
  • try {
  • ...
  • /* cancel any defunct sockets */
  • // 关注的键无效,关闭对应的KafkaChannel,并将对应的NodeId添加到disconnected集合中
  • if (!key.isValid()) {
  • close(channel);
  • // 将断开连接的NodeID添加到disconnected集合中
  • this.disconnected.add(channel.id());
  • }
  • } catch (Exception e) {
  • ...
  • }
  • }
  • }
  3. Every request stored in InFlightRequests records the time at which it was sent, so a connection can be judged broken when a request's response has timed out:
  • // NetworkClient类
  • public List<ClientResponse> poll(long timeout, long now) {
  • // 每次poll()的时候都会判断是否需要更新Metadata
  • long metadataTimeout = metadataUpdater.maybeUpdate(now);
  • try {
  • // 执行IO操作
  • this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  • } catch (IOException e) {
  • log.error("Unexpected error during I/O", e);
  • }
  • ...
  • // 响应队列
  • List<ClientResponse> responses = new ArrayList<>();
  • ...
  • // 处理InFlightRequest中超时请求
  • handleTimedOutRequests(responses, updatedNow);
  • ...
  • }
  • private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
  • // 从inFlightRequests中获取请求超时的Node的集合
  • // requestTimeoutMs由timeout.ms参数决定,但该配置已废弃推荐使用request.timeout.ms配置
  • List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
  • // 遍历Node关闭连接并更新状态
  • for (String nodeId : nodeIds) {
  • // close connection to the node
  • this.selector.close(nodeId);
  • log.debug("Disconnecting from node {} due to request timeout.", nodeId);
  • processDisconnection(responses, nodeId, now);
  • }
  • // we disconnected, so we should probably refresh our metadata
  • // 有超时请求,因此需要更新Metadata元数据
  • if (nodeIds.size() > 0)
  • metadataUpdater.requestUpdate();
  • }
  • // InFlightRequests类
  • // 获取超时的请求所到达的Node的Id
  • public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
  • List<String> nodeIds = new LinkedList<String>();
  • for (String nodeId : requests.keySet()) {
  • if (inFlightRequestCount(nodeId) > 0) {
  • ClientRequest request = requests.get(nodeId).peekLast();
  • // 获取距离发送的时间
  • long timeSinceSend = now - request.sendTimeMs();
  • // 判断是否超时
  • if (timeSinceSend > requestTimeout) {
  • nodeIds.add(nodeId);
  • }
  • }
  • }
  • return nodeIds;
  • }
  • // NetworkClient类
  • private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
  • // 更新连接状态
  • connectionStates.disconnected(nodeId, now);
  • // 清理发送到相应Node的超时的请求并返回
  • for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
  • log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
  • // 调用metadataUpdater.maybeHandleDisconnection()处理MetadataRequest
  • if (!metadataUpdater.maybeHandleDisconnection(request))
  • // 如果不是MetadataRequest,则创建ClientResponse并添加到responses集合,第三个参数为true标识不是正常响应,而是断开连接的响应
  • responses.add(new ClientResponse(request, now, true, null));
  • }
  • }

When a disconnect has been detected, the Sender thread's run(long) method checks node availability on its next pass, and NetworkClient's ready(Node node, long now) method will initiate a reconnect. We have already seen this code; here it is again for reference:

  • // Sender类
  • void run(long now) {
  • // 获取Cluster集群元数据信息
  • Cluster cluster = metadata.fetch();
  • // get the list of partitions with data ready to send
  • // 获取当前集群中符合发送消息条件的数据集
  • RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • ...
  • // remove any nodes we aren't ready to send to
  • Iterator<Node> iter = result.readyNodes.iterator();
  • long notReadyTimeout = Long.MAX_VALUE;
  • while (iter.hasNext()) {
  • Node node = iter.next();
  • // 遍历所有readyNode节点,使用NetWorkClient的ready()方法验证这些节点是否可用
  • // 这一步的检测如果发现有Node连接断开,会进行重连操作
  • if (!this.client.ready(node, now)) {
  • // 如果节点不可用,就将其从readyNodes集合中移除
  • iter.remove();
  • notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
  • }
  • }
  • ...
  • }
  • // NetworkClient类
  • public boolean ready(Node node, long now) {
  • if (node.isEmpty())
  • throw new IllegalArgumentException("Cannot connect to empty node " + node);
  • // 检查是否可以向一个Node发送请求
  • if (isReady(node, now))
  • return true;
  • // 如果不能发送请求,则尝试建立连接
  • if (connectionStates.canConnect(node.idString(), now))
  • // if we are interested in sending to a node and we don't have a connection to it, initiate one
  • // 发起连接
  • initiateConnect(node, now);
  • return false;
  • }