
Kafka Series 17 - Server-Side Source Code Analysis 08: Replica Management

Overview: how ReplicaManager manages the replicas on a broker, and the message synchronization mechanism.

1. The ReplicaManager Class

A single broker may host replicas of many partitions; ReplicaManager's main job is to manage the partition information within the scope of one broker. Its implementation relies on components introduced earlier, such as the log storage subsystem, DelayedOperationPurgatory, and KafkaScheduler, and builds on Partition and Replica at the lower level. It is defined as follows:

```scala
// kafka.server.ReplicaManager
/**
 * @param config
 * @param metrics
 * @param time
 * @param jTime
 * @param zkUtils          helper object for operating on Zookeeper
 * @param scheduler        the KafkaScheduler scheduler
 * @param logManager       LogManager object used to read and write the log subsystem
 * @param isShuttingDown
 * @param threadNamePrefix
 */
class ReplicaManager(val config: KafkaConfig,
                     metrics: Metrics,
                     time: Time,
                     jTime: JTime,
                     val zkUtils: ZkUtils,
                     scheduler: Scheduler,
                     val logManager: LogManager,
                     val isShuttingDown: AtomicBoolean,
                     threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
  /* epoch of the controller that last changed the leader */
  // KafkaController epoch; incremented each time a new Controller leader is elected
  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  // ID of the current broker, used to locate local replicas
  private val localBrokerId = config.brokerId
  // All the Partitions assigned to the current broker
  private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) =>
    // When no Partition exists for a key, this valueFactory creates one and puts it into the Pool
    new Partition(t, p, time, this)
  })
  private val replicaStateChangeLock = new Object
  /**
   * ReplicaFetcherManager manages multiple ReplicaFetcherThread threads;
   * each ReplicaFetcherThread sends FetchRequests to the leader replica
   * to sync message data for the follower replicas
   */
  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
  private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
  // Maps each data directory to its HighWatermark checkpoint file,
  // which records the HighWatermark of every partition in that directory
  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
  private var hwThreadInitialized = false
  this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
  val stateChangeLogger = KafkaController.stateChangeLogger
  // Records the partitions whose ISR set has changed
  private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
  private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
  private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
  // DelayedOperationPurgatory for produce requests
  val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
    purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
  // DelayedOperationPurgatory for fetch requests
  val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
    purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
  ...
}
```

Among the fields above, allPartitions (of type Pool) holds all the partition information on the current broker: the key is a (topic, partitionId) tuple and the value is a Partition object. Note that allPartitions is given a valueFactory, which creates a Partition object on the fly whenever a lookup misses. replicaFetcherManager is a ReplicaFetcherManager, a thread manager that owns multiple ReplicaFetcherThread threads; each of them sends FetchRequests to the leader replica to sync message data for the follower replicas. This component is covered in detail later.
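To make the valueFactory behavior concrete, here is a minimal sketch of the get-or-create semantics that allPartitions relies on. SimplePool and PartitionStub are hypothetical stand-ins, not Kafka's kafka.utils.Pool or Partition:

```scala
import java.util.concurrent.ConcurrentHashMap

// A minimal stand-in (not Kafka's kafka.utils.Pool) for the getAndMaybePut semantics
// that allPartitions relies on: a lookup miss runs the valueFactory and caches the result.
class SimplePool[K, V](valueFactory: Option[K => V] = None) {
  private val pool = new ConcurrentHashMap[K, V]()

  // Return the cached value, or create, cache, and return one via the factory
  def getAndMaybePut(key: K): V = valueFactory match {
    case Some(factory) => pool.computeIfAbsent(key, k => factory(k))
    case None          => throw new IllegalStateException("no value factory supplied")
  }

  def get(key: K): Option[V] = Option(pool.get(key))
  def size: Int = pool.size()
}

object PoolDemo extends App {
  // Hypothetical stand-in for Partition: just remember its (topic, partitionId) key
  case class PartitionStub(topic: String, partitionId: Int)

  val partitions = new SimplePool[(String, Int), PartitionStub](
    valueFactory = Some { case (t, p) => PartitionStub(t, p) })

  val a = partitions.getAndMaybePut(("orders", 0)) // miss: created by the factory
  val b = partitions.getAndMaybePut(("orders", 0)) // hit: same cached instance
  println(a eq b)          // true
  println(partitions.size) // 1
}
```

This is why the code that follows can call getOrCreatePartition(...) without ever checking whether the partition already exists.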

2. Replica Role Switching

In a Kafka cluster, one broker is elected as the KafkaController leader and manages the whole cluster. Based on the state of each partition's leader and follower replicas, the Controller leader sends LeaderAndIsrRequests to the relevant brokers to drive role switching, telling each broker which of its partition replicas should become leaders and which should become followers.

In KafkaApis, LeaderAndIsrRequests are handled by the handleLeaderAndIsrRequest(request: RequestChannel.Request) method:

```scala
// kafka.server.KafkaApis#handleLeaderAndIsrRequest
// Handles requests that change leaders or ISR sets
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val correlationId = request.header.correlationId
  // Cast the request body to LeaderAndIsrRequest
  val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
  try {
    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
      // for each new leader or follower, call coordinator to handle consumer group migration.
      // this callback is invoked under the replica state change lock to ensure proper order of
      // leadership changes
      // Handle consumer group migration
      updatedLeaders.foreach { partition =>
        if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
          coordinator.handleGroupImmigration(partition.partitionId)
      }
      updatedFollowers.foreach { partition =>
        if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
          coordinator.handleGroupEmigration(partition.partitionId)
      }
    }
    // Build the response header from correlationId so the sender can match the response to its request
    val responseHeader = new ResponseHeader(correlationId)
    val leaderAndIsrResponse =
      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { // authorization check
        // Authorized: delegate to ReplicaManager.becomeLeaderOrFollower(...),
        // passing in the onLeadershipChange callback defined above
        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
        // Wrap the result into a LeaderAndIsrResponse
        new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
      } else {
        // Not authorized: record CLUSTER_AUTHORIZATION_FAILED for every partition in the request
        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
        // Build a LeaderAndIsrResponse with error code CLUSTER_AUTHORIZATION_FAILED
        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
      }
    // Send the response via the RequestChannel
    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error during leadership change.", e)
      Runtime.getRuntime.halt(1)
  }
}
```

As the flow shows, the parsed leaderAndIsrRequest is ultimately handed to ReplicaManager's becomeLeaderOrFollower(...) method:

```scala
// kafka.server.ReplicaManager#becomeLeaderOrFollower
// Performs replica role switching
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  // Log the request
  leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
    stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
      .format(localBrokerId, stateInfo, correlationId,
        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
  }
  replicaStateChangeLock synchronized { // lock
    // A HashMap to record per-partition error codes
    val responseMap = new mutable.HashMap[TopicPartition, Short]
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { // check the Controller epoch in the request
      // The request carries a stale Controller epoch; return the STALE_CONTROLLER_EPOCH error code directly
      // Log the stale request
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
      }
      // Return a BecomeLeaderOrFollowerResult with error code STALE_CONTROLLER_EPOCH
      BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
    } else {
      // The epoch is acceptable
      // Record the controllerId
      val controllerId = leaderAndISRRequest.controllerId
      // Update the Controller epoch
      controllerEpoch = leaderAndISRRequest.controllerEpoch
      // First check partition's leader epoch
      // A HashMap to record the state of the relevant partitions
      val partitionState = new mutable.HashMap[Partition, PartitionState]()
      // Iterate over the partitionStates in the request
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        // Get the Partition object for the topic partition, creating it if absent
        val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
        // Get the leader epoch of the Partition
        val partitionLeaderEpoch = partition.getLeaderEpoch()
        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
        if (partitionLeaderEpoch < stateInfo.leaderEpoch) { // check that the requested leader epoch is newer
          // Check whether a replica of this partition is assigned to the current broker
          if (stateInfo.replicas.contains(config.brokerId))
            // Keep only the Partitions (and their PartitionState) relevant to the current broker
            partitionState.put(partition, stateInfo)
          else {
            // Otherwise log it and record UNKNOWN_TOPIC_OR_PARTITION in responseMap
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
          }
        } else {
          // Otherwise record the error code in response
          // The requested leader epoch is stale; log it and record STALE_CONTROLLER_EPOCH in responseMap
          stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
            "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
            .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
              topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
        }
      }
      // Classify the partitions by the role assigned in their PartitionState
      val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
        stateInfo.leader == config.brokerId
      }
      val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
      // Switch leader and follower replicas according to the assignment
      val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
        // Switch to leader replicas
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
        // Switch to follower replicas
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
      else
        Set.empty[Partition]
      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      // Start the HighWatermark checkpoint task
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      replicaFetcherManager.shutdownIdleFetcherThreads()
      // Invoke the onLeadershipChange callback
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      // Return a BecomeLeaderOrFollowerResult with error code NONE
      BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
    }
  }
}
```

The partitionStates field of the leaderAndISRRequest carries the assignment decisions. ReplicaManager's becomeLeaderOrFollower(...) separates the partitions that should become leaders from those that should become followers, then hands them to its own makeLeaders(...) and makeFollowers(...) methods respectively. makeLeaders(...) switches the local replicas of the given partitions to the leader role; when switching from follower to leader, the relevant fetcher threads must be stopped first, after which Partition's makeLeader() method completes the switch. Source:
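The leader/follower split described above can be sketched in isolation. This is a minimal sketch with hypothetical stub types (PartitionStateStub, string keys), not Kafka's actual classes, but the filter-then-subtract shape is the same as in becomeLeaderOrFollower(...):

```scala
object RoleSplit {
  // Hypothetical stand-in for PartitionState: only the fields the split needs
  case class PartitionStateStub(leader: Int, leaderEpoch: Int)

  // Same shape as the source: keep the partitions led by this broker; the rest become followers
  def splitRoles(localBrokerId: Int, partitionState: Map[String, PartitionStateStub])
      : (Map[String, PartitionStateStub], Map[String, PartitionStateStub]) = {
    val partitionsToBeLeader = partitionState.filter { case (_, state) => state.leader == localBrokerId }
    val partitionsToBeFollower = partitionState -- partitionsToBeLeader.keys
    (partitionsToBeLeader, partitionsToBeFollower)
  }
}

object RoleSplitDemo extends App {
  import RoleSplit._
  val states = Map(
    "topicA-0" -> PartitionStateStub(leader = 1, leaderEpoch = 5), // led by this broker
    "topicA-1" -> PartitionStateStub(leader = 2, leaderEpoch = 5), // led by broker 2
    "topicB-0" -> PartitionStateStub(leader = 1, leaderEpoch = 3))
  val (leaders, followers) = splitRoles(localBrokerId = 1, states)
  println(leaders.keySet)   // topicA-0 and topicB-0
  println(followers.keySet) // topicA-1
}
```

Every partition in the request ends up in exactly one of the two sets, which is why the method can process the two groups independently.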

```scala
// kafka.server.ReplicaManager#makeLeaders
/*
 * Make the current broker to become leader for a given set of partitions by:
 *
 * 1. Stop fetchers for these partitions
 * 2. Update the partition metadata in cache
 * 3. Add these partitions to the leader partitions set
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * return the set of partitions that are made leader due to this method
 *
 * TODO: the above may need to be fixed later
 */
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
  // Log the transition
  partitionState.foreach(state =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
  // Initialize every partition's error code to NONE
  for (partition <- partitionState.keys)
    responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
  try {
    // First stop fetchers for all the partitions
    // The replicas on this broker may previously have been followers, so stop their fetchers first
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
    // Update the partition information to be the leader
    // Update the Partitions that are about to become leaders
    partitionState.foreach { case (partition, partitionStateInfo) =>
      // Use Partition.makeLeader(...) to switch the partition's local replica to leader
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        // Record partitions that actually switched to leader (from first startup or from follower)
        partitionsToMakeLeaders += partition
      else
        // The broker hosting the leader did not change; just log it
        stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is already the leader for the partition.")
          .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId)))
    }
    partitionsToMakeLeaders.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
    }
  } catch {
    case e: Throwable =>
      partitionState.foreach { state =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
          " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
          TopicAndPartition(state._1.topic, state._1.partitionId))
        stateChangeLogger.error(errorMsg, e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
  // Return the partitions switched to leader
  partitionsToMakeLeaders
}

// kafka.server.AbstractFetcherManager#fetcherThreadMap
// The BrokerAndFetcherId key wraps the broker's endpoint information (brokerId, host, port) and the fetcher thread's ID
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]

// kafka.server.AbstractFetcherManager#removeFetcherForPartitions
def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
  mapLock synchronized { // lock
    // Remove the given partitions from every fetcher thread
    for ((key, fetcher) <- fetcherThreadMap) {
      fetcher.removePartitions(partitions)
    }
  }
  info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
}
```

ReplicaManager's makeFollowers(...) method switches the local replicas of the given partitions to the follower role. When switching from leader to follower, it first checks that the new leader replica is alive before deciding whether to switch; after the switch it stops the fetcher threads that were syncing from the old leader, truncates the log accordingly, starts fetcher threads that sync from the new leader, and finally tries to complete the delayed operations related to those partitions. Source:
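The truncation step deserves a concrete illustration: messages up to the high watermark are committed and therefore consistent with the old leader, while messages in (HW, LEO] may diverge from the new leader and must be discarded and re-fetched. A minimal sketch with a toy log model (LogStub is hypothetical, not Kafka's Log):

```scala
object HwTruncation {
  // A toy log: a vector of messages plus the partition's high watermark
  case class LogStub(messages: Vector[String], highWatermark: Int) {
    def logEndOffset: Int = messages.length
    // Drop everything at or beyond targetOffset, like logManager.truncateTo does for one partition
    def truncateTo(targetOffset: Int): LogStub = copy(messages = messages.take(targetOffset))
  }
}

object HwTruncationDemo extends App {
  import HwTruncation._
  // HW = 3, LEO = 5: offsets 3 and 4 were never committed and may diverge from the new leader
  val follower = LogStub(Vector("m0", "m1", "m2", "m3?", "m4?"), highWatermark = 3)
  val truncated = follower.truncateTo(follower.highWatermark)
  println(truncated.logEndOffset) // 3: fetching from the new leader resumes from offset 3
}
```

After truncation the follower's log end offset equals its high watermark, which is exactly the initial offset passed to the new fetch task later in the method.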

```scala
// kafka.server.ReplicaManager#makeFollowers
/*
 * Make the current broker to become follower for a given set of partitions by:
 *
 * 1. Remove these partitions from the leader partitions set.
 * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
 * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
 * 4. Truncate the log and checkpoint offsets for these partitions.
 * 5. Clear the produce and fetch requests in the purgatory
 * 6. If the broker is not shutting down, add the fetcher to the new leaders.
 *
 * The ordering of doing these steps make sure that the replicas in transition will not
 * take any more messages before checkpointing offsets so that all messages before the checkpoint
 * are guaranteed to be flushed to disks
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * return the set of partitions that are made follower due to this method
 */
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache): Set[Partition] = {
  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
  // Initialize every partition's error code to NONE
  for (partition <- partitionState.keys)
    responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.NONE.code)
  // Records the partitions switched to follower replicas
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionState.foreach { case (partition, partitionStateInfo) =>
      // Check whether the broker hosting the new leader is alive
      // Get the ID of the broker hosting the leader replica
      val newLeaderBrokerId = partitionStateInfo.leader
      // Look the leader's broker ID up in the set of alive brokers from the metadata cache
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        // Only change partition state when the leader is available
        case Some(leaderBroker) => // found
          // Use Partition.makeFollower(...) to switch the partition's local replica to follower
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            // Record partitions that actually switched to follower (from first startup or from leader)
            partitionsToMakeFollower += partition
          else
            stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
              "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                partition.topic, partition.partitionId, newLeaderBrokerId))
        case None => // not found
          // The leader broker should always be present in the metadata cache.
          // If not, we should record the error message and abort the transition process for this partition
          stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
            " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
            .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
              partition.topic, partition.partitionId, newLeaderBrokerId))
          // Create the local replica even if the leader is unavailable. This is required to ensure that we include
          // the partition's high watermark in the checkpoint file (see KAFKA-1647)
          partition.getOrCreateReplica()
      }
    }
    // Stop the fetcher threads syncing from the old leaders
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
    // Log the stopped fetchers
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
    }
    /**
     * The leader has changed, so the messages between the HighWatermark and the LogEndOffset may differ
     * between the old and new leader replicas; the messages before the HighWatermark, however, are
     * consistent, so truncate the log to the HighWatermark.
     */
    logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
    // Try to complete the DelayedOperations related to the partitions switched to follower
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
        "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
        partition.topic, partition.partitionId, correlationId, controllerId, epoch))
    }
    if (isShuttingDown.get()) { // check whether the ReplicaManager is shutting down
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
          "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
          controllerId, epoch, partition.topic, partition.partitionId))
      }
    } else {
      // Restart fetcher threads that sync from the new leaders
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      // Map partitionsToMakeFollower into TopicAndPartition -> BrokerAndInitialOffset pairs
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        new TopicAndPartition(partition) -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
      // Add the fetch tasks
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
          "%d epoch %d with correlation id %d for partition [%s,%d]")
          .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
      }
    }
  } catch {
    case e: Throwable =>
      val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
        "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
      stateChangeLogger.error(errorMsg, e)
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
  partitionsToMakeFollower
}
```

ReplicaManager's makeLeaders(...) and makeFollowers(...) methods ultimately delegate the actual role switch to the Partition object, via its makeLeader(...) and makeFollower(...) methods; both were covered earlier, so they are not repeated here.

3. Message Append and Read

ReplicaManager's appendMessages(...) method handles the message writes triggered when KafkaApis receives a ProducerRequest, internally calling its own appendToLocalLog(...) method; fetchMessages(...) handles the reads triggered by FetchRequests, internally calling its own readFromLocalLog(...) method. Both operations are ultimately implemented with the Log object's append(...) and read(...) methods, which were covered in the previous article's walkthrough of delayed operations, so they are not repeated here.

4. Message Synchronization

In Kafka, follower replicas must stay in sync with the leader replica by actively fetching message data from it. This functionality is implemented by ReplicaFetcherManager and the fetcher threads rooted in AbstractFetcherThread; we begin with ReplicaFetcherManager and its parent class AbstractFetcherManager.

4.1. The Fetcher Thread Manager

ReplicaFetcherManager's parent is the abstract class AbstractFetcherManager, which implements most of the logic for managing fetcher threads and leaves only the creation of those threads to subclasses, an application of the template method pattern. AbstractFetcherManager is defined as follows:

```scala
// kafka.server.AbstractFetcherManager
abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
  extends Logging with KafkaMetricsGroup {
  // map of (source broker_id, fetcher_id per source broker) => fetcher
  // Manages AbstractFetcherThread objects;
  // the BrokerAndFetcherId key wraps the broker's endpoint information (brokerId, host, port) and the fetcher thread's ID
  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
  private val mapLock = new Object
  this.logIdent = "[" + name + "] "
  // Compute the fetcher thread ID from the topic name and partition ID
  private def getFetcherId(topic: String, partitionId: Int): Int = {
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
  }
  ...
}
```

fetcherThreadMap is a map whose keys are BrokerAndFetcherId objects and whose values are AbstractFetcherThread objects; it records the fetcher thread assigned to each combination of source broker endpoint and fetcher ID. AbstractFetcherThread is covered in detail later; for now, note BrokerAndFetcherId, a simple case class:

```scala
// kafka.server.BrokerAndFetcherId
case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)

// kafka.cluster.BrokerEndPoint
case class BrokerEndPoint(id: Int, host: String, port: Int) {
  def connectionString(): String = formatAddress(host, port)
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(id)
    writeShortString(buffer, host)
    buffer.putInt(port)
  }
  def sizeInBytes: Int =
    4 + /* broker Id */
    4 + /* port */
    shortStringLength(host)
}
```
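To see what writeTo produces on the wire, here is a self-contained sketch of the same layout. It assumes writeShortString encodes a string as a 2-byte length followed by its UTF-8 bytes (the convention of Kafka's ApiUtils helpers); EndPointCodec is a hypothetical helper, not a Kafka class:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object EndPointCodec {
  // Serialize (id, host, port) the way BrokerEndPoint.writeTo does, assuming the
  // short-string convention: 2-byte length prefix followed by UTF-8 bytes
  def writeEndPoint(id: Int, host: String, port: Int): ByteBuffer = {
    val hostBytes = host.getBytes(StandardCharsets.UTF_8)
    val buffer = ByteBuffer.allocate(4 + 2 + hostBytes.length + 4) // id + host length + host + port
    buffer.putInt(id)
    buffer.putShort(hostBytes.length.toShort)
    buffer.put(hostBytes)
    buffer.putInt(port)
    buffer.flip()
    buffer
  }
}

object EndPointDemo extends App {
  val buf = EndPointCodec.writeEndPoint(0, "kafka-1", 9092)
  println(buf.remaining) // 17 = 4 (id) + 2 (length prefix) + 7 (host) + 4 (port)
  println(buf.getInt())  // 0, the broker id
}
```

The total matches sizeInBytes above: 4 for the broker ID, 4 for the port, and shortStringLength(host) covering the length prefix plus the host bytes.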

AbstractFetcherManager declares the fetcher-thread creation method for subclasses to implement:

```scala
// kafka.server.AbstractFetcherManager#createFetcherThread
// to be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
```

AbstractFetcherManager itself provides the method that adds fetcher threads for a given set of partitions:

```scala
// kafka.server.AbstractFetcherManager#addFetcherForPartitions
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized { // lock
    /**
     * Group partitionAndOffsets by a BrokerAndFetcherId built for each entry.
     * Each fetcher thread serves exactly one broker but can sync multiple follower partitions.
     * The result is a Map[BrokerAndFetcherId, Map[TopicAndPartition, BrokerAndInitialOffset]]
     */
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }
    // Iterate over the grouped result
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      // Look up the fetcher thread for this brokerAndFetcherId; each broker/fetcher-id pair has one thread
      fetcherThreadMap.get(brokerAndFetcherId) match {
        // Found: reuse it
        case Some(f) => fetcherThread = f
        // Not found: create a new fetcher thread, register it in fetcherThreadMap, and start it
        case None =>
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }
      // Hand the partitions and their starting offsets to the fetcher thread and wake it up to start syncing
      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }
  info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "
  }))
}
```

The partitionAndOffsets parameter is of type Map[TopicAndPartition, BrokerAndInitialOffset]: the key identifies a topic partition, and the value is a BrokerAndInitialOffset wrapping the broker's endpoint information and the starting offset for synchronization. BrokerAndInitialOffset is defined as:

```scala
case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)
```

addFetcherForPartitions(...) groups the partitionAndOffsets parameter by a BrokerAndFetcherId built from the broker's endpoint information and a fetcher thread ID computed from the topic name and partition ID, then assigns each distinct BrokerAndFetcherId a fetcher thread of type AbstractFetcherThread. It first looks the thread up in fetcherThreadMap; on a miss, it calls the subclass-implemented createFetcherThread(...) method to create one and caches it in fetcherThreadMap.

Finally, the method uses the AbstractFetcherThread's addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) method to hand the partitions and their starting offsets to the fetcher thread and wake it up to begin syncing.
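The hashing and grouping can be sketched in isolation. FetcherGrouping below uses simplified stand-in types, and math.abs in place of Kafka's Utils.abs (the two agree except at Integer.MIN_VALUE):

```scala
object FetcherGrouping {
  // The same hashing idea as AbstractFetcherManager.getFetcherId
  def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    math.abs(31 * topic.hashCode + partitionId) % numFetchers

  // Hypothetical stand-in for BrokerAndFetcherId
  case class BrokerAndFetcherIdStub(brokerId: Int, fetcherId: Int)

  // Group (topic, partition) -> leader broker id entries by (broker, fetcher id),
  // the way addFetcherForPartitions groups partitionAndOffsets
  def group(leaders: Map[(String, Int), Int], numFetchers: Int): Map[BrokerAndFetcherIdStub, Map[(String, Int), Int]] =
    leaders.groupBy { case ((topic, partition), brokerId) =>
      BrokerAndFetcherIdStub(brokerId, getFetcherId(topic, partition, numFetchers))
    }
}

object FetcherGroupingDemo extends App {
  import FetcherGrouping._
  val leaders = Map(("logs", 0) -> 1, ("logs", 1) -> 1, ("logs", 2) -> 2)
  val grouped = group(leaders, numFetchers = 2)
  // Each group maps to exactly one fetcher thread; partitions on the same broker
  // spread across numFetchers thread ids
  println(grouped.size)
  println(getFetcherId("logs", 0, 2) != getFetcherId("logs", 1, 2)) // true: adjacent partitions differ mod 2
}
```

Because the fetcher ID depends on the topic and partition, the fetch load for one source broker is spread over up to numFetchers threads rather than funneled through a single connection.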

createFetcherThread(...) is, naturally, implemented by AbstractFetcherManager's subclass ReplicaFetcherManager; in fact, apart from a shutdown() helper, it is essentially the only method the class implements:

```scala
// kafka.server.ReplicaFetcherManager
class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None)
  extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
    "Replica", brokerConfig.numReplicaFetchers) {
  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
    // Build the fetcher thread name
    val threadName = threadNamePrefix match {
      case None =>
        "ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
      case Some(p) =>
        "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
    }
    // Create and return a ReplicaFetcherThread
    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
      replicaMgr, metrics, time)
  }
  // Shut down all managed fetcher threads
  def shutdown() {
    info("shutting down")
    // Delegate to the parent class to close all managed fetcher threads
    closeAllFetchers()
    info("shutdown completed")
  }
}
```

As shown, the fetcher threads it creates are of type ReplicaFetcherThread, which is covered in detail later.

AbstractFetcherManager's operations for removing partitions from fetcher threads and shutting threads down are straightforward:

```scala
// kafka.server.AbstractFetcherManager#removeFetcherForPartitions
// Remove the fetch tasks for the given partitions
def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
  mapLock synchronized { // lock
    // Remove the given partitions from every fetcher thread
    for ((key, fetcher) <- fetcherThreadMap) {
      fetcher.removePartitions(partitions)
    }
  }
  info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
}

// kafka.server.AbstractFetcherManager#shutdownIdleFetcherThreads
// Shut down idle fetcher threads
def shutdownIdleFetcherThreads() {
  mapLock synchronized { // lock
    val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
    // Iterate over fetcherThreadMap
    for ((key, fetcher) <- fetcherThreadMap) {
      // If a fetcher thread is no longer syncing any follower replica, stop it
      if (fetcher.partitionCount <= 0) {
        fetcher.shutdown()
        // Remember the key of the stopped fetcher thread
        keysToBeRemoved += key
      }
    }
    // Remove the corresponding entries from fetcherThreadMap
    fetcherThreadMap --= keysToBeRemoved
  }
}

// kafka.server.AbstractFetcherManager#closeAllFetchers
// Shut down all managed fetcher threads
def closeAllFetchers() {
  mapLock synchronized {
    // Shut every fetcher thread down
    for ((_, fetcher) <- fetcherThreadMap) {
      fetcher.shutdown()
    }
    // Clear fetcherThreadMap
    fetcherThreadMap.clear()
  }
}
```

4.2. The Fetcher Thread

As seen in ReplicaFetcherManager's createFetcherThread(...), the fetcher threads it creates are all of type ReplicaFetcherThread, which extends AbstractFetcherThread, which in turn extends ShutdownableThread. ShutdownableThread has appeared several times in earlier articles; its key method, doWork(): Unit, is left for subclasses to implement. The relationship among these three classes mirrors that of ReplicaFetcherManager and AbstractFetcherManager: both are instances of the template method pattern. We first look at the definition of AbstractFetcherThread:

  • abstract class AbstractFetcherThread(name: String,
  • clientId: String,
  • sourceBroker: BrokerEndPoint,
  • fetchBackOffMs: Int = 0,
  • isInterruptible: Boolean = true)
  • extends ShutdownableThread(name, isInterruptible) {
  • type REQ <: FetchRequest
  • type PD <: PartitionData
  • // 维护了TopicAndPartition与PartitionFetchState之间的对应关系,PartitionFetchState记录了对应分区的同步offset位置以及同步状态
  • private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
  • // 用于保证partitionMap并发安全性的重入锁
  • private val partitionMapLock = new ReentrantLock
  • // 重入锁的条件队列
  • private val partitionMapCond = partitionMapLock.newCondition()
  • ...
  • }

其中partitionMap字典会记录每个分区的拉取状态,由PartitionFetchState类型对象表示,定义如下:

  • // kafka.server.PartitionFetchState
  • case class PartitionFetchState(offset: Long, delay: DelayedItem) {
  • // DelayedItem用于表示延迟时间,它实现了Delayed接口
  • def this(offset: Long) = this(offset, new DelayedItem(0))
  • // 是否是激活状态
  • def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
  • override def toString = "%d-%b".format(offset, isActive)
  • }
  • // kafka.utils.DelayedItem
  • class DelayedItem(delayMs: Long) extends Delayed with Logging {
  • // 到期时间是在当前时间上延迟delayMs毫秒
  • private val dueMs = SystemTime.milliseconds + delayMs
  • def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
  • /**
  • * The remaining delay time
  • */
  • def getDelay(unit: TimeUnit): Long = {
  • // 检查剩余等待时间
  • unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
  • }
  • def compareTo(d: Delayed): Int = {
  • val other = d.asInstanceOf[DelayedItem]
  • // 比较到期时间
  • if(dueMs < other.dueMs) -1
  • else if(dueMs > other.dueMs) 1
  • else 0
  • }
  • }
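PartitionFetchState通过DelayedItem表达"激活/延时"两种状态:剩余延迟为0即为激活状态。下面用Python给出一个简化示意(非Kafka源码,仅用于说明机制):

```python
import time

class DelayedItem:
    """DelayedItem的简化示意:记录到期时间戳(毫秒),
    get_delay()返回剩余延迟,到期后恒为0。"""

    def __init__(self, delay_ms):
        # 到期时间是在当前时间上延迟delay_ms毫秒
        self.due_ms = time.time() * 1000 + delay_ms

    def get_delay(self):
        # 剩余等待时间,不会为负
        return max(self.due_ms - time.time() * 1000, 0)

class PartitionFetchState:
    """对应kafka.server.PartitionFetchState:
    记录同步offset,delay为0时即为激活状态。"""

    def __init__(self, offset, delay=None):
        self.offset = offset
        self.delay = delay or DelayedItem(0)

    def is_active(self):
        return self.delay.get_delay() == 0
```

新建的PartitionFetchState默认延迟为0,处于激活状态;后文的delayPartitions(...)正是通过替换为带延迟的DelayedItem来暂停某个分区的拉取。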

AbstractFetcherThread内部有大量的抽象方法交给子类实现,会在后面ReplicaFetcherThread中讲解具体的实现:

  • // process fetched data
  • // 处理拉取得到的数据
  • def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
  • // handle a partition whose offset is out of range and return a new fetch offset
  • // 处理offset越界的情况
  • def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
  • // deal with partitions with errors, potentially due to leadership changes
  • // 处理错误
  • def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
  • // 创建FetchRequest
  • protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
  • // 拉取操作
  • protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]

我们需要关注主要的方法doWork(),源码如下:

  • // kafka.server.AbstractFetcherThread#doWork
  • override def doWork() {
  • val fetchRequest = inLock(partitionMapLock) { // 加锁
  • // 创建FetchRequest
  • val fetchRequest = buildFetchRequest(partitionMap)
  • if (fetchRequest.isEmpty) {
  • trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
  • // FetchRequest为空时,退避一段时间后重试
  • partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
  • }
  • fetchRequest
  • }
  • // FetchRequest不为空,发送并处理FetchRequest请求
  • if (!fetchRequest.isEmpty)
  • processFetchRequest(fetchRequest)
  • }

从doWork()的实现可以看到,构建FetchRequest请求、发送并处理请求的操作分别交给了buildFetchRequest(...)和processFetchRequest(...)方法。

4.2.1. 构建FetchRequest请求

buildFetchRequest(...)方法由子类ReplicaFetcherThread实现,用于构建FetchRequest请求:

  • // kafka.server.ReplicaFetcherThread#buildFetchRequest
  • // 构造FetchRequest请求
  • protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
  • // 定义一个字典用于存放定义的请求
  • val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
  • partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
  • if (partitionFetchState.isActive) // 检测分区的拉取状态是否为激活状态
  • // 为每个分区构造请求对象
  • requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize) // replica.fetch.max.bytes
  • }
  • // 构造一个FetchRequest请求对象并返回
  • new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava)) // replica.fetch.wait.max.ms、replica.fetch.min.bytes
  • }

该实现会为每个处于激活状态(partitionFetchState.isActive)的分区构建对应的PartitionData对象,其中记录了分区的同步offset、最大拉取字节数等信息,然后存入字典requestMap,最终以requestMap构造FetchRequest对象并返回。

4.2.2. 拉取流程

processFetchRequest(...)方法则是由AbstractFetcherThread类实现的,源码如下:

  • // kafka.server.AbstractFetcherThread#processFetchRequest
  • private def processFetchRequest(fetchRequest: REQ) {
  • // 创建一个集合记录出错的分区
  • val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  • // 记录响应数据的字典
  • var responseData: Map[TopicAndPartition, PD] = Map.empty
  • try {
  • trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
  • /**
  • * 发送FetchRequest并等待FetchResponse,该方法是抽象方法,交由子类实现
  • * responseData用于记录返回的响应
  • */
  • responseData = fetch(fetchRequest)
  • } catch {
  • case t: Throwable =>
  • if (isRunning.get) {
  • warn(s"Error in fetch $fetchRequest", t)
  • inLock(partitionMapLock) {
  • partitionsWithError ++= partitionMap.keys
  • // there is an error occurred while fetching partitions, sleep a while
  • // 出现异常,退避一段时间
  • partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
  • }
  • }
  • }
  • fetcherStats.requestRate.mark()
  • if (responseData.nonEmpty) { // 处理FetchResponse
  • // process fetched data
  • inLock(partitionMapLock) { // 加锁
  • // 遍历responseData
  • responseData.foreach { case (topicAndPartition, partitionData) =>
  • val TopicAndPartition(topic, partitionId) = topicAndPartition
  • // 从partitionMap获取分区对应的PartitionFetchState
  • partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
  • // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
  • // 响应中得到的该分区的offset与之前partitionMap中记录的一致,将进行数据写入
  • if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
  • // 处理错误码
  • Errors.forCode(partitionData.errorCode) match {
  • case Errors.NONE => // 没有错误
  • try {
  • // 获取返回的消息集合
  • val messages = partitionData.toByteBufferMessageSet
  • // 有效的字节数
  • val validBytes = messages.validBytes
  • // 获取返回的最后一条消息的offset
  • val newOffset = messages.shallowIterator.toSeq.lastOption match {
  • case Some(m: MessageAndOffset) => m.nextOffset
  • case None => currentPartitionFetchState.offset
  • }
  • // 更新Fetch状态
  • partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
  • fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
  • fetcherStats.byteRate.mark(validBytes)
  • // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
  • // 将从Leader副本获取的消息集合追加到Log中,抽象方法,交由子类实现
  • processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
  • } catch {
  • // 异常处理
  • case ime: CorruptRecordException =>
  • // we log the error and continue. This ensures two things
  • // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
  • // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
  • // should get fixed in the subsequent fetches
  • logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
  • case e: Throwable =>
  • throw new KafkaException("error processing data for partition [%s,%d] offset %d"
  • .format(topic, partitionId, currentPartitionFetchState.offset), e)
  • }
  • case Errors.OFFSET_OUT_OF_RANGE =>
  • // 若Follower副本请求的offset超出了Leader的LEO,则返回该错误码
  • try {
  • // 生成新的offset,handleOffsetOutOfRange()是抽象方法,交由子类实现
  • val newOffset = handleOffsetOutOfRange(topicAndPartition)
  • // 更新Fetch状态
  • partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
  • error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
  • .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
  • } catch {
  • case e: Throwable =>
  • error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
  • partitionsWithError += topicAndPartition
  • }
  • case _ =>
  • // 返回其他错误码,则进行收集后,由handlePartitionsWithErrors()方法处理
  • if (isRunning.get) {
  • error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
  • partitionData.exception.get))
  • partitionsWithError += topicAndPartition
  • }
  • }
  • })
  • }
  • }
  • }
  • if (partitionsWithError.nonEmpty) {
  • debug("handling partitions with error for %s".format(partitionsWithError))
  • // 处理错误,抽象方法,交由子类实现
  • handlePartitionsWithErrors(partitionsWithError)
  • }
  • }

从源码可以发现,processFetchRequest(...)方法只是负责规划请求发送、响应数据处理以及异常和错误处理的整体流程,具体的功能由fetch(...)、processPartitionData(...)、handleOffsetOutOfRange(...)和handlePartitionsWithErrors(...)实现,而这些方法无一例外都是抽象方法,由子类ReplicaFetcherThread实现,下面依次分析这些方法。

4.2.2.1. 请求发送

ReplicaFetcherThread类的fetch(...)方法用于发送请求并获取响应数据,但具体功能还是由sendRequest(...)实现:

  • // kafka.server.ReplicaFetcherThread#fetch
  • protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, PartitionData] = {
  • // 发送请求
  • val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
  • // 根据请求返回的数据构造FetchResponse响应对象
  • new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
  • TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
  • }
  • }
  • // kafka.server.ReplicaFetcherThread#sendRequest
  • private def sendRequest(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest): ClientResponse = {
  • import kafka.utils.NetworkClientBlockingOps._
  • // 构造请求头
  • val header = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
  • try {
  • // 阻塞等待Node的状态变为Ready,超时会抛出异常
  • if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
  • throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
  • else {
  • // 构造RequestSend对象
  • val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct)
  • // 构造ClientRequest对象
  • val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
  • // 发送请求并阻塞等待响应
  • networkClient.blockingSendAndReceive(clientRequest)(time)
  • }
  • }
  • catch {
  • // 异常处理
  • case e: Throwable =>
  • networkClient.close(sourceBroker.id.toString)
  • throw e
  • }
  • }

这里涉及到了请求的网络通信处理,在ReplicaFetcherThread类中定义了NetworkClient对象,用于处理网络通信:

  • // kafka.server.ReplicaFetcherThread#networkClient
  • // we need to include both the broker id and the fetcher id
  • // as the metrics tag to avoid metric name conflicts with
  • // more than one fetcher thread to the same broker
  • private val networkClient = {
  • val channelBuilder = ChannelBuilders.create(
  • brokerConfig.interBrokerSecurityProtocol,
  • Mode.CLIENT,
  • LoginType.SERVER,
  • brokerConfig.values,
  • brokerConfig.saslMechanismInterBrokerProtocol,
  • brokerConfig.saslInterBrokerHandshakeRequestEnable
  • )
  • val selector = new Selector(
  • NetworkReceive.UNLIMITED,
  • brokerConfig.connectionsMaxIdleMs,
  • metrics,
  • time,
  • "replica-fetcher",
  • Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
  • false,
  • channelBuilder
  • )
  • new NetworkClient(
  • selector,
  • new ManualMetadataUpdater(),
  • clientId,
  • 1,
  • 0,
  • Selectable.USE_DEFAULT_BUFFER_SIZE,
  • brokerConfig.replicaSocketReceiveBufferBytes,
  • brokerConfig.requestTimeoutMs,
  • time
  • )
  • }

不过在使用该NetworkClient时,ReplicaFetcherThread是通过将其包装为NetworkClientBlockingOps对象来使用的;NetworkClientBlockingOps提供了blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean和blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse两个阻塞方法,分别用于阻塞等待指定Node进入Ready状态,以及发送请求后阻塞等待响应:

  • // kafka.utils.NetworkClientBlockingOps#blockingReady
  • /**
  • * Invokes `client.ready` followed by 0 or more `client.poll` invocations until the connection to `node` is ready,
  • * the timeout expires or the connection fails.
  • *
  • * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
  • * an `IOException` is thrown instead.
  • *
  • * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
  • * care.
  • */
  • def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
  • require(timeout >=0, "timeout should be >= 0")
  • // ready()方法会在Node节点未准备好时尝试进行连接
  • client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
  • if (client.isReady(node, now)) // 检测Node是否Ready
  • true
  • else if (client.connectionFailed(node))
  • // 抛出异常
  • throw new IOException(s"Connection to $node failed")
  • else false
  • }
  • }
  • // kafka.utils.NetworkClientBlockingOps#blockingSendAndReceive
  • /**
  • * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
  • * disconnection happens (which can happen for a number of reasons including a request timeout).
  • *
  • * In case of a disconnection, an `IOException` is thrown.
  • *
  • * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
  • * care.
  • */
  • def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
  • client.send(request, time.milliseconds()) // 发送请求
  • pollContinuously { responses =>
  • // 找到上面的发送请求对应的响应,根据请求和响应的correlationId进行匹配
  • val response = responses.find { response =>
  • response.request.request.header.correlationId == request.request.header.correlationId
  • }
  • response.foreach { r =>
  • if (r.wasDisconnected) {
  • // 连接断开,抛出IOException异常
  • val destination = request.request.destination
  • throw new IOException(s"Connection to $destination was disconnected before the response was read")
  • }
  • }
  • response
  • }
  • }

这两个方法分别使用了pollUntil(...)和pollContinuously(...)方法,源码如下:

  • // kafka.utils.NetworkClientBlockingOps#pollUntil
  • /**
  • * Invokes `client.poll` until `predicate` returns `true` or the timeout expires.
  • *
  • * It returns `true` if the call completes normally or `false` if the timeout expires. Exceptions thrown via
  • * `predicate` are not handled and will bubble up.
  • *
  • * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
  • * care.
  • */
  • private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = {
  • val methodStartTime = time.milliseconds()
  • // 计算超时时间
  • val timeoutExpiryTime = methodStartTime + timeout
  • // 递归方法,有超时时间
  • @tailrec
  • def recursivePoll(iterationStartTime: Long): Boolean = {
  • // 剩余超时时间
  • val pollTimeout = timeoutExpiryTime - iterationStartTime
  • // 执行一次poll操作,收发网络数据并获取响应
  • val responses = client.poll(pollTimeout, iterationStartTime).asScala
  • // 检测是否满足递归结束条件
  • if (predicate(responses, iterationStartTime)) true
  • else {
  • val afterPollTime = time.milliseconds()
  • // 未超时,继续进行递归操作
  • if (afterPollTime < timeoutExpiryTime) recursivePoll(afterPollTime)
  • else false // 超时返回
  • }
  • }
  • // 进入递归方法
  • recursivePoll(methodStartTime)
  • }
  • // kafka.utils.NetworkClientBlockingOps#pollContinuously
  • /**
  • * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
  • *
  • * Exceptions thrown via `collect` are not handled and will bubble up.
  • *
  • * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
  • * care.
  • */
  • private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: JTime): T = {
  • // 递归方法,没有超时时间
  • @tailrec
  • def recursivePoll: T = {
  • // rely on request timeout to ensure we don't block forever
  • // poll操作发送请求,ClientRequest有超时时间,所以此处虽然超时时间是Long.MaxValue,但并不会永远阻塞
  • val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala
  • collect(responses) match { // 检测是否满足递归结束条件
  • // 有结果,返回
  • case Some(result) => result
  • // 没有结果,进行下一次递归
  • case None => recursivePoll
  • }
  • }
  • // 进入递归方法
  • recursivePoll
  • }
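pollUntil(...)的本质是"在非阻塞客户端之上实现带超时的阻塞语义":反复poll,直到谓词满足或超时。下面用Python给出一个等价的简化示意(非Kafka源码,client_poll是假设的回调,返回一批响应):

```python
import time

def poll_until(client_poll, predicate, timeout_ms):
    """pollUntil(...)逻辑的简化示意:反复调用client_poll,
    直到predicate(responses, now)为真(返回True)或超时(返回False)。"""
    deadline = time.time() * 1000 + timeout_ms  # 计算超时时间
    while True:
        now = time.time() * 1000
        # poll的超时时间为剩余等待时间
        responses = client_poll(deadline - now)
        if predicate(responses, now):  # 满足结束条件
            return True
        if time.time() * 1000 >= deadline:  # 已超时
            return False
```

pollContinuously(...)与之类似,只是没有超时时间,依靠请求自身的超时机制保证不会永远阻塞。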

回到ReplicaFetcherThread的sendRequest(...)方法,该方法使用NetworkClientBlockingOps提供的blockingReady(...)blockingSendAndReceive(...)发送请求获取到响应后会将得到的ClientResponse响应对象返回给上层的fetch(...),最终包装为FetchResponse向上层方法返回。

4.2.2.2. 响应处理

fetch(...)方法得到的FetchResponse响应对象会在AbstractFetcherThread的processFetchRequest(...)方法内被遍历处理:先将响应中各分区的offset与partitionMap中记录的同步状态进行对比,一致则说明本次拉取请求有效;若没有错误产生,就处理拉取的数据并写入Follower副本的日志系统,这个操作由子类ReplicaFetcherThread的processPartitionData(...)实现,源码如下:

  • // kafka.server.ReplicaFetcherThread#processPartitionData
  • // process fetched data
  • // 处理fetch()方法请求返回的数据
  • def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
  • try {
  • val TopicAndPartition(topic, partitionId) = topicAndPartition
  • // 获取对应的Replica副本对象
  • val replica = replicaMgr.getReplica(topic, partitionId).get
  • // 获取响应的消息数据,转换为ByteBufferMessageSet
  • val messageSet = partitionData.toByteBufferMessageSet
  • // 检查消息数据大小是否合法
  • warnIfMessageOversized(messageSet, topicAndPartition)
  • // 检查拉取的offset是否是从副本的LEO开始的,如果不是将抛出异常
  • if (fetchOffset != replica.logEndOffset.messageOffset)
  • throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, replica.logEndOffset.messageOffset))
  • // 日志记录
  • if (logger.isTraceEnabled)
  • trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
  • .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
  • // 将消息追加到Log中,第二个参数为false,表示Leader已经为消息分配了offset,Follower副本不再对消息分配offset
  • replica.log.get.append(messageSet, assignOffsets = false)
  • // 日志记录
  • if (logger.isTraceEnabled)
  • trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
  • .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
  • // 计算副本的HighWatermark
  • val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
  • // for the follower replica, we do not need to keep
  • // its segment base offset the physical position,
  • // these values will be computed upon making the leader
  • // 更新副本的HighWatermark
  • replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  • if (logger.isTraceEnabled)
  • trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
  • .format(replica.brokerId, topic, partitionId, followerHighWatermark))
  • } catch {
  • case e: KafkaStorageException =>
  • fatal(s"Disk error while replicating data for $topicAndPartition", e)
  • Runtime.getRuntime.halt(1)
  • }
  • }

processPartitionData(...)方法会将解析得到的消息追加到Follower副本的Log中,追加操作交给了Log对象负责,并更新Follower副本的HighWatermark。
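其中HighWatermark的更新规则很简单:取Follower自身LEO与响应中Leader HighWatermark的较小值,保证HW不会超过本地已写入的消息位置。下面是这条规则的一个简单示意(假设性函数,非Kafka源码):

```python
def follower_high_watermark(follower_leo, leader_hw):
    """processPartitionData(...)中HW计算的示意:
    Follower的HW = min(自身LEO, Leader返回的HW)。
    追加消息落后于Leader时,HW受限于本地LEO;
    本地消息足够多时,HW受限于Leader公布的HW。"""
    return min(follower_leo, leader_hw)
```

例如Follower刚追完一批消息、LEO为100而Leader HW为80时,Follower的HW只能推进到80;反之若Leader HW为120而本地LEO只有100,HW也只能是100。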

4.2.2.3. 异常和错误处理

processFetchRequest(...)方法的整个过程中,难免会出现各种错误和异常;fetch(...)方法在出现异常时,处理方式是退避一段时间后再尝试,而在处理响应数据时,当碰到CorruptRecordException时会记录错误日志,出现OFFSET_OUT_OF_RANGE错误码时交由handleOffsetOutOfRange(...)方法处理,同时processFetchRequest(...)方法会在整个流程中将拉取操作出现错误的分区存储在partitionsWithError中,最终由handlePartitionsWithErrors(...)统一处理。

handleOffsetOutOfRange(...)方法主要处理Follower副本请求的offset超出了Leader副本的offset范围的情况,可能是超过了Leader的LogEndOffset,也可能是小于Leader的最小offset(startOffset)。当发生“Unclean leader election”时就可能出现第一种情况,这种场景简单来说就是不在ISR集合中的Follower副本被选举成为了Leader副本,发生此场景的过程如下:

  1. 一个Follower副本发生宕机,而Leader副本不断接收来自生产者的消息并追加到Log中,此时Follower副本因为宕机并没有与Leader副本进行同步。
  2. 此Follower副本重新上线,在它与Leader完全同步之前,它没有资格进入ISR集合。假设ISR集合中的Follower副本在此时全部宕机,只能选举此Follower副本为新Leader副本。
  3. 之后,旧Leader重新上线成为Follower副本,此时就会出现Follower副本的LogEndOffset超越了Leader副本的LogEndOffset值的场景。

handleOffsetOutOfRange(...)方法针对“Unclean leader election”场景的处理如下:

  • // kafka.server.ReplicaFetcherThread#handleOffsetOutOfRange
  • /**
  • * Handle a partition whose offset is out of range and return a new fetch offset.
  • * handleOffsetOutOfRange()方法主要处理Follower副本请求的offset超出了Leader副本的offset范围,
  • * 可能是超过了Leader的LEO,也可能是小于Leader的最小offset(startOffset)
  • */
  • def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  • // 获取对应的Replica
  • val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
  • /**
  • * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
  • * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
  • * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
  • * and it may discover that the current leader's end offset is behind its own end offset.
  • *
  • * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
  • *
  • * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
  • */
  • // 发送ListOffsetRequest,获取Leader副本的LEO,使用的是阻塞的方式
  • val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP,
  • brokerConfig.brokerId)
  • // 判断Leader副本的LEO是否依然落后于Follower副本的LEO
  • if (leaderEndOffset < replica.logEndOffset.messageOffset) { // Leader副本的LEO落后于Follower副本的LEO
  • // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  • // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  • // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  • // 根据配置决定是否需要停机
  • if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
  • ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { // unclean.leader.election.enable配置
  • // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
  • fatal("Exiting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
  • " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
  • .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
  • // 退出JVM
  • System.exit(1)
  • }
  • warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
  • .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
  • // 将分区对应的Log截断到Leader副本的LEO的位置,之后从此offset开始重新与Leader进行同步
  • replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
  • leaderEndOffset
  • } else { // Leader副本的LEO没有落后于Follower副本的LEO,但可能是Leader副本中的startOffset(第一条消息的offset)大于Follower副本的LEO
  • /**
  • * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
  • * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
  • * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
  • * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
  • * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
  • * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are
  • * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
  • * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
  • *
  • * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
  • * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
  • * start offset.
  • * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
  • * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
  • * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
  • * brokers and producers.
  • *
  • * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
  • * and the current leader's log start offset.
  • *
  • */
  • // 发送ListOffsetRequest,获取Leader副本的startOffset,使用的是阻塞的方式
  • val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
  • brokerConfig.brokerId)
  • warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
  • .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
  • // 选择下次获取消息的起始offset
  • val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
  • // Only truncate log when current leader's log start offset is greater than follower's log end offset.
  • if (leaderStartOffset > replica.logEndOffset.messageOffset)
  • // 将Log全部截断,并创建新的activeSegment
  • replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
  • offsetToFetch // 返回下次获取消息的offset
  • }
  • }

在Follower副本发送ListOffsetRequest期间,新Leader副本可能在不断追加消息,当新Leader副本的LEO不再落后于Follower副本的LEO时,就不再需要进行截断操作,Follower副本可以继续从其LogEndOffset处与Leader副本同步;但这样新Leader副本与Follower副本的消息可能存在不一致的情况,需要根据实际的业务逻辑进行权衡:如果业务逻辑可以忍受“Unclean leader election”场景带来的消息丢失和不一致,则可以将unclean.leader.election.enable配置为true;如果业务逻辑不能接受这种场景,则关闭对“Unclean leader election”的支持,将停机作为优先选项。

如果Follower副本宕机后过了很长一段时间才重新上线,Leader副本在此期间可能执行了多次删除陈旧的日志的操作,这就可能导致Leader副本中第一条消息的offset(startOffset)大于Follower副本的LogEndOffset。
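handleOffsetOutOfRange(...)的两个分支可以归纳为一个决策函数:Leader的LEO落后于Follower的LEO时截断到Leader LEO;否则从max(Leader startOffset, Follower LEO)继续拉取,且仅当Leader startOffset大于Follower LEO时才需要完全截断。下面是这套决策逻辑的简化示意(假设性函数,非Kafka源码;真实实现在禁止unclean选举时会直接停机而非抛异常):

```python
def resolve_out_of_range(follower_leo, leader_start, leader_end,
                         unclean_election_enabled):
    """handleOffsetOutOfRange(...)决策逻辑的示意:
    返回 (下次拉取的offset, 截断动作)。"""
    if leader_end < follower_leo:
        # Unclean leader election场景:Follower的LEO超过了Leader的LEO
        if not unclean_election_enabled:
            # 真实实现中会记录fatal日志并System.exit(1)
            raise RuntimeError("log truncation is not allowed")
        # 截断到Leader的LEO,之后从该offset重新同步
        return leader_end, "truncate_to_leader_end"
    # Leader陈旧日志被删除等场景:从较大的offset继续拉取
    offset_to_fetch = max(leader_start, follower_leo)
    # 仅当Leader的startOffset超过Follower的LEO时才完全截断
    action = "truncate_fully" if leader_start > follower_leo else "none"
    return offset_to_fetch, action
```

例如Follower LEO为120而Leader LEO为100时截断到100;Follower LEO为10而Leader startOffset为50时完全截断并从50开始拉取;两者范围有重叠时则不截断,直接从Follower LEO继续。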

最后是handlePartitionsWithErrors(...)方法,它对拉取出错的分区做统一处理,实现方式是将这些分区的同步操作暂停一段时间:

  • // kafka.server.ReplicaFetcherThread#handlePartitionsWithErrors
  • // any logic for partitions whose leader has changed
  • def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
  • delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
  • }
  • // kafka.server.AbstractFetcherThread#delayPartitions
  • // 延迟某些分区的拉取操作
  • def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
  • partitionMapLock.lockInterruptibly() // 加锁
  • try {
  • for (partition <- partitions) {
  • // 获取分区对应的PartitionFetchState
  • partitionMap.get(partition).foreach (currentPartitionFetchState =>
  • if (currentPartitionFetchState.isActive) // 检测分区的同步状态
  • // 将分区对应的同步状态由激活状态设置为延时状态,延迟时长为delay毫秒
  • partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
  • )
  • }
  • // 唤醒fetcher线程
  • partitionMapCond.signalAll()
  • } finally partitionMapLock.unlock()
  • }

5. 关闭副本

KafkaController可以通过向Broker发送StopReplicaRequest请求以关闭副本,StopReplicaRequest请求中通过deletePartitions字段标识在关闭副本的同时是否需要删除副本对应的文件。触发关闭副本操作的场景有很多种,如分区副本重新分配、Broker关闭等。KafkaApis接口层使用handleStopReplicaRequest(request: RequestChannel.Request)方法处理该请求,源码如下:

  • // kafka.server.KafkaApis#handleStopReplicaRequest
  • def handleStopReplicaRequest(request: RequestChannel.Request) {
  • // ensureTopicExists is only for client facing requests
  • // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  • // stop serving data to clients for the topic being deleted
  • // 把请求转换为StopReplicaRequest
  • val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
  • // 构造响应头
  • val responseHeader = new ResponseHeader(request.header.correlationId)
  • val response =
  • if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { // 检查授权
  • // 使用ReplicaManager处理请求
  • val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
  • // 构造StopReplicaResponse响应对象
  • new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
  • } else {
  • // 授权失败,将响应结果中的分区的错误码设置为CLUSTER_AUTHORIZATION_FAILED
  • val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
  • new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
  • }
  • // 使用RequestChannel发送响应
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
  • // 停止空闲的副本同步线程
  • replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
  • }

从源码可以看到,关闭副本的操作由ReplicaManager的stopReplicas(...)方法实现;在关闭副本之后,还会通过ReplicaFetcherManager停止空闲的副本同步线程。ReplicaManager的stopReplicas(...)方法的源码如下:

  • // kafka.server.ReplicaManager#stopReplicas
  • def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = {
  • replicaStateChangeLock synchronized { // 加锁
  • // 构造字典,用于存放每个分区的响应结果
  • val responseMap = new collection.mutable.HashMap[TopicPartition, Short]
  • if(stopReplicaRequest.controllerEpoch() < controllerEpoch) { // 检查Controller的年代信息是否合法
  • stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
  • .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
  • // 返回年代信息过期的错误
  • (responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
  • } else {
  • // 从请求中得到操作的分区
  • val partitions = stopReplicaRequest.partitions.asScala
  • // 更新年代信息
  • controllerEpoch = stopReplicaRequest.controllerEpoch
  • // First stop fetchers for all partitions, then stop the corresponding replicas
  • // 停止对指定分区的拉取操作
  • replicaFetcherManager.removeFetcherForPartitions(partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  • for(topicPartition <- partitions){
  • // 使用stopReplica()方法关闭指定的分区的副本
  • val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
  • // 向responseMap记录错误码
  • responseMap.put(topicPartition, errorCode)
  • }
  • (responseMap, Errors.NONE.code)
  • }
  • }
  • }
  • // kafka.server.ReplicaManager#stopReplica
  • def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
  • stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
  • deletePartition.toString, topic, partitionId))
  • // 先记录错误码为NONE
  • val errorCode = Errors.NONE.code
  • // 从allPartitions获取分区进行操作
  • getPartition(topic, partitionId) match {
  • // 有对应的分区
  • case Some(partition) =>
  • if(deletePartition) { // deletePartition标识了是否需要删除分区,会删除分区对应的副本及其Log
  • // 从allPartitions中移除对应分区
  • val removedPartition = allPartitions.remove((topic, partitionId))
  • if (removedPartition != null) {
  • // 删除副本
  • removedPartition.delete() // this will delete the local log
  • val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
  • if (!topicHasPartitions)
  • BrokerTopicStats.removeMetrics(topic)
  • }
  • }
  • // 没有对应的分区,可以直接删除Log
  • case None =>
  • // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
  • // This could happen when topic is being deleted while broker is down and recovers.
  • if(deletePartition) {
  • val topicAndPartition = TopicAndPartition(topic, partitionId)
  • // 使用LogManager中删除对应的Log
  • if(logManager.getLog(topicAndPartition).isDefined) {
  • logManager.deleteLog(topicAndPartition)
  • }
  • }
  • stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
  • .format(localBrokerId, deletePartition, topic, partitionId))
  • }
  • stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
  • .format(localBrokerId, deletePartition, topic, partitionId))
  • errorCode
  • }

该请求最终交给了ReplicaManager的stopReplicas(...)方法处理:它会首先检查请求中的Controller年代信息是否合法,然后停止对指定分区的消息同步操作,再逐个分区调用stopReplica(...)方法,并根据请求中的deletePartitions字段决定是否删除对应的副本及其Log。
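上述处理流程可以用下面一小段Python代码示意(仅为简化模型,并非Kafka源码,其中的函数名、参数名与state结构均为假设):

```python
def handle_stop_replica(request, state):
    """示意stopReplicas(...)的处理流程(简化模型,非Kafka源码)。

    request: 含controller_epoch、partitions、delete_partitions的字典;
    state:   含controller_epoch、partitions(分区字典)、logs(Log集合)的字典。
    """
    response = {}
    # 1. 校验Controller年代信息,过期的请求直接返回STALE_CONTROLLER_EPOCH
    if request["controller_epoch"] < state["controller_epoch"]:
        return response, "STALE_CONTROLLER_EPOCH"
    # 2. 更新本地记录的Controller年代信息
    state["controller_epoch"] = request["controller_epoch"]
    for tp in request["partitions"]:
        # 3. 真实实现会先停止对该分区的Fetcher拉取线程,此处省略
        # 4. 根据deletePartitions决定是否删除分区副本及其Log
        if request["delete_partitions"]:
            state["partitions"].pop(tp, None)
            state["logs"].discard(tp)
        response[tp] = "NONE"
    return response, "NONE"
```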

6. 定时任务

ReplicaManager内部设置了三个定时任务:HighWatermark Checkpoint、ISR Expiration和ISR Change Propagation,作用分别如下:

  • HighWatermark Checkpoint:该任务会周期性地记录每个副本的HighWatermark并保存到其数据目录下的replication-offset-checkpoint文件中。
  • ISR Expiration:任务会周期性地调用maybeShrinkIsr()方法检测每个分区是否需要缩减其ISR集合。
  • ISR Change Propagation:任务会周期性地将ISR集合发生变化的分区记录到ZooKeeper中。

在ReplicaManager的startup()方法中会启动ISR Expiration和ISR Change Propagation两个定时任务,分别调用maybeShrinkIsr()和maybePropagateIsrChanges()方法;它们都是周期任务,ISR Expiration任务的周期时间由replica.lag.time.max.ms配置项决定,ISR Change Propagation任务的周期时间固定为2500毫秒:

  • // kafka.server.ReplicaManager#startup
  • def startup() {
  • // start ISR expiration thread
  • // ISR Expiration定时任务
  • scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
  • // ISR Change Propagation定时任务
  • scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
  • }

而HighWatermark Checkpoint任务则是在becomeLeaderOrFollower(...)中开启的,如果检测到HighWatermark Checkpoint任务未启动,会调用startHighWaterMarksCheckPointThread()方法启动HighWatermark Checkpoint周期任务,任务调用的是checkpointHighWatermarks()方法,周期时间由replica.high.watermark.checkpoint.interval.ms配置项决定:

  • // kafka.server.ReplicaManager#becomeLeaderOrFollower
  • // 负责副本角色切换
  • def becomeLeaderOrFollower(correlationId: Int,
  • leaderAndISRRequest: LeaderAndIsrRequest,
  • metadataCache: MetadataCache,
  • onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  • ...
  • replicaStateChangeLock synchronized { // 加锁
  • // 新建一个HashMap用于记录产生的错误码
  • val responseMap = new mutable.HashMap[TopicPartition, Short]
  • if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { // 检查请求中的Controller的年代信息
  • // 如果请求中的Controller年代信息小于当前的年代信息,直接返回STALE_CONTROLLER_EPOCH异常码
  • ...
  • } else {
  • // 年代信息符合要求
  • ...
  • // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
  • // have been completely populated before starting the checkpointing there by avoiding weird race conditions
  • // 启动HighWatermark Checkpoint任务
  • if (!hwThreadInitialized) {
  • startHighWaterMarksCheckPointThread()
  • hwThreadInitialized = true
  • }
  • ...
  • }
  • }
  • }
  • // kafka.server.ReplicaManager#hwThreadInitialized
  • // hwThreadInitialized默认为false
  • private var hwThreadInitialized = false
  • // kafka.server.ReplicaManager#startHighWaterMarksCheckPointThread
  • def startHighWaterMarksCheckPointThread() = {
  • if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) // 检测HighWatermark Checkpoint任务是否启动
  • // 启动HighWatermark Checkpoint任务
  • scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
  • }

6.1. HighWatermark Checkpoint

我们先来关注HighWatermark Checkpoint定时任务checkpointHighWatermarks()方法的实现:

  • // kafka.server.ReplicaManager#checkpointHighWatermarks
  • // Flushes the highwatermark value for all partitions to the highwatermark file
  • def checkpointHighWatermarks() {
  • // 获取全部的Replica对象,按照副本所在的数据目录进行分组
  • val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId))
  • val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
  • // 遍历所有的数据目录
  • for ((dir, reps) <- replicasByDir) {
  • // 收集当前数据目录下的全部副本的HighWatermark
  • val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap
  • try {
  • // 更新对应数据目录下的replication-offset-checkpoint文件
  • highWatermarkCheckpoints(dir).write(hwms)
  • } catch {
  • case e: IOException =>
  • fatal("Error writing to highwatermark file: ", e)
  • Runtime.getRuntime().halt(1)
  • }
  • }
  • }
  • // kafka.server.ReplicaManager#highWatermarkCheckpoints
  • // 存储了每个数据目录及映射的HighWatermark Checkpoint文件,HighWatermark Checkpoint文件记录了每个分区的HighWatermark线
  • val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap

该方法会遍历该ReplicaManager管理的所有数据目录,依次从highWatermarkCheckpoints字典中取出每个数据目录对应的OffsetCheckpoint对象,然后调用其write(offsets: Map[TopicAndPartition, Long])方法,将该目录下所有副本的HighWatermark值写入对应数据目录下的replication-offset-checkpoint文件。这里使用的OffsetCheckpoint类与前面RecoveryPoint Checkpoint任务写入recovery-point-offset-checkpoint文件时使用的是同一个,因此replication-offset-checkpoint文件和recovery-point-offset-checkpoint文件的内容格式是相同的,这里不再赘述,大家可以回顾前面发布的服务端源码分析:日志的存储管理一文的【1.1.4. RecoveryPoint Checkpoint定时任务】章节。

6.2. ISR Expiration

ISR Expiration定时任务maybeShrinkIsr()方法则比较简单,它会对ReplicaManager管理的Partition调用其maybeShrinkIsr(replicaMaxLagTimeMs: Long)方法:

  • // kafka.server.ReplicaManager#maybeShrinkIsr
  • private def maybeShrinkIsr(): Unit = {
  • trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  • // 尝试对ISR进行紧缩操作,使用的是Partition的maybeShrinkIsr()方法
  • allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
  • }

Partition类的maybeShrinkIsr(replicaMaxLagTimeMs: Long)方法在前面已经讲解过,这里不再赘述。
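这里仅用一小段Python示意其核心判断逻辑(简化模型,非Kafka源码,函数名与参数名均为假设):凡是距上次追上Leader的时间超过replicaMaxLagTimeMs的Follower副本,都会被移出ISR集合,Leader自身则始终保留。

```python
def shrink_isr(isr, leader_id, last_caught_up_ms, now_ms, max_lag_ms):
    """示意Partition.maybeShrinkIsr()的判断逻辑(简化模型)。

    isr:               当前ISR集合中的副本ID列表
    last_caught_up_ms: 各Follower副本上次追上Leader的时间戳
    """
    return [r for r in isr
            # Leader自身永远保留;Follower落后超过max_lag_ms则移出ISR
            if r == leader_id or now_ms - last_caught_up_ms[r] <= max_lag_ms]
```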

6.3. ISR Change Propagation

ISR Change Propagation定时任务maybePropagateIsrChanges()方法的源码如下:

  • // kafka.server.ReplicaManager#maybePropagateIsrChanges
  • /**
  • * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
  • * 1. There is ISR change not propagated yet.
  • * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
  • * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
  • * other brokers when large amount of ISR change occurs.
  • */
  • def maybePropagateIsrChanges() {
  • val now = System.currentTimeMillis()
  • isrChangeSet synchronized { // 加锁
  • /**
  • * 检查是否需要执行定时任务:1 & (2 || 3)
  • * 1. isrChangeSet不为空;
  • * 2. 最后一次有ISR集合发生变化的时间距今已超过5秒;
  • * 3. 上次写入Zookeeper的时间距今已超过60秒。
  • */
  • if (isrChangeSet.nonEmpty &&
  • (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
  • lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
  • // 将isrChangeSet写入Zookeeper
  • ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
  • // 清空isrChangeSet
  • isrChangeSet.clear()
  • // 更新lastIsrPropagationMs
  • lastIsrPropagationMs.set(now)
  • }
  • }
  • }

ISR Change Propagation任务会定期将ISR集合发生变化的分区记录到ZooKeeper中。KafkaController在相应路径上添加了Watcher,当Watcher被触发后会向其管理的Broker发送UpdateMetadataRequest请求;由于频繁触发Watcher会影响KafkaController、ZooKeeper甚至其他Broker的性能,为避免这种情况,maybePropagateIsrChanges()方法设置了一定的写入条件:isrChangeSet集合不为空,且最后一次ISR集合发生变化距今已超过5秒,或者上次写入ZooKeeper距今已超过60秒。
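这个"1 & (2 || 3)"的触发条件可以用Python示意如下(简化模型,非Kafka源码,函数名为假设,两个常量对应源码中的IsrChangePropagationBlackOut与IsrChangePropagationInterval):

```python
ISR_CHANGE_PROPAGATION_BLACK_OUT_MS = 5000    # 最近一次ISR变化后的静默期
ISR_CHANGE_PROPAGATION_INTERVAL_MS = 60000    # 距上次写入ZooKeeper的最大间隔

def should_propagate(isr_change_set, last_change_ms, last_propagation_ms, now_ms):
    """示意maybePropagateIsrChanges()的触发条件:1 & (2 || 3)。"""
    return bool(isr_change_set) and (
        # 条件2:最后一次ISR变化距今超过5秒
        last_change_ms + ISR_CHANGE_PROPAGATION_BLACK_OUT_MS < now_ms or
        # 条件3:上次写入ZooKeeper距今超过60秒
        last_propagation_ms + ISR_CHANGE_PROPAGATION_INTERVAL_MS < now_ms)
```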

7. 元数据更新

Kafka的Broker使用MetadataCache对象缓存了每个分区的状态,以及对应的可用的Broker、可用的节点等信息,它的定义和重要字段如下:

  • // kafka.server.MetadataCache
  • /**
  • * A cache for the state (e.g., current leader) of each partition. This cache is updated through
  • * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  • *
  • * 每个分区状态的元数据缓存;该缓存会通过Controller的UpdateMetadataRequest进行更新;
  • * 每个broker维护的该元数据缓存是一致的。
  • */
  • private[server] class MetadataCache(brokerId: Int) extends Logging {
  • private val stateChangeLogger = KafkaController.stateChangeLogger
  • // 记录了每个分区的状态,使用PartitionStateInfo记录Partition状态
  • private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
  • private var controllerId: Option[Int] = None
  • // 记录了当前可用的Broker信息,其中使用Broker类记录每个存活Broker的网络位置信息(host、ip、port等)
  • private val aliveBrokers = mutable.Map[Int, Broker]()
  • // 记录了可用节点的信息
  • private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
  • private val partitionMetadataLock = new ReentrantReadWriteLock()
  • ...
  • }

其中cache使用PartitionStateInfo对象记录了每个分区的状态,该对象中记录了AR集合、ISR集合、Leader副本的id、Leader副本年代信息和Controller年代信息,定义如下:

  • case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, // ISR及Controller年代
  • allReplicas: Set[Int]) { // AR集合
  • def replicationFactor = allReplicas.size
  • ...
  • }

7.1. KafkaController的更新请求

KafkaController通过向集群中的Broker发送UpdateMetadataRequest请求来通知Broker更新MetadataCache中缓存的元数据,Broker会根据请求中携带的数据异步更新MetadataCache。在KafkaApis中,UpdateMetadataRequest请求交给了handleUpdateMetadataRequest(request: RequestChannel.Request)方法处理:

  • // kafka.server.KafkaApis#handle
  • def handle(request: RequestChannel.Request) {
  • ...
  • // 根据requestId获取请求对应的ApiKeys,进行匹配
  • ApiKeys.forId(request.requestId) match {
  • ...
  • case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) // 更新元数据(由KafkaController要求Broker更新)
  • ...
  • }
  • ...
  • }

handleUpdateMetadataRequest(request: RequestChannel.Request)方法的源码如下:

  • // kafka.server.KafkaApis#handleUpdateMetadataRequest
  • // 处理更新Broker元数据的请求
  • def handleUpdateMetadataRequest(request: RequestChannel.Request) {
  • // 请求关联ID
  • val correlationId = request.header.correlationId
  • // 将请求转换为UpdateMetadataRequest
  • val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
  • val updateMetadataResponse =
  • if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { // 检查授权
  • // 使用ReplicaManager更新
  • replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
  • // 构造错误码为NONE的响应
  • new UpdateMetadataResponse(Errors.NONE.code)
  • } else {
  • // 未授权,直接返回错误码为CLUSTER_AUTHORIZATION_FAILED的响应
  • new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
  • }
  • // 构造响应头
  • val responseHeader = new ResponseHeader(correlationId)
  • // 使用RequestChannel发送请求
  • requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, updateMetadataResponse)))
  • }

可以发现,最终解析出来的UpdateMetadataRequest对象会交给ReplicaManager的maybeUpdateMetadataCache(...)方法进行处理,源码如下:

  • // kafka.server.ReplicaManager#maybeUpdateMetadataCache
  • // 可能需要更新Broker的元数据
  • def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
  • replicaStateChangeLock synchronized { // 加锁
  • if(updateMetadataRequest.controllerEpoch < controllerEpoch) { // 检查Controller年代信息
  • val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
  • "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
  • correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
  • controllerEpoch)
  • stateChangeLogger.warn(stateControllerEpochErrorMessage)
  • // 抛出年代信息过期的异常
  • throw new ControllerMovedException(stateControllerEpochErrorMessage)
  • } else {
  • // 使用MetadataCache更新
  • metadataCache.updateCache(correlationId, updateMetadataRequest)
  • // 更新Controller年代信息
  • controllerEpoch = updateMetadataRequest.controllerEpoch
  • }
  • }
  • }

ReplicaManager的maybeUpdateMetadataCache(...)方法只是做了一些年代信息的验证,最终该请求还是传达给了MetadataCache对象自己的updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest)方法,由它自行处理:

  • // kafka.server.MetadataCache#updateCache
  • // 更新缓存的元数据信息
  • def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) {
  • inWriteLock(partitionMetadataLock) { // 加锁
  • // 获取请求中指定的Controller ID
  • controllerId = updateMetadataRequest.controllerId match {
  • case id if id < 0 => None
  • case id => Some(id)
  • }
  • // 先清除aliveNodes和aliveBrokers的信息
  • aliveNodes.clear()
  • aliveBrokers.clear()
  • // 遍历请求中的liveBrokers
  • updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
  • val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol])
  • val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol])
  • // 遍历broker得到更新的EndPoint和Node信息
  • broker.endPoints.asScala.foreach { case (protocol, ep) =>
  • endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
  • nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
  • }
  • // 更新aliveBrokers和aliveNodes
  • aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
  • aliveNodes(broker.id) = nodes.asScala
  • }
  • // 根据UpdateMetadataRequest.partitionStates字段更新Cache集合
  • updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
  • // 获取Controller ID
  • val controllerId = updateMetadataRequest.controllerId
  • // 获取Controller年代信息
  • val controllerEpoch = updateMetadataRequest.controllerEpoch
  • if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
  • // 删除分区对应的PartitionStateInfo
  • removePartitionInfo(tp.topic, tp.partition)
  • stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
  • s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
  • } else {
  • val partitionInfo = partitionStateToPartitionStateInfo(info)
  • // 更新PartitionStateInfo
  • addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
  • stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
  • s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
  • }
  • }
  • }
  • }
  • // kafka.server.MetadataCache#addOrUpdatePartitionInfo
  • private def addOrUpdatePartitionInfo(topic: String,
  • partitionId: Int,
  • stateInfo: PartitionStateInfo) {
  • inWriteLock(partitionMetadataLock) {
  • val infos = cache.getOrElseUpdate(topic, mutable.Map())
  • infos(partitionId) = stateInfo
  • }
  • }

updateCache(...)方法会根据请求携带的数据更新MetadataCache的aliveBrokers、aliveNodes、cache等字段。

7.2. 客户端的更新请求

在使用Kafka提供的生产者和消费者时,首先需要获取Kafka集群的元数据信息以了解集群的各种状态,这个操作是由KafkaProducer或KafkaConsumer实例通过向Kafka集群发送MetadataRequest请求来完成的。在KafkaApis中,MetadataRequest请求交由handleTopicMetadataRequest(request: RequestChannel.Request)方法处理:

  • // kafka.server.KafkaApis#handle
  • def handle(request: RequestChannel.Request) {
  • ...
  • // 根据requestId获取请求对应的ApiKeys,进行匹配
  • ApiKeys.forId(request.requestId) match {
  • ...
  • case ApiKeys.METADATA => handleTopicMetadataRequest(request) // 获取元数据(由生产者或消费者客户端向服务端获取集群元数据)
  • ...
  • }
  • ...
  • }

handleTopicMetadataRequest(request: RequestChannel.Request)方法的源码如下:

  • // kafka.server.KafkaApis#handleTopicMetadataRequest
  • /**
  • * Handle a topic metadata request
  • */
  • def handleTopicMetadataRequest(request: RequestChannel.Request) {
  • // 转换请求为MetadataRequest
  • val metadataRequest = request.body.asInstanceOf[MetadataRequest]
  • // 获取API版本
  • val requestVersion = request.header.apiVersion()
  • val topics =
  • // Handle old metadata request logic. Version 0 has no way to specify "no topics".
  • if (requestVersion == 0) {
  • // 请求的topics字段为空
  • if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
  • // 则读取所有的Topic信息
  • metadataCache.getAllTopics()
  • else
  • // 否则从请求中得到需要获取信息的Topic
  • metadataRequest.topics.asScala.toSet
  • } else {
  • if (metadataRequest.isAllTopics) // 内部实现为topics == null,即所请求的topics字段为空
  • // 则读取所有的Topic信息
  • metadataCache.getAllTopics()
  • else
  • // 否则从请求中得到需要获取信息的Topic
  • metadataRequest.topics.asScala.toSet
  • }
  • // 授权验证,将主题分为授权主题和未授权主题
  • var (authorizedTopics, unauthorizedTopics) =
  • topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
  • if (authorizedTopics.nonEmpty) {
  • // 过滤出元数据中没有的已授权的主题
  • val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
  • // 如果开启了自动创建主题
  • if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
  • authorizer.foreach { az =>
  • // 验证授权,授权不通过就将这些主题从授权集合移动到未授权集合
  • if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
  • authorizedTopics --= nonExistingTopics
  • unauthorizedTopics ++= nonExistingTopics
  • }
  • }
  • }
  • }
  • // 对未授权的主题统一构建带有TOPIC_AUTHORIZATION_FAILED错误码的TopicMetadata对象
  • val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
  • new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList()))
  • // In version 0, we returned an error when brokers with replicas were unavailable,
  • // while in higher versions we simply don't include the broker in the returned broker list
  • /**
  • * 根据请求版本决定是否要对副本不可用的broker返回错误
  • * 1. 在版本0中,副本不可用的broker会返回错误;
  • * 2. 高版本中仅仅会在返回的broker列表中去除副本不可用的broker,但不会返回错误
  • */
  • val errorUnavailableEndpoints = requestVersion == 0
  • val topicMetadata =
  • if (authorizedTopics.isEmpty)
  • // 如果已授权主题集合为空,返回空字典
  • Seq.empty[MetadataResponse.TopicMetadata]
  • else
  • // 否则获取已授权主题的元数据
  • getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
  • val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
  • // 获取可用broker
  • val brokers = metadataCache.getAliveBrokers
  • trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
  • brokers.mkString(","), request.header.correlationId, request.header.clientId))
  • // 创建与Request关联的响应头
  • val responseHeader = new ResponseHeader(request.header.correlationId)
  • // 创建响应体
  • val responseBody = new MetadataResponse(
  • brokers.map(_.getNode(request.securityProtocol)).asJava,
  • metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
  • completeTopicMetadata.asJava,
  • requestVersion
  • )
  • // 使用RequestChannel返回响应
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • }

handleTopicMetadataRequest(...)方法通过一系列的授权验证、主题查找来收集MetadataRequest请求对象中要求的主题的元数据信息,如果遇到未知的主题(MetadataCache中没有记录),会根据配置以及主题的名称决定是否自动创建,这部分操作都由KafkaApis的getTopicMetadata(...)方法完成:

  • // kafka.server.KafkaApis#getTopicMetadata
  • // 获取指定主题集合的元数据
  • private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
  • // 通过MetadataCache获取主题元数据
  • val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints)
  • if (topics.isEmpty || topicResponses.size == topics.size) {
  • topicResponses
  • } else {
  • val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
  • // 处理未知的主题
  • val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
  • if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
  • // 创建Group组的元数据存放的主题__consumer_offsets
  • createGroupMetadataTopic()
  • } else if (config.autoCreateTopicsEnable) { // auto.create.topics.enable
  • // 创建其他未知主题
  • createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
  • } else {
  • // 无法创建未知主题
  • new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • }
  • }
  • topicResponses ++ responsesForNonExistentTopics
  • }
  • }
  • // kafka.server.KafkaApis#createGroupMetadataTopic
  • // 创建Group组的元数据存放的主题
  • private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
  • val aliveBrokers = metadataCache.getAliveBrokers
  • val offsetsTopicReplicationFactor =
  • if (aliveBrokers.nonEmpty)
  • Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
  • else
  • config.offsetsTopicReplicationFactor.toInt
  • // 创建__consumer_offsets主题
  • createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
  • offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
  • }
  • // kafka.server.KafkaApis#createTopic
  • // 创建主题
  • private def createTopic(topic: String,
  • numPartitions: Int,
  • replicationFactor: Int,
  • properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  • try {
  • // 使用AdminUtils创建主题
  • AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
  • info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
  • .format(topic, numPartitions, replicationFactor))
  • new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • } catch {
  • case e: TopicExistsException => // let it go, possibly another broker created this topic
  • new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • case itex: InvalidTopicException =>
  • new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • }
  • }

KafkaApis的getTopicMetadata(...)方法内部会调用MetadataCache的getTopicMetadata(...)方法,该方法源码如下:

  • // kafka.server.MetadataCache#getTopicMetadata
  • // errorUnavailableEndpoints exists to support v0 MetadataResponses
  • def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
  • inReadLock(partitionMetadataLock) { // 加锁
  • topics.toSeq.flatMap { topic =>
  • // 获取元数据
  • getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata =>
  • // 将获取的数据构造为TopicMetadata对象
  • new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
  • }
  • }
  • }
  • }

其中getPartitionMetadata(...)方法的源码如下:

  • // kafka.server.MetadataCache#getPartitionMetadata
  • // errorUnavailableEndpoints exists to support v0 MetadataResponses
  • private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
  • // 获取主题对应的分区字典
  • cache.get(topic).map { partitions =>
  • // 遍历所有的分区
  • partitions.map { case (partitionId, partitionState) =>
  • // 根据主题和分区构造TopicAndPartition对象
  • val topicPartition = TopicAndPartition(topic, partitionId)
  • // 获取分区的Leader副本ID、Leader副本年代信息、ISR信息、Controller年代信息
  • val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
  • // 获取Leader副本所在的Node,其中记录了host、ip、port
  • val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol)
  • // 获取分区的AR集合
  • val replicas = partitionState.allReplicas
  • // 获取分区AR集合中可用的副本
  • val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints)
  • maybeLeader match {
  • case None => // 分区的Leader可能宕机了,返回错误码为LEADER_NOT_AVAILABLE
  • debug(s"Error while fetching metadata for $topicPartition: leader not available")
  • new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
  • replicaInfo.asJava, java.util.Collections.emptyList())
  • case Some(leader) => // Leader副本存活
  • // 获取分区的ISR集合
  • val isr = leaderAndIsr.isr
  • // 获取ISR集合中可用的副本
  • val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints)
  • // 检测AR集合中的副本是否都是可用的
  • if (replicaInfo.size < replicas.size) {
  • debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
  • s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
  • new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
  • replicaInfo.asJava, isrInfo.asJava)
  • } else if (isrInfo.size < isr.size) { // 检测ISR集合中的副本是否都是可用的
  • debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
  • s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
  • new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
  • replicaInfo.asJava, isrInfo.asJava)
  • } else { // AR和ISR集合的副本都是可用的
  • new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
  • isrInfo.asJava)
  • }
  • }
  • }
  • }
  • }
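getPartitionMetadata(...)对单个分区错误码的判定逻辑可以归纳为下面的Python示意(简化模型,非Kafka源码,函数名与参数名均为假设):Leader不可用时返回LEADER_NOT_AVAILABLE;AR或ISR集合中存在不可用副本时返回REPLICA_NOT_AVAILABLE;否则返回NONE。

```python
def partition_metadata_error(leader_alive, replicas, replica_info, isr, isr_info):
    """示意getPartitionMetadata()对单个分区错误码的判定(简化模型)。

    replicas/isr:           分区的AR/ISR集合(副本ID列表)
    replica_info/isr_info:  AR/ISR集合中当前可用(存活)的副本列表
    """
    if not leader_alive:
        # Leader副本可能宕机了
        return "LEADER_NOT_AVAILABLE"
    if len(replica_info) < len(replicas):
        # AR集合中存在不可用副本
        return "REPLICA_NOT_AVAILABLE"
    if len(isr_info) < len(isr):
        # ISR集合中存在不可用副本
        return "REPLICA_NOT_AVAILABLE"
    return "NONE"
```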