
Kafka Series 15 - Server-Side Source Code Analysis 06: Delayed Task Case Studies

Overview: how delayed tasks are used to process produce requests and fetch requests

1. Request Dispatch in KafkaApis

In earlier articles we covered the design of the KafkaApis class: it dispatches each RequestChannel.Request to a different handler depending on the request type. Recall the source code:

  • /**
  • * Logic to handle the various Kafka requests
  • */
  • class KafkaApis(val requestChannel: RequestChannel,
  • val replicaManager: ReplicaManager,
  • val coordinator: GroupCoordinator,
  • val controller: KafkaController,
  • val zkUtils: ZkUtils,
  • val brokerId: Int,
  • val config: KafkaConfig,
  • val metadataCache: MetadataCache,
  • val metrics: Metrics,
  • val authorizer: Option[Authorizer]) extends Logging {
  • ...
  • def handle(request: RequestChannel.Request) {
  • try {
  • // 根据requestId获取请求对应的ApiKeys,进行匹配
  • ApiKeys.forId(request.requestId) match {
  • case ApiKeys.PRODUCE => handleProducerRequest(request) // 生产
  • case ApiKeys.FETCH => handleFetchRequest(request) // 拉取
  • ...
  • }
  • }
  • ...
  • }
  • ...
  • }

As the source shows, produce requests and fetch requests are handed to handleProducerRequest(request: RequestChannel.Request) and handleFetchRequest(request: RequestChannel.Request) respectively. In this article we use the processing flow of these two methods as concrete examples of how Kafka applies its timing wheel, and in doing so fill in the server-side processing details that were left out when the producer and consumer were discussed earlier.

2. Produce Requests

Let's start with the structure of handleProducerRequest(request: RequestChannel.Request). The method contains a lot of code, but its main flow boils down to the following steps:

  1. Convert the RequestChannel.Request parameter into a ProducerRequest object and check whether the topics and partitions the request operates on are authorized for write operations;
  2. Define the internal callback sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) to handle the response; its flow is described in steps 3 to 5:
  3. Build the PartitionResponse collection from the response-status map and check whether any errors are present in the response;
  4. Define the internal response handler produceResponseCallback(delayTimeMs: Int), which chooses the response based on the acks setting: if acks is 0, only the error case needs handling, returning a RequestChannel.CloseConnectionAction response to close the connection when an exception occurred, or a RequestChannel.NoOpAction response to keep reading client requests otherwise; if acks is -1 or 1, the producer expects a server acknowledgement, so a RequestChannel.SendAction response is returned;
  5. Finally invoke the produceResponseCallback(...) defined above to handle the response.
  6. Append the message data to the log subsystem via the ReplicaManager object and call sendResponseCallback(...) with the result.

The steps above are fairly abstract and hide a lot of detail, so let's go through them one by one.

2.1. The handleProducerRequest(…) Method

In the listing of handleProducerRequest(request: RequestChannel.Request) below, the bodies of sendResponseCallback(...) and produceResponseCallback(...) are omitted and analyzed later; for now let's focus on the main flow of handleProducerRequest(...):

  • // kafka.server.KafkaApis#handleProducerRequest
  • /**
  • * Handle a produce request
  • */
  • def handleProducerRequest(request: RequestChannel.Request) {
  • // 获取请求体并转换为ProduceRequest对象
  • val produceRequest = request.body.asInstanceOf[ProduceRequest]
  • // 计算需要添加的字节数
  • val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
  • // 检查ProducerRequest请求定义的分区是否授权有写操作
  • val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
  • case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
  • }
  • // the callback for sending a produce response
  • /**
  • * 这个函数会当做回调函数最终传递给DelayedProduce的responseCallback参数
  • * @param responseStatus
  • */
  • def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
  • ...
  • // 定义produceResponseCallback()回调函数
  • def produceResponseCallback(delayTimeMs: Int) {
  • ...
  • }
  • ...
  • }
  • if (authorizedRequestInfo.isEmpty)
  • // 如果ProducerRequest请求的所有分区都无写授权,就直接调用sendResponseCallback()回调,无响应数据
  • sendResponseCallback(Map.empty)
  • else {
  • // 决定是否可以操作内部主题,只有发出请求的客户端ID是"__admin_client"才可以操作内部主题
  • val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
  • // Convert ByteBuffer to ByteBufferMessageSet
  • // 将授权的ProducerRequest请求写入数据转换为ByteBufferMessageSet对象
  • val authorizedMessagesPerPartition = authorizedRequestInfo.map {
  • case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
  • }
  • // call the replica manager to append messages to the replicas
  • // 使用ReplicaManager的appendMessages(...)方法添加数据
  • replicaManager.appendMessages(
  • produceRequest.timeout.toLong,
  • produceRequest.acks,
  • internalTopicsAllowed,
  • authorizedMessagesPerPartition,
  • sendResponseCallback)
  • // if the request is put into the purgatory, it will have a held reference
  • // and hence cannot be garbage collected; hence we clear its data here in
  • // order to let GC re-claim its memory since it is already appended to log
  • // 将ProducerRequest内部数据清除以进行GC
  • produceRequest.clearPartitionRecords()
  • }
  • }

Note: as the line val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId shows, only a client whose client ID is "__admin_client" is allowed to operate on Kafka's internal topic partitions.

Setting aside sendResponseCallback(...) and produceResponseCallback(...), the flow of handleProducerRequest(...) is quite clear: it first verifies write authorization for the topic partitions the produce request targets, then appends the message data to the log subsystem via ReplicaManager's appendMessages(...) method. Note that sendResponseCallback(...) is passed to appendMessages(...) as its responseCallback argument.
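
Before moving on, here is a minimal, self-contained sketch of that authorize-then-split step. Everything in it (the TopicPartition case class, the authorize predicate, the topic names) is a simplified stand-in rather than Kafka's own types; the point is only that the split into authorized and unauthorized entries is a single partition call over the request's partition map, mirroring the shape of the code above.

  • // Sketch: splitting a produce request's partitions by write authorization (toy types).
  • object AuthorizePartitionSketch {
  •   final case class TopicPartition(topic: String, partition: Int)
  • 
  •   // Pretend only the "orders" topic is writable for this client.
  •   def authorize(topic: String): Boolean = topic == "orders"
  • 
  •   def main(args: Array[String]): Unit = {
  •     val partitionRecords: Map[TopicPartition, Array[Byte]] = Map(
  •       TopicPartition("orders", 0)             -> Array[Byte](1, 2, 3),
  •       TopicPartition("__consumer_offsets", 0) -> Array[Byte](4, 5)
  •     )
  •     // Same shape as in KafkaApis: one pass yields (authorized, unauthorized).
  •     val (authorized, unauthorized) = partitionRecords.partition {
  •       case (tp, _) => authorize(tp.topic)
  •     }
  •     println(s"authorized=${authorized.keys}, unauthorized=${unauthorized.keys}")
  •   }
  • }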

2.2. Writing Messages

ReplicaManager's appendMessages(...) method appends the produce request's message data to the log subsystem. Its source is as follows:

  • // kafka.server.ReplicaManager#appendMessages
  • /**
  • * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  • * the callback function will be triggered either when timeout or the required acks are satisfied
  • */
  • def appendMessages(timeout: Long,
  • requiredAcks: Short,
  • internalTopicsAllowed: Boolean,
  • messagesPerPartition: Map[TopicPartition, MessageSet],
  • responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
  • if (isValidRequiredAcks(requiredAcks)) { // 检查ACK参数是否合法-1、0或1中的一个
  • val sTime = SystemTime.milliseconds
  • // 将消息追加到Log中,同时还会检测delayedFetchPurgatory中相关key对应的DelayedFetch,满足条件则将其执行完成
  • val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
  • debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
  • // 对追加结果进行转换,注意ProducePartitionStatus的参数
  • val produceStatus = localProduceResults.map { case (topicPartition, result) =>
  • topicPartition ->
  • ProducePartitionStatus(
  • result.info.lastOffset + 1, // required offset
  • new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
  • }
  • // 检测是否生成DelayedProduce,其中一个条件是检测ProduceRequest中的acks字段是否为-1
  • if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
  • /**
  • * 走到这里说明delayedRequestRequired()方法返回值为true,需要满足下面三个条件:
  • * 1. ACK为-1;
  • * 2. 有数据需要写入;
  • * 3. 至少有一个分区的数据写入是成功的。
  • */
  • // create delayed produce operation
  • // 创建DelayedProduce对象
  • val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
  • val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
  • // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
  • // 将ProduceRequest中的主题映射为TopicPartitionOperationKey序列
  • val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
  • // try to complete the request immediately, otherwise put it into the purgatory
  • // this is because while the delayed produce operation is being created, new
  • // requests may arrive and hence make this operation completable.
  • // 尝试完成DelayedProduce,否则将DelayedProduce添加到delayedProducePurgatory中管理
  • delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  • } else {
  • // we can respond immediately
  • val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
  • responseCallback(produceResponseStatus)
  • }
  • } else {
  • // If required.acks is outside accepted range, something is wrong with the client
  • // Just return an error and don't handle the request at all
  • // ACK参数不合法,返回错误信息,错误码为INVALID_REQUIRED_ACKS
  • val responseStatus = messagesPerPartition.map {
  • case (topicAndPartition, messageSet) =>
  • (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
  • LogAppendInfo.UnknownLogAppendInfo.firstOffset,
  • Message.NoTimestamp))
  • }
  • responseCallback(responseStatus)
  • }
  • }

When the acks parameter is invalid (anything other than -1, 0 or 1), appendMessages(...) immediately builds a PartitionResponse for each topic partition with the error code INVALID_REQUIRED_ACKS and triggers the responseCallback right away.
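
The validity check itself is tiny; the following is a hedged sketch of what isValidRequiredAcks amounts to (the name matches the call in the code above, but this standalone version is an assumption about its body, not a quote of it):

  • // Sketch: only acks = -1 (all in-sync replicas), 0 (no ack) and 1 (leader only) are accepted.
  • object RequiredAcksSketch {
  •   def isValidRequiredAcks(requiredAcks: Short): Boolean =
  •     requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
  • 
  •   def main(args: Array[String]): Unit = {
  •     println(isValidRequiredAcks(-1)) // true
  •     println(isValidRequiredAcks(2))  // false -> INVALID_REQUIRED_ACKS for every partition
  •   }
  • }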

When the acks parameter is valid, the message data is appended to the log subsystem through ReplicaManager's appendToLocalLog(internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicPartition, MessageSet], requiredAcks: Short): Map[TopicPartition, LogAppendResult] method, whose source is as follows:

  • // kafka.server.ReplicaManager#appendToLocalLog
  • /**
  • * Append the messages to the local replica logs
  • */
  • private def appendToLocalLog(internalTopicsAllowed: Boolean,
  • messagesPerPartition: Map[TopicPartition, MessageSet],
  • requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  • trace("Append [%s] to local log ".format(messagesPerPartition))
  • // 遍历消息集合
  • messagesPerPartition.map { case (topicPartition, messages) =>
  • BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
  • BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
  • // reject appending to internal topics if it is not allowed
  • if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
  • // 内部主题且内部主题不允许添加,拒绝添加
  • (topicPartition, LogAppendResult(
  • LogAppendInfo.UnknownLogAppendInfo,
  • Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))
  • ))
  • } else {
  • try {
  • // 根据主题和分区获取对应的Partition对象
  • val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
  • val info = partitionOpt match {
  • // 分区存在
  • case Some(partition) =>
  • // 通过Partition对象将消息添加到Leader副本分区,该方法还会尝试在HighWatermark发生更新时尝试完成写入消息和拉取消息的请求
  • partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
  • // 分区不存在,抛出UnknownTopicOrPartitionException异常
  • case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
  • .format(topicPartition, localBrokerId))
  • }
  • // 添加的消息的数量
  • val numAppendedMessages =
  • if (info.firstOffset == -1L || info.lastOffset == -1L)
  • 0
  • else
  • info.lastOffset - info.firstOffset + 1
  • // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
  • BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes)
  • BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
  • BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
  • BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
  • trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
  • .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
  • // 返回字典结果
  • (topicPartition, LogAppendResult(info))
  • } catch {
  • // NOTE: Failed produce requests metric is not incremented for known exceptions
  • // it is supposed to indicate un-expected failures of a broker in handling a produce request
  • case e: KafkaStorageException =>
  • fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
  • Runtime.getRuntime.halt(1)
  • (topicPartition, null)
  • // 将异常包装后向上抛出,这里包括了NotLeaderForPartitionException异常
  • case e@ (_: UnknownTopicOrPartitionException |
  • _: NotLeaderForPartitionException | // 当前分区不是Leader分区的异常
  • _: RecordTooLargeException |
  • _: RecordBatchTooLargeException |
  • _: CorruptRecordException |
  • _: InvalidMessageException |
  • _: InvalidTimestampException) =>
  • (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
  • case t: Throwable =>
  • BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
  • BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
  • error("Error processing append operation on partition %s".format(topicPartition), t)
  • (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
  • }
  • }
  • }
  • }

In appendToLocalLog(...), an attempt to write to an internal topic partition when that is not allowed records an InvalidTopicException and returns; otherwise the method looks up the Partition object for the topic and partition and delegates the actual write to that Partition's appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) method, which returns a LogAppendInfo object when the write completes. appendToLocalLog(...) then wraps that LogAppendInfo in a LogAppendResult and returns it upward. The source of appendMessagesToLeader(...) is as follows:

  • // kafka.cluster.Partition#appendMessagesToLeader
  • def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
  • val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
  • /**
  • * 检查当前分区是否就是Leader分区,内部根据ReplicaID是否与BrokerID相同来判断
  • * 如果当前分区就是Leader分区就返回该分区副本(即Leader副本),否则返回None
  • */
  • val leaderReplicaOpt = leaderReplicaIfLocal()
  • leaderReplicaOpt match {
  • // 当前分区是Leader分区,并且得到了对应的副本分区
  • case Some(leaderReplica) =>
  • // 获取对应的Log对象
  • val log = leaderReplica.log.get
  • // 根据Log的配置获取最小ISR
  • val minIsr = log.config.minInSyncReplicas
  • // 查看当前Leader的In-Sync副本数量
  • val inSyncSize = inSyncReplicas.size
  • // Avoid writing to leader if there are not enough insync replicas to make it safe
  • // 如果In-Sync小于分区要求的最小ISR,且ACK要求为-1,则表示In-Sync满足不了ISR,抛出NotEnoughReplicasException异常
  • if (inSyncSize < minIsr && requiredAcks == -1) {
  • throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
  • .format(topic, partitionId, inSyncSize, minIsr))
  • }
  • // 否则In-Sync是满足最小ISR的,将消息数据添加Log中
  • val info = log.append(messages, assignOffsets = true)
  • // probably unblock some follower fetch requests since log end offset has been updated
  • // 尝试完成延迟的拉取操作,这个拉取操作一般是副本的拉取操作,传入的键是以主题和分区ID组成的TopicPartitionOperationKey
  • replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
  • // we may need to increment high watermark since ISR could be down to 1
  • // 可能需要更新HighWatermark值
  • (info, maybeIncrementLeaderHW(leaderReplica))
  • // 当前分区不是Leader分区,抛出NotLeaderForPartitionException异常
  • case None =>
  • throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
  • .format(topic, partitionId, localBrokerId))
  • }
  • }
  • // some delayed operations may be unblocked after HW changed
  • if (leaderHWIncremented)
  • // 如果HighWatermark发生了更新,尝试完成延迟请求
  • tryCompleteDelayedRequests()
  • info
  • }

Partition's appendMessagesToLeader(...) first checks whether the current Partition is the leader; if so it obtains the corresponding Replica object and uses it to append data to the log subsystem. Before appending it checks whether the partition's in-sync replica count meets the minimum required by the log's configuration and, when acks is -1, throws a NotEnoughReplicasException if it does not. The actual append is ultimately done by Log's append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo, which was covered in detail in the earlier articles on Kafka log storage, so we won't repeat it here.
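
As a minimal sketch of that gate (toy parameters only; the real check reads min.insync.replicas from the log configuration and inspects the live in-sync replica set):

  • // Sketch: with acks = -1, the leader refuses the append when the ISR is below min.insync.replicas.
  • object MinIsrGateSketch {
  •   // requiredAcks is an Int here purely for brevity; Kafka uses a Short.
  •   def canAppend(inSyncSize: Int, minIsr: Int, requiredAcks: Int): Boolean =
  •     !(requiredAcks == -1 && inSyncSize < minIsr)
  • 
  •   def main(args: Array[String]): Unit = {
  •     println(canAppend(inSyncSize = 1, minIsr = 2, requiredAcks = -1)) // false -> NotEnoughReplicasException
  •     println(canAppend(inSyncSize = 1, minIsr = 2, requiredAcks = 1))  // true  -> acks = 1 does not check the ISR size
  •   }
  • }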

This append only touches the leader replica; the other replicas have not yet replicated the newly appended data. Replication is nudged along by replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId)), which tries to complete any pending DelayedFetch operations keyed by the TopicPartitionOperationKey built from the topic and partition ID; under the hood this involves a DelayedOperationPurgatory object:

  • // kafka.server.ReplicaManager#tryCompleteDelayedFetch
  • /**
  • * Try to complete some delayed fetch requests with the request key;
  • * this can be triggered when:
  • *
  • * 1. The partition HW has changed (for regular fetch)
  • * 2. A new message set is appended to the local log (for follower fetch)
  • */
  • def tryCompleteDelayedFetch(key: DelayedOperationKey) {
  • // 使用DelayedOperationPurgatory来完成延迟操作
  • val completed = delayedFetchPurgatory.checkAndComplete(key)
  • debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
  • }

As for the delayedFetchPurgatory used here, there is also a corresponding delayedProducePurgatory; both are constructed when ReplicaManager is initialized:

  • // kafka.server.ReplicaManager#delayedProducePurgatory
  • // 生产请求的DelayedOperationPurgatory
  • val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
  • purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
  • // kafka.server.ReplicaManager#delayedFetchPurgatory
  • // 拉取请求的DelayedOperationPurgatory
  • val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
  • purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)

Leaving DelayedFetch aside for now, let's go back to ReplicaManager's appendMessages(...). After appendToLocalLog(...) and appendMessagesToLeader(...) have run, the message data has been written to the corresponding leader replicas and we have the result as a Map[TopicPartition, LogAppendResult], where the key is the leader partition written to and the value is the append result. Based on this result and the acks parameter, appendMessages(...) decides whether a DelayedProduce delayed task needs to be created. That decision is made by delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet], localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean; the code is simple and is not quoted here (a hedged sketch is given below). A DelayedProduce task is created when all three of the following conditions hold:

  1. acks is -1;
  2. there is data to be written;
  3. the write succeeded for at least one partition.

This is easy to understand: the messages have already been written to the leader replicas. acks = 0 means the client needs no acknowledgement, and acks = 1 means one replica having the data is enough, which the leader replica already satisfies, so neither case needs a DelayedProduce task. Only when acks is -1 and the data has been written successfully to the leader replica does a DelayedProduce task need to be created, so that the acknowledgement is sent to the client only after all the other replicas have finished replicating the data.
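
Since the article does not quote delayedRequestRequired(...), here is a hedged sketch of the three conditions it encodes. The types are deliberately simplified (a plain String key and an Option[Throwable] error in place of TopicPartition, MessageSet and LogAppendResult), so treat it as an illustration of the decision rather than the real signature.

  • // Sketch of the three conditions behind the "create a DelayedProduce?" decision.
  • object DelayedProduceDecisionSketch {
  •   final case class LocalAppendResult(error: Option[Throwable])
  • 
  •   def delayedRequestRequired(requiredAcks: Short,
  •                              messagesPerPartition: Map[String, Array[Byte]],
  •                              localResults: Map[String, LocalAppendResult]): Boolean =
  •     requiredAcks == -1 &&                          // 1. acks = -1 (wait for the full ISR)
  •       messagesPerPartition.nonEmpty &&             // 2. there is data to append
  •       localResults.values.exists(_.error.isEmpty)  // 3. at least one partition appended successfully
  • 
  •   def main(args: Array[String]): Unit = {
  •     val ok = Map("orders-0" -> LocalAppendResult(None))
  •     println(delayedRequestRequired(-1, Map("orders-0" -> Array[Byte](1)), ok)) // true  -> create a DelayedProduce
  •     println(delayedRequestRequired(1, Map("orders-0" -> Array[Byte](1)), ok))  // false -> respond immediately
  •   }
  • }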

2.3. The DelayedProduce Delayed Task

The delayed task is thrown into the delayedProducePurgatory to be watched, keyed by TopicPartitionOperationKey objects built from the topic partitions being produced to. The relevant source is as follows:

  • /**
  • * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  • * the callback function will be triggered either when timeout or the required acks are satisfied
  • */
  • def appendMessages(timeout: Long,
  • requiredAcks: Short,
  • internalTopicsAllowed: Boolean,
  • messagesPerPartition: Map[TopicPartition, MessageSet],
  • responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
  • ...
  • val sTime = SystemTime.milliseconds
  • // 将消息追加到Log中,同时还会检测delayedFetchPurgatory中相关key对应的DelayedFetch,满足条件则将其执行完成
  • val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
  • debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
  • // 对追加结果进行转换,注意ProducePartitionStatus的参数
  • val produceStatus = localProduceResults.map { case (topicPartition, result) =>
  • topicPartition ->
  • ProducePartitionStatus(
  • result.info.lastOffset + 1, // required offset
  • new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
  • }
  • // 检测是否生成DelayedProduce,其中一个条件是检测ProduceRequest中的acks字段是否为-1
  • if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
  • /**
  • * 走到这里说明delayedRequestRequired()方法返回值为true,需要满足下面三个条件:
  • * 1. ACK为-1;
  • * 2. 有数据需要写入;
  • * 3. 至少有一个分区的数据写入是成功的。
  • */
  • // create delayed produce operation
  • // 创建DelayedProduce对象
  • val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
  • val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
  • // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
  • // 将ProduceRequest中的主题映射为TopicPartitionOperationKey序列
  • val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
  • // try to complete the request immediately, otherwise put it into the purgatory
  • // this is because while the delayed produce operation is being created, new
  • // requests may arrive and hence make this operation completable.
  • // 尝试完成DelayedProduce,否则将DelayedProduce添加到delayedProducePurgatory中管理
  • delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  • } else {
  • // we can respond immediately
  • val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
  • responseCallback(produceResponseStatus)
  • }
  • ...
  • }

When the DelayedProduce object is constructed, responseCallback is passed into its constructor; tracing it back you will find that responseCallback is exactly the sendResponseCallback(...) method defined in KafkaApis's handleProducerRequest(request: RequestChannel.Request) mentioned earlier. Two more classes are involved here, ProducePartitionStatus and ProduceMetadata, which organize the per-partition state of the produce request:

  • /**
  • * @param requiredOffset requiredOffset是ProducerRequest中追加到此分区的最后一个消息的offset
  • * @param responseStatus responseStatus字段主要用来记录ProducerResponse中的错误码
  • */
  • case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
  • /**
  • * 表示是否在等待ISR集合中其他副本与Leader副本同步requiredOffset之前的消息
  • * 如果ISR集合中所有副本已经完成了requiredOffset之前消息的同步,此值被设置为false
  • */
  • @volatile var acksPending = false
  • override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
  • .format(acksPending, responseStatus.errorCode, responseStatus.baseOffset, requiredOffset)
  • }
  • /**
  • * The produce metadata maintained by the delayed produce operation
  • * @param produceRequiredAcks 记录了ProduceRequest中acks字段的值
  • * @param produceStatus 记录了每个Partition的ProducePartitionStatus
  • */
  • case class ProduceMetadata(produceRequiredAcks: Short,
  • produceStatus: Map[TopicPartition, ProducePartitionStatus]) {
  • override def toString = "[requiredAcks: %d, partitionStatus: %s]"
  • .format(produceRequiredAcks, produceStatus)
  • }

These two classes are simple and their comments already explain their purpose, so we won't dwell on them; what we need to focus on is the implementation of the DelayedProduce class.

DelayedProduce is a delayed-task class that extends DelayedOperation and represents the delayed operation for producing messages. Its definition is as follows:

  • // kafka.server.DelayedProduce
  • /**
  • * A delayed produce operation that can be created by the replica manager and watched
  • * in the produce operation purgatory
  • * @param delayMs DelayedProduce的延迟时长
  • * @param produceMetadata ProduceMetadata对象。ProduceMetadata中为一个ProducerRequest中的所有相关分区记录了一些追加消息后的返回结果,主要用于判断DelayedProduce是否满足执行条件
  • * @param replicaManager 此DelayedProduce关联的ReplicaManager对象
  • * @param responseCallback 任务满足条件或到期执行时,在DelayedProduce.onComplete()方法中调用的回调函数
  • */
  • class DelayedProduce(delayMs: Long,
  • produceMetadata: ProduceMetadata,
  • replicaManager: ReplicaManager,
  • responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
  • extends DelayedOperation(delayMs) {
  • // first update the acks pending variable according to the error code
  • /**
  • * 对produceMetadata字段中的produceStatus集合进行设置
  • * 根据前面写入消息返回的结果,设置ProducePartitionStatus的acksPending字段和responseStatus字段的值
  • */
  • produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
  • // 对应分区的写入操作成功,则等待ISR集合中的副本完成同步;如果写入操作异常,则该分区不需要等待
  • if (status.responseStatus.errorCode == Errors.NONE.code) {
  • // Timeout error state will be cleared when required acks are received
  • status.acksPending = true
  • // 预设错误码,如果ISR集合中副本在此请求超时之前顺利完成了同步,会清楚该错误码
  • status.responseStatus.errorCode = Errors.REQUEST_TIMED_OUT.code
  • } else {
  • // 如果追加日志已经抛出异常,则不必等待此Partition对应的ISR返回ACK了
  • status.acksPending = false
  • }
  • trace("Initial partition status for %s is %s".format(topicPartition, status))
  • }
  • ...
  • }

As the definition above shows, when a DelayedProduce is initialized it iterates over the produceStatus entries of the supplied produceMetadata and sets their acksPending flags and error codes in one pass. Note that produceStatus records the result of the leader write: if the leader write succeeded, the operation must wait for the in-sync replicas to catch up, so acksPending is set to true and the error code is pre-set to REQUEST_TIMED_OUT (to be cleared if the replicas catch up before the timeout); if the leader write failed, acksPending is set to false, meaning there is no need to wait for replica synchronization and the existing error is simply reported back.
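
In sketch form, with the error code reduced to an Option[String] (an assumption made for illustration; the real code stores numeric error codes from Errors), the initialization of each partition's status looks like this:

  • // Sketch: initializing the per-partition produce status held by a DelayedProduce.
  • object DelayedProduceInitSketch {
  •   final case class PartitionStatus(acksPending: Boolean, error: Option[String])
  • 
  •   def init(leaderAppendError: Option[String]): PartitionStatus =
  •     leaderAppendError match {
  •       case None =>
  •         // Leader append succeeded: wait for the ISR to catch up, and pre-set a timeout error
  •         // that is cleared only if the followers catch up before the delay expires.
  •         PartitionStatus(acksPending = true, error = Some("REQUEST_TIMED_OUT"))
  •       case failure @ Some(_) =>
  •         // Leader append already failed: nothing to wait for, report the original error as-is.
  •         PartitionStatus(acksPending = false, error = failure)
  •     }
  • 
  •   def main(args: Array[String]): Unit = {
  •     println(init(None))                             // PartitionStatus(true,Some(REQUEST_TIMED_OUT))
  •     println(init(Some("NOT_LEADER_FOR_PARTITION"))) // PartitionStatus(false,Some(NOT_LEADER_FOR_PARTITION))
  •   }
  • }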

In appendMessages(...) above, delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) is called to try to complete the DelayedProduce immediately, otherwise the task is added to delayedProducePurgatory to be managed there. As analyzed earlier, tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean invokes the operation's tryComplete() method to attempt completion. In DelayedProduce, tryComplete() is implemented as follows:

  • // kafka.server.DelayedProduce#tryComplete
  • /**
  • * The delayed produce operation can be completed if every partition
  • * it produces to is satisfied by one of the following:
  • *
  • * Case A: This broker is no longer the leader: set an error in response
  • * 当前broker不再是Leader副本了(可能发生Leader迁移),则需要在响应中记录错误
  • * Case B: This broker is the leader:
  • * 当前broker还是Leader副本
  • * B.1 - If there was a local error thrown while checking if at least requiredAcks
  • * replicas have caught up to this operation: set an error in response
  • * 如果在In-Sync副本完成了同步,但在这个过程中发生了本地错误,则需要在响应中记录错误
  • * B.2 - Otherwise, set the response with no error.
  • * 其他情况,清除响应中的错误
  • */
  • override def tryComplete(): Boolean = {
  • // check for each partition if it still has pending acks
  • // 遍历produceMetadata中的所有分区的状态
  • produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
  • trace("Checking produce satisfaction for %s, current status %s"
  • .format(topicAndPartition, status))
  • // skip those partitions that have already been satisfied
  • if (status.acksPending) { // 检查此分区是否已经满足DelayedProduce执行条件
  • // 获取对应的Partition对象
  • val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
  • val (hasEnough, errorCode) = partitionOpt match {
  • case Some(partition) =>
  • /**
  • * 检查此分区的HW位置是否大于requiredOffset
  • * 这里涉及Partition类中的checkEnoughReplicasReachOffset(...)方法
  • */
  • partition.checkEnoughReplicasReachOffset(status.requiredOffset)
  • case None =>
  • // Case A
  • // 条件A:找不到此分区的Leader,记录错误
  • (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
  • }
  • if (errorCode != Errors.NONE.code) { // 条件B.1: 出现异常
  • // Case B.1
  • status.acksPending = false
  • // 记录错误
  • status.responseStatus.errorCode = errorCode
  • } else if (hasEnough) { // 条件B.2:此分区Leader副本的HW大于对应的requiredOffset
  • // Case B.2
  • status.acksPending = false
  • // 清除错误
  • status.responseStatus.errorCode = Errors.NONE.code
  • }
  • }
  • }
  • // check if each partition has satisfied at lease one of case A and case B
  • // 检查全部的分区是否都已经符合DelayedProduce的执行条件
  • if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
  • forceComplete()
  • else
  • false
  • }

As the code above shows, tryComplete() iterates over all produceStatus entries and, for each Partition involved in the produce request, calls checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short), which checks whether replica synchronization has completed. Its source is as follows:

  • // kafka.cluster.Partition#checkEnoughReplicasReachOffset
  • /*
  • * Note that this method will only be called if requiredAcks = -1
  • * and we are waiting for all replicas in ISR to be fully caught up to
  • * the (local) leader's offset corresponding to this produce request
  • * before we acknowledge the produce request.
  • */
  • def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
  • leaderReplicaIfLocal() match {
  • // 当前副本是Leader副本
  • case Some(leaderReplica) =>
  • // keep the current immutable replica list reference
  • // 获取当前的In-Sync副本
  • val curInSyncReplicas = inSyncReplicas
  • // 已经确认同步的个数,通过遍历副本,判断副本的LEO是否大于requiredOffset
  • val numAcks = curInSyncReplicas.count(r => {
  • if (!r.isLocal)
  • // 判断副本的LEO是否大于requiredOffset
  • if (r.logEndOffset.messageOffset >= requiredOffset) {
  • trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
  • true
  • }
  • else
  • false
  • else
  • true /* also count the local (leader) replica */
  • })
  • trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
  • // 主题配置的最小In-Sync副本数
  • val minIsr = leaderReplica.log.get.config.minInSyncReplicas
  • /**
  • * 如果Leader的HighWatermark大于等于requiredOffset,说明所有In-Sync副本都已经完成同步了,HighWatermark更新了
  • * 1. 此时如果满足了最小In-Sync副本数,直接返回true及NONE错误码即可;
  • * 2. 此时如果还不满足最小In-Sync副本数,说明副本数满足不了,但同步完成了,就返回true及NOT_ENOUGH_REPLICAS_AFTER_APPEND错误码;
  • * 如果Leader的HighWatermark小于requiredOffset,说明In-Sync副本未完成同步,返回false及NONE错误码。
  • */
  • if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
  • /*
  • * The topic may be configured not to accept messages if there are not enough replicas in ISR
  • * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
  • */
  • // 判断是否满足In-Sync副本最低要求
  • if (minIsr <= curInSyncReplicas.size) {
  • // 同步完成
  • (true, Errors.NONE.code)
  • } else {
  • // 同步完成,但副本数不够
  • (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
  • }
  • } else
  • // 未同步完成
  • (false, Errors.NONE.code)
  • // 当前副本不是Leader副本
  • case None =>
  • (false, Errors.NOT_LEADER_FOR_PARTITION.code)
  • }
  • }

The implementation of checkEnoughReplicasReachOffset(...) is fairly simple; what matters is its return value:

  • If the current Partition is not the leader replica, it returns false with error code NOT_LEADER_FOR_PARTITION;
  • If the in-sync replicas have caught up and the minimum in-sync replica count is satisfied, it returns true with error code NONE;
  • If the in-sync replicas have caught up but the minimum in-sync replica count is not satisfied, it returns true with error code NOT_ENOUGH_REPLICAS_AFTER_APPEND;
  • If synchronization is not yet complete, it returns false with error code NONE.

If checkEnoughReplicasReachOffset(...) returns true, the in-sync replicas of that topic partition have all caught up. Once every topic partition involved in the produce request has finished replica synchronization, tryComplete() calls forceComplete() to finish the DelayedProduce task, and forceComplete() in turn ends up calling onComplete(). The source is as follows:

  • // kafka.server.DelayedOperation#forceComplete
  • def forceComplete(): Boolean = {
  • // CAS修改completed字段为true,标识延迟操作已完成
  • if (completed.compareAndSet(false, true)) {
  • // cancel the timeout timer
  • // 修改成功,调用TimerTask的cancel()方法将其从TimerTaskList中删除
  • cancel()
  • // 调用onComplete()方法
  • onComplete()
  • true
  • } else {
  • false
  • }
  • }
  • // kafka.server.DelayedProduce#onComplete
  • /**
  • * Upon completion, return the current response status along with the error code per partition
  • */
  • override def onComplete() {
  • // 根据ProduceMetadata记录的相关信息,为每个Partition产生响应状态
  • val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
  • // 调用responseCallback回调函数
  • responseCallback(responseStatus)
  • }

DelayedProduce's onComplete() invokes the responseCallback, passing it the constructed Map[TopicPartition, PartitionResponse] of responses.

2.4. Response Handling

As noted above, the responseCallback held by DelayedProduce is the sendResponseCallback(...) method defined in KafkaApis's handleProducerRequest(request: RequestChannel.Request), so we can now analyze the source of these two callback methods:

  • // sendResponseCallback
  • // the callback for sending a produce response
  • /**
  • * 这个函数会当做回调函数最终传递给DelayedProduce的responseCallback参数
  • * @param responseStatus
  • */
  • def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
  • // 生成响应状态集合,其中包括通过授权验证并处理完成的状态(responseStatus),以及未通过授权验证的状态(unauthorizedRequestInfo)
  • val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ =>
  • new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
  • // 标识处理ProducerRequest的过程中是否出现异常
  • var errorInResponse = false
  • // 遍历响应状态集合
  • mergedResponseStatus.foreach { case (topicPartition, status) =>
  • if (status.errorCode != Errors.NONE.code) { // 当存在错误状态码时
  • // 将errorInResponse置为true
  • errorInResponse = true
  • debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
  • request.header.correlationId,
  • request.header.clientId,
  • topicPartition,
  • Errors.forCode(status.errorCode).exceptionName))
  • }
  • }
  • // 定义produceResponseCallback()回调函数
  • def produceResponseCallback(delayTimeMs: Int) {
  • // 处理acks字段为0的情况,即生产者不需要服务端确认ACK
  • if (produceRequest.acks == 0) {
  • // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
  • // the request, since no response is expected by the producer, the server will close socket server so that
  • // the producer client will know that some error has happened and will refresh its metadata
  • if (errorInResponse) {
  • val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
  • topicPartition -> Errors.forCode(status.errorCode).exceptionName
  • }.mkString(", ")
  • info(
  • s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
  • s"from client id ${request.header.clientId} with ack=0\n" +
  • s"Topic and partition to exceptions: $exceptionsSummary"
  • )
  • /**
  • * 处理ProducerRequest过程中出现异常,
  • * 则向对应的responseQueue中添加RequestChannel.CloseConnectionAction类型的响应,关闭连接
  • */
  • requestChannel.closeConnection(request.processor, request)
  • } else {
  • /**
  • * 处理ProducerRequest过程中未出现异常,
  • * 则向对应的responseQueue中添加RequestChannel.NoOpAction类型的响应,继续读取客户端的请求
  • */
  • requestChannel.noOperation(request.processor, request)
  • }
  • } else { // 处理acks字段为1或-1的情况,即生产者需要服务端确认ACK
  • // 创建消息头
  • val respHeader = new ResponseHeader(request.header.correlationId)
  • // 创建消息体
  • val respBody = request.header.apiVersion match {
  • case 0 => new ProduceResponse(mergedResponseStatus.asJava)
  • case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
  • // This case shouldn't happen unless a new version of ProducerRequest is added without
  • // updating this part of the code to handle it properly.
  • case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
  • }
  • // 向对应的responseQueue中添加RequestChannel.SendAction类型的响应,将响应返回给客户端
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
  • }
  • }
  • // When this callback is triggered, the remote API call has completed
  • // 设置API调用的远程完成时间为当前时间
  • request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
  • // 记录监控数据,但在其内部会调用produceResponseCallback()回调函数
  • quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
  • request.header.clientId,
  • numBytesAppended,
  • produceResponseCallback)
  • }

Looking back at this code, it is easy to follow: sendResponseCallback(...) walks the response map to record whether any error occurred during processing, while produceResponseCallback(...) chooses how to respond based on the acks parameter and whether an error occurred:

  1. When acks is 0, no acknowledgement needs to be sent to the client: if an error occurred during processing, a RequestChannel.CloseConnectionAction response is added to the corresponding responseQueue to close the connection; if not, a RequestChannel.NoOpAction response is added so the server keeps reading requests from the client;
  2. When acks is -1 or 1, the client expects an acknowledgement, so a RequestChannel.SendAction response is added to the corresponding responseQueue and the response is sent back to the client, as sketched below.
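
A condensed sketch of that decision follows; the three action names mirror the RequestChannel actions described above, while everything else (the sealed trait, the chooseAction helper) is made up for illustration.

  • // Sketch: which kind of response action the broker queues, given acks and whether an error occurred.
  • object ProduceResponseActionSketch {
  •   sealed trait Action
  •   case object CloseConnectionAction extends Action // acks = 0 and an error occurred: drop the connection
  •   case object NoOpAction            extends Action // acks = 0, no error: just keep reading requests
  •   case object SendAction            extends Action // acks = 1 or -1: send a ProduceResponse back
  • 
  •   // acks is an Int here purely for brevity; Kafka uses a Short.
  •   def chooseAction(acks: Int, errorInResponse: Boolean): Action =
  •     if (acks == 0) { if (errorInResponse) CloseConnectionAction else NoOpAction }
  •     else SendAction
  • 
  •   def main(args: Array[String]): Unit = {
  •     println(chooseAction(acks = 0, errorInResponse = true))   // CloseConnectionAction
  •     println(chooseAction(acks = 0, errorInResponse = false))  // NoOpAction
  •     println(chooseAction(acks = -1, errorInResponse = false)) // SendAction
  •   }
  • }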

The call to produceResponseCallback(...) actually happens at the end of sendResponseCallback(...), via the recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int method of the ClientQuotaManager object stored in KafkaApis's quotaManagers map:

  • /**
  • * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.)
  • * @param clientId clientId that produced the data
  • * @param value amount of data written in bytes
  • * @param callback Callback function. This will be triggered immediately if quota is not violated.
  • * If there is a quota violation, this callback will be triggered after a delay
  • * @return Number of milliseconds to delay the response in case of Quota violation.
  • * Zero otherwise
  • */
  • def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
  • val clientSensors = getOrCreateQuotaSensors(clientId)
  • var throttleTimeMs = 0
  • try {
  • clientSensors.quotaSensor.record(value)
  • // trigger the callback immediately if quota is not violated
  • // 调用callback
  • callback(0)
  • } catch {
  • case qve: QuotaViolationException =>
  • // 异常处理
  • // Compute the delay
  • val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
  • throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
  • clientSensors.throttleTimeSensor.record(throttleTimeMs)
  • // If delayed, add the element to the delayQueue
  • delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
  • delayQueueSensor.record()
  • logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
  • }
  • throttleTimeMs
  • }

3. Fetch Requests

Fetch requests are handled by KafkaApis's handleFetchRequest(request: RequestChannel.Request) method, which is structurally very similar to the handleProducerRequest(...) method described above for produce requests. Its main steps are:

  1. Convert the RequestChannel.Request parameter into a FetchRequest object and check whether the topics and partitions the request operates on are authorized for read operations;
  2. Define the internal callback sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) to handle the response; its flow is described in steps 3 to 5:
  3. Convert the responsePartitionData argument according to the request's protocol version and record any errors produced during processing;
  4. Define the internal response handler fetchResponseCallback(delayTimeMs: Int), which builds the FetchResponse object and hands it to RequestChannel to send;
  5. Finally invoke the fetchResponseCallback(...) defined above to handle the response.
  6. Read the message data from the log subsystem via the ReplicaManager object and call sendResponseCallback(...) with the result.

Structurally the two methods are almost identical, but their concrete processing differs, as detailed below.

3.1. The handleFetchRequest(…) Method

Likewise, in the listing of handleFetchRequest(request: RequestChannel.Request) below, the bodies of sendResponseCallback(...) and fetchResponseCallback(...) are omitted and analyzed later; for now let's focus on the main flow of handleFetchRequest(...):

  • // kafka.server.KafkaApis#handleFetchRequest
  • /**
  • * Handle a fetch request
  • */
  • def handleFetchRequest(request: RequestChannel.Request) {
  • // 转换请求为FetchRequest对象
  • val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  • // 验证主题分区授权情况
  • val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
  • case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
  • }
  • // 处理未授权主题分区应该返回的数据
  • val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
  • FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)
  • }
  • // the callback for sending a fetch response
  • // 该Callback函数最后会传递给DelayedFetch对象作为其responseCallback参数
  • def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
  • ...
  • // 用于发送响应
  • def fetchResponseCallback(delayTimeMs: Int) {
  • ...
  • }
  • ...
  • }
  • if (authorizedRequestInfo.isEmpty)
  • // 没有通过授权的主题分区,直接返回空字典
  • sendResponseCallback(Map.empty)
  • else {
  • // call the replica manager to fetch messages from the local replica
  • // 有通过授权的主题分区,使用ReplicaManager进行处理
  • replicaManager.fetchMessages(
  • fetchRequest.maxWait.toLong,
  • fetchRequest.replicaId,
  • fetchRequest.minBytes,
  • authorizedRequestInfo,
  • sendResponseCallback)
  • }
  • }

As you can see, the fetch request is handed to ReplicaManager's fetchMessages(...) method, and here too sendResponseCallback(...) is passed to fetchMessages(...) as its responseCallback argument.

3.2. Fetching Messages

ReplicaManager's fetchMessages(...) method fetches message data from the log subsystem. Its source is as follows:

  • // kafka.server.ReplicaManager#fetchMessages
  • /**
  • * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
  • * the callback function will be triggered either when timeout or required fetch info is satisfied
  • */
  • def fetchMessages(timeout: Long,
  • replicaId: Int,
  • fetchMinBytes: Int,
  • fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
  • responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
  • // 只有Follower副本才有replicaId,消费者是没有的,消费者的replicaId始终是-1
  • val isFromFollower = replicaId >= 0
  • // 如果replicaId != -2,就说明是正常的拉取请求,则只能拉取Leader副本的数据
  • val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  • // 判断是否是从broker发出的拉取请求,不是的话就只能读取HighWatermark线以下的数据
  • val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
  • // read from local logs
  • // 从Log中读取消息
  • val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
  • // if the fetch comes from the follower,
  • // update its corresponding log end offset
  • // 检测FetchRequest是否来自Follower副本
  • if(Request.isValidBrokerId(replicaId))
  • /**
  • * updateFollowerLogReadResults()方法用来处理来自Follower副本的FetchRequest请求,主要做下面4件事:
  • * 1. 在Leader中维护了Follower副本的各种状态,这里会更新对应Follower副本的状态,例如,更新LEO、更新lastCaughtUpTimeMsUnderlying等;
  • * 2. 检测是否需要对ISR集合进行扩张,如果ISR集合发生变化,则将新的ISR集合的记录保存到ZooKeeper中;
  • * 3. 检测是否后移Leader的HighWatermark线;
  • * 4. 检测delayedProducePurgatory中相关key对应的DelayedProduce,满足条件则将其执行完成。
  • */
  • updateFollowerLogReadResults(replicaId, logReadResults)
  • // check if this fetch request can be satisfied right away
  • // 统计从Log中读取的字节总数
  • val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
  • // 统计在从Log中读取消息的时候,是否发生了异常
  • val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) =>
  • errorIncurred || (readResult.errorCode != Errors.NONE.code))
  • // respond immediately if 1) fetch request does not want to wait
  • // 2) fetch request does not require any data
  • // 3) has enough data to respond
  • // 4) some error happens while reading data
  • /**
  • * 判断是否能够立即返回FetchResponse,下面四个条件满足任意一个就可以立即返回FetchResponse:
  • * 1. FetchRequest的timeout<=0,即消费者或Follower副本不希望等待;
  • * 2. FetchRequest没有指定要读取的分区,即fetchInfo.size <= 0;
  • * 3. 已经读取了足够的数据,即bytesReadable >= fetchMinBytes;
  • * 4. 在读取数据的过程中发生了异常,即检查errorReadingData。
  • */
  • if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
  • val fetchPartitionData = logReadResults.mapValues(result =>
  • FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
  • // 直接调用回调函数,生成并发送FetchResponse
  • responseCallback(fetchPartitionData)
  • } else {
  • // construct the fetch results from the read results
  • // 对读取Log的结果进行转换
  • val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
  • (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get))
  • }
  • val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
  • // 创建DelayedFetch对象
  • val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
  • // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
  • // 创建delayedFetchKeys
  • val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq
  • // try to complete the request immediately, otherwise put it into the purgatory;
  • // this is because while the delayed fetch operation is being created, new requests
  • // may arrive and hence make this operation completable.
  • // 完成DelayedFetch,否则将DelayedFetch添加到delayedFetchPurgatory中管理
  • delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  • }
  • }

One thing to note: a fetch request may come from a client consumer or from a follower replica performing replication. A client fetch may only read data below the high watermark (i.e. data that has already been committed), whereas follower replication has no such restriction. The code above uses the request's replicaId field to tell whether the fetch comes from a client (a consumer's replicaId is always FetchRequest.CONSUMER_REPLICA_ID, i.e. -1, defined in org.apache.kafka.common.requests.FetchRequest). The actual read is handled by readFromLocalLog(...), whose source is quoted after the short sketch below:
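
Here is a hedged sketch of how the three flags used above fall out of replicaId alone; the two constants follow the values described in the text (-1 for ordinary consumers, -2 for the special debugging consumer), and the rest is simplified.

  • // Sketch: deriving the fetch flags from replicaId (values as described in the text).
  • object FetchFlagsSketch {
  •   val ConsumerReplicaId   = -1 // ordinary consumers
  •   val DebuggingConsumerId = -2 // debugging consumer that may read from non-leader replicas
  • 
  •   def isValidBrokerId(replicaId: Int): Boolean = replicaId >= 0
  • 
  •   // Returns (isFromFollower, fetchOnlyFromLeader, fetchOnlyCommitted).
  •   def flags(replicaId: Int): (Boolean, Boolean, Boolean) = {
  •     val isFromFollower      = replicaId >= 0
  •     val fetchOnlyFromLeader = replicaId != DebuggingConsumerId
  •     val fetchOnlyCommitted  = !isValidBrokerId(replicaId) // consumers are capped at the high watermark
  •     (isFromFollower, fetchOnlyFromLeader, fetchOnlyCommitted)
  •   }
  • 
  •   def main(args: Array[String]): Unit = {
  •     println(flags(ConsumerReplicaId)) // consumer:        (false, true, true)
  •     println(flags(2))                 // follower broker: (true, true, false)
  •   }
  • }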

  • // kafka.server.ReplicaManager#readFromLocalLog
  • /**
  • * Read from a single topic/partition at the given offset upto maxSize bytes
  • * 从指定的主题分区读取数据,PartitionFetchInfo对象中存储了读取的起始offset和字节大小
  • */
  • def readFromLocalLog(fetchOnlyFromLeader: Boolean,
  • readOnlyCommitted: Boolean,
  • readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
  • readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
  • BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
  • BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
  • val partitionDataAndOffsetInfo =
  • try {
  • trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize))
  • // decide whether to only fetch from leader
  • // 根据fetchOnlyFromLeader参数决定是否只从Leader分区拉取
  • val localReplica = if (fetchOnlyFromLeader)
  • // 该方法会获取Leader分区的Replica对象
  • getLeaderReplicaIfLocal(topic, partition)
  • else
  • // 该方法获取的Replica对象不保证是Leader分区的
  • getReplicaOrException(topic, partition)
  • // decide whether to only fetch committed data (i.e. messages below high watermark)
  • // 根据readOnlyCommitted参数决定是否只拉取HighWatermark线以下的数据
  • val maxOffsetOpt = if (readOnlyCommitted)
  • Some(localReplica.highWatermark.messageOffset)
  • else
  • None
  • /* Read the LogOffsetMetadata prior to performing the read from the log.
  • * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
  • * Using the log end offset after performing the read can lead to a race condition
  • * where data gets appended to the log immediately after the replica has consumed from it
  • * This can cause a replica to always be out of sync.
  • */
  • // 获取Replica的logEndOffset
  • val initialLogEndOffset = localReplica.logEndOffset
  • val logReadInfo = localReplica.log match {
  • case Some(log) =>
  • // 从Replica的Log中读取数据,注意此时读取的最大offset由maxOffsetOpt控制了
  • log.read(offset, fetchSize, maxOffsetOpt)
  • case None =>
  • error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
  • FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
  • }
  • // 检查是否读到了Log的结尾
  • val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
  • // 将读取到的数据构造为LogReadResult对象
  • LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, readToEndOfLog, None)
  • } catch {
  • // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
  • // is supposed to indicate un-expected failure of a broker in handling a fetch request
  • case utpe: UnknownTopicOrPartitionException => // 未知主题分区异常
  • LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(utpe))
  • case nle: NotLeaderForPartitionException => // 当前副本不是Leader副本异常
  • LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(nle))
  • case rnae: ReplicaNotAvailableException => // 副本不可用异常
  • LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(rnae))
  • case oor : OffsetOutOfRangeException => // 读取数据超限异常
  • LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(oor))
  • case e: Throwable =>
  • BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
  • BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
  • error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset), e)
  • LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e))
  • }
  • (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
  • }
  • }

This method first uses the fetchOnlyFromLeader and readOnlyCommitted parameters to determine which replica to read from and the maximum offset that may be read (client fetches are capped at the high watermark, while follower replication requests are not), then reads data through the partition Log's read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo method; the data read is wrapped in a LogReadResult object and returned.

Control then returns to the caller, fetchMessages(...). If the fetch request was issued by a follower replica, Partition's updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) is invoked (via ReplicaManager's updateFollowerLogReadResults shown above). This method has quite a few responsibilities, so let's look at its source first:

  • // kafka.cluster.Partition#updateReplicaLogReadResult
  • /**
  • * Update the log end offset of a certain replica of this partition
  • */
  • def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
  • // 根据replicaId获取对应的Replica对象
  • getReplica(replicaId) match {
  • case Some(replica) =>
  • // 更新副本的LEO
  • replica.updateLogReadResult(logReadResult)
  • // check if we need to expand ISR to include this replica
  • // if it is not in the ISR yet
  • /**
  • * 检查是否应该将当前副本加入In-Sync集合;有可能当前副本由于滞后被In-Sync集合排除了,
  • * 此时可以执行一次检测,如果满足条件就将当前副本重新添加到In-Sync集合中
  • */
  • maybeExpandIsr(replicaId)
  • debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
  • .format(replicaId,
  • logReadResult.info.fetchOffsetMetadata.messageOffset,
  • TopicAndPartition(topic, partitionId)))
  • case None =>
  • throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
  • " is not recognized to be one of the assigned replicas %s for partition %s.")
  • .format(localBrokerId,
  • replicaId,
  • logReadResult.info.fetchOffsetMetadata.messageOffset,
  • assignedReplicas().map(_.brokerId).mkString(","),
  • TopicAndPartition(topic, partitionId)))
  • }
  • }

At this point the data has been fetched. If the fetcher was a follower replica, the follower's LogEndOffset record must be updated; that is exactly what replica.updateLogReadResult(logReadResult) in the code above does. The method lives in the Replica class and is very simple:

  • // kafka.cluster.Replica#updateLogReadResult
  • def updateLogReadResult(logReadResult : LogReadResult) {
  • // 更新LEO
  • logEndOffset = logReadResult.info.fetchOffsetMetadata
  • /* If the request read up to the log end offset snapshot when the read was initiated,
  • * set the lastCaughtUpTimeMsUnderlying to the current time.
  • * This means that the replica is fully caught up.
  • */
  • if(logReadResult.isReadFromLogEnd) {
  • lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
  • }
  • }

In addition, after a follower replica fetches data successfully, we should check whether it is present in the in-sync set. A follower may have fallen behind for some reason and been evicted from the in-sync set; now that it has fetched data successfully, it should be re-added to the in-sync set if it is no longer seriously lagging. This is implemented by Partition's maybeExpandIsr(replicaId: Int) method, whose source is as follows:

  • // kafka.cluster.Partition#maybeExpandIsr
  • /**
  • * Check and maybe expand the ISR of the partition.
  • *
  • * This function can be triggered when a replica's LEO has incremented
  • */
  • def maybeExpandIsr(replicaId: Int) {
  • val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
  • // check if this replica needs to be added to the ISR
  • // 检测当前Replica是否应该被添加到In-Sync集合中
  • leaderReplicaIfLocal() match {
  • case Some(leaderReplica) =>
  • // 根据replicaId获取Replica对象
  • val replica = getReplica(replicaId).get
  • // 查看Leader副本的HighWatermark
  • val leaderHW = leaderReplica.highWatermark
  • /**
  • * 同时满足以下三个条件则可以将当前副本添加到In-Sync集合中:
  • * 1. 当前In-Sync集合不包含当前Replica副本;
  • * 2. 当前副本是否是assignedReplicas副本(AR)之一;
  • * 3. 当前副本的LEO大于等于Leader副本的HighWatermark;
  • */
  • if(!inSyncReplicas.contains(replica) &&
  • assignedReplicas.map(_.brokerId).contains(replicaId) &&
  • replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
  • // 将当前副本添加到In-Sync集合
  • val newInSyncReplicas = inSyncReplicas + replica
  • info("Expanding ISR for partition [%s,%d] from %s to %s"
  • .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
  • newInSyncReplicas.map(_.brokerId).mkString(",")))
  • // update ISR in ZK and cache
  • // 更新缓存及Zookeeper中的In-Sync集合数据
  • updateIsr(newInSyncReplicas)
  • replicaManager.isrExpandRate.mark()
  • }
  • // check if the HW of the partition can now be incremented
  • // since the replica maybe now be in the ISR and its LEO has just incremented
  • // 检测分区的HighWatermark是否可以更新了,如果更新了,该方法会返回true
  • maybeIncrementLeaderHW(leaderReplica)
  • case None => false // nothing to do if no longer leader
  • }
  • }
  • // some delayed operations may be unblocked after HW changed
  • if (leaderHWIncremented)
  • // 如果分区的HighWatermark发生了更新,尝试完成时间轮中的DelayedProduce和DelayedFetch延迟操作
  • tryCompleteDelayedRequests()
  • }

After a series of checks, if the conditions are met this method updates the in-sync set recorded in ZooKeeper and in the in-memory cache; that part is implemented by updateIsr(newIsr: Set[Replica]) and will be covered in a later article. It also checks whether the partition's high watermark can be advanced, which is done by maybeIncrementLeaderHW(leaderReplica: Replica): Boolean, whose source is as follows:

  • // kafka.cluster.Partition#maybeIncrementLeaderHW
  • /**
  • * Check and maybe increment the high watermark of the partition;
  • * this function can be triggered when
  • *
  • * 1. Partition ISR changed
  • * 2. Any replica's LEO changed
  • *
  • * Returns true if the HW was incremented, and false otherwise.
  • * Note There is no need to acquire the leaderIsrUpdate lock here
  • * since all callers of this private API acquire that lock
  • *
  • * 在分区的ISR发生变化,或任何副本的LEO发生变化时会触发该方法更新HW
  • * 当HW发生更新时返回true,否则返回false
  • */
  • private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
  • // 获取In-Sync副本的LEO
  • val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
  • // 根据所有In_Sync副本的LEO来计算HighWatermark,即取最小的LEO为HighWatermark
  • val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  • // 旧的HighWatermark
  • val oldHighWatermark = leaderReplica.highWatermark
  • // 新的HighWatermark比旧的HighWatermark大,或者新的HighWatermark还处于该LogSegment上
  • if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
  • // 更新HighWatermark
  • leaderReplica.highWatermark = newHighWatermark
  • debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
  • true
  • } else {
  • debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
  • .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
  • false
  • }
  • }

This code is fairly simple; the key point is that the high watermark is the minimum LogEndOffset across the partition's in-sync replicas, and whenever the high watermark advances the method returns true to its caller maybeExpandIsr(replicaId: Int), which uses that result to decide whether to call tryCompleteDelayedRequests() to try to complete delayed operations. That method is quoted after the brief sketch below:
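
A tiny worked example of that rule (plain Long offsets stand in for LogOffsetMetadata here, so this illustrates only the "minimum LEO" rule, not the segment comparison the real code also performs):

  • // Sketch: the new high watermark is the minimum LEO across the in-sync replicas.
  • object HighWatermarkSketch {
  •   def newHighWatermark(inSyncLeos: Iterable[Long]): Long = inSyncLeos.min
  • 
  •   def main(args: Array[String]): Unit = {
  •     val leos  = Seq(120L, 118L, 121L)    // leader and two followers in the ISR
  •     val oldHw = 115L
  •     val newHw = newHighWatermark(leos)   // 118
  •     val incremented = newHw > oldHw      // true -> try to complete delayed requests
  •     println(s"newHw=$newHw, incremented=$incremented")
  •   }
  • }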

  • // kafka.cluster.Partition#tryCompleteDelayedRequests
  • /**
  • * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
  • */
  • private def tryCompleteDelayedRequests() {
  • val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
  • // 尝试完成消息拉取请求
  • replicaManager.tryCompleteDelayedFetch(requestKey)
  • // 尝试完成消息写入请求
  • replicaManager.tryCompleteDelayedProduce(requestKey)
  • }

As you can see, this tries to complete both the related DelayedProduce and DelayedFetch operations in the timing wheel; the reason should be clear by now: an advanced high watermark can satisfy pending produce acknowledgements as well as pending fetches.

Back in ReplicaManager's fetchMessages(...): we now know that when a follower replica issues a fetch request to the leader, it reads messages from the leader replica. In a real deployment there are multiple follower replicas fetching concurrently, and the partition's high watermark can only advance once all of the in-sync followers have caught up. Continuing with fetchMessages(...), the remaining code decides how to respond based on the number of bytes read, whether an exception occurred during the read, and the wait timeout specified by the fetch request. Recall that part of the source:

  • // kafka.server.ReplicaManager#fetchMessages
  • /**
  • * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
  • * the callback function will be triggered either when timeout or required fetch info is satisfied
  • */
  • def fetchMessages(timeout: Long,
  •                   replicaId: Int,
  •                   fetchMinBytes: Int,
  •                   fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
  •                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
  •   // Only follower replicas carry a real replicaId; a consumer's replicaId is always -1
  •   val isFromFollower = replicaId >= 0
  •   // If replicaId != -2 (the debugging consumer id), this is a normal fetch and may only read from the leader replica
  •   val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  •   // If the request does not come from a broker (i.e. not from a follower), it may only read data below the high watermark
  •   val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
  •   // read from local logs
  •   // Read the messages from the local Log
  •   val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
  •   // if the fetch comes from the follower,
  •   // update its corresponding log end offset
  •   // Check whether the FetchRequest comes from a follower replica
  •   if(Request.isValidBrokerId(replicaId))
  •     /**
  •      * updateFollowerLogReadResults() handles FetchRequests coming from follower replicas; it mainly does 4 things:
  •      * 1. The leader keeps per-follower state, which is updated here, e.g. the follower's LEO and lastCaughtUpTimeMsUnderlying;
  •      * 2. Check whether the ISR needs to be expanded; if the ISR changes, persist the new ISR to ZooKeeper;
  •      * 3. Check whether the leader's HighWatermark can be advanced;
  •      * 4. Check the DelayedProduce operations watching the related keys in delayedProducePurgatory and complete those whose conditions are now satisfied.
  •      */
  •     updateFollowerLogReadResults(replicaId, logReadResults)
  •   // check if this fetch request can be satisfied right away
  •   // Total number of bytes read from the Log
  •   val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
  •   // Whether any error occurred while reading from the Log
  •   val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) =>
  •     errorIncurred || (readResult.errorCode != Errors.NONE.code))
  •   // respond immediately if 1) fetch request does not want to wait
  •   //                        2) fetch request does not require any data
  •   //                        3) has enough data to respond
  •   //                        4) some error happens while reading data
  •   /**
  •    * Decide whether a FetchResponse can be returned immediately; any one of the four conditions below is enough:
  •    * 1. The FetchRequest's timeout <= 0, i.e. the consumer or follower does not want to wait;
  •    * 2. The FetchRequest does not specify any partition to read, i.e. fetchInfo.size <= 0;
  •    * 3. Enough data has already been read, i.e. bytesReadable >= fetchMinBytes;
  •    * 4. An error occurred while reading the data, i.e. errorReadingData is true.
  •    */
  •   if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
  •     val fetchPartitionData = logReadResults.mapValues(result =>
  •       FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
  •     // Invoke the callback directly to build and send the FetchResponse
  •     responseCallback(fetchPartitionData)
  •   } else {
  •     // construct the fetch results from the read results
  •     // Convert the results of reading the Log
  •     val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
  •       (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get))
  •     }
  •     val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
  •     // Create the DelayedFetch object
  •     val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
  •     // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
  •     // Create the delayedFetchKeys
  •     val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq
  •     // try to complete the request immediately, otherwise put it into the purgatory;
  •     // this is because while the delayed fetch operation is being created, new requests
  •     // may arrive and hence make this operation completable.
  •     // Try to complete the DelayedFetch right away; otherwise hand it over to delayedFetchPurgatory for management
  •     delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  •   }
  • }

A FetchResponse can be returned immediately as soon as any one of the following four conditions holds:

  1. the requester does not want to wait;
  2. the request does not specify any partition to read;
  3. enough data has already been read;
  4. an error occurred while reading the data.

These four cases are easy to understand, so we will not dwell on them. What we really care about is the remaining case, which requires creating a DelayedFetch delayed operation. It typically arises when the requested amount of data cannot be read in full yet, while writes that could supply the missing data may still be in progress. If the requester is willing to wait (timeout is greater than 0), a DelayedFetch can be registered to wait for a while; the messages written during that window may well be enough to satisfy the read, at which point the DelayedFetch can be executed to answer the fetch request. One detail is worth noting: a DelayedFetch has a timeout, so it does not wait indefinitely; if after waiting the available data still cannot satisfy the fetch, the DelayedFetch simply completes due to expiration.

In the code above, once the DelayedFetch is created it is added to the delayedFetchPurgatory purgatory, in exactly the same way DelayedProduce was handled earlier.
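
To make the "try to complete it immediately, otherwise watch it" semantics easier to picture, here is a minimal, self-contained sketch of that pattern. It is emphatically not Kafka's DelayedOperationPurgatory: the names SimpleDelayedOp and SimplePurgatory, and the omission of the expiration timer, are simplifying assumptions of this sketch.

  • // A deliberately simplified model of the purgatory pattern (NOT Kafka's real implementation)
  • import java.util.concurrent.atomic.AtomicBoolean
  • import scala.collection.mutable
  • abstract class SimpleDelayedOp {
  •   private val completed = new AtomicBoolean(false)
  •   // Subclasses check their own condition and call forceComplete() when it is satisfied
  •   def tryComplete(): Boolean
  •   // The work to run exactly once when the operation completes (or expires)
  •   def onComplete(): Unit
  •   // Completes the operation at most once and runs onComplete()
  •   def forceComplete(): Boolean =
  •     if (completed.compareAndSet(false, true)) { onComplete(); true } else false
  •   def isCompleted: Boolean = completed.get
  • }
  • class SimplePurgatory[K] {
  •   private val watchers = mutable.Map.empty[K, mutable.ListBuffer[SimpleDelayedOp]]
  •   // Try to finish the operation right away; if that fails, watch it under every key
  •   def tryCompleteElseWatch(op: SimpleDelayedOp, keys: Seq[K]): Boolean = synchronized {
  •     if (op.tryComplete()) true
  •     else {
  •       keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty) += op)
  •       // The real purgatory would also register the operation with the time wheel here
  •       false
  •     }
  •   }
  •   // Called when the state behind `key` changes (e.g. new messages appended): retry the watched operations
  •   def checkAndComplete(key: K): Int = synchronized {
  •     val ops = watchers.getOrElse(key, mutable.ListBuffer.empty[SimpleDelayedOp])
  •     val done = ops.filter(op => op.isCompleted || op.tryComplete())
  •     ops --= done
  •     done.size
  •   }
  • }

In fetchMessages() above, delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) plays the role of tryCompleteElseWatch() here, and later log appends are what drive the equivalent of checkAndComplete() for the watched partitions.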

3.3. The DelayedFetch delayed operation

Let's look at the definition of DelayedFetch:

  • // kafka.server.DelayedFetch
  • /**
  •  * A delayed fetch operation that can be created by the replica manager and watched
  •  * in the fetch operation purgatory
  •  * @param delayMs the delay of this delayed operation
  •  * @param fetchMetadata records the state of every partition involved in the FetchRequest; mainly used to decide whether the DelayedFetch is ready to be completed
  •  * @param replicaManager the ReplicaManager this DelayedFetch is associated with
  •  * @param responseCallback the callback invoked in DelayedFetch.onComplete() when the operation is satisfied or expires; it builds the FetchResponse and puts it into the corresponding responseQueue of RequestChannels
  •  */
  • class DelayedFetch(delayMs: Long,
  •                    fetchMetadata: FetchMetadata,
  •                    replicaManager: ReplicaManager,
  •                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
  •   extends DelayedOperation(delayMs) {
  •   ...
  • }

The definition of DelayedFetch is much simpler than that of DelayedProduce. Similarly, it involves two small helper classes, FetchPartitionStatus and FetchMetadata, which are straightforward:

  • // kafka.server.FetchPartitionStatus
  • /**
  •  * @param startOffsetMetadata the offset position already reached by the earlier read of the Log
  •  * @param fetchInfo information carried by the FetchRequest, mainly the requested offset and the maximum number of bytes to read
  •  */
  • case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) {
  •   override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " +
  •     "fetchInfo: " + fetchInfo + "]"
  • }
  • // kafka.server.FetchMetadata
  • /**
  •  * The fetch metadata maintained by the delayed fetch operation
  •  * @param fetchMinBytes the minimum number of bytes that must be read
  •  * @param fetchOnlyLeader whether data may only be read from leader replicas
  •  * @param fetchOnlyCommitted whether only committed data (below the high watermark) may be read
  •  * @param isFromFollower whether the fetch comes from a follower replica
  •  * @param fetchPartitionStatus the FetchPartitionStatus of every partition
  •  */
  • case class FetchMetadata(fetchMinBytes: Int,
  •                          fetchOnlyLeader: Boolean,
  •                          fetchOnlyCommitted: Boolean,
  •                          isFromFollower: Boolean,
  •                          fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {
  •   // Note: the trailing "+" on each line is needed for the concatenation to span multiple lines
  •   override def toString = "[minBytes: " + fetchMinBytes + ", " +
  •     "onlyLeader:" + fetchOnlyLeader + ", " +
  •     "onlyCommitted: " + fetchOnlyCommitted + ", " +
  •     "partitionStatus: " + fetchPartitionStatus + "]"
  • }
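
As a concrete, purely hypothetical illustration of how the two classes fit together, the snippet below builds the metadata for a single partition of a made-up topic. The FetchPartitionStatus and FetchMetadata parameter names come from the definitions above; the constructor shapes of LogOffsetMetadata and PartitionFetchInfo are assumptions based on the same 0.10.x code base:

  • import kafka.api.PartitionFetchInfo
  • import kafka.common.TopicAndPartition
  • import kafka.server.{FetchMetadata, FetchPartitionStatus, LogOffsetMetadata}
  • // Hypothetical values: the previous read of test-topic/0 stopped at offset 4200,
  • // which lives in the segment whose base offset is 4000, at byte position 5120
  • val tp = TopicAndPartition("test-topic", 0)
  • val status = FetchPartitionStatus(
  •   startOffsetMetadata = LogOffsetMetadata(4200L, 4000L, 5120),
  •   fetchInfo = PartitionFetchInfo(4200L, 1024 * 1024)) // requested offset and max bytes per partition
  • val metadata = FetchMetadata(
  •   fetchMinBytes = 1,            // respond as soon as any data is readable
  •   fetchOnlyLeader = true,       // a normal fetch only reads from the leader replica
  •   fetchOnlyCommitted = true,    // a consumer may only read below the high watermark
  •   isFromFollower = false,
  •   fetchPartitionStatus = Map(tp -> status))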

Now let's analyze DelayedFetch's core tryComplete() method; the source is as follows:

  • // kafka.server.DelayedFetch#tryComplete
  • /**
  •  * The operation can be completed if:
  •  * Checks whether the DelayedFetch is ready to be executed and, if so, calls forceComplete() to run the delayed operation.
  •  * Any one of the following cases makes a partition satisfy the DelayedFetch's completion condition:
  •  * Case A: This broker is no longer the leader for some partitions it tries to fetch
  •  *   A leader migration happened and the current node no longer hosts the leader replica of the partition.
  •  * Case B: This broker does not know of some partitions it tries to fetch
  •  *   The current broker cannot find a replica of the partition it needs to read from.
  •  * Case C: The fetch offset locates not on the last segment of the log
  •  *   The starting offset is no longer in the activeSegment; the Log may have been truncated, or a roll may have produced a new activeSegment.
  •  * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
  •  *   The accumulated number of readable bytes exceeds the minimum-bytes threshold.
  •  *
  •  * Upon completion, should return whatever data is available for each valid partition
  •  */
  • override def tryComplete() : Boolean = {
  •   var accumulatedSize = 0
  •   // Walk through the status of every partition recorded in fetchMetadata
  •   fetchMetadata.fetchPartitionStatus.foreach {
  •     case (topicAndPartition, fetchStatus) =>
  •       // The position where the earlier read of the Log stopped
  •       val fetchOffset = fetchStatus.startOffsetMetadata
  •       try {
  •         if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
  •           // Look up the leader replica of the partition; throws if it cannot be found
  •           val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
  •           /**
  •            * Choose the maximum readable offset according to the origin of the FetchRequest:
  •            * a consumer's endOffset is the HW, while a follower replica's endOffset is the LEO
  •            */
  •           val endOffset =
  •             if (fetchMetadata.fetchOnlyCommitted)
  •               replica.highWatermark
  •             else
  •               replica.logEndOffset
  •           // Go directly to the check for Case D if the message offsets are the same. If the log segment
  •           // has just rolled, then the high watermark offset will remain the same but be on the old segment,
  •           // which would incorrectly be seen as an instance of Case C.
  •           /** Case D
  •            * Check whether endOffset has moved since the last read.
  •            * If it has not moved, there is still not enough data to read and the condition remains unsatisfied;
  •            * if it has moved, continue with the checks below to see whether the operation can really be completed
  •            */
  •           if (endOffset.messageOffset != fetchOffset.messageOffset) {
  •             if (endOffset.onOlderSegment(fetchOffset)) {
  •               // Case C, this can happen when the new fetch operation is on a truncated leader
  •               /** Case C
  •                * Here endOffset has moved backwards onto a segment with a smaller baseOffset,
  •                * probably because the leader replica's Log was truncated
  •                */
  •               debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
  •               return forceComplete()
  •             } else if (fetchOffset.onOlderSegment(endOffset)) {
  •               // Case C, this can happen when the fetch operation is falling behind the current segment
  •               // or the partition has just rolled a new segment
  •               /** Case C
  •                * Here fetchOffset is still before endOffset, but a new activeSegment has been produced:
  •                * fetchOffset sits on an older LogSegment while endOffset sits on the activeSegment
  •                */
  •               debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
  •               return forceComplete()
  •             } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  •               // we need take the partition fetch size as upper bound when accumulating the bytes
  •               // endOffset and fetchOffset are still on the same LogSegment and endOffset has moved forward, so accumulate the readable bytes
  •               accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
  •             }
  •           }
  •         }
  •       } catch {
  •         case utpe: UnknownTopicOrPartitionException => // Case B
  •           debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
  •           return forceComplete()
  •         case nle: NotLeaderForPartitionException => // Case A
  •           debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
  •           return forceComplete()
  •       }
  •   }
  •   // Case D: enough bytes have accumulated
  •   if (accumulatedSize >= fetchMetadata.fetchMinBytes)
  •     // Call forceComplete(); onComplete() will then re-read the data
  •     forceComplete()
  •   else
  •     false
  • }

The comments above explain things fairly clearly. Just note one point: a client consumer can only fetch data below the HighWatermark, whereas a follower replica can fetch data up to the LogEndOffset.
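
Restated as a tiny self-contained sketch (the hw/leo values below are hypothetical stand-ins for replica.highWatermark and replica.logEndOffset in the real code):

  • // Which upper bound a fetcher is allowed to read up to, in simplified form
  • def readableEndOffset(fetchOnlyCommitted: Boolean, hw: Long, leo: Long): Long =
  •   if (fetchOnlyCommitted) hw   // ordinary consumers: only data replicated to the ISR is visible
  •   else leo                     // follower replicas: everything the leader has written locally
  • // Example: with hw = 100 and leo = 120, a consumer can read up to offset 100,
  • // while a follower fetching for replication can read up to offset 120
  • readableEndOffset(fetchOnlyCommitted = true, hw = 100L, leo = 120L)   // 100
  • readableEndOffset(fetchOnlyCommitted = false, hw = 100L, leo = 120L)  // 120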

Once the condition is satisfied, the method calls forceComplete() to force the delayed operation to finish, which internally invokes onComplete():

  • // kafka.server.DelayedFetch#onComplete
  • /**
  •  * Upon completion, read whatever data is available and pass to the complete callback
  •  */
  • override def onComplete() {
  •   // Re-read the data from the Log
  •   val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
  •     fetchMetadata.fetchOnlyCommitted,
  •     fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
  •   // Wrap the read results
  •   val fetchPartitionData = logReadResults.mapValues(result =>
  •     FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
  •   // Invoke the response callback
  •   responseCallback(fetchPartitionData)
  • }

Just like DelayedProduce, DelayedFetch ends by invoking the responseCallback, passing it the response dictionary it built, of type Map[TopicAndPartition, FetchResponsePartitionData].

3.4. Response handling

The responseCallback held by DelayedFetch is exactly the sendResponseCallback(...) method defined inside KafkaApis' handleFetchRequest(request: RequestChannel.Request), which was mentioned earlier. We can now analyze the source of sendResponseCallback(...) and its nested fetchResponseCallback(...):

  • // the callback for sending a fetch response
  • // This callback is eventually handed to the DelayedFetch object as its responseCallback parameter
  • def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
  •   // Protocol version down-conversion
  •   val convertedPartitionData =
  •     // Need to down-convert message when consumer only takes magic value 0.
  •     if (fetchRequest.versionId <= 1) {
  •       responsePartitionData.map { case (tp, data) =>
  •         // We only do down-conversion when:
  •         // 1. The message format version configured for the topic is using magic value > 0, and
  •         // 2. The message set contains message whose magic > 0
  •         // This is to reduce the message format conversion as much as possible. The conversion will only occur
  •         // when new message format is used for the topic and we see an old request.
  •         // Please note that if the message format is changed from a higher version back to lower version this
  •         // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
  •         // without format down conversion.
  •         val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
  •           !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
  •           trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}")
  •           new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
  •         } else data
  •         tp -> convertedData
  •       }
  •     } else responsePartitionData
  •   // Merge the previously collected unauthorized partitions with convertedPartitionData
  •   val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData
  •   // Log errors and record the outgoing-bytes metrics
  •   mergedPartitionData.foreach { case (topicAndPartition, data) =>
  •     if (data.error != Errors.NONE.code)
  •       debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
  •         s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}")
  •     // record the bytes out metrics only when the response is being sent
  •     BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
  •     BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
  •   }
  •   // Used to send the response
  •   def fetchResponseCallback(delayTimeMs: Int) {
  •     trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
  •       s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
  •     // Build the FetchResponse object
  •     val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
  •     // Add a SendAction Response, wrapping the FetchResponse built above, to the corresponding responseQueue
  •     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
  •   }
  •   // When this callback is triggered, the remote API call has completed
  •   request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
  •   // Do not throttle replication traffic
  •   if (fetchRequest.isFromFollower) {
  •     // Call fetchResponseCallback() directly to return the FetchResponse
  •     fetchResponseCallback(0)
  •   } else {
  •     // recordAndMaybeThrottle() will eventually invoke fetchResponseCallback()
  •     quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
  •       FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic),
  •         fetchRequest.versionId),
  •       fetchResponseCallback)
  •   }
  • }

By comparison, DelayedFetch's response callback is quite a bit simpler than DelayedProduce's. In the end the response is still driven by the recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int method of the ClientQuotaManager stored in KafkaApis' quotaManagers map; the code is the same as what we saw before, so it is not repeated here.
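
For readers who do not remember the details from the DelayedProduce discussion, the following is a minimal sketch of the "record, then maybe throttle" contract. The SimpleQuotaRecorder class, its one-second window logic and its numbers are all simplifying assumptions and not Kafka's ClientQuotaManager:

  • // Simplified stand-in for the quota check; NOT Kafka's ClientQuotaManager
  • class SimpleQuotaRecorder(bytesPerSecondLimit: Long) {
  •   private var windowStartMs = System.currentTimeMillis()
  •   private var bytesInWindow = 0L
  •   // Record the response size and hand the computed throttle delay (ms) to the callback,
  •   // mirroring the recordAndMaybeThrottle(clientId, value, callback) contract
  •   def recordAndMaybeThrottle(responseSizeBytes: Int, callback: Int => Unit): Int = synchronized {
  •     val now = System.currentTimeMillis()
  •     if (now - windowStartMs >= 1000L) { windowStartMs = now; bytesInWindow = 0L }
  •     bytesInWindow += responseSizeBytes
  •     val excessBytes = bytesInWindow - bytesPerSecondLimit
  •     val delayMs = if (excessBytes <= 0) 0 else ((excessBytes * 1000L) / bytesPerSecondLimit).toInt
  •     // The real implementation invokes the callback immediately when the quota is not violated
  •     // and schedules it on a delay queue otherwise; here we simply pass the delay through
  •     callback(delayMs)
  •     delayMs
  •   }
  • }
  • // Usage sketch: quota.recordAndMaybeThrottle(responseSize, fetchResponseCallback)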

4. Summary

With the DelayedProduce and DelayedFetch delayed operations covered, we now have a reasonable picture of how the Kafka broker handles produce and fetch requests. The diagram below puts the handling of DelayedProduce and DelayedFetch together:

(Figure 1: 1.DelayedProduce和DelayedFetch的协同.png — how DelayedProduce and DelayedFetch cooperate)

For a produce request the flow is as follows:

  • When a producer sends a ProducerRequest to the Kafka broker, KafkaApis routes it to the corresponding handler method.
  • After the request is parsed, ReplicaManager appends the message data to the log subsystem.
  • The broker then decides how to respond based on the request's acks parameter: with acks = 0 no real response is sent (the broker only reacts if an error occurs), and with acks = 1 the response can be returned as soon as the leader replica has written the data; with acks = -1, however, the broker must wait until the follower replicas have replicated the data before responding, so it builds a DelayedProduce delayed task and throws it into the delayedProducePurgatory purgatory to wait.
  • While that DelayedProduce task is waiting, consumer clients and follower replicas keep fetching data from the leader replica. Once all the follower replicas have caught up with the leader, the broker tries to complete the DelayedProduce tasks in delayedProducePurgatory, at which point the DelayedProduce can finish processing the produce request and return the response to the producer client (see the sketch after this list).
  • If the follower replicas never finish synchronizing while the DelayedProduce is waiting, the DelayedProduce is still executed eventually because it times out.
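
Expressed in the vocabulary of the SimplePurgatory sketch shown earlier (hypothetical glue code, not the real ReplicaManager logic), the produce side looks roughly like this:

  • // Completable once every ISR replica has replicated up to the produced offset (acks = -1)
  • class SketchDelayedProduce(requiredOffset: Long,
  •                            isrLogEndOffsets: () => Seq[Long],
  •                            respond: () => Unit) extends SimpleDelayedOp {
  •   override def tryComplete(): Boolean =
  •     if (isrLogEndOffsets().forall(_ >= requiredOffset)) forceComplete() else false
  •   override def onComplete(): Unit = respond()   // build and send the ProduceResponse
  • }
  • // When a follower's FetchRequest arrives, the leader updates that follower's LEO and then
  • // pokes the purgatory, which is what lets waiting DelayedProduce operations finish:
  • //   producePurgatory.checkAndComplete(("test-topic", 0))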

For a fetch request the flow is as follows (taking a client consumer as the example):

  • When a consumer client sends a FetchRequest to the Kafka broker, KafkaApis routes it to the corresponding handler method.
  • After the request is parsed, ReplicaManager reads the data from the log subsystem.
  • The requested range may go beyond what the log currently holds, i.e. not enough data can be read yet; in that case a DelayedFetch delayed task is built and thrown into the delayedFetchPurgatory purgatory to wait.
  • While that DelayedFetch task is waiting, producers may be writing new data into the log via the leader replica. Once enough data has been written, the broker tries to complete the DelayedFetch tasks in delayedFetchPurgatory, at which point the DelayedFetch can finish processing the fetch request and return the response to the consumer client (see the sketch after this list).
  • If the DelayedFetch never manages to read enough data while waiting, it is still executed eventually because it times out.
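
And the symmetric fetch side, again as hypothetical glue code on top of the SimplePurgatory sketch:

  • // Completable once enough new bytes are available after the fetch offset (Case D above)
  • class SketchDelayedFetch(fetchMinBytes: Int,
  •                          readableBytes: () => Long,
  •                          respond: () => Unit) extends SimpleDelayedOp {
  •   override def tryComplete(): Boolean =
  •     if (readableBytes() >= fetchMinBytes) forceComplete() else false
  •   override def onComplete(): Unit = respond()   // re-read the log and send the FetchResponse
  • }
  • // After a producer's messages are appended to the leader's log, the broker pokes the
  • // purgatory so that waiting DelayedFetch operations (consumers or followers) can be satisfied:
  • //   fetchPurgatory.checkAndComplete(("test-topic", 0))

The actual completion conditions in tryComplete() are of course richer (Cases A through D), but the trigger points are exactly these two: a follower fetch completes pending produces, and a log append completes pending fetches.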