Big Data
Stream Processing
Kafka
Source Code Analysis

Kafka Series 19 - Server-Side Source Code Analysis 10: Partition and Replica State Machines and the Leader Replica Selector

Overview: This article covers the state machine mechanisms for partitions and replicas, as well as the leader replica selector.

1. The PartitionStateMachine

On the KafkaController leader, every partition has its own state. Partition states are defined by the PartitionState trait, which has four case-object implementations representing the four states:

  • // kafka.controller.PartitionState
  • /**
  • * Partition states are defined by the PartitionState trait.
  • * The sealed keyword can modify classes and traits; a sealed type imposes the constraint that no new subclasses may be defined outside the file containing its definition.
  • */
  • sealed trait PartitionState { def state: Byte }
  • // A partition is in this state right after creation. It may already have an assigned AR set, but no leader replica or ISR set has been designated yet
  • case object NewPartition extends PartitionState { val state: Byte = 0 }
  • // A partition transitions to this state once a leader replica has been successfully elected for it
  • case object OnlinePartition extends PartitionState { val state: Byte = 1 }
  • // If, after a leader replica has been successfully elected, the leader goes down, the partition transitions to this state; alternatively, a newly created partition may transition directly to this state
  • case object OfflinePartition extends PartitionState { val state: Byte = 2 }
  • // A partition that was never created, or was created and then deleted, is in this state
  • case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
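
Because PartitionState is sealed, the Scala compiler knows the complete set of subtypes and can warn on non-exhaustive pattern matches. A minimal illustrative sketch (not Kafka source):

  • // Sketch: exhaustive matching over the sealed PartitionState trait
  • def describe(s: PartitionState): String = s match {
  •   case NewPartition => "created; AR may be assigned, no leader/ISR yet"
  •   case OnlinePartition => "leader replica elected"
  •   case OfflinePartition => "leader replica no longer alive"
  •   case NonExistentPartition => "never created, or created and then deleted"
  • }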

The KafkaController leader uses PartitionStateMachine as the state machine that maintains these states. The class definition and its important fields are as follows:

  • // kafka.controller.PartitionStateMachine
  • /**
  • * This class represents the state machine for partitions. It defines the states that a partition can be in, and
  • * transitions to move the partition to another legal state. The different states that a partition can be in are -
  • * 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
  • * deleted. Valid previous state, if one exists, is OfflinePartition
  • * 2. NewPartition : After creation, the partition is in the NewPartition state. In this state, the partition should have
  • * replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
  • * 3. OnlinePartition : Once a leader is elected for a partition, it is in the OnlinePartition state.
  • * Valid previous states are NewPartition/OfflinePartition
  • * 4. OfflinePartition : If, after successful leader election, the leader for partition dies, then the partition
  • * moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  • *
  • * PartitionStateMachine is the state machine used by the Controller leader to maintain partition states
  • * @param controller
  • */
  • class PartitionStateMachine(controller: KafkaController) extends Logging {
  • // Maintains the KafkaController's context information
  • private val controllerContext = controller.controllerContext
  • private val controllerId = controller.config.brokerId
  • // ZooKeeper client used to interact with the ZooKeeper servers
  • private val zkUtils = controllerContext.zkUtils
  • // Records the PartitionState of each partition
  • private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
  • // Used to send batched requests to the specified brokers
  • private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
  • private val hasStarted = new AtomicBoolean(false)
  • // The default leader replica selector; a subclass of PartitionLeaderSelector
  • private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
  • // ZooKeeper listener that watches for topic changes
  • private val topicChangeListener = new TopicChangeListener()
  • // ZooKeeper listener that watches for topic deletions
  • private val deleteTopicsListener = new DeleteTopicsListener()
  • // Listeners that watch for partition modifications
  • private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
  • ...
  • }

The important fields of PartitionStateMachine are annotated in the source above, and each will be introduced in more detail as it comes up later. For now, we focus on the field that tracks partition state: partitionState.

1.1. Partition State Initialization

The partitionState map records each partition's state. It is initialized in PartitionStateMachine's startup() method, whose source is as follows:

  • // kafka.controller.PartitionStateMachine#startup
  • /**
  • * Invoked on successful controller election. First registers a topic change listener since that triggers all
  • * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
  • * the OnlinePartition state change for all new or offline partitions.
  • */
  • def startup() {
  • // initialize partition state
  • initializePartitionState() // Initialize partition states
  • // set started flag
  • hasStarted.set(true) // Mark the PartitionStateMachine as started
  • // try to move partitions to online state
  • triggerOnlinePartitionStateChange() // Try to move partitions to the OnlinePartition state
  • info("Started partition state machine with initial state -> " + partitionState.toString())
  • }

The source of initializePartitionState() is as follows:

  • // kafka.controller.PartitionStateMachine#initializePartitionState
  • /**
  • * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
  • * zookeeper
  • */
  • private def initializePartitionState() {
  • // Iterate over all partitions in the cluster
  • for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
  • // check if leader and isr path exists for partition. If not, then it is in NEW state
  • controllerContext.partitionLeadershipInfo.get(topicPartition) match { // Look up the partition's leader replica and ISR information
  • case Some(currentLeaderIsrAndEpoch) => // Leader replica and ISR information exists
  • // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
  • // Check whether the broker hosting the partition's leader is alive
  • controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
  • // The broker hosting the leader replica is alive; initialize to OnlinePartition
  • case true => // leader is alive
  • partitionState.put(topicPartition, OnlinePartition)
  • // The broker hosting the leader replica is not alive; initialize to OfflinePartition
  • case false =>
  • partitionState.put(topicPartition, OfflinePartition)
  • }
  • // No leader replica or ISR information; initialize to NewPartition
  • case None =>
  • partitionState.put(topicPartition, NewPartition)
  • }
  • }
  • }

At initialization, a partition's initial state is determined by the following rules:

  1. A partition with no leader replica or ISR information is initialized to NewPartition.
  2. A partition with a leader replica whose hosting broker is alive is initialized to OnlinePartition.
  3. A partition with a leader replica whose hosting broker is not alive is initialized to OfflinePartition.

After partition initialization completes, triggerOnlinePartitionStateChange() is called to try to switch every partition in the NewPartition or OfflinePartition state to the OnlinePartition state. The switch requires the following conditions:

  1. brokerRequestBatch contains no unsent requests.
  2. The topic the partition belongs to is not queued for deletion.
  3. The partition is in the OfflinePartition or NewPartition state.

The source of this method is as follows:

  • // kafka.controller.PartitionStateMachine#triggerOnlinePartitionStateChange
  • /**
  • * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
  • * state. This is called on a successful controller election and on broker changes
  • *
  • * Iterates over all partitions in the partitionState map and switches those in the OfflinePartition or NewPartition state to OnlinePartition
  • */
  • def triggerOnlinePartitionStateChange() {
  • try {
  • // Check whether the ControllerBrokerRequestBatch's leaderAndIsrRequestMap, stopReplicaRequestMap, and updateMetadataRequestMap collections hold cached requests
  • brokerRequestBatch.newBatch()
  • // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
  • // that belong to topics to be deleted
  • // Iterate over partitionState
  • for((topicAndPartition, partitionState) <- partitionState
  • if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) { // Skip topics that are marked for deletion
  • // If the state is OfflinePartition or NewPartition, switch it to OnlinePartition
  • if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
  • handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
  • (new CallbackBuilder).build)
  • }
  • // Send the requests
  • brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  • } catch {
  • case e: Throwable => error("Error while moving some partitions to the online state", e)
  • // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
  • }
  • }

triggerOnlinePartitionStateChange() performs the actual state switch via the handleStateChange(...) method. During a partition state transition, LeaderAndIsrRequest, StopReplicaRequestInfo, and UpdateMetadataRequest requests may be produced and stored in the ControllerBrokerRequestBatch-typed brokerRequestBatch variable; they are finally sent out via brokerRequestBatch.sendRequestsToBrokers(controller.epoch).

1.2. Partition State Transitions

Partition state transitions are carried out mainly by the handleStateChange(topic: String, partition: Int, targetState: PartitionState, leaderSelector: PartitionLeaderSelector, callbacks: Callbacks) method. The first and second parameters identify the topic and partition ID of the partition to transition, the third is the target state, and the fourth specifies the selector used for leader replica election. While this method runs, it may produce LeaderAndIsrRequest, StopReplicaRequestInfo, or UpdateMetadataRequest requests awaiting dispatch.

The four partition states introduced above (NonExistentPartition, NewPartition, OfflinePartition, and OnlinePartition) transition between one another as shown in the following diagram:

[Figure 1: Partition state transition diagram]

The transitions in this diagram are reflected in detail in the source of handleStateChange(...):

  • // kafka.controller.PartitionStateMachine#handleStateChange
  • /**
  • * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
  • * previous state to the target state. Valid state transitions are:
  • * NonExistentPartition -> NewPartition:
  • * --load assigned replicas from ZK to controller cache
  • *
  • * NewPartition -> OnlinePartition
  • * --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
  • * --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
  • *
  • * OnlinePartition,OfflinePartition -> OnlinePartition
  • * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
  • * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
  • *
  • * NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
  • * --nothing other than marking partition state as Offline
  • *
  • * OfflinePartition -> NonExistentPartition
  • * --nothing other than marking the partition state as NonExistentPartition
  • *
  • * The core method for managing partition state; it controls PartitionState transitions
  • *
  • * @param topic The topic of the partition for which the state transition is invoked
  • * @param partition The partition for which the state transition is invoked
  • * @param targetState The end state that the partition should be moved to
  • * @param leaderSelector The PartitionLeaderSelector used to elect a leader replica
  • * @param callbacks
  • */
  • private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
  • leaderSelector: PartitionLeaderSelector,
  • callbacks: Callbacks) {
  • val topicAndPartition = TopicAndPartition(topic, partition)
  • // Check whether this PartitionStateMachine has started; only the Controller leader's PartitionStateMachine is started
  • if (!hasStarted.get)
  • // If it has not started, throw a StateChangeFailedException
  • throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
  • "the partition state machine has not started")
  • .format(controllerId, controller.epoch, topicAndPartition, targetState))
  • // Get the partition's state from partitionState; if absent, initialize it to NonExistentPartition
  • val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  • try {
  • targetState match { // Match the target state and check that the previous state is legal
  • case NewPartition => // Transitioning to NewPartition
  • // pre: partition did not exist before this
  • // When transitioning to NewPartition, the previous state must be NonExistentPartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
  • // Set the partition's state to NewPartition
  • partitionState.put(topicAndPartition, NewPartition)
  • // Log the change
  • val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
  • assignedReplicas))
  • // post: partition has been assigned replicas
  • case OnlinePartition => // Transitioning to OnlinePartition
  • // When transitioning to OnlinePartition, the previous state must be NewPartition, OnlinePartition, or OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  • // Match on the previous state and act accordingly
  • partitionState(topicAndPartition) match {
  • case NewPartition => // Previous state is NewPartition: initialize the partition's leader replica and ISR set
  • // initialize leader and isr path for new partition
  • initializeLeaderAndIsrForPartition(topicAndPartition)
  • case OfflinePartition => // Previous state is OfflinePartition: elect a new leader replica for the partition
  • electLeaderForPartition(topic, partition, leaderSelector)
  • case OnlinePartition => // invoked when the leader needs to be re-elected; previous state is OnlinePartition: re-elect the partition's leader replica
  • electLeaderForPartition(topic, partition, leaderSelector)
  • case _ => // should never come here since illegal previous states are checked above
  • }
  • // Set the partition state to OnlinePartition
  • partitionState.put(topicAndPartition, OnlinePartition)
  • // Log the change
  • val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
  • // post: partition has a leader
  • case OfflinePartition => // Transitioning to OfflinePartition
  • // pre: partition should be in New or Online state
  • // When transitioning to OfflinePartition, the previous state must be NewPartition, OnlinePartition, or OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
  • // should be called when the leader for a partition is no longer alive
  • // Log the change
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
  • // Set the state to OfflinePartition
  • partitionState.put(topicAndPartition, OfflinePartition)
  • // post: partition has no alive leader
  • case NonExistentPartition => // Transitioning to NonExistentPartition
  • // pre: partition should be in Offline state
  • // When transitioning to NonExistentPartition, the previous state must be OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
  • // Log the change
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
  • // Set the state to NonExistentPartition
  • partitionState.put(topicAndPartition, NonExistentPartition)
  • // post: partition state is deleted from all brokers and zookeeper
  • }
  • } catch {
  • case t: Throwable =>
  • stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  • }
  • }

handleStateChange(...) is a private method; it is invoked by PartitionStateMachine's handleStateChanges(...) and triggerOnlinePartitionStateChange() methods to perform PartitionState transitions. triggerOnlinePartitionStateChange() was covered above; handleStateChanges(...) simply calls handleStateChange(...) for each partition in the given set. Its source is as follows:

kafka.controller.PartitionStateMachine#handleStateChanges
  • /**
  • * This API is invoked by the partition change zookeeper listener
  • * @param partitions The list of partitions that need to be transitioned to the target state
  • * @param targetState The state that the partitions should be moved to
  • */
  • def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
  • leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
  • callbacks: Callbacks = (new CallbackBuilder).build) {
  • info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
  • try {
  • // Check whether the ControllerBrokerRequestBatch's leaderAndIsrRequestMap, stopReplicaRequestMap, and updateMetadataRequestMap collections hold cached requests
  • brokerRequestBatch.newBatch()
  • // Call handleStateChange() for each partition to process the transition
  • partitions.foreach { topicAndPartition =>
  • handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
  • }
  • // Send the requests via the ControllerBrokerRequestBatch
  • brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  • } catch {
  • case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
  • // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
  • }
  • }

handleStateChange(...) has many branches; each branch is discussed separately below.

1.2.1. Previous-State Validation

The method handles each target state (the targetState parameter) differently. Before any transition, the partition's previous state is checked; each target state permits only specific previous states:

  1. Transitioning to NewPartition requires the previous state to be NonExistentPartition;
  2. Transitioning to OnlinePartition requires the previous state to be NewPartition, OnlinePartition, or OfflinePartition;
  3. Transitioning to OfflinePartition requires the previous state to be NewPartition, OnlinePartition, or OfflinePartition;
  4. Transitioning to NonExistentPartition requires the previous state to be OfflinePartition.

Note: if a partition being transitioned has no recorded state, it is initialized to NonExistentPartition. The full transition table is summarized in the sketch below.
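
The following minimal sketch (not Kafka source) encodes the rules above as a Scala Map, which can help when reasoning about which transitions are legal:

  • // Sketch: the legal previous states for each target PartitionState,
  • // derived from the assertValidPreviousStates calls in handleStateChange
  • val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  •   NewPartition -> Set(NonExistentPartition),
  •   OnlinePartition -> Set(NewPartition, OnlinePartition, OfflinePartition),
  •   OfflinePartition -> Set(NewPartition, OnlinePartition, OfflinePartition),
  •   NonExistentPartition -> Set(OfflinePartition)
  • )
  • // A transition is legal iff validPreviousStates(target).contains(current)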

The previous-state check is implemented by PartitionStateMachine's assertValidPreviousStates(...) method, whose source is as follows:

  • // kafka.controller.PartitionStateMachine#assertValidPreviousStates
  • private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState],
  • targetState: PartitionState) {
  • if(!fromStates.contains(partitionState(topicAndPartition))) // Throws an IllegalStateException when the previous state is not valid
  • throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
  • .format(topicAndPartition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
  • .format(partitionState(topicAndPartition)))
  • }

1.2.2. Transitioning to NewPartition

Only a partition whose previous state is NonExistentPartition can transition to NewPartition. The relevant code fragment is:

kafka.controller.PartitionStateMachine#handleStateChange
  • ...
  • case NewPartition => // Transitioning to NewPartition
  • // pre: partition did not exist before this
  • // When transitioning to NewPartition, the previous state must be NonExistentPartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
  • // Set the partition's state to NewPartition
  • partitionState.put(topicAndPartition, NewPartition)
  • // Log the change
  • val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
  • assignedReplicas))
  • ...

As the source shows, transitioning from NonExistentPartition to NewPartition simply updates the partition's entry in the partitionState map to NewPartition.

1.2.3. Transitioning to OnlinePartition

Only a partition whose previous state is NewPartition, OnlinePartition, or OfflinePartition can transition to OnlinePartition. The relevant code fragment is:

kafka.controller.PartitionStateMachine#handleStateChange
  • ...
  • case OnlinePartition => // Transitioning to OnlinePartition
  • // When transitioning to OnlinePartition, the previous state must be NewPartition, OnlinePartition, or OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  • // Match on the previous state and act accordingly
  • partitionState(topicAndPartition) match {
  • case NewPartition => // Previous state is NewPartition: initialize the partition's leader replica and ISR set
  • // initialize leader and isr path for new partition
  • initializeLeaderAndIsrForPartition(topicAndPartition)
  • case OfflinePartition => // Previous state is OfflinePartition: elect a new leader replica for the partition
  • electLeaderForPartition(topic, partition, leaderSelector)
  • case OnlinePartition => // invoked when the leader needs to be re-elected; previous state is OnlinePartition: re-elect the partition's leader replica
  • electLeaderForPartition(topic, partition, leaderSelector)
  • case _ => // should never come here since illegal previous states are checked above
  • }
  • // Set the partition state to OnlinePartition
  • partitionState.put(topicAndPartition, OnlinePartition)
  • // Log the change
  • val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
  • ...

The work done when transitioning to OnlinePartition depends on the previous state. When the previous state is NewPartition, initializeLeaderAndIsrForPartition(...) is executed; its source is as follows:

kafka.controller.PartitionStateMachine#initializeLeaderAndIsrForPartition
  • /**
  • * Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
  • * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, its leader and isr
  • * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
  • * OfflinePartition state.
  • *
  • * NewPartition -> OnlinePartition
  • *
  • * @param topicAndPartition The topic/partition whose leader and isr path is to be initialized
  • */
  • private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
  • // Get the partition's AR set
  • val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // Filter the AR set down to the live replicas
  • val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
  • // If there is no live replica, throw a StateChangeFailedException; otherwise try to choose a leader replica
  • liveAssignedReplicas.size match {
  • case 0 => // No live replicas
  • val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
  • "live brokers are [%s]. No assigned replica is alive.")
  • .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
  • stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
  • // Throw the exception
  • throw new StateChangeFailedException(failMsg)
  • case _ =>
  • debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
  • // make the first replica in the list of assigned replicas, the leader
  • // Choose the first live replica as the leader
  • val leader = liveAssignedReplicas.head
  • // Create the LeaderIsrAndControllerEpoch; the ISR is the live subset of the AR set, and leaderEpoch and zkVersion are both initialized to 0
  • val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
  • controller.epoch)
  • debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
  • try {
  • /**
  • * Serialize the LeaderIsrAndControllerEpoch information as JSON and store it in ZooKeeper
  • * at the path /brokers/topics/[topic_name]/partitions/[partitionId]/state
  • */
  • zkUtils.createPersistentPath(
  • getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
  • zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
  • // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
  • // took over and initialized this partition. This can happen if the current controller went into a long
  • // GC pause
  • // Update the record in ControllerContext's partitionLeadershipInfo
  • controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
  • // Add a LeaderAndIsrRequest to the batch, to be sent later
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
  • topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
  • } catch {
  • case e: ZkNodeExistsException => // The ZooKeeper node already exists
  • // read the controller epoch
  • val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
  • topicAndPartition.partition).get
  • val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
  • "exists with value %s and controller epoch %d")
  • .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
  • stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
  • throw new StateChangeFailedException(failMsg)
  • }
  • }
  • }

This method consists of four main steps:

  1. From the partition's entry in ControllerContext's partitionReplicaAssignment, select the first live replica as the leader replica; the live replicas together form the ISR set.
  2. Write the leader replica and ISR information to ZooKeeper.
  3. Update the leader replica, ISR set, and related information cached in ControllerContext's partitionLeadershipInfo.
  4. Add the leader replica, ISR set, AR set, and related information determined above to the ControllerBrokerRequestBatch; this is later wrapped in a LeaderAndIsrRequest and sent to the relevant brokers.

This is intuitive: a partition whose previous state is NewPartition is a brand-new partition, so moving it to OnlinePartition naturally requires electing a leader replica for it. Note that when a partition moves from NewPartition to OnlinePartition, the leader is chosen simply as the first live replica in the AR set; this differs from the leader election triggered when an existing leader replica fails, which is discussed below.
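
For reference, the data written to the .../state znode above is JSON; a rough sketch of its shape, with purely illustrative field values, is shown below:

  • // Sketch: illustrative content of /brokers/topics/[topic_name]/partitions/[partitionId]/state
  • // for a partition whose live AR is [1, 2, 3] (all values here are hypothetical)
  • val exampleStateJson =
  •   """{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,3]}"""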

When the previous state is OfflinePartition or OnlinePartition, transitioning to OnlinePartition calls electLeaderForPartition(...) to elect a new leader replica. Its source is as follows:

kafka.controller.PartitionStateMachine#electLeaderForPartition
  • /**
  • * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
  • * It invokes the leader election API to elect a leader for the input offline partition
  • *
  • * OfflinePartition, OnlinePartition -> OnlinePartition
  • *
  • * @param topic The topic of the offline partition
  • * @param partition The offline partition
  • * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
  • */
  • def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
  • val topicAndPartition = TopicAndPartition(topic, partition)
  • // handle leader election for the partitions whose leader is no longer alive
  • stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
  • .format(controllerId, controller.epoch, topicAndPartition))
  • try {
  • // Records whether the ZooKeeper path has been updated successfully; initially false
  • var zookeeperPathUpdateSucceeded: Boolean = false
  • var newLeaderAndIsr: LeaderAndIsr = null
  • var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
  • while(!zookeeperPathUpdateSucceeded) { // Loop until the ZooKeeper path has been updated successfully
  • // Read the partition's current leader replica, ISR set, zkVersion, etc. from ZooKeeper; throws a StateChangeFailedException if absent
  • val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
  • // Current leader and ISR information
  • val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
  • // Current controller epoch
  • val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
  • // Validate the epoch
  • if (controllerEpoch > controller.epoch) {
  • val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
  • "already written by another controller. This probably means that the current controller %d went through " +
  • "a soft failure and another controller was elected with epoch %d.")
  • .format(topic, partition, controllerId, controllerEpoch)
  • stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
  • throw new StateChangeFailedException(failMsg)
  • }
  • // elect new leader or throw exception
  • /**
  • * Use the given PartitionLeaderSelector and the current LeaderAndIsr information to elect a new leader replica and ISR set;
  • * the second return value is the set of replicas that should receive the LeaderAndIsrRequest
  • */
  • val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
  • /**
  • * Serialize the new LeaderAndIsr information as JSON and save it to ZooKeeper
  • * at the path /brokers/topics/[topic_name]/partitions/[partitionId]/state;
  • * the first return value indicates whether the update succeeded, and the second is the updated zkVersion
  • */
  • val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
  • leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
  • // Update the tracking variables
  • newLeaderAndIsr = leaderAndIsr
  • newLeaderAndIsr.zkVersion = newVersion
  • zookeeperPathUpdateSucceeded = updateSucceeded // Set to true on success, which exits the loop
  • // Record the partition's replica set after the election
  • replicasForThisPartition = replicas
  • }
  • // Maintain the record in ControllerContext's partitionLeadershipInfo
  • val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
  • // update the leader cache
  • controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
  • stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
  • .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
  • // Get the partition's new AR set
  • val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
  • // store new leader and isr info in cache
  • // Add a LeaderAndIsrRequest to the batch, to be sent later
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
  • newLeaderIsrAndControllerEpoch, replicas)
  • } catch {
  • case lenne: LeaderElectionNotNeededException => // swallow
  • case nroe: NoReplicaOnlineException => throw nroe
  • case sce: Throwable =>
  • val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
  • stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
  • throw new StateChangeFailedException(failMsg, sce)
  • }
  • debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
  • }

electLeaderForPartition(...) implements the leader election triggered when an existing leader replica fails. It, too, consists of four main steps:

  1. Elect a new leader replica for the partition using the given PartitionLeaderSelector.
  2. Write the leader replica and ISR information to ZooKeeper.
  3. Update the leader replica, ISR set, and related information cached in ControllerContext's partitionLeadershipInfo.
  4. Add the leader replica, ISR set, AR set, and related information determined above to the ControllerBrokerRequestBatch; this is later wrapped in a LeaderAndIsrRequest and sent to the relevant brokers.

Here the leader replica is not simply the first replica in the AR set; instead it is elected by the supplied PartitionLeaderSelector, which is covered later. Apart from how the leader is chosen, electLeaderForPartition(...) behaves the same as initializeLeaderAndIsrForPartition(...).
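
Also worth noting is the while loop in the method above: the conditional ZooKeeper update keyed on zkVersion is effectively an optimistic compare-and-set, so on a version conflict the loop re-reads the current state and re-runs the election. A minimal generic sketch of this pattern, with hypothetical readState/conditionalUpdate functions standing in for the ZkUtils/ReplicationUtils calls, looks like this:

  • // Sketch: optimistic retry loop around a versioned store;
  • // readState and conditionalUpdate are hypothetical stand-ins
  • def updateWithRetry[S](readState: () => (S, Int),             // returns (state, version)
  •                        compute: S => S,                       // e.g. elect a new leader
  •                        conditionalUpdate: (S, Int) => Boolean): S = {
  •   var result: Option[S] = None
  •   while (result.isEmpty) {
  •     val (current, version) = readState()                      // re-read on every attempt
  •     val next = compute(current)
  •     if (conditionalUpdate(next, version)) result = Some(next) // fails on version conflict
  •   }
  •   result.get
  • }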

1.2.4. Transitioning to OfflinePartition

Only a partition whose previous state is NewPartition, OnlinePartition, or OfflinePartition can transition to OfflinePartition. The relevant code fragment is:

kafka.controller.PartitionStateMachine#handleStateChange
  • ...
  • case OfflinePartition => // Transitioning to OfflinePartition
  • // pre: partition should be in New or Online state
  • // When transitioning to OfflinePartition, the previous state must be NewPartition, OnlinePartition, or OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
  • // should be called when the leader for a partition is no longer alive
  • // Log the change
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
  • // Set the state to OfflinePartition
  • partitionState.put(topicAndPartition, OfflinePartition)
  • ...

As the source shows, transitioning to OfflinePartition simply updates the partition's entry in the partitionState map to OfflinePartition.

1.2.5. Transitioning to NonExistentPartition

Only a partition whose previous state is OfflinePartition can transition to NonExistentPartition. The relevant code fragment is:

kafka.controller.PartitionStateMachine#handleStateChange
  • ...
  • case NonExistentPartition => // Transitioning to NonExistentPartition
  • // pre: partition should be in Offline state
  • // When transitioning to NonExistentPartition, the previous state must be OfflinePartition; otherwise an IllegalStateException is thrown
  • assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
  • // Log the change
  • stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  • .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
  • // Set the state to NonExistentPartition
  • partitionState.put(topicAndPartition, NonExistentPartition)
  • ...

As the source shows, transitioning from OfflinePartition to NonExistentPartition simply updates the partition's entry in the partitionState map to NonExistentPartition.

2. The ReplicaStateMachine

Like partitions, replicas also have states. Replica states are represented by the ReplicaState trait, which has seven subtypes:

kafka.controller
  • // Replica states
  • sealed trait ReplicaState { def state: Byte }
  • // A replica newly created during topic creation or replica reassignment is in this state. A replica in this state can only become a follower replica.
  • case object NewReplica extends ReplicaState { val state: Byte = 1 }
  • // A replica that is working normally is in this state; it may serve as either a leader or a follower replica.
  • case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
  • // A replica transitions to this state after the broker hosting it goes down.
  • case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
  • // When deletion of a replica begins, the replica is first moved to this state before the delete operation starts.
  • case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
  • // A replica that has been deleted successfully is in this state.
  • case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
  • // If deleting a replica fails, the replica is moved to this state.
  • case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
  • // The final state of a replica after it has been successfully deleted.
  • case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }

The KafkaController leader uses ReplicaStateMachine as the state machine that maintains replica states. The class definition and its important fields are as follows:

kafka.controller.ReplicaStateMachine
  • /**
  • * This class represents the state machine for replicas. It defines the states that a replica can be in, and
  • * transitions to move the replica to another legal state. The different states that a replica can be in are -
  • * 1. NewReplica : The controller can create new replicas during partition reassignment. In this state, a
  • * replica can only get become follower state change request. Valid previous
  • * state is NonExistentReplica
  • * 2. OnlineReplica : Once a replica is started and part of the assigned replicas for its partition, it is in this
  • * state. In this state, it can get either become leader or become follower state change requests.
  • * Valid previous state are NewReplica, OnlineReplica or OfflineReplica
  • * 3. OfflineReplica : If a replica dies, it moves to this state. This happens when the broker hosting the replica
  • * is down. Valid previous state are NewReplica, OnlineReplica
  • * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
  • * 5. ReplicaDeletionSuccessful: If replica responds with no error code in response to a delete replica request, it is
  • * moved to this state. Valid previous state is ReplicaDeletionStarted
  • * 6. ReplicaDeletionIneligible: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
  • * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
  • * ReplicaDeletionSuccessful
  • *
  • * ReplicaStateMachine is the state machine used by the Controller leader to maintain replica states.
  • */
  • class ReplicaStateMachine(controller: KafkaController) extends Logging {
  • private val controllerContext = controller.controllerContext
  • private val controllerId = controller.config.brokerId
  • private val zkUtils = controllerContext.zkUtils
  • // Records the ReplicaState of each replica
  • private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
  • // Listens for broker changes, e.g., a broker going down or coming back online
  • private val brokerChangeListener = new BrokerChangeListener()
  • private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
  • // Indicates whether the state machine has started
  • private val hasStarted = new AtomicBoolean(false)
  • ...
  • }

ReplicaStateMachine closely mirrors PartitionStateMachine: the replicaState map records each replica's state, and the ControllerBrokerRequestBatch-typed brokerRequestBatch field handles batched request sending.

2.1. Replica State Initialization

ReplicaStateMachine's startup() method initializes the replica states; its source is as follows:

kafka.controller.ReplicaStateMachine#startup
  • /**
  • * Invoked on successful controller election. First registers a broker change listener since that triggers all
  • * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
  • * Then triggers the OnlineReplica state change for all replicas.
  • */
  • def startup() {
  • // initialize replica state
  • initializeReplicaState() // Initialize the replicaState map
  • // set started flag
  • hasStarted.set(true) // Mark as started
  • // move all Online replicas to Online
  • // Try to transition all live replicas to the OnlineReplica state
  • handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
  • info("Started replica state machine with initial state -> " + replicaState.toString())
  • }

It calls initializeReplicaState() to initialize the replicaState map:

kafka.controller.ReplicaStateMachine#initializeReplicaState
  • /**
  • * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
  • * in zookeeper
  • *
  • * Sets each replica's initial state based on the broker liveness recorded in the ControllerContext
  • */
  • private def initializeReplicaState() {
  • // Iterate over all partitions and their AR sets recorded in the ControllerContext
  • for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
  • val topic = topicPartition.topic
  • val partition = topicPartition.partition
  • // Iterate over each partition's AR set
  • assignedReplicas.foreach { replicaId =>
  • val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
  • // Check against the set of live broker IDs
  • controllerContext.liveBrokerIds.contains(replicaId) match {
  • // If the replica's broker is alive, set the state to OnlineReplica
  • case true => replicaState.put(partitionAndReplica, OnlineReplica)
  • case false => // Otherwise set it to ReplicaDeletionIneligible
  • // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
  • // This is required during controller failover since during controller failover a broker can go down,
  • // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
  • replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
  • }
  • }
  • }
  • }

As the source shows, each replica whose hosting broker is alive is initialized to OnlineReplica; otherwise it is initialized to ReplicaDeletionIneligible.

After initializeReplicaState() has initialized the replica states, handleStateChanges(...) is called to transition all live replicas recorded in the ControllerContext to the OnlineReplica state. This method closely mirrors PartitionStateMachine's handleStateChanges(...): it iterates over the given set of PartitionAndReplica objects and transitions each replica:

kafka.controller.ReplicaStateMachine#handleStateChanges
  • /**
  • * This API is invoked by the broker change controller callbacks and the startup API of the state machine
  • * @param replicas The list of replicas (brokers) that need to be transitioned to the target state
  • * @param targetState The state that the replicas should be moved to
  • * The controller's allLeaders cache should have been updated before this
  • */
  • def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
  • callbacks: Callbacks = (new CallbackBuilder).build) {
  • if(replicas.size > 0) {
  • info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
  • try {
  • brokerRequestBatch.newBatch()
  • replicas.foreach(r => handleStateChange(r, targetState, callbacks))
  • brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  • }catch {
  • case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
  • }
  • }
  • }

As the source above shows, ReplicaStateMachine's handleStateChange(...) method is the core method that performs replica state transitions.

2.2. Replica State Transitions

Replica state transitions are carried out mainly by the handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) method. Its first parameter identifies the replica to transition, and its second parameter is the target state. The first parameter's type, PartitionAndReplica, is a simple case class recording the replica's topic, the ID of its partition, and the replica ID:

kafka.controller.PartitionAndReplica
  • /**
  • * @param topic the topic
  • * @param partition the partition ID
  • * @param replica the replica (broker) ID
  • */
  • case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
  • override def toString(): String = {
  • "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
  • }
  • }
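
As a quick usage example (with a hypothetical topic name), a PartitionAndReplica identifying replica 1 of partition 0 of topic "test" prints as follows:

  • // Usage sketch: the topic name here is hypothetical
  • val par = PartitionAndReplica("test", 0, 1)
  • println(par) // prints [Topic=test,Partition=0,Replica=1]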

The seven replica states introduced above (NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible, and NonExistentReplica) transition between one another as shown in the following diagram:

[Figure 2: Replica state transition diagram]

These transitions are reflected in detail in the source of ReplicaStateMachine's handleStateChange(...):

kafka.controller.ReplicaStateMachine#handleStateChange
  • /**
  • * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
  • * previous state to the target state. Valid state transitions are:
  • * NonExistentReplica --> NewReplica
  • * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
  • * partition to every live broker
  • *
  • * NewReplica -> OnlineReplica
  • * --add the new replica to the assigned replica list if needed
  • *
  • * OnlineReplica,OfflineReplica -> OnlineReplica
  • * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
  • * partition to every live broker
  • *
  • * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
  • * --send StopReplicaRequest to the replica (w/o deletion)
  • * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
  • * UpdateMetadata request for the partition to every live broker.
  • *
  • * OfflineReplica -> ReplicaDeletionStarted
  • * --send StopReplicaRequest to the replica (with deletion)
  • *
  • * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
  • * -- mark the state of the replica in the state machine
  • *
  • * ReplicaDeletionStarted -> ReplicaDeletionIneligible
  • * -- mark the state of the replica in the state machine
  • *
  • * ReplicaDeletionSuccessful -> NonExistentReplica
  • * -- remove the replica from the in memory partition replica assignment cache
  • *
  • * This method controls ReplicaState transitions
  • *
  • * @param partitionAndReplica The replica for which the state transition is invoked
  • * @param targetState The end state that the replica should be moved to
  • */
  • def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
  • callbacks: Callbacks) {
  • // Get the topic, partition, and replica ID
  • val topic = partitionAndReplica.topic
  • val partition = partitionAndReplica.partition
  • val replicaId = partitionAndReplica.replica
  • val topicAndPartition = TopicAndPartition(topic, partition)
  • // Throw an exception if the state machine has not started
  • if (!hasStarted.get)
  • throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
  • "to %s failed because replica state machine has not started")
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  • // Get the replica's current state; if absent, initialize it to NonExistentReplica
  • val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  • try {
  • // Get the partition's AR set
  • val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // Match on the target state
  • targetState match {
  • case NewReplica => // Target state is NewReplica
  • // Validate the previous state: must be NonExistentReplica
  • assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
  • // start replica as a follower to the current leader for its partition
  • // Get the partition's leader, ISR, controller epoch, etc.
  • val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
  • leaderIsrAndControllerEpochOpt match {
  • // Leader and ISR information is available
  • case Some(leaderIsrAndControllerEpoch) =>
  • // The leader replica of a partition may not be in the NewReplica state
  • if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
  • throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
  • .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
  • // Send a LeaderAndIsrRequest to this replica, and an UpdateMetadataRequest to every live broker
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
  • topic, partition, leaderIsrAndControllerEpoch,
  • replicaAssignment)
  • // Leader and ISR information is not available
  • case None => // new leader request will be sent to this replica when one gets elected
  • }
  • // Update the state
  • replicaState.put(partitionAndReplica, NewReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
  • targetState))
  • case ReplicaDeletionStarted => // Target state is ReplicaDeletionStarted
  • // Validate the previous state: must be OfflineReplica
  • assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
  • // send stop replica command
  • // Send a StopReplicaRequest to the replica
  • brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  • callbacks.stopReplicaResponseCallback)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case ReplicaDeletionIneligible => // Target state is ReplicaDeletionIneligible
  • // Validate the previous state: must be ReplicaDeletionStarted
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case ReplicaDeletionSuccessful => // Target state is ReplicaDeletionSuccessful
  • // Validate the previous state: must be ReplicaDeletionStarted
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case NonExistentReplica => // Target state is NonExistentReplica
  • // Validate the previous state: must be ReplicaDeletionSuccessful
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
  • // remove this replica from the assigned replicas list for its partition
  • // Get the partition's AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // Remove this replica from the AR set
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
  • // Remove the replica's state entry
  • replicaState.remove(partitionAndReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case OnlineReplica => // Target state is OnlineReplica
  • // Validate the previous state: must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible
  • assertValidPreviousStates(partitionAndReplica,
  • List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
  • replicaState(partitionAndReplica) match {
  • case NewReplica => // Current state is NewReplica
  • // add this replica to the assigned replicas list for its partition
  • // Get the partition's AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // If the partition's current AR set does not contain this replica, add it
  • if(!currentAssignedReplicas.contains(replicaId))
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
  • targetState))
  • case _ => // Current state is something other than NewReplica (OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
  • // check if the leader for this partition ever existed
  • // Get the partition's leader replica information
  • controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
  • // A leader replica exists
  • case Some(leaderIsrAndControllerEpoch) =>
  • // Send a LeaderAndIsrRequest to this replica, and an UpdateMetadataRequest to every live broker in the cluster
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
  • replicaAssignment)
  • // Set the state to OnlineReplica
  • replicaState.put(partitionAndReplica, OnlineReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case None => // that means the partition was never in OnlinePartition state, this means the broker never
  • // started a log for that partition and does not have a high watermark value for this partition
  • }
  • }
  • replicaState.put(partitionAndReplica, OnlineReplica)
  • case OfflineReplica => // Target state is OfflineReplica
  • // Validate the previous state: must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible
  • assertValidPreviousStates(partitionAndReplica,
  • List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
  • // send stop replica command to the replica so that it stops fetching from the leader
  • // Send a StopReplicaRequest to this replica, without deleting it (deletePartition = false)
  • brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
  • // As an optimization, the controller removes dead replicas from the ISR
  • val leaderAndIsrIsEmpty: Boolean =
  • // Get the partition's leader replica
  • controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
  • // A leader replica exists
  • case Some(currLeaderIsrAndControllerEpoch) =>
  • // Remove this replica from the ISR set
  • controller.removeReplicaFromIsr(topic, partition, replicaId) match {
  • // The leader still exists after removing this replica from the ISR, i.e., the removed replica was not the leader
  • case Some(updatedLeaderIsrAndControllerEpoch) =>
  • // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
  • // The partition's current AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) { // The partition is not scheduled for deletion
  • // Send a LeaderAndIsrRequest to the other live replicas, and an UpdateMetadataRequest to every live broker in the cluster
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
  • topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
  • }
  • // Update the state
  • replicaState.put(partitionAndReplica, OfflineReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • false // A leader replica was found; return false
  • case None =>
  • true // No leader replica was found; return true
  • }
  • case None =>
  • true // No leader replica was found; return true
  • }
  • if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
  • throw new StateChangeFailedException(
  • "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
  • .format(replicaId, topicAndPartition))
  • }
  • }
  • catch {
  • case t: Throwable =>
  • stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
  • .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  • }
  • }

This handleStateChange(...) closely resembles PartitionStateMachine's handleStateChange(...) discussed earlier, only with more states to handle. Each branch is discussed below.

2.2.1. Previous-State Validation

Before a replica state transition, the previous state is likewise checked; each target state permits only specific previous states:

  1. Transitioning to NewReplica requires the previous state to be NonExistentReplica;
  2. Transitioning to ReplicaDeletionStarted requires the previous state to be OfflineReplica;
  3. Transitioning to ReplicaDeletionIneligible requires the previous state to be ReplicaDeletionStarted;
  4. Transitioning to ReplicaDeletionSuccessful requires the previous state to be ReplicaDeletionStarted;
  5. Transitioning to NonExistentReplica requires the previous state to be ReplicaDeletionSuccessful;
  6. Transitioning to OnlineReplica requires the previous state to be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible;
  7. Transitioning to OfflineReplica requires the previous state to be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible.

Note: if a replica being transitioned has no recorded state, it is initialized to NonExistentReplica. The full transition table is summarized in the sketch below.
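
As with partitions, the rules above can be encoded as a Scala Map; the following is a minimal sketch (not Kafka source):

  • // Sketch: the legal previous states for each target ReplicaState,
  • // derived from the assertValidPreviousStates calls in handleStateChange
  • val validPreviousReplicaStates: Map[ReplicaState, Set[ReplicaState]] = Map(
  •   NewReplica -> Set(NonExistentReplica),
  •   OnlineReplica -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
  •   OfflineReplica -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
  •   ReplicaDeletionStarted -> Set(OfflineReplica),
  •   ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
  •   ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
  •   NonExistentReplica -> Set(ReplicaDeletionSuccessful)
  • )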

The replica previous-state check is implemented by ReplicaStateMachine's assertValidPreviousStates(...) method, whose source is as follows:

kafka.controller.ReplicaStateMachine#assertValidPreviousStates
  • private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState],
  • targetState: ReplicaState) {
  • assert(fromStates.contains(replicaState(partitionAndReplica)),
  • "Replica %s should be in the %s states before moving to %s state"
  • .format(partitionAndReplica, fromStates.mkString(","), targetState) +
  • ". Instead it is in %s state".format(replicaState(partitionAndReplica)))
  • }

2.2.2. Transitioning to NewReplica

Only a replica whose previous state is NonExistentReplica can transition to NewReplica. The relevant code fragment is:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case NewReplica => // Target state is NewReplica
  • // Validate the previous state: must be NonExistentReplica
  • assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
  • // start replica as a follower to the current leader for its partition
  • // Get the partition's leader, ISR, controller epoch, etc.
  • val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
  • leaderIsrAndControllerEpochOpt match {
  • // Leader and ISR information is available
  • case Some(leaderIsrAndControllerEpoch) =>
  • // The leader replica of a partition may not be in the NewReplica state
  • if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
  • throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
  • .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
  • // Send a LeaderAndIsrRequest to this replica, and an UpdateMetadataRequest to every live broker
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
  • topic, partition, leaderIsrAndControllerEpoch,
  • replicaAssignment)
  • // Leader and ISR information is not available
  • case None => // new leader request will be sent to this replica when one gets elected
  • }
  • // Update the state
  • replicaState.put(partitionAndReplica, NewReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
  • targetState))
  • ...

During this transition, the KafkaController sends a LeaderAndIsrRequest to the broker hosting the replica, carrying the ID of the broker hosting the leader replica, the leader epoch, the ISR set, the controller epoch, and related data; it also sends an UpdateMetadataRequest to every live broker in the cluster so they update their metadata.

Note that a leader replica may not be in the NewReplica state, so attempting to transition a leader replica to NewReplica throws a StateChangeFailedException.

2.2.3. Transitioning to ReplicaDeletionStarted

Only a replica whose previous state is OfflineReplica can transition to ReplicaDeletionStarted. The relevant code fragment is:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case ReplicaDeletionStarted => // Target state is ReplicaDeletionStarted
  • // Validate the previous state: must be OfflineReplica
  • assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
  • // send stop replica command
  • // Send a StopReplicaRequest to the replica
  • brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  • callbacks.stopReplicaResponseCallback)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • ...

A replica moving to ReplicaDeletionStarted is about to be deleted, so the KafkaController sends a StopReplicaRequest to the broker hosting the replica to stop it. Note that the StopReplicaRequestInfo constructed here has its deletePartition parameter set to true, meaning the replica's local data for the partition is deleted as well.

Note: replica deletion involves three states: ReplicaDeletionStarted, ReplicaDeletionSuccessful, and ReplicaDeletionIneligible. ReplicaDeletionStarted merely indicates that the delete operation has begun.

2.2.4. Transitioning to ReplicaDeletionIneligible

Only a replica whose previous state is ReplicaDeletionStarted can transition to ReplicaDeletionIneligible, which indicates that the delete operation failed. The relevant code fragment is:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case ReplicaDeletionIneligible => // Target state is ReplicaDeletionIneligible
  • // Validate the previous state: must be ReplicaDeletionStarted
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • ...

As the source shows, transitioning from ReplicaDeletionStarted to ReplicaDeletionIneligible simply updates the replica's entry in the replicaState map to ReplicaDeletionIneligible.

2.2.5. Transitioning to ReplicaDeletionSuccessful

Only a replica whose previous state is ReplicaDeletionStarted can transition to ReplicaDeletionSuccessful, which indicates that the delete operation succeeded. The relevant code fragment is:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case ReplicaDeletionSuccessful => // Target state is ReplicaDeletionSuccessful
  • // Validate the previous state: must be ReplicaDeletionStarted
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
  • // Update the state
  • replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • ...

As the source shows, transitioning from ReplicaDeletionStarted to ReplicaDeletionSuccessful simply updates the replica's entry in the replicaState map to ReplicaDeletionSuccessful.

2.2.6. Transitioning to NonExistentReplica

Only a replica whose previous state is ReplicaDeletionSuccessful can be transitioned to the NonExistentReplica state. The relevant code fragment is as follows:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case NonExistentReplica => // The target state is NonExistentReplica
  • // Check that the previous state is valid; it must be ReplicaDeletionSuccessful
  • assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
  • // remove this replica from the assigned replicas list for its partition
  • // Get the partition's AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // Remove this replica from the AR set
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
  • // Remove the replica's state entry
  • replicaState.remove(partitionAndReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • ...

During this transition the KafkaController removes the replica from its partition's AR set and deletes the replica's entry from the replicaState map.

2.2.7. Transitioning to OnlineReplica

Only a replica whose previous state is NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible can be transitioned to the OnlineReplica state, which indicates that the replica is online. The relevant code fragment is as follows:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case OnlineReplica => // The target state is OnlineReplica
  • // Check that the previous state is valid; it must be NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible
  • assertValidPreviousStates(partitionAndReplica,
  • List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
  • replicaState(partitionAndReplica) match {
  • case NewReplica => // The current state is NewReplica
  • // add this replica to the assigned replicas list for its partition
  • // Get the partition's AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // If the partition's current AR set does not contain this replica, add it
  • if(!currentAssignedReplicas.contains(replicaId))
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
  • targetState))
  • case _ => // The current state is anything other than NewReplica (OnlineReplica, OfflineReplica or ReplicaDeletionIneligible)
  • // check if the leader for this partition ever existed
  • // Get the partition's Leader replica information
  • controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
  • // A Leader replica exists
  • case Some(leaderIsrAndControllerEpoch) =>
  • // Send a LeaderAndIsrRequest to this replica, and an UpdateMetadataRequest to every live broker in the cluster
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
  • replicaAssignment)
  • // Change the state to OnlineReplica
  • replicaState.put(partitionAndReplica, OnlineReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • case None => // that means the partition was never in OnlinePartition state, this means the broker never
  • // started a log for that partition and does not have a high watermark value for this partition
  • }
  • }
  • // The replica's state is set to OnlineReplica regardless of the branch taken above
  • replicaState.put(partitionAndReplica, OnlineReplica)
  • ...

For a replica whose previous state is NewReplica, the KafkaController adds the replica to its partition's AR set during the transition. For a replica whose previous state is OnlineReplica, OfflineReplica or ReplicaDeletionIneligible, the KafkaController sends a LeaderAndIsrRequest to the Broker hosting the replica to inform it of the Leader replica, the Controller epoch and related information, and sends an UpdateMetadataRequest to every live Broker in the cluster so that they update their metadata.
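
A typical caller of this transition is broker-startup handling: when new Brokers come online, the KafkaController moves all replicas hosted on them to OnlineReplica. A condensed sketch of this, simplified from KafkaController.onBrokerStartup in this version:

  • // All replicas that have an assignment on the newly started brokers
  • val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  • // Moving them to OnlineReplica triggers the LeaderAndIsrRequest/UpdateMetadataRequest flow above
  • replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)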

2.2.8. Transitioning to OfflineReplica

Only a replica whose previous state is NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible can be transitioned to the OfflineReplica state, which indicates that the replica is offline. The relevant code fragment is as follows:

kafka.controller.ReplicaStateMachine#handleStateChange
  • ...
  • case OfflineReplica => // The target state is OfflineReplica
  • // Check that the previous state is valid; it must be NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible
  • assertValidPreviousStates(partitionAndReplica,
  • List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
  • // send stop replica command to the replica so that it stops fetching from the leader
  • // Send a StopReplicaRequest to this replica, but without deleting its data (deletePartition is false)
  • brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
  • // As an optimization, the controller removes dead replicas from the ISR
  • val leaderAndIsrIsEmpty: Boolean =
  • // Get the partition's leadership information
  • controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
  • // Leadership information is available
  • case Some(currLeaderIsrAndControllerEpoch) =>
  • // Remove this replica from the ISR set
  • controller.removeReplicaFromIsr(topic, partition, replicaId) match {
  • // Some(...) means the leader/ISR info existed in ZooKeeper and the ISR was shrunk
  • case Some(updatedLeaderIsrAndControllerEpoch) =>
  • // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
  • // The partition's current AR set
  • val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) { // The partition is not about to be deleted
  • // Send a LeaderAndIsrRequest to the other live replicas, and an UpdateMetadataRequest to every live broker in the cluster
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
  • topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
  • }
  • // Update the state
  • replicaState.put(partitionAndReplica, OfflineReplica)
  • stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  • .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
  • false // Leader and ISR information was found; return false
  • case None =>
  • true // No leader/ISR information exists in ZooKeeper; return true
  • }
  • case None =>
  • true // No leadership information is cached; return true
  • }
  • if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
  • throw new StateChangeFailedException(
  • "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
  • .format(replicaId, topicAndPartition))
  • ...

During the transition to the OfflineReplica state, the KafkaController sends a StopReplicaRequest to the Broker hosting the replica to stop it, then removes the replica from its partition's ISR set, and finally sends a LeaderAndIsrRequest to the Brokers hosting the other live replicas to inform them of the Leader replica, the Controller epoch and related information, along with an UpdateMetadataRequest to every live Broker in the cluster so that they update their metadata.
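
For reference, controller.removeReplicaFromIsr shrinks the ISR recorded in ZooKeeper and returns the updated leadership info (or None if no leader/ISR info exists). A condensed sketch of its core logic, simplified from KafkaController.removeReplicaFromIsr in this version (the retry loop and the conditional ZooKeeper write are elided):

  • if (leaderAndIsr.isr.contains(replicaId)) {
  •   // If the replica being removed is the Leader itself, the partition is left leaderless
  •   val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
  •   // Shrink the ISR and bump leaderEpoch/zkVersion before writing back to ZooKeeper
  •   val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
  •     leaderAndIsr.isr.filter(_ != replicaId), leaderAndIsr.zkVersion + 1)
  •   // On a successful conditional update, Some(updated LeaderIsrAndControllerEpoch) is returned
  • }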

3. PartitionLeaderSelector Leader Electors

From the earlier discussion of the PartitionStateMachine we know that it delegates Leader election and determination of the ISR set to implementations of the PartitionLeaderSelector trait. These electors are only used when the partition already has leadership information and a new Leader must be chosen, for example after the existing Leader fails; a newly created partition has its first Leader initialized directly rather than through an elector.

PartitionLeaderSelector is a trait with five concrete implementations. Let's first look at the trait's source:

kafka.controller.PartitionLeaderSelector
  • // Partition Leader elector
  • trait PartitionLeaderSelector {
  • /**
  • * @param topicAndPartition The topic and partition whose leader needs to be elected
  • * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
  • * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
  • * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
  • * the LeaderAndIsrRequest.
  • */
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
  • }

As we can see, the trait declares a single method, selectLeader(...), which performs the election.
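
The electors are invoked from PartitionStateMachine.electLeaderForPartition; its condensed shape (details such as the retry loop are elided) is roughly:

  • // 1. Ask the elector for the new leader/ISR and the brokers that must be notified
  • val (leaderAndIsr, replicasToReceiveRequest) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
  • // 2. Conditionally write the new LeaderAndIsr to the partition's state path in ZooKeeper
  • val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
  •   leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
  • // 3. On success, cache the new leadership info and queue a LeaderAndIsrRequest
  • //    for replicasToReceiveRequest via brokerRequestBatch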

The five concrete implementations of PartitionLeaderSelector are:

  1. OfflinePartitionLeaderSelector
  2. ReassignedPartitionLeaderSelector
  3. PreferredReplicaPartitionLeaderSelector
  4. ControlledShutdownLeaderSelector
  5. NoOpLeaderSelector

Each of these five electors is covered below; first, the sketch that follows shows where they live.
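
For orientation, four of the electors are instantiated as fields of the KafkaController and supplied to the PartitionStateMachine for specific scenarios, while NoOpLeaderSelector, as seen earlier, serves as the state machine's built-in default. Condensed from this version's field declarations:

  • // kafka.controller.KafkaController (field declarations, condensed)
  • val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
  • private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
  • private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
  • private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)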

3.1. OfflinePartitionLeaderSelector

OfflinePartitionLeaderSelector elects the new Leader replica and ISR set based on the currentLeaderAndIsr argument passed to selectLeader(...), using the following strategy:

  1. If at least one replica in the ISR set is alive, pick the new Leader from the live ISR replicas (the first such replica); the live ISR becomes the new ISR set.
  2. If no replica in the ISR set is alive and "unclean leader election" is disabled for the topic, throw an exception (see the configuration sketch after this list).
  3. If "unclean leader election" is enabled, pick the new Leader from the live replicas in the AR set (the first such replica); the new ISR set contains only the new Leader.
  4. If no replica in the AR set is alive either, throw an exception.
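
Steps 2 and 3 hinge on the topic-level unclean.leader.election.enable configuration, which the elector source below reads via AdminUtils.fetchEntityConfig. As a minimal, illustrative sketch of flipping it for one topic through this version's AdminUtils, where the ZooKeeper address localhost:2181 and the topic name my-topic are placeholder assumptions:

  • import java.util.Properties
  • import kafka.admin.AdminUtils
  • import kafka.log.LogConfig
  • import kafka.utils.ZkUtils
  • 
  • val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)
  • val props = new Properties()
  • // Allow a non-ISR replica to be elected Leader for this topic (trades durability for availability)
  • props.put(LogConfig.UncleanLeaderElectionEnableProp, "true")
  • AdminUtils.changeTopicConfig(zkUtils, "my-topic", props)
  • zkUtils.close()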

The source of this elector is as follows:

kafka.controller.OfflinePartitionLeaderSelector
  • /**
  • * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
  • * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
  • * isr as the new isr.
  • * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
  • * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
  • * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
  • * Replicas to receive LeaderAndIsr request = live assigned replicas
  • * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  • */
  • class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  • extends PartitionLeaderSelector with Logging {
  • this.logIdent = "[OfflinePartitionLeaderSelector]: "
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  • // Get the partition's AR set
  • controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
  • case Some(assignedReplicas) => // The AR set exists
  • // Filter the AR set down to the live replicas
  • val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
  • // Filter the ISR set down to the live replicas
  • val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
  • // The current leader epoch and the zkVersion of the leader/ISR path
  • val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
  • val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
  • // Try to pick a Leader replica from the live replicas in the ISR set
  • val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
  • case true => // No live replica in the ISR; if unclean leader election is disabled, throw NoReplicaOnlineException
  • // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
  • // for unclean leader election.
  • if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
  • ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
  • throw new NoReplicaOnlineException(("No broker in ISR for partition " +
  • "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
  • " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
  • }
  • debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
  • .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
  • // Reaching here means the ISR has no live replica but unclean leader election is enabled;
  • // try to pick a Leader replica from the live replicas in the AR set
  • liveAssignedReplicas.isEmpty match {
  • case true => // The AR set has no live replica either; throw NoReplicaOnlineException
  • throw new NoReplicaOnlineException(("No replica for partition " +
  • "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
  • " Assigned replicas are: [%s]".format(assignedReplicas))
  • case false => // The AR set has live replicas
  • ControllerStats.uncleanLeaderElectionRate.mark()
  • // Pick the first live replica in the AR set as the Leader
  • val newLeader = liveAssignedReplicas.head
  • warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
  • .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
  • // Build the LeaderAndIsr: the new Leader, leader epoch + 1, an ISR containing only the new Leader, and zkVersion + 1
  • new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
  • }
  • case false => // The ISR set has live replicas
  • // Keep the AR replicas that are still live members of the ISR; this becomes the new ISR, and its first replica becomes the Leader
  • val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
  • val newLeader = liveReplicasInIsr.head
  • debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
  • .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
  • // Build the LeaderAndIsr: the new Leader, leader epoch + 1, the new ISR, and zkVersion + 1
  • new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
  • }
  • info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
  • // Return the new LeaderAndIsr along with the live AR replicas, which will receive the LeaderAndIsrRequest
  • (newLeaderAndIsr, liveAssignedReplicas)
  • case None =>
  • // No AR set is assigned; throw NoReplicaOnlineException
  • throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
  • }
  • }
  • }

3.2. ReassignedPartitionLeaderSelector

The strategy of ReassignedPartitionLeaderSelector is: the new Leader replica must belong to the newly assigned replica set and also be in the current ISR set; the current ISR becomes the new ISR, and the replicas in the newly assigned replica set receive the LeaderAndIsrRequest. This elector is involved in partition reassignment, which will be covered later. Source:

kafka.controller.ReassignedPartitionLeaderSelector
  • /**
  • * New leader = a live in-sync reassigned replica
  • * New isr = current isr
  • * Replicas to receive LeaderAndIsr request = reassigned replicas
  • */
  • class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  • this.logIdent = "[ReassignedPartitionLeaderSelector]: "
  • /**
  • * The reassigned replicas are already in the ISR when selectLeader is called.
  • */
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  • // Get the replica set that the partition is being reassigned to (the new AR set)
  • val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
  • // The current leader epoch and zkVersion
  • val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
  • val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
  • // Keep the reassigned replicas that are alive and also in the current ISR set
  • val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
  • currentLeaderAndIsr.isr.contains(r))
  • // Take the first one
  • val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
  • newLeaderOpt match {
  • // One exists: it becomes the new Leader, the current ISR becomes the new ISR, and the LeaderAndIsrRequest goes to the newly assigned replicas
  • case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
  • currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
  • // None exists: throw NoReplicaOnlineException
  • case None =>
  • reassignedInSyncReplicas.size match {
  • case 0 =>
  • throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
  • " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
  • case _ =>
  • throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
  • "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
  • }
  • }
  • }
  • }

3.3. PreferredReplicaPartitionLeaderSelector

The strategy of PreferredReplicaPartitionLeaderSelector is: if the "preferred replica" (the first replica in the AR set) is alive and in the ISR set, elect it as the Leader; the current ISR becomes the new ISR, and the LeaderAndIsrRequest is sent to all replicas in the AR set. Otherwise an exception is thrown. Source:

kafka.controller.PreferredReplicaPartitionLeaderSelector
  • /**
  • * New leader = preferred (first assigned) replica (if in isr and alive);
  • * New isr = current isr;
  • * Replicas to receive LeaderAndIsr request = assigned replicas
  • */
  • class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  • with Logging {
  • this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  • // The partition's AR set
  • val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // The first replica in the AR set is the "preferred replica"
  • val preferredReplica = assignedReplicas.head
  • // check if preferred replica is the current leader
  • // Get the current Leader replica
  • val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
  • if (currentLeader == preferredReplica) {
  • // The "preferred replica" is already the current Leader; throw LeaderElectionNotNeededException
  • throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
  • .format(preferredReplica, topicAndPartition))
  • } else {
  • // The "preferred replica" is not the current Leader
  • info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
  • " Trigerring preferred replica leader election")
  • // check if preferred replica is not the current leader and is alive and in the isr
  • // The "preferred replica" is alive and present in the ISR set
  • if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
  • // Elect the "preferred replica" as the Leader; the current ISR becomes the new ISR, and the LeaderAndIsrRequest goes to all replicas in the AR set
  • (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
  • currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
  • } else {
  • throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
  • "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
  • }
  • }
  • }
  • }

3.4. ControlledShutdownLeaderSelector

The strategy of ControlledShutdownLeaderSelector is: the new ISR is the current ISR minus the replicas on Brokers that are shutting down, and the new Leader is picked from this new ISR; the live replicas in the AR set receive the LeaderAndIsrRequest. Source:

kafka.controller.ControlledShutdownLeaderSelector
  • /**
  • * New leader = replica in isr that's not being shutdown;
  • * New isr = current isr - shutdown replica;
  • * Replicas to receive LeaderAndIsr request = live assigned replicas
  • */
  • class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  • extends PartitionLeaderSelector
  • with Logging {
  • this.logIdent = "[ControlledShutdownLeaderSelector]: "
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  • // The current leader epoch and zkVersion
  • val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
  • val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
  • // The current Leader replica
  • val currentLeader = currentLeaderAndIsr.leader
  • // The current AR set
  • val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // IDs of brokers that are live or currently shutting down
  • val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  • // Filter the AR set down to replicas on live-or-shutting-down brokers
  • val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
  • // The new ISR: the current ISR minus replicas on shutting-down brokers
  • val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
  • // From the filtered AR set, pick the first replica that is in the new ISR as the Leader
  • liveAssignedReplicas.filter(newIsr.contains).headOption match {
  • case Some(newLeader) => // One exists: build the LeaderAndIsr; the filtered ISR becomes the new ISR, and the LeaderAndIsrRequest goes to the live replicas in the AR set
  • debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
  • (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
  • case None =>
  • // None exists: throw StateChangeFailedException
  • throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
  • " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
  • }
  • }
  • }

3.5. NoOpLeaderSelector

NoOpLeaderSelector performs no actual election: it simply returns currentLeaderAndIsr unchanged, with the partition's AR set as the Brokers that should receive the LeaderAndIsrRequest. Source:

kafka.controller.NoOpLeaderSelector
  • /**
  • * Essentially does nothing. Returns the current leader and ISR, and the current
  • * set of replicas assigned to a given topic/partition.
  • */
  • class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  • this.logIdent = "[NoOpLeaderSelector]: "
  • def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  • warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
  • // Simply return currentLeaderAndIsr together with the partition's AR set
  • (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  • }
  • }
  • }