
Kafka系列 21 - 服务端源码分析 12:其他的Zookeeper监听器

简介:主要讲解Broker、Topic、Partition相关的监听器

1. 概述

回顾上一节的内容可知,当KafkaController初始化启动或触发Controller Leader选举时,都会调用其onControllerFailover()方法,而该方法会向Zookeeper中的特定路径注册大量的监听器,回顾源码:

kafka.controller.KafkaController#onControllerFailover
  • ...
  • // 注册一系列Zookeeper监听器
  • // 注册PartitionsReassignedListener
  • registerReassignedPartitionsListener()
  • // 注册IsrChangeNotificationListener
  • registerIsrChangeNotificationListener()
  • // 注册PreferredReplicaElectionListener
  • registerPreferredReplicaElectionListener()
  • // 注册TopicChangeListener、DeleteTopicsListener
  • partitionStateMachine.registerListeners()
  • // 注册BrokerChangeListener
  • replicaStateMachine.registerListeners()
  • ...
  • // register the partition change listeners for all existing topics on failover
  • // 为所有Topic注册PartitionModificationsListener
  • controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  • ...

在本文中将对这些监听器进行一一讲解。

2. BrokerChangeListener

ReplicaStateMachine状态机的registerListeners()方法中会注册BrokerChangeListener监听器,它会监听/brokers/ids节点下的子节点变化,主要负责处理Broker的上线和故障下线。当Broker上线时会在/brokers/ids下创建临时节点,下线时会删除对应的临时节点;BrokerChangeListener是ReplicaStateMachine的内部类,实现了IZkChildListener接口,源码如下:

kafka.controller.ReplicaStateMachine.BrokerChangeListener
  • /**
  • * This is the zookeeper listener that triggers all the state transitions for a replica
  • * 唯一的Zookeeper Listener,监听/brokers/ids节点下的子节点变化,主要负责处理Broker的上线和故障下线。
  • * 当Broker上线时会在/brokers/ids下创建临时节点,下线时会删除对应的临时节点。
  • */
  • class BrokerChangeListener() extends IZkChildListener with Logging {
  • this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
  • def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
  • info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
  • inLock(controllerContext.controllerLock) {
  • if (hasStarted.get) { // 需要在状态机启动的情况下执行
  • ControllerStats.leaderElectionTimer.time { // 计时操作
  • try {
  • // 获取Zookeeper中的Broker列表、得到BrokerID集合
  • val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
  • val curBrokerIds = curBrokers.map(_.id)
  • // ControllerContext中记录的当前可用Broker的ID集合
  • val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  • // 过滤得到新增的Broker列表
  • val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
  • // 过滤得到出现故障的Broker的ID集合
  • val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
  • val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
  • // 更新ControllerContext中记录的可用Broker列表
  • controllerContext.liveBrokers = curBrokers
  • // 对新增Broker的列表、出现故障Broker的列表以及Zookeeper中的Broker列表进行排序
  • val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
  • val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
  • val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
  • info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
  • .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
  • // 创建Controller和新增Broker的网络连接
  • newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
  • // 关闭Controller和出现故障的Broker的网络连接
  • deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
  • // 调用onBrokerStartup()方法和onBrokerFailure()方法分别处理新增Broker和下线Broker
  • if(newBrokerIds.size > 0)
  • controller.onBrokerStartup(newBrokerIdsSorted)
  • if(deadBrokerIds.size > 0)
  • controller.onBrokerFailure(deadBrokerIdsSorted)
  • } catch {
  • case e: Throwable => error("Error while handling broker changes", e)
  • }
  • }
  • }
  • }
  • }
  • }

BrokerChangeListener只有handleChildChange(...)一个方法,该方法也是处理/brokers/ids节点下子节点变化的主要方法;当出现Broker节点的上线和下线时,handleChildChange(...)会更新ControllerContext中记录的可用Broker集合,并通过ControllerChannelManager的addBroker(...)/removeBroker(...)维护各Broker对应的ControllerBrokerStateInfo(即Controller与Broker之间的网络连接信息);涉及到的相关方法在前面已经讲过了,这里不再赘述。这里需要关注的是KafkaController的onBrokerStartup(...)和onBrokerFailure(...)两个方法。
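
为了更直观地理解这种“订阅子节点变化 + 集合差集”的处理思路,下面给出一个独立的小示例(仅为演示草图,非Kafka源码,假设本机2181端口运行着ZooKeeper,并引入了与Kafka同款的zkclient依赖):它订阅/brokers/ids的子节点变化,并用差集计算出新上线与已下线的Broker。

  • import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
  • import scala.collection.JavaConverters._
  • 
  • object BrokerChangeDemo extends App {
  •   // 假设本机2181端口有可用的ZooKeeper(示例假设)
  •   val zkClient = new ZkClient("localhost:2181")
  •   // 模拟ControllerContext中记录的"当前可用Broker"集合
  •   var knownBrokerIds = Set.empty[Int]
  •   zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
  •     override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
  •       // 父节点被删除时currentChilds可能为null,这里做个保护
  •       val curBrokerIds = Option(currentChilds).map(_.asScala.map(_.toInt).toSet).getOrElse(Set.empty[Int])
  •       val newBrokerIds  = curBrokerIds -- knownBrokerIds  // 新上线的Broker
  •       val deadBrokerIds = knownBrokerIds -- curBrokerIds  // 下线的Broker
  •       knownBrokerIds = curBrokerIds                       // 更新本地缓存
  •       println(s"new=$newBrokerIds, dead=$deadBrokerIds, live=$curBrokerIds")
  •     }
  •   })
  •   Thread.sleep(Long.MaxValue) // 保持进程存活以持续接收通知
  • }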

2.1. Broker的上线

其中onBrokerStartup(...)方法是在Broker上线时被执行的,源码如下:

kafka.controller.KafkaController#onBrokerStartup
  • /**
  • * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
  • * brokers as input. It does the following -
  • * 1. Sends update metadata request to all live and shutting down brokers
  • * 2. Triggers the OnlinePartition state change for all new/offline partitions
  • * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
  • * so, it performs the reassignment logic for each topic/partition.
  • *
  • * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
  • * 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those
  • * partitions currently new or offline (rather than every partition this controller is aware of)
  • * 2. Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches
  • * every broker that it is still valid. Brokers check the leader epoch to determine validity of the request.
  • */
  • def onBrokerStartup(newBrokers: Seq[Int]) {
  • info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  • val newBrokersSet = newBrokers.toSet
  • // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  • // broker via this update.
  • // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  • // common controlled shutdown case, the metadata will reach the new brokers faster
  • /**
  • * 向集群中所有Broker发送UpdateMetadataRequest,发送的是所有可用的Broker的ID信息
  • * 通过此请求,集群中所有Broker可以了解到新添加的Broker信息
  • */
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  • // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  • // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  • // 将新增Broker上的副本转换为OnlineReplica状态,此步骤会涉及发送LeaderAndIsrRequest
  • val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  • replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  • // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
  • // to see if these brokers can become leaders for some/all of those
  • // 将NewPartition和OfflinePartition状态的分区转换为OnlinePartition状态
  • partitionStateMachine.triggerOnlinePartitionStateChange()
  • // check if reassignment of some partitions need to be restarted
  • // 检测进行副本重新分配
  • val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
  • case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
  • }
  • partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
  • // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
  • // on the newly restarted brokers, there is a chance that topic deletion can resume
  • // 如果新增Broker上有待删除的Topic的副本,则唤醒DeleteTopicsThread线程进行删除
  • val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  • if(replicasForTopicsToBeDeleted.size > 0) {
  • info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
  • "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
  • deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
  • deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  • }
  • }

该方法的主要流程有以下五步:

  1. 向集群中所有Broker发送UpdateMetadataRequest,内容为所有可用的Broker的ID信息,通过此请求,集群中所有Broker可以了解到新添加的Broker信息。
  2. 将新增的Broker上的副本转换为OnlineReplica状态,涉及发送LeaderAndIsrRequest请求。
  3. 将当前KafkaController上所有处于NewPartition和OfflinePartition状态的分区转换为OnlinePartition状态,触发Leader副本选举。
  4. 检查所有需要进行副本重分配的分区,如果分配给这个分区的新AR集合中,有某个副本存在于新的Broker节点上,就将该分区过滤出来,对其进行副本重分配操作。
  5. 如果新增Broker上有属于待删除Topic的副本,就唤醒DeleteTopicsThread线程进行删除。

2.2. Broker因故障下线

onBrokerFailure(...)方法用于处理Broker出现故障下线的情况,源码如下:

kafka.controller.KafkaController#onBrokerFailure
  • /**
  • * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
  • * as input. It does the following -
  • * 1. Mark partitions with dead leaders as offline
  • * 2. Triggers the OnlinePartition state change for all new/offline partitions
  • * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
  • * 4. If no partitions are effected then send UpdateMetadataRequest to live or shutting down brokers
  • *
  • * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
  • * the partition state machine will refresh our cache for us when performing leader election for all new/offline
  • * partitions coming online.
  • */
  • def onBrokerFailure(deadBrokers: Seq[Int]) {
  • info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  • // 将正在正常关闭的Broker从deadbroker列表中移除
  • val deadBrokersThatWereShuttingDown =
  • deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  • info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  • val deadBrokersSet = deadBrokers.toSet
  • // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
  • // 过滤得到Leader副本在故障Broker上的分区,将其转换为OfflinePartition状态
  • val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
  • deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
  • !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
  • partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  • // trigger OnlinePartition state changes for offline or new partitions
  • // 将OfflinePartition状态的分区转换为OnlinePartition状态
  • partitionStateMachine.triggerOnlinePartitionStateChange()
  • // filter out the replicas that belong to topics that are being deleted
  • // 过滤得到在故障Broker上的副本,将这些副本转换为OfflineReplica状态
  • var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  • val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  • // handle dead replicas
  • replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  • // check if topic deletion state for the dead replicas needs to be updated
  • // 检查故障Broker上是否有待删除Topic的副本,如果存在则将其转换为ReplicaDeletionIneligible状态并标记Topic不可删除
  • val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  • if(replicasForTopicsToBeDeleted.size > 0) {
  • // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
  • // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
  • // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
  • deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  • }
  • // If broker failure did not require leader re-election, inform brokers of failed broker
  • // Note that during leader re-election, brokers update their metadata
  • // 发送UpdateMetadataRequest更新所有Broker的信息
  • if (partitionsWithoutLeader.isEmpty) {
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  • }
  • }

该方法的主要流程有以下六步:

  1. 首先从出现故障的Broker集合中移除正在正常关闭的Broker。
  2. 过滤得到Leader副本存在于故障Broker上的分区,将它们的状态转换为OfflinePartition。
  3. 接着将第2步中的分区再转换为OnlinePartition状态,这将触发新Leader副本的选举操作(使用OfflinePartitionLeaderSelector选举器),并向可用的Broker发送LeaderAndIsrRequest和UpdateMetadataRequest以更新相关信息。
  4. 将故障Broker上的副本转换为OfflineReplica状态,并将其从对应分区的ISR集合中删除。
  5. 检查故障Broker上是否有待删除主题的副本,如果有则将这些副本转换为ReplicaDeletionIneligible状态,并将对应的主题标记为不可删除。
  6. 如果故障Broker上没有Leader副本(即未触发Leader重选举),则向所有可用的Broker发送UpdateMetadataRequest请求以更新元数据。

下面以一个例子来讲解onBrokerStartup(...)和onBrokerFailure(...)两个方法的流程;假设现在有Broker-0、Broker-1、Broker-2三个Broker,主题topic-1的三个分区分布在这三个Broker上,且副本因子都为3,正常情况下的分配如下:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
  • Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0
  • Topic: topic-1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1

当Broker-0发生故障下线时,ZooKeeper中的/brokers/ids/0临时节点会被删除,并触发BrokerChangeListener进行处理。首先,将Broker-0从ControllerContext记录的可用Broker列表中删除。由于Partition-2的Leader副本在Broker-0上,因此需要将Partition-2转换为OfflinePartition状态,紧接着再将其转换成OnlinePartition状态,此时会使用OfflinePartitionLeaderSelector为其选举新的Leader副本和ISR集合并更新到ZooKeeper中,随后向可用的Broker发送LeaderAndIsrRequest和UpdateMetadataRequest以更新相关信息。之后,将Broker-0上的三个副本转换成OfflineReplica状态,并将其从对应分区的ISR集合中删除,此时会发送StopReplicaRequest(不删除副本)、LeaderAndIsrRequest和UpdateMetadataRequest更新可用Broker的MetadataCache;最终得到的分配结果如下:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1 // ISR [2,1,0] -> [2,1]
  • Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2 // ISR [1,2,0] -> [1,2]
  • # Partition-2的Leader副本从Broker-0重新选举为Broker-2
  • Topic: topic-1 Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,1 // ISR [2,0,1] -> [2,1]

当Broker-0恢复上线时,首先会向集群中当前所有的Broker发送UpdateMetadataRequest请求以告知Broker-0的信息;然后将Broker-0上之前三个分区的副本都转换为OnlineReplica状态;但由于三个分区都处于OnlinePartition状态,不存在下线的分区,因此不会触发分区的Leader副本选举,只会向Broker-0发送LeaderAndIsrRequest使其上的副本成为Follower副本,并向可用Broker发送UpdateMetadataRequest更新MetadataCache信息;最终得到的分配结果如下:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0 // ISR [2,1,0] -> [2,1,0]
  • Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0 // ISR [1,2,0] -> [1,2,0]
  • # Partition-2的Leader副本依旧为Broker-2
  • Topic: topic-1 Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0 // ISR [2,0,1] -> [2,1,0]

需要理解的是,BrokerChangeListener的handleChildChange(...)只负责维护Broker上下线引起的缓存数据变更,Broker的上线与下线所涉及的分区、副本状态转换以及新Leader副本的选举,其实都委托给了KafkaController的相关方法来处理。

2.3. Broker的正常关闭

某些时候,Broker的下线可能是由管理员主动触发的,Kafka中提供了Controlled Shutdown的方式来关闭一个Broker实例,该方式有两个好处:

  1. 可以让日志文件完全同步到磁盘上,在Broker下次重新上线时不需要进行Log的恢复操作;
  2. Controlled Shutdown方式在关闭Broker之前,会对其上的Leader副本进行迁移,以减少分区的不可用时间(相关的Broker端配置见下面的示例)。
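
补充一点配置层面的说明:Broker端由controlled.shutdown.enable参数控制是否启用Controlled Shutdown(较新版本中默认为true),相关配置大致如下(server.properties示例,具体默认值以所用版本的文档为准):

  • # 是否在关闭Broker前执行Controlled Shutdown
  • controlled.shutdown.enable=true
  • # Controlled Shutdown失败时的重试次数与重试间隔
  • controlled.shutdown.max.retries=3
  • controlled.shutdown.retry.backoff.ms=5000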

在Kafka的main()方法中加入了JVM关闭钩子操作,实现了以Controlled Shutdown方式正常关闭Broker:

kafka.Kafka#main
  • def main(args: Array[String]): Unit = {
  • try {
  • val serverProps = getPropsFromArgs(args)
  • val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
  • // attach shutdown handler to catch control-c
  • // 虚拟机钩子
  • Runtime.getRuntime().addShutdownHook(new Thread() {
  • override def run() = {
  • // 关闭KafkaServer,该方法最终会调用controlledShutdown()方法向Controller Leader发送ControlledShutdownRequest
  • kafkaServerStartable.shutdown
  • }
  • })
  • kafkaServerStartable.startup
  • kafkaServerStartable.awaitShutdown
  • }
  • catch {
  • case e: Throwable =>
  • fatal(e)
  • System.exit(1)
  • }
  • System.exit(0)
  • }

KafkaController的shutdownBroker(id: Int)方法是处理ControlledShutdownRequest请求的核心方法,该方法会使用ControlledShutdownLeaderSelector选举器重新选择Leader副本和ISR集合,实现Leader副本的迁移,该方法源码如下:

kafka.controller.KafkaController#shutdownBroker
  • /**
  • * On clean shutdown, the controller first determines the partitions that the
  • * shutting down broker leads, and moves leadership of those partitions to another broker
  • * that is in that partition's ISR.
  • *
  • * @param id Id of the broker to shutdown.
  • * @return The number of partitions that the broker still leads.
  • */
  • def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
  • // 检查Controller是否存活
  • if (!isActive()) {
  • throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
  • }
  • controllerContext.brokerShutdownLock synchronized { // 加锁
  • info("Shutting down broker " + id)
  • inLock(controllerContext.controllerLock) {
  • // 检查Broker是否存活
  • if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  • throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
  • // 将BrokerID添加到ControllerContext的shuttingDownBrokerIds中
  • controllerContext.shuttingDownBrokerIds.add(id)
  • debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
  • debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
  • }
  • // 获取待关闭Broker上所有的Partition和副本信息
  • val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
  • inLock(controllerContext.controllerLock) {
  • controllerContext.partitionsOnBroker(id) // 所有Partition
  • .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
  • }
  • allPartitionsAndReplicationFactorOnBroker.foreach { // 遍历待关闭的Broker上的分区
  • case(topicAndPartition, replicationFactor) =>
  • // Move leadership serially to relinquish lock.
  • inLock(controllerContext.controllerLock) {
  • // 获取分区的Leader副本所在Broker的ID、ISR集合、年代信息
  • controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
  • if (replicationFactor > 1) { // 检查是否开启副本机制
  • if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
  • // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
  • // notifies all affected brokers
  • /**
  • * 将相关的分区切换为OnlinePartition状态,使用ControlledShutdownLeaderSelector重新选择Leader和ISR集合
  • * 并将结果写入ZooKeeper,之后发送LeaderAndIsrRequest和UpdateMetadataRequest
  • */
  • partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
  • controlledShutdownPartitionLeaderSelector)
  • } else {
  • // Stop the replica first. The state change below initiates ZK changes which should take some time
  • // before which the stop replica request should be completed (in most cases)
  • try {
  • // 发送StopReplicaRequest(不删除副本)
  • brokerRequestBatch.newBatch()
  • brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
  • topicAndPartition.partition, deletePartition = false)
  • brokerRequestBatch.sendRequestsToBrokers(epoch)
  • } catch {
  • case e : IllegalStateException => {
  • // Resign if the controller is in an illegal state
  • error("Forcing the controller to resign")
  • brokerRequestBatch.clear()
  • controllerElector.resign()
  • throw e
  • }
  • }
  • // If the broker is a follower, updates the isr in ZK and notifies the current leader
  • // 将副本转换为OfflineReplica状态
  • replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
  • topicAndPartition.partition, id)), OfflineReplica)
  • }
  • }
  • }
  • }
  • }
  • // 该方法用于统计Leader副本依然处于待关闭Broker上的分区
  • def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
  • trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
  • controllerContext.partitionLeadershipInfo.filter {
  • case (topicAndPartition, leaderIsrAndControllerEpoch) =>
  • leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
  • }.map(_._1)
  • }
  • // 统计Leader副本依然处于待关闭Broker上的分区,转换为Set后返回
  • replicatedPartitionsBrokerLeads().toSet
  • }
  • }

3. TopicChangeListener

TopicChangeListener负责管理Topic的增删(实际上只处理了主题的增加,在处理主题删除时只维护了元数据记录,具体的删除操作交给了DeleteTopicsListener监听器),它监听Zookeeper的/brokers/topics节点的子节点变化;TopicChangeListener是PartitionStateMachine的内部类,实现了IZkChildListener接口,源码如下:

kafka.controller.PartitionStateMachine.TopicChangeListener
  • /**
  • * This is the zookeeper listener that triggers all the state transitions for a partition
  • * TopicChangeListener负责管理Topic的增删,它监听/brokers/topics节点的子节点的变化
  • */
  • class TopicChangeListener extends IZkChildListener with Logging {
  • this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
  • @throws(classOf[Exception])
  • def handleChildChange(parentPath : String, children : java.util.List[String]) {
  • inLock(controllerContext.controllerLock) {
  • if (hasStarted.get) {
  • try {
  • val currentChildren = { // 获取/brokers/topics下的子节点集合
  • import JavaConversions._
  • debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
  • (children: Buffer[String]).toSet
  • }
  • // 过滤,得到新添加的Topic
  • val newTopics = currentChildren -- controllerContext.allTopics
  • // 过滤,得到删除的Topic
  • val deletedTopics = controllerContext.allTopics -- currentChildren
  • // 更新ControllerContext的allTopics集合
  • controllerContext.allTopics = currentChildren
  • // 从Zookeeper的/brokers/topics/[topic_name]路径加载新增Topic的分区信息和AR集合信息
  • val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
  • // 更新ControllerContext的partitionReplicaAssignment记录的AR集合
  • // 去掉删除主题的副本,添加新增主题的副本
  • controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
  • !deletedTopics.contains(p._1.topic))
  • controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
  • info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
  • deletedTopics, addedPartitionReplicaAssignment))
  • // 调用KafkaController的onNewTopicCreation()处理新增Topic
  • if(newTopics.size > 0)
  • controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
  • } catch {
  • case e: Throwable => error("Error while handling new topic", e )
  • }
  • }
  • }
  • }
  • }

TopicChangeListener的handleChildChange(...)方法会在Zookeeper的/brokers/topics节点的子节点发生变化时被调用,传入的children即为当前的子节点列表,表示最新的主题列表。handleChildChange(...)方法主要分为以下三步:

  1. 会计算得到新添加的主题和即将被删除的主题,并更新ControllerContext的allTopics字段。
  2. 从Zookeeper的/brokers/topics/[topic_name]路径加载新增主题的分区信息和对应的AR集合信息,以此来更新ControllerContext的partitionReplicaAssignment集合中记录的副本信息。
  3. 通过KafkaController的onNewTopicCreation(...)方法处理新增的主题,主要是进行Leader副本、ISR集合的初始化,更新其它Broker的元数据、Zookeeper内的数据等操作。

注:删除主题的操作将由TopicDeletionManager负责,后面会讲解。

其中第三步涉及到的KafkaController的onNewTopicCreation(...)方法源码如下:

kafka.controller.KafkaController
  • /**
  • * This callback is invoked by the partition state machine's topic change listener with the list of new topics
  • * and partitions as input. It does the following -
  • * 1. Registers partition change listener. This is not required until KAFKA-347
  • * 2. Invokes the new partition callback
  • * 3. Send metadata request with the new topic to all brokers so they allow requests for that topic to be served
  • */
  • def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  • info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  • // subscribe to partition changes
  • // 为每个新增的Topic注册PartitionModificationsListener监听器
  • topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  • // 完成新增Topic的分区状态及副本状态的转换
  • onNewPartitionCreation(newPartitions)
  • }
  • /**
  • * This callback is invoked by the topic change callback with the list of failed brokers as input.
  • * It does the following -
  • * 1. Move the newly created partitions to the NewPartition state
  • * 2. Move the newly created partitions from NewPartition->OnlinePartition state
  • */
  • def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  • info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  • // 将所有指定的新增分区转换为NewPartition状态
  • partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  • // 将指定分区的所有副本转换为NewReplica状态
  • replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  • // 将所有指定的新增分区转换为OnlinePartition状态
  • partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  • // 将指定分区的所有副本转换为OnlineReplica状态
  • replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  • }

KafkaController的onNewTopicCreation(...)方法会为每个新增主题注册PartitionModificationsListener监听器,并且调用onNewPartitionCreation(...)方法完成新增主题的分区状态及副本状态的转换,主要是将分区转换为NewPartition后再转换为OnlinePartition,将副本转换为NewReplica后再转换为OnlineReplica。在分区状态从NewPartition转换为OnlinePartition时会初始化对应的Leader副本和ISR集合,该操作由PartitionStateMachine的initializeLeaderAndIsrForPartition(...)方法实现,前面有讲解。最终会将分区副本的分配结果写入ZooKeeper,并向所有可用Broker发送LeaderAndIsrRequest来指导副本的角色切换,然后向所有可用Broker发送UpdateMetadataRequest来更新其MetadataCache。
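
为了直观感受这一触发链路,下面给出一个创建主题的小草图(非Kafka源码,基于对当时kafka核心依赖中AdminUtils/ZkUtils API的理解,且假设本机2181端口运行着ZooKeeper):AdminUtils.createTopic(...)会把分区分配信息写入/brokers/topics/[topic_name],Controller端的TopicChangeListener随即被触发,进而走到onNewTopicCreation(...)。

  • import kafka.admin.AdminUtils
  • import kafka.utils.ZkUtils
  • 
  • object CreateTopicDemo extends App {
  •   // 假设本机2181端口运行着ZooKeeper,且classpath中引入了与本文源码同时代的kafka核心依赖(示例假设)
  •   val zkUtils = ZkUtils("localhost:2181", 30000, 30000, false)
  •   // 在/brokers/topics/demo-topic下写入分区分配信息,
  •   // Controller端的TopicChangeListener随即被触发,进而调用onNewTopicCreation()完成分区/副本的上线
  •   AdminUtils.createTopic(zkUtils, "demo-topic", 3, 1)
  •   zkUtils.close()
  • }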

4. DeleteTopicsListener

DeleteTopicsListener会监听ZooKeeper中/admin/delete_topics节点下的子节点变化,当管理员执行了删除主题的命令后,会在该路径下添加需要被删除的主题,DeleteTopicsListener会被触发,它会将该待删除的主题交给TopicDeletionManager处理。DeleteTopicsListener也是PartitionStateMachine的内部类,实现了IZkChildListener接口,源码如下:

kafka.controller.PartitionStateMachine.DeleteTopicsListener
  • /**
  • * Delete topics includes the following operations -
  • * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
  • * 2. If there are topics to be deleted, it signals the delete topic thread
  • *
  • * 主题删除操作监听器
  • * 1. 当被删除的主题存在时,将其添加到记录删除主题的缓存中;
  • * 2. 当有主题被删除时,会通知DeleteTopicThread线程
  • */
  • class DeleteTopicsListener() extends IZkChildListener with Logging {
  • this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  • val zkUtils = controllerContext.zkUtils
  • /**
  • * Invoked when a topic is being deleted
  • * @throws Exception On any error.
  • */
  • @throws(classOf[Exception])
  • def handleChildChange(parentPath : String, children : java.util.List[String]) {
  • inLock(controllerContext.controllerLock) {
  • // 获取待删除的主题的集合
  • var topicsToBeDeleted = {
  • import JavaConversions._
  • (children: Buffer[String]).toSet
  • }
  • debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
  • // 过滤得到不存在的删除主题
  • val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
  • if(nonExistentTopics.size > 0) {
  • warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
  • // 对于不存在的主题,直接将其在/admin/delete_topics下对应的节点删除
  • nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
  • }
  • // 过滤掉不存在的待删除主题
  • topicsToBeDeleted --= nonExistentTopics
  • if(topicsToBeDeleted.size > 0) {
  • info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  • // mark topic ineligible for deletion if other state changes are in progress
  • topicsToBeDeleted.foreach { topic => // 遍历所有待删除的主题,检查待删除主题是否处于不可删除的情况
  • // 第1项:检查待删除主题中是否有分区正在进行"优先副本"选举
  • val preferredReplicaElectionInProgress =
  • controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
  • // 第2项:检查待删除主题中是否有分区正在进行副本重新分配
  • val partitionReassignmentInProgress =
  • controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
  • if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
  • // 有1项满足,就将待删除主题标记为不可删除
  • controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  • }
  • // add topic to deletion list
  • // 通过该方法将待删除的主题放入到topicsToBeDeleted集合,将待删除的主题的分区放入到partitionsToBeDeleted集合
  • controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
  • }
  • }
  • }
  • /**
  • *
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataDeleted(dataPath: String) {
  • }
  • }

DeleteTopicsListener的handleChildChange(parentPath : String, children : java.util.List[String])是用于处理/admin/delete_topics节点的子节点变化的主要方法,其children参数记录了需要被删除的主题,该方法的实现主要分为以下几步:

  1. 从Zookeeper的/admin/delete_topics节点中获取需要删除的主题的集合,与本地ControllerContext进行对比,过滤掉不存在的主题,直接将其从/admin/delete_topics节点下删除。
  2. 遍历所有待删除的主题,检查主题中是否有分区正在进行“优先副本”选举或正在进行副本重分配操作,如果有则将该主题标记为不可删除(由TopicDeletionManager的markTopicIneligibleForDeletion(...)方法实现)。
  3. 将符合删除条件的主题存放到TopicDeletionManager的topicsToBeDeleted集合中,并将这些主题的分区存放到TopicDeletionManager的partitionsToBeDeleted集合中,然后唤醒对应的DeleteTopicThread线程(由TopicDeletionManager的enqueueTopicsForDeletion(...)方法实现)。

可见,在主题删除的操作中,TopicDeletionManager和DeleteTopicsThread是执行主要功能的两个类,下面就来了解这两个类的实现。

4.1. TopicDeletionManager

TopicDeletionManager用于管理待删除的主题以及不可删除的主题,它的定义和重要字段如下:

kafka.controller.TopicDeletionManager
  • class TopicDeletionManager(controller: KafkaController,
  • initialTopicsToBeDeleted: Set[String] = Set.empty,
  • initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  • ...
  • val controllerContext = controller.controllerContext
  • // 分区状态机
  • val partitionStateMachine = controller.partitionStateMachine
  • // 副本状态机
  • val replicaStateMachine = controller.replicaStateMachine
  • // 记录将要被删除的主题集合
  • val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
  • // 记录将要被删除的分区集合
  • val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
  • val deleteLock = new ReentrantLock()
  • // 记录不可被删除的主题
  • val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
  • (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  • // 用于其他线程与deleteTopicsThread删除线程同步
  • val deleteTopicsCond = deleteLock.newCondition()
  • // 用于标识Topic删除操作是否开始
  • val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
  • // 用于删除Topic的后台线程
  • var deleteTopicsThread: DeleteTopicsThread = null
  • // 从配置中读取是否开启了Topic删除功能
  • val isDeleteTopicEnabled = controller.config.deleteTopicEnable // delete.topic.enable
  • ...
  • }

可见,TopicDeletionManager实例中维护了topicsToBeDeleted、partitionsToBeDeleted、topicsIneligibleForDeletion三个集合,分别记录待删除的主题、待删除的分区以及不可删除的主题。另外,通过读取配置delete.topic.enable来决定是否开启主题删除功能,isDeleteTopicEnabled的值会决定是否启动DeleteTopicsThread线程。

另外需要注意的是,当一个主题满足下列三种情况之一时不能被删除:

  1. 如果Topic中的任一分区正在重新分配副本,则此Topic不能被删除。
  2. 如果Topic中的任一分区正在进行“优先副本”选举,则此Topic不能被删除。
  3. 如果Topic中的任一分区的任一副本所在的Broker宕机,则此Topic不能被删除。

每个KafkaController中都存在一个TopicDeletionManager实例,TopicDeletionManager的初始化过程如下:

  • class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
  • ...
  • // 主题删除操作管理器
  • var deleteTopicManager: TopicDeletionManager = null
  • ...
  • private def initializeTopicDeletion() {
  • // 从Zookeeper的/admin/delete_topics路径中读取需要删除的主题名称集合
  • val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
  • // 过滤得到在不可用Broker上还存在副本的主题名称集合
  • val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
  • replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
  • // 处于优先副本选举过程中的主题
  • val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
  • // 处于副本重分配过程中的主题
  • val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
  • // 上面的三类主题暂时无法被删除
  • val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
  • topicsForWhichPreferredReplicaElectionIsInProgress
  • info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
  • info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
  • // initialize the topic deletion manager
  • // 初始化TopicDeletionManager
  • deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
  • }
  • ...
  • }

initializeTopicDeletion()方法是在KafkaController初始化ControllerContext的initializeControllerContext()方法中被调用的,同时在KafkaController的onControllerFailover()方法中会调用deleteTopicManager的start()方法以启动TopicDeletionManager;大家可以回顾上一篇文章中的相关内容。

TopicDeletionManager的start()方法的源码如下:

kafka.controller.TopicDeletionManager#start
  • /**
  • * Invoked at the end of new controller initiation
  • * 启动时会调用该方法进行初始化
  • */
  • def start() {
  • if (isDeleteTopicEnabled) { // 判断是否开启了Topic删除功能
  • // 创建deleteTopicsThread删除线程
  • deleteTopicsThread = new DeleteTopicsThread()
  • if (topicsToBeDeleted.size > 0)
  • // 当存在将要被删除的Topic集合时,将deleteTopicStateChanged状态置为true
  • deleteTopicStateChanged.set(true)
  • // 启动deleteTopicsThread删除线程
  • deleteTopicsThread.start()
  • }
  • }

可见,该方法会根据isDeleteTopicEnabled(由delete.topic.enable参数配置)来决定是否实例化并开启DeleteTopicsThread线程,接下来我们分析一下DeleteTopicsThread的实现。

4.2. DeleteTopicsThread

DeleteTopicsThread是真正执行删除主题操作的线程,它是TopicDeletionManager的内部类,继承了ShutdownableThread,因此我们需要关注其入口方法doWork(),它的整体实现也只有这一个方法,源码如下:

kafka.controller.TopicDeletionManager.DeleteTopicsThread
  • // 用于删除主题的线程
  • class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
  • val zkUtils = controllerContext.zkUtils
  • // 主要方法
  • override def doWork() {
  • // 等待删除操作开始
  • awaitTopicDeletionNotification()
  • if (!isRunning.get) // 判断状态
  • return
  • inLock(controllerContext.controllerLock) {
  • // 收集将被删除的主题集合
  • val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
  • if(!topicsQueuedForDeletion.isEmpty) // 将被删除的主题集合不为空
  • info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
  • topicsQueuedForDeletion.foreach { topic => // 遍历将要被删除的主题
  • // if all replicas are marked as deleted successfully, then topic deletion is done
  • if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { // 通过状态机判断主题的副本是否都删除了
  • // clear up all state for this topic from controller cache and zookeeper
  • // 完成主题的删除,会处理状态切换、Zookeeper相关存储及本地缓存的删除
  • completeDeleteTopic(topic)
  • info("Deletion of topic %s successfully completed".format(topic))
  • } else {
  • if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
  • // 检查该主题是否存在状态为ReplicaDeletionStarted的副本,如果存在则继续等待
  • // ignore since topic deletion is in progress
  • // 日志打印
  • val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
  • val replicaIds = replicasInDeletionStartedState.map(_.replica)
  • val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
  • info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
  • partitions.mkString(","), topic))
  • } else {
  • // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  • // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  • // or there is at least one failed replica (which means topic deletion should be retried).
  • // 任一副本处于ReplicaDeletionIneligible状态,则重置为OfflineReplica后重试
  • if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
  • // mark topic for deletion retry
  • markTopicForDeletionRetry(topic)
  • }
  • }
  • }
  • // Try delete topic if it is eligible for deletion.
  • // 检测当前Topic是否可以删除
  • if(isTopicEligibleForDeletion(topic)) {
  • info("Deletion of topic %s (re)started".format(topic))
  • // topic deletion will be kicked off
  • onTopicDeletion(Set(topic)) // 开始Topic的删除操作
  • } else if(isTopicIneligibleForDeletion(topic)) {
  • info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
  • }
  • }
  • }
  • }
  • }

DeleteTopicsThread的doWork()方法的主要流程如下:

第一步:awaitTopicDeletionNotification()方法属于TopicDeletionManager类,它会阻塞等待直到有其他线程将其唤醒,这里使用的是ReentrantLock锁的条件队列,与该方法对应的是resumeTopicDeletionThread()唤醒方法,这两个方法源码如下:

kafka.controller.TopicDeletionManager
  • /**
  • * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
  • * controllerLock should be acquired before invoking this API
  • *
  • * 等待直到删除主题的操作被触发:
  • * 1. deleteTopicsThread线程正在运行;
  • * 2. deleteTopicStateChanged状态为true,且CAS修改为false成功
  • */
  • private def awaitTopicDeletionNotification() {
  • inLock(deleteLock) {
  • // deleteTopicsThread线程在运行,但无法将deleteTopicStateChanged切换为false,将等待
  • while(deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
  • debug("Waiting for signal to start or continue topic deletion")
  • deleteTopicsCond.await()
  • }
  • }
  • }
  • /**
  • * Signals the delete-topic-thread to process topic deletion
  • */
  • private def resumeTopicDeletionThread() {
  • deleteTopicStateChanged.set(true) // 切换状态
  • inLock(deleteLock) {
  • // 唤醒主题删除线程
  • deleteTopicsCond.signal()
  • }
  • }
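
这种“AtomicBoolean标志位 + ReentrantLock条件队列”的协作模式,可以用下面这个独立的小示例(非Kafka源码)来直观理解,等待方与唤醒方分别对应上面的两个方法:

  • import java.util.concurrent.atomic.AtomicBoolean
  • import java.util.concurrent.locks.ReentrantLock
  • 
  • object SignalDemo extends App {
  •   val lock    = new ReentrantLock()
  •   val cond    = lock.newCondition()
  •   val changed = new AtomicBoolean(false)
  • 
  •   val worker = new Thread(new Runnable {
  •     override def run(): Unit = {
  •       lock.lock()
  •       try {
  •         // 对应awaitTopicDeletionNotification():CAS不成功就继续在条件队列上等待
  •         while (!changed.compareAndSet(true, false)) cond.await()
  •         println("收到信号,开始执行删除逻辑")
  •       } finally lock.unlock()
  •     }
  •   })
  •   worker.start()
  • 
  •   Thread.sleep(500)
  •   // 对应resumeTopicDeletionThread():先置位标志,再唤醒等待线程
  •   changed.set(true)
  •   lock.lock()
  •   try cond.signal() finally lock.unlock()
  •   worker.join()
  • }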

第二步:检查DeleteTopicsThread是否在运行,检查TopicDeletionManager的topicsToBeDeleted集合是否为空,不为空则表示有待删除的主题。

第三步:遍历待删除的主题,分别进行下面的操作:

  1. 首先需要判断主题的副本是否都删除了,会分为三种情况进行判断:
  • 全部副本都被删除了,此时会调用completeDeleteTopic(...)完成主题的删除,这一步后面讲解;
  • 至少已经有一个副本进入了ReplicaDeletionStarted状态,说明此时副本的删除已经开始了,不做任何操作,继续等待;
  • 如果没有副本进入ReplicaDeletionStarted状态,且有副本处于ReplicaDeletionIneligible状态,则调用TopicDeletionManager的markTopicForDeletionRetry(...)方法将这些处于ReplicaDeletionIneligible状态的副本转换为OfflineReplica后重试。

这里使用了ReplicaStateMachine的三个判断方法,源码如下:

  • // 判断主题的所有副本是否都处于ReplicaDeletionSuccessful状态
  • def areAllReplicasForTopicDeleted(topic: String): Boolean = {
  • // 获取主题的副本集合
  • val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
  • // 获取所有副本的状态
  • val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
  • debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
  • // 判断这些副本的状态是否都是ReplicaDeletionSuccessful
  • replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
  • }
  • // 判断主题是否至少有一个副本处于ReplicaDeletionStarted状态
  • def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
  • val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
  • val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
  • replicaStatesForTopic.foldLeft(false)((deletionState, r) => deletionState || r._2 == ReplicaDeletionStarted)
  • }
  • // 判断主题是否有副本处于特定的状态
  • def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
  • replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state)
  • }

TopicDeletionManager的markTopicForDeletionRetry(...)方法的源码如下:

kafka.controller.TopicDeletionManager#markTopicForDeletionRetry
  • /**
  • * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
  • * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
  • *@param topic Topic for which deletion should be retried
  • */
  • private def markTopicForDeletionRetry(topic: String) {
  • // reset replica states from ReplicaDeletionIneligible to OfflineReplica
  • // 过滤得到失败的副本集
  • val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
  • info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  • .format(topic, failedReplicas.mkString(",")))
  • // 将失败的副本集都重置为OfflineReplica状态
  • controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  • }
  2. 调用TopicDeletionManager的isTopicEligibleForDeletion(...)方法,判断主题是否可被删除,如果可以被删除,则调用TopicDeletionManager的onTopicDeletion(...)方法执行删除流程,主要是删除主题的副本;isTopicEligibleForDeletion(...)方法源码如下:
kafka.controller.TopicDeletionManager#isTopicEligibleForDeletion
  • /**
  • * Topic deletion can be retried if -
  • * 1. Topic deletion is not already complete
  • * 2. Topic deletion is currently not in progress for that topic
  • * 3. Topic is currently marked ineligible for deletion
  • * @param topic Topic
  • * @return Whether or not deletion can be retried for the topic
  • */
  • private def isTopicEligibleForDeletion(topic: String): Boolean = {
  • /**
  • * 1. 主题是待删除主题,即主题没有完全完成删除操作;
  • * 2. 主题并不处于正在删除的过程中,即主题的删除操作还没开始;
  • * 3. 主题并不处于topicsIneligibleForDeletion集合中,即主题没有标记为不可删除。
  • */
  • topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicIneligibleForDeletion(topic))
  • }
  3. 删除主题的副本,这个操作中如果某个副本删除失败,则会将该副本转换为ReplicaDeletionIneligible状态,并标记此副本所在的主题为不可删除;该过程由TopicDeletionManager的onTopicDeletion(...)方法开启,源码如下:
kafka.controller.TopicDeletionManager#onTopicDeletion
  • /**
  • * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
  • * It invokes the delete partition callback for all partitions of a topic.
  • * The updateMetadataRequest is also going to set the leader for the topics being deleted to
  • * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
  • * removed from their caches.
  • */
  • private def onTopicDeletion(topics: Set[String]) {
  • info("Topic deletion callback for %s".format(topics.mkString(",")))
  • // send update metadata so that brokers stop serving data for topics to be deleted
  • // 获取指定主题的所有分区
  • val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  • // 向可用Broker发送UpdateMetadataRequest请求
  • controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  • // 按照Topic进行分组
  • val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  • // 开始分区的删除操作
  • topics.foreach { topic =>
  • // 删除分区方法
  • onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
  • }
  • }

onTopicDeletion(...)方法会获取待删除主题的所有分区,向可用的Broker发送携带这些分区信息的UpdateMetadataRequest请求,注意在此过程中构造的LeaderAndIsr对象的leader字段为LeaderAndIsr.LeaderDuringDelete(值为-2),通知它们指定的主题要被删除,并删除MetadataCache中与此主题相关的缓存信息;onPartitionDeletion(...)方法用于删除主题的分区,它内部获取了分区的副本,并调用startReplicaDeletion(...)方法进行处理:

kafka.controller.TopicDeletionManager
  • /**
  • * This callback is invoked by the delete topic callback with the list of partitions for topics to be deleted
  • * It does the following -
  • * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
  • * deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
  • * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
  • * and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
  • * it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
  • * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
  • * will delete all persistent data from all replicas of the respective partitions
  • */
  • private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
  • info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
  • // 获取需要删除的每个分区的副本
  • val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
  • // 调用startReplicaDeletion()方法进行删除
  • startReplicaDeletion(replicasPerPartition)
  • }
  • /**
  • * Invoked by the onPartitionDeletion callback. It is the 2nd step of topic deletion, the first being sending
  • * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
  • * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
  • * is never retried. A topic is removed from the in progress list when
  • * 1. Either the topic is successfully deleted OR
  • * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
  • * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
  • * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
  • * the replicas a StopReplicaRequest (delete=true)
  • * This callback does the following things -
  • * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
  • * for deletion if some replicas are dead since it won't complete successfully anyway
  • * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
  • *@param replicasForTopicsToBeDeleted
  • */
  • private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
  • replicasForTopicsToBeDeleted.groupBy(_.topic) // 根据主题进行分组
  • .foreach { case(topic, replicas) => // 遍历进行删除
  • // 获取Topic中所有可用的副本
  • var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
  • // 获取Topic中所有不可用的副本
  • val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
  • // 获取Topic中所有已经完成删除的副本
  • val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  • // 第一次删除(或重试)的副本集合
  • val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
  • // move dead replicas directly to failed state
  • // 将不可用副本转换为ReplicaDeletionIneligible状态
  • replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
  • // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
  • /**
  • * 将待删除的副本转换为OfflineReplica状态,
  • * 此步骤会发送StopReplicaRequest到待删除的副本(不会删除副本),
  • * 同时还会向可用的Broker发送LeaderAndIsrRequest和UpdateMetadataRequest,将副本从ISR集合中删除
  • */
  • replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
  • debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
  • // 将待删除的副本转换为ReplicaDeletionStarted状态,此步骤会向可用副本发送StopReplicaRequest(删除副本)
  • controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
  • // 注意这里使用到了deleteTopicStopReplicaCallback回调函数
  • new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
  • if(deadReplicasForTopic.size > 0) {
  • debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
  • // 标记当前Topic不可删除,即添加到topicsIneligibleForDeletion集合中
  • markTopicIneligibleForDeletion(Set(topic))
  • }
  • }
  • }

startReplicaDeletion(...)中调用ReplicaStateMachine的handleStateChanges(...)方法时传入了deleteTopicStopReplicaCallback(...)回调函数,它的源码如下:

kafka.controller.TopicDeletionManager#deleteTopicStopReplicaCallback
  • private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
  • val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
  • debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
  • // 响应集合
  • val responseMap = stopReplicaResponse.responses.asScala
  • // 整理响应中携带的每个分区的错误
  • val partitionsInError =
  • if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
  • else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
  • // 整理得到每个副本的错误
  • val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
  • inLock(controllerContext.controllerLock) {
  • // move all the failed replicas to ReplicaDeletionIneligible
  • // 使用failReplicaDeletion()方法处理响应中有错误信息的副本
  • failReplicaDeletion(replicasInError)
  • // 使用completeReplicaDeletion()方法处理响应中正常的副本
  • if (replicasInError.size != responseMap.size) {
  • // some replicas could have been successfully deleted
  • val deletedReplicas = responseMap.keySet -- partitionsInError
  • completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
  • }
  • }
  • }

deleteTopicStopReplicaCallback(...)回调函数中会调用failReplicaDeletion(...)方法处理异常副本;如果StopReplicaResponse中的错误码表示出现异常,则将副本状态转换为ReplicaDeletionIneligible,并标记此副本所在主题不可删除,也就是将主题添加到topicsIneligibleForDeletion集合,最后唤醒DeleteTopicsThread线程:

kafka.controller.TopicDeletionManager#failReplicaDeletion
  • /**
  • * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
  • * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
  • * are moved from ReplicaDeletionStarted to ReplicaDeletionIneligible state. Also, the topic is added to the list of topics
  • * ineligible for deletion until further notice. The delete topic thread is notified so it can retry topic deletion
  • * if it has received a response for all replicas of a topic to be deleted
  • * @param replicas Replicas for which deletion has failed
  • */
  • def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
  • if(isDeleteTopicEnabled) { // 检查是否开启了主题删除
  • // 过滤出正在等待删除的主题的副本
  • val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
  • if(replicasThatFailedToDelete.size > 0) {
  • val topics = replicasThatFailedToDelete.map(_.topic)
  • debug("Deletion failed for replicas %s. Halting deletion for topics %s"
  • .format(replicasThatFailedToDelete.mkString(","), topics))
  • // 将这些副本的状态转换为ReplicaDeletionIneligible
  • controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
  • // 标记这些副本所属的Topic不可删除
  • markTopicIneligibleForDeletion(topics)
  • // 唤醒DeleteTopicsThread线程
  • resumeTopicDeletionThread()
  • }
  • }
  • }
  • /**
  • * Halt delete topic if -
  • * 1. replicas being down
  • * 2. partition reassignment in progress for some partitions of the topic
  • * 3. preferred replica election in progress for some partitions of the topic
  • * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
  • */
  • def markTopicIneligibleForDeletion(topics: Set[String]) {
  • if(isDeleteTopicEnabled) { // 判断是否开启了主题删除功能
  • val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  • // 将传入的主题添加到topicsIneligibleForDeletion集合中,即表示他们不可被删除
  • topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  • if(newTopicsToHaltDeletion.size > 0)
  • info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
  • }
  • }

completeReplicaDeletion(...)方法处理返回正常StopReplicaResponse的副本,它会将副本状态转换为ReplicaDeletionSuccessful,并唤醒DeleteTopicsThread线程,源码如下:

kafka.controller.TopicDeletionManager#completeReplicaDeletion
  • /**
  • * Invoked by the StopReplicaResponse callback when it receives no error code for a replica of a topic to be deleted.
  • * As part of this, the replicas are moved from ReplicaDeletionStarted to ReplicaDeletionSuccessful state. The delete
  • * topic thread is notified so it can tear down the topic if all replicas of a topic have been successfully deleted
  • * @param replicas Replicas that were successfully deleted by the broker
  • */
  • private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
  • // 过滤得到成功删除的副本
  • val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
  • debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
  • // 将成功删除的副本的状态转换为ReplicaDeletionSuccessful
  • controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
  • // 唤醒DeleteTopicsThread线程
  • resumeTopicDeletionThread()
  • }

回到第三步的第1小步中,当待删除主题的所有副本都被成功删除之后(状态都为ReplicaDeletionSuccessful),doWork()方法会调用completeDeleteTopic(...)完成主题的删除,该操作会处理状态切换、删除存储在Zookeeper中及本地缓存的信息。completeDeleteTopic(...)属于TopicDeletionManager类,它的源码如下:

kafka.controller.TopicDeletionManager#completeDeleteTopic
  • private def completeDeleteTopic(topic: String) {
  • // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
  • // firing before the new topic listener when a deleted topic gets auto created
  • // 移除即将被删除的主题上的PartitionModificationsListener监听器
  • partitionStateMachine.deregisterPartitionChangeListener(topic)
  • // 从副本状态机中获取所有状态为ReplicaDeletionSuccessful的副本集
  • val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  • // controller will remove this replica from the state machine as well as its partition assignment cache
  • // 使用副本状态机将这些状态为ReplicaDeletionSuccessful的副本集的状态转换为NonExistentReplica
  • replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
  • // 获取即将被删除主题的所有分区
  • val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
  • // move respective partition to OfflinePartition and NonExistentPartition state
  • // 将即将被删除主题的所有分区的状态转换为OfflinePartition
  • partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
  • // 将即将被删除主题的所有分区的状态转换为NonExistentPartition
  • partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
  • // 从topicsToBeDeleted中移除已经处理的主题
  • topicsToBeDeleted -= topic
  • // 从partitionsToBeDeleted中移除已经处理的主题的分区
  • partitionsToBeDeleted.retain(_.topic != topic)
  • // 维护Zookeeper中的信息
  • val zkUtils = controllerContext.zkUtils
  • zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
  • zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
  • zkUtils.zkClient.delete(getDeleteTopicPath(topic))
  • controllerContext.removeTopic(topic) // 清空ControllerContext中与该主题相关的缓存
  • }

最终完成删除操作主要是移除该主题上注册的PartitionModificationsListener监听器、将该主题所有状态为ReplicaDeletionSuccessful的副本转换为NonExistentReplica状态、将所有分区先后转换为OfflinePartition和NonExistentPartition状态,并从本地缓存以及Zookeeper中移除主题的相关信息(即/brokers/topics/[topic_name]、/config/topics/[topic_name]以及/admin/delete_topics/[topic_name]节点)。

5. IsrChangeNotificationListener

分区的Follower副本会与Leader副本进行消息同步,当Follower副本追上Leader副本时会被添加到ISR集合中,当Follower副本与Leader副本滞后太多时会被移出ISR集合。Leader副本不仅会在ISR集合变化时将其记录到ZooKeeper中,还会调用ReplicaManager的recordIsrChange(...)方法将其记录到isrChangeSet集合中,之后通过isr-change-propagation定时任务周期性地将该集合中的内容写入到ZooKeeper的/isr_change_notification路径下。KafkaController中定义的IsrChangeNotificationListener用于监听此路径下的子节点变化,当某些分区的ISR集合变化时通知整个集群中的所有Broker;源码如下:

kafka.controller.IsrChangeNotificationListener
  • /**
  • * Called when leader intimates of isr change
  • * IsrChangeNotificationListener用于监听/isr_change_notification路径下的子节点变化,
  • * 当某些分区的ISR集合变化时通知整个集群中的所有Broker
  • * @param controller
  • */
  • class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
  • override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
  • import scala.collection.JavaConverters._
  • inLock(controller.controllerContext.controllerLock) {
  • debug("[IsrChangeNotificationListener] Fired!!!")
  • val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
  • try {
  • // 获取发生变化的主题分区
  • val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
  • if (topicAndPartitions.nonEmpty) {
  • // 从Zookeeper中读取指定分区的Leader副本、ISR集合等信息,更新ControllerContext
  • controller.updateLeaderAndIsrCache(topicAndPartitions)
  • // 向可用Broker发送UpdateMetadataRequest,更新它们的MetadataCache
  • processUpdateNotifications(topicAndPartitions)
  • }
  • } finally {
  • // delete processed children
  • // 删除/isr_change_notification下已经处理完的通知节点
  • childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
  • ZkUtils.IsrChangeNotificationPath + "/" + x))
  • }
  • }
  • }
  • private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
  • val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
  • debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
  • // 向可用Broker发送UpdateMetadataRequest,更新它们的MetadataCache
  • controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
  • }
  • private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
  • val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
  • val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
  • if (jsonOpt.isDefined) {
  • val json = Json.parseFull(jsonOpt.get)
  • json match {
  • case Some(m) =>
  • val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
  • val isrChanges = m.asInstanceOf[Map[String, Any]]
  • val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
  • topicAndPartitionList.foreach {
  • case tp =>
  • val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
  • val topic = topicAndPartition("topic").asInstanceOf[String]
  • val partition = topicAndPartition("partition").asInstanceOf[Int]
  • topicAndPartitions += TopicAndPartition(topic, partition)
  • }
  • topicAndPartitions
  • case None =>
  • error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
  • Set.empty
  • }
  • } else {
  • Set.empty
  • }
  • }
  • }
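
For reference, the payload that the isr-change-propagation task writes under /isr_change_notification, and that getTopicAndPartition(...) above parses back out, is a small JSON document listing the affected partitions. The sketch below is not Kafka code: the field names mirror what the listener reads, while the version field and the sequential child-node naming are assumptions to verify against your Kafka version.

IsrChangeNotificationFormat (illustrative sketch)
  • object IsrChangeNotificationFormat {
  •   // what a broker would write as a sequential child node, e.g. /isr_change_notification/isr_change_0000000001
  •   def encode(partitions: Seq[(String, Int)]): String = {
  •     val entries = partitions.map { case (t, p) => s"""{"topic":"$t","partition":$p}""" }
  •     s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  •   }
  •   // stand-in for the JSON parsing done by getTopicAndPartition(...)
  •   private val Entry = """\{"topic":"([^"]+)","partition":(\d+)\}""".r
  •   def decode(json: String): Set[(String, Int)] =
  •     Entry.findAllMatchIn(json).map(m => (m.group(1), m.group(2).toInt)).toSet
  •   def main(args: Array[String]): Unit = {
  •     val json = encode(Seq(("test-topic", 0), ("test-topic", 1)))
  •     println(json)         // {"version":1,"partitions":[{"topic":"test-topic","partition":0},{"topic":"test-topic","partition":1}]}
  •     println(decode(json)) // Set((test-topic,0), (test-topic,1))
  •   }
  • }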

6. PartitionModificationsListener

PartitionModificationsListener watches for data changes on the /brokers/topics/[topic_name] node and is mainly used to detect changes to a topic's partitions. A PartitionModificationsListener is registered for every topic both in onControllerFailover(...), invoked when the KafkaController starts up or a new Controller is elected, and in KafkaController.onNewTopicCreation(...), invoked when a topic is created; after a topic has been successfully deleted, the registered PartitionModificationsListener is removed in TopicDeletionManager's completeDeleteTopic(...). The listener is an inner class of PartitionStateMachine and implements the IZkDataListener interface. The source is as follows:

kafka.controller.PartitionStateMachine.PartitionModificationsListener
  • /**
  • * 监听/brokers/topics/[topic_name]节点中的数据变化,主要用于监听一个Topic的分区变化。
  • * 并不对分区的删除进行处理,Topic的分区数量是不能减少的。
  • */
  • class PartitionModificationsListener(topic: String) extends IZkDataListener with Logging {
  • this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
  • @throws(classOf[Exception])
  • def handleDataChange(dataPath : String, data: Object) {
  • inLock(controllerContext.controllerLock) {
  • try {
  • info(s"Partition modification triggered $data for path $dataPath")
  • // 从Zookeeper中获取Topic的Partition记录
  • val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
  • // 过滤出新增分区的记录
  • val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
  • !controllerContext.partitionReplicaAssignment.contains(p._1))
  • // 主题正在进行删除操作,输出日志后直接返回
  • if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
  • error("Skipping adding partitions %s for topic %s since it is currently being deleted"
  • .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
  • else {
  • if (partitionsToBeAdded.size > 0) {
  • info("New partitions to be added %s".format(partitionsToBeAdded))
  • // 将新增分区信息添加到ControllerContext中
  • controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
  • // 切换新增分区及其副本的状态,最终使其上线对外服务
  • controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
  • }
  • }
  • } catch {
  • case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
  • }
  • }
  • }
  • @throws(classOf[Exception])
  • def handleDataDeleted(parentPath : String) {
  • // this is not implemented for partition change
  • }
  • }
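
Registering and removing this per-topic listener boils down to subscribing an IZkDataListener on /brokers/topics/[topic_name] through the zkclient library, which is what registerPartitionChangeListener(topic) does and what completeDeleteTopic(...) later undoes. A minimal sketch, assuming the zkclient dependency is on the classpath and a ZooKeeper instance is reachable at localhost:2181 (the topic name is illustrative):

PerTopicListenerSketch (illustrative sketch)
  • import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer
  • import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
  • object PerTopicListenerSketch {
  •   def topicPath(topic: String): String = "/brokers/topics/" + topic
  •   def main(args: Array[String]): Unit = {
  •     // raw-bytes serializer so the JSON stored by Kafka can be read back as-is
  •     val zkClient = new ZkClient("localhost:2181", 30000, 30000, new BytesPushThroughSerializer)
  •     val listener = new IZkDataListener {
  •       // fired when the partition assignment stored in the topic znode changes
  •       def handleDataChange(dataPath: String, data: Object): Unit =
  •         println(s"partition assignment changed at $dataPath: " +
  •           new String(data.asInstanceOf[Array[Byte]], "UTF-8"))
  •       def handleDataDeleted(dataPath: String): Unit = () // topic znode deleted
  •     }
  •     zkClient.subscribeDataChanges(topicPath("test-topic"), listener)   // register
  •     // ... later, e.g. after the topic has been deleted:
  •     zkClient.unsubscribeDataChanges(topicPath("test-topic"), listener) // deregister
  •     zkClient.close()
  •   }
  • }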

Note: PartitionModificationsListener does not handle partition deletion, because the number of partitions of a topic can never be reduced.

When newly added partitions are detected for a topic, handleDataChange(...) calls KafkaController's onNewPartitionCreation(...) method to handle them; that method was analyzed in full when discussing TopicChangeListener, so it is not repeated here.
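
The note above can also be seen directly in the diff that handleDataChange(...) performs: only assignments present in ZooKeeper but missing from the controller's cache survive the filter, so a removed partition would simply never show up. A self-contained illustration (not Kafka code, with simplified types):

PartitionAdditionDiff (illustrative sketch)
  • object PartitionAdditionDiff {
  •   type Assignment = Map[(String, Int), Seq[Int]] // (topic, partition) -> replica list
  •   // keep only the assignments that ZooKeeper has but the controller cache does not
  •   def partitionsToBeAdded(fromZk: Assignment, cached: Assignment): Assignment =
  •     fromZk.filter { case (tp, _) => !cached.contains(tp) }
  •   def main(args: Array[String]): Unit = {
  •     val cached = Map(("test-topic", 0) -> Seq(1, 2), ("test-topic", 1) -> Seq(2, 3))
  •     val fromZk = cached + (("test-topic", 2) -> Seq(3, 1))             // partition 2 just added
  •     println(partitionsToBeAdded(fromZk, cached))                       // Map((test-topic,2) -> List(3, 1))
  •     println(partitionsToBeAdded(cached - (("test-topic", 1)), cached)) // Map() -- a removal is simply ignored
  •   }
  • }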

7. PreferredReplicaElectionListener

PreferredReplicaElectionListener watches ZooKeeper's /admin/preferred_replica_election node. When a preferred-replica election is started for certain partitions via the kafka-preferred-replica-election.sh script, the information about those partitions is written to this node, which triggers PreferredReplicaElectionListener. The purpose of a preferred-replica election is to make each partition's preferred replica the leader again, so that leader replicas are spread more evenly across the cluster. The source of PreferredReplicaElectionListener is as follows:

kafka.controller.PreferredReplicaElectionListener
  • /**
  • * Starts the preferred replica leader election for the list of partitions specified under
  • * 负责监听ZooKeeper节点/admin/preferred_replica_election
  • * 当通过PreferredReplicaLeaderElectionCommand命令指定某些分区需要进行“优先副本”选举时会将指定分区的信息写入该节点,
  • * 从而触发PreferredReplicaElectionListener进行处理
  • * 进行“优先副本”选举的目的是让分区的“优先副本”重新成为Leader副本,这是为了让Leader副本在整个集群中分布得更加均衡
  • * /admin/preferred_replica_election -
  • */
  • class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
  • this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
  • val zkUtils = controller.controllerContext.zkUtils
  • val controllerContext = controller.controllerContext
  • /**
  • * Invoked when some partitions are reassigned by the admin command
  • * @throws Exception On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataChange(dataPath: String, data: Object) {
  • debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
  • .format(dataPath, data.toString))
  • inLock(controllerContext.controllerLock) {
  • // 获取需要进行"优先副本"选举的TopicAndPartition列表
  • val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
  • if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
  • info("These partitions are already undergoing preferred replica election: %s"
  • .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
  • // 过滤掉正在进行"优先副本"选举的分区
  • val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
  • // 过滤掉待删除Topic的分区
  • val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  • if(partitionsForTopicsToBeDeleted.size > 0) {
  • error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
  • .format(partitionsForTopicsToBeDeleted))
  • }
  • // 对剩余的分区调用onPreferredReplicaElection()方法进行"优先副本"的选举
  • controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
  • }
  • }
  • /**
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataDeleted(dataPath: String) {
  • }
  • }
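
For reference, the data written to /admin/preferred_replica_election by the script is just a list of topic-partitions; the same JSON is what would go into the file passed via --path-to-json-file to kafka-preferred-replica-election.sh. The sketch below is not Kafka code; the field names follow the parse call above, and the exact layout should be verified against your Kafka version.

PreferredReplicaElectionPayload (illustrative sketch)
  • object PreferredReplicaElectionPayload {
  •   // e.g. request a preferred-replica election for test-topic partitions 0 and 1
  •   def encode(partitions: Seq[(String, Int)]): String = {
  •     val entries = partitions.map { case (t, p) => s"""{"topic":"$t","partition":$p}""" }
  •     s"""{"partitions":[${entries.mkString(",")}]}"""
  •   }
  •   def main(args: Array[String]): Unit =
  •     // bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file election.json
  •     println(encode(Seq(("test-topic", 0), ("test-topic", 1))))
  • }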

The onPreferredReplicaElection(...) method invoked from this listener's handleDataChange(...) is the main method responsible for preferred-replica election; it was covered in full, with a concrete example, in section 2.2.2.6 of the previous article on handling preferred-replica election, which readers can revisit on their own.

8. PartitionsReassignedListener

PartitionsReassignedListener watches ZooKeeper's /admin/reassign_partitions node. When the kafka-reassign-partitions.sh script specifies that certain partitions should have their replicas reassigned, the information about those partitions is written to this node, which triggers PartitionsReassignedListener. The source of this listener is as follows:

kafka.controller.PartitionsReassignedListener
  • /**
  • * Starts the partition reassignment process unless -
  • * 1. Partition previously existed
  • * 2. New replicas are the same as existing replicas
  • * 3. Any replica in the new set of replicas are dead
  • * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
  • * partitions.
  • *
  • * PartitionsReassignedListener监听的ZooKeeper节点是/admin/reassign_partitions。
  • * 当管理人员通过ReassignPartitionsCommand命令指定某些分区需要重新分配副本时,
  • * 会将指定分区的信息写入该节点,从而触发PartitionsReassignedListener进行处理。
  • */
  • class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
  • this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
  • val zkUtils = controller.controllerContext.zkUtils
  • val controllerContext = controller.controllerContext
  • /**
  • * Invoked when some partitions are reassigned by the admin command
  • * @throws Exception On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataChange(dataPath: String, data: Object) {
  • debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
  • .format(dataPath, data))
  • // 从Zookeeper的/admin/reassign_partitions节点下读取分区的副本重分配信息
  • val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
  • // 过滤正在进行重分配的分区
  • val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
  • partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
  • }
  • partitionsToBeReassigned.foreach { partitionToBeReassigned =>
  • inLock(controllerContext.controllerLock) {
  • // 检测分区所属的Topic是否为待删除Topic
  • if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
  • error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
  • .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
  • controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
  • } else {
  • val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
  • // 为副本重新分配做一些准备工作
  • controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
  • }
  • }
  • }
  • }
  • /**
  • * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataDeleted(dataPath: String) {
  • }
  • }
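
For reference, the reassignment data that kafka-reassign-partitions.sh --execute writes to /admin/reassign_partitions, and that parsePartitionReassignmentData(...) above reads back, lists each partition together with its new replica (AR) list. The sketch below is not Kafka code; treat the exact field names as an assumption to check against your Kafka version.

ReassignPartitionsPayload (illustrative sketch)
  • object ReassignPartitionsPayload {
  •   // encode a (topic, partition) -> new replica list mapping as the reassignment JSON
  •   def encode(assignment: Map[(String, Int), Seq[Int]]): String = {
  •     val entries = assignment.map { case ((t, p), replicas) =>
  •       s"""{"topic":"$t","partition":$p,"replicas":[${replicas.mkString(",")}]}"""
  •     }
  •     s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  •   }
  •   def main(args: Array[String]): Unit =
  •     // move test-topic partition 0 to brokers 2,3 and partition 1 to brokers 3,1
  •     println(encode(Map(("test-topic", 0) -> Seq(2, 3), ("test-topic", 1) -> Seq(3, 1))))
  • }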

PartitionsReassignedListener's handleDataChange(...) is invoked whenever a change to ZooKeeper's /admin/reassign_partitions node is detected; the actual reassignment work is delegated to KafkaController's initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) method, which was covered in full in section "2.2.2.5 处理副本重分配" of the previous article (Kafka系列 20 - 服务端源码分析 11:KafkaController相关的Zookeeper监听器); readers can revisit it on their own.

9. ReassignedPartitionsIsrChangeListener

The ReassignedPartitionsIsrChangeListener is registered during partition replica reassignment on the partition's corresponding /brokers/topics/[topic_name]/partitions/[partitionId]/state node in ZooKeeper. Recall that part of the code:

kafka.controller.KafkaController
  • // 对需要进行副本重新分配的分区做准备工作
  • def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
  • reassignedPartitionContext: ReassignedPartitionsContext) {
  • ...
  • // 为分区注册ReassignedPartitionsIsrChangeListener
  • watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
  • ...
  • }
  • // 为指定的需要进行副本重分配的分区绑定ReassignedPartitionsIsrChangeListener监听器
  • private def watchIsrChangesForReassignedPartition(topic: String,
  • partition: Int,
  • reassignedPartitionContext: ReassignedPartitionsContext) {
  • // 获取分配给指定分区的新的AR集合
  • val reassignedReplicas = reassignedPartitionContext.newReplicas
  • // 根据分区所属主题、分区ID以及新的AR集合创建ReassignedPartitionsIsrChangeListener监听器
  • val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
  • reassignedReplicas.toSet)
  • // 将监听器绑定到分区对应的ReassignedPartitionsContext对象上
  • reassignedPartitionContext.isrChangeListener = isrChangeListener
  • // register listener on the leader and isr path to wait until they catch up with the current leader
  • // 在Zookeeper的/brokers/topics/[topic_name]/partitions/[partition_id]/state路径上注册刚刚创建的监听器
  • zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
  • }

It is mainly responsible for handling ISR changes of partitions that are undergoing replica reassignment. The source is as follows:

kafka.controller.ReassignedPartitionsIsrChangeListener
  • /**
  • * 每个分区都会注册一个对应的ReassignedPartitionsIsrChangeListener监听器
  • * 监听路径为/brokers/topics/[topic_name]/partitions/[partitionId]/state
  • * @param controller KafkaController
  • * @param topic 主题
  • * @param partition 分区
  • * @param reassignedReplicas 新分配的AR集合
  • */
  • class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
  • reassignedReplicas: Set[Int])
  • extends IZkDataListener with Logging {
  • this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
  • val zkUtils = controller.controllerContext.zkUtils
  • val controllerContext = controller.controllerContext
  • /**
  • * Invoked when some partitions need to move leader to preferred replica
  • * @throws Exception On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataChange(dataPath: String, data: Object) {
  • inLock(controllerContext.controllerLock) {
  • debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
  • val topicAndPartition = TopicAndPartition(topic, partition)
  • try {
  • // check if this partition is still being reassigned or not
  • // 判断当前监听器所绑定的分区是否正在进行副本重新分配
  • controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
  • case Some(reassignedPartitionContext) => // 正在进行
  • // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
  • // 从Zookeeper中获取当前分区的Leader副本、ISR集合等信息
  • val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
  • newLeaderAndIsrOpt match { // Zookeeper中能获取到Leader相关信息
  • case Some(leaderAndIsr) => // check if new replicas have joined ISR
  • // 检查RAR集合中的所有副本是否已经加入了ISR集合
  • // 取RAR与ISR的交集
  • val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
  • // 检查该交集是否等于RAR,如果相等说明RAR中的所有副本都已进入ISR集合
  • if(caughtUpReplicas == reassignedReplicas) {
  • // resume the partition reassignment process
  • info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
  • .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
  • "Resuming partition reassignment")
  • // RAR集合的副本都进入了ISR集合,进行副本重分配操作
  • controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
  • }
  • else {
  • info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
  • .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
  • "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
  • }
  • case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
  • .format(topicAndPartition, reassignedReplicas.mkString(",")))
  • }
  • case None => // 没有进行,结束操作
  • }
  • } catch {
  • case e: Throwable => error("Error while handling partition reassignment", e)
  • }
  • }
  • }
  • /**
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataDeleted(dataPath: String) {
  • }
  • }

When ReassignedPartitionsIsrChangeListener detects a change to a partition's ISR, it proceeds as follows:

  1. Check whether the partition is currently undergoing replica reassignment; if not, stop.
  2. Read the partition's current leader and ISR from ZooKeeper.
  3. If every replica in the new AR set is already present in the current ISR, call KafkaController's onPartitionReassignment(...) method to carry out the remaining reassignment work (see the sketch after this list).
  4. Otherwise, log a message and stop, waiting for the next trigger.
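
A self-contained illustration of the check in step 3: the new AR set has fully caught up exactly when its intersection with the current ISR equals the AR set itself, i.e. when RAR is a subset of the ISR.

CaughtUpCheck (illustrative sketch)
  • object CaughtUpCheck {
  •   // mirrors: reassignedReplicas & leaderAndIsr.isr.toSet == reassignedReplicas
  •   def allCaughtUp(reassignedReplicas: Set[Int], isr: Set[Int]): Boolean =
  •     (reassignedReplicas & isr) == reassignedReplicas // equivalently: reassignedReplicas.subsetOf(isr)
  •   def main(args: Array[String]): Unit = {
  •     println(allCaughtUp(Set(2, 3), Set(1, 2, 3))) // true  -> onPartitionReassignment(...) can proceed
  •     println(allCaughtUp(Set(2, 4), Set(1, 2, 3))) // false -> replica 4 still needs to catch up
  •   }
  • }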