Kafka系列 05 - 生产者源码分析 01:元数据更新
发布于 / 2019-03-11
简介:KafkaProducer对元数据的管理和操作,是先行于数据发送环节的。元数据的完整、正确与否决定了是数据发送先决条件。KafkaProducer对元数据的更新操作与消息数据的发送操作虽然都需要经过网络I/O,但二者的实现略有差异
1. 简介
本文将以KafkaProducer发送消息的过程为例,介绍KafkaProducer的工作原理和源码构成。
KafkaProducer发送消息的大致流程是:处理拦截器操作 -> 等待更新集群元数据 -> 序列化键和值 -> 分区 -> 将消息添加到RecordAccumulator缓冲 -> 唤醒Sender线程发送缓冲中的消息。可以发现,其中元数据的更新是先行于真正的消息发送的,因此本文将先介绍元数据更新的详细流程。
2. 集群数据的组织
KafkaProducer如果需要发送消息数据到集群中,首先需要清楚集群的基本情况,包括某个Topic中有哪几个分区,每个分区的Leader副本分配在哪个节点上,Follower副本分配在哪些节点上,哪些副本在ISR集合中以及这些节点的网络地址、端口等信息。在org.apache.kafka.common包中,使用Node、TopicPartition、PartitionInfo这三个类封装了Kafka集群的相关元数据。这些元数据保存在了Cluster这个类中,并按照不同的映射方式进行存放,方便查询。KafkaProducer通过更新机制得到的Metadata元数据记录了大量集群相关的信息,都以这几个类进行组织和存储。我们先了解一下这些类,便于后面的分析。
2.1. Node类
Node类表示集群中的一个节点,Node记录这个节点的host
、ip
、port
等信息。该类的关键属性如下:
2.2. TopicPartition类
TopicPartition类表示某Topic的一个分区,其中的topic
字段是Topic的名称,partition
字段则是此分区在Topic中的分区编号(ID)。该类的关键属性如下:
2.3. PartitionInfo类
PartitionInfo表示一个分区的详细信息。其中topic
字段和partition
字段的含义与TopicPartition中的相同,除此之外,leader
字段记录了Leader副本所在的节点,replica
字段记录了全部副本所在的节点,inSyncReplicas
字段记录了In-Sync副本集合(即ISR集合)中所有副本所在的节点。该类的关键属性如下:
2.4. Cluster类
Cluster类统一保存了上面的三类信息,同时根据相应的关系做好了映射,并且提供了大量便捷操作方法。
值得注意的一点是,Node、TopicPartition、PartitionInfo、Cluster的所有字段都是private final
修饰的,且只提供了查询方法,并未提供任何修改方法,这就保证了这四个类的对象都是不可变的线程安全的对象:
2.5. Metadata类
Metadata类在内部引用了一个Cluster实例,并记录了大量与更新原数操作相关的属性:
3. KafkaProducer的构建过程
下面是构建一个简单的KafkaProducer的通用方式:
- // 生产者需要的配置信息
- Properties properties = new Properties();
- // broker列表
- properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
- // 键序列化器
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 值序列化器
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 根据配置信息创建生产者
- KafkaProducer<String, String> kafkaProducer = kafkaProducer = new KafkaProducer<>(properties);
这份代码中只为KafkaProducer指定了bootstrap.servers
、key.serializer
和value.serializer
三个必要的配置。我们看一下这里用到的KafkaProducer的构造方法:
从源码我们可以得知,在创建KafkaProducer时会读取大量开发者指定的配置(虽然我们只指定了三项),如果开发者没有提供就使用默认的配置。同时,有以下几个关键的点需要注意:
- 如果开发者没有指定生产者Client的名称,将会使用
producer-自增序列
。 - 主要需要注意的对象有四个:Metadata对象、RecordAccumulator对象、NetworkClient对象和Sender对象在创建KafkaProducer的过程中被创建了。
- Sender是一个实现了Runnable接口的类,它会单独运行于
ioThread
线程中。
4. Sender线程类
Sender线程主要负责统筹数据发送业务,但真正实现通信过程的类是NetworkClient;我们先分析一下创建和启动Sender线程的代码:
对于Sender线程来说,start()
方法启动线程后会调用它的run()
方法,源码如下:
我们需要关注的是最后部分的代码,client.send(request, now)
负责使用client
客户端(此处使用的是NetworkClient,后面会讲解)将请求发送出去,这一点我们后面再讲解;最后的this.client.poll(pollTimeout, now)
会调用NetworkClient以NIO方式来处理数据的收发工作,它的代码如下:
在每次poll操作时,都会调用metadataUpdater.maybeUpdate(now)
操作判断是否需要更新元数据,如果有需要会构建更新请求。但需要注意的是,在初始创建KafkaProducer时,这里的maybeUpdate(long)
内部判断并不能满足更新元数据的条件。
5. 元数据更新
元数据更新是在KafkaProducer发送数据时进行的,由于它的过程的原理与消息发送某些细节比较相似,我们这里先对该过程进行详细介绍。在KafkaProducer的doSend(ProducerRecord<K, V>, Callback)
方法中,真正发送消息之前会进行元数据更新,源码如下:
KafkaProducer的waitOnMetadata(String topic, long maxWaitMs)
方法主要负责进行元数据更新,它的源码如下:
从上面的源码可知,当在Metadata对象引用的Cluster对象中无法获取到指定Topic的分区信息时,会进入while循环体;while循环体内首先会唤醒Sender线程,然后调用Metadata对象的awaitUpdate(final int lastVersion, final long maxWaitMs)
方法更新元数据,需要注意的是此时传入的lastVersion
参数是Metadata对象保存的当前的版本号;该方法源码如下:
从上面代码可以看出,当满足下面2个条件时KafkaProducer主线程就会一直wait等待直到超时:
while (metadata.fetch().partitionsForTopic(topic) == null)
:元数据中无法得到指定主题的分区信息;while (this.version <= lastVersion)
:Metadata的版本号没有被更新。
KafkaProducer主线程进入wait等待状态后,notify唤醒操作会在Sender线程更新Metadata后执行。
还记得前面我们讲解过的Sender线程吗?最终它的run(long)
方法会调用到NetworkClient的poll(long, long)
方法,在该方法中首先会调用metadataUpdater.maybeUpdate(now)
检测是否符合更新元数据的条件,此时这个方法内部会进行元数据更新的请求。
5.1. 更新请求的封装
NetworkClient的poll(long, long)
方法中使用的metadataUpdater
对象类型是MetadataUpdater,NetworkClient的构造方法中默认使用DefaultMetadataUpdater类型构建它,该类是NetworkClient内部类,它的maybeUpdate(long)
方法源码如下:
其中,查找负载最小的Node的方法leastLoadedNode(long)
由NetworkClient实现:
在maybeUpdate(long)
方法中会根据传入的时间参数判断是否需要更新元数据,如果满足更新条件,则会从Cluster对象的所有Node节点对象中选取一个负载最小的节点,将更新元数据的请求发送给该节点;在真正发送更新元数据请求的过程中,因为目前并没有与任何一个broker连接,所以会首先调用initiateConnect(node, now)
尝试连接,然后在下一次的poll过程中才会真正地发送Metadata更新请求。
更新Metadata元数据的请求是一个ClientRequest对象,内部通过将MetadataRequest的结构转换为RequestSend对象对MetadataRequest进行封装,最终通过NetworkClient的doSend(clientRequest, now)
方法将ClientRequest对象添加到inFlightRequests
队列中,然后等待下次poll操作中会将其发出;需要注意的是,此处同时会将此待发送的ClientRequest对象内部的RequestSend对象使用KafkaChannel的send
字段进行记录:
doSend(ClientRequest, long)
使用this.inFlightRequests
的是InFlightRequests类,它的add(ClientRequest)
方法如下:
doSend(ClientRequest, long)
使用selector
的是org.apache.kafka.common.network.Selector类,它的send(Send)
方法如下:
上述方法使用了KafkaChannel的setSend(Send)
方法
inFlightRequests
队列的主要作用是缓存了已经发出去但没收到响应的ClientRequest,它的类型是InFlightRequests,其底层是通过一个Map
InFlightRequests类还提供了canSendMore()
方法用于判断是否可以向指定Node发送请求的条件之一,底层也是通过NodeId来取得相应的Deque队列,然后基于队列进行各种判断,其代码如下:
5.2. 更新请求的发送
做完了对Metadata元数据更新的请求封装之后,在NetworkClient类的poll(long timeout, long now)
方法中会使用org.apache.kafka.common.network.Selector对象的poll(long)
方法来处理数据的发送,回顾这部分代码:
Kafka内进行网络IO通信使用的其实是JDK NIO包,但Kafka对其进行了封装,提供了org.apache.kafka.common.network.Selector,内部拥有一个java.nio.channels.Selector类型的成员属性nioSelector
。上面用到Selector的poll(long)
方法源码如下:
上面的代码是典型的通过NIO Selector方式进行网络IO操作的过程,我们先关注发送数据的部分:
其中使用了KafkaChannel的write()
方法:
这里调用的send.writeTo(transportLayer)
其实是ByteBufferSend的writeTo(GatheringByteChannel)
方法:
```java shownum=”0” toolsbar=’{“title”: “org.apache.kafka.common.network.ByteBufferSend#writeTo”, “global”: true}
// 需要注意的继承关系:RequestSend -继承-> NetworkSend -继承-> ByteBufferSend -实现-> Send
// 因此上面的RequestSend调用的writeTo()方法其实继承自ByteBufferSend类,
// 传入的channel是TransportLayer类型对象,这个后面会讲解
public long writeTo(GatheringByteChannel channel) throws IOException {
// 使用Channel写出数据
long written = channel.write(buffers);
if (written < 0)
throw new EOFException(“Wrote negative bytes to channel. This shouldn’t happen.”);
remaining -= written;
// This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
// Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
// GatheringByteChannel or ScatteringByteChannel.
if (channel instanceof TransportLayer)
pending = ((TransportLayer) channel).hasPendingWrites();
- return written;
}
``
该部分代码的作用是:当select操作得到的SelectionKey代表的是可写事件,使用KafkaChannel对象的
write()`方法将之前关联在KafkaChannel对象上的RequestSend对象写出到网络Channel中;需要注意的是,这里底层用到通道其实是TransportLayer类型的,它封装了SocketChannel和SelectionKey,根据网络协议的不同提供不同的子类,对KafkaChannel提供统一的接口,具体实现类有PlaintextTransportLayer和SslTransportLayer,这是策略模式的运用。
5.3. 更新请求的响应处理
请求的响应数据首先会在Selector的pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected)
方法中接收并处理:
java shownum="0" toolsbar='{"title": "org.apache.kafka.common.network.Selector", "global": true}'
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
// Channel可读,处理OP_READ事件
NetworkReceive networkReceive;
// 循环接收,直到1个response完全接收到,才会从while循环退出
while ((networkReceive = channel.read()) != null)
/**
* 读取信息并将读到的NetworkReceive添加到stagedReceives集合中保存
* 若读取到一个完整的NetworkReceive,则将其添加到stagedReceives集合中保存
* 若读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时,
* 继续读取,知道读取到一个完整的NetworkReceive
*/
addToStagedReceives(channel, networkReceive);
}
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
// 如果stagedReceives中还没有与指定KafkaChannel对应的ArrayDeque,就创建一个新的
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
// 获取stagedReceives中指定KafkaChannel对应的Deque
Deque<NetworkReceive> deque = stagedReceives.get(channel);
// 将接收的NetworkReceive对象添加到该Deque中
deque.add(receive);
}
这里我们需要注意的是,在接收响应数据时,可能需要读取多次才能接收完毕,这里的做法是将每个KafkaChannel实例读取的数据所构成的NetworkReceive对象存放在一个DequestagedReceives
中。在完成一次poll操作后,会通过addToCompletedReceives()
方法将statedReceives
中的NetworkReceives数据复制到completedReceives
集合中,回顾源码:
而在NetworkClient的poll(long timeout, long now)
方法后面的代码则会在每次poll过程中尝试解析返回的响应数据:
上面的handleCompletedReceives(List<ClientResponse>, long)
中用到了NetworkClient内部类DefaultMetadataUpdater的maybeHandleCompletedReceive(ClientRequest, long, Struct)
方法:
- // NetworkClient$DefaultMetadataUpdater类
- public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
- short apiKey = req.request().header().apiKey();
- // 检测是否是MetadataRequest请求的响应
- if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
- // 处理请求响应
- handleResponse(req.request().header(), body, now);
- return true;
- }
- return false;
- }
这里我们需要注意的是handleCompletedReceives(List<ClientResponse> responses, long now)
方法中解析响应的过程,下面是源码:
在这个过程中会从Selector对象的completeReceives
集合中遍历NetworkReceive对象,然后根据NetworkReceive对象的source
字段从inFlightRequests
取出对应的ClientRequest,哪为什么可以通过source
字段来取出对应的ClientRequest呢?我们回顾一下前面发送请求的流程,NetworkClient在将请求对象ClientRequest交给KafkaChannel时,是会将ClientRequest添加到inFlightRequest
队列中的,回顾源码:
- // NetworkClient类
- private void doSend(ClientRequest request, long now) {
- request.setSendTimeMs(now);
- // 将请求添加到inFlightRequest队列中等待响应
- this.inFlightRequests.add(request);
- // 将请求交由KafkaChannel处理
- selector.send(request.request());
- }
- // InFlightRequests类
- public void add(ClientRequest request) {
- /**
- * 以ClientRequest对象中封装的RequestSend的destination为键从requests中获取对应的Deque
- * 这个destination的值其实是当时发送请求的目的Node节点的id,可以回顾下面的方法:
- * {@link NetworkClient.DefaultMetadataUpdater#maybeUpdate}
- * requests的结构是Map<String, Deque<ClientRequest>>
- */
- Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
- // 如果没有获取到就创建一个
- if (reqs == null) {
- reqs = new ArrayDeque<>();
- this.requests.put(request.request().destination(), reqs);
- }
- // 将ClientRequest对象添加到对应的Deque中
- reqs.addFirst(request);
- }
而NetworkReceives中的source
字段也标识了响应来源Node的NodeId,因此通过这两个字段的比较,自然能够找到对应的ClientRequest了。
在获取到对应的ClientRequest之后,会使用NetworkReceives的载荷和ClientRequest的请求头信息来解析响应数据,最终通过DefaultMetadataUpdater的maybeHandleCompletedReceive(ClientRequest request, long now, Struct body)
来解析响应数据,该方法会判断响应是否是更新集群元数据的响应,如果是响应数据就交给handleResponse(RequestHeader header, Struct body, long now)
方法处理,如果不是会放弃处理并返回false,上层的handleCompletedReceives(List<ClientResponse> responses, long now)
方法就会将响应添加到一个List
5.4. 集群元数据的解析和更新
集群元数据的响应数据最终的解析和更新最终由handleResponse(RequestHeader header, Struct body, long now)
方法和Metadata类的update(Cluster cluster, long now)
方法负责完成;我们先查看handleResponse(RequestHeader header, Struct body, long now)
方的源码:
该方法首先将metadataFetchInProgress
字段置为了false,表明此时已经没有元数据更新请求了,然后通过传入请求体参数给MetadataResponse类的构造方法来创建一个MetadataResponse对象,该类的构造方法中有大量的解析响应数据的操作,会从响应数据中读取Broker、Controller、Topic及Partition等信息,最终其cluster()
方法会将这些信息统一由一个Cluster对象进行管理。这里另外需要注意的是,最终判断元数据是否更新成功的依据是得到的响应信息中Node节点的数量是否大于0,如果判定更新成功,最后会将Cluster对象交给Metadata类的update(Cluster cluster, long now)
方法以更新现有的集群元数据:
update(Cluster cluster, long now)
方法前面已经讲解过了,它会更新Metadata的相关字段、将更新事件和得到的包含了新的集群元数据的Cluster对象通知给所有的监听器,然后以替换的方式更新内部的Cluster对象完成集群元数据的更新。
5.5. 元数据更新机制
上面我们对元数据的更新流程进行了详细的讲解,这个过程是针对于初次发送消息数据时,由于本地还没有任何集群元数据而触发的更新。除了这一种情况,Metadata还会周期性进行更新,同时如果发现元数据失效,也会进行强制更新。
5.5.1. 周期性更新
我们已经知道,在Sender线程每次进行poll操作时,会调用NetworkClient的poll(long timeout, long now)
方法,该方法内部会通过DefaultMetadataUpdater对象的maybeUpdate(long)
检测是否需要更新,以达到周期更新的效果,回顾一下这部分的源码:
其中第一行的timeToNextMetadataUpdate
是由Metadata的timeToNextUpdate(long)
获取的:
5.5.2. 失效时更新
Metadata提供了一个requestUpdate()
方法,用于将Metadata的needUpdate
字段置为true,以实现强制更新,当然了,强制更新仅针对于周期更新而言,从上面的timeToNextUpdate(long nowMs)
方法可以看出,将needUpdate
字段置为true只是省掉了判断上次更新时间距离当前时间是否已经超过了指定的元数据过期时间阈值这个步骤,真正的更新操作依旧需要遵循退避时间、是否已经发送了请求请求等约束。
失效更新里面更重要的细节是,如何检测Metadata是否失效。KafkaProducer将这个操作分散在多种情况下:
- Sender线程在发送准备发送数据之前,如果有未知的Leader分区存在,会触发强制更新:
- 初始化Node节点连接时,即
initiateConnect(Node node, long now)
方法,该方法在检测元数据是否可用、检测Node节点是否可用时都有可能被调用;当其尝试连接Node节点时,如果发生异常,就会认为元数据已经失效,进而触发强制更新:
- 在NetworkClient的poll操作中,会处理与Kafka服务端断掉的连接,此时如果发现有连接断掉了,就会认为元数据已经失效,进而触发强制更新:
- 同样的,在NetworkClient的poll操作中,会处理超时的连接,此时如果发现有连接超时了,就会认为元数据已经失效,进而触发强制更新:
- 当接收到包含InvalidMetadataException错误信息的响应数据时,也会触发强制更新:
KafkaProducer对元数据的管理和操作,是先行于数据发送环节的。元数据的完整、正确与否决定了是数据发送先决条件。KafkaProducer对元数据的更新操作与消息数据的发送操作虽然都需要经过网络I/O,但二者的实现略有差异,主要表现在以下几点:
- 元数据更新请求被包装为MetadataRequest对象;而消息发送请求会被包装为ProduceRequest对象。不过最终这两个对象都会被ClientRequest再包装一次。
- 元数据的发送是直接发送的;而消息数据需要暂存于RecordAccumulator的RecordBatch缓存中,批量发送。
- 元数据更新请求的响应是会被转换为MetadataResponse对象,由DefaultMetadataUpdater对象来处理;而消息数据发送的响应会被转换为ClientResponse,由NetworkClient来处理。二者以请求头标记来区分类型。
在后面第二篇分析KafkaProducer的文章中会详细分析消息发送的流程,读者到时候可以注意观察二者之间的差别。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...