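Kafka Series 18 - Server-Side Source Code Analysis 09: KafkaController Basic Components
Published / 2019-06-21
Summary: introduces the auxiliary components of KafkaController, namely ControllerContext, ControllerChannelManager and ControllerBrokerRequestBatch.
1. Introduction to KafkaController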
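In a Kafka cluster, every Broker instantiates a KafkaController at startup and registers its Broker ID in ZooKeeper. One of these KafkaControllers is elected Controller Leader and manages the state of all partitions and replicas in the cluster. The Controller Leader has three main duties:
- When the Leader replica of a partition fails, it elects a new Leader replica for that partition.
- When the kafka-topics script is used to increase the number of partitions of a Topic, it handles the assignment of the new partitions.
- When it detects that a partition's ISR set has changed, it tells every Broker in the cluster to update its MetadataCache.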
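To make the Controller highly available, the KafkaControllers on all other Brokers become Controller Followers once one Broker has been elected Controller Leader. If the Controller Leader goes down, a new Controller Leader is elected from the remaining Followers to take over management of the cluster.

Controller Leader election relies on ZooKeeper. Every Broker creates a KafkaController object at startup, but only one Controller Leader may serve the cluster at any time. When the cluster starts, the KafkaControllers on the various Brokers race to create a node at a designated ZooKeeper path. This is an ephemeral node, so it disappears when its owner's session ends. The first KafkaController to create the node becomes the Controller Leader, and the rest become Controller Followers. When the Controller Leader fails, all Controller Followers are notified and race again to create the node at that path, which elects a new Controller Leader.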
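The race for the controller node can be illustrated with a minimal sketch. It assumes a hypothetical in-memory `FakeZk` (built on `scala.collection.concurrent.TrieMap`) in place of a real ZooKeeper client:
- `createEphemeral` succeeds only when the path does not exist yet.
- `expire` simulates the ephemeral node disappearing when its owner's session ends.

The `elect` helper is illustrative only. It is not the ZookeeperLeaderElector implementation.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for ZooKeeper (not a real client API).
class FakeZk {
  private val nodes = TrieMap.empty[String, Int]
  // Succeeds only if the path does not exist yet, mimicking znode creation failing on an existing path.
  def createEphemeral(path: String, owner: Int): Boolean = nodes.putIfAbsent(path, owner).isEmpty
  // Simulates the ephemeral node being deleted when the owner's session ends.
  def expire(path: String, owner: Int): Unit = { nodes.remove(path, owner); () }
  def read(path: String): Option[Int] = nodes.get(path)
}

// Hypothetical helper: every candidate tries to create /controller; the owner afterwards is the Leader.
def elect(zk: FakeZk, candidates: Seq[Int]): Option[Int] = {
  candidates.foreach(id => zk.createEphemeral("/controller", id))
  zk.read("/controller")
}

val zk = new FakeZk
val first = elect(zk, Seq(1, 2, 3)) // broker 1 creates the node first
zk.expire("/controller", 1)         // broker 1's session ends, its node vanishes
val second = elect(zk, Seq(2, 3))   // the remaining Followers race again
println(s"first=$first, second=$second")
```

Here broker 1 wins the first race and broker 2 wins once broker 1's node has expired. The calls run sequentially, so "first to create" simply means first in the candidate order.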
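The ZooKeeper paths related to KafkaController, and the data recorded under each, are as follows:
- /brokers/ids/[id]: records the ids of the Brokers currently available in the cluster.
- /brokers/topics/[topic]: records the assignment of all partitions of the Topic, i.e. the AR set of each partition.
- /brokers/topics/[topic]/partitions/[partition_id]/state: records the ID of the Broker hosting the partition's Leader replica, the leader epoch, the ISR set, the ZKVersion and similar information.
- /controller_epoch: records the epoch of the current Controller Leader.
- /controller: records the ID of the current Controller Leader; it is also used for Controller Leader election.
- /admin/reassign_partitions: records the partitions whose replicas need to be reassigned.
- /admin/preferred_replica_election: records the partitions that need a "preferred replica" election. The "preferred replica" is the first replica assigned to a partition when the partition was created.
- /admin/delete_topics: records the Topics waiting to be deleted.
- /isr_change_notification: records the partitions whose ISR set changed within a recent period.
- /config: records various configuration information.

[Figure: diagram of the Kafka-related path nodes in ZooKeeper]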
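The design of KafkaController relies on the following components:
- KafkaController: organizes and encapsulates the other components and exposes the API.
- ZookeeperLeaderElector: mainly used for Controller Leader election.
- ControllerContext: the context of KafkaController. It caches the cluster-wide metadata recorded in ZooKeeper, such as the available Brokers and all Topics, partitions and replicas.
- ControllerChannelManager: maintains the network connections between the Controller Leader and the other Brokers in the cluster; it is the foundation for managing the whole cluster.
- TopicDeletionManager: deletes specified Topics.
- PartitionStateMachine: the state machine that manages the state of every Partition in the cluster.
- ReplicaStateMachine: the state machine that manages the state of every replica in the cluster.
- ControllerBrokerRequestBatch: sends requests to Brokers in batches.
- PartitionLeaderSelector: implements several Leader replica election strategies:
  - OfflinePartitionLeaderSelector
  - ReassignedPartitionLeaderSelector
  - PreferredReplicaPartitionLeaderSelector
  - ControlledShutdownLeaderSelector
  - NoOpLeaderSelector
- Listener: ZooKeeper listeners that watch the data or children of certain nodes, or the state of the ZooKeeper session, and invoke the corresponding business logic when triggered. They include:
  - TopicChangeListener
  - TopicDeletionManager and DeleteTopicsListener
  - PartitionModificationsListener
  - BrokerChangeListener
  - IsrChangeNotificationListener
  - PreferredReplicaElectionListener
  - PartitionsReassignedListener
  - ReassignedPartitionsIsrChangeListener

KafkaController is the bridge between ZooKeeper and the Kafka cluster. It watches ZooKeeper, both for data written by Brokers and for data written by administrators through scripts. It then reacts to changes in that data and controls the work of each Broker through requests such as LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest.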
2. ControllerContext
ControllerContext holds the context information used by the Controller. It is closely tied to ZooKeeper and can be viewed as a cache of ZooKeeper data. Its definition and important fields are as follows:
- /**
- * Holds the context information used by the Controller; can be viewed as a cache of ZooKeeper data
- * @param zkUtils ZooKeeper utility class
- * @param zkSessionTimeout ZooKeeper session timeout
- */
- class ControllerContext(val zkUtils: ZkUtils,
- val zkSessionTimeout: Int) {
- // Manages the connections between the Controller and the Brokers in the cluster
- var controllerChannelManager: ControllerChannelManager = null
- val controllerLock: ReentrantLock = new ReentrantLock()
- // IDs of the Brokers that are currently shutting down
- var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
- val brokerShutdownLock: Object = new Object
- /**
- * Controller epoch, initially 0;
- * it is stored in ZooKeeper at /controller_epoch;
- * each time a new Controller Leader is elected, epoch is incremented by 1.
- */
- var epoch: Int = KafkaController.InitialControllerEpoch - 1
- // ZooKeeper version of the epoch data, initially 0
- var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
- // Names of all Topics in the cluster
- var allTopics: Set[String] = Set.empty
- // AR set of every partition
- var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
- // For every partition: ID of the Broker hosting the Leader replica, the ISR set and the epoch information
- var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
- // Partitions whose replicas are being reassigned
- val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
- // Partitions undergoing a "preferred replica" election
- val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
- // Currently available Brokers
- private var liveBrokersUnderlying: Set[Broker] = Set.empty
- // IDs of the currently available Brokers
- private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
- ...
- }
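Two case classes are involved here: LeaderIsrAndControllerEpoch and ReassignedPartitionsContext. LeaderIsrAndControllerEpoch represents, for each partition, the ID of the Broker hosting the Leader replica, the ISR set, the Controller epoch and related information: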
- // kafka.controller.LeaderIsrAndControllerEpoch
- /**
- * @param leaderAndIsr the ID of the Broker hosting the Leader replica, and the ISR set
- * @param controllerEpoch the Controller epoch
- */
- case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
- override def toString(): String = {
- val leaderAndIsrInfo = new StringBuilder
- leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
- leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
- leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
- leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
- leaderAndIsrInfo.toString()
- }
- }
- // kafka.api.LeaderAndIsr
- object LeaderAndIsr {
- val initialLeaderEpoch: Int = 0
- val initialZKVersion: Int = 0
- val NoLeader = -1
- val LeaderDuringDelete = -2
- }
- /**
- * @param leader ID of the Broker hosting the Leader replica
- * @param leaderEpoch epoch of the Leader
- * @param isr the ISR set
- * @param zkVersion version of the data stored in ZooKeeper
- */
- case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
- def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
- override def toString(): String = {
- Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
- }
- }
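To see what these case classes hold, here is a simplified re-declaration. It makes two changes to the source:
- LeaderAndIsr's Json-based toString is dropped.
- The auxiliary constructor is expressed as a companion `apply`.

It is a sketch for illustration, not the Kafka source.

```scala
// Simplified re-declarations for illustration; the real LeaderAndIsr.toString encodes Json.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

object LeaderAndIsr {
  val initialLeaderEpoch: Int = 0
  val initialZKVersion: Int = 0
  val NoLeader: Int = -1
  val LeaderDuringDelete: Int = -2
  // Plays the role of the auxiliary constructor: epoch and zkVersion start at their initial values.
  def apply(leader: Int, isr: List[Int]): LeaderAndIsr =
    LeaderAndIsr(leader, initialLeaderEpoch, isr, initialZKVersion)
}

case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
  // Same output format as the toString shown above.
  override def toString: String =
    s"(Leader:${leaderAndIsr.leader},ISR:${leaderAndIsr.isr.mkString(",")}," +
      s"LeaderEpoch:${leaderAndIsr.leaderEpoch},ControllerEpoch:$controllerEpoch)"
}

val state = LeaderIsrAndControllerEpoch(LeaderAndIsr(1, List(1, 2, 3)), controllerEpoch = 5)
println(state) // prints (Leader:1,ISR:1,2,3,LeaderEpoch:0,ControllerEpoch:5)

// Marker leader used for partitions whose topic is being deleted
val deleting = LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, List(1, 2, 3))
```

The LeaderDuringDelete marker reappears later in this article, in addUpdateMetadataRequestForBrokers().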
ReassignedPartitionsContext wraps the newly assigned AR set together with the ReassignedPartitionsIsrChangeListener that watches for ISR changes. Its source is as follows:
- // kafka.controller.ReassignedPartitionsContext
- /**
- * @param newReplicas the newly assigned AR set
- * @param isrChangeListener the ReassignedPartitionsIsrChangeListener that watches for ISR changes
- */
- case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
- var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
- case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
- override def toString(): String = {
- "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
- }
- }
ControllerContext also provides many convenience methods for operating on its internal fields:
- // kafka.controller.ControllerContext#liveBrokers_=
- // Setter for liveBrokersUnderlying and liveBrokerIdsUnderlying
- def liveBrokers_=(brokers: Set[Broker]) {
- liveBrokersUnderlying = brokers
- liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
- }
- /**
- * Getters for liveBrokersUnderlying and liveBrokerIdsUnderlying:
- * they return liveBrokersUnderlying / liveBrokerIdsUnderlying with the Brokers in shuttingDownBrokerIds excluded
- */
- // kafka.controller.ControllerContext#liveBrokers
- def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
- // kafka.controller.ControllerContext#liveBrokerIds
- def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
- // Return liveBrokersUnderlying / liveBrokerIdsUnderlying as-is, including Brokers that are shutting down
- // kafka.controller.ControllerContext#liveOrShuttingDownBrokerIds
- def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
- // kafka.controller.ControllerContext#liveOrShuttingDownBrokers
- def liveOrShuttingDownBrokers = liveBrokersUnderlying
- // kafka.controller.ControllerContext#partitionsOnBroker
- // Partitions that have a replica on the given Broker
- def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case(topicAndPartition, replicas) => topicAndPartition }
- .toSet
- }
- // kafka.controller.ControllerContext#replicasOnBrokers
- // All replicas stored on the given set of Brokers
- def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
- brokerIds.map { brokerId =>
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case(topicAndPartition, replicas) =>
- new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
- }.flatten.toSet
- }
- // kafka.controller.ControllerContext#replicasForTopic
- // All replicas of the given Topic
- def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
- .map { case(topicAndPartition, replicas) =>
- replicas.map { r =>
- new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
- }
- }.flatten.toSet
- }
- // kafka.controller.ControllerContext#partitionsForTopic
- // All partitions of the given Topic
- def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
- }
- // kafka.controller.ControllerContext#allLiveReplicas
- // Replicas stored on all available Brokers
- def allLiveReplicas(): Set[PartitionAndReplica] = {
- replicasOnBrokers(liveBrokerIds)
- }
- // kafka.controller.ControllerContext#replicasForPartition
- // Replicas of the given set of partitions
- def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
- partitions.map { p =>
- val replicas = partitionReplicaAssignment(p)
- replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
- }.flatten
- }
- // kafka.controller.ControllerContext#removeTopic
- // Remove the given Topic from the context (leadership info, AR assignment and allTopics)
- def removeTopic(topic: String) = {
- partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
- partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
- allTopics -= topic
- }
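The effect of the shutting-down filter and of the assignment-based queries can be checked on a reduced model. `MiniContext` below is a hypothetical simplification that keeps only broker IDs and `partitionReplicaAssignment`. The assignment is declared as a `val` here for simplicity; the source uses a `var`. `TopicAndPartition` and `PartitionAndReplica` are redeclared locally so the block stands alone.

```scala
import scala.collection.mutable

// Local copies of the Kafka case classes so the sketch compiles on its own.
case class TopicAndPartition(topic: String, partition: Int)
case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

// Hypothetical reduction of ControllerContext to the fields used by these queries.
class MiniContext {
  val shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
  val partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

  def liveBrokerIds_=(ids: Set[Int]): Unit = liveBrokerIdsUnderlying = ids
  // Brokers that are shutting down are hidden, as in ControllerContext#liveBrokerIds.
  def liveBrokerIds: Set[Int] = liveBrokerIdsUnderlying.filter(id => !shuttingDownBrokerIds.contains(id))
  def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerIdsUnderlying

  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] =
    partitionReplicaAssignment.collect { case (tp, replicas) if replicas.contains(brokerId) => tp }.toSet

  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] =
    brokerIds.flatMap { brokerId =>
      partitionReplicaAssignment.collect {
        case (tp, replicas) if replicas.contains(brokerId) => PartitionAndReplica(tp.topic, tp.partition, brokerId)
      }
    }

  def allLiveReplicas(): Set[PartitionAndReplica] = replicasOnBrokers(liveBrokerIds)
}

val ctx = new MiniContext
ctx.liveBrokerIds = Set(1, 2, 3)
ctx.partitionReplicaAssignment ++= Seq(
  TopicAndPartition("t", 0) -> Seq(1, 2),
  TopicAndPartition("t", 1) -> Seq(2, 3))
ctx.shuttingDownBrokerIds += 3
println(ctx.liveBrokerIds)
println(ctx.allLiveReplicas())
```

Broker 3 is shutting down, so liveBrokerIds excludes it. As a result, allLiveReplicas() drops the replica of t-1 on broker 3 even though the assignment still lists it. liveOrShuttingDownBrokerIds still reports broker 3.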
3. ControllerChannelManager
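ControllerChannelManager manages the network interaction between KafkaController and each Broker in the cluster. For every available Broker it creates a blocking request queue and starts a dedicated request-sending thread, which takes requests from that queue and sends them in a blocking manner. Its definition and important fields are as follows: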
- /**
- * KafkaController uses ControllerChannelManager to manage its network interaction with each Broker in the cluster
- * @param controllerContext the KafkaController context
- * @param config configuration
- * @param time
- * @param metrics
- * @param threadNamePrefix
- */
- class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
- // ControllerBrokerStateInfo object for each Broker in the cluster, keyed by Broker ID
- protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
- // Lock used when operating on brokerStateInfo
- private val brokerLock = new Object
- ...
- }
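Here brokerStateInfo is a dictionary of type HashMap[Int, ControllerBrokerStateInfo]. Its key is the Broker ID, and its value describes the connection between KafkaController and that Broker. The source of ControllerBrokerStateInfo is as follows: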
- // kafka.controller.ControllerBrokerStateInfo
- /**
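- * Information about the connection between ControllerChannelManager and one Broker
- * @param networkClient the network connection used by the Controller to talk to the Broker, used in blocking mode via NetworkClientBlockingOps
- * @param brokerNode network location of the Broker: its host, port and rack information
- * @param messageQueue buffer queue (a LinkedBlockingQueue) holding the requests destined for the Broker; each element is a QueueItem wrapping a request and its callback
- * @param requestSendThread the thread that sends the requests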
- */
- case class ControllerBrokerStateInfo(networkClient: NetworkClient,
- brokerNode: Node,
- messageQueue: BlockingQueue[QueueItem],
- requestSendThread: RequestSendThread)
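As the definition shows, brokerNode is the Broker node this ControllerBrokerStateInfo connects to, and records the Broker's host, port and rack information. messageQueue is a buffer queue holding the requests destined for that Broker. Its elements are all of type QueueItem, defined as follows: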
- // kafka.controller.QueueItem
- // Wraps a request object together with its callback
- case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)
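QueueItem wraps the request object together with its ApiKeys type and optional version, and also records the callback to invoke on the response.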
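The fourth constructor parameter of ControllerBrokerStateInfo, requestSendThread, is the thread that sends requests. Its type is RequestSendThread, which extends the ShutdownableThread class we have already met many times, so the method to focus on is doWork(). Its source is as follows: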
- // kafka.controller.RequestSendThread#doWork
- override def doWork(): Unit = {
- // Helper that sleeps for 300 ms (the backoff)
- def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
- // Take a QueueItem from the buffer queue (blocking while it is empty) and destructure it
- val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
- import NetworkClientBlockingOps._
- var clientResponse: ClientResponse = null
- try {
- lock synchronized { // acquire the lock
- var isSendSuccessful = false
- while (isRunning.get() && !isSendSuccessful) {
- // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
- // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
- // i.e. when the Broker goes down, the ZooKeeper listener calls removeBroker(), which stops this thread; until then it keeps retrying
- try {
- if (!brokerReady()) {
- isSendSuccessful = false
- // back off for a while, then retry
- backoff()
- }
- else {
- // build the request header
- val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
- // build the RequestSend payload
- val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
- // build the ClientRequest
- val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
- // send the request and block until the response arrives
- clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
- // mark the send as successful
- isSendSuccessful = true
- }
- } catch {
- case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
- warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
- "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
- request.toString, brokerNode.toString()), e)
- // the send failed: close the connection so it is re-established, then resend
- networkClient.close(brokerNode.idString)
- isSendSuccessful = false
- // back off for a while, then retry
- backoff()
- }
- }
- // handle the response
- if (clientResponse != null) {
- /**
- * Build a response object according to the request type;
- * the Controller only sends three kinds of requests: LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest
- */
- val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
- case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
- case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
- case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
- case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
- }
- stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
- .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
- if (callback != null) {
- // invoke the callback wrapped in the QueueItem
- callback(response)
- }
- }
- }
- } catch {
- case e: Throwable =>
- error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
- // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
- networkClient.close(brokerNode.idString)
- }
- }
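Two points can be drawn from this implementation:
- Each call to doWork() takes one QueueItem from the blocking queue, builds a request from it and sends it.
- The thread still sends requests through the blocking methods added by NetworkClientBlockingOps and handles the response in place. If a send fails, it backs off for 300 ms and retries, until the send succeeds or the thread is shut down.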
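The retry behaviour of doWork() can be reproduced with a small, self-contained loop. This sketch simplifies the real thread in several ways:
- The network client is replaced by a hypothetical `send` function that may throw.
- QueueItem is reduced to a request string plus a callback.
- The backoff is configurable instead of the hard-coded 300 ms.

The real thread also closes the connection on failure and checks brokerReady() before each send.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Reduced QueueItem: a request plus the callback to run on its response.
final case class QueueItem(request: String, callback: String => Unit)

// Hypothetical simplification of RequestSendThread#doWork: `send` stands in for
// networkClient.blockingSendAndReceive and may throw; backoffMs replaces the fixed 300 ms.
final class SendLoop(queue: LinkedBlockingQueue[QueueItem], send: String => String, backoffMs: Long) {
  val isRunning = new AtomicBoolean(true)

  def doWork(): Unit = {
    val QueueItem(request, callback) = queue.take() // blocks until an item is available
    var response: Option[String] = None
    while (isRunning.get() && response.isEmpty) {
      try { response = Some(send(request)) }
      catch { case _: Exception => Thread.sleep(backoffMs) } // back off, then retry the same item
    }
    response.foreach(r => if (callback != null) callback(r))
  }
}

// A broker that refuses the first two attempts and then answers.
val attempts = new AtomicInteger(0)
val flaky: String => String = req =>
  if (attempts.incrementAndGet() < 3) throw new RuntimeException("connection refused") else s"ack($req)"

val queue = new LinkedBlockingQueue[QueueItem]()
var received = ""
queue.put(QueueItem("LeaderAndIsr", r => received = r))
new SendLoop(queue, flaky, backoffMs = 10L).doWork()
println(s"$received after ${attempts.get} attempts")
```

The item is taken from the queue once and kept across retries. A failure therefore never loses the request: it is only dropped if the loop is stopped.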
ControllerChannelManager defines several methods for managing the brokerStateInfo dictionary. Without exception, they hold a lock around every access to brokerStateInfo to ensure thread safety:
- // kafka.controller.ControllerChannelManager#addBroker
- // Add a Broker to be managed
- def addBroker(broker: Broker) {
- // be careful here. Maybe the startup() API has already started the request send thread
- brokerLock synchronized { // acquire the lock
- if(!brokerStateInfo.contains(broker.id)) { // brokerStateInfo has no entry for this Broker yet
- // add it via addNewBroker()
- addNewBroker(broker)
- // start its RequestSendThread
- startRequestSendThread(broker.id)
- }
- }
- }
- // kafka.controller.ControllerChannelManager#addNewBroker
- // Create the ControllerBrokerStateInfo for a Broker and register it in brokerStateInfo
- private def addNewBroker(broker: Broker) {
- // create the message queue
- val messageQueue = new LinkedBlockingQueue[QueueItem]
- debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
- // build the BrokerEndPoint and the Node for the configured inter-broker security protocol
- val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
- val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
- // build the network client
- val networkClient = {
- val channelBuilder = ChannelBuilders.create(
- config.interBrokerSecurityProtocol,
- Mode.CLIENT,
- LoginType.SERVER,
- config.values,
- config.saslMechanismInterBrokerProtocol,
- config.saslInterBrokerHandshakeRequestEnable
- )
- val selector = new Selector(
- NetworkReceive.UNLIMITED,
- config.connectionsMaxIdleMs,
- metrics,
- time,
- "controller-channel",
- Map("broker-id" -> broker.id.toString).asJava,
- false,
- channelBuilder
- )
- new NetworkClient(
- selector,
- new ManualMetadataUpdater(Seq(brokerNode).asJava),
- config.brokerId.toString,
- 1,
- 0,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- config.requestTimeoutMs,
- time
- )
- }
- /**
- * Name of the RequestSendThread, in the format:
- * [threadNamePrefix:]Controller-<source brokerId>-to-broker-<target brokerId>-send-thread
- */
- val threadName = threadNamePrefix match {
- case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
- case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
- }
- // create the RequestSendThread
- val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
- brokerNode, config, time, threadName)
- requestThread.setDaemon(false)
- // wrap the network client, Broker node, message queue and RequestSendThread in a ControllerBrokerStateInfo and put it into brokerStateInfo
- brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
- }
- // kafka.controller.ControllerChannelManager#removeBroker
- // Remove a Broker
- def removeBroker(brokerId: Int) {
- brokerLock synchronized { // acquire the lock
- // look up the Broker's ControllerBrokerStateInfo and hand it to removeExistingBroker()
- removeExistingBroker(brokerStateInfo(brokerId))
- }
- }
- // kafka.controller.ControllerChannelManager#removeExistingBroker
- private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
- try {
- // close the network client
- brokerState.networkClient.close()
- // clear the message queue
- brokerState.messageQueue.clear()
- // shut down the RequestSendThread
- brokerState.requestSendThread.shutdown()
- // remove the ControllerBrokerStateInfo from brokerStateInfo
- brokerStateInfo.remove(brokerState.brokerNode.id)
- } catch {
- case e: Throwable => error("Error while removing broker by the controller", e)
- }
- }
When ControllerChannelManager is initialized, it creates a ControllerBrokerStateInfo object for every available Broker recorded in ControllerContext, using the following code:
- // create a ControllerBrokerStateInfo for each live Broker
- controllerContext.liveBrokers.foreach(addNewBroker(_))
The startup() method of ControllerChannelManager then starts the RequestSendThread of every Broker in the brokerStateInfo dictionary:
- // kafka.controller.ControllerChannelManager#startup
- def startup() = {
- brokerLock synchronized {
- // start the RequestSendThreads
- brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
- }
- }
- // kafka.controller.ControllerChannelManager#startRequestSendThread
- // Start the RequestSendThread of the Broker with the given brokerId
- protected def startRequestSendThread(brokerId: Int) {
- // get the RequestSendThread for brokerId from brokerStateInfo
- val requestThread = brokerStateInfo(brokerId).requestSendThread
- if(requestThread.getState == Thread.State.NEW) // start it only if it has not been started yet (state NEW)
- requestThread.start()
- }
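Putting addBroker, startup, startRequestSendThread and removeBroker together, the per-broker "queue plus thread" design can be sketched as follows. Several parts are assumptions of this sketch:
- `MiniChannelManager` and its `handle` callback are hypothetical.
- Shutdown uses Thread.interrupt(). The real RequestSendThread is a ShutdownableThread that sends over a NetworkClient.

What the sketch preserves is that every map access happens under `brokerLock`, and that a thread is started only while its state is NEW.

```scala
import java.util.concurrent.{BlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable

// Hypothetical reduction of ControllerChannelManager: one queue and one non-daemon sender thread
// per broker; `handle` stands in for the network send done by RequestSendThread.
final class MiniChannelManager(controllerId: Int, initialBrokers: Set[Int], handle: (Int, String) => Unit) {
  private final case class BrokerState(queue: BlockingQueue[String], thread: Thread)
  private val brokerStateInfo = mutable.HashMap.empty[Int, BrokerState]
  private val brokerLock = new Object

  // Like the constructor code above: create (but do not start) state for the initially live brokers.
  initialBrokers.foreach(addNewBroker)

  private def addNewBroker(brokerId: Int): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    val thread = new Thread(() => {
      try { while (true) handle(brokerId, queue.take()) }
      catch { case _: InterruptedException => () } // removeBroker interrupts a blocked take()
    }, s"Controller-$controllerId-to-broker-$brokerId-send-thread")
    thread.setDaemon(false)
    brokerStateInfo.put(brokerId, BrokerState(queue, thread))
  }

  private def startRequestSendThread(brokerId: Int): Unit = {
    val t = brokerStateInfo(brokerId).thread
    if (t.getState == Thread.State.NEW) t.start() // a Thread can only be started once
  }

  def startup(): Unit = brokerLock.synchronized { brokerStateInfo.keys.foreach(startRequestSendThread) }

  def addBroker(brokerId: Int): Unit = brokerLock.synchronized {
    if (!brokerStateInfo.contains(brokerId)) {
      addNewBroker(brokerId)
      startRequestSendThread(brokerId)
    }
  }

  def sendRequest(brokerId: Int, request: String): Unit =
    brokerLock.synchronized { brokerStateInfo.get(brokerId).foreach(_.queue.put(request)) }

  def removeBroker(brokerId: Int): Unit = brokerLock.synchronized {
    brokerStateInfo.remove(brokerId).foreach { s =>
      s.queue.clear()
      s.thread.interrupt()
      s.thread.join()
    }
  }
}

val delivered = new ConcurrentLinkedQueue[String]()
val latch = new CountDownLatch(2)
val mgr = new MiniChannelManager(0, Set(1), (id, req) => { delivered.add(s"$id:$req"); latch.countDown() })
mgr.startup()    // starts broker 1's thread, which was still NEW
mgr.addBroker(2) // unknown broker: state created and thread started at once
mgr.sendRequest(1, "UpdateMetadata")
mgr.sendRequest(2, "UpdateMetadata")
latch.await()
mgr.removeBroker(1)
mgr.removeBroker(2)
```

The NEW check is what makes it safe for both startup() and addBroker() to call startRequestSendThread(). Calling start() twice on a Java Thread throws IllegalThreadStateException.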
4. ControllerBrokerRequestBatch
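ControllerBrokerRequestBatch implements batched sending of requests. Internally it maintains three dictionaries, which hold the data for LeaderAndIsrRequest, StopReplicaRequest and UpdateMetadataRequest respectively. Its definition and important fields are as follows: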
- // kafka.controller.ControllerBrokerRequestBatch
- /**
- * The ControllerBrokerRequestBatch component implements batched sending of requests
- * @param controller the owning KafkaController
- */
- class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
- val controllerContext = controller.controllerContext
- val controllerId: Int = controller.config.brokerId
- // Information needed to build the LeaderAndIsrRequest for each target Broker
- val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
- // Information needed to build the StopReplicaRequest for each target Broker
- val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
- // Information needed to build the UpdateMetadataRequest for each target Broker
- val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
- ...
- }
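These dictionaries use PartitionStateInfo to represent the information needed by LeaderAndIsrRequest and UpdateMetadataRequest, and StopReplicaRequestInfo to represent the information needed by StopReplicaRequest. They are defined as follows: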
- // kafka.api.PartitionStateInfo
- case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, // Leader, ISR and Controller epoch
- allReplicas: Set[Int]) { // AR set
- def replicationFactor = allReplicas.size
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch) // Controller epoch
- buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader) // Leader
- buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) // Leader epoch
- buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.isr.size) // ISR size
- leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_)) // ISR
- buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion) // ZooKeeper version
- buffer.putInt(replicationFactor)
- allReplicas.foreach(buffer.putInt(_))
- }
- def sizeInBytes(): Int = {
- val size =
- 4 /* epoch of the controller that elected the leader */ +
- 4 /* leader broker id */ +
- 4 /* leader epoch */ +
- 4 /* number of replicas in isr */ +
- 4 * leaderIsrAndControllerEpoch.leaderAndIsr.isr.size /* replicas in isr */ +
- 4 /* zk version */ +
- 4 /* replication factor */ +
- allReplicas.size * 4
- size
- }
- override def toString(): String = {
- val partitionStateInfo = new StringBuilder
- partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
- partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
- partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
- partitionStateInfo.toString()
- }
- }
- // kafka.controller.StopReplicaRequestInfo
- case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)
- // kafka.controller.PartitionAndReplica
- /**
- * @param topic the topic
- * @param partition the partition
- * @param replica the replica (ID of the Broker hosting it)
- */
- case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
- override def toString(): String = {
- "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
- }
- }
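As a worked example of sizeInBytes, take an ISR of 2 replicas and an AR set of 3 replicas:
- The six fixed Int fields take 24 bytes.
- The ISR takes 8 bytes.
- The replicas take 12 bytes.

That is 44 bytes in total. The sketch below checks that writeTo fills a buffer of exactly that size. `FlatPartitionState` is a hypothetical flattened version of PartitionStateInfo that writes its fields in the same order.

```scala
import java.nio.ByteBuffer

// Hypothetical flattened PartitionStateInfo; field order follows writeTo above.
final case class FlatPartitionState(controllerEpoch: Int, leader: Int, leaderEpoch: Int,
                                    isr: List[Int], zkVersion: Int, allReplicas: Set[Int]) {
  def replicationFactor: Int = allReplicas.size

  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(controllerEpoch)
    buffer.putInt(leader)
    buffer.putInt(leaderEpoch)
    buffer.putInt(isr.size)
    isr.foreach(i => buffer.putInt(i))
    buffer.putInt(zkVersion)
    buffer.putInt(replicationFactor)
    allReplicas.foreach(r => buffer.putInt(r))
  }

  // 6 fixed Int fields (24 bytes) + 4 bytes per ISR member + 4 bytes per replica
  def sizeInBytes: Int = 4 * 6 + 4 * isr.size + 4 * allReplicas.size
}

val s = FlatPartitionState(controllerEpoch = 3, leader = 1, leaderEpoch = 7,
  isr = List(1, 2), zkVersion = 12, allReplicas = Set(1, 2, 3))
val buf = ByteBuffer.allocate(s.sizeInBytes)
s.writeTo(buf)
println(s"size=${s.sizeInBytes}, remaining=${buf.remaining}")
```

If sizeInBytes under-counted, writeTo would throw a BufferOverflowException. That is why both methods must list the same fields.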
The newBatch() method of ControllerBrokerRequestBatch checks whether the three dictionaries are empty. If any of them is not empty, it throws an IllegalStateException:
- // kafka.controller.ControllerBrokerRequestBatch#newBatch
- // Check leaderAndIsrRequestMap, stopReplicaRequestMap and updateMetadataRequestMap; throw if any of them is not empty
- def newBatch() {
- // raise error if the previous batch is not empty
- if (leaderAndIsrRequestMap.size > 0)
- throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
- "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
- if (stopReplicaRequestMap.size > 0)
- throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
- if (updateMetadataRequestMap.size > 0)
- throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
- }
Conversely, the clear() method empties the three dictionaries:
- // Clear leaderAndIsrRequestMap, stopReplicaRequestMap and updateMetadataRequestMap
- def clear() {
- leaderAndIsrRequestMap.clear()
- stopReplicaRequestMap.clear()
- updateMetadataRequestMap.clear()
- }
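newBatch() and clear() bracket a batch: callers start a batch, add entries for several partitions, and then send everything at once. Below is a reduced sketch of that lifecycle that keeps only the LeaderAndIsr map. `MiniBatch` and its `send` callback are hypothetical. Its sendRequestsToBrokers() empties the map after queuing, as the source does with leaderAndIsrRequestMap.clear().

```scala
import scala.collection.mutable

// Hypothetical miniature of ControllerBrokerRequestBatch keeping only the LeaderAndIsr map;
// `send` stands in for handing a request to the channel manager.
final class MiniBatch(send: (Int, Seq[String]) => Unit) {
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[String, String]]

  // Refuse to start a new batch while entries from the previous one are still pending.
  def newBatch(): Unit =
    if (leaderAndIsrRequestMap.nonEmpty)
      throw new IllegalStateException(s"previous batch is not empty: $leaderAndIsrRequestMap")

  def addLeaderAndIsr(brokerIds: Seq[Int], partition: String, state: String): Unit =
    brokerIds.filter(_ >= 0).foreach { id =>
      leaderAndIsrRequestMap.getOrElseUpdate(id, mutable.Map.empty).put(partition, state)
    }

  // One request per broker carrying all partitions queued for it, then drain the map.
  def sendRequestsToBrokers(): Unit = {
    leaderAndIsrRequestMap.foreach { case (id, states) => send(id, states.keys.toSeq.sorted) }
    leaderAndIsrRequestMap.clear()
  }

  def clear(): Unit = leaderAndIsrRequestMap.clear()
}

val sent = mutable.ListBuffer.empty[(Int, Seq[String])]
val batch = new MiniBatch((id, partitions) => sent += (id -> partitions))
batch.newBatch()
batch.addLeaderAndIsr(Seq(1, 2), "t-0", "leader=1")
batch.addLeaderAndIsr(Seq(2, -1), "t-1", "leader=2") // -1 is filtered out
batch.sendRequestsToBrokers()
batch.newBatch() // succeeds: the previous batch was drained
println(sent)
```

Broker 2 receives a single request covering both t-0 and t-1. This is the saving batching buys over sending one request per partition.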
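The other methods of ControllerBrokerRequestBatch have clearly separated responsibilities. addLeaderAndIsrRequestForBrokers(...) adds the data needed for pending LeaderAndIsrRequests to the leaderAndIsrRequestMap collection. Its source is as follows: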
- // kafka.controller.ControllerBrokerRequestBatch#addLeaderAndIsrRequestForBrokers
- /**
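- * Adds the data needed for pending LeaderAndIsrRequests to leaderAndIsrRequestMap,
- * then calls addUpdateMetadataRequestForBrokers() to prepare UpdateMetadataRequests for all available Brokers in the cluster
- * @param brokerIds IDs of the Brokers that will receive the LeaderAndIsrRequest
- * @param topic the topic
- * @param partition the partition
- * @param leaderIsrAndControllerEpoch Leader, ISR and epoch information
- * @param replicas the newly assigned replica set (AR)
- * @param callback the callback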
- */
- def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
- leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
- // build a TopicPartition from the topic and partition
- val topicPartition = new TopicPartition(topic, partition)
- // drop invalid (negative) Broker IDs, then iterate
- brokerIds.filter(_ >= 0).foreach { brokerId =>
- // get (or create) this Broker's map of LeaderAndIsrRequest data in leaderAndIsrRequestMap
- val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
- // add the Leader, ISR, AR and other data needed to build the LeaderAndIsrRequest
- result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
- }
- // prepare UpdateMetadataRequests for all live Brokers, including those shutting down
- addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
- Set(TopicAndPartition(topic, partition)))
- }
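The implementation is simple. For every target Broker, it wraps the LeaderIsrAndControllerEpoch object together with the AR set in a PartitionStateInfo and adds it to that Broker's dictionary; note that this information is per partition. Finally, the method calls addUpdateMetadataRequestForBrokers(...) to prepare UpdateMetadataRequests for all available Brokers. This is needed because a LeaderAndIsrRequest may change information such as the Leader replica, so the other Brokers must be told to update their metadata. The source of addUpdateMetadataRequestForBrokers(...) is as follows: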
- // kafka.controller.ControllerBrokerRequestBatch#addUpdateMetadataRequestForBrokers
- /**
- * Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted
- * (i.e. queue UpdateMetadataRequests for the given Brokers; they are actually sent by sendRequestsToBrokers())
- * */
- def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
- partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
- callback: AbstractRequestResponse => Unit = null) {
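- // Nested helper (not a callback): add one partition's state to updateMetadataRequestMap for every target Broker
- def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
- // look up the partition's Leader information cached in the Controller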
- val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
- leaderIsrAndControllerEpochOpt match {
- case Some(leaderIsrAndControllerEpoch) =>
- // get the partition's AR set
- val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
- // build the PartitionStateInfo according to beingDeleted (a partition being deleted gets leader LeaderDuringDelete)
- val partitionStateInfo = if (beingDeleted) {
- val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
- PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
- } else {
- PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
- }
- // drop invalid Broker IDs, then add the entry to updateMetadataRequestMap for each Broker
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
- updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
- }
- case None =>
- info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
- }
- }
- val filteredPartitions = {
- val givenPartitions = if (partitions.isEmpty)
- // if no partitions were given, use every partition with known leadership information
- controllerContext.partitionLeadershipInfo.keySet
- else
- partitions
- if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
- givenPartitions
- else
- // exclude partitions that are about to be deleted
- givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
- }
- if (filteredPartitions.isEmpty) // no partitions remain after filtering
- // still create an empty entry for each Broker ID, so those Brokers still receive an UpdateMetadataRequest
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
- }
- else // some partitions remain after filtering
- // call updateMetadataRequestMapFor() to add each partition in filteredPartitions to updateMetadataRequestMap, waiting to be sent
- filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
- // also add the partitions being deleted to updateMetadataRequestMap, with beingDeleted = true
- }
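The rules of addUpdateMetadataRequestForBrokers() can be condensed into a pure function for experimentation. In this hypothetical sketch:
- A partition's state is reduced to its leader ID.
- `leaders` stands in for partitionLeadershipInfo, and `beingDeleted` for partitionsToBeDeleted.

The sketch keeps the four rules from the source:
- Negative broker IDs are dropped.
- An empty partition set means "all partitions with a known leader".
- Partitions being deleted are advertised with LeaderDuringDelete (-2).
- Brokers still get an (empty) entry when no partition remains.

```scala
import scala.collection.mutable

// Same marker value as LeaderAndIsr.LeaderDuringDelete in the source.
val LeaderDuringDelete = -2

// Hypothetical condensation of addUpdateMetadataRequestForBrokers: a partition's state is reduced to its
// leader id; `leaders` stands in for partitionLeadershipInfo and `beingDeleted` for partitionsToBeDeleted.
def updateMetadataFor(brokerIds: Seq[Int], partitions: Set[String], leaders: Map[String, Int],
                      beingDeleted: Set[String]): mutable.Map[Int, mutable.Map[String, Int]] = {
  val requests = mutable.Map.empty[Int, mutable.Map[String, Int]]
  val validBrokers = brokerIds.filter(_ >= 0)

  def add(p: String, deleting: Boolean): Unit = leaders.get(p) match {
    case Some(leader) =>
      val advertised = if (deleting) LeaderDuringDelete else leader
      validBrokers.foreach(b => requests.getOrElseUpdate(b, mutable.Map.empty).put(p, advertised))
    case None => () // no leader yet: skipped (the source logs and skips)
  }

  // An empty partition set means "every partition with known leadership".
  val requested = if (partitions.isEmpty) leaders.keySet else partitions
  val filtered = requested -- beingDeleted
  if (filtered.isEmpty) validBrokers.foreach(b => requests.getOrElseUpdate(b, mutable.Map.empty))
  else filtered.foreach(p => add(p, deleting = false))
  beingDeleted.foreach(p => add(p, deleting = true))
  requests
}

val leaders = Map("t-0" -> 1, "t-1" -> 2, "old-0" -> 3)
val reqs = updateMetadataFor(Seq(1, 2, -1), Set("t-0"), leaders, beingDeleted = Set("old-0"))
println(reqs)
```

Even though only t-0 was requested, both brokers also learn that old-0 is being deleted. This matches the unconditional loop over partitionsToBeDeleted at the end of the source method.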
addStopReplicaRequestForBrokers() adds the data needed for StopReplicaRequests to the stopReplicaRequestMap collection:
- // kafka.controller.ControllerBrokerRequestBatch#addStopReplicaRequestForBrokers
- // Add StopReplicaRequest data to stopReplicaRequestMap
- def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
- callback: (AbstractRequestResponse, Int) => Unit = null) {
- // drop invalid Broker IDs, then iterate
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- // get the StopReplicaRequestInfo sequence for this Broker, initializing it to an empty Seq if absent
- stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
- // the current StopReplicaRequestInfo sequence v for this Broker
- val v = stopReplicaRequestMap(brokerId)
- // build a StopReplicaRequestInfo (binding brokerId into the callback if one is given) and append it to v
- if(callback != null)
- stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
- deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
- else
- stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
- deletePartition)
- }
- }
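The interesting detail in addStopReplicaRequestForBrokers() is how a callback of type (AbstractRequestResponse, Int) => Unit becomes the single-argument callback stored in StopReplicaRequestInfo: the broker ID is captured in a closure. Below is a hypothetical sketch with a stand-in `Response` type and a reduced `StopReplicaInfo`.

```scala
import scala.collection.mutable

// Stand-in for AbstractRequestResponse (hypothetical, for illustration only).
final case class Response(errorCode: Short)
// Reduced StopReplicaRequestInfo: the stored callback takes only the response.
final case class StopReplicaInfo(topic: String, partition: Int, replica: Int, deletePartition: Boolean,
                                 callback: Response => Unit = null)

val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaInfo]]

def addStopReplica(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
                   callback: (Response, Int) => Unit = null): Unit =
  brokerIds.filter(_ >= 0).foreach { brokerId =>
    val v = stopReplicaRequestMap.getOrElse(brokerId, Seq.empty[StopReplicaInfo])
    // Capture brokerId so the one-argument callback still knows which broker answered.
    val bound: Response => Unit =
      if (callback != null) ((r: Response) => callback(r, brokerId)) else null
    stopReplicaRequestMap(brokerId) = v :+ StopReplicaInfo(topic, partition, brokerId, deletePartition, bound)
  }

val acked = mutable.ListBuffer.empty[Int]
addStopReplica(Seq(1, 2), "t", 0, true, (_, brokerId) => acked += brokerId)
// Later, whoever handles the responses calls each stored callback with just the response.
stopReplicaRequestMap.values.flatten.foreach(info => info.callback(Response(0.toShort)))
println(acked.sorted)
```

Because each closure captures its own brokerId, the code that later runs the callbacks needs no extra bookkeeping to know which broker a response came from.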
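sendRequestsToBrokers() uses the data in the three collections above to create the corresponding requests and adds them to the matching message queues in ControllerChannelManager. The RequestSendThreads then send them. Its source is as follows: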
- /**
- * Create requests from the data in leaderAndIsrRequestMap, stopReplicaRequestMap and updateMetadataRequestMap,
- * and add them to the corresponding message queues in ControllerChannelManager; the RequestSendThreads then send them
- */
- def sendRequestsToBrokers(controllerEpoch: Int) {
- try {
- // iterate over leaderAndIsrRequestMap
- leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
- // log whether this Broker becomes Leader or Follower for each partition
- partitionStateInfos.foreach { case (topicPartition, state) =>
- val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
- stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
- "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
- state.leaderIsrAndControllerEpoch, broker,
- topicPartition.topic, topicPartition.partition))
- }
- // IDs of the Leaders of these partitions
- val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
- // Node objects of those Leaders, taken from the live (or shutting-down) Brokers
- val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
- _.getNode(controller.config.interBrokerSecurityProtocol)
- }
- // convert each PartitionStateInfo into a requests.PartitionState, giving a Map[TopicPartition, PartitionState]
- val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
- // destructure the Leader ID, leader epoch, ISR set and Controller epoch
- val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
- // build the PartitionState object
- val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
- leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
- partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
- )
- topicPartition -> partitionState
- }
- // create the LeaderAndIsrRequest
- val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
- // hand the request to KafkaController, which puts it into the Broker's message queue
- controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
- }
- // 清空leaderAndIsrRequestMap集合
- leaderAndIsrRequestMap.clear()
      // Iterate over updateMetadataRequestMap
      updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
        // Log each entry of this broker's Map[TopicPartition, PartitionStateInfo]
        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
          "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
          broker, p._1)))
        // Convert this broker's Map[TopicPartition, PartitionStateInfo] into a Map[TopicPartition, PartitionState]
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
          )
          topicPartition -> partitionState
        }
        // Choose the UpdateMetadataRequest version from the inter-broker protocol version
        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
          else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
          else 0: Short
        // Build the UpdateMetadataRequest in the format required by that version
        val updateMetadataRequest =
          if (version == 0) {
            // Live (or shutting-down) brokers; version 0 only carries each broker's PLAINTEXT endpoint as a Node
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
            // Build the UpdateMetadataRequest
            new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
          }
          else {
            // Iterate over the live (or shutting-down) brokers
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
              // Collect every EndPoint of the broker, keyed by security protocol
              val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
                securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
              }
              // Build an UpdateMetadataRequest.Broker from the broker ID, its EndPoints and its rack
              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
            }
            // Build the UpdateMetadataRequest
            new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
          }
        // Hand the request to KafkaController, which passes it on to ControllerChannelManager
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
      }
      // Clear updateMetadataRequestMap
      updateMetadataRequestMap.clear()
      // Iterate over stopReplicaRequestMap
      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        // Log which replicas are stopped with deletion and which are stopped without deletion
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))
        replicaInfoList.foreach { r =>
          // Build one StopReplicaRequest per replica
          val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
            Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          // Hand the request to KafkaController together with this replica's callback
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
        }
      }
      // Clear stopReplicaRequestMap
      stopReplicaRequestMap.clear()
    } catch {
      case e : Throwable => {
        if (leaderAndIsrRequestMap.size > 0) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestMap.size > 0) {
          error("Haven't been able to send metadata update requests, current state of " +
              s"the map is $updateMetadataRequestMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.size > 0) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
      }
    }
  }
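To make the batch-then-flush pattern of sendRequestsToBrokers concrete, here is a minimal Scala sketch under stated assumptions. It is not Kafka code: TopicPartition, PartitionInfo, BatchedLeaderAndIsr, RequestBatchSketch and UpdateMetadataVersionSketch are hypothetical, simplified stand-ins; the `send` function plays the role of KafkaController.sendRequest; and inter-broker protocol versions are modelled as (major, minor, patch) tuples, which drops the IV0/IV1 distinction behind KAFKA_0_10_0_IV1.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Kafka's TopicPartition and LeaderAndIsr state (not Kafka's classes)
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionInfo(leader: Int, leaderEpoch: Int, isr: List[Int])

// One batched request per broker, carrying every partition queued for that broker
final case class BatchedLeaderAndIsr(controllerEpoch: Int, states: Map[TopicPartition, PartitionInfo])

// Hypothetical batch: `send` stands in for KafkaController.sendRequest
final class RequestBatchSketch(send: (Int, BatchedLeaderAndIsr) => Unit) {
  // brokerId -> (partition -> state), mirroring the shape of leaderAndIsrRequestMap
  private val leaderAndIsrMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionInfo]]

  // Only records state; nothing is sent until sendRequests is called
  def addLeaderAndIsr(brokerIds: Seq[Int], tp: TopicPartition, info: PartitionInfo): Unit =
    brokerIds.foreach { id =>
      val perBroker = leaderAndIsrMap.getOrElseUpdate(id, mutable.Map.empty[TopicPartition, PartitionInfo])
      perBroker.update(tp, info)
    }

  // Same labelling rule the source uses in its trace log
  def roleOf(brokerId: Int, info: PartitionInfo): String =
    if (brokerId == info.leader) "become-leader" else "become-follower"

  // Flush: one request per broker, then clear; on failure the map is left intact and the error is rethrown
  def sendRequests(controllerEpoch: Int): Unit =
    try {
      leaderAndIsrMap.foreach { case (brokerId, states) =>
        send(brokerId, BatchedLeaderAndIsr(controllerEpoch, states.toMap))
      }
      leaderAndIsrMap.clear()
    } catch {
      case e: Throwable => throw new IllegalStateException(e)
    }

  def pendingBrokers: Int = leaderAndIsrMap.size
}

object UpdateMetadataVersionSketch {
  // Simplified: inter-broker protocol versions as (major, minor, patch) tuples
  private val ord = Ordering[(Int, Int, Int)]

  // Same thresholds as the source: >= 0.10.0 -> 2, >= 0.9.0 -> 1, otherwise 0
  def updateMetadataVersion(ibp: (Int, Int, Int)): Short =
    if (ord.gteq(ibp, (0, 10, 0))) 2.toShort
    else if (ord.gteq(ibp, (0, 9, 0))) 1.toShort
    else 0.toShort
}
```

The design point the sketch isolates: add calls only record state keyed by broker ID, so a single flush sends one LeaderAndIsr request per broker covering all of its partitions, and the map is cleared only after every send has succeeded. That is why the catch block in the source can still log whatever was left unsent.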
With ControllerChannelManager, ControllerBrokerStateInfo and ControllerBrokerRequestBatch covered, we can draw the following collaboration diagram: