Kafka Series 18 - Server-Side Source Code Analysis 09: Basic Components of KafkaController

Summary: Covers the auxiliary components of KafkaController, including ControllerContext, ControllerChannelManager, and ControllerBrokerRequestBatch.

1. Overview of KafkaController

Every broker in a Kafka cluster instantiates a KafkaController at startup and registers its broker ID in ZooKeeper. One of these KafkaControllers is elected Controller Leader and manages the state of every partition and replica in the cluster: when a partition's leader replica fails, the Controller Leader elects a new leader replica for that partition; when the kafka-topics script is used to increase a topic's partition count, the Controller Leader manages the reassignment of partitions; and when it detects that a partition's ISR set has changed, the Controller Leader notifies every broker in the cluster to update its MetadataCache.

To make the Controller highly available, once one broker has been elected Controller Leader, the KafkaControllers on all other brokers become Controller Followers; if the Controller Leader crashes, a new Controller Leader is elected from the remaining followers to manage the cluster. The election relies on ZooKeeper: every broker creates a KafkaController object at startup, but only one Controller Leader may serve the cluster at any time. When the cluster starts, the KafkaControllers on the brokers race to create a node under a designated ZooKeeper path; only the first KafkaController to create the node becomes Controller Leader, and the rest become Controller Followers. When the Controller Leader fails, all Controller Followers are notified and race again to create the node under that path, thereby electing a new Controller Leader.
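
The mechanism is plain ZooKeeper ephemeral-node election. Below is a minimal sketch of that mechanism using the raw ZooKeeper client API; it is an illustration only, not Kafka's actual ZookeeperLeaderElector code, and the node payload is simplified:

    import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    class ControllerElectionSketch(zk: ZooKeeper, brokerId: Int) extends Watcher {
      private val path = "/controller"

      def elect(): Boolean = {
        try {
          // Every KafkaController races to create the same ephemeral node;
          // the node disappears automatically when its creator's session dies.
          zk.create(path, s"""{"brokerid":$brokerId}""".getBytes("UTF-8"),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
          true // we created it first: we are the Controller Leader
        } catch {
          case _: KeeperException.NodeExistsException =>
            // someone else won: become a follower and watch the node
            zk.exists(path, this)
            false
        }
      }

      // Called by ZooKeeper when the watched node changes
      override def process(event: WatchedEvent): Unit = {
        if (event.getType == Watcher.Event.EventType.NodeDeleted)
          elect() // the leader is gone: race again
      }
    }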

The ZooKeeper paths related to KafkaController, and the meaning of the data recorded under each of them, are as follows:

  • /brokers/ids/[id]: the IDs of the available brokers in the cluster.
  • /brokers/topics/[topic]/partitions: the partition assignment and AR set of every partition of a topic.
  • /brokers/topics/[topic]/partitions/[partition_id]/state: for a partition, the ID of the broker hosting the leader replica, the leader epoch, the ISR set, the ZKVersion, and so on.
  • /controller_epoch: the epoch of the current Controller Leader.
  • /controller: the ID of the current Controller Leader; also used for Controller Leader election.
  • /admin/reassign_partitions: the partitions whose replicas need to be reassigned.
  • /admin/preferred_replica_election: the partitions that need a "preferred replica" election. The preferred replica is the first replica assigned to a partition when the partition is created.
  • /admin/delete_topics: the topics awaiting deletion.
  • /isr_change_notification: the partitions whose ISR sets changed within a recent time window.
  • /config: assorted configuration entries.
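
For example, the /brokers/topics/[topic]/partitions/[partition_id]/state node above holds JSON along the lines of `{"controller_epoch":6,"leader":1,"version":1,"leader_epoch":12,"isr":[1,2,3]}` (the field values here are illustrative); the LeaderAndIsr and LeaderIsrAndControllerEpoch classes discussed below map directly onto this data.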

The following diagram shows the Kafka-related path nodes in ZooKeeper:

[Figure: Kafka's nodes in ZooKeeper (1.Kafka在Zookeeper节点的信息.png)]

The design of KafkaController relies on the following components:

  • KafkaController: organizes and wraps the other components and exposes the API.
  • ZookeeperLeaderElector: performs the Controller Leader election.
  • ControllerContext: the KafkaController's context; caches the cluster-wide metadata recorded in ZooKeeper, e.g. the available brokers and all topics, partitions, and replicas.
  • ControllerChannelManager: maintains the network connections between the Controller Leader and the other brokers in the cluster; the foundation for managing the whole cluster.
  • TopicDeletionManager: deletes the specified topics.
  • PartitionStateMachine: the state machine that manages the state of every partition in the cluster.
  • ReplicaStateMachine: the state machine that manages the state of every replica in the cluster.
  • ControllerBrokerRequestBatch: implements batched sending of requests to brokers.
  • PartitionLeaderSelector: implements several leader-replica election strategies:
    • OfflinePartitionLeaderSelector
    • ReassignedPartitionLeaderSelector
    • PreferredReplicaPartitionLeaderSelector
    • ControlledShutdownLeaderSelector
    • NoOpLeaderSelector
  • Listener: ZooKeeper listeners that watch the data of certain nodes, their children, or the ZooKeeper session state, and run the matching business logic when triggered:
    • TopicChangeListener
    • DeleteTopicsListener (working together with TopicDeletionManager)
    • PartitionModificationsListener
    • BrokerChangeListener
    • IsrChangeNotificationListener
    • PreferredReplicaElectionListener
    • PartitionsReassignedListener
    • ReassignedPartitionsIsrChangeListener

KafkaController is the bridge between ZooKeeper and the Kafka cluster. On one side it watches ZooKeeper, both for data written there by the brokers themselves and for data written by administrators through the management scripts; on the other side it reacts to changes in that data by steering every broker's work through LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest, and other requests.

2. ControllerContext

ControllerContext holds the context the Controller works with. It is closely tied to ZooKeeper and can be viewed as a cache of the ZooKeeper data; its definition and important fields are as follows:

    /**
     * Maintains the context used by the Controller; can be viewed as a cache of ZooKeeper data
     * @param zkUtils ZooKeeper utility class
     * @param zkSessionTimeout ZooKeeper session timeout
     */
    class ControllerContext(val zkUtils: ZkUtils,
                            val zkSessionTimeout: Int) {
      // manages the connections between the Controller and the brokers in the cluster
      var controllerChannelManager: ControllerChannelManager = null
      val controllerLock: ReentrantLock = new ReentrantLock()
      // IDs of the brokers that are shutting down
      var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
      val brokerShutdownLock: Object = new Object
      /**
       * Controller epoch, initially 0;
       * it is stored in ZooKeeper under /controller_epoch
       * and incremented by 1 each time a new Controller Leader is elected.
       */
      var epoch: Int = KafkaController.InitialControllerEpoch - 1
      // ZooKeeper version of the epoch, initially 0
      var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
      // names of all topics in the cluster
      var allTopics: Set[String] = Set.empty
      // the AR set of each partition
      var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
      // for each partition: the ID of the broker hosting the leader replica, the ISR set, and epoch information
      var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
      // partitions whose replicas are currently being reassigned
      val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
      // partitions currently undergoing "preferred replica" election
      val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
      // the brokers that are currently available
      private var liveBrokersUnderlying: Set[Broker] = Set.empty
      // IDs of the brokers that are currently available
      private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
      ...
    }

Two case classes appear here: LeaderIsrAndControllerEpoch and ReassignedPartitionsContext. LeaderIsrAndControllerEpoch records, for each partition, the ID of the broker hosting the leader replica, the ISR set, the Controller epoch, and so on:

    // kafka.controller.LeaderIsrAndControllerEpoch
    /**
     * @param leaderAndIsr records the ID of the broker hosting the leader replica and the ISR set
     * @param controllerEpoch records the Controller epoch
     */
    case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
      override def toString(): String = {
        val leaderAndIsrInfo = new StringBuilder
        leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
        leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
        leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
        leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
        leaderAndIsrInfo.toString()
      }
    }

    // kafka.api.LeaderAndIsr
    object LeaderAndIsr {
      val initialLeaderEpoch: Int = 0
      val initialZKVersion: Int = 0
      val NoLeader = -1
      val LeaderDuringDelete = -2
    }

    /**
     * @param leader ID of the broker hosting the leader replica
     * @param leaderEpoch the leader epoch
     * @param isr the ISR set
     * @param zkVersion the version stored in ZooKeeper
     */
    case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
      def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
      override def toString(): String = {
        Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
      }
    }

ReassignedPartitionsContext wraps the newly assigned AR set together with the ReassignedPartitionsIsrChangeListener that watches for ISR changes; its source is as follows:

    // kafka.controller.ReassignedPartitionsContext
    /**
     * @param newReplicas the newly assigned AR set
     * @param isrChangeListener the ReassignedPartitionsIsrChangeListener that watches for ISR changes
     */
    case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                           var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)

    case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
      override def toString(): String = {
        "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
      }
    }

ControllerContext also provides a number of convenience methods over its internal fields:

    // kafka.controller.ControllerContext#liveBrokers_=
    // setter for liveBrokersUnderlying and liveBrokerIdsUnderlying
    def liveBrokers_=(brokers: Set[Broker]) {
      liveBrokersUnderlying = brokers
      liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
    }

    /**
     * Getters for liveBrokersUnderlying and liveBrokerIdsUnderlying;
     * they return the sets with the brokers in shuttingDownBrokerIds filtered out
     */
    // kafka.controller.ControllerContext#liveBrokers
    def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
    // kafka.controller.ControllerContext#liveBrokerIds
    def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))

    // return liveBrokersUnderlying/liveBrokerIdsUnderlying without any filtering
    // kafka.controller.ControllerContext#liveOrShuttingDownBrokerIds
    def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
    // kafka.controller.ControllerContext#liveOrShuttingDownBrokers
    def liveOrShuttingDownBrokers = liveBrokersUnderlying

    // kafka.controller.ControllerContext#partitionsOnBroker
    // returns the partitions that have a replica on the given broker
    def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
      partitionReplicaAssignment
        .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
        .map { case(topicAndPartition, replicas) => topicAndPartition }
        .toSet
    }

    // kafka.controller.ControllerContext#replicasOnBrokers
    // returns all replicas hosted on the given set of brokers
    def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
      brokerIds.map { brokerId =>
        partitionReplicaAssignment
          .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
          .map { case(topicAndPartition, replicas) =>
            new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
      }.flatten.toSet
    }

    // kafka.controller.ControllerContext#replicasForTopic
    // returns all replicas of the given topic
    def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
      partitionReplicaAssignment
        .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
        .map { case(topicAndPartition, replicas) =>
          replicas.map { r =>
            new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
          }
        }.flatten.toSet
    }

    // kafka.controller.ControllerContext#partitionsForTopic
    // returns all partitions of the given topic
    def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
      partitionReplicaAssignment
        .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
    }

    // kafka.controller.ControllerContext#allLiveReplicas
    // returns all replicas hosted on the currently available brokers
    def allLiveReplicas(): Set[PartitionAndReplica] = {
      replicasOnBrokers(liveBrokerIds)
    }

    // kafka.controller.ControllerContext#replicasForPartition
    // returns the replicas of the given set of partitions
    def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
      partitions.map { p =>
        val replicas = partitionReplicaAssignment(p)
        replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
      }.flatten
    }

    // kafka.controller.ControllerContext#removeTopic
    // removes the given topic from the cache
    def removeTopic(topic: String) = {
      partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
      partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
      allTopics -= topic
    }
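
To make the semantics of these helpers concrete, here is a standalone sketch (plain Scala with hypothetical data and a simplified TopicAndPartition) that mirrors the filtering logic above:

    object ControllerContextHelpersDemo extends App {
      case class TopicAndPartition(topic: String, partition: Int)

      // hypothetical cluster state
      val liveBrokerIdsUnderlying = Set(0, 1, 2)
      val shuttingDownBrokerIds = Set(2)
      val partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]] = Map(
        TopicAndPartition("t", 0) -> Seq(0, 1),
        TopicAndPartition("t", 1) -> Seq(1, 2)
      )

      // liveBrokerIds excludes brokers that are shutting down...
      val liveBrokerIds = liveBrokerIdsUnderlying.filterNot(shuttingDownBrokerIds.contains)
      println(liveBrokerIds)            // Set(0, 1)
      // ...while liveOrShuttingDownBrokerIds would still report Set(0, 1, 2)

      // partitionsOnBroker simply scans the assignment map for the broker's replicas
      def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] =
        partitionReplicaAssignment
          .filter { case (_, replicas) => replicas.contains(brokerId) }
          .keySet
      println(partitionsOnBroker(1))    // both partitions: broker 1 holds a replica of each
    }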

3. ControllerChannelManager

ControllerChannelManager manages the network interaction between the KafkaController and each broker in the cluster. Internally it creates, for every available broker, a dedicated blocking request queue plus a send thread that blocks on that queue and processes its requests. Its definition and important fields are as follows:

    /**
     * KafkaController uses ControllerChannelManager to manage the network
     * interaction between itself and each broker in the cluster
     * @param controllerContext the KafkaController context
     * @param config configuration
     * @param time
     * @param metrics
     * @param threadNamePrefix
     */
    class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
      // manages the ControllerBrokerStateInfo object of each broker in the cluster
      protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
      // lock guarding brokerStateInfo
      private val brokerLock = new Object
      ...
    }

brokerStateInfo is a HashMap[Int, ControllerBrokerStateInfo] whose keys are broker IDs and whose values describe the connection between the KafkaController and the corresponding broker. The source of ControllerBrokerStateInfo is as follows:

    // kafka.controller.ControllerBrokerStateInfo
    /**
     * Holds everything ControllerChannelManager needs for its connection to one broker
     * @param networkClient the network connection between the Controller and the broker, driven in blocking mode via NetworkClientBlockingOps
     * @param brokerNode the broker's network location: host, port and rack information
     * @param messageQueue a buffer queue (LinkedBlockingQueue) of requests destined for the broker; each element is a QueueItem wrapping the request itself and its callback
     * @param requestSendThread the thread that sends the requests
     */
    case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                         brokerNode: Node,
                                         messageQueue: BlockingQueue[QueueItem],
                                         requestSendThread: RequestSendThread)

As the definition shows, brokerNode is the broker Node this ControllerBrokerStateInfo connects to, recording the broker's host, port and rack information; messageQueue is a buffer queue holding the requests destined for that broker, each element being of type QueueItem, defined as follows:

    // kafka.controller.QueueItem
    // wraps a request object together with its callback
    case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)

A QueueItem thus carries the request object itself plus the callback that should run on its response.
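
The hand-off through the queue can be modeled with a few lines of self-contained Scala (simplified stand-ins, not the actual Kafka types): the controller side puts an item carrying the request and its callback, and the send thread blocks on take(), sends, and finally invokes the callback:

    import java.util.concurrent.LinkedBlockingQueue

    object QueueItemDemo extends App {
      // simplified stand-in for QueueItem (ApiKeys/AbstractRequest replaced by Strings)
      case class Item(apiKey: String, request: String, callback: String => Unit)

      val queue = new LinkedBlockingQueue[Item]()

      // controller side: enqueue a request together with its callback
      queue.put(Item("LEADER_AND_ISR", "request-payload", response => println(s"callback got: $response")))

      // send-thread side: block until an item is available, "send" it, run the callback
      val Item(apiKey, request, callback) = queue.take()
      println(s"sending $apiKey: $request")
      callback("response-payload")
    }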

The fourth constructor parameter of ControllerBrokerStateInfo, requestSendThread, is the thread that sends the requests. Its type, RequestSendThread, extends the ShutdownableThread class we have met repeatedly, so the method to focus on is doWork():

    // kafka.controller.RequestSendThread#doWork
    override def doWork(): Unit = {
      // helper that sleeps for 300 milliseconds
      def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
      // take a QueueItem from the buffer queue and destructure it
      val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
      import NetworkClientBlockingOps._
      var clientResponse: ClientResponse = null
      try {
        lock synchronized { // acquire the lock
          var isSendSuccessful = false
          while (isRunning.get() && !isSendSuccessful) {
            // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
            // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
            // (until that happens, the send keeps being retried)
            try {
              if (!brokerReady()) {
                isSendSuccessful = false
                // back off for a while before retrying
                backoff()
              }
              else {
                // build the request header
                val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
                // build the RequestSend payload object
                val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
                // build the ClientRequest object
                val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
                // send the request and block until the response arrives
                clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
                // mark the send as successful
                isSendSuccessful = true
              }
            } catch {
              case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
                warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                  "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                  request.toString, brokerNode.toString()), e)
                // the send failed: reconnect to the broker and resend
                networkClient.close(brokerNode.idString)
                isSendSuccessful = false
                // back off for a while before retrying
                backoff()
            }
          }
          // handle the response
          if (clientResponse != null) {
            /**
             * Build the matching response object for each response type;
             * the Controller only ever sends LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest
             */
            val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
              case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
              case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
              case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
              case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
            }
            stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
              .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
            if (callback != null) {
              // invoke the callback carried by the QueueItem
              callback(response)
            }
          }
        }
      } catch {
        case e: Throwable =>
          error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
          // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
          networkClient.close(brokerNode.idString)
      }
    }

Two points follow from this implementation:

  1. Each call to doWork() takes one QueueItem from the blocking queue, builds the corresponding request, and sends it.
  2. The thread sends requests through the blocking methods added by NetworkClientBlockingOps and handles the response in place; whenever a send fails, it backs off for a while and then retries.
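
Stripped of the Kafka-specific types, the control flow of that retry loop looks like the following sketch (a simplified rendering of the logic above; `brokerReady` and `send` are stand-ins):

    import java.util.concurrent.atomic.AtomicBoolean

    object RetryLoopSketch {
      def sendWithRetry(isRunning: AtomicBoolean,
                        brokerReady: () => Boolean,
                        send: () => Unit,
                        backoffMs: Long = 300): Boolean = {
        var isSendSuccessful = false
        while (isRunning.get() && !isSendSuccessful) {
          try {
            if (!brokerReady()) {
              Thread.sleep(backoffMs)   // broker not ready: back off, then retry
            } else {
              send()                    // blocking send-and-receive
              isSendSuccessful = true   // only reached if send() did not throw
            }
          } catch {
            case _: Throwable =>
              Thread.sleep(backoffMs)   // send failed: back off, then retry
          }
        }
        isSendSuccessful                // false only if the thread was shut down
      }
    }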

ControllerChannelManager defines several methods for managing the brokerStateInfo map; without exception, they synchronize on brokerLock while touching brokerStateInfo to guarantee thread safety:

    // kafka.controller.ControllerChannelManager#addBroker
    // registers a broker for management
    def addBroker(broker: Broker) {
      // be careful here. Maybe the startup() API has already started the request send thread
      brokerLock synchronized { // acquire the lock
        if(!brokerStateInfo.contains(broker.id)) { // the broker is not yet tracked in brokerStateInfo
          // add it via addNewBroker()
          addNewBroker(broker)
          // start the corresponding RequestSendThread
          startRequestSendThread(broker.id)
        }
      }
    }

    // kafka.controller.ControllerChannelManager#addNewBroker
    // adds an entry to the brokerStateInfo map
    private def addNewBroker(broker: Broker) {
      // create the message queue
      val messageQueue = new LinkedBlockingQueue[QueueItem]
      debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
      // build the BrokerEndPoint and broker Node from the configuration
      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
      val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
      // build the network client
      val networkClient = {
        val channelBuilder = ChannelBuilders.create(
          config.interBrokerSecurityProtocol,
          Mode.CLIENT,
          LoginType.SERVER,
          config.values,
          config.saslMechanismInterBrokerProtocol,
          config.saslInterBrokerHandshakeRequestEnable
        )
        val selector = new Selector(
          NetworkReceive.UNLIMITED,
          config.connectionsMaxIdleMs,
          metrics,
          time,
          "controller-channel",
          Map("broker-id" -> broker.id.toString).asJava,
          false,
          channelBuilder
        )
        new NetworkClient(
          selector,
          new ManualMetadataUpdater(Seq(brokerNode).asJava),
          config.brokerId.toString,
          1,
          0,
          Selectable.USE_DEFAULT_BUFFER_SIZE,
          Selectable.USE_DEFAULT_BUFFER_SIZE,
          config.requestTimeoutMs,
          time
        )
      }
      /**
       * Build the RequestSendThread's name, in the format:
       * threadNamePrefix:Controller-<source brokerId>-to-broker-<target brokerId>-send-thread
       */
      val threadName = threadNamePrefix match {
        case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
        case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
      }
      // create the RequestSendThread
      val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
        brokerNode, config, time, threadName)
      requestThread.setDaemon(false)
      // wrap the network client, broker node, message queue and RequestSendThread
      // into a ControllerBrokerStateInfo and store it in brokerStateInfo
      brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
    }

    // kafka.controller.ControllerChannelManager#removeBroker
    // removes a broker
    def removeBroker(brokerId: Int) {
      brokerLock synchronized { // acquire the lock
        // look up the broker's ControllerBrokerStateInfo and hand it to removeExistingBroker()
        removeExistingBroker(brokerStateInfo(brokerId))
      }
    }

    // kafka.controller.ControllerChannelManager#removeExistingBroker
    private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
      try {
        // close the network client
        brokerState.networkClient.close()
        // clear the message queue
        brokerState.messageQueue.clear()
        // shut down the RequestSendThread
        brokerState.requestSendThread.shutdown()
        // remove the corresponding ControllerBrokerStateInfo from brokerStateInfo
        brokerStateInfo.remove(brokerState.brokerNode.id)
      } catch {
        case e: Throwable => error("Error while removing broker by the controller", e)
      }
    }

When ControllerChannelManager is constructed, it creates a ControllerBrokerStateInfo object for every available broker recorded in ControllerContext, via the following code:

    // create a ControllerBrokerStateInfo object for every live broker
    controllerContext.liveBrokers.foreach(addNewBroker(_))

ControllerChannelManager's startup() method then starts the RequestSendThread of every broker tracked in the brokerStateInfo map:

    // kafka.controller.ControllerChannelManager#startup
    def startup() = {
      brokerLock synchronized {
        // start the RequestSendThread of every tracked broker
        brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
      }
    }

    // kafka.controller.ControllerChannelManager#startRequestSendThread
    // starts the RequestSendThread belonging to the given brokerId
    protected def startRequestSendThread(brokerId: Int) {
      // fetch the broker's RequestSendThread from brokerStateInfo
      val requestThread = brokerStateInfo(brokerId).requestSendThread
      if(requestThread.getState == Thread.State.NEW) // start it only if it has not been started yet
        requestThread.start()
    }
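
Taken together, addBroker()/removeBroker() give the Controller a simple way to keep its connections in step with cluster membership. The hedged sketch below shows how a membership change could be pushed into the manager; the bookkeeping wrapper itself is an assumption for illustration (in Kafka this is driven by BrokerChangeListener), but the calls use the signatures shown above:

    import kafka.cluster.Broker
    import kafka.controller.ControllerChannelManager
    import scala.collection.mutable

    object BrokerMembershipSketch {
      def onBrokerChange(current: Set[Broker],
                         channelManager: ControllerChannelManager,
                         knownIds: mutable.Set[Int]): Unit = {
        val added = current.filterNot(b => knownIds.contains(b.id))
        val removedIds = knownIds.toSet -- current.map(_.id)
        // addBroker() builds the queue/NetworkClient/RequestSendThread and starts the thread
        added.foreach(channelManager.addBroker)
        // removeBroker() closes the client, clears the queue and stops the thread
        removedIds.foreach(channelManager.removeBroker)
        knownIds --= removedIds
        knownIds ++= added.map(_.id)
      }
    }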

4. ControllerBrokerRequestBatch

ControllerBrokerRequestBatch implements batched sending of requests. It maintains three maps that hold, respectively, the pending LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest data; its definition and important fields are as follows:

    // kafka.controller.ControllerBrokerRequestBatch
    /**
     * ControllerBrokerRequestBatch implements batched sending of requests
     * @param controller
     */
    class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
      val controllerContext = controller.controllerContext
      val controllerId: Int = controller.config.brokerId
      // the data needed to build the LeaderAndIsrRequests destined for each broker
      val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
      // the data needed to build the StopReplicaRequests destined for each broker
      val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
      // the UpdateMetadataRequest entries destined for each broker
      val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
      ...
    }

Within these maps, PartitionStateInfo carries the data needed for LeaderAndIsrRequest and UpdateMetadataRequest, while StopReplicaRequestInfo carries the data needed for StopReplicaRequest; they are defined as follows:

    // kafka.api.PartitionStateInfo
    case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, // leader/ISR and Controller epoch
                                  allReplicas: Set[Int]) { // the AR set
      def replicationFactor = allReplicas.size
      def writeTo(buffer: ByteBuffer) {
        buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch) // Controller epoch
        buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader) // leader
        buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) // leader epoch
        buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) // ISR size
        leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_)) // ISR
        buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion) // ZooKeeper version
        buffer.putInt(replicationFactor)
        allReplicas.foreach(buffer.putInt(_))
      }
      def sizeInBytes(): Int = {
        val size =
          4 /* epoch of the controller that elected the leader */ +
          4 /* leader broker id */ +
          4 /* leader epoch */ +
          4 /* number of replicas in isr */ +
          4 * leaderIsrAndControllerEpoch.leaderAndIsr.isr.size /* replicas in isr */ +
          4 /* zk version */ +
          4 /* replication factor */ +
          allReplicas.size * 4
        size
      }
      override def toString(): String = {
        val partitionStateInfo = new StringBuilder
        partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
        partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
        partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
        partitionStateInfo.toString()
      }
    }

    // kafka.controller.StopReplicaRequestInfo
    case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)

    // kafka.controller.PartitionAndReplica
    /**
     * @param topic the topic
     * @param partition the partition
     * @param replica the replica
     */
    case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
      override def toString(): String = {
        "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
      }
    }

The newBatch() method of ControllerBrokerRequestBatch checks that all three maps are empty; if any of them is not, it throws an IllegalStateException:

    // kafka.controller.ControllerBrokerRequestBatch#newBatch
    // checks leaderAndIsrRequestMap, stopReplicaRequestMap and updateMetadataRequestMap;
    // throws if any of them is non-empty
    def newBatch() {
      // raise error if the previous batch is not empty
      if (leaderAndIsrRequestMap.size > 0)
        throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
          "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
      if (stopReplicaRequestMap.size > 0)
        throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
          "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
      if (updateMetadataRequestMap.size > 0)
        throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
          "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
    }

Conversely, the clear() method empties all three maps:

    // empties leaderAndIsrRequestMap, stopReplicaRequestMap and updateMetadataRequestMap
    def clear() {
      leaderAndIsrRequestMap.clear()
      stopReplicaRequestMap.clear()
      updateMetadataRequestMap.clear()
    }

The remaining methods of ControllerBrokerRequestBatch each have a clear-cut responsibility. addLeaderAndIsrRequestForBrokers(...) stages into leaderAndIsrRequestMap the data needed for the pending LeaderAndIsrRequests:

    // kafka.controller.ControllerBrokerRequestBatch#addLeaderAndIsrRequestForBrokers
    /**
     * Adds the data needed to build the pending LeaderAndIsrRequests to leaderAndIsrRequestMap,
     * and calls addUpdateMetadataRequestForBrokers() to stage an UpdateMetadataRequest
     * for every available broker in the cluster
     * @param brokerIds IDs of the brokers that should receive the LeaderAndIsrRequest
     * @param topic the topic
     * @param partition the partition
     * @param leaderIsrAndControllerEpoch the leader/ISR and epoch information
     * @param replicas the newly assigned replica set
     * @param callback the callback
     */
    def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                         leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                         replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
      // build a TopicPartition from the topic and partition
      val topicPartition = new TopicPartition(topic, partition)
      // filter out invalid broker IDs, then iterate
      brokerIds.filter(_ >= 0).foreach { brokerId =>
        // fetch (or create) the map of pending LeaderAndIsrRequest data for this broker
        val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
        // record the leader, ISR, AR and other data needed to build the LeaderAndIsrRequest
        result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
      }
      // stage an UpdateMetadataRequest for every available broker
      addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
        Set(TopicAndPartition(topic, partition)))
    }

The implementation is straightforward: it simply records a LeaderIsrAndControllerEpoch into each target broker's map; note that the LeaderIsrAndControllerEpoch object is per-partition. The method finishes by calling addUpdateMetadataRequestForBrokers(...) to stage an UpdateMetadataRequest for all available brokers: since a LeaderAndIsrRequest may change the leader replica and related state, the other brokers must be told to refresh their metadata. The source of addUpdateMetadataRequestForBrokers(...) is as follows:

    // kafka.controller.ControllerBrokerRequestBatch#addUpdateMetadataRequestForBrokers
    /**
     * Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted
     */
    def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                           partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                           callback: AbstractRequestResponse => Unit = null) {
      // helper that stages the UpdateMetadataRequest data for one partition
      def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
        // look up the leader information the Controller holds for this partition
        val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // the partition's AR set
            val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
            // build the PartitionStateInfo according to the beingDeleted flag
            val partitionStateInfo = if (beingDeleted) {
              val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
              PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
            } else {
              PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
            }
            // filter out invalid broker IDs, then record the data in updateMetadataRequestMap
            brokerIds.filter(b => b >= 0).foreach { brokerId =>
              updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
              updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
            }
          case None =>
            info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
        }
      }
      val filteredPartitions = {
        val givenPartitions = if (partitions.isEmpty)
          // if no partitions were given, fall back to all partitions the Controller knows about
          controllerContext.partitionLeadershipInfo.keySet
        else
          partitions
        if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
          givenPartitions
        else
          // exclude the partitions that are about to be deleted
          givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
      }
      if (filteredPartitions.isEmpty) // the resulting partition set is empty
        // still touch updateMetadataRequestMap so that each broker ID maps to an empty dictionary
        brokerIds.filter(b => b >= 0).foreach { brokerId =>
          updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
        }
      else // the resulting partition set is not empty
        // stage each partition in filteredPartitions into updateMetadataRequestMap via updateMetadataRequestMapFor()
        filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
      // also stage the partitions that are about to be deleted, with beingDeleted = true
      controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
    }

addStopReplicaRequestForBrokers() likewise stages the data needed for StopReplicaRequests into the stopReplicaRequestMap collection:

    // kafka.controller.ControllerBrokerRequestBatch#addStopReplicaRequestForBrokers
    // stages StopReplicaRequest data into stopReplicaRequestMap
    def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
                                        callback: (AbstractRequestResponse, Int) => Unit = null) {
      // filter out invalid broker IDs, then iterate
      brokerIds.filter(b => b >= 0).foreach { brokerId =>
        // make sure stopReplicaRequestMap has an entry for the broker, creating an empty one if needed
        stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
        // the StopReplicaRequestInfo sequence already staged for this broker
        val v = stopReplicaRequestMap(brokerId)
        // build a StopReplicaRequestInfo (wrapping the callback when one is given) and append it to v
        if(callback != null)
          stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
            deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
        else
          stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
            deletePartition)
      }
    }

sendRequestsToBrokers() turns the data in the three maps above into actual requests and puts them on the matching queues in ControllerChannelManager, where the RequestSendThreads eventually send them:

    /**
     * Builds the actual requests from the data in leaderAndIsrRequestMap,
     * stopReplicaRequestMap and updateMetadataRequestMap, and puts them on the
     * matching queues in ControllerChannelManager, where the RequestSendThreads
     * eventually send them
     */
    def sendRequestsToBrokers(controllerEpoch: Int) {
      try {
        // iterate over leaderAndIsrRequestMap
        leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
          // log the leader/follower assignment
          partitionStateInfos.foreach { case (topicPartition, state) =>
            val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
            stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
              "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
              state.leaderIsrAndControllerEpoch, broker,
              topicPartition.topic, topicPartition.partition))
          }
          // IDs of the brokers acting as leaders
          val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
          // filter the available brokers down to the leaders' Node objects
          val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
            _.getNode(controller.config.interBrokerSecurityProtocol)
          }
          // build the Map[TopicPartition, PartitionState] for this broker
          val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
            // extract the leader ID, leader epoch, ISR set and Controller epoch
            val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
            // build the PartitionState object
            val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
              leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
              partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
            )
            topicPartition -> partitionState
          }
          // create the LeaderAndIsrRequest
          val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
          // hand the request to KafkaController for sending
          controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
        }
        // clear leaderAndIsrRequestMap
        leaderAndIsrRequestMap.clear()
        // iterate over updateMetadataRequestMap
        updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
          // log every entry of the broker's Map[TopicPartition, PartitionStateInfo]
          partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
            "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
            broker, p._1)))
          // build the Map[TopicPartition, PartitionState] for this broker
          val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
            val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
            val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
              leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
              partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
            )
            topicPartition -> partitionState
          }
          // pick the request version based on the inter-broker protocol version
          val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
                        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
                        else 0: Short
          // build the UpdateMetadataRequest matching the chosen version
          val updateMetadataRequest =
            if (version == 0) {
              // the available brokers
              val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
              // build the UpdateMetadataRequest
              new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
            }
            else {
              // iterate over the available brokers
              val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
                // collect each broker's endpoints
                val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
                  securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
                }
                // build the request-level Broker object from the broker ID, endpoints and rack information
                new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
              }
              // build the UpdateMetadataRequest
              new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
            }
          // hand the request to KafkaController for sending
          controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
        }
        // clear updateMetadataRequestMap
        updateMetadataRequestMap.clear()
        // iterate over stopReplicaRequestMap
        stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
          // log the replicas being deleted and the replicas being kept
          val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
          val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
          debug("The stop replica request (delete = true) sent to broker %d is %s"
            .format(broker, stopReplicaWithDelete.mkString(",")))
          debug("The stop replica request (delete = false) sent to broker %d is %s"
            .format(broker, stopReplicaWithoutDelete.mkString(",")))
          replicaInfoList.foreach { r =>
            // build the StopReplicaRequest
            val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
            // hand the request to KafkaController for sending
            controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
          }
        }
        // clear stopReplicaRequestMap
        stopReplicaRequestMap.clear()
      } catch {
        case e : Throwable => {
          if (leaderAndIsrRequestMap.size > 0) {
            error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
          }
          if (updateMetadataRequestMap.size > 0) {
            error("Haven't been able to send metadata update requests, current state of " +
              s"the map is $updateMetadataRequestMap. Exception message: $e")
          }
          if (stopReplicaRequestMap.size > 0) {
            error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
          }
          throw new IllegalStateException(e)
        }
      }
    }
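
Putting newBatch(), the add*() methods, and sendRequestsToBrokers() together, a caller (e.g. one of the state machines) drives a batch roughly as in the hedged sketch below; the wrapper method itself is an assumption for illustration, but the calls use the signatures shown above:

    import kafka.controller.{ControllerBrokerRequestBatch, LeaderIsrAndControllerEpoch}

    object BatchUsageSketch {
      def propagateLeaderChange(batch: ControllerBrokerRequestBatch,
                                topic: String, partition: Int,
                                leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                replicas: Seq[Int], controllerEpoch: Int): Unit = {
        // fails fast if entries of a previous batch were never sent
        batch.newBatch()
        // stage a LeaderAndIsrRequest for the replica brokers; as shown above, this
        // also stages UpdateMetadataRequests for all live brokers
        batch.addLeaderAndIsrRequestForBrokers(replicas, topic, partition,
          leaderIsrAndControllerEpoch, replicas)
        // turn the staged entries into requests and enqueue them for the RequestSendThreads
        batch.sendRequestsToBrokers(controllerEpoch)
      }
    }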

With ControllerChannelManager, ControllerBrokerStateInfo and ControllerBrokerRequestBatch understood, their collaboration can be summarized in the following diagram:

[Figure: Communication between KafkaController and the brokers (2.KafkaController与Broker的通信.png)]