
Kafka Series 16 - Server-Side Source Code Analysis 07: Introduction to the Replica Mechanism

Summary: Explains replicas and partitions.

1. Overview of the Replica Mechanism

Kafka introduced the replica (Replica) mechanism in version 0.8 to improve the availability of a Kafka cluster. Each partition (Partition) can have multiple replicas, and one replica from its assigned replica set (Assigned Replicas, AR) is elected as the leader replica; all read and write requests are handled by the elected leader replica, while the remaining replicas act as follower replicas. A follower replica fetches messages from the leader and appends them to its own Log, serving as a hot standby of the leader. The leader replica, together with all follower replicas that stay sufficiently synchronized with it, forms the in-sync replica set (In-Sync Replicas, ISR). The replicas of a partition are spread evenly across different brokers in the cluster, so when the broker hosting the leader replica fails, a new leader is elected from the remaining in-sync replicas and continues to serve requests.

2. Replicas and Partitions

Kafka uses the Replica class to describe a replica and the Partition class to describe a partition. Since one partition can have multiple replicas, the relationship between a Partition instance and its Replica instances is one-to-many, as shown in the figure below:

Figure 1: Relationship between the Partition role and the Replica role

Note: the figure above shows how the partition and replica roles may be distributed across broker nodes in practice; it does not show how the Partition and Replica objects themselves are distributed. In fact, Partition and Replica object instances exist on multiple broker nodes, which is exactly why a replica can be either a "local replica" or a "remote replica": when a Replica instance and its corresponding log files live on the same broker, the replica is a "local replica"; when the Replica instance and the corresponding log files live on different brokers, it is a "remote replica". This distinction is explained further below.

2.1. The Replica Class

Let us start with the Replica class; its definition and important fields are as follows:

// kafka.cluster.Replica
/**
 * @param brokerId   ID of the broker hosting this replica
 * @param partition  the partition this replica belongs to
 * @param time
 * @param initialHighWatermarkValue
 * @param log        the Log object of a local replica; None for a remote replica,
 *                   so this field distinguishes local replicas from remote ones
 */
class Replica(val brokerId: Int,
              val partition: Partition,
              time: Time = SystemTime,
              initialHighWatermarkValue: Long = 0L,
              val log: Option[Log] = None) extends Logging {
  // the high watermark offset value, in non-leader replicas only its message offsets are kept
  // records the HighWatermark; updated and maintained by the leader
  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
  // the log end offset value, kept in all replicas;
  // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
  /**
   * For a local replica this is the offset of the latest message appended to the Log,
   * obtained directly from Log.nextOffsetMetadata;
   * for a remote replica it has the same meaning, but its value is updated by requests
   * sent from other brokers and cannot be read locally.
   */
  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
  // topic this replica belongs to
  val topic = partition.topic
  // ID of the partition this replica belongs to
  val partitionId = partition.partitionId
  // whether this is a local replica, decided by whether the log field is defined
  def isLocal: Boolean = {
    log match {
      case Some(l) => true
      case None => false
    }
  }
  // timestamp of the last time this follower replica fully caught up with the leader
  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
  ...
}

Note that, as the isLocal: Boolean method shows, a Replica can be either a "local replica" or a "remote replica": a "local replica" is one whose Log lives on the current broker, whereas a "remote replica" is one whose Log lives on another broker; for a remote replica the current broker only tracks metadata such as its HighWatermark and LogEndOffset.

The definition of Replica records the ID of the broker hosting the replica, the partition the replica belongs to, and the Log object describing its log files. Internally it also defines the highWatermarkMetadata and logEndOffsetMetadata fields, which describe the replica's HighWatermark and LogEndOffset; highWatermarkMetadata is only maintained on the leader replica, while every replica maintains its own logEndOffsetMetadata field.
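To make the local/remote distinction concrete, here is a minimal sketch (not taken from the Kafka source; `partition`, `log` and `time` are assumed to be available in scope) of how one broker can hold both kinds of Replica objects for the same partition:

// Sketch only: partition, log and time are assumed to exist in scope.
// Broker 1 hosts the log of this partition, so its Replica carries Some(log):
val localReplica  = new Replica(brokerId = 1, partition = partition, time = time,
                                initialHighWatermarkValue = 0L, log = Some(log))
// The partition also has a replica on broker 2; broker 1 merely tracks its offsets:
val remoteReplica = new Replica(brokerId = 2, partition = partition, time = time)

localReplica.isLocal   // true  -> its LEO is read from Log.logEndOffsetMetadata
remoteReplica.isLocal  // false -> its LEO is only updated by the follower's fetch requests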

Replica provides several methods for reading and writing these private fields; the source is as follows:

// read lastCaughtUpTimeMsUnderlying
def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get()

def updateLogReadResult(logReadResult : LogReadResult) {
  // update the LEO
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  /* If the request read up to the log end offset snapshot when the read was initiated,
   * set the lastCaughtUpTimeMsUnderlying to the current time.
   * This means that the replica is fully caught up.
   */
  if(logReadResult.isReadFromLogEnd) {
    lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
  }
}

// logEndOffset_= is the setter of logEndOffset; it updates logEndOffsetMetadata
private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
  if (isLocal) {
    // the LEO of a local replica cannot be set directly; it is derived from Log.logEndOffsetMetadata
    throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
  } else {
    // the LEO of a remote replica is updated through fetch requests
    logEndOffsetMetadata = newLogEndOffset
    trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
      .format(brokerId, topic, partitionId, logEndOffsetMetadata))
  }
}

// read the LEO
def logEndOffset =
  if (isLocal)
    // for a local replica, taken from Log.logEndOffsetMetadata
    log.get.logEndOffsetMetadata
  else
    // for a remote replica, taken from logEndOffsetMetadata
    logEndOffsetMetadata

// update highWatermarkMetadata
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
  if (isLocal) {
    // only a local replica may update the HW
    highWatermarkMetadata = newHighWatermark
    trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
      .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
  } else {
    throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
  }
}

// read highWatermarkMetadata
def highWatermark = highWatermarkMetadata

// rebuild highWatermarkMetadata from the local Log
def convertHWToLocalOffsetMetadata() = {
  if (isLocal) {
    // for a local replica, the Log object performs the conversion
    highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
  } else {
    // otherwise throw an exception
    throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
  }
}

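As a rough usage sketch of updateLogReadResult (simplified; the real plumbing lives in ReplicaManager, and `readResult` is assumed to be the LogReadResult produced by reading the leader's local log at the follower's fetch offset): when the leader serves a fetch request from a follower, it feeds the read result into that follower's remote Replica object.

// Sketch, not the actual ReplicaManager code: the leader just served a fetch from broker 2.
val followerReplica = partition.getReplica(replicaId = 2).get  // remote Replica, log = None
followerReplica.updateLogReadResult(readResult)                // readResult: assumed LogReadResult
// - the remote replica's LEO now reflects how far the follower has fetched
// - if the follower read right up to the leader's LEO, lastCaughtUpTimeMs is set to "now"
// the leader may then call partition.maybeExpandIsr(2) to re-admit the follower into the ISR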
2.2. The Partition Class

As seen above with Replica, the Kafka server uses Partition to represent a partition; Partition manages the Replica object of each of its replicas. Its definition and important fields are as follows:

/**
 * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
 * @param topic        the topic this partition belongs to
 * @param partitionId  the partition number
 * @param time
 * @param replicaManager
 */
class Partition(val topic: String,
                val partitionId: Int,
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
  // ID of the broker this Partition object lives on
  private val localBrokerId = replicaManager.config.brokerId
  // the LogManager of the current broker
  private val logManager = replicaManager.logManager
  // helper class for ZooKeeper operations
  private val zkUtils = replicaManager.zkUtils
  // all replicas of this partition (the AR set)
  private val assignedReplicaMap = new Pool[Int, Replica]
  // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
  // read-write lock guarding ISR updates
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  // version stored in ZooKeeper
  private var zkVersion: Int = LeaderAndIsr.initialZKVersion
  // epoch of the leader replica
  @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
  // ID of this partition's leader replica
  @volatile var leaderReplicaIdOpt: Option[Int] = None
  // the ISR of this partition; the ISR is a subset of the AR
  @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
  /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
   * One way of doing that is through the controller's start replica state change command. When a new broker starts up
   * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
   * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
   * each partition. */
  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
  // whether the replica identified by replicaId is local; it is local when replicaId equals the local brokerId
  private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
  val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
  ...
}

From the definition above we can see that a Partition object always corresponds to one topic and one partition, recorded by the topic and partitionId parameters. The AR set and the ISR set correspond to the assignedReplicaMap and inSyncReplicas fields respectively: assignedReplicaMap is a Pool[Int, Replica] whose keys are replica IDs and whose values are the Replica objects describing the replicas, while inSyncReplicas is a Set[Replica] (initialized to an empty set) holding only the Replica objects currently in the ISR; leaderReplicaIdOpt records the ID of the partition's leader replica.

Since Partition maintains the AR and ISR information, most of its work revolves around these two sets; it is also responsible for managing the partition's replicas, including creating and fetching replicas, switching replica roles, writing data, and checking replica synchronization.
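As a quick orientation before diving into the individual methods, the following sketch (assuming `partition` is an initialized Partition and `r0`, `r1` are Replica objects; this is not code from Kafka) shows the basic AR/ISR bookkeeping these fields support:

// Sketch: basic AR/ISR bookkeeping on a Partition instance (r0 and r1 are assumed Replica objects).
partition.addReplicaIfNotExists(r0)       // the AR set now contains r0, keyed by r0.brokerId
partition.addReplicaIfNotExists(r1)
val ar  = partition.assignedReplicas()    // Set[Replica] -> the AR set
val isr = partition.inSyncReplicas        // Set[Replica] -> always a subset of the AR set
isr.size < ar.size                        // roughly what isUnderReplicated() reports when the leader is local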

2.2.1. AR and ISR

The Partition class provides some basic operations on the AR set, such as adding, fetching, and removing replica objects:

// kafka.cluster.Partition
// whether this partition is currently under-replicated
def isUnderReplicated(): Boolean = {
  leaderReplicaIfLocal() match {
    case Some(_) => // the leader replica is on this broker
      // if the ISR is smaller than the AR, some replicas have fallen out of sync,
      // i.e. the partition is under-replicated
      inSyncReplicas.size < assignedReplicas.size
    case None =>
      false
  }
}
// add a Replica to assignedReplicaMap, keyed by the replica's brokerId
def addReplicaIfNotExists(replica: Replica) = {
  assignedReplicaMap.putIfNotExists(replica.brokerId, replica)
}
// all Replica objects in the AR set
def assignedReplicas(): Set[Replica] = {
  assignedReplicaMap.values.toSet
}
// remove the Replica identified by replicaId from the AR set
def removeReplica(replicaId: Int) {
  assignedReplicaMap.remove(replicaId)
}

For the ISR, Partition provides two methods, maybeExpandIsr(replicaId: Int) and maybeShrinkIsr(replicaMaxLagTimeMs: Long), for expanding or shrinking the set. maybeExpandIsr(replicaId: Int) was already mentioned in the earlier article on scheduled tasks; its source is fairly simple:

// kafka.cluster.Partition#maybeExpandIsr
/**
 * Check and maybe expand the ISR of the partition.
 *
 * This function can be triggered when a replica's LEO has incremented
 */
def maybeExpandIsr(replicaId: Int) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // look up the Replica object by replicaId
        val replica = getReplica(replicaId).get
        // the leader replica's HighWatermark
        val leaderHW = leaderReplica.highWatermark
        /**
         * The replica is added to the in-sync set when all three conditions hold:
         * 1. the in-sync set does not already contain the replica;
         * 2. the replica is part of the assignedReplicas set (the AR set);
         * 3. the replica's LogEndOffset is greater than or equal to the leader's HighWatermark.
         */
        if(!inSyncReplicas.contains(replica) &&
           assignedReplicas.map(_.brokerId).contains(replicaId) &&
           replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
          // add the replica to the in-sync set
          val newInSyncReplicas = inSyncReplicas + replica
          info("Expanding ISR for partition [%s,%d] from %s to %s"
            .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
                    newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in ZK and cache
          updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }
        // check if the HW of the partition can now be incremented
        // since the replica maybe now be in the ISR and its LEO has just incremented
        // returns true if the partition's HighWatermark was advanced
        maybeIncrementLeaderHW(leaderReplica)
      case None => false // nothing to do if no longer leader
    }
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    // if the HighWatermark advanced, try to complete pending DelayedProduce and DelayedFetch operations
    tryCompleteDelayedRequests()
}

Given the replicaId argument, this method looks up the corresponding Replica and adds it to the in-sync set when all three of the following conditions hold (the sketch after this list condenses them into a single predicate):

  1. the in-sync set does not already contain the Replica;
  2. the replica is part of the assignedReplicas set (the AR set);
  3. the replica's LogEndOffset is greater than or equal to the leader's HighWatermark.
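Condensed into a single predicate, the check looks roughly like this (a sketch whose names mirror the Partition fields shown above, not code copied from Kafka):

// Sketch of the ISR-expansion condition used by maybeExpandIsr for a given replica:
def eligibleForIsr(replica: Replica, leaderHW: LogOffsetMetadata): Boolean =
  !inSyncReplicas.contains(replica) &&                            // 1. not already in the ISR
  assignedReplicas.map(_.brokerId).contains(replica.brokerId) &&  // 2. still part of the AR set
  replica.logEndOffset.offsetDiff(leaderHW) >= 0                  // 3. its LEO has reached the leader's HW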

By contrast, the implementation of maybeShrinkIsr(replicaMaxLagTimeMs: Long) is somewhat different:

// kafka.cluster.Partition#maybeShrinkIsr
// shrink the ISR
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  // acquire the write lock
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // get the Replica object of the leader replica
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // find the follower replicas that have fallen behind
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if(outOfSyncReplicas.size > 0) {
          // remove the lagging replicas from the in-sync set
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.size > 0)
          info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
            inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in zk and in cache
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1
          replicaManager.isrShrinkRate.mark()
          // the ISR changed, so try to advance the leader's HighWatermark
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }
      case None => false // do nothing if no longer leader
    }
  }
  // some delayed operations may be unblocked after HW changed
  // if the HighWatermark advanced, try to complete pending DelayedProduce and DelayedFetch operations
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

// kafka.cluster.Partition#getOutOfSyncReplicas
// find the replicas whose synchronization has fallen behind
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  /**
   * there are two cases that will be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
   *    the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
   *    then the follower is lagging and should be removed from the ISR
   * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
   * the last time when the replica was fully caught up. If either of the above conditions
   * is violated, that replica is considered to be out of sync
   **/
  // the leader replica's LEO
  val leaderLogEndOffset = leaderReplica.logEndOffset
  // the follower replicas (i.e. the in-sync set without the leader)
  val candidateReplicas = inSyncReplicas - leaderReplica
  // pick out the lagging replicas by comparing (now - lastCaughtUpTimeMs) against maxLagMs
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if(laggingReplicas.size > 0)
    debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
  laggingReplicas
}

The replicaMaxLagTimeMs parameter specifies how long a follower replica may lag behind the leader. The method getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] filters the replicas other than the leader, using each replica's lastCaughtUpTimeMs to decide whether it has fallen behind; finally the lagging replicas are removed from the ISR field inSyncReplicas.
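To make the staleness rule concrete, a toy calculation (the numbers are made up; replicaMaxLagTimeMs ultimately comes from the broker's replica.lag.time.max.ms setting):

// Toy example of the test used in getOutOfSyncReplicas:
val maxLagMs           = 10000L            // e.g. replica.lag.time.max.ms = 10 s
val now                = 1700000020000L    // current time in ms
val lastCaughtUpTimeMs = 1700000005000L    // the follower last fully caught up 15 s ago
val outOfSync = (now - lastCaughtUpTimeMs) > maxLagMs  // true -> the follower is removed from the ISR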

The two methods that expand and shrink the ISR share a common pattern: both call updateIsr(newInSyncReplicas) to update the in-sync information in the cache and in ZooKeeper, both call maybeIncrementLeaderHW(leaderReplica) to try to advance the partition's HighWatermark, and if the HighWatermark changes, both call tryCompleteDelayedRequests() to try to complete the DelayedProduce and DelayedFetch operations waiting in the time wheel; the two methods invoke these three operations in almost exactly the same way. Let us first look at the source of updateIsr(newIsr: Set[Replica]):

// kafka.cluster.Partition#updateIsr
// update the in-sync set recorded in ZooKeeper and in the local cache
private def updateIsr(newIsr: Set[Replica]) {
  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
    newLeaderAndIsr, controllerEpoch, zkVersion)
  if(updateSucceeded) {
    // let the ReplicaManager record that this partition's ISR changed
    replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
    // update the cached ISR
    inSyncReplicas = newIsr
    zkVersion = newVersion
    trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
  } else {
    info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
  }
}

As can be seen, this method updates the ISR data in the cache and in ZooKeeper through ReplicationUtils.updateLeaderAndIsr(...); once the update succeeds, ReplicaManager.recordIsrChange(topicAndPartition: TopicAndPartition) records which partition's ISR changed. The details of that flow will be covered in a later article.

After the ISR has been updated, maybeIncrementLeaderHW(leaderReplica) is called to try to advance the partition's HighWatermark. Internally, the HighWatermark is taken as the smallest LogEndOffset among the partition's in-sync replicas, and the return value indicates whether the HighWatermark was actually advanced.
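For intuition, a toy calculation of this rule (made-up numbers, using plain Longs instead of LogOffsetMetadata):

// If the ISR holds three replicas whose LEOs are 18 (leader), 15 and 12:
val isrLeos = Seq(18L, 15L, 12L)  // LEOs of the in-sync replicas
val newHW   = isrLeos.min         // 12 -> consumers may read up to, but not including, offset 12

The source of maybeIncrementLeaderHW is as follows: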

// kafka.cluster.Partition#maybeIncrementLeaderHW
/**
 * Check and maybe increment the high watermark of the partition;
 * this function can be triggered when
 *
 * 1. Partition ISR changed
 * 2. Any replica's LEO changed
 *
 * Returns true if the HW was incremented, and false otherwise.
 * Note There is no need to acquire the leaderIsrUpdate lock here
 * since all callers of this private API acquire that lock
 */
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
  // collect the LEOs of all in-sync replicas
  val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
  // the new HighWatermark is the smallest LEO among the in-sync replicas
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  // the old HighWatermark
  val oldHighWatermark = leaderReplica.highWatermark
  // advance only if the new HighWatermark is larger, or the old one lies on an older LogSegment
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    // update the HighWatermark
    leaderReplica.highWatermark = newHighWatermark
    debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
    true
  } else {
    debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
      .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    false
  }
}

If the HighWatermark was advanced, tryCompleteDelayedRequests() is called to try to complete the DelayedProduce and DelayedFetch delayed operations:

// kafka.cluster.Partition#tryCompleteDelayedRequests
/**
 * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
 */
private def tryCompleteDelayedRequests() {
  val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
  // try to complete pending fetch requests
  replicaManager.tryCompleteDelayedFetch(requestKey)
  // try to complete pending produce requests
  replicaManager.tryCompleteDelayedProduce(requestKey)
}

2.2.2. Getting and Creating Partition Replicas

Partition's getOrCreateReplica(replicaId: Int = localBrokerId): Replica returns the Replica object for the given replica ID, creating it if it does not exist; the source is as follows:

// kafka.cluster.Partition#getOrCreateReplica
// get a Replica, creating it if it does not exist
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
  // look up the Replica object in assignedReplicaMap by replicaId
  val replicaOpt = getReplica(replicaId)
  replicaOpt match {
    // found it, return it directly
    case Some(replica) => replica
    case None =>
      // not found, create it
      if (isReplicaLocal(replicaId)) { // is this a local replica?
        // build the log configuration; values in ZooKeeper override the defaults
        val config = LogConfig.fromProps(logManager.defaultConfig.originals, // default configuration
          // configuration read from ZooKeeper
          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
        // create the Log via the LogManager; if the Log already exists it is returned directly
        val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
        // get the HighWatermark checkpoint, stored in the replication-offset-checkpoint file of each data directory
        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
        // read the contents of replication-offset-checkpoint
        val offsetMap = checkpoint.read
        // the checkpoint has no HighWatermark entry for this partition
        if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
          info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
        // take the checkpointed HighWatermark, capped by the LEO, as this replica's HighWatermark
        val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
        // create the Replica object and add it to assignedReplicaMap
        val localReplica = new Replica(replicaId, this, time, offset, Some(log))
        addReplicaIfNotExists(localReplica)
      } else {
        // remote replica: create a Replica without a Log and add it to the AR set
        val remoteReplica = new Replica(replicaId, this, time)
        addReplicaIfNotExists(remoteReplica)
      }
      // return the corresponding Replica
      getReplica(replicaId).get
  }
}

A few points about this method are worth noting:

  1. If no Replica object for replicaId is found in the AR set, one is created.
  2. When creating the Replica object, "local replicas" and "remote replicas" are treated differently: a local replica needs the Log object describing its log files, while a remote replica only records the replicaId and the Partition it belongs to.
  3. When creating a "local replica", the LogManager creates the corresponding Log object from the configuration (if it already exists it is returned directly).
  4. Once the Log of a "local replica" exists, the HighWatermark recorded for this partition in the HighWatermark Checkpoint file of the data directory is read and compared with the Log's LogEndOffset; the smaller of the two becomes the replica's initial HighWatermark.
  5. The newly created Replica is added to the AR set assignedReplicaMap.

Note: the HighWatermark Checkpoint file used here is similar to the log Recovery Checkpoint file discussed earlier; it records the HighWatermark of every partition and will be described in detail later.
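A small worked example of point 4 in the list above (the numbers are made up):

// Toy example of the initial HighWatermark computed in getOrCreateReplica:
val checkpointedHW = 350L                              // read from replication-offset-checkpoint (0L if absent)
val logEndOffset   = 320L                              // the local Log is shorter, e.g. after truncation
val initialHW      = checkpointedHW.min(logEndOffset)  // 320 -> the HW can never point past the local LEO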

2.2.3. Replica Role Switching

Partition provides makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean and makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean to switch a replica between the leader and follower roles. These two methods are called when the broker handles a LeaderAndISRRequest sent by the KafkaController; the request itself will be covered in detail later, so here we focus on the two methods. makeLeader(...) switches the "local replica" to the leader role; its source is as follows:

// kafka.cluster.Partition#makeLeader
/**
 * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
 * from the time when this broker was the leader last time) and setting the new leader and ISR.
 * If the leader replica id does not change, return false to indicate the replica manager.
 */
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    // the AR set to be assigned
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    // 1. create Replica objects for every replica in the AR set (if they do not exist yet)
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    // 2. build the ISR
    val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
    // remove assigned replicas that have been removed by the controller
    // 3. trim the assignedReplicas set according to allReplicas
    (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
    // 4. update the Partition's fields
    inSyncReplicas = newInSyncReplicas            // update the ISR
    leaderEpoch = partitionStateInfo.leaderEpoch  // update the leader epoch
    zkVersion = partitionStateInfo.zkVersion      // update the ZooKeeper version
    // 5. check whether the leader changed
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
        // the leader was already on this broker
        false
      } else {
        // the leader was not on this broker before, i.e. the leader changed; update leaderReplicaIdOpt
        leaderReplicaIdOpt = Some(localBrokerId)
        true
      }
    // get the leader replica
    val leaderReplica = getReplica().get
    // we may need to increment high watermark since ISR could be down to 1
    if (isNewLeader) {
      /**
       * 6. initialize the leader's highWatermarkMetadata.
       * If leadership just moved here, the leader replica was only assigned to this broker in the steps above
       * (either the broker just started, or a follower replica became the leader), so the leader replica's
       * highWatermarkMetadata has to be rebuilt via convertHWToLocalOffsetMetadata().
       */
      // construct the high watermark metadata for the new leader replica
      leaderReplica.convertHWToLocalOffsetMetadata()
      // reset log end offset for remote replicas
      // 7. reset the LEO of all remote replicas to -1
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    // 8. try to advance the HighWatermark
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    // if the HighWatermark advanced, try to complete pending DelayedFetch and DelayedProduce operations
    tryCompleteDelayedRequests()
  // the return value indicates whether the leader changed
  isNewLeader
}

Pay attention to the three parameters of this method: the first and third give the KafkaController's ID and the correlationId used to associate the Request object, while the second is the most important one, a PartitionState object defined as follows:

public class PartitionState {
    // epoch of the controller
    public final int controllerEpoch;
    // ID of the leader replica, i.e. the ID of the broker hosting the leader
    public final int leader;
    public final int leaderEpoch;
    // the ISR, holding the broker IDs of the replicas in the ISR
    public final List<Integer> isr;
    public final int zkVersion;
    // the AR set, holding the broker IDs of the replicas in the AR set
    public final Set<Integer> replicas;
    ...
}

This parameter is parsed and built from the LeaderAndISRRequest by the calling method; it records the ID of the new leader replica, the AR set, the ISR set, and so on. makeLeader(...) uses it to check the context and to update the partition's AR set, ISR set, and leader replica, then decides whether a leader role switch happened by checking whether the leader replica's ID changed. When a leader switch does happen, the HighWatermark may need to be rebuilt, which is done by the leader Replica's convertHWToLocalOffsetMetadata() method:

// kafka.cluster.Replica#convertHWToLocalOffsetMetadata
def convertHWToLocalOffsetMetadata() = {
  if (isLocal) {
    // for a local replica, the Log object performs the conversion
    highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
  } else {
    // otherwise throw an exception
    throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
  }
}

// kafka.log.Log#convertToOffsetMetadata
/**
 * Given a message offset, find its corresponding offset metadata in the log.
 * If the message offset is out of range, return unknown offset metadata
 */
def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
  try {
    // read through the Log
    val fetchDataInfo = read(offset, 1)
    fetchDataInfo.fetchOffsetMetadata
  } catch {
    // on failure, reset LogOffsetMetadata.messageOffset to -1 and the other two fields to 0
    case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
  }
}

Internally, convertHWToLocalOffsetMetadata() calls convertToOffsetMetadata(offset: Long): LogOffsetMetadata on the leader replica's Log. It rebuilds the leader's HighWatermark from the underlying log (by attempting to read one byte starting at the HighWatermark); if nothing can be read, the leader's HighWatermark is set to LogOffsetMetadata.UnknownOffsetMetadata (whose message offset is -1). Since additions to or removals from the ISR, as well as a change of any ISR member's LogEndOffset, may increase the smallest LogEndOffset in the ISR, all of these cases must call maybeIncrementLeaderHW(...) to check whether the HighWatermark can be advanced.

makeFollower(...) switches the "local replica" to the follower role; its source is as follows:

// kafka.cluster.Partition#makeFollower
/**
 * Make the local replica the follower by setting the new leader and ISR to empty
 * If the leader replica id does not change, return false to indicate the replica manager
 */
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  // acquire the write lock
  inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    val newLeaderBrokerId: Int = partitionStateInfo.leader
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    // create the corresponding Replica objects
    allReplicas.foreach(r => getOrCreateReplica(r))
    // remove assigned replicas that have been removed by the controller
    // update the AR set according to partitionStateInfo
    (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
    // the ISR is maintained only on the leader, not on followers, so it is reset to an empty set here
    inSyncReplicas = Set.empty[Replica]
    // update the leader epoch
    leaderEpoch = partitionStateInfo.leaderEpoch
    // update the ZooKeeper version
    zkVersion = partitionStateInfo.zkVersion
    // check whether the leader changed
    if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
      false
    } else {
      // update leaderReplicaIdOpt
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}

makeFollower(...) is largely similar to makeLeader(...); the difference is that makeFollower(...) does not need to maintain information such as the HighWatermark and the LogEndOffset.

2.2.4. Writing Data to a Replica

The Partition method used to write data has already been covered in detail in an earlier article, so its source is listed here without further discussion:

// kafka.cluster.Partition#appendMessagesToLeader
// append messages to the log
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    /**
     * Check whether the leader replica of this partition is on this broker
     * (internally decided by comparing the replica ID with the broker ID).
     * If so, return the leader replica; otherwise return None.
     * Only the leader replica may handle read and write requests for the log.
     */
    val leaderReplicaOpt = leaderReplicaIfLocal()
    leaderReplicaOpt match {
      // the leader replica is on this broker
      case Some(leaderReplica) =>
        // get the corresponding Log object
        val log = leaderReplica.log.get
        // the minimum ISR size required by the Log's configuration
        val minIsr = log.config.minInSyncReplicas
        // the current size of the ISR
        val inSyncSize = inSyncReplicas.size
        // Avoid writing to leader if there are not enough insync replicas to make it safe
        // if the ISR is smaller than the required minimum and acks == -1, the write cannot be
        // safely acknowledged, so throw NotEnoughReplicasException
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
            .format(topic, partitionId, inSyncSize, minIsr))
        }
        // otherwise the ISR satisfies the minimum requirement; append the messages to the Log
        val info = log.append(messages, assignOffsets = true)
        // probably unblock some follower fetch requests since log end offset has been updated
        // try to complete delayed fetch operations (typically follower fetches), keyed by topic and partition ID
        replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        // the HighWatermark may need to be advanced
        (info, maybeIncrementLeaderHW(leaderReplica))
      // the leader replica is not on this broker; throw NotLeaderForPartitionException
      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    // if the HighWatermark advanced, try to complete delayed requests
    tryCompleteDelayedRequests()
  info
}

Note: in Kafka, only the leader replica accepts read and write operations.

2.2.5. Checking Replica Synchronization

The Partition class also provides checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) to check whether enough follower replicas are in sync with the leader, using the requiredOffset argument as the baseline; the method was described in detail in an earlier article, so its source is simply listed here:

/*
 * Note that this method will only be called if requiredAcks = -1
 * and we are waiting for all replicas in ISR to be fully caught up to
 * the (local) leader's offset corresponding to this produce request
 * before we acknowledge the produce request.
 * Check how far the follower replicas have caught up with the leader replica.
 */
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
  leaderReplicaIfLocal() match {
    // the leader replica is on this broker
    case Some(leaderReplica) =>
      // keep the current immutable replica list reference
      // the current in-sync set
      val curInSyncReplicas = inSyncReplicas
      // count the replicas known to have reached requiredOffset, by checking each replica's LEO
      val numAcks = curInSyncReplicas.count(r => {
        if (!r.isLocal)
          // check whether the replica's LEO has reached requiredOffset
          if (r.logEndOffset.messageOffset >= requiredOffset) {
            trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
            true
          }
          else
            false
        else
          true /* also count the local (leader) replica */
      })
      trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
      // the minimum ISR size configured for the topic
      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
      /**
       * If the leader's HighWatermark is greater than or equal to requiredOffset, all in-sync replicas
       * have caught up to this offset and the HighWatermark has advanced:
       * 1. if the minimum ISR size is also satisfied, return true with the NONE error code;
       * 2. if the minimum ISR size is not satisfied, replication is complete but there are too few
       *    replicas, so return true with the NOT_ENOUGH_REPLICAS_AFTER_APPEND error code.
       * If the leader's HighWatermark is smaller than requiredOffset, the in-sync replicas have not
       * finished catching up yet, so return false with the NONE error code.
       */
      if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
        /*
         * The topic may be configured not to accept messages if there are not enough replicas in ISR
         * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
         */
        // check whether the minimum ISR requirement is met
        if (minIsr <= curInSyncReplicas.size) {
          // replication complete
          (true, Errors.NONE.code)
        } else {
          // replication complete, but not enough replicas
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
        }
      } else
        // replication not complete yet
        (false, Errors.NONE.code)
    // the leader replica is not on this broker
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION.code)
  }
}

From the source we can see the following possible outcomes:

  • the leader replica of this partition is not on this broker: the result is false with error code NOT_LEADER_FOR_PARTITION;
  • the in-sync replicas have caught up and the minimum ISR requirement is met: the result is true with error code NONE;
  • the in-sync replicas have caught up but the minimum ISR requirement is not met: the result is true with error code NOT_ENOUGH_REPLICAS_AFTER_APPEND;
  • the in-sync replicas have not caught up yet: the result is false with error code NONE.
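A caller such as the DelayedProduce purgatory turns this (Boolean, Short) pair into a completion decision. A simplified sketch of that interpretation (not the actual DelayedProduce code; completeRequest is a hypothetical stand-in for answering the producer):

// Sketch: interpreting the result of checkEnoughReplicasReachOffset.
val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(requiredOffset)
if (hasEnough)
  completeRequest(errorCode)   // replicated far enough: NONE or NOT_ENOUGH_REPLICAS_AFTER_APPEND
else if (errorCode != Errors.NONE.code)
  completeRequest(errorCode)   // e.g. NOT_LEADER_FOR_PARTITION: fail the request immediately
else
  ()                           // still waiting for followers to catch up; keep the request pending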