1. 概述
回顾上一节的内容我们知道,当KafkaController初始化启动或触发Leader选举时,都会调用其onControllerFailover()
方法,而该方法中向Zookeeper中的特定路径注册了大量的监听器,回顾源码:
在本文中将对这些监听器进行一一讲解。
2. BrokerChangeListener
ReplicaStateMachine状态机的registerListeners()
方法中会注册BrokerChangeListener监听器,它会监听/brokers/ids
节点下的子节点变化,主要负责处理Broker的上线和故障下线。当Broker上线时会在/brokers/ids
下创建临时节点,下线时会删除对应的临时节点;BrokerChangeListener是ReplicaStateMachine的内部类,实现了IZkChildListener接口,源码如下:
BrokerChangeListener只有一个handleChildChange(...)
一个方法,该方法也是处理/brokers/ids
节点下的子节点变化的主要方法;当出现Broker节点的上线和下线时,handleChildChange(...)
会维护ControllerContext实例的brokerStateInfo
字段,维护其对应的ControllerBrokerStateInfo状态标识对象;涉及到的ControllerContext类的相关方法在前面已经讲过了,这里不再赘述。这里需要关注的是KafkaController的onBrokerStartup(...)
和onBrokerFailure(...)
两个方法。
2.1. Broker的上线
其中onBrokerStartup(...)
方法是在Broker上线时被执行的,源码如下:
该方法的主要流程有以下五步:
- 向集群中所有Broker发送UpdateMetadataRequest,内容为所有可用的Broker的ID信息,通过此请求,集群中所有Broker可以了解到新添加的Broker信息。
- 将新增的Broker上的副本转换为OnlineReplica状态,涉及发送LeaderAndIsrRequest请求。
- 将当前KafkaController上所有处于NewPartition和OfflinePartition状态的分区转换为OnlinePartition状态,触发Leader副本选举。
- 检查所有需要进行副本重分配的分区,如果分配给这个分区的新AR集合中,有某个副本存在于新的Broker节点上,就将该分区过滤出来,对其进行副本重分配操作。
- 如果新增Broker上有属于待删除Topic的副本,就唤醒DeleteTopicsThread线程进行删除。
2.2. Broker因故障下线
onBrokerFailure(...)
方法用于处理Broker出现故障下线的情况,源码如下:
该方法的主要流程有以下六步:
- 首先从出现故障的Broker集合中移除正在正常关闭的Broker。
- 过滤得到Leader副本存在于故障Broker上的分区,将它们的状态转换为OfflinePartition。
- 再次将第2步中的分区的状态转换为OnlinePartition状态,这将触发新Leader副本的选举操作(使用OfflinePartitionLeaderSelector选举器),并向可用的Broker发送LeaderAndIsrRequest和UpdateMetaRequest以更新相关信息。
- 将故障Broker上的副本转换为OfflineReplica状态,并将其从对应分区的ISR集合中删除。
- 检查故障Broker上是否有待删除的主题,如果有则将其标记为不可删除。
- 向所有可用的Broker发送UpdateMetadataRequest请求以更新元数据。
以一个例子来讲解onBrokerStartup(...)
和onBrokerFailure(...)
两个方法的流程;假设现在有Broker-0、Broker-1、Broker-2三个Broker,上面分别分配了三个分区,分区的副本因子都为3,正常的情况下,分配如下:
- Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
- Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0
- Topic: topic-1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1
当Broker-0发生故障下线时,ZooKeeper中的/brokers/ids/0
临时节点会被删除,并触发BrokerChangeListener进行处理。首先,将Broker-0从ControllerContext记录的可用Broker列表中删除。由于Partition-2的Leader副本在Broker-0上,因此需要将Partition-2转换为OfflinePartition状态,紧接着再将其转换成OnlinePartition状态,此时会使用OfflinePartitionLeaderSelector为其选举新的Leader副本和ISR集合并更新到ZooKeeper中,随后向可用的Broker发送LeaderAndIsrRequest和UpdateMetaRequest以更新相关信息。之后,将Broker-0上的三个副本转换成OfflineReplica,并将其从对应分区的ISR集合删除,此时会发送StopReplicaRequest(不删除副本)、LeaderAndIsrRequest和UpdateMetaRequest更新可用Broker的MetadataCache;最终得到的分配结果如下:
- Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1 // ISR [2,1,0] -> [2,1]
- Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2 // ISR [1,2,0] -> [1,2]
- # Partition-2的Leader副本从Broker-0重新选举为Broker-2
- Topic: topic-1 Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,1 // ISR [2,0,1] -> [2,1]
- 向集群中所有Broker发送UpdateMetadataRequest,内容为所有可用的Broker的ID信息,通过此请求,集群中所有Broker可以了解到新添加的Broker信息。
- 将新增的Broker上的副本转换为OnlineReplica状态,涉及发送LeaderAndIsrRequest请求。
- 将当前KafkaController上所有处于NewPartition和OfflinePartition状态的分区转换为OnlinePartition状态。
- 检查所有需要进行副本重分配的分区,如果分配给这个分区的新AR集合中,有某个副本存在于新的Broker节点上,就将该分区过滤出来,对其进行副本重分配操作。
- 如果新增Broker上有属于待删除Topic的副本,就唤醒DeleteTopicsThread线程进行删除。
当Broker-0恢复上线时,首先会向集群中当前所有的Broker发送UpdateMetadataRequest请求以告知Broker-0的信息;然后讲Broker-0上之前三个分区的副本都转换为OnlineReplica状态;但由于三个分区都处于OnlinePartition状态,不存在下线的分区,因此不会触发分区的Leader副本选举,只会向Broker-0上的分区发送LeaderAndIsrRequest使其成为Follower,并向可用Broker发送UpdateMetadataRequest更新MetadataCache信息;最终得到的分配结果如下:
- Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0 // ISR [2,1,0] -> [2,1,0]
- Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0 // ISR [1,2,0] -> [1,2,0]
- # Partition-2的Leader副本依旧为Broker-2
- Topic: topic-1 Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0 // ISR [2,0,1] -> [2,1,0]
对于BrokerChangeListener我们需要理解的是,BrokerChangeListener的handleChildChange(...)
只会维护Broker上下线导致的数据变更,对于Broker的上线与下线涉及到的分区、副本的状态转换以及新Leader副本的选举其实都委托给了KafkaController的相关方法来处理。
2.3. Broker的正常关闭
某些时候,Broker的下线可能是由管理员主动触发的,Kafka中提供了Controlled Shutdown的方式来关闭一个Broker实例,该方式有两个好处:
- 可以让日志文件完全同步到磁盘上,在Broker下次重现上线时不需要进行Log的恢复操作;
- Controller Shutdown方式在关闭Broker之前,会对其上的Leader副本进行迁移,以减少分区的不可用时间。
在Kafka的main()
方法中加入了JVM关闭钩子操作,实现了以Controller Shutdown方式正常关闭Broker:
KafkaController的shutdownBroker(id: Int)
方法是处理ControlledShutdownRequest请求的核心方法,该方法会使用ControlledShutdownLeaderSelector选举器重新选择Leader副本和ISR集合,实现Leader副本的迁移,该方法源码如下:
3. TopicChangeListener
TopicChangeListener负责管理Topic的增删(实际上只处理了主题的增加,在处理主题删除时只维护了元数据记录,具体的删除操作交给了DeleteTopicsListener监听器),它监听Zookeeper的/brokers/topics
节点的子节点变化,它是PartitionStateMachine的内部类,实现了IZkChildListener接口,源码如下:
TopicChangeListener的handleChildChange(...)
方法会在Zookeeper的/brokers/topics
节点的子节点的发生变化被调用,传入的children
即为新的子节点,表示新的主题列表。handleChildChange(...)
方法主要分为以下三步:
- 会计算得到新添加的主题和即将被删除的主题,并更新ControllerContext的
allTopics
字段。 - 从Zookeeper的
/brokers/topics/[topic_name]
路径加载新增主题的分区信息和对应的AR集合信息,以此来更新ControllerContext的partitionReplicaAssignment
集合中记录的副本信息。 - 通过KafkaController的
onNewTopicCreation(...)
方法处理新增的主题,主要是进行Leader副本、ISR集合的初始化,更新其它Broker的元数据、Zookeeper内的数据等操作。
注:删除主题的操作将由TopicDeletionManager负责,后面会讲解。
其中第三步涉及到的KafkaController的onNewTopicCreation(...)
方法源码如下:
KafkaController的onNewTopicCreation(...)
方法会为每个新增主题注册PartitionModificationsListener监听器,并且调用onNewPartitionCreation(...)
方法完成新增主题的分区状态及副本状态的转换,主要是将分区转换为NewPartition后再转换为OnlinePartition,将副本转换为NewReplica后再转换为OnlineReplica。在分区状态从NewPartition转换为OnlinePartition时会初始化对应的Leader副本和ISR集合,该操作由PartitionStateMachine的initializeLeaderAndIsrForPartition(...)
方法实现,前面有讲解。最终会将分区副本的分配结果写入ZooKeeper,并向所有可用Broker发送LeaderAndIsrRequest来指导副本的角色切换,然后向所有可用Broker发送UpdateMetadataRequest来更新其MetadataCache。
4. DeleteTopicsListener
DeleteTopicsListener会监听ZooKeeper中/admin/delete_topics
节点下的子节点变化,当管理员执行了删除主题的命令后,会在该路径下添加需要被删除的主题,DeleteTopicsListener会被触发,它会将该待删除的主题交给TopicDeletionManager处理。DeleteTopicsListener也是PartitionStateMachine的内部类,实现了IZkChildListener接口,源码如下:
DeleteTopicsListener的handleChildChange(parentPath : String, children : java.util.List[String])
是用于处理/admin/delete_topics
节点的子节点变化的主要方法,其children
参数记录了需要被删除的主题,该方法的实现主要分为以下几步:
- 从Zookeeper的
/admin/delete_topics
节点中获取需要删除的主题的集合,与本地ControllerContext进行对比,过滤掉不存在的主题,直接将其从/admin/delete_topics
节点下删除。 - 遍历所有待删除的主题,检查主题中是否有分区正在进行“优先副本”选举或正在进行副本重分配操作,如果有则将该主题标记为不可删除(由TopicDeletionManager的
markTopicIneligibleForDeletion(...)
方法实现)。 - 将符合删除条件的主题存放到TopicDeletionManager的
topicsToBeDeleted
集合中,并将这些主题的分区存放到TopicDeletionManager的partitionsToBeDeleted
集合中,然后唤醒对应的DeleteTopicThread线程(由TopicDeletionManager的enqueueTopicsForDeletion(...)
方法实现)。
可见,在主题删除的操作中,TopicDeletionManager和DeleteTopicsThread是执行主要功能的两个类,下面就来了解这两个类的实现。
4.1. TopicDeletionManager
TopicDeletionManager用于管理待删除的主题以及不可删除的主题,它的定义和重要字段如下:
可以见,TopicDeletionManager实例中维护了topicsToBeDeleted
、partitionsToBeDeleted
和topicsIneligibleForDeletion
三个集合分别记录待删除和不可删除的主题等信息。另外,通过读取配置delete.topic.enable
来决定是否开启主题删除功能,isDeleteTopicEnabled
的值会决定是否启动DeleteTopicsThread线程。
另外需要注意的是,当一个主题满足下列三种情况之一时不能被删除:
- 如果Topic中的任一分区正在重新分配副本,则此Topic不能被删除。
- 如果Topic中的任一分区正在进行“优先副本”选举,则此Topic不能被删除。
- 如果Topic中的任一分区的任一副本所在的Broker宕机,则此Topic不能被删除。
每个KafkaController中都存在一个TopicDeletionManager实例,TopicDeletionManager的初始化过程如下:
initializeTopicDeletion()
方法是在KafkaController用于初始化ControllerContext的方法initializeControllerContext()
时被调用的,同时在KafkaController的onControllerFailover()
方法中会调用deleteTopicManager
的start()
方法以启动TopicDeletionManager;大家可以回顾上一篇文章中的相关内容。
TopicDeletionManager的start()
方法的源码如下:
可见,该方法会根据isDeleteTopicEnabled
(由delete.topic.enable
参数配置)来决定是否实例化并开启DeleteTopicsThread线程,接下来我们分析一下DeleteTopicsThread的实现。
4.2. DeleteTopicsThread
DeleteTopicsThread是真正执行删除主题操作的线程,它是TopicDeletionManager的内部类,继承了ShutdownableThread,因此我们需要关注其入口方法doWork()
,它的整体实现也只有这一个方法,源码如下:
DeleteTopicsThread的doWork()
方法的主要流程如下:
第一步:awaitTopicDeletionNotification()
方法属于TopicDeletionManager类,它会阻塞等待直到有其他线程将其唤醒,这里使用的是ReentrantLock锁的条件队列,与该方法对应的是resumeTopicDeletionThread()
唤醒方法,这两个方法源码如下:
第二步:检查DeleteTopicsThread是否在运行,检查TopicDeletionManager的topicsToBeDeleted
集合是否为空,不为空则表示有待删除的主题。
第三步:遍历待删除的主题,分别进行下面的操作:
- 首先需要判断主题的副本是否都删除了,会分为三种情况进行判断:
- 全部副本都被删除了,此时会调用
completeDeleteTopic(...)
完成主题的删除,这一步后面讲解; - 至少已经有一个副本进入了ReplicaDeletionStarted状态,说明此时副本的删除已经开始了,不做任何操作,继续等待;
- 如果没有副本进入ReplicaDeletionStarted状态,且有副本处于ReplicaDeletionIneligible状态,则调用TopicDeletionManager的
markTopicForDeletionRetry(...)
方法将这些处于ReplicaDeletionIneligible状态的副本转换为OfflineReplica后重试。
这里使用了ReplicaStateMachine的三个判断方法,源码如下:
TopicDeletionManager的markTopicForDeletionRetry(...)
方法的源码如下:
- 调用TopicDeletionManager的
isTopicEligibleForDeletion(...)
方法,判断主题是否可被删除,如果可以被删除,则调用TopicDeletionManager的onTopicDeletion(...)
方法执行删除流程,主要是删除主题的副本;isTopicEligibleForDeletion(...)
方法源码如下:
- 删除主题的副本,这个操作中如果某个副本删除失败,则会将该副本转换为ReplicaDeletionIneligible状态,并标记此副本所在的主题为不可删除;该过程由TopicDeletionManager的
onTopicDeletion(...)
方法开启,源码如下:
onTopicDeletion(...)
方法会获取待删除主题的所有分区,向这些分区所在的Broker发送UpdateMetadataRequest请求,注意在此过程中构造的LeaderAndIsr对象的leader
字段为LeaderAndIsr.LeaderDuringDelete(值为-2),通知它们指定的主题要被删除,并删除MetadataCache中与此主题相关的缓存信息;onPartitionDeletion(...)
方法用于删除主题的分区,它内部获取了分区的副本,并调用startReplicaDeletion(...)
方法进行处理:
在startReplicaDeletion(...)
中调用ReplicaStateMachine的handleStateChanges(...)
方法时传入了deleteTopicStopReplicaCallback(...)
回调函数,它的源码如下:
deleteTopicStopReplicaCallback(...)
回调函数中会调用failReplicaDeletion(...)
方法处理异常副本;如果StopReplicaResponse中的错误码表示出现异常,则将副本状态转换为ReplicaDeletionIneligible,并标记此副本所在主题不可删除,也就是将主题添加到topicsIneligibleForDeletion
队列,最后唤醒DeleteTopicsThread线程:
而completeReplicaDeletion(...)
方法处理返回正常StopReplicaResponse的副本,它会将副本状态转换为ReplicaDeletionSuccessful,并唤醒DeleteTopicsThread线程,源码如下:
回到第三步的第1小步中,当待删除主题的所有副本都被成功删除之后(状态都为ReplicaDeletionSuccessful),doWork()
方法会调用completeDeleteTopic(...)
完成主题的删除,该操作会处理状态切换、删除存储在Zookeeper中及本地缓存的信息。completeDeleteTopic(...)
属于TopicDeletionManager类,它的源码如下:
最终完成删除操作主要是移除对该主题的监听器、将该主题的所有状态为ReplicaDeletionSuccessful的副本转换为NonExistentReplica、所有分区转换为NonExistentPartition状态,并从本地缓存以及Zookeeper中移除主题的相关信息。
5. IsrChangeNotificationListener
分区的Follower副本会与Leader副本会进行消息同步,当Follower副本追上Leader副本时会被添加到ISR集合中,当Follower副本与Leader副本滞后太多时会被移出ISR集合。Leader副本不仅会在ISR集合变化时将其记录到ZooKeeper中,还会调用ReplicaManager的recordIsrChange(...)
方法记录到isrChangeSet
集合中,之后通过isr-change-propagation
定时任务将该集合中周期性地写入到ZooKeeper的/isr_change_notification
路径下。KafkaController中定义的IsrChangeNotificationListener用于监听此路径下的子节点变化,当某些分区的ISR集合变化时通知整个集群中的所有Broker;源码如下:
6. PartitionModificationsListener
PartitionModificationsListener会监听/brokers/topics/[topic_name]
节点中的数据变化,主要用于监听一个主题的分区的变化。在KafkaController启动或触发Leader选举时调用的onControllerFailover(...)
方法、以及新增主题时调用的KafkaController.onNewTopicCreation(...)
方法中都会为每个主题注册一个PartitionModificationsListener,而在成功删除主题之后在TopicDeletionManager的completeDeleteTopic(...)
中会将注册的PartitionModificationsListener删除。该监听器是PartitionStateMachine的内部类,实现了IZkDataListener接口,源码如下:
注:需要注意的是,PartitionModificationsListener并不对分区的删除进行处理,因为Topic的分区数量是不能减少的。
当发现存在主题的分区的新增操作时,handleDataChange(...)
会调用KafkaController的onNewPartitionCreation(...)
方法进行处理,该方法在讲解TopicChangeListener时完整分析过,这个理不再赘述。
7. PreferredReplicaElectionListener
PreferredReplicaElectionListener负责监听ZooKeeper的/admin/preferred_replica_election
节点。当通过kafka-preferred-replica-election.sh脚本命令启动某些分区的“优先副本”选举操作时,会将指定分区的信息写入该节点,以触发PreferredReplicaElectionListener进行处理。进行“优先副本”选举的目的是让分区的“优先副本”重新成为Leader副本,这是为了让Leader副本在整个集群中分布得更加均衡。PreferredReplicaElectionListener的源码如下:
该监听器的handleDataChange(...)
调用的onPreferredReplicaElection(...)
方法即是负责“优先副本”选举的主要方法,它在上一篇文章的2.2.2.6节处理优先副本选举中完整讲解过,并给出了具体示例,读者可以自行回顾。
8. PartitionsReassignedListener
PartitionsReassignedListener监听ZooKeeper的/admin/reassign_partitions
节点。当通过kafka-reassign-partitions.sh脚本命令指定某些分区需要重新分配副本时,会将指定分区的信息写入该节点,从而触发PartitionsReassignedListener进行处理。该监听器的源码如下:
PartitionsReassignedListener的handleDataChange(...)
会在监听到ZooKeeper的/admin/reassign_partitions
节点发生变化时被调用,最终执行副本重新分配的操作其实交给了KafkaController的initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext)
方法,该方法在上一篇文章Kafka系列 20 - 服务端源码分析 11:KafkaController相关的Zookeeper监听器的“2.2.2.5 处理副本重分配”章节中完整讲解过,读者可以自行回顾。
9. ReassignedPartitionsIsrChangeListener
ReassignedPartitionsIsrChangeListener监听器是在分区副本重新分配的过程中被注册到ZooKeeper中/broker/topics/[topic_name]/partitions/[partitionId]/state
分区所对应的节点上的,回顾这部分代码:
它主要负责处理进行副本重新分配的分区的ISR集合变化,源码如下:
当ReassignedPartitionsIsrChangeListener监听到分区的ISR集合发生变化时,按照下列步骤进行处理:
- 检查当前分区是否正在进行副本的重新分配操作,若不是,则结束。
- 从ZooKeeper中读取当前分区的Leader和ISR集合。
- 如果新AR集合中的副本已完全进入当前ISR集合,则调用KafkaController的
onPartitionReassignment(...)
方法完成副本重分配的相关操作。 - 否则,输出日志后结束,等待下一次触发。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...