
Kafka系列 20 - 服务端源码分析 11:KafkaController相关的Zookeeper监听器

简介:主要讲解KafkaController相关的Zookeeper监听器的实现

1. Zookeeper Listener概览

Kafka使用I0Itec ZkClient作为Zookeeper客户端管理工具,通过ZooKeeper监控整个Kafka集群的运行状态,响应管理员指定的相关操作,具体的实现方式是在ZooKeeper的指定节点上添加Listener,监听该节点中的数据变化或是其子节点的变化,从而触发相应的业务逻辑。

I0Itec ZkClient提供了IZkDataListener、IZkChildListener和IZkStateListener三类Listener接口,它们的作用如下:

  • IZkDataListener:监听指定节点的数据变化。
  • IZkChildListener:监听指定节点的子节点变化。
  • IZkStateListener:监听ZooKeeper连接状态的变化。

它们都位于org.I0Itec.zkclient包中,定义如下:

org.I0Itec.zkclient
  • public interface IZkDataListener {
  • // 当被监听的节点中保存的数据发生变化时,该方法会被触发
  • public void handleDataChange(String dataPath, Object data) throws Exception;
  • // 当被监听的节点被删除时,该方法会被触发
  • public void handleDataDeleted(String dataPath) throws Exception;
  • }
  • public interface IZkChildListener {
  • /**
  • * Called when the children of the given path changed.
  • * 当被监听节点的子节点发生变化时,该方法会被触发
  • * @param parentPath 监听的路径
  • * The parent path
  • * @param currentChilds 当前子节点的集合
  • * The children or null if the root node (parent path) was deleted.
  • */
  • public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
  • }
  • public interface IZkStateListener {
  • /**
  • * Called when the zookeeper connection state has changed.
  • * 当客户端与ZooKeeper连接状态发生变化时,handleStateChanged()方法会被触发
  • *
  • * @param state 新的状态
  • * The new state.
  • */
  • public void handleStateChanged(KeeperState state) throws Exception;
  • /**
  • * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
  • * any ephemeral nodes here.
  • * 当客户端的Session过期后重新建立新的Session时,handleNewSession()方法会被触发
  • */
  • public void handleNewSession() throws Exception;
  • /**
  • * Called when a session cannot be re-established. This should be used to implement connection
  • * failure handling e.g. retry to connect or pass the error up
  • * 当Session重建失败时,handleSessionEstablishmentError()方法会被触发
  • *
  • * @param error Session重建连接失败时收到的错误
  • * The error that prevents a session from being established
  • */
  • public void handleSessionEstablishmentError(final Throwable error) throws Exception;
  • }

Kafka中提供了五个IZkDataListener接口的实现:

  1. LeaderChangeListener:监听/controller节点的数据变化,当此节点中保存的Leader ID发生变化时,会触发LeaderChangeListener进行相应的处理。
  2. PartitionModificationsListener:监听/brokers/topics/[topic_name]节点中的数据变化,用于监听一个Topic的分区变化。
  3. PreferredReplicaElectionListener:监听/admin/preferred_replica_election节点的数据变化。当通过kafka-preferred-replica-election.sh脚本命令指定某些分区需要进行“优先副本”选举时会将指定分区的信息写入该节点,从而触发PreferredReplicaElectionListener进行处理。
  4. PartitionsReassignedListener:监听/admin/reassign_partitions节点的数据变化。当管理人员通过kafka-reassign-partitions.sh脚本命令指定某些分区需要重新分配副本时,会将指定分区的信息写入该节点,从而触发PartitionsReassignedListener进行处理。
  5. ReassignedPartitionsIsrChangeListener:监听/brokers/topics/[topic_name]/partitions/[partitionId]/state节点的数据变化,负责处理进行副本重新分配的分区的ISR集合变化。

同时提供了四个IZkChildListener接口的实现:

  1. DeleteTopicsListener:监听/admin/delete_topics节点下的子节点变化,管理Topic的删除操作。
  2. TopicChangeListener:监听/brokers/topics节点的子节点变化,负责处理Topic的增减变化。
  3. IsrChangeNotificationListener:监听/isr_change_notification路径下的子节点变化,当某些分区的ISR集合变化时通知整个集群中的所有Broker。
  4. BrokerChangeListener:监听/brokers/ids节点下的子节点变化,负责处理Broker的上线和故障下线。当Broker上线时会在/brokers/ids下创建临时节点,下线时会删除对应的临时节点。

IZkStateListener接口的实现只有SessionExpirationListener一个,负责监听KafkaController与ZooKeeper的连接状态。
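
为了更直观地理解这三类Listener的注册方式,下面给出一个极简的示意代码(并非Kafka源码,ZooKeeper地址、超时时间与回调实现均为假设),展示它们分别通过ZkClient的subscribeDataChanges()、subscribeChildChanges()和subscribeStateChanges()方法完成注册:

import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, ZkClient}

object ZkListenerRegistrationSketch {
  def main(args: Array[String]): Unit = {
    // 假设ZooKeeper地址为localhost:2181,会话超时与连接超时均为示例值
    val zkClient = new ZkClient("localhost:2181", 6000, 6000)

    // IZkDataListener:通过subscribeDataChanges()注册到指定节点,监听其数据变化与删除
    zkClient.subscribeDataChanges("/controller", new IZkDataListener {
      def handleDataChange(dataPath: String, data: Object): Unit =
        println(s"data changed at $dataPath")
      def handleDataDeleted(dataPath: String): Unit =
        println(s"data deleted at $dataPath")
    })

    // IZkChildListener:通过subscribeChildChanges()注册到指定节点,监听其子节点的增删
    zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
      def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit =
        println(s"children of $parentPath: $currentChilds")
    })

    // IZkStateListener则通过zkClient.subscribeStateChanges(listener)注册,不与具体节点绑定
  }
}

后文Kafka源码中的各个registerXxxListener()方法,本质上就是对这几个subscribe方法的调用。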

2. KafkaController相关Listener

在前面我们了解到,Kafka集群中每个Broker启动时都会创建一个KafkaController对象,用于参与Controller Leader选举,并在当选后管理整个集群;KafkaController类的定义与重要字段如下:

kafka.controller.KafkaController
  • /**
  • * @param config 配置信息
  • * @param zkUtils Zookeeper工具
  • * @param brokerState 当前KafkaController所处的Broker的状态
  • */
  • class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
  • this.logIdent = "[Controller " + config.brokerId + "]: "
  • // 标识当前KafkaController是否在运行
  • private var isRunning = true
  • private val stateChangeLogger = KafkaController.stateChangeLogger
  • // 当前KafkaController依赖的ControllerContext对象
  • val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
  • // 当前KafkaController依赖的分区状态机及副本状态机
  • val partitionStateMachine = new PartitionStateMachine(this)
  • val replicaStateMachine = new ReplicaStateMachine(this)
  • // Controller Leader选举器
  • private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  • onControllerResignation, config.brokerId)
  • // have a separate scheduler for the controller to be able to start and stop independently of the
  • // kafka server
  • // 分区自动均衡任务的调度器
  • private val autoRebalanceScheduler = new KafkaScheduler(1)
  • // 主题删除操作管理器
  • var deleteTopicManager: TopicDeletionManager = null
  • // Leader副本选举器
  • val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
  • private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
  • private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
  • private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
  • // 请求批量发送器
  • private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
  • // 分区重分配监听器
  • private val partitionReassignedListener = new PartitionsReassignedListener(this)
  • // "优先副本"选举监听器
  • private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
  • // ISR变化监听器
  • private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
  • ...
  • }

从源码可以得知,每个KafkaController依赖着其独有的ControllerContext、PartitionStateMachine、ReplicaStateMachine、ControllerBrokerRequestBatch、TopicDeletionManager、ZookeeperLeaderElector以及大量的监听器对象。KafkaController的启动过程由它的startup()方法完成,源码如下:

kafka.controller.KafkaController#startup
  • /**
  • * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
  • * is the controller. It merely registers the session expiration listener and starts the controller leader
  • * elector
  • */
  • def startup() = {
  • inLock(controllerContext.controllerLock) {
  • info("Controller starting up")
  • // 注册SessionExpirationListener
  • registerSessionExpirationListener()
  • isRunning = true
  • // 启动ZookeeperLeaderElector
  • controllerElector.startup
  • info("Controller startup complete")
  • }
  • }

下面将分别介绍SessionExpirationListener的注册过程,以及辅助实现startup()逻辑的ZookeeperLeaderElector。

2.1. SessionExpirationListener

KafkaController的startup()方法调用registerSessionExpirationListener()以注册SessionExpirationListener,并通过controllerElector.startup启动ZookeeperLeaderElector选举器。其中registerSessionExpirationListener()方法的源码如下:

kafka.controller.KafkaController#registerSessionExpirationListener
  • private def registerSessionExpirationListener() = {
  • zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
  • }

SessionExpirationListener继承自IZkStateListener接口,它被定义为KafkaController的内部类,监听KafkaController与ZooKeeper的连接状态,其源码非常少:

kafka.controller.KafkaController.SessionExpirationListener
  • /**
  • * 监听KafkaController与ZooKeeper的连接状态。
  • * 当KafkaController与ZooKeeper的连接超时后创建新连接时会触发handleNewSession()方法
  • */
  • class SessionExpirationListener() extends IZkStateListener with Logging {
  • this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
  • @throws(classOf[Exception])
  • def handleStateChanged(state: KeeperState) {
  • // do nothing, since zkclient will do reconnect for us.
  • }
  • /**
  • * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
  • * any ephemeral nodes here.
  • *
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleNewSession() {
  • info("ZK expired; shut down all controller components and try to re-elect")
  • inLock(controllerContext.controllerLock) {
  • // 负责清理KafkaController依赖的对象
  • onControllerResignation()
  • // 尝试选举新Controller Leader
  • controllerElector.elect
  • }
  • }
  • override def handleSessionEstablishmentError(error: Throwable): Unit = {
  • //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
  • }
  • }

当KafkaController与ZooKeeper的连接超时后创建新连接时会触发其handleNewSession()方法,该方法会调用onControllerResignation()清理KafkaController原来注册的监听器以及依赖的ControllerChannelManager、PartitionStateMachine、ReplicaStateMachine等对象,源码如下:

kafka.controller.KafkaController#onControllerResignation
  • /**
  • * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
  • * required to clean up internal controller data structures
  • * 当LeaderChangeListener监听到/controller中的数据被删除或改变时,
  • * 旧的Controller Leader需要调用onControllerResignation()回调函数进行一些清理工作
  • */
  • def onControllerResignation() {
  • debug("Controller resigning, broker id %d".format(config.brokerId))
  • // de-register listeners
  • // 取消Zookeeper上的监听器
  • // 取消IsrChangeNotificationListener
  • deregisterIsrChangeNotificationListener()
  • // 取消PartitionsReassignedListener
  • deregisterReassignedPartitionsListener()
  • // 取消PreferredReplicaElectionListener
  • deregisterPreferredReplicaElectionListener()
  • // shutdown delete topic manager
  • // 关闭TopicDeletionManager
  • if (deleteTopicManager != null)
  • deleteTopicManager.shutdown()
  • // shutdown leader rebalance scheduler
  • // 如果配置开启了Leader自动均衡,则关闭对应的partition-rebalance定时任务
  • if (config.autoLeaderRebalanceEnable)
  • autoRebalanceScheduler.shutdown()
  • inLock(controllerContext.controllerLock) {
  • // de-register partition ISR listener for on-going partition reassignment task
  • // 取消所有的ReassignedPartitionsIsrChangeListener
  • deregisterReassignedPartitionsIsrChangeListeners()
  • // shutdown partition state machine
  • // 关闭PartitionStateMachine和ReplicaStateMachine
  • partitionStateMachine.shutdown()
  • // shutdown replica state machine
  • replicaStateMachine.shutdown()
  • // shutdown controller channel manager
  • if(controllerContext.controllerChannelManager != null) {
  • // 关闭ControllerChannelManager,断开与集群中其他Broker的链接
  • controllerContext.controllerChannelManager.shutdown()
  • controllerContext.controllerChannelManager = null
  • }
  • // reset controller context
  • // 重置Controller的Epoch和epochZkVersion
  • controllerContext.epoch=0
  • controllerContext.epochZkVersion=0
  • // 切换Broker状态
  • brokerState.newState(RunningAsBroker)
  • info("Broker %d resigned as the controller".format(config.brokerId))
  • }
  • }

同时由于存在KafkaController断线重连的情况,因此handleNewSession()方法还会调用controllerElector.elect选举新的KafkaController Leader,这里使用的controllerElector选举器对象为ZookeeperLeaderElector类型,我们将在后面讲解其elect()方法的实现。

2.2. ZookeeperLeaderElector

在KafkaController的startup()方法中,最后会将启动操作交给controllerElector的startup()方法;我们先来了解ZookeeperLeaderElector类的定义:

kafka.server.ZookeeperLeaderElector
  • /**
  • * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
  • * session expiration, instead it assumes the caller will handle it by probably try to re-elect again. If the existing
  • * leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change
  • * callback
  • *
  • * @param controllerContext 与之相关联的KafkaController的ControllerContext对象
  • * @param electionPath 选举监听路径,即/controller
  • * @param onBecomingLeader KafkaController成为Leader的回调方法,由与之相关联的KafkaController对象传入
  • * @param onResigningAsLeader KafkaController从Leader成为Follower的回调方法,由与之相关联的KafkaController对象传入
  • * @param brokerId 与之相关联的KafkaController所在的Broker的ID
  • */
  • class ZookeeperLeaderElector(controllerContext: ControllerContext,
  • electionPath: String,
  • onBecomingLeader: () => Unit,
  • onResigningAsLeader: () => Unit,
  • brokerId: Int)
  • extends LeaderElector with Logging {
  • // 缓存当前的Controller LeaderId
  • var leaderId = -1
  • // create the election path in ZK, if one does not exist
  • // 当electionPath路径在Zookeeper中不存在时会尝试创建该路径
  • val index = electionPath.lastIndexOf("/")
  • if (index > 0)
  • controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
  • // 监听Zookeeper的/controller节点的数据变化,当此节点中保存的Leader ID发生变化时,触发LeaderChangeListener进行相应的处理
  • val leaderChangeListener = new LeaderChangeListener
  • ...
  • }

从上面的源码可知,ZookeeperLeaderElector在初始化时创建了LeaderChangeListener监听器。

2.2.1. LeaderChangeListener

LeaderChangeListener监听器定义为ZookeeperLeaderElector的内部类,它会监听Zookeeper的/controller节点的数据变化,当此节点中保存的Leader ID发生变化时,会触发其handleDataChange(dataPath: String, data: Object)方法进行相应的处理,当/controller节点中的数据被删除时会触发其handleDataDeleted(dataPath: String)方法进行处理,该监听器只有这两个处理方法,源码如下:

kafka.server.ZookeeperLeaderElector.LeaderChangeListener
  • /**
  • * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
  • * have its own session expiration listener and handler
  • */
  • class LeaderChangeListener extends IZkDataListener with Logging {
  • /**
  • * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
  • * @throws Exception On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataChange(dataPath: String, data: Object) {
  • inLock(controllerContext.controllerLock) {
  • // 标识了发生变化前,对应的KafkaController的角色是否是Leader
  • val amILeaderBeforeDataChange = amILeader
  • // 记录新的Controller Leader的ID
  • leaderId = KafkaController.parseControllerId(data.toString)
  • info("New leader is %d".format(leaderId))
  • // The old leader needs to resign leadership if it is no longer the leader
  • // 如果当前Broker由Controller Leader变成Follower,则要进行清理工作
  • if (amILeaderBeforeDataChange && !amILeader) // 数据变化前是Leader,变化后不是Leader,说明出现了角色切换
  • // onResigningAsLeader()实际调用了KafkaController的onControllerResignation()
  • onResigningAsLeader()
  • }
  • }
  • /**
  • * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
  • * 当Zookeeper的/controller节点中的数据被删除时会触发handleDataDeleted()方法进行处理
  • * @throws Exception
  • * On any error.
  • */
  • @throws(classOf[Exception])
  • def handleDataDeleted(dataPath: String) {
  • inLock(controllerContext.controllerLock) {
  • debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
  • .format(brokerId, dataPath))
  • if(amILeader) // 如果发生变化前对应的KafkaController的角色是Leader,则需要进行清理操作
  • // onResigningAsLeader()实际是KafkaController的onControllerResignation()
  • onResigningAsLeader()
  • // 尝试新Controller Leader的选举
  • elect
  • }
  • }
  • }

这两个方法的实现比较简单。handleDataChange(...)被触发时表示可能发生了KafkaController Leader选举,它会从传入参数中解析新Leader的Broker ID并进行记录,然后对比数据变化前后当前Broker上的KafkaController是否从Leader角色切换为了Follower角色,如果是则会调用onResigningAsLeader()回调函数,对该KafkaController注册的相关监听器及依赖对象进行清理。handleDataDeleted(...)被触发时表示KafkaController Leader可能已宕机下线,需要选举新的Leader,此时如果当前Broker上的KafkaController正是旧的Leader,同样需要先调用onResigningAsLeader()回调函数完成清理,然后再触发新一轮选举。

onResigningAsLeader()回调函数属于ZookeeperLeaderElector对象,它是KafkaController在初始化时构造ZookeeperLeaderElector对象时传入的,就是前面讲到的KafkaController的onControllerResignation()方法:

kafka.controller.KafkaController#controllerElector
  • // Controller Leader选举器
  • private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  • onControllerResignation, config.brokerId)

2.2.2. KafkaController Leader选举

回到ZookeeperLeaderElector的startup()方法,前面讲到,KafkaController的startup()方法将启动操作委托给了ZookeeperLeaderElector的startup()方法,而ZookeeperLeaderElector的startup()的调用必然是在其初始化完成并创建了LeaderChangeListener之后,源码如下:

kafka.server.ZookeeperLeaderElector#startup
  • def startup {
  • inLock(controllerContext.controllerLock) {
  • // 在Zookeeper的/controller节点上注册LeaderChangeListener进行监听
  • controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
  • // 立即进行选举操作
  • elect
  • }
  • }

该方法的工作非常简单,首先在Zookeeper的/controller节点上注册LeaderChangeListener监听器,然后调用自己的elect: Boolean方法进行KafkaController Leader的选举,elect: Boolean方法的源码如下:

kafka.server.ZookeeperLeaderElector#elect
  • // 具体的选举操作方法
  • def elect: Boolean = {
  • // 当前时间
  • val timestamp = SystemTime.milliseconds.toString
  • // 转换选举信息为JSON串
  • val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  • // 获取Zookeeper中当前记录的Controller Leader的ID
  • leaderId = getControllerID
  • /*
  • * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
  • * it's possible that the controller has already been elected when we get here. This check will prevent the following
  • * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
  • */
  • if(leaderId != -1) { // 已存在Controller Leader,放弃选举
  • debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
  • return amILeader
  • }
  • try {
  • // 尝试创建临时节点,如果临时节点已经存在,则抛出异常
  • val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
  • electString,
  • controllerContext.zkUtils.zkConnection.getZookeeper,
  • JaasUtils.isZkSecurityEnabled())
  • zkCheckedEphemeral.create()
  • info(brokerId + " successfully elected as leader")
  • // 走到这里说明临时节点创建成功
  • // 更新LeaderId字段为当前Broker的ID,即当前Broker成功成为Controller Leader
  • leaderId = brokerId
  • // onBecomingLeader()实际调用了KafkaController的onControllerFailover()方法
  • onBecomingLeader()
  • } catch {
  • case e: ZkNodeExistsException =>
  • // If someone else has written the path, then
  • leaderId = getControllerID
  • if (leaderId != -1)
  • debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
  • else
  • warn("A leader has been elected but just resigned, this will result in another round of election")
  • case e2: Throwable =>
  • error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
  • // 对onBecomingLeader()方法抛出的异常的处理,重置leaderId,并删除Zookeeper的/controller路径
  • resign()
  • }
  • // 检测当前Broker是否是Controller Leader
  • amILeader
  • }
  • // kafka.server.ZookeeperLeaderElector#getControllerID
  • // 从Zookeeper的/controller路径读取KafkaController Leader的Broker ID
  • private def getControllerID(): Int = {
  • controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
  • case Some(controller) => KafkaController.parseControllerId(controller)
  • case None => -1
  • }
  • }
  • // kafka.server.ZookeeperLeaderElector#resign
  • // 处理onBecomingLeader()方法抛出的异常
  • def resign() = {
  • // 重置LeaderID为-1
  • leaderId = -1
  • // 删除Zookeeper中的/controller路径
  • controllerContext.zkUtils.deletePath(electionPath)
  • }

注:KafkaController触发选举的地方有三处:

  1. 第一次启动的时候,即由ZookeeperLeaderElector的startup()方法触发。
  2. LeaderChangeListener监听到/controller节点中数据被删除,位于前面讲解过的LeaderChangeListener的handleDataDeleted(...)方法中。
  3. ZooKeeper连接过期并重新建立之后,位于前面讲解的SessionExpirationListener的handleNewSession()方法中。
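
结合上面的elect方法可以看到,成功当选的Broker写入/controller临时节点的内容是一段JSON,大致形如下面的示例(其中brokerid与timestamp均为假设值):

{"version":1,"brokerid":1,"timestamp":"1529913600000"}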

elect: Boolean方法的实现其实比较简单,即尝试在Zookeeper中创建/controller临时节点,节点内容是一个JSON字符串,包含version、brokerid、timestamp三个字段,其中brokerid即为当前KafkaController所在的Broker的ID;如果创建临时节点成功,那么当前KafkaController就成为Leader角色,如果创建不成功则说明有其他KafkaController成为了Leader。当前KafkaController成为Leader之后,会调用ZookeeperLeaderElector的onBecomingLeader()方法,该方法是KafkaController在初始化时构造ZookeeperLeaderElector对象时传入的,即KafkaController的onControllerFailover()方法,源码如下:

kafka.controller.KafkaController#onControllerFailover
  • /**
  • * 当前Broker成功选举为Controller Leader时会通过该方法完成一系列的初始化操作
  • *
  • * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
  • * It does the following things on the become-controller state change -
  • * 1. Register controller epoch changed listener
  • * 2. Increments the controller epoch
  • * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
  • * leaders for all existing partitions.
  • * 4. Starts the controller's channel manager
  • * 5. Starts the replica state machine
  • * 6. Starts the partition state machine
  • * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
  • * This ensures another controller election will be triggered and there will always be an actively serving controller
  • */
  • def onControllerFailover() {
  • if(isRunning) {
  • info("Broker %d starting become controller state transition".format(config.brokerId))
  • //read controller epoch from zk
  • // 读取Zookeeper中记录的ControllerEpochPath信息并更新到ControllerContext中
  • readControllerEpochFromZookeeper()
  • // increment the controller epoch
  • // 递增Controller Epoch,并写入Zookeeper
  • incrementControllerEpoch(zkUtils.zkClient)
  • // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
  • // 注册一系列Zookeeper监听器
  • // 注册PartitionsReassignedListener
  • registerReassignedPartitionsListener()
  • // 注册IsrChangeNotificationListener
  • registerIsrChangeNotificationListener()
  • // 注册PreferredReplicaElectionListener
  • registerPreferredReplicaElectionListener()
  • // 注册TopicChangeListener、DeleteTopicsListener
  • partitionStateMachine.registerListeners()
  • // 注册BrokerChangeListener
  • replicaStateMachine.registerListeners()
  • /**
  • * 初始化ControllerContext,从Zookeeper中读取主题、分区、副本等元数据
  • * 启动ControllerChannelManager、TopicDeletionManager等组件
  • */
  • initializeControllerContext()
  • // 启动副本状态机,以初始化各个副本的状态
  • replicaStateMachine.startup()
  • // 启动分区状态机,以初始化各个分区的状态
  • partitionStateMachine.startup()
  • // register the partition change listeners for all existing topics on failover
  • // 为所有Topic注册PartitionModificationsListener
  • controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  • info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
  • // 修改Broker状态
  • brokerState.newState(RunningAsController)
  • // 处理副本重新分配的分区,内部会调用initiateReassignReplicasForTopicPartition()
  • maybeTriggerPartitionReassignment()
  • // 处理需要进行"优先副本"选举的分区,内部会调用onPreferredReplicaElection()
  • maybeTriggerPreferredReplicaElection()
  • /* send partition leadership info to all live brokers */
  • // 向集群中所有的Broker发送UpdateMetadataRequest更新其MetadataCache
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  • if (config.autoLeaderRebalanceEnable) {
  • info("starting the partition rebalance scheduler")
  • // 启动partition-rebalance-thread定时任务,周期性检测是否需要进行分区的自动均衡
  • autoRebalanceScheduler.startup()
  • // 延迟5秒后启动,此后按leader.imbalance.check.interval.seconds配置的周期调用checkAndTriggerPartitionRebalance()方法
  • autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
  • 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
  • }
  • // 启动TopicDeletionManager,底层会启动DeleteTopicsThread线程
  • deleteTopicManager.start()
  • }
  • else
  • info("Controller has been shut down, aborting startup/failover")
  • }

该方法的流程涉及到多个环节,同时我们还需要明确一点,只有成为Controller Leader的KafkaController才会调用上面这个方法进行一系列的初始化操作,其他的Controller Follower是不会调用的,也即是说,KafkaController Leader相对于其他的Follower,将承担更多的职责:

  1. 监听分区相关的变化。为ZooKeeper中的/admin/reassign_partitions节点注册PartitionsReassignedListener,用来处理分区重分配的动作。为ZooKeeper中的/isr_change_notification节点注册IsrChangeNotificationListener,用来处理ISR集合变更的动作。为ZooKeeper中的/admin/preferred_replica_election节点添加PreferredReplicaElectionListener,用来处理优先副本的选举动作。
  2. 监听主题相关的变化。为ZooKeeper中的/brokers/topics节点添加TopicChangeListener,用来处理主题增减的变化;为ZooKeeper中的/admin/delete_topics节点添加DeleteTopicsListener,用来处理删除主题的动作。
  3. 监听Broker相关的变化。为ZooKeeper中的/brokers/ids节点添加BrokerChangeListener,用来处理Broker增减的变化。
  4. 从ZooKeeper中读取获取当前所有与主题、分区及Broker有关的信息并进行相应的管理。对所有主题对应的ZooKeeper中的/brokers/topics/[topic_name]节点添加PartitionModificationsListener,用来监听主题中的分区分配变化。
  5. 启动并管理分区状态机和副本状态机。
  6. 更新集群的元数据信息。
  7. 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“partition-rebalance-thread”的定时任务来负责维护分区的优先副本的均衡。
  8. 启动主题删除管理器,会创建并启动DeleteTopicsThread线程,用于进行主题删除。

下面将分别对该方法涉及到的每个环节进行详细介绍。

2.2.2.1. 递增Controller Epoch

在每次完成KafkaController Leader的选举之后,需要递增记录在Zookeeper中的Controller Epoch年代信息,以便其它KafkaController根据该年代信息判断本地缓存数据是否有效。onControllerFailover()方法的最开始就会通过readControllerEpochFromZookeeper()和incrementControllerEpoch(zkUtils.zkClient)两个方法完成读取与递增更新,这两个方法的源码如下:

kafka.controller.KafkaController
  • // kafka.controller.KafkaController#readControllerEpochFromZookeeper
  • private def readControllerEpochFromZookeeper() {
  • // initialize the controller epoch and zk version by reading from zookeeper
  • if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) { // 判断是否存在/controller_epoch路径
  • // 读取/controller_epoch路径的数据,读到的是一个Tuple结构,(epoch, zkVersion)
  • val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
  • // 解析Controller Epoch并记录到ControllerContext中
  • controllerContext.epoch = epochData._1.toInt
  • // 解析epochZkVersion并记录到ControllerContext中
  • controllerContext.epochZkVersion = epochData._2.getVersion
  • info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
  • }
  • }
  • // kafka.controller.KafkaController#incrementControllerEpoch
  • def incrementControllerEpoch(zkClient: ZkClient) = {
  • try {
  • // 将ControllerContext中记录的Controller Epoch递增加1
  • var newControllerEpoch = controllerContext.epoch + 1
  • // 更新Zookeeper中的Controller Epoch
  • val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
  • ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
  • if(!updateSucceeded)
  • // 更新Zookeeper中的信息失败,抛出ControllerMovedException异常
  • throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
  • else {
  • // 更新Zookeeper中的信息成功,记录到ControllerContext中
  • controllerContext.epochZkVersion = newVersion
  • controllerContext.epoch = newControllerEpoch
  • }
  • } catch {
  • case nne: ZkNoNodeException => // 节点不存在异常
  • // if path doesn't exist, this is the first controller whose epoch should be 1
  • // the following call can still fail if another controller gets elected between checking if the path exists and
  • // trying to create the controller epoch path
  • try {
  • // 节点不存在则创建节点,初始Controller Epoch为1
  • zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
  • controllerContext.epoch = KafkaController.InitialControllerEpoch
  • controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
  • } catch {
  • // 创建节点也有可能失败,由于其他KafkaController抢先创建了节点此时会抛出ZkNodeExistsException异常
  • case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
  • "Aborting controller startup procedure")
  • case oe: Throwable => error("Error while incrementing controller epoch", oe)
  • }
  • case oe: Throwable => error("Error while incrementing controller epoch", oe)
  • }
  • info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
  • }

这两个方法的功能实现是比较简单的,都是用于操作/controller_epoch路径下的数据,一个用于读取,一个用于递增更新。
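
为便于理解这里基于zkVersion的条件更新(类似CAS)思路,下面给出一个简化片段(并非Kafka源码,假设代码位于KafkaController内部,可直接访问zkUtils,仅复用上文源码中出现过的方法):

// /controller_epoch节点中保存的就是epoch的字符串形式,例如"3"
// 1. 读取当前的epoch及其zkVersion
val (epochStr, stat) = zkUtils.readData(ZkUtils.ControllerEpochPath)

// 2. 以读取到的zkVersion作为期望版本执行条件更新:
//    若期间有其他KafkaController抢先递增了epoch,版本不匹配,updateSucceeded即为false
val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
  ZkUtils.ControllerEpochPath, (epochStr.toInt + 1).toString, stat.getVersion)

if (!updateSucceeded)
  throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")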

2.2.2.2. 注册监听器

在对Zookeeper中的Controller Epoch进行更新之后,还会向Zookeeper中的某些路径注册大量的监听器,代码片段如下:

kafka.controller.KafkaController#onControllerFailover
  • ...
  • // 注册一系列Zookeeper监听器
  • // 注册PartitionsReassignedListener
  • registerReassignedPartitionsListener()
  • // 注册IsrChangeNotificationListener
  • registerIsrChangeNotificationListener()
  • // 注册PreferredReplicaElectionListener
  • registerPreferredReplicaElectionListener()
  • // 注册TopicChangeListener、DeleteTopicsListener
  • partitionStateMachine.registerListeners()
  • // 注册BrokerChangeListener
  • replicaStateMachine.registerListeners()
  • ...
  • // register the partition change listeners for all existing topics on failover
  • // 为所有Topic注册PartitionModificationsListener
  • controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  • ...

其中前三个方法都属于KafkaController,源码如下:

kafka.controller.KafkaController
  • // 注册IsrChangeNotificationListener监听器,监听路径为/isr_change_notification
  • private def registerIsrChangeNotificationListener() = {
  • debug("Registering IsrChangeNotificationListener")
  • zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
  • }
  • // 注册PartitionsReassignedListener监听器,监听路径为/admin/reassign_partitions
  • private def registerReassignedPartitionsListener() = {
  • zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
  • }
  • // 注册PreferredReplicaElectionListener监听器,监听路径为/admin/preferred_replica_election
  • private def registerPreferredReplicaElectionListener() {
  • zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
  • }

这些监听器都是在KafkaController实例初始化时构造的。

PartitionStateMachine状态机的registerListeners()注册了TopicChangeListener、DeleteTopicsListener两个监听器,registerPartitionChangeListener(...)为每个主题注册了PartitionModificationsListener监听器,源码如下:

kafka.controller.PartitionStateMachine
  • // register topic and partition change listeners
  • def registerListeners() {
  • // 注册TopicChangeListener
  • registerTopicChangeListener()
  • if(controller.config.deleteTopicEnable) // 判断是否开启了主题删除功能
  • // 注册DeleteTopicsListener
  • registerDeleteTopicListener()
  • }
  • // 注册TopicChangeListener监听器,监听路径为/brokers/topics
  • private def registerTopicChangeListener() = {
  • zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
  • }
  • // 注册DeleteTopicsListener监听器,监听路径为/admin/delete_topics
  • private def registerDeleteTopicListener() = {
  • zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  • }
  • // 为指定主题注册PartitionModificationsListener监听器,监听路径为/brokers/topics/[topic_name]
  • def registerPartitionChangeListener(topic: String) = {
  • partitionModificationsListeners.put(topic, new PartitionModificationsListener(topic))
  • // 向Zookeeper的/brokers/topics/[topic_name]路径注册PartitionModificationsListener监听器
  • zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
  • }

ReplicaStateMachine状态机的registerListeners()注册了BrokerChangeListener监听器,源码如下:

kafka.controller.ReplicaStateMachine
  • // register ZK listeners of the replica state machine
  • def registerListeners() {
  • // register broker change listener
  • registerBrokerChangeListener()
  • }
  • // 注册BrokerChangeListener监听器,监听路径为/brokers/ids
  • private def registerBrokerChangeListener() = {
  • zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
  • }

关于上面涉及到的TopicChangeListener、DeleteTopicsListener和BrokerChangeListener三个监听器会在后面详细介绍。

2.2.2.3. 初始化ControllerContext

在注册完大量监听器后,会调用initializeControllerContext()方法初始化KafkaController的ControllerContext对象,该过程主要是从Zookeeper中读取主题、分区、副本等元数据并记录到本地缓存,然后启动ControllerChannelManager、TopicDeletionManager等组件,源码如下:

kafka.controller.KafkaController#initializeControllerContext
  • private def initializeControllerContext() {
  • // update controller cache with delete topic information
  • // 从Zookeeper中初始化某些信息以更新ControllerContext的字段
  • // 读取/brokers/ids初始化可用Broker集合
  • controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
  • // 读取/brokers/topics初始化集群中全部的Topic信息
  • controllerContext.allTopics = zkUtils.getAllTopics().toSet
  • // 读取/brokers/topics/[topic_name]节点初始化每个Partition的AR集合信息
  • controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
  • controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
  • controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
  • // update the leader and isr cache for all existing partitions from Zookeeper
  • // 读取/brokers/topics/[topic_name]/partitions/[partition_id]/state初始化每个Partition的Leader、ISR集合等信息
  • updateLeaderAndIsrCache()
  • // start the channel manager
  • // 启动ControllerChannelManager
  • startChannelManager()
  • // 读取/admin/preferred_replica_election初始化需要“优先副本”选举的Partition
  • initializePreferredReplicaElection()
  • // 读取/admin/reassign_partitions初始化需要进行副本重新分配的Partition
  • initializePartitionReassignment()
  • // 启动TopicDeletionManager
  • initializeTopicDeletion()
  • info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
  • info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
  • info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
  • }

下面将分别介绍该方法中涉及到的每一步流程。其中读取集群Broker集合、Topic集合、分区的AR集合信息时涉及的ZkUtils的方法源码如下:

kafka.utils.ZkUtils
  • // 获取集群中所有的Broker
  • def getAllBrokersInCluster(): Seq[Broker] = {
  • // 从Zookeeper的/brokers/ids路径读取
  • val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted
  • brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
  • }
  • // 获取集群中所有的主题
  • def getAllTopics(): Seq[String] = {
  • // 从Zookeeper的/brokers/topics路径读取
  • val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
  • if(topics == null)
  • Seq.empty[String]
  • else
  • topics
  • }
  • // 获取主题的每个分区的AR集合
  • def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
  • // 构造HashMap存储结果
  • val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
  • topics.foreach { topic => // 遍历主题集合
  • // 从路径/brokers/topics/[topic_name]读取分区数据
  • val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
  • jsonPartitionMapOpt match {
  • case Some(jsonPartitionMap) => // 存在分区数据
  • // 解析分区数据
  • Json.parseFull(jsonPartitionMap) match {
  • case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
  • case Some(repl) =>
  • // 对应结构为[partitionId, Seq[副本所在的BrokerID]]
  • val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
  • // 遍历字典结构
  • for((partition, replicas) <- replicaMap){
  • // 最终得到的结果为[主题分区, Seq[副本所在的BrokerID]]
  • ret.put(TopicAndPartition(topic, partition.toInt), replicas)
  • debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
  • }
  • case None =>
  • }
  • case None =>
  • }
  • case None =>
  • }
  • }
  • ret
  • }
  • def getChildrenParentMayNotExist(path: String): Seq[String] = {
  • import scala.collection.JavaConversions._
  • // triggers implicit conversion from java list to scala Seq
  • try {
  • zkClient.getChildren(path)
  • } catch {
  • case e: ZkNoNodeException => Nil
  • case e2: Throwable => throw e2
  • }
  • }
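
作为参考,getReplicaAssignmentForTopics()解析的/brokers/topics/[topic_name]节点内容大致形如下面的示例(分区数与副本分配均为假设值),其中partitions字段即为[partitionId -> 副本所在BrokerID列表]的映射:

{"version":1,"partitions":{"0":[1,2,3],"1":[2,3,1],"2":[3,1,2]}}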

初始化每个分区的Leader、ISR集合等信息的updateLeaderAndIsrCache()方法则相对复杂一点,源码如下:

kafka.controller.KafkaController#updateLeaderAndIsrCache
  • def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
  • // 读取分区的Leader副本、ISR集合等信息,更新ControllerContext,会得到Map[TopicAndPartition, LeaderIsrAndControllerEpoch]结构的结果
  • val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
  • for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
  • // 将读取到的信息记录到ControllerContext的partitionLeadershipInfo中
  • controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
  • }

可见该方法将具体的操作委托给了ZkUtils的getPartitionLeaderAndIsrForTopics(...)方法,源码如下:

kafka.utils.ZkUtils#getPartitionLeaderAndIsrForTopics
  • def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
  • : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
  • val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
  • // 遍历传入的分区集合
  • for(topicAndPartition <- topicAndPartitions) {
  • // 该过程会构造Zookeeper路径,并读取该路径的信息,构造为LeaderIsrAndControllerEpoch对象返回
  • ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
  • case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
  • case None =>
  • }
  • }
  • ret
  • }

而ZkUtils的getPartitionLeaderAndIsrForTopics(...)方法会遍历传入的分区集合,使用ReplicationUtils的getLeaderIsrAndEpochForPartition(...)分别获取每个分区的Leader、ISR集合等信息:

kafka.utils.ReplicationUtils
  • def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
  • // 根据主题和分区构建路径,即/brokers/topics/[topic_name]/partitions/[partition_id]/state
  • val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
  • // 从Zookeeper读取数据
  • val (leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath)
  • // 将数据解析并构造为LeaderIsrAndControllerEpoch对象
  • leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
  • }
  • // 得到/brokers/topics/[topic_name]/partitions/[partition_id]/state
  • def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
  • getTopicPartitionPath(topic, partitionId) + "/" + "state"
  • // 得到/brokers/topics/[topic_name]/partitions/[partition_id]
  • def getTopicPartitionPath(topic: String, partitionId: Int): String =
  • getTopicPartitionsPath(topic) + "/" + partitionId
  • // 得到/brokers/topics/[topic_name]/partitions
  • def getTopicPartitionsPath(topic: String): String = {
  • getTopicPath(topic) + "/partitions"
  • }
  • // 得到/brokers/topics/[topic_name]
  • def getTopicPath(topic: String): String = {
  • ZkUtils.BrokerTopicsPath + "/" + topic
  • }
  • // 解析从Zookeeper中读取的Leader及ISR数据
  • private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
  • : Option[LeaderIsrAndControllerEpoch] = {
  • Json.parseFull(leaderAndIsrStr).flatMap {m =>
  • val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
  • val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int] // Leader ID
  • val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int] // Leader年代信息
  • val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]] // ISR集合
  • val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] // Controller年代信息
  • val zkPathVersion = stat.getVersion
  • debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
  • isr.toString(), zkPathVersion, path))
  • Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
  • }

最终每个分区的信息是从Zookeeper的/brokers/topics/[topic_name]/partitions/[partition_id]/state路径中读取的。
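
该state节点中保存的数据大致形如下面的示例(各字段取值为假设值),parseLeaderAndIsr(...)解析的正是其中的leader、leader_epoch、isr与controller_epoch字段:

{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2,3]}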

startChannelManager()方法用于初始化和启动KafkaController对应的ControllerChannelManager,源码如下:

kafka.controller.KafkaController#startChannelManager
  • private def startChannelManager() {
  • controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics, threadNamePrefix)
  • controllerContext.controllerChannelManager.startup()
  • }

在ControllerChannelManager的startup()方法中会启动对应的RequestSendThread线程,该过程在前面已经讲解过了。

initializePreferredReplicaElection()会读取Zookeeper的/admin/preferred_replica_election以初始化需要进行“优先副本”选举的分区,该方法源码如下:

kafka.controller.KafkaController#initializePreferredReplicaElection
  • private def initializePreferredReplicaElection() {
  • // initialize preferred replica election state
  • // 得到需要进行优先副本选举的分区集合
  • val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
  • // check if they are already completed or topic was deleted
  • /**
  • * 过滤出已完成优先副本选举的分区,满足下面任意一个条件即认定为已完成:
  • * 1. 其AR集合为空;
  • * 2. AR集合不为空,但Leader副本就是AR集合中的第一个副本
  • */
  • val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
  • // 获取分区的AR集合
  • val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
  • // 判断AR集合是否为空
  • val topicDeleted = replicasOpt.isEmpty
  • // 如果AR集合不为空,判断该分区的Leader是否是AR集合中的第一个副本
  • val successful =
  • if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
  • // 根据条件进行过滤
  • successful || topicDeleted
  • }
  • // 添加需要进行优先副本选举的分区
  • controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
  • // 减去已完成优先副本选举的分区
  • controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
  • info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
  • info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
  • info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
  • }

最终需要进行“优先副本”选举的分区会被记录到KafkaController的ControllerContext实例的partitionsUndergoingPreferredReplicaElection字段中。
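
作为参考,kafka-preferred-replica-election.sh写入/admin/preferred_replica_election节点的数据大致形如下面的示例(主题与分区为假设值,具体字段以实际版本为准):

{"version":1,"partitions":[{"topic":"test","partition":0},{"topic":"test","partition":1}]}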

接下来的initializePartitionReassignment()方法会从Zookeeper中读取/admin/reassign_partitions路径的数据以初始化需要进行副本重新分配的分区,源码如下:

kafka.controller.KafkaController#initializePartitionReassignment
  • private def initializePartitionReassignment() {
  • // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
  • val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
  • // check if they are already completed or topic was deleted
  • /**
  • * 需要过滤掉不需要进行副本重分配的分区,满足下面任意一个条件即不需要:
  • * 1. 分区的AR副本集为空,表示该分区所属的主题可能被删除了;
  • * 2. 分区的AR副本集不为空,但与从Zookeeper中读取到的分区副本集相同,说明不需要重分配。
  • */
  • val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
  • // 分区的AR副本集
  • val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
  • // AR副本是否为空
  • val topicDeleted = replicasOpt.isEmpty
  • // 如果AR副本集不为空,则比对当前AR副本集与从Zookeeper中读取到的目标副本集是否相同
  • val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
  • topicDeleted || successful
  • }.map(_._1)
  • // 更新ControllerContext和Zookeeper中记录的需要进行副本重分配的分区
  • reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
  • var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
  • // 添加需要进行副本重分配的分区
  • partitionsToReassign ++= partitionsBeingReassigned
  • // 减去不需要进行副本重分配的分区
  • partitionsToReassign --= reassignedPartitions
  • // 维护ControllerContext的partitionsBeingReassigned集合
  • controllerContext.partitionsBeingReassigned ++= partitionsToReassign
  • info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
  • info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
  • info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
  • }

该方法使用了removePartitionFromReassignedPartitions(...)方法维护了ControllerContext和Zookeeper中记录的需要进行副本重分配的分区,源码如下:

kafka.controller.KafkaController#removePartitionFromReassignedPartitions
  • // 维护ControllerContext和Zookeeper中记录的需要进行副本重分配的分区
  • def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
  • if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) { // 判断ControllerContext中是否有记录对应的分区
  • // stop watching the ISR changes for this partition
  • // 如果记录了,先停止对其分区ISR变化的监听器
  • zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
  • controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
  • }
  • // read the current list of reassigned partitions from zookeeper
  • // 从Zookeeper中读取需要进行副本重分配的分区集合
  • val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
  • // remove this partition from that list
  • // 减去不需要进行副本重分配的分区
  • val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
  • // write the new list to zookeeper
  • // 更新Zookeeper中的记录
  • zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
  • // update the cache. NO-OP if the partition's reassignment was never started
  • // 更新本地缓存
  • controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
  • }
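
作为参考,/admin/reassign_partitions节点中保存的数据大致形如下面的示例(主题与replicas均为假设值),上面的updatePartitionReassignmentData(...)写回的也是这种格式,其中replicas即各分区重分配后的目标副本集:

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[4,5,6]}]}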

在初始化需要进行副本重分配的分区之后,还会调用initializeTopicDeletion()初始化TopicDeletionManager对象,用于主题删除操作,源码如下:

kafka.controller.KafkaController#initializeTopicDeletion
  • private def initializeTopicDeletion() {
  • // 从Zookeeper的/admin/delete_topics路径中读取需要删除的主题名称集合
  • val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
  • // 过滤得到在不可用Broker上还存在副本的主题名称集合
  • val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
  • replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
  • // 处于优先副本选举过程中的主题
  • val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
  • // 处于副本重分配过程中的主题
  • val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
  • // 上面的三类主题暂时无法被删除
  • val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
  • topicsForWhichPreferredReplicaElectionIsInProgress
  • info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
  • info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
  • // initialize the topic deletion manager
  • // 初始化TopicDeletionManager
  • deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
  • }

该方法会从Zookeeper的/admin/delete_topics路径中读取需要删除的主题名称集合,但需要注意的是,下面三类主题会被标记为无法删除:

  1. 主题的某些副本存在于某些不可用的Broker。
  2. 主题的某些分区正在进行“优先副本”选举。
  3. 主题的某些分区正在进行副本重分配。

至此,KafkaController的onControllerFailover()方法所涉及的initializeControllerContext()的过程就讲解完了。

2.2.2.4. 启动状态机

在初始化ControllerContext,从Zookeeper中读取主题、分区、副本等元数据、启动ControllerChannelManager、TopicDeletionManager等组件的过程完成后,KafkaController的onControllerFailover()方法会启动PartitionStateMachine和ReplicaStateMachine两个状态机,该过程在前面的文章中讲解过,这里不再赘述。

2.2.2.5. 处理副本重分配

PartitionStateMachine和ReplicaStateMachine两个状态机启动之后,会将当前KafkaController所在的Broker的状态切换为RunningAsController,然后调用maybeTriggerPartitionReassignment()方法处理需要重新分配副本的分区,该方法内部会对每个待处理分区都调用initiateReassignReplicasForTopicPartition()方法,完成副本重新分配的准备工作,源码如下:

kafka.controller.KafkaController
  • private def maybeTriggerPartitionReassignment() {
  • // 遍历需要进行副本重新分配的分区,分别对它们调用initiateReassignReplicasForTopicPartition()方法
  • controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
  • initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
  • }
  • }
  • // 对需要进行副本重新分配的分区做准备工作
  • def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
  • reassignedPartitionContext: ReassignedPartitionsContext) {
  • // 新的AR副本集
  • val newReplicas = reassignedPartitionContext.newReplicas
  • val topic = topicAndPartition.topic
  • val partition = topicAndPartition.partition
  • // 可用的新的AR副本集
  • val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
  • try {
  • // 分区当前的AR副本集
  • val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
  • assignedReplicasOpt match {
  • case Some(assignedReplicas) => // 当前AR副本集存在
  • if(assignedReplicas == newReplicas) { // 新旧AR副本集相同,抛出KafkaException异常
  • throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
  • " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
  • } else {
  • if(aliveNewReplicas == newReplicas) { // 判断新的AR副本是否都是可用的
  • info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
  • // first register ISR change listener
  • // 为分区注册ReassignedPartitionsIsrChangeListener
  • watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
  • controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
  • // mark topic ineligible for deletion for the partitions being reassigned
  • // 将主题标记为不可删除
  • deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  • // 执行副本的重新分配
  • onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
  • } else {
  • // some replica in RAR is not alive. Fail partition reassignment
  • throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
  • " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
  • "Failing partition reassignment")
  • }
  • }
  • case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
  • .format(topicAndPartition))
  • }
  • } catch {
  • case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
  • // remove the partition from the admin path to unblock the admin client
  • removePartitionFromReassignedPartitions(topicAndPartition)
  • }
  • }

initiateReassignReplicasForTopicPartition(...)会检查分配给当前分区的新的AR副本集是否符合下面四个要求:

  1. 当前分区存在分配给它的新的AR副本集,否则会抛出KafkaException异常。
  2. 新的AR副本集不能与旧的AR副本集相同,否则会抛出KafkaException异常。
  3. 新的AR副本集所在的Broker必须是可用的,否则会抛出KafkaException异常。
  4. 当前分区所属的主题不能是待删除的主题,如果是,则调用removePartitionFromReassignedPartitions(...)方法进行处理,主要工作是:取消为该分区注册的ReassignedPartitionsIsrChangeListener监听器;删除Zookeeper中/admin/reassign_partitions节点中与当前分区相关的数据;并从ControllerContext的partitionsBeingReassigned集合中删除该分区相关的数据。

通过上述检查后,处理分区副本重分配的主要过程有下面四步:

  1. 为该分区注册ReassignedPartitionsIsrChangeListener监听器。
  2. 将该分区以及与之对应的ReassignedPartitionContext添加到ControllerContext的partitionsBeingReassigned字典中,表示该分区开始进行副本重分配了。
  3. 将该分区所属的主题标记为不可删除。
  4. 调用onPartitionReassignment(...)方法进行副本重分配。

其中第一步通过watchIsrChangesForReassignedPartition(...)方法完成,源码如下:

kafka.controller.KafkaController#watchIsrChangesForReassignedPartition
  • // 为指定的需要进行副本重分配的分区绑定ReassignedPartitionsIsrChangeListener监听器
  • private def watchIsrChangesForReassignedPartition(topic: String,
  • partition: Int,
  • reassignedPartitionContext: ReassignedPartitionsContext) {
  • // 获取分配给指定分区的新的AR集合
  • val reassignedReplicas = reassignedPartitionContext.newReplicas
  • // 根据分区所属主题、分区ID以及新的AR集合创建ReassignedPartitionsIsrChangeListener监听器
  • val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
  • reassignedReplicas.toSet)
  • // 将监听器绑定到分区对应的ReassignedPartitionsContext对象上
  • reassignedPartitionContext.isrChangeListener = isrChangeListener
  • // register listener on the leader and isr path to wait until they catch up with the current leader
  • // 在Zookeeper的/brokers/topics/[topic_name]/partitions/[partition_id]/state路径上注册刚刚创建的监听器
  • zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
  • }

第二步和第三步的操作都比较简单,其中第三步涉及到TopicDeletionManager类的markTopicIneligibleForDeletion(topics: Set[String])方法,源码如下:

kafka.controller.TopicDeletionManager#markTopicIneligibleForDeletion
  • /**
  • * Halt delete topic if -
  • * 1. replicas being down
  • * 2. partition reassignment in progress for some partitions of the topic
  • * 3. preferred replica election in progress for some partitions of the topic
  • * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
  • */
  • def markTopicIneligibleForDeletion(topics: Set[String]) {
  • if(isDeleteTopicEnabled) { // 判断是否开启了主题删除功能
  • val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  • // 将传入的主题添加到topicsIneligibleForDeletion集合中,即表示他们不可被删除
  • topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  • if(newTopicsToHaltDeletion.size > 0)
  • info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
  • }
  • }

第四步的操作中,onPartitionReassignment(...)方法是执行分区副本重新分配的主要方法,它的内部流程非常多,源码如下:

kafka.controller.KafkaController#onPartitionReassignment
  • /**
  • * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
  • * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
  • * Reassigning replicas for a partition goes through a few steps listed in the code.
  • * RAR = Reassigned replicas
  • * OAR = Original list of replicas for partition
  • * AR = current assigned replicas
  • *
  • * 1. Update AR in ZK with OAR + RAR.
  • * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
  • * of the leader epoch in zookeeper.
  • * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
  • * 4. Wait until all replicas in RAR are in sync with the leader.
  • * 5 Move all replicas in RAR to OnlineReplica state.
  • * 6. Set AR to RAR in memory.
  • * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
  • * will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
  • * In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
  • * RAR - OAR back in the isr.
  • * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
  • * isr to remove OAR - RAR in zookeeper and sent a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
  • * After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
  • * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = false) to
  • * the replicas in OAR - RAR to physically delete the replicas on disk.
  • * 10. Update AR in ZK with RAR.
  • * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
  • * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
  • *
  • * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
  • * may go through the following transition.
  • * AR leader/isr
  • * {1,2,3} 1/{1,2,3} (initial state)
  • * {1,2,3,4,5,6} 1/{1,2,3} (step 2)
  • * {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
  • * {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
  • * {1,2,3,4,5,6} 4/{4,5,6} (step 8)
  • * {4,5,6} 4/{4,5,6} (step 10)
  • *
  • * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
  • * This way, if the controller crashes before that step, we can still recover.
  • */
  • def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  • // 获取新AR集合
  • val reassignedReplicas = reassignedPartitionContext.newReplicas
  • areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { // 检查新AR集合是否都存在于ISR集合中
  • case false => // 新AR集合中有副本不存在于当前ISR集合中
  • info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
  • "reassigned not yet caught up with the leader")
  • // RAR - OAR,新AR集合与旧AR集合的差集
  • val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
  • // RAR + OAR,新旧AR集合的合集
  • val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
  • //1. Update AR in ZK with OAR + RAR.
  • // 将Zookeeper中的AR集合信息更新为RAR + OAR
  • updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
  • //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
  • // 通过发送LeaderAndIsrRequest,递增leader_epoch
  • updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
  • newAndOldReplicas.toSeq)
  • //3. replicas in RAR - OAR -> NewReplica
  • // 将RAR - OAR中的副本的状态更新为NewReplica
  • startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
  • info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
  • "reassigned to catch up with the leader")
  • case true => // 新AR集合中的副本都存在于当前ISR集合中
  • //4. Wait until all replicas in RAR are in sync with the leader.
  • // OAR - RAR,旧AR集合与新AR集合的差集,即从旧AR集合减去存在于新AR集合中的副本,因此oldReplicas中的副本都是不需要的
  • val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
  • //5. replicas in RAR -> OnlineReplica
  • // 将新AR集合中的所有副本状态都转换为OnlineReplica
  • reassignedReplicas.foreach { replica =>
  • replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
  • replica)), OnlineReplica)
  • }
  • //6. Set AR to RAR in memory.
  • //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
  • // a new AR (using RAR) and same isr to every broker in RAR
  • /**
  • * 1. 将ControllerContext中的AR记录更新为新AR集合;
  • * 2. 如果当前Leader副本在新AR集合中,则递增ZooKeeper和ControllerContext中记录的leader_epoch值,并发送LeaderAndIsrRequest和UpdateMetadataRequest
  • * 3. 如果当前Leader不在新AR集合中或Leader副本不可用,则将分区状态转换为OnlinePartition(之前也是OnlinePartition),
  • * 主要目的是使用ReassignedPartitionLeaderSelector选举新的Leader副本,使得新AR集合中的一个副本成为新Leader副本,
  • * 然后会发送LeaderAndIsrRequest和UpdateMetadataRequest
  • */
  • moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
  • //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
  • //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
  • // 将OAR - RAR中的副本依次转换为OfflineReplica直至NonExistentReplica状态
  • stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
  • //10. Update AR in ZK with RAR.
  • // 更新Zookeeper中记录的AR信息
  • updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
  • //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
  • // 将此Partition的相关信息从Zookeeper的/admin/reassign_partitions节点中移除
  • removePartitionFromReassignedPartitions(topicAndPartition)
  • info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
  • controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
  • //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
  • // 向所有可用的Broker发送一次UpdateMetadataRequest
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
  • // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
  • // 尝试取消相关的Topic的不可删除标记,并唤醒DeleteTopicsThread线程
  • deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  • }
  • }

onPartitionReassignment(...)方法的执行流程主要分为以下几步:

注:这里将新AR集合称为RAR,旧的AR集合称为OAR。

第1步:检查分配给该分区的新AR集合中的所有副本是否都属于In-Sync副本(即是否存在于ISR集合中),该操作由areReplicasInIsr(...)方法完成,源码如下:

kafka.controller.KafkaController#areReplicasInIsr
  • // 判断指定的副本是否存在于ISR副本中
  • private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
  • zkUtils.getLeaderAndIsrForPartition(topic, partition) match { // 获取Leader副本及ISR集合
  • case Some(leaderAndIsr) =>
  • // 判断参数replicas中的副本是否存在于ISR集合中
  • val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
  • replicasNotInIsr.isEmpty
  • case None => false
  • }
  • }

该方法的执行结果有两种:如果返回false,表示RAR中有副本不存在于ISR集合内,此时需要进行下面几步额外的处理:

  1. 将ControllerContext及Zookeeper中记录的该分区的AR集合更新为RAR和OAR的并集(RAR + OAR),这一步由updateAssignedReplicasForPartition(...)方法完成。
  2. 递增ZooKeeper中记录该分区的leader_epoch值,并向RAR和OAR的并集中的副本发送LeaderAndIsrRequest请求,这一步由updateLeaderEpochAndSendRequest(...)方法完成。
  3. 将RAR和OAR的差集(RAR - OAR)中的副本的状态更新成NewReplica状态,即向这些副本发送LeaderAndIsrRequest使其成为Follower副本,并发送UpdateMetadataRequest,这一步由startNewReplicasForReassignedPartition(...)方法完成。

上述三个方法的源码如下:

  • updateAssignedReplicasForPartition(...)方法:
kafka.controller.KafkaController
  • // 更新ControllerContext中记录的指定分区的AR集合为特定副本集合
  • private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
  • replicas: Seq[Int]) {
  • // 从ControllerContext的partitionReplicaAssignment过滤得到该分区所属主题的所有分区和副本
  • val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
  • // 更新对应分区的AR集合为传入的副本集合
  • partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
  • // 调用重载方法,该方法会更新Zookeeper中的数据
  • updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
  • info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
  • // update the assigned replica list after a successful zookeeper write
  • // 更新ControllerContext中的数据
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
  • }
  • // 重载方法,会更新Zookeeper中的数据
  • def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
  • newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
  • try {
  • // 构造Zookeeper中的主题路径/brokers/topics/[topic_name]
  • val zkPath = getTopicPath(topicAndPartition.topic)
  • // 构造更新的JSON字符串
  • val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
  • // 更新Zookeeper中主题的分区信息
  • zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
  • debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
  • } catch {
  • case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
  • case e2: Throwable => throw new KafkaException(e2.toString)
  • }
  • }
  • updateLeaderEpochAndSendRequest(...)方法:
kafka.controller.KafkaController
  • // 更新指定分区在Zookeeper中的Leader Epoch,同时向replicasToReceiveRequest中记录的Broker发送LeaderAndIsrRequest请求
  • private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
  • // 检查是否有未发送的请求
  • brokerRequestBatch.newBatch()
  • // 更新Zookeeper中该分区的Leader Epoch和zkVersion信息,返回的LeaderIsrAndControllerEpoch中包含了新的Leader Epoch及zkVersion
  • updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
  • case Some(updatedLeaderIsrAndControllerEpoch) =>
  • try {
  • // 添加LeaderAndIsrRequest请求,等待发送
  • brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
  • topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
  • // 发送请求
  • brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
  • } catch {
  • case e : IllegalStateException => {
  • // Resign if the controller is in an illegal state
  • error("Forcing the controller to resign")
  • // 当Controller的状态处于非法时,清空brokerRequestBatch,重置controllerElector记录的leaderId,删除Zookeeper的/controller节点
  • brokerRequestBatch.clear()
  • controllerElector.resign()
  • throw e
  • }
  • }
  • stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
  • "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
  • newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
  • case None => // fail the reassignment
  • stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
  • "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
  • newAssignedReplicas.mkString(","), topicAndPartition))
  • }
  • }
  • /**
  • * Does not change leader or isr, but just increments the leader epoch
  • * 更新Leader Epoch年代信息,不会修改Leader及ISR信息,返回的LeaderIsrAndControllerEpoch中包含了新的Leader Epoch及zkVersion
  • * @param topic topic
  • * @param partition partition
  • * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
  • */
  • private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
  • val topicAndPartition = TopicAndPartition(topic, partition)
  • debug("Updating leader epoch for partition %s.".format(topicAndPartition))
  • var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
  • var zkWriteCompleteOrUnnecessary = false
  • while (!zkWriteCompleteOrUnnecessary) { // 循环写,直到写成功为止
  • // refresh leader and isr from zookeeper again
  • // 读取Zookeeper中存储的指定主题分区的Leader、Leader Epoch和ISR信息
  • val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
  • zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
  • case Some(leaderIsrAndEpoch) =>
  • // Zookeeper之前存储的ISR和Controller Epoch
  • val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
  • val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
  • if(controllerEpoch > epoch) // 检查Controller Epoch是否合法
  • throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
  • "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
  • "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
  • // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
  • // assigned replica list
  • // 构造存有新的Leader Epoch的LeaderAndIsr对象
  • val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
  • leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
  • // update the new leadership decision in zookeeper or retry
  • // 更新Zookeeper中存储的Leader Epoch,注意,此处并没有更新LeaderID及ISR集合
  • val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
  • partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
  • // 记录新的zkVersion
  • newLeaderAndIsr.zkVersion = newVersion
  • // 以更新后的Leader Epoch、zkVersion构造LeaderIsrAndControllerEpoch对象用于返回
  • finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
  • if (updateSucceeded)
  • info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
  • updateSucceeded
  • case None =>
  • throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " +
  • "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
  • true
  • }
  • }
  • // 返回更新了Leader Epoch和zkVersion后的LeaderIsrAndControllerEpoch对象
  • finalLeaderIsrAndControllerEpoch
  • }
  • startNewReplicasForReassignedPartition(...)方法:
kafka.controller.KafkaController
  • // 转换指定副本的状态为NewReplica
  • private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
  • reassignedPartitionContext: ReassignedPartitionsContext,
  • newReplicas: Set[Int]) {
  • // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
  • // replicas list
  • newReplicas.foreach { replica =>
  • replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
  • }
  • }
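
为了更直观地理解上述第1步中RAR - OAR、RAR + OAR等集合运算,下面给出一个独立的Scala小示例;其中OAR、RAR的取值以及ReassignmentSetsDemo这个名字都只是演示用的假设,并非Kafka源码:

ReassignmentSetsDemo.scala(示例)
  • object ReassignmentSetsDemo {
  •   def main(args: Array[String]): Unit = {
  •     // 假设的旧AR集合(OAR)与新AR集合(RAR)
  •     val oar = Seq(1, 2, 3)
  •     val rar = Seq(4, 5, 6)
  •     // RAR - OAR:需要新建的副本,对应NewReplica状态转换
  •     val newReplicasNotInOld = rar.toSet -- oar.toSet
  •     // RAR + OAR:重分配过程中临时写入ZooKeeper的AR集合
  •     val newAndOldReplicas = rar.toSet ++ oar.toSet
  •     // OAR - RAR:重分配完成后需要下线删除的旧副本
  •     val oldReplicasToRemove = oar.toSet -- rar.toSet
  •     println(s"RAR - OAR = $newReplicasNotInOld") // 元素为4、5、6
  •     println(s"RAR + OAR = $newAndOldReplicas")   // 元素为1到6
  •     println(s"OAR - RAR = $oldReplicasToRemove") // 元素为1、2、3
  •   }
  • }

可以看到,当OAR与RAR完全不相交时,重分配过程中会临时同时存在OAR + RAR共6个副本。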

如果areReplicasInIsr(...)方法返回true,表示RAR中所有副本都存在于ISR集合内,则可以进行第2步了。

第2步:将RAR集合中的副本都转换为OnlineReplica状态,即让新AR中的副本都上线,并将ControllerContext中记录的AR集合更新为新的AR集合,这一步由ReplicaStateMachine状态机以及moveReassignedPartitionLeaderIfRequired(...)方法完成。其中状态转换前面讲解过,这里主要分析moveReassignedPartitionLeaderIfRequired(...)方法的实现,源码如下:

kafka.controller.KafkaController#moveReassignedPartitionLeaderIfRequired
  • // 更新ControllerContext中记录的指定分区的AR集合,必要的时候会进行Leader选举
  • private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
  • reassignedPartitionContext: ReassignedPartitionsContext) {
  • // 分区新的AR集合
  • val reassignedReplicas = reassignedPartitionContext.newReplicas
  • // 分区当前的Leader的ID
  • val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
  • // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
  • // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
  • // 分区当前记录的AR集合(此时已是OAR + RAR)
  • val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  • // 将ControllerContext中该分区的AR集合更新为新的AR集合
  • controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
  • // 判断新的AR集合是否包含原来的Leader副本
  • if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) { // 新的AR集合不包含原来的Leader副本
  • info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
  • "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
  • // move the leader to one of the alive and caught up new replicas
  • /**
  • * 将分区状态转换为OnlinePartition(之前也是OnlinePartition),
  • * 该操作会触发Leader副本的选举,使得新AR集合中的一个副本成为新Leader副本,
  • * 可回顾 {@link kafka.controller.PartitionStateMachine#handleStateChange } 方法
  • * 使用的选举器是传入的reassignedPartitionLeaderSelector
  • */
  • partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
  • } else { // 新AR集合中包含原来的Leader副本
  • // check if the leader is alive or not
  • // 判断Leader副本所在的Broker是否是存活的
  • controllerContext.liveBrokerIds.contains(currentLeader) match {
  • case true => // Leader副本所在的Broker可用
  • info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
  • "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
  • // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
  • // 更新指定分区在Zookeeper中的Leader Epoch,同时向oldAndNewReplicas中记录的Broker发送LeaderAndIsrRequest请求
  • updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
  • case false => // Leader副本所在的Broker不可用,需要选举Leader副本
  • info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
  • "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
  • // 否则转换分区状态为OnlinePartition以触发Leader选举
  • partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
  • }
  • }
  • }

moveReassignedPartitionLeaderIfRequired(...)方法的作用非常重要:在更新分区的AR集合时需要判断Leader副本的情况,如果新AR集合中不包含原来的Leader副本,或者虽然包含但该Leader副本所在的Broker不可用,就需要重新选出一个Leader副本;此时通过将分区状态从OnlinePartition再次转换为OnlinePartition来触发一次Leader选举,使用的选举器是ReassignedPartitionLeaderSelector。
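
下面用一个独立的Scala小示例把moveReassignedPartitionLeaderIfRequired(...)中的三种分支逻辑单独抽取出来,便于对照理解;ReassignLeaderDecisionDemo及其中的描述文字均为演示用的假设,并非Kafka源码:

ReassignLeaderDecisionDemo.scala(示例)
  • object ReassignLeaderDecisionDemo {
  •   // 根据当前Leader、新AR集合(RAR)和可用Broker集合,判断应执行哪种处理
  •   def decide(currentLeader: Int, rar: Set[Int], liveBrokers: Set[Int]): String = {
  •     if (!rar.contains(currentLeader))
  •       "Leader不在RAR中:触发Leader重新选举"
  •     else if (liveBrokers.contains(currentLeader))
  •       "Leader在RAR中且可用:仅递增leader_epoch并发送LeaderAndIsrRequest"
  •     else
  •       "Leader在RAR中但所在Broker不可用:触发Leader重新选举"
  •   }
  •   def main(args: Array[String]): Unit = {
  •     println(decide(currentLeader = 1, rar = Set(4, 5, 6), liveBrokers = Set(1, 4, 5, 6)))
  •     println(decide(currentLeader = 4, rar = Set(4, 5, 6), liveBrokers = Set(4, 5, 6)))
  •     println(decide(currentLeader = 4, rar = Set(4, 5, 6), liveBrokers = Set(5, 6)))
  •   }
  • }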

第3步:将OAR与RAR的差集(OAR - RAR)中所有的副本最终转换为NonExistentReplica状态,即让不再需要的旧副本下线并被删除,这一步由stopOldReplicasOfReassignedPartition(...)完成,源码比较简单,如下:

kafka.controller.KafkaController#stopOldReplicasOfReassignedPartition
  • // 将oldReplicas集合中的副本的状态转换为NonExistentReplica状态
  • private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
  • reassignedPartitionContext: ReassignedPartitionsContext,
  • oldReplicas: Set[Int]) {
  • val topic = topicAndPartition.topic
  • val partition = topicAndPartition.partition
  • // first move the replica to offline state (the controller removes it from the ISR)
  • // 得到需要删除的副本对应的PartitionAndReplica集合
  • val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
  • // 陆续转换为OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
  • replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
  • // send stop replica command to the old replicas
  • replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
  • // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
  • replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
  • replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
  • }

第4步:更新Zookeeper中记录的该分区的AR信息,并将该分区的副本重分配信息从Zookeeper的/admin/reassign_partitions节点中删除,这一步由updateAssignedReplicasForPartition(...)和removePartitionFromReassignedPartitions(...)两个方法完成,这两个方法在前面都已经讲解过,这里不再赘述。
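
作为补充,下面用一个独立的Scala小示例手工拼接/brokers/topics/[topic_name]节点中AR信息的JSON;这里假设其格式为{"version":1,"partitions":{"分区号":[副本列表]}},ReplicaAssignmentZkDataDemo只是示意,实际写入的内容以zkUtils.replicaAssignmentZkData的实现为准:

ReplicaAssignmentZkDataDemo.scala(示例)
  • object ReplicaAssignmentZkDataDemo {
  •   // 按假设的格式拼接JSON字符串,仅为示意
  •   def replicaAssignmentJson(assignment: Map[Int, Seq[Int]]): String = {
  •     val partitions = assignment.toSeq.sortBy(_._1).map { case (partition, replicas) =>
  •       s""""$partition":[${replicas.mkString(",")}]"""
  •     }.mkString(",")
  •     s"""{"version":1,"partitions":{$partitions}}"""
  •   }
  •   def main(args: Array[String]): Unit = {
  •     // 假设重分配完成后,分区0的AR已更新为RAR = {4,5,6}
  •     println(replicaAssignmentJson(Map(0 -> Seq(4, 5, 6), 1 -> Seq(2, 4, 5))))
  •     // 输出:{"version":1,"partitions":{"0":[4,5,6],"1":[2,4,5]}}
  •   }
  • }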

第5步:向所有可用的Broker发送UpdateMetadataRequest通知其更新元数据;取消该分区所属主题的不可删除标记并唤醒对应的DeleteTopicsThread线程,这一步由sendUpdateMetadataRequest(...)和TopicDeletionManager的resumeDeletionForTopics(...)完成:

  • KafkaController的sendUpdateMetadataRequest(...)方法:
kafka.controller.KafkaController#sendUpdateMetadataRequest
  • /**
  • * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
  • * metadata requests
  • * @param brokers The brokers that the update metadata request should be sent to
  • */
  • def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
  • try {
  • // 检查leaderAndIsrRequestMap、stopReplicaRequestMap、updateMetadataRequestMap,只要有一个不为空就会抛出异常
  • brokerRequestBatch.newBatch()
  • // 向给定的Broker发送UpdateMetadataRequest请求
  • brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
  • /**
  • * 根据leaderAndIsrRequestMap、stopReplicaRequestMap、updateMetadataRequestMap中的数据创建对应的请求
  • * 并添加到ControllerChannelManager中对应的消息队列中,最终由RequestSendThread线程发送这些请求
  • */
  • brokerRequestBatch.sendRequestsToBrokers(epoch)
  • } catch {
  • case e : IllegalStateException => {
  • // Resign if the controller is in an illegal state
  • error("Forcing the controller to resign")
  • brokerRequestBatch.clear()
  • controllerElector.resign()
  • throw e
  • }
  • }
  • }
  • TopicDeletionManager的resumeDeletionForTopics(...)方法(关于TopicDeletionManager的内容会在后面详细讲解):
kafka.controller.TopicDeletionManager#resumeDeletionForTopics
  • /**
  • * Invoked when any event that can possibly resume topic deletion occurs. These events include -
  • * 1. New broker starts up. Any replicas belonging to topics queued up for deletion can be deleted since the broker is up
  • * 2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion finished reassignment
  • * 3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion finished
  • * preferred replica election
  • * @param topics Topics for which deletion can be resumed
  • */
  • def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
  • if(isDeleteTopicEnabled) { // 检查是否开启主题删除功能
  • // 找出给定Topic集合与删除Topic集合的交集,将该交集中的Topic从topicsIneligibleForDeletion集合中删除
  • val topicsToResumeDeletion = topics & topicsToBeDeleted
  • if(topicsToResumeDeletion.size > 0) {
  • topicsIneligibleForDeletion --= topicsToResumeDeletion
  • // 唤醒DeleteTopicsThread线程处理待删除的Topic
  • resumeTopicDeletionThread()
  • }
  • }
  • }
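
resumeDeletionForTopics(...)的核心其实只是两步集合运算,下面用一个独立的Scala小示例进行模拟;其中的主题名与ResumeDeletionDemo均为演示用的假设,并非Kafka源码:

ResumeDeletionDemo.scala(示例)
  • import scala.collection.mutable
  • object ResumeDeletionDemo {
  •   def main(args: Array[String]): Unit = {
  •     // 模拟TopicDeletionManager中的两个集合:待删除主题、暂时不可删除主题
  •     val topicsToBeDeleted = mutable.Set("topic-a", "topic-b")
  •     val topicsIneligibleForDeletion = mutable.Set("topic-a", "topic-c")
  •     // 刚完成副本重分配的主题集合
  •     val topics = Set("topic-a", "topic-d")
  •     // 取交集:只有排队等待删除的主题才需要恢复删除流程
  •     val topicsToResumeDeletion = topics & topicsToBeDeleted
  •     if (topicsToResumeDeletion.nonEmpty) {
  •       // 将这些主题移出“不可删除”集合,随后即可唤醒DeleteTopicsThread
  •       topicsIneligibleForDeletion --= topicsToResumeDeletion
  •       println(s"resume deletion for: $topicsToResumeDeletion")   // 只包含topic-a
  •       println(s"still ineligible: $topicsIneligibleForDeletion") // 只剩topic-c
  •     }
  •   }
  • }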

至此,onPartitionReassignment(...)方法的整体流程就讲解完了。

onPartitionReassignment(...)方法涉及分区副本重新分配的主体流程,实现比较复杂,步骤繁多,但也是Kafka中非常重要的功能实现。

2.2.2.6. 处理优先副本选举

KafkaController的onControllerFailover()在处理完分区副本重分配操作后,会继续调用maybeTriggerPreferredReplicaElection(...)方法处理需要进行“优先副本”选举的分区,该方法内部直接调用了onPreferredReplicaElection(...)方法,源码如下:

kafka.controller.KafkaController
  • // 可能需要触发"优先副本"选举
  • private def maybeTriggerPreferredReplicaElection() {
  • onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
  • }
  • /**
  • * 通过PreferredReplicaPartitionLeaderSelector选举Leader副本和ISR集合
  • * @param partitions 进行"优先副本"选举的分区集合
  • * @param isTriggeredByAutoRebalance 标识这次的选举是否由AutoRebalance自动任务触发
  • */
  • def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
  • info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
  • try {
  • // 将参与"优先副本"选举的分区添加到ControllerContext的partitionsUndergoingPreferredReplicaElection集合
  • controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
  • // 将对应的Topic标记为不可删除
  • deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
  • // 将分区转换为OnlinePartition状态,除了重选Leader,还会更新Zookeeper中的数据,并发送LeaderAndIsrRequest和UpdateMetadataRequest请求
  • partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
  • } catch {
  • case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
  • } finally {
  • // 清理ControllerContext的partitionsUndergoingPreferredReplicaElection集合以及Zookeeper上的相关数据
  • removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
  • // 将Topic标记为可删除,并唤醒DeleteTopicsThread线程
  • deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
  • }
  • }

此处触发选举的方式与上一节中的类似,即通过将分区状态从OnlinePartition转换为OnlinePartition以触发选举,不过这一次使用的选举器是PreferredReplicaPartitionLeaderSelector;下面以一个例子来介绍“优先副本”的选举过程。

注:该例子来源于技术世界,原文链接http://www.jasongj.com/2015/06/08/KafkaColumn3

假设当前Kafka集群中有编号0 ~ 7的8个Broker实例,在该Kafka集群上创建主题topic-1,分区数为8,副本因子为3;在初始状态下,每个分区的Leader副本都是其AR集合中的第一个副本(即所谓的“优先副本”,Replicas[0]):

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,3,4 Isr: 4,1,3
  • Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,4,5 Isr: 5,2,4
  • Topic: topic-1 Partition: 2 Leader: 3 Replicas: 3,5,6 Isr: 5,3,6
  • Topic: topic-1 Partition: 3 Leader: 4 Replicas: 4,6,7 Isr: 4,7,6
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 5,7,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6,1
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7,1,2
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3

此时,如果编号为1、2、4的Broker宕机故障,导致topic-1主题的0、1、2、3号分区的Leader副本发生迁移;Partition-0的Leader副本从Broker-1迁移到Broker-3,Partition-1的Leader副本从Broker-2迁移到Broker-5,Partition-2的Leader副本从Broker-3迁移到Broker-6,Partition-3的Leader副本从Broker-4迁移到Broker-7,即这四个分区此时都不是以“优先副本”作为Leader副本了:

  • Topic: topic-1 Partition: 0 Leader: 3 Replicas: 1,3,4 Isr: 3 // Leader 1 -> 3,ISR [4,1,3] -> 3
  • Topic: topic-1 Partition: 1 Leader: 5 Replicas: 2,4,5 Isr: 5 // Leader 2 -> 5,ISR [5,2,4] -> 5
  • Topic: topic-1 Partition: 2 Leader: 6 Replicas: 3,5,6 Isr: 6,5,3 // Leader 3 -> 6,ISR [5,3,6] -> 6,5,3
  • Topic: topic-1 Partition: 3 Leader: 7 Replicas: 4,6,7 Isr: 7,6 // Leader 4 -> 7,ISR [4,7,6] -> 7,6
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 7,5,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,3

此时重新启动Broker-1,由于Partition-0、Partition-5和Partition-6的AR集合中都包含Broker-1,Broker-1上线后会重新加入这三个分区的ISR集合;但此时Broker-1上没有任何分区的Leader副本,而Broker-5、Broker-6和Broker-7上各有两个分区的Leader副本:Broker-5上是Partition-1和Partition-4的Leader副本,Broker-6上是Partition-2和Partition-5的Leader副本,Broker-7上是Partition-3和Partition-6的Leader副本:

  • Topic: topic-1 Partition: 0 Leader: 3 Replicas: 1,3,4 Isr: 3,1 // ISR [3] -> [3,1],Broker-1上线
  • Topic: topic-1 Partition: 1 Leader: 5 Replicas: 2,4,5 Isr: 5
  • Topic: topic-1 Partition: 2 Leader: 6 Replicas: 3,5,6 Isr: 6,5,3
  • Topic: topic-1 Partition: 3 Leader: 7 Replicas: 4,6,7 Isr: 7,6
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 7,5,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6,1 // ISR [0,6] -> [0,6,1],Broker-1上线
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7,1 // ISR [7] -> [7,1],Broker-1上线
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,3

当通过kafka-preferred-replica-election.sh脚本命令执行“优先副本”选举之后,会发现Partition-0的Leader副本从Broker-3上迁移到了Broker-1上,Partition-2的Leader副本从Broker-6上迁移到了Broker-3上:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,3,4 Isr: 3,1 // Leader 3 -> 1,即Replicas[0]
  • Topic: topic-1 Partition: 1 Leader: 5 Replicas: 2,4,5 Isr: 5 // Leader还是5
  • Topic: topic-1 Partition: 2 Leader: 3 Replicas: 3,5,6 Isr: 6,5,3 // Leader 6 -> 3,即Replicas[0]
  • Topic: topic-1 Partition: 3 Leader: 7 Replicas: 4,6,7 Isr: 7,6 // Leader还是7
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 7,5,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6,1
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7,1
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,3

但由于此时Broker-2和Broker-4还未上线,所以Partition-1和Partition-3的Leader副本并未切换为“优先副本”。

接下来启动Broker-2和Broker-4,Leader副本的分布与上一步相比并未变化:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,3,4 Isr: 3,1,4 // ISR [3,1] -> [3,1,4],Broker-4上线
  • Topic: topic-1 Partition: 1 Leader: 5 Replicas: 2,4,5 Isr: 5,2,4 // ISR [5] -> [5,2,4],Broker-2、Broker-4上线
  • Topic: topic-1 Partition: 2 Leader: 3 Replicas: 3,5,6 Isr: 6,5,3
  • Topic: topic-1 Partition: 3 Leader: 7 Replicas: 4,6,7 Isr: 7,6,4
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 7,5,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6,1
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7,1,2 // ISR [7,1] -> [7,1,2],Broker-2上线
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,3,2 // ISR [0,3] -> [0,3,2],Broker-2上线

当通过kafka-preferred-replica-election.sh脚本命令再次执行“优先副本”选举之后,会发现Partition-1和Partition-3的Leader副本都切换为了“优先副本”:

  • Topic: topic-1 Partition: 0 Leader: 1 Replicas: 1,3,4 Isr: 3,1,4
  • Topic: topic-1 Partition: 1 Leader: 2 Replicas: 2,4,5 Isr: 5,2,4 // Leader 5 -> 2,即Replicas[0]
  • Topic: topic-1 Partition: 2 Leader: 3 Replicas: 3,5,6 Isr: 6,5,3
  • Topic: topic-1 Partition: 3 Leader: 4 Replicas: 4,6,7 Isr: 7,6,4 // Leader 7 -> 4,即Replicas[0]
  • Topic: topic-1 Partition: 4 Leader: 5 Replicas: 5,7,0 Isr: 7,5,0
  • Topic: topic-1 Partition: 5 Leader: 6 Replicas: 6,0,1 Isr: 0,6,1
  • Topic: topic-1 Partition: 6 Leader: 7 Replicas: 7,1,2 Isr: 7,1,2
  • Topic: topic-1 Partition: 7 Leader: 0 Replicas: 0,2,3 Isr: 0,3,2

注:“优先副本”选举还会被分区自动均衡任务触发。
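
结合上面的例子,可以把“优先副本”能否当选的判断抽象成下面这个独立的Scala小示例:“优先副本”即AR集合中的第一个副本,只有当它存活且在ISR集合中时才能成为新的Leader副本。PreferredReplicaElectionDemo只是示意,并非PreferredReplicaPartitionLeaderSelector的源码,真实的选举器在无法完成选举时会抛出异常,这里简化为返回None:

PreferredReplicaElectionDemo.scala(示例)
  • object PreferredReplicaElectionDemo {
  •   // 返回Some(优先副本)表示可以当选,None表示本次无法切换
  •   def electPreferredLeader(assignedReplicas: Seq[Int], isr: Set[Int], liveBrokers: Set[Int]): Option[Int] = {
  •     val preferredReplica = assignedReplicas.head
  •     if (liveBrokers.contains(preferredReplica) && isr.contains(preferredReplica)) Some(preferredReplica)
  •     else None
  •   }
  •   def main(args: Array[String]): Unit = {
  •     val liveBrokers = Set(0, 1, 3, 5, 6, 7) // 对应例子中Broker-2、Broker-4仍未上线的时刻
  •     // Partition-0:AR=[1,3,4],ISR={3,1},“优先副本”Broker-1已恢复,可以当选
  •     println(electPreferredLeader(Seq(1, 3, 4), isr = Set(3, 1), liveBrokers = liveBrokers)) // Some(1)
  •     // Partition-1:AR=[2,4,5],“优先副本”Broker-2未上线,无法切换
  •     println(electPreferredLeader(Seq(2, 4, 5), isr = Set(5), liveBrokers = liveBrokers))    // None
  •   }
  • }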

2.2.2.7. 更新MetadataCache

在完成上面的操作之后,表示此时涉及到元数据的操作都完成了,因此需要通知其它的Broker更新自己的MetadataCache,该操作通过KafkaController的sendUpdateMetadataRequest(...)方法完成,该方法在前面有介绍,这里不再赘述。

2.2.2.8. 启动分区自动均衡任务

当Kafka集群开启了分区自动均衡配置时(通过auto.leader.rebalance.enable参数),KafkaController的onControllerFailover()会启动一个自动任务用于执行分区的自动均衡;此处使用的调度器autoRebalanceScheduler依旧是KafkaScheduler类型的,任务的名称为partition-rebalance-thread,该任务在延迟5秒后首次执行,之后的执行周期由leader.imbalance.check.interval.seconds参数配置(默认为300秒),任务触发的操作定义在KafkaController的checkAndTriggerPartitionRebalance()方法中,该方法源码如下:

kafka.controller.KafkaController#checkAndTriggerPartitionRebalance
  • /**
  • * 对失衡的Broker上相关的分区进行“优先副本”选举,使得相关分区的“优先副本”重新成为Leader副本,
  • * 整个集群中Leader副本的分布也会重新恢复平衡
  • */
  • private def checkAndTriggerPartitionRebalance(): Unit = {
  • if (isActive()) {
  • trace("checking need to trigger partition rebalance")
  • // get all the active brokers
  • // 获取所有可用的Broker的副本
  • var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
  • inLock(controllerContext.controllerLock) {
  • // 获取"优先副本"所在的BrokerId与分区的对应关系
  • preferredReplicasForTopicsByBrokers =
  • controllerContext.partitionReplicaAssignment // 每个分区的AR集合,Map[TopicAndPartition, Seq[Int]]类型
  • .filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)) // 过滤掉即将被删除的主题的分区
  • .groupBy { // 按该分区AR集合中第一个副本的ID(即“优先副本”)所在的Broker进行分组
  • case(topicAndPartition, assignedReplicas) => assignedReplicas.head
  • } // 结果为Map[Int, Map[TopicAndPartition, Seq[Int]]]类型,即Map[leaderBrokerId, Map[TopicAndPartition, Seq[TopicAndPartition BrokerId]]]
  • }
  • debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
  • // for each broker, check if a preferred replica election needs to be triggered
  • // 计算每个Broker的imbalance比率
  • preferredReplicasForTopicsByBrokers.foreach {
  • // 每一项为 “优先副本”所在的Broker -> 以该Broker为“优先副本”的分区集合
  • case(leaderBroker, topicAndPartitionsForBroker) => { // [leaderBrokerId, Map[TopicAndPartition, Seq[TopicAndPartition BrokerId]]]
  • var imbalanceRatio: Double = 0
  • var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
  • inLock(controllerContext.controllerLock) {
  • topicsNotInPreferredReplica =
  • topicAndPartitionsForBroker.filter {
  • case(topicPartition, replicas) => {
  • // 该分区存在Leader副本
  • controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
  • // 该分区的Leader副本不是"优先副本"
  • controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
  • }
  • }
  • debug("topics not in preferred replica " + topicsNotInPreferredReplica)
  • // 以当前Broker为“优先副本”的分区总数
  • val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
  • // 其中当前Leader副本不是“优先副本”的分区数量
  • val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
  • // 计算imbalance比率
  • imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
  • trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
  • }
  • // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
  • // that need to be on this broker
  • // Broker上的“imbalance”比率大于一定阈值时,触发“优先副本”选举
  • if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
  • topicsNotInPreferredReplica.foreach {
  • case(topicPartition, replicas) => {
  • inLock(controllerContext.controllerLock) {
  • // do this check only if the broker is live and there are no partitions being reassigned currently
  • // and preferred replica election is not in progress
  • if (controllerContext.liveBrokerIds.contains(leaderBroker) && // Leader副本所在的Broker是可用的
  • controllerContext.partitionsBeingReassigned.size == 0 && // 当前没有正在进行重新分配副本的分区
  • controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 && // 当前没有正在进行“优先副本”选举的分区
  • !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && // 分区所属主题不是待删除主题
  • controllerContext.allTopics.contains(topicPartition.topic)) { // ControllerContext中包含分区所属的主题
  • // 触发"优先副本"选举
  • onPreferredReplicaElection(Set(topicPartition), true)
  • }
  • }
  • }
  • }
  • }
  • }
  • }
  • }
  • }

checkAndTriggerPartitionRebalance()方法首先会获取“优先副本”所在的Broker与相应分区集合的对应关系,然后据此计算每个Broker的imbalance比率:即以该Broker为“优先副本”、但当前Leader副本不在该Broker上的分区数量,与以该Broker为“优先副本”的分区总数的比值;当imbalance比率大于一定阈值(由leader.imbalance.per.broker.percentage参数配置,默认是10,即10%)时,就会对相应的分区触发“优先副本”选举。
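
按照上面的描述,可以用一个独立的Scala小示例演示imbalance比率的计算与阈值判断;ImbalanceRatioDemo中的分区与Leader数据均为演示用的假设,并非Kafka源码:

ImbalanceRatioDemo.scala(示例)
  • object ImbalanceRatioDemo {
  •   // preferredPartitions:以某Broker为“优先副本”的分区 -> 该分区当前Leader所在的BrokerId
  •   def imbalanceRatio(preferredPartitions: Map[(String, Int), Int], brokerId: Int): Double = {
  •     val notLedByBroker = preferredPartitions.count { case (_, currentLeader) => currentLeader != brokerId }
  •     notLedByBroker.toDouble / preferredPartitions.size
  •   }
  •   def main(args: Array[String]): Unit = {
  •     // 假设Broker-1是下面三个分区的“优先副本”,其中两个分区的当前Leader不在Broker-1上
  •     val partitionsPreferredOnBroker1 = Map(
  •       ("topic-1", 0) -> 3, // 当前Leader为Broker-3
  •       ("topic-1", 5) -> 6, // 当前Leader为Broker-6
  •       ("topic-1", 6) -> 1  // 当前Leader为Broker-1
  •     )
  •     val ratio = imbalanceRatio(partitionsPreferredOnBroker1, brokerId = 1)
  •     val threshold = 10 / 100.0 // leader.imbalance.per.broker.percentage默认为10,即10%
  •     println(f"imbalance ratio = $ratio%.2f, trigger rebalance = ${ratio > threshold}") // 约0.67,大于阈值
  •   }
  • }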

2.2.2.9. 启动TopicDeletionManager

KafkaController中onControllerFailover()方法的最后一步,是调用TopicDeletionManager对象的start()方法启动主题删除管理器,该方法内部其实会创建并启动一个DeleteTopicsThread线程,用于进行主题删除,具体的功能和实现会在后面的文章中介绍。

3. 总结

至此,关于KafkaController相关的监听器已经讲解得差不多了,讲解的过程中我们了解了KafkaController的启动、Controller Leader的选举等功能的实现,在这些功能中穿插了许多关于Kafka初始化过程的细节。在后面的文章中,将讲解Kafka服务端涉及到的其他的监听器。