Kafka系列 22 - 服务端源码分析 13:GroupCoordinator相关组件

简介:主要讲解与GroupCoordinator相关的组件,包括GroupMetadata、MemberMetadata等

1. GroupCoordinator简介

在Kafka服务端,每一个Broker上都会实例化一个GroupCoordinator对象,Kafka按照Consumer Group的名称将其分配给对应的GroupCoordinator进行管理;每个GroupCoordinator只负责管理Consumer Group的一个子集,而非集群中全部的Consumer Group。GroupCoordinator内部依赖GroupMetadata实例记录Consumer Group的元数据信息,而每个GroupMetadata内部会维护一个HashMap字典,以MemberMetadata记录其所管理的Consumer Group中各个客户端的信息,它们之间的关系示意图如下:

(图1:GroupCoordinator与Consumer Group的关系示意图)

GroupCoordinator有几项比较重要的功能:

  1. 负责处理JoinGroupRequest和SyncGroupRequest,完成Consumer Group中分区的分配工作;
  2. 通过GroupMetadataManager和内部主题__consumer_offsets维护Offset信息,以便消费者获知分区的消费位置;
  3. 记录Consumer Group的相关信息,即使Broker宕机导致Consumer Group由新的GroupCoordinator进行管理,新GroupCoordinator也可以知道Consumer Group中每个消费者负责处理哪个分区等信息;
  4. 通过心跳消息检测消费者的状态。

这些功能是通过GroupCoordinator与MemberMetadata、GroupMetadata及GroupMetadataManager等组件相互配合完成的,下面我们先分别了解这些依赖组件。

2. MemberMetadata

MemberMetadata是用于记录消费者元数据的类,它的定义和重要字段如下:

  • /**
  • * Member metadata contains the following metadata:
  • *
  • * Heartbeat metadata:
  • * 1. negotiated heartbeat session timeout
  • * 2. timestamp of the latest heartbeat
  • *
  • * Protocol metadata:
  • * 1. the list of supported protocols (ordered by preference)
  • * 2. the metadata associated with each protocol
  • *
  • * In addition, it also contains the following state information:
  • *
  • * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
  • * its rebalance callback will be kept in the metadata if the
  • * member has sent the join group request
  • * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
  • * is kept in metadata until the leader provides the group assignment
  • * and the group transitions to stable
  • *
  • * GroupCoordinator用于记录消费者的元数据的类
  • *
  • * @param memberId 对应消费者的ID,由服务端的GroupCoordinator分配
  • * @param groupId 记录消费者所在的Consumer Group的ID
  • * @param clientId 消费者客户端ID,与memberId不同
  • * @param clientHost 消费者客户端的Host信息
  • * @param sessionTimeoutMs
  • * @param supportedProtocols 对应消费者支持的PartitionAssignor
  • */
  • @nonthreadsafe
  • private[coordinator] class MemberMetadata(val memberId: String,
  • val groupId: String,
  • val clientId: String,
  • val clientHost: String,
  • val sessionTimeoutMs: Int,
  • var supportedProtocols: List[(String, Array[Byte])]) {
  • // 记录分配给当前Member的分区信息
  • var assignment: Array[Byte] = Array.empty[Byte]
  • // 与JoinGroupRequest相关的回调函数
  • var awaitingJoinCallback: JoinGroupResult => Unit = null
  • // 与SyncGroupRequest相关的回调函数
  • var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
  • // 最后一次收到心跳消息的时间戳
  • var latestHeartbeat: Long = -1
  • // 标识对应的消费者是否已经离开了Consumer Group
  • var isLeaving: Boolean = false
  • // 当前Member支持的PartitionAssignor协议集合
  • def protocols = supportedProtocols.map(_._1).toSet
  • ...
  • }

在MemberMetadata类中,客户端被统称为Member;assignment记录分配给当前客户端的分区信息;awaitingJoinCallback和awaitingSyncCallback分别是处理JoinGroupRequest和SyncGroupRequest时会用到的回调函数,后面会详细介绍;latestHeartbeat则是GroupCoordinator最后一次收到该客户端心跳的时间,用于判断客户端是否存活。
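
为了更直观地理解latestHeartbeat与sessionTimeoutMs的配合方式,下面给出一个独立的最小示意(非Kafka源码,字段名沿用上文;实际实现依赖后文介绍的DelayedHeartbeat延迟任务,这里仅说明判断依据):

  • // 独立示意:沿用上文MemberMetadata中的两个字段
  • case class MemberLiveness(latestHeartbeat: Long, sessionTimeoutMs: Int)
  • // 距最后一次心跳的时间超过协商的会话超时,即认为该Member已失效
  • def isAlive(m: MemberLiveness, nowMs: Long): Boolean =
  •   nowMs - m.latestHeartbeat <= m.sessionTimeoutMs
  • // 例如:isAlive(MemberLiveness(latestHeartbeat = 1000L, sessionTimeoutMs = 30000), nowMs = 40000L) 为false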

MemberMetadata类提供了从给定候选PartitionAssignor中选择消费者支持的PartitionAssignor的方法vote(candidates: Set[String]): String,源码如下:

kafka.coordinator.MemberMetadata#vote
  • /**
  • * Vote for one of the potential group protocols. This takes into account the protocol preference as
  • * indicated by the order of supported protocols and returns the first one also contained in the set
  • * 从给定候选的PartitionAssignor中选择消费者支持的PartitionAssignor
  • */
  • def vote(candidates: Set[String]): String = {
  • // 遍历客户端支持的PartitionAssignor,根据指定的candidates包含的PartitionAssignor返回支持的PartitionAssignor
  • supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {
  • case Some((protocol, _)) => protocol
  • case None =>
  • throw new IllegalArgumentException("Member does not support any of the candidate protocols")
  • }
  • }
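
vote()的选择逻辑可以用一个简单的独立示例来理解(示意代码,非Kafka源码):按supportedProtocols声明的偏好顺序,返回第一个同时出现在候选集合中的协议。

  • // 某Member按偏好顺序声明支持的协议(越靠前越优先),metadata部分此处留空
  • val supportedProtocols: List[(String, Array[Byte])] =
  •   List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))
  • def vote(candidates: Set[String]): String =
  •   supportedProtocols.find { case (protocol, _) => candidates.contains(protocol) } match {
  •     case Some((protocol, _)) => protocol
  •     case None => throw new IllegalArgumentException("Member does not support any of the candidate protocols")
  •   }
  • vote(Set("range", "roundrobin"))  // 返回"roundrobin":两者都在候选集合中,按偏好取前者
  • vote(Set("range"))                // 返回"range":首选协议不在候选集合中,退而求其次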

另外MemberMetadata还提供了一些辅助方法:

kafka.coordinator.MemberMetadata
  • /**
  • * Get metadata corresponding to the provided protocol.
  • */
  • def metadata(protocol: String): Array[Byte] = {
  • // 匹配查找传入的protocol对应的PartitionAssignor metadata
  • supportedProtocols.find(_._1 == protocol) match {
  • case Some((_, metadata)) => metadata
  • case None => // 未查找到会返回IllegalArgumentException异常
  • throw new IllegalArgumentException("Member does not support protocol")
  • }
  • }
  • /**
  • * Check if the provided protocol metadata matches the currently stored metadata.
  • *
  • * 检查当前MemberMetadata支持的PartitionAssignor是否与传入的protocols中的PartitionAssignor匹配
  • */
  • def matches(protocols: List[(String, Array[Byte])]): Boolean = {
  • // 大小不一样,必然是不匹配的
  • if (protocols.size != this.supportedProtocols.size)
  • return false
  • // 遍历进行一一对比,如有不同则是不匹配的
  • for (i <- 0 until protocols.size) {
  • val p1 = protocols(i)
  • val p2 = supportedProtocols(i)
  • if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
  • return false
  • }
  • return true
  • }

MemberMetadata类的整体实现比较简单,关于它的使用也将在后面讲解。

3. GroupMetadata

GroupMetadata记录了Consumer Group的元数据信息,它的定义和重要字段如下:

kafka.coordinator.GroupMetadata
  • /**
  • * Group contains the following metadata:
  • *
  • * Membership metadata:
  • * 1. Members registered in this group
  • * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
  • * 3. Protocol metadata associated with group members
  • *
  • * State metadata:
  • * 1. group state
  • * 2. generation id
  • * 3. leader id
  • *
  • * 记录了Consumer Group的元数据
  • *
  • * @param groupId 对应的Consumer Group的ID
  • * @param protocolType
  • */
  • @nonthreadsafe
  • private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
  • // key是memberId,value为对应的MemberMetadata对象
  • private val members = new mutable.HashMap[String, MemberMetadata]
  • // 标识当前Consumer Group所处的状态
  • private var state: GroupState = Stable
  • // 标识当前Consumer Group的年代信息,避免受到过期请求的影响
  • var generationId = 0
  • // 记录Consumer Group中的Leader消费者的memberId
  • var leaderId: String = null
  • // 记录当前Consumer Group选择的PartitionAssignor
  • var protocol: String = null
  • ...
  • }

members字段是一个HashMap,用于维护当前GroupMetadata管理的MemberMetadata对象,键为分配给客户端的Member ID,值为存储该Member元数据的MemberMetadata对象。这里先了解一下Member ID的生成方式,它散落在多处源码中,下面将其过程单独抽取出来:

  • // 生成的Member ID为"客户端ID-UUID随机字符串"
  • val memberId = clientId + "-" + group.generateMemberIdSuffix
  • ...
  • def generateMemberIdSuffix = UUID.randomUUID().toString

可见,MemberID其实是以客户端ID和UUID随机字符串拼接而成的。
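
按照这一拼接规则,可以用如下独立片段示意Member ID的生成(clientId为假设值,UUID为随机值,仅用于说明格式):

  • import java.util.UUID
  • val clientId = "consumer-1"  // 假设的客户端ID
  • // 生成形如"consumer-1-3f4a...-..."的Member ID,随机后缀可避免同一clientId多次加入时发生冲突
  • val memberId = clientId + "-" + UUID.randomUUID().toString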

GroupMetadata的generationId标识了当前Consumer Group的年代信息;leaderId则记录了当前Consumer Group中Leader消费者的memberId;protocol用于记录当前Consumer Group选择的PartitionAssignor。

state字段是一个GroupState类型的对象,用于表示Consumer Group的状态;GroupState其实是一个特质:

kafka.coordinator.GroupState
  • // 用于表示Consumer Group的状态
  • private[coordinator] sealed trait GroupState { def state: Byte }

GroupState有四个子类(单例对象),分别代表Consumer Group的四种状态:PreparingRebalance、AwaitingSync、Stable和Dead。关于GroupState的内容后面会详细介绍。

GroupMetadata提供了添加和移除Member的方法,源码如下:

kafka.coordinator.GroupMetadata
  • def add(memberId: String, member: MemberMetadata) {
  • assert(supportsProtocols(member.protocols))
  • if (leaderId == null)
  • // 将第一个加入的Member作为Group Leader
  • leaderId = memberId
  • members.put(memberId, member)
  • }
  • def remove(memberId: String) {
  • members.remove(memberId)
  • if (memberId == leaderId) { // 如果移除的是Group Leader
  • // 重新选择Group Leader
  • leaderId = if (members.isEmpty) {
  • null
  • } else {
  • // Group Leader被删除,则重新选择第一个Member作为Group Leader
  • members.keys.head
  • }
  • }
  • }

我们可以发现,在添加Member时会将第一个加入的Member作为Group Leader;而在移除Member时,如果被移除的恰好是Group Leader,则会从剩余的Member集合中重新选择一个(members.keys.head)作为新的Group Leader。
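
这一选取与重选的语义可以用下面的独立示意复现(非Kafka源码,以String代替MemberMetadata):

  • import scala.collection.mutable
  • val members = mutable.HashMap[String, String]()
  • var leaderId: String = null
  • def add(memberId: String, member: String): Unit = {
  •   if (leaderId == null) leaderId = memberId            // 第一个加入的Member成为Group Leader
  •   members.put(memberId, member)
  • }
  • def remove(memberId: String): Unit = {
  •   members.remove(memberId)
  •   if (memberId == leaderId)
  •     leaderId = if (members.isEmpty) null else members.keys.head  // 从剩余Member中重选
  • }
  • add("m1", "member-1"); add("m2", "member-2")           // 此时leaderId为"m1"
  • remove("m1")                                            // leaderId变为剩余集合中的"m2"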

GroupMetadata需要根据当前Consumer Group中所有Consumer支持的PartitionAssignor来选择合适的PartitionAssignor,这个过程由它的selectProtocol: String方法完成:

kafka.coordinator.GroupMetadata
  • // 为Consumer Group选择合适的PartitionAssignor
  • def selectProtocol: String = {
  • if (members.isEmpty)
  • throw new IllegalStateException("Cannot select protocol for empty group")
  • // select the protocol for this group which is supported by all members
  • // 所有Member都支持的协议作为"候选协议"集合
  • val candidates = candidateProtocols
  • // let each member vote for one of the protocols and choose the one with the most votes
  • /**
  • * 每个Member都会通过vote()方法进行投票,
  • * 每个Member会为其支持的协议中的第一个"候选协议"投一票,
  • * 最终将选择得票最多的PartitionAssignor
  • */
  • val votes: List[(String, Int)] = allMemberMetadata // 先获取所有Member对应的MemberMetadata
  • .map(_.vote(candidates)) // 使用MemberMetadata的vote()方法进行投票
  • .groupBy(identity) // 分组
  • .mapValues(_.size) // 计算每种PartitionAssignor的票数
  • .toList
  • // 取得票最多的PartitionAssignor
  • votes.maxBy(_._2)._1
  • }
  • // 得到所有Member支持的PartitionAssignor协议集合
  • private def candidateProtocols = {
  • // get the set of protocols that are commonly supported by all members
  • allMemberMetadata
  • .map(_.protocols) // 所有Member支持的协议集合
  • .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
  • }
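
下面用一个独立的小例子演示上述投票过程(示意代码,非Kafka源码):三个Member中有两个更偏好range,一个更偏好roundrobin,最终range以两票胜出。

  • // 各Member按偏好顺序声明支持的协议
  • val memberProtocols = Map(
  •   "m1" -> List("range", "roundrobin"),
  •   "m2" -> List("range", "roundrobin"),
  •   "m3" -> List("roundrobin", "range")
  • )
  • // 候选集合:所有Member支持协议的交集
  • val candidates = memberProtocols.values.map(_.toSet).reduceLeft(_ & _)
  • // 每个Member为其偏好列表中第一个出现在候选集合里的协议投一票
  • val votes = memberProtocols.values
  •   .map(prefs => prefs.find(p => candidates.contains(p)).get)
  •   .groupBy(identity)
  •   .map { case (protocol, vs) => protocol -> vs.size }
  • votes.maxBy(_._2)._1  // "range":得到2票,胜出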

4. GroupTopicPartition和OffsetAndMetadata

GroupCoordinator需要知道每个Consumer Group对应着主题的哪些分区,同时还要记录每个分区的消费情况,它会使用GroupTopicPartition维护Consumer Group与分区的消费关系,使用OffsetAndMetadata记录offset的相关信息,它们都是样例类,源码如下:

  • // 维护Consumer Group与分区的消费关系
  • case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
  • def this(group: String, topic: String, partition: Int) =
  • this(group, new TopicPartition(topic, partition))
  • override def toString =
  • "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
  • }
  • // 记录offset相关信息
  • case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
  • commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
  • expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
  • def offset = offsetMetadata.offset
  • def metadata = offsetMetadata.metadata
  • override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
  • }
  • case class OffsetMetadata(offset: Long,
  • metadata: String = OffsetMetadata.NoMetadata) { // metadata字段默认为空字符串
  • ...
  • }

5. GroupMetadataManager

GroupMetadataManager是GroupCoordinator中负责管理Consumer Group元数据以及其对应Offset信息的组件。GroupMetadataManager底层使用Offsets Topic(即内部主题__consumer_offsets),以消息的形式存储Consumer Group的GroupMetadata信息以及其消费的每个分区的Offset;为了提高查询的效率,GroupMetadataManager同时还会将Consumer Group的GroupMetadata信息和Offset信息在内存中维护一份相同的副本,并与Offsets Topic中存储的信息进行同步。

我们先关注GroupMetadataManager的定义和重要字段:

kafka.coordinator.GroupMetadataManager
  • /**
  • * 负责管理Consumer Group元数据以及其对应offset信息的组件
  • * @param brokerId
  • * @param config
  • * @param replicaManager 用于管理Leader副本、ISR集合、AR集合、Leader副本的迁移等
  • * @param zkUtils
  • * @param time
  • */
  • class GroupMetadataManager(val brokerId: Int,
  • val config: OffsetConfig,
  • replicaManager: ReplicaManager,
  • zkUtils: ZkUtils,
  • time: Time) extends Logging with KafkaMetricsGroup {
  • /** offsets cache
  • * 记录每个Consumer Group消费的分区的offset位置
  • **/
  • private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
  • /** group metadata cache
  • * 记录每个Consumer Group在服务端对应的GroupMetadata对象
  • **/
  • private val groupsCache = new Pool[String, GroupMetadata]
  • /** partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed
  • * 记录正在加载的Offset Topic分区的ID
  • **/
  • private val loadingPartitions: mutable.Set[Int] = mutable.Set()
  • /** partitions of consumer groups that are assigned, using the same loading partition lock
  • * 记录已经加载的Offsets Topic分区的ID
  • **/
  • private val ownedPartitions: mutable.Set[Int] = mutable.Set()
  • /** lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
  • private val offsetExpireLock = new ReentrantReadWriteLock()
  • /** shutting down flag */
  • private val shuttingDown = new AtomicBoolean(false)
  • /** number of partitions for the consumer metadata topic
  • * 记录Offsets Topic的分区数量
  • **/
  • private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
  • ...
  • }

从上述字段可以得知,GroupMetadataManager使用offsetsCache和groupsCache两个类型为Pool的池对象分别存储OffsetAndMetadata及GroupMetadata对象;同时其内部维护了loadingPartitions和ownedPartitions两个集合,分别记录Offsets Topic中正在加载及已经加载完成的分区ID。

默认情况下,Offsets Topic有50个分区,副本因子为3,每个分区存储特定一部分Consumer Group的信息;分区的计算方式是对Consumer Group的Group ID求哈希后再对总分区数取模,因此同一Consumer Group对应的GroupMetadata信息和Offset信息会被分配到同一个Offsets Topic分区中,相关的实现源码如下:

kafka.coordinator.GroupMetadataManager
  • // 对Group ID取模,得到Consumer Group对应的Offsets Topic分区编号
  • def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
  • /** number of partitions for the consumer metadata topic
  • * 记录Offsets Topic的分区数量
  • **/
  • private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
  • /**
  • * Gets the partition count of the offsets topic from ZooKeeper.
  • * If the topic does not exist, the configured partition count is returned.
  • *
  • * 获取Offsets Topic主题的分区数量
  • */
  • private def getOffsetsTopicPartitionCount = {
  • val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
  • // 尝试从Zookeeper中读取__consumer_offsets主题的分区信息
  • val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
  • if (topicData(topic).nonEmpty)
  • // 能读取到就取Zookeeper存储的分区数量
  • topicData(topic).size
  • else
  • // 否则__consumer_offsets主题默认分区数量为50
  • config.offsetsTopicNumPartitions
  • }
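
结合上面的partitionFor(...)实现,可以用一个独立片段示意Group ID到Offsets Topic分区的映射(这里以math.abs简化Kafka源码中Utils.abs对边界情况的处理,分区数按默认值50计算):

  • val groupMetadataTopicPartitionCount = 50  // offsets.topic.num.partitions的默认值
  • def partitionFor(groupId: String): Int =
  •   math.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
  • // 同一个Group ID总是落在同一个分区,该分区同时存放其GroupMetadata消息与Offset消息
  • partitionFor("my-group")  // 返回[0, 50)之间的某个固定值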

Offsets Topic中两类消息是混合连续存储的:Consumer Group对应的GroupMetadata消息和Offset消息会被写入同一个Offsets Topic分区,通过消息的键来区分二者,示意图如下:

(图2:Offsets Topic的存储示意图)

GroupMetadata消息和Offset消息的键是不同的,GroupMetadataManager提供了groupMetadataKey(...)和offsetCommitKey(...)两个方法分别生成二者的键:

kafka.coordinator.GroupMetadataManager
  • /**
  • * Generates the key for group metadata message for given group
  • *
  • * 用于创建记录GroupMetadata的消息的键,仅有groupId一个字段组成
  • *
  • * @return key bytes for group metadata message
  • */
  • private def groupMetadataKey(group: String): Array[Byte] = {
  • val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
  • key.set(GROUP_KEY_GROUP_FIELD, group) // 设置Group ID
  • val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
  • byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
  • key.writeTo(byteBuffer) // 将键写入到ByteBuffer中
  • byteBuffer.array() // 转换为数组并返回
  • }
  • /**
  • * Generates the key for offset commit message for given (group, topic, partition)
  • *
  • * 用于创建记录消费offset位置的消息的键,由groupId、主题名称、分区ID组成
  • *
  • * @return key for offset commit message
  • */
  • private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
  • val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
  • key.set(OFFSET_KEY_GROUP_FIELD, group) // groupId
  • key.set(OFFSET_KEY_TOPIC_FIELD, topic) // 主题名称
  • key.set(OFFSET_KEY_PARTITION_FIELD, partition) // 分区ID
  • val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
  • byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
  • key.writeTo(byteBuffer) // 写入ByteBuffer
  • byteBuffer.array() // 转换为数组返回
  • }

由源码可知,GroupMetadataKey键通过Consumer Group的ID即可生成,而OffsetCommitKey则需要通过Group ID、消费的主题、分区ID确定。
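
结合后文loadGroupsAndOffsets()中对readMessageKey(...)返回值的两种情况(OffsetKey与GroupMetadataKey),可以用如下简化结构示意两类键所承载的信息(示意代码,省略了版本号与Schema序列化细节,GroupTopicPartition即上一节中的样例类):

  • // 简化示意:__consumer_offsets中两类消息键
  • sealed trait BaseKey { def key: Any }
  • // Offset提交消息的键:由Group ID、主题名称、分区ID组成
  • case class OffsetKey(key: GroupTopicPartition) extends BaseKey
  • // GroupMetadata消息的键:仅由Group ID组成
  • case class GroupMetadataKey(key: String) extends BaseKey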

5.1. GroupMetadata池管理

groupsCache作为GroupMetadata缓存池,存储了每个Consumer Group对应的GroupMetadata对象,GroupMetadataManager提供了对groupsCache的各类操作,下面我们分析一下这些操作的具体实现。

5.1.1. 获取GroupMetadata

获取GroupMetadata的操作非常简单,直接以Group ID为键从groupsCache中获取即可:

kafka.coordinator.GroupMetadataManager#getGroup
  • /**
  • * Get the group associated with the given groupId, or null if not found
  • *
  • * 根据groupId从groupsCache获取对应的GroupMetadata
  • */
  • def getGroup(groupId: String): GroupMetadata = {
  • groupsCache.get(groupId)
  • }

5.1.2. 添加GroupMetadata

添加GroupMetadata的操作也一样,直接以Group ID为键、GroupMetadata对象为值向groupsCache中添加即可:

kafka.coordinator.GroupMetadataManager#addGroup
  • /**
  • * Add a group or get the group associated with the given groupId if it already exists
  • *
  • * 添加GroupMetadata到groupsCache
  • */
  • def addGroup(group: GroupMetadata): GroupMetadata = {
  • // 当不存在时才能添加
  • val currentGroup = groupsCache.putIfNotExists(group.groupId, group)
  • if (currentGroup != null) {
  • currentGroup // 添加不成功返回已存在的GroupMetadata
  • } else {
  • group // 否则返回添加的GroupMetadata
  • }
  • }
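
putIfNotExists的语义决定了调用方应始终使用addGroup()的返回值,而不是自己创建的GroupMetadata实例。下面用ConcurrentHashMap.putIfAbsent做一个独立的等价示意(非Kafka源码,以String代替GroupMetadata):

  • import java.util.concurrent.ConcurrentHashMap
  • val groups = new ConcurrentHashMap[String, String]()
  • def addGroup(groupId: String, group: String): String = {
  •   val current = groups.putIfAbsent(groupId, group)  // 已存在则返回旧值,否则返回null
  •   if (current != null) current else group
  • }
  • addGroup("g1", "metadata-A")  // 返回"metadata-A"
  • addGroup("g1", "metadata-B")  // 仍返回"metadata-A":并发或重复注册时以已存在的实例为准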

5.1.3. 移除GroupMetadata

移除GroupMetadata的操作则相对复杂一点,不仅需要从groupsCache池中将对应的GroupMetadata移除,还要向Offsets Topic中写入"删除标记"消息(即值为null的消息),以标记该GroupMetadata已被删除;移除GroupMetadata的操作由GroupMetadataManager的removeGroup(group: GroupMetadata)方法实现,源码如下:

kafka.coordinator.GroupMetadataManager#removeGroup
  • /**
  • * Remove all metadata associated with the group
  • * @param group
  • */
  • def removeGroup(group: GroupMetadata) {
  • // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
  • // while the group is being removed due to coordinator emigration)
  • if (groupsCache.remove(group.groupId, group)) { // 删除groupsCache中对应的GroupMetadata
  • // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
  • // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
  • // retry removing this group.
  • // 获取Consumer Group在Offsets Topic中对应的分区的ID
  • val groupPartition = partitionFor(group.groupId)
  • // 获取当前对应的魔数和时间戳
  • val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
  • // 产生"删除标记"消息,消息值为null,键由groupId封装而来
  • val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
  • timestamp = timestamp, magicValue = magicValue)
  • // 获取Offsets Topic中该Consumer Group对应的Partition对象
  • val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
  • partitionOpt.foreach { partition => // 存在Partition对象
  • val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
  • trace("Marking group %s as deleted.".format(group.groupId))
  • try {
  • // do not need to require acks since even if the tombstone is lost,
  • // it will be appended again by the new leader
  • // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
  • // 使用Partition对象的appendMessagesToLeader(...)方法写入消息
  • partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
  • } catch {
  • case t: Throwable =>
  • error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
  • // ignore and continue
  • }
  • }
  • }
  • }

注:大家可能会疑惑,为什么添加Group的时候没有向Offsets Topic中写入消息的操作,而移除Group时有;其实添加Group时写入消息的操作是由DelayedStore延迟任务完成的,这部分会在后面讲解。

5.2. OffsetAndMetadata池管理

offsetsCache作为OffsetAndMetadata缓存池,存储了Group中的消费者对指定分区消费时提交的Offset,GroupMetadataManager同样提供了对offsetsCache的各类操作,下面我们分析一下这些操作的具体实现。

5.2.1. 获取OffsetAndMetadata

获取OffsetAndMetadata的操作非常简单,直接从offsetsCache中按GroupTopicPartition对象为键获取即可,需要注意的是该方法返回的是OffsetFetchResponse.PartitionData对象:

kafka.coordinator.GroupMetadataManager#getOffset
  • /**
  • * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
  • *
  • * 根据Group ID、主题名称和分区ID获取对应的offset
  • *
  • * @param key The requested group-topic-partition
  • * @return If the key is present, return the offset and metadata; otherwise return None
  • */
  • private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
  • val offsetAndMetadata = offsetsCache.get(key)
  • if (offsetAndMetadata == null)
  • // 无对应的数据,返回INVALID_OFFSET(值为-1)
  • new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
  • else
  • // 否则返回对应的OffsetAndMetadata中记录的数据
  • new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
  • }

5.2.2. 添加OffsetAndMetadata

添加OffsetAndMetadata的操作也非常简单,直接以GroupTopicPartition对象为键、OffsetAndMetadata对象为值向offsetsCache中添加即可:

kafka.coordinator.GroupMetadataManager#putOffset
  • /**
  • * Put the (already committed) offset for the given group/topic/partition into the cache.
  • *
  • * 根据Group ID、主题名称及分区ID添加对应的OffsetAndMetadata信息
  • *
  • * @param key The group-topic-partition
  • * @param offsetAndMetadata The offset/metadata to be stored
  • */
  • private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
  • // 直接添加到offsetsCache中,如果存在会被替换
  • offsetsCache.put(key, offsetAndMetadata)
  • }

5.2.3. 定时清理OffsetAndMetadata

GroupMetadataManager没有提供对offsetsCache池中的OffsetAndMetadata对象进行手动移除的操作,因为对客户端而言并不存在主动移除OffsetAndMetadata的需求场景;但GroupMetadataManager会在初始化时创建并启动一个KafkaScheduler调度器,提交名为"delete-expired-consumer-offsets"的定时任务,该任务会周期性地调用deleteExpiredOffsets()方法清理过期的Offset:

注:OffsetAndMetadata消息的过期时间是在Offset被提交时指定的,默认为提交时间后的24 * 60 * 60 * 1000L毫秒,即保留一天。

kafka.coordinator.GroupMetadataManager
  • ...
  • /** Single-thread scheduler to handling offset/group metadata cache loading and unloading
  • * 用于执行delete-expired-consumer-offsets、GroupCoordinator迁移等任务
  • **/
  • private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
  • // 启动定时任务调度器
  • scheduler.startup()
  • /**
  • * 提交delete-expired-consumer-offsets定时任务,调用的是deleteExpiredOffsets方法
  • * 用于定时对offsetsCache集合进行删除操作
  • * 执行周期默认为600秒,由offsets.retention.check.interval.ms项配置
  • */
  • scheduler.schedule(name = "delete-expired-consumer-offsets",
  • fun = deleteExpiredOffsets,
  • period = config.offsetsRetentionCheckIntervalMs,
  • unit = TimeUnit.MILLISECONDS)
  • ...

该任务的默认执行周期为600000毫秒(即600秒),deleteExpiredOffsets()方法除了需要删除offsetsCache池中对应的OffsetAndMetadata对象,还会向Offsets Topic中追加“删除标记”消息:

kafka.coordinator.GroupMetadataManager#deleteExpiredOffsets
  • // 删除offsetsCache中过期的OffsetMetadata对象,并向Offsets Topic中追加"删除标记"消息
  • private def deleteExpiredOffsets() {
  • debug("Collecting expired offsets.")
  • // 当前时间
  • val startMs = time.milliseconds()
  • val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
  • // 得到offsetsCache中所有过期的OffsetAndMetadata对象
  • val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
  • // 所有过期时间标记小于当前时间,即认定为过期,过期时间默认是添加操作时间后的24*60*60*1000L毫秒,即保留一天
  • offsetAndMetadata.expireTimestamp < startMs
  • }
  • debug("Found %d expired offsets.".format(expiredOffsets.size))
  • // delete the expired offsets from the table and generate tombstone messages to remove them from the log
  • // 对过期offsetAndMetadata进行Map操作
  • val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
  • // 找到过期OffsetAndMetadata对应的分区的ID
  • val offsetsPartition = partitionFor(groupTopicAndPartition.group)
  • trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
  • // 先从offsetsCache中将该过期OffsetAndMetadata对象移除
  • offsetsCache.remove(groupTopicAndPartition)
  • // 获取该OffsetAndMetadata对应的Group ID在__consumer_offsets主题中存储的offset位置消息的键
  • val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
  • groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
  • // 获取消息对应的魔数和时间戳
  • val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
  • // 构造对应的值为null的空消息
  • (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
  • }.groupBy { case (partition, tombstone) => partition } // 按照Offsets Topic的分区ID进行分组
  • // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
  • // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
  • tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
  • // 根据主题名称__consumer_offsets及分区ID获取对应的Partition对象
  • val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
  • partitionOpt.map { partition =>
  • val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
  • // 获取要追加的"删除标记"的消息集合
  • val messages = tombstones.map(_._2).toSeq
  • trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
  • try {
  • // do not need to require acks since even if the tombstone is lost,
  • // it will be appended again in the next purge cycle
  • // 追加"删除标记"消息,此处将messages转换为了多参数序列
  • partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
  • tombstones.size
  • }
  • catch {
  • case t: Throwable =>
  • error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
  • // ignore and continue
  • 0
  • }
  • }
  • }.sum
  • }
  • info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, time.milliseconds() - startMs))
  • }
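
其中过期的判定依据非常简单:expireTimestamp早于当前时间即视为过期;而expireTimestamp在提交时若未显式指定(为-1),则取提交时间加上保留时长(默认24小时)。下面是一个独立的示意片段(非Kafka源码):

  • val offsetsRetentionMs = 24 * 60 * 60 * 1000L  // 默认保留24小时
  • // 提交时未显式指定过期时间(-1)则以提交时间推算,否则沿用显式指定的值
  • def effectiveExpireTimestamp(commitTimestamp: Long, expireTimestamp: Long): Long =
  •   if (expireTimestamp == -1L) commitTimestamp + offsetsRetentionMs else expireTimestamp
  • // 过期判定:过期时间早于当前时间
  • def isExpired(expireTimestamp: Long, nowMs: Long): Boolean = expireTimestamp < nowMs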

5.3. GroupCoordinatorRequest的处理

在讲解消费者的实现时我们提到过,消费者在与GroupCoordinator进行交互之前,首先会向负载较小的Broker发送GroupCoordinatorRequest请求,查询管理其所在Consumer Group的GroupCoordinator的网络位置,然后消费者才会与该GroupCoordinator所在的Broker建立连接,发送JoinGroupRequest和SyncGroupRequest请求。

GroupCoordinatorRequest请求到达服务端之后,会由KafkaApis的handleGroupCoordinatorRequest(...)方法负责处理,主要分为以下两步:

  1. 使用partitionFor(...)方法得到负责保存对应Consumer Group信息的Offsets Topic分区。
  2. 查找其MetadataCache,得到此Offsets Topic分区的Leader副本所在的Broker,其上的GroupCoordinator就负责管理该Consumer Group。

KafkaApis的handleGroupCoordinatorRequest(...)方法的源码如下:

kafka.server.KafkaApis#handleGroupCoordinatorRequest
  • // 负责处理GroupCoordinatorRequest,查询管理消费者所属的Consumer Group对应的GroupCoordinator的网络位置
  • def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  • // 转换请求体为GroupCoordinatorRequest对象
  • val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
  • // 根据correlationId构造响应头
  • val responseHeader = new ResponseHeader(request.header.correlationId)
  • if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) { // 检查授权情况
  • // 未授权,返回GROUP_AUTHORIZATION_FAILED错误码,Node节点被置为NO_NODE
  • val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • } else {
  • // 通过groupId得到对应的Offsets Topic分区的ID
  • val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
  • // get metadata (and create the topic if necessary)
  • // 从MetadataCache中获取Offsets Topic的相关信息,如果Offsets Topic未创建则会创建
  • val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
  • val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) { // 获取出错
  • // 返回GROUP_COORDINATOR_NOT_AVAILABLE错误码,Node节点被置为NO_NODE
  • new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
  • } else {
  • // 通过Offsets Topic的分区的ID获取其Leader副本所在的节点,由该节点上的GroupCoordinator负责管理该Consumer Group组
  • val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
  • .find(_.partition == partition)
  • .map(_.leader())
  • // 创建GroupCoordinatorResponse并返回
  • coordinatorEndpoint match {
  • case Some(endpoint) if !endpoint.isEmpty => // 存在,返回构造的GroupCoordinatorResponse
  • new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
  • case _ =>
  • // 不存在,返回GROUP_COORDINATOR_NOT_AVAILABLE错误码,Node节点被置为NO_NODE
  • new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
  • }
  • }
  • trace("Sending consumer metadata %s for correlation id %d to client %s."
  • .format(responseBody, request.header.correlationId, request.header.clientId))
  • // 将响应加入到RequestChannel中的队列中,等待发送
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • }
  • }

从源码中我们可以得知,分配给消费者的GroupCoordinator所在的Broker,就是该消费者所在Consumer Group对应的__consumer_offsets分区的Leader副本所在的Broker。这也比较好理解:消费者需要向__consumer_offsets中提交分区分配信息(Group内Leader消费者的操作)、获取分区分配结果(Group内所有消费者的操作)、提交Offset信息(Group内所有消费者的操作),因此需要与__consumer_offsets对应分区的Leader副本进行读写交互,选择该Leader副本所在Broker上的GroupCoordinator自然能省掉不必要的网络通信开销。

另外,在handleGroupCoordinatorRequest(...)方法中获取__consumer_offsets主题涉及到的方法的实现如下:

kafka.server.KafkaApis
  • // 获取存储Consumer Group中消费者消费的offset的主题__consumer_offsets,如果获取不到就创建
  • private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
  • val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
  • topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
  • }
  • // 创建Group组的元数据存放的主题
  • private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
  • val aliveBrokers = metadataCache.getAliveBrokers
  • val offsetsTopicReplicationFactor =
  • if (aliveBrokers.nonEmpty)
  • // offsetsTopicReplicationFactor由offsets.topic.replication.factor参数配置
  • Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
  • else
  • config.offsetsTopicReplicationFactor.toInt
  • // 创建__consumer_offsets
  • createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, // __consumer_offsets
  • config.offsetsTopicPartitions, // offsets.topic.num.partitions
  • offsetsTopicReplicationFactor,
  • coordinator.offsetsTopicConfigs)
  • }
  • // 创建主题
  • private def createTopic(topic: String,
  • numPartitions: Int,
  • replicationFactor: Int,
  • properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  • try {
  • // 使用AdminUtils创建主题
  • AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
  • info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
  • .format(topic, numPartitions, replicationFactor))
  • new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • } catch {
  • case e: TopicExistsException => // let it go, possibly another broker created this topic
  • new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • case itex: InvalidTopicException =>
  • new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, common.Topic.isInternal(topic),
  • java.util.Collections.emptyList())
  • }
  • }

从上述过程的源码可知,如果获取不到__consumer_offsets则会尝试创建,该主题的分区数可由offsets.topic.num.partitions参数配置,副本因子则由offsets.topic.replication.factor参数和存活的Broker数量共同决定,取二者较小的值。
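
副本因子的取值逻辑可以用一段简单的示意说明(参数取值为假设值):

  • val configuredReplicationFactor = 3  // offsets.topic.replication.factor
  • val aliveBrokerCount = 2             // 假设当前只有2个存活的Broker
  • val offsetsTopicReplicationFactor =
  •   if (aliveBrokerCount > 0) math.min(configuredReplicationFactor, aliveBrokerCount)
  •   else configuredReplicationFactor
  • // 结果为2:存活Broker数量不足时,以存活数量为准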

5.4. GroupCoordinator的迁移

当某个Broker发生故障宕机之后,该Broker上Offsets Topic相关分区的Leader副本会发生迁移,GroupCoordinator及其管理的Consumer Group相关信息也随之迁移。我们先回顾一下Broker故障下线后触发的操作:Broker下线时,ZooKeeper中/brokers/ids节点下对应的临时子节点会被删除,从而触发BrokerChangeListener监听器执行下面的流程:

  1. 首先从出现故障的Broker集合中移除正在正常关闭的Broker。
  2. 过滤得到Leader副本存在于故障Broker上的分区,将它们的状态转换为OfflinePartition。
  3. 再次将第2步中的分区的状态转换为OnlinePartition状态,这将触发新Leader副本的选举操作(使用OfflinePartitionLeaderSelector选举器),并向可用的Broker发送LeaderAndIsrRequest和UpdateMetadataRequest以更新相关信息。
  4. 将故障Broker上的副本转换为OfflineReplica状态,并将其从对应分区的ISR集合中删除。
  5. 检查故障Broker上是否有待删除的主题,如果有则将其标记为不可删除。
  6. 向所有可用的Broker发送UpdateMetadataRequest请求以更新元数据。

从其中第3步可以得知,当Broker下线后,其它可用的Broker会收到KafkaController发送的LeaderAndIsrRequest请求,以完成相关分区Leader副本的切换;LeaderAndIsrRequest请求由KafkaApis的handleLeaderAndIsrRequest(...)方法处理,我们回顾其源码:

kafka.server.KafkaApis#handleLeaderAndIsrRequest
  • // 用于操作Leader或ISR的请求
  • def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  • // ensureTopicExists is only for client facing requests
  • // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  • // stop serving data to clients for the topic being deleted
  • val correlationId = request.header.correlationId
  • // 转换请求为LeaderAndIsrRequest
  • val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
  • try {
  • def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
  • // for each new leader or follower, call coordinator to handle consumer group migration.
  • // this callback is invoked under the replica state change lock to ensure proper order of
  • // leadership changes
  • // 处理GroupCoordinator的迁移
  • updatedLeaders.foreach { partition =>
  • if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) // 主题为__consumer_offsets
  • // Broker成为Offsets Topic分区的Leader副本时调用
  • coordinator.handleGroupImmigration(partition.partitionId)
  • }
  • updatedFollowers.foreach { partition =>
  • if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) // 主题为__consumer_offsets
  • // Broker成为Offsets Topic分区的Follower副本时调用
  • coordinator.handleGroupEmigration(partition.partitionId)
  • }
  • }
  • // 根据correlationId构造响应头,请求端会根据响应的correlationId以匹配发送的请求对象
  • val responseHeader = new ResponseHeader(correlationId)
  • val leaderAndIsrResponse =
  • if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { // 检查授权
  • // 授权通过,调用ReplicaManager的becomeLeaderOrFollower(...)方法进行处理,注意此处会传入上面定义的onLeadershipChange回调方法
  • val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
  • // 将处理的结果构造为LeaderAndIsrResponse响应对象
  • new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
  • } else {
  • // 授权未通过,向leaderAndIsrRequest中记录CLUSTER_AUTHORIZATION_FAILED错误码
  • val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
  • // 构造LeaderAndIsrResponse响应,错误码为CLUSTER_AUTHORIZATION_FAILED
  • new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
  • }
  • // 使用RequestChannel发送响应
  • requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))
  • } catch {
  • case e: KafkaStorageException =>
  • fatal("Disk error during leadership change.", e)
  • Runtime.getRuntime.halt(1)
  • }
  • }

在该方法中定义了onLeadershipChange(...)回调函数,它会在正确处理完Leader副本的切换之后完成GroupCoordinator的迁移操作。这里存在两种情况:

  1. 当前Broker成为Offsets Topic主题的某个分区的Leader副本时,会调用GroupCoordinator的handleGroupImmigration(...)方法进行处理。
  2. 当前Broker成为Offsets Topic主题的某个分区的Follower副本时,会调用GroupCoordinator的handleGroupEmigration(...)方法进行处理。

我们首先讨论第一种情况,GroupCoordinator的handleGroupImmigration(...)方法将具体实现直接交给了GroupMetadataManager的loadGroupsForPartition(...)方法:

kafka.coordinator.GroupCoordinator#handleGroupImmigration
  • def handleGroupImmigration(offsetTopicPartitionId: Int) {
  • groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
  • }

GroupMetadataManager的loadGroupsForPartition(...)方法的源码如下:

kafka.coordinator.GroupMetadataManager#loadGroupsForPartition
  • /**
  • * Asynchronously read the partition from the offsets topic and populate the cache
  • */
  • def loadGroupsForPartition(offsetsPartition: Int,
  • onGroupLoaded: GroupMetadata => Unit) {
  • // 向KafkaScheduler提交一个线程任务
  • val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
  • scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets) // 任务调用了loadGroupsAndOffsets()方法
  • def loadGroupsAndOffsets() {
  • info("Loading offsets and group metadata from " + topicPartition)
  • // 检查指定的Offsets Topic的分区是否正在加载
  • loadingPartitions synchronized {
  • if (loadingPartitions.contains(offsetsPartition)) {
  • // 正在加载,直接返回
  • info("Offset load from %s already in progress.".format(topicPartition))
  • return
  • } else {
  • // 否则将该分区的ID添加到loadingPartitions中
  • loadingPartitions.add(offsetsPartition)
  • }
  • }
  • val startMs = time.milliseconds()
  • try {
  • replicaManager.logManager.getLog(topicPartition) match { // 得到该分区的对应的Log对象
  • case Some(log) => // 能获取到对应的Log对象
  • var currOffset = log.logSegments.head.baseOffset
  • val buffer = ByteBuffer.allocate(config.loadBufferSize) // 默认为5MB
  • // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
  • inWriteLock(offsetExpireLock) {
  • // 构造需要加载的GroupMetadata字典
  • val loadedGroups = mutable.Map[String, GroupMetadata]()
  • // 构造需要移除的GroupMetadata对应的groupId集合
  • val removedGroups = mutable.Set[String]()
  • // 持续读取HighWatermark线之前的消息
  • while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
  • buffer.clear()
  • // 读取消息数据到buffer中
  • val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
  • messages.readInto(buffer, 0)
  • // 构造为ByteBufferMessageSet对象
  • val messageSet = new ByteBufferMessageSet(buffer)
  • // 对该ByteBufferMessageSet对象进行遍历
  • messageSet.foreach { msgAndOffset =>
  • require(msgAndOffset.message.key != null, "Offset entry key should not be null")
  • // 读取并解析消息的键
  • val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
  • if (baseKey.isInstanceOf[OffsetKey]) { // 键是表示Offset位置的键
  • // load offset
  • // 转换键的key字段为GroupTopicPartition对象,其中保存了Group ID、主题名称和分区ID
  • val key = baseKey.key.asInstanceOf[GroupTopicPartition]
  • if (msgAndOffset.message.payload == null) { // 消息的值为null,表示该消息是"删除标记"
  • // 从offsetsCache中移除相应的键对应的OffsetAndMetadata对象
  • if (offsetsCache.remove(key) != null)
  • trace("Removed offset for %s due to tombstone entry.".format(key))
  • else
  • trace("Ignoring redundant tombstone for %s.".format(key))
  • } else {
  • // special handling for version 0:
  • // set the expiration time stamp as commit time stamp + server default retention time
  • // 消息的值不为null,表示该消息不是"删除标记",需要将消息解析为OffsetAndMetadata对象,存入到offsetsCache中
  • val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
  • // 将相应的键和解析得到的OffsetAndMetadata对象存入到offsetsCache中
  • putOffset(key, value.copy (
  • expireTimestamp = {
  • if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) // 为-1时
  • value.commitTimestamp + config.offsetsRetentionMs // offset过期时间,默认为24*60*60*1000,即24小时
  • else // 不为-1
  • value.expireTimestamp
  • }
  • ))
  • trace("Loaded offset %s for %s.".format(value, key))
  • }
  • } else { // 键是表示GroupMetadata的键
  • // load group metadata
  • // 转换键的key字段为String对象,即对应的Group ID
  • val groupId = baseKey.key.asInstanceOf[String]
  • // 将消息的值解析为GroupMetadata对象
  • val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
  • if (groupMetadata != null) { // 解析得到的GroupMetadata对象不为空,表示该消息有效,需要写入到groupsCache
  • trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
  • // 先移除removedGroups中记录了该GroupID
  • removedGroups.remove(groupId)
  • // 在将新的GroupMetadata添加到loadedGroups
  • loadedGroups.put(groupId, groupMetadata)
  • } else { // 解析得到的GroupMetadata对象为空,表示该消息为"删除标记",需要将其从groupsCache中移除
  • // 从loadedGroups中移除
  • loadedGroups.remove(groupId)
  • // 添加到removedGroups中
  • removedGroups.add(groupId)
  • }
  • }
  • currOffset = msgAndOffset.nextOffset
  • }
  • }
  • // 处理loadedGroups,主要是将其中的GroupMetadata对象添加到groupsCache中
  • loadedGroups.values.foreach { group =>
  • val currentGroup = addGroup(group)
  • if (group != currentGroup)
  • debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
  • s"because there is already a cached group with generation ${currentGroup.generationId}")
  • else
  • // 触发onGroupLoaded回调
  • onGroupLoaded(group)
  • }
  • // 处理removedGroups,主要是将groupsCache中对应的GroupMetadata对象移除
  • removedGroups.foreach { groupId =>
  • val group = groupsCache.get(groupId)
  • if (group != null)
  • throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " +
  • s"loading partition ${topicPartition}")
  • }
  • }
  • if (!shuttingDown.get())
  • info("Finished loading offsets from %s in %d milliseconds."
  • .format(topicPartition, time.milliseconds() - startMs))
  • case None =>
  • warn("No log found for " + topicPartition)
  • }
  • }
  • catch {
  • case t: Throwable =>
  • error("Error in loading offsets from " + topicPartition, t)
  • }
  • finally {
  • // 将当前Offset Topic分区的id从loadingPartitions集合移入ownedPartitions集合,标识该分区加载完成
  • loadingPartitions synchronized {
  • ownedPartitions.add(offsetsPartition)
  • loadingPartitions.remove(offsetsPartition)
  • }
  • }
  • }
  • }

GroupMetadataManager的loadGroupsForPartition(...)方法内部会向KafkaScheduler调度器scheduler提交一个线程任务,任务功能由其内部函数loadGroupsAndOffsets()实现,用于从Offsets Topic中加载GroupMetadata及OffsetAndMetadata数据,具体流程如下:

  1. 检测Offsets Topic中当前的分区是否正在加载。如果是,则结束本次加载操作,否则将其加入loadingPartitions集合,标识该分区正在进行加载。
  2. 通过ReplicaManager组件得到当前分区对应的Log对象。
  3. 从Log对象中的第一个LogSegment开始加载,加载过程中可能会碰到记录了Offset信息的消息,也有可能碰到记录GroupMetadata信息的消息,还有可能是“删除标记”消息,需要区分处理:
    - 如果是记录Offset信息的消息且是“删除标记”,则删除offsetsCache集合中对应的OffsetAndMetadata对象。
    - 如果是记录Offset信息的消息且不是“删除标记”,则解析消息形成OffsetAndMetadata对象,添加到offsetsCache集合中。
    - 如果是记录GroupMetadata信息的消息,则统计是否为“删除标记”,在第4步中进行处理。
  4. 根据第3步的第三种情况中的统计,将需要加载的GroupMetadata信息加载到groupsCache集合中,并检测需要删除的GroupMetadata信息是否还在groupsCache集合中。
  5. 将当前Offset Topic分区的ID从loadingPartitions集合移入ownedPartitions集合,标识该分区加载完成,当前GroupCoordinator开始正式负责管理其对应的Consumer Group。

注意,此处传递给loadGroupsForPartition(...)方法的第二个参数为回调函数,它会在第4步中被调用,用于更新所有加载的GroupMetadata中MemberMetadata的心跳操作:

kafka.coordinator.GroupCoordinator#onGroupLoaded
  • /**
  • * 在handleGroupImmigration()方法中传入GroupMetadataManager.loadGroupsForPartition()方法的回调函数
  • * 当出现GroupMetadata重新加载时,会调用它更新心跳
  • */
  • private def onGroupLoaded(group: GroupMetadata) {
  • group synchronized {
  • info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
  • assert(group.is(Stable))
  • // 更新所有Member的心跳操作
  • group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
  • }
  • }
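
为了便于把握loadGroupsAndOffsets()中最核心的分派逻辑,下面给出一个剥离了日志读取、序列化与高水位控制的简化示意(非Kafka源码,沿用前文示意中的BaseKey/OffsetKey/GroupMetadataKey,并以String代替解析后的值对象;源码中的解析分别对应readOffsetMessageValue(...)与readGroupMessageValue(...)):

  • import scala.collection.mutable
  • val offsetsCache  = mutable.Map[GroupTopicPartition, String]()
  • val loadedGroups  = mutable.Map[String, String]()
  • val removedGroups = mutable.Set[String]()
  • def replay(baseKey: BaseKey, payload: Array[Byte]): Unit = baseKey match {
  •   case OffsetKey(gtp) =>
  •     if (payload == null) offsetsCache.remove(gtp)        // 值为null即"删除标记",移除对应offset
  •     else offsetsCache.put(gtp, new String(payload))      // 否则解析并放入offsetsCache
  •   case GroupMetadataKey(groupId) =>
  •     if (payload == null) {                               // "删除标记":记入待移除集合
  •       loadedGroups.remove(groupId); removedGroups.add(groupId)
  •     } else {                                             // 否则记入待加载集合
  •       removedGroups.remove(groupId); loadedGroups.put(groupId, new String(payload))
  •     }
  • }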

第二种情况中,GroupCoordinator的handleGroupEmigration(...)方法将具体实现直接交给了GroupMetadataManager的removeGroupsForPartition(...)方法:

kafka.coordinator.GroupCoordinator#handleGroupEmigration
  • def handleGroupEmigration(offsetTopicPartitionId: Int) {
  • groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
  • }

GroupMetadataManager的removeGroupsForPartition(...)方法的源码如下:

kafka.coordinator.GroupMetadataManager#removeGroupsForPartition
  • /**
  • * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
  • * that partition.
  • * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
  • */
  • def removeGroupsForPartition(offsetsPartition: Int,
  • onGroupUnloaded: GroupMetadata => Unit) {
  • // 向KafkaScheduler提交一个线程任务
  • val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
  • scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets) // 任务调用了removeGroupsAndOffsets()方法
  • def removeGroupsAndOffsets() {
  • // 记录offset和group被移除的数量
  • var numOffsetsRemoved = 0
  • var numGroupsRemoved = 0
  • loadingPartitions synchronized {
  • // we need to guard the group removal in cache in the loading partition lock
  • // to prevent coordinator's check-and-get-group race condition
  • // 从ownedPartitions中移除对应的Offsets Topic的分区,标识当前GroupCoordinator不再管理其对应Consumer Group
  • ownedPartitions.remove(offsetsPartition)
  • // clear the offsets for this partition in the cache
  • /**
  • * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
  • * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
  • * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
  • *
  • * 遍历offsetsCache字典的键集
  • */
  • offsetsCache.keys.foreach { key =>
  • if (partitionFor(key.group) == offsetsPartition) {
  • offsetsCache.remove(key) // 将对应的OffsetAndMetadata全部清除
  • // 计数
  • numOffsetsRemoved += 1
  • }
  • }
  • // clear the groups for this partition in the cache
  • // 遍历groupsCache字典的值集
  • for (group <- groupsCache.values) {
  • if (partitionFor(group.groupId) == offsetsPartition) {
  • // 调用回调函数
  • onGroupUnloaded(group)
  • // 将对应的GroupMetadata全部清除
  • groupsCache.remove(group.groupId, group)
  • // 计数
  • numGroupsRemoved += 1
  • }
  • }
  • }
  • if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
  • .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
  • if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
  • .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
  • }
  • }

GroupMetadataManager的removeGroupsForPartition(...)方法与loadGroupsForPartition(...)方法非常类似,内部也会向KafkaScheduler调度器scheduler提交一个线程任务,任务功能由其内部函数removeGroupsAndOffsets()实现,用于从groupsCache和offsetsCache中移除对应Offsets Topic分区的GroupMetadata及OffsetAndMetadata数据,具体流程如下:

  1. 从ownedPartitions集合中将对应的Offsets Topic分区删除,标识当前GroupCoordinator不再管理其对应的Consumer Group。
  2. 遍历offsetsCache集合,将此分区对应的OffsetAndMetadata全部清除。
  3. 遍历groupsCache集合,将此分区对应的GroupMetadata全部清除。

其中,第2步和第3步仅仅是清理offsetsCache和groupsCache集合,并没有向对应的Offsets Topic分区中追加"删除标记"消息,因为其他Broker会成为此Offsets Topic分区的Leader副本,还需要使用其中记录的GroupMetadata信息和Offset信息。

同时,此处传递给removeGroupsForPartition(...)方法的第二个参数为回调函数,它会在第3步中被调用,用于在GroupMetadata被删除前,将Consumer Group状态转换成Dead,并根据之前的Consumer Group状态进行相应的清理操作:

kafka.coordinator.GroupCoordinator#onGroupUnloaded
  • /**
  • * 在handleGroupEmigration()方法中传入GroupMetadataManager.removeGroupsForPartition()方法的回调函数,
  • * 它会在GroupMetadata被删除前,将Consumer Group状态转换成Dead,并根据之前的Consumer Group状态进行相应的清理操作
  • */
  • private def onGroupUnloaded(group: GroupMetadata) {
  • group synchronized {
  • info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
  • // 记录旧状态
  • val previousState = group.currentState
  • // 转换为Dead状态
  • group.transitionTo(Dead)
  • // 根据Consumer Group之前的状态进行清理
  • previousState match {
  • case Dead =>
  • case PreparingRebalance =>
  • // 调用所有MemberMetadata的awaitingJoinCallback回调函数,返回NOT_COORDINATOR_FOR_GROUP错误码
  • for (member <- group.allMemberMetadata) {
  • if (member.awaitingJoinCallback != null) {
  • member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
  • // 清空awaitingJoinCallback回调函数
  • member.awaitingJoinCallback = null
  • }
  • }
  • // awaitingJoinCallback的变化,可能导致DelayedJoin满足条件,故进行尝试
  • joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  • case Stable | AwaitingSync =>
  • // 调用所有MemberMetadata的awaitingJoinCallback回调函数,返回NOT_COORDINATOR_FOR_GROUP错误码
  • for (member <- group.allMemberMetadata) {
  • if (member.awaitingSyncCallback != null) {
  • member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
  • // 清空awaitingSyncCallback回调函数
  • member.awaitingSyncCallback = null
  • }
  • // awaitingSyncCallback的变化,可能导致DelayedHeartbeat满足条件,故进行尝试
  • heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
  • }
  • }
  • }
  • }

在上述两个迁移过程中,loadingPartitions集合用于记录正在加载的Offsets Topic分区,ownedPartitions集合用于记录当前GroupCoordinator管理的Offsets Topic分区。GroupMetadataManager提供了两个检测方法:isGroupLocal()和isGroupLoading()。其中isGroupLocal()用于检测当前GroupCoordinator是否管理指定的Consumer Group;isGroupLoading()用于检测指定的Consumer Group对应的Offsets Topic分区是否还处于上述加载过程之中:

  • /**
  • * 在处理JoinGroupRequest、OffsetFetchRequest、OffsetCommitRequest及HeartbeatRequest这些请求之前,
  • * 都会调用isGroupLocal()和isGroupLoading()两个方法进行检测,如果检测失败,则直接返回异常响应
  • */
  • // 检测当前GroupCoordinator是否管理指定的Consumer Group
  • def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
  • // 检测指定的Consumer Group对应的Offsets Topic分区是否还处于加载过程中
  • def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))

在处理JoinGroupRequest、OffsetFetchRequest、OffsetCommitRequest及HeartbeatRequest这些请求之前,都会调用这两个方法进行检测,如果检测失败,则直接返回异常响应。
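这类前置检测的使用方式大致如下(示意代码,非Kafka源码;错误码取自笔者对该版本Errors定义的理解,groupManager为持有GroupMetadataManager实例的假设字段):

  • import org.apache.kafka.common.protocol.Errors
  • // 处理请求前的两项前置检测,返回Some(错误码)表示应直接响应异常
  • def precheck(groupId: String): Option[Short] =
  •   if (!groupManager.isGroupLocal(groupId))        // 该Group不归当前GroupCoordinator管理
  •     Some(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  •   else if (groupManager.isGroupLoading(groupId))  // 对应的Offsets Topic分区仍在加载中
  •     Some(Errors.GROUP_LOAD_IN_PROGRESS.code)
  •   else
  •     None                                          // 检测通过,继续正常处理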

5.5. SyncGroupRequest的处理

在讲解消费者的实现时我们提到过,Consumer Group中的Leader消费者会通过发送SyncGroupRequest请求将Group内分区消费的分配结果报告给GroupCoordinator,然后由GroupCoordinator对分配结果进行解析,构造SyncGroupResponse返回给该Consumer Group内的所有消费者,消费者通过解析SyncGroupResponse即可确认自己被分配了哪些分区。

SyncGroupRequest请求由KafkaApis的handleSyncGroupRequest(...)方法负责处理,源码如下:

kafka.server.KafkaApis#handleSyncGroupRequest
  • // 处理SyncGroupRequest请求
  • def handleSyncGroupRequest(request: RequestChannel.Request) {
  • import JavaConversions._
  • // 转换请求体为SyncGroupRequest对象
  • val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
  • // 回调函数
  • def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
  • val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
  • val responseHeader = new ResponseHeader(request.header.correlationId)
  • requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • }
  • if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) { // 检查授权
  • // 授权未通过,直接调用回调函数返回GROUP_AUTHORIZATION_FAILED错误码
  • sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
  • } else {
  • // 授权通过,交给GroupCoordinator的handleSyncGroup()方法处理
  • coordinator.handleSyncGroup(
  • syncGroupRequest.groupId(), // Group ID
  • syncGroupRequest.generationId(), // Group的年代信息
  • syncGroupRequest.memberId(), // Member ID
  • syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)), // 分配结果
  • sendResponseCallback
  • )
  • }
  • }

KafkaApis的handleSyncGroupRequest(...)方法在经过授权检查通过后,直接将处理委托给了GroupCoordinator的handleSyncGroup(...)方法,该方法的源码如下:

kafka.coordinator.GroupCoordinator#handleSyncGroup
  • // 处理SyncGroupRequest请求
  • def handleSyncGroup(groupId: String,
  • generation: Int,
  • memberId: String,
  • groupAssignment: Map[String, Array[Byte]],
  • responseCallback: SyncCallback) {
  • if (!isActive.get) { // 检查GroupCoordinator是否在运行
  • // 调用回调函数,返回错误码GROUP_COORDINATOR_NOT_AVAILABLE
  • responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  • } else if (!isCoordinatorForGroup(groupId)) { // 判断当前GroupCoordinator是否负责管理该Consumer Group
  • // 调用回调函数,返回错误码NOT_COORDINATOR_FOR_GROUP
  • responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
  • } else { // 检查通过
  • // 获取对应的GroupMetadata
  • val group = groupManager.getGroup(groupId)
  • if (group == null)
  • // GroupMetadata为空,直接返回
  • responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
  • else
  • // 否则使用doSyncGroup()方法处理
  • doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
  • }
  • }

handleSyncGroup(...)方法也会进行一系列的检查,然后使用doSyncGroup(...)方法进行处理,该方法源码如下:

kafka.coordinator.GroupCoordinator#doSyncGroup
  • private def doSyncGroup(group: GroupMetadata,
  • generationId: Int,
  • memberId: String,
  • groupAssignment: Map[String, Array[Byte]],
  • responseCallback: SyncCallback) {
  • var delayedGroupStore: Option[DelayedStore] = None
  • group synchronized {
  • if (!group.has(memberId)) { // 检测Member是否为此Consumer Group的成员
  • // 返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
  • } else if (generationId != group.generationId) { // 检测generationId是否合法,以屏蔽来自旧Member成员的请求
  • // 返回ILLEGAL_GENERATION错误码
  • responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
  • } else {
  • group.currentState match { // 根据不同的Group状态分别处理
  • case Dead =>
  • // 直接返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
  • case PreparingRebalance =>
  • // 返回REBALANCE_IN_PROGRESS错误码
  • responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
  • case AwaitingSync =>
  • // 设置对应的MemberMetadata的awaitingSyncCallback为回调函数
  • group.get(memberId).awaitingSyncCallback = responseCallback
  • // 完成之前相关的DelayedHeartbeat并创建新的DelayedHeartbeat对象等待下次心跳到来
  • completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
  • // if this is the leader, then we can attempt to persist state and transition to stable
  • // 只有在收到的SyncGroupRequest是Group Leader发送的才会处理
  • if (memberId == group.leaderId) {
  • info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
  • // fill any missing members with an empty assignment
  • // 将未分配到分区的Member对应的分配结果填充为空的Byte数组
  • val missing = group.allMembers -- groupAssignment.keySet
  • val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
  • // 通过GroupMetadataManager将GroupMetadata相关信息形成消息,并写入到对应的Offsets Topic分区中
  • // 第三个参数是回调函数,会在消息被追加后被调用
  • delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
  • group synchronized {
  • // another member may have joined the group while we were awaiting this callback,
  • // so we must ensure we are still in the AwaitingSync state and the same generation
  • // when it gets invoked. if we have transitioned to another state, then do nothing
  • // 检测Consumer Group状态及年代信息
  • if (group.is(AwaitingSync) && generationId == group.generationId) {
  • if (errorCode != Errors.NONE.code) {
  • // 有错误的情况,清空分区的分配结果,发送异常响应
  • resetAndPropagateAssignmentError(group, errorCode)
  • // 切换成PreparingRebalance状态
  • maybePrepareRebalance(group)
  • } else {
  • // 正常的情况,发送正常的SyncGroupResponse
  • setAndPropagateAssignment(group, assignment)
  • // 转换为Stable状态
  • group.transitionTo(Stable)
  • }
  • }
  • }
  • }))
  • }
  • case Stable =>
  • // if the group is stable, we just return the current assignment
  • // 将分配给此Member的负责处理的分区信息返回
  • val memberMetadata = group.get(memberId)
  • responseCallback(memberMetadata.assignment, Errors.NONE.code)
  • // 完成之前相关的DelayedHeartbeat并创建新的DelayedHeartbeat对象等待下次心跳到来
  • completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
  • }
  • }
  • }
  • // store the group metadata without holding the group lock to avoid the potential
  • // for deadlock when the callback is invoked
  • // 使用GroupCoordinatorManager的store()方法进行处理
  • delayedGroupStore.foreach(groupManager.store)
  • }

doSyncGroup(...)方法在经过一些检查后,会根据对应的GroupMetadata的状态进行处理,关于GroupState的内容我们后面会讨论,这里主要关注在AwaitingSync状态下才能够正常处理SyncGroupRequest请求。

注:关于Dead、PreparingRebalance和Stable三种情况的处理:Dead状态下说明GroupMetadata已不可用,直接返回UNKNOWN_MEMBER_ID错误码即可;PreparingRebalance表示当前Consumer Group正准备对Group内消费者进行重新均衡,因此无法接受SyncGroupRequest以确定分区分配方案,返回REBALANCE_IN_PROGRESS错误码;当GroupMetadata确定了分区分配方案后会转换为Stable状态,该状态下只有在出现消费者客户端退出或新消费者客户端加入这两种情况时,才会转换为PreparingRebalance状态以准备重新均衡,对于SyncGroupRequest则直接返回已确定的分区分配方案。

这里需要注意跟踪一下传递给doSyncGroup(...)方法的responseCallback参数,它是由KafkaApis的handleSyncGroupRequest(...)一路传递过来的,最终会被记录到发送SyncGroupRequest请求的消费者在GroupMetadata中对应的MemberMetadata的awaitingSyncCallback字段上。

另外从源码中也可以得知,只有当SyncGroupRequest来自Consumer Group的Leader消费者时,才会真正处理分配结果。doSyncGroup(...)方法其实将具体操作委托给了GroupMetadataManager的prepareStoreGroup(...)方法,该方法会根据Consumer Group Leader消费者携带的分区分配结果构造写入Offsets Topic的消息,但该过程和对应的回调会被封装为一个DelayedStore延迟任务返回给上层的doSyncGroup(...),由它来进行实际的处理。GroupMetadataManager的prepareStoreGroup(...)方法如下:

kafka.coordinator.GroupMetadataManager#prepareStoreGroup
  • // 根据Consumer Group Leader的分区分配结果创建消息,仅仅创建DelayedStore对象,封装消息和回调函数
  • def prepareStoreGroup(group: GroupMetadata,
  • groupAssignment: Map[String, Array[Byte]],
  • responseCallback: Short => Unit): DelayedStore = {
  • // 获取__consumer_offsets中指定Consumer Group的消息数据的魔数和时间戳
  • val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
  • // 构造消息对象
  • val message = new Message(
  • key = GroupMetadataManager.groupMetadataKey(group.groupId), // 键,包含了Group ID
  • bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment), // 值,包含了分配结果
  • timestamp = timestamp, // 时间戳
  • magicValue = magicValue) // 魔数
  • // 以主题为__consumer_offsets,分区为Group ID对应的分区,创建TopicPartition对象
  • val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
  • // 构建消息字典,表示Offsets Topic分区与消息集合的对应关系
  • val groupMetadataMessageSet = Map(groupMetadataPartition ->
  • new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
  • // 当前Consumer Group的年代信息
  • val generationId = group.generationId
  • // set the callback function to insert the created group into cache after log append completed
  • // 回调函数,会在上述消息成功追加到Offsets Topic对应的分区之后被调用,参数是追加消息的结果
  • def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
  • // the append response should only contain the topics partition
  • // 检查响应内容是否符合要求
  • if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
  • throw new IllegalStateException("Append status %s should only have one partition %s"
  • .format(responseStatus, groupMetadataPartition))
  • // construct the error status in the propagated assignment response
  • // in the cache
  • // 根据消息追加结果更新错误码
  • val status = responseStatus(groupMetadataPartition)
  • var responseCode = Errors.NONE.code
  • if (status.errorCode != Errors.NONE.code) { // 有错误
  • debug("Metadata from group %s with generation %d failed when appending to log due to %s"
  • .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
  • // transform the log append error code to the corresponding the commit status error code
  • // 未知主题或分区错误,返回GROUP_COORDINATOR_NOT_AVAILABLE
  • responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
  • Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
  • // 不是分区Leader副本的错误,返回NOT_COORDINATOR_FOR_GROUP
  • } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
  • Errors.NOT_COORDINATOR_FOR_GROUP.code
  • // 请求超时错误,返回REBALANCE_IN_PROGRESS
  • } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
  • Errors.REBALANCE_IN_PROGRESS.code
  • // 消息过大、记录过大、无效的拉取大小等错误,返回UNKNOWN
  • } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
  • || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
  • || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {
  • error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
  • .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
  • Errors.UNKNOWN.code
  • } else {
  • error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
  • .format(group.groupId, generationId, status.errorCode))
  • // 其它错误,直接返回
  • status.errorCode
  • }
  • }
  • responseCallback(responseCode)
  • }
  • // 以消息数据、回调函数构造一个DelayedStore对象
  • DelayedStore(groupMetadataMessageSet, putCacheCallback)
  • }
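
prepareStoreGroup(...)中两次用到的partitionFor(groupId)用于确定该Consumer Group的消息应写入__consumer_offsets的哪个分区,而返回的DelayedStore只是封装了消息集合与回调函数的简单数据载体,二者的定义大致如下(示意,以0.10.x版本源码为参照,细节以实际源码为准):

  • // 对Group ID的哈希值按Offsets Topic的分区数取模,得到对应的__consumer_offsets分区(示意)
  • def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
  • // DelayedStore仅封装待追加的消息集合以及追加完成后需要执行的回调(示意)
  • case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
  •                         callback: Map[TopicPartition, PartitionResponse] => Unit)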

真正实现追加消息操作的是GroupMetadataManager的store(...)方法,其中会调用ReplicaManager的appendMessages(...)方法追加消息。该方法在追加消息数据时,如果requiredAcks参数为-1,表示需要等待分区ISR中的所有副本都同步了消息后才可以返回响应,该等待过程会被封装为DelayedProduce延迟任务放入时间轮中执行。

在GroupMetadataManager的store(...)方法中,传递给appendMessages(...)的第二个参数requiredAcks默认为-1,第三个参数internalTopicsAllowed为true,表示允许向Kafka内部Topic追加消息,第四个参数是待追加的消息集合,第五个参数则对应于prepareStoreGroup()中定义的putCacheCallback()回调方法,该回调会在DelayedProduce延迟任务满足后被调用:

kafka.coordinator.GroupMetadataManager#store
  • // 实现追加消息的操作
  • def store(delayedAppend: DelayedStore) {
  • // call replica manager to append the group message
  • replicaManager.appendMessages(
  • config.offsetCommitTimeoutMs.toLong,
  • config.offsetCommitRequiredAcks, // 默认ACK是-1,需要所有副本都同步
  • true, // allow appending to internal offset topic 允许向内部主题追加消息
  • delayedAppend.messageSet, // 追加的消息集合
  • /**
  • * 回调,对应于prepareStoreGroup()中的putCacheCallback()回调方法
  • * 该方法会在DelayedProduce延迟任务满足后被调用
  • */
  • delayedAppend.callback)
  • }
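
为便于对照store(...)中传入的各个参数,这里给出ReplicaManager的appendMessages(...)方法签名的示意(省略方法体,细节以实际源码为准):

  • // kafka.server.ReplicaManager#appendMessages 签名示意
  • def appendMessages(timeout: Long, // 等待副本同步的超时时间
  •                    requiredAcks: Short, // 需要的ACK数量,-1表示等待ISR中所有副本同步
  •                    internalTopicsAllowed: Boolean, // 是否允许向内部主题(如__consumer_offsets)追加消息
  •                    messagesPerPartition: Map[TopicPartition, MessageSet], // 各分区待追加的消息集合
  •                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit // 追加完成后的回调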

putCacheCallback()回调方法会处理追加消息的结果并整理出错误码,最后调用prepareStoreGroup(...)的responseCallback回调函数参数,将错误码传递给它;而这个responseCallback回调函数其实是在GroupCoordinator的doSyncGroup(...)方法中定义的匿名函数:

  • ...
  • // 通过GroupMetadataManager将GroupMetadata相关信息形成消息,并写入到对应的Offsets Topic分区中
  • // 第三个参数是回调函数,会在消息被追加后被调用
  • delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
  • group synchronized {
  • // another member may have joined the group while we were awaiting this callback,
  • // so we must ensure we are still in the AwaitingSync state and the same generation
  • // when it gets invoked. if we have transitioned to another state, then do nothing
  • // 检测Consumer Group状态及年代信息
  • if (group.is(AwaitingSync) && generationId == group.generationId) {
  • if (errorCode != Errors.NONE.code) {
  • // 有错误的情况,清空分区的分配结果,发送异常响应
  • resetAndPropagateAssignmentError(group, errorCode)
  • // 切换成PreparingRebalance状态
  • maybePrepareRebalance(group)
  • } else {
  • // 正常的情况,发送正常的SyncGroupResponse
  • setAndPropagateAssignment(group, assignment)
  • // 转换为Stable状态
  • group.transitionTo(Stable)
  • }
  • }
  • }
  • }))
  • ...

这里涉及到了三个方法,其中resetAndPropagateAssignmentError(...)maybePrepareRebalance(...)用于处理有错误的情况,它们会清空分区的分配结果,发送异常响应,并将GroupMetadata切换为PreparingRebalance状态,源码如下:

kafka.coordinator.GroupCoordinator
  • private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
  • // 检查GroupMetadata的状态是否是AwaitingSync
  • assert(group.is(AwaitingSync))
  • // 清空GroupMetadata中的分配情况
  • group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
  • propagateAssignment(group, errorCode)
  • }
  • private def maybePrepareRebalance(group: GroupMetadata) {
  • group synchronized { // 加锁
  • // 只有在Stable和AwaitingSync状态下才可以尝试切换到PreparingRebalance状态
  • if (group.canRebalance)
  • prepareRebalance(group)
  • }
  • }
  • // 切换状态PreparingRebalance
  • private def prepareRebalance(group: GroupMetadata) {
  • // if any members are awaiting sync, cancel their request and have them rejoin
  • if (group.is(AwaitingSync)) // 如果是AwaitingSync状态
  • // 重置MemberMetadata.assignment字段,调用awaitingSyncCallback向消费者返回REBALANCE_IN_PROGRESS的错误码
  • resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
  • // 切换为PreparingRebalance状态,表示准备执行Rebalance操作
  • group.transitionTo(PreparingRebalance)
  • info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
  • // DelayedJoin的超时时长是GroupMetadata中所有Member设置的超时时长的最大值
  • val rebalanceTimeout = group.rebalanceTimeout
  • // 创建DelayedJoin对象
  • val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
  • // 创建DelayedJoin的Watcher Key
  • val groupKey = GroupKey(group.groupId)
  • // 尝试立即完成DelayedJoin,如果不能完成就添加到joinPurgatory炼狱
  • joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  • }
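
prepareRebalance(...)中使用的rebalanceTimeout取的是GroupMetadata中所有Member会话超时时间的最大值,其计算方式大致如下(示意):

  • // DelayedJoin的超时时长取所有Member的sessionTimeoutMs中的最大值(示意)
  • def rebalanceTimeout = members.values.foldLeft(0) { (timeout, member) =>
  •   timeout.max(member.sessionTimeoutMs)
  • }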

setAndPropagateAssignment(...)处理消息追加成功的情况,它会发送正常的SyncGroupResponse,并将对应的GroupMetadata切换为Stable状态,源码如下:

kafka.coordinator.GroupCoordinator#setAndPropagateAssignment
  • private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
  • assert(group.is(AwaitingSync))
  • // 更新GroupMetadata中每个相关的MemberMetadata.assignment字段
  • group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
  • /**
  • * 调用每个MemberMetadata的awaitingSyncCallback回调函数,创建SyncGroupResponse对象并添加到RequestChannel中等待发送;
  • * 将本次心跳延迟任务完成并开始下次等待心跳的延迟任务的执行或超时
  • */
  • propagateAssignment(group, Errors.NONE.code)
  • }

可以发现,上述两个过程都调用了GroupCoordinator的propagateAssignment(...)处理分配结果,只是传入的错误码不同:

kafka.coordinator.GroupCoordinator#propagateAssignment
  • // 传递分配结果,其实内部调用了KafkaApis中处理SyncGroupRequest时定义的回调函数
  • private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
  • for (member <- group.allMemberMetadata) { // 遍历所有的Member
  • if (member.awaitingSyncCallback != null) {
  • // 调用awaitingSyncCallback回调函数
  • member.awaitingSyncCallback(member.assignment, errorCode)
  • member.awaitingSyncCallback = null
  • // reset the session timeout for members after propagating the member's assignment.
  • // This is because if any member's session expired while we were still awaiting either
  • // the leader sync group or the storage callback, its expiration will be ignored and no
  • // future heartbeat expectations will not be scheduled.
  • // 开启等待下次心跳的延迟任务
  • completeAndScheduleNextHeartbeatExpiration(group, member)
  • }
  • }
  • }

这里调用的MemberMetadata对象的awaitingSyncCallback回调方法,其实就是由KafkaApis的handleSyncGroupRequest(...)一路传递过来的,调用该回调即是向RequestChannel的相关队列中添加SyncGroupResponse等待发送。收到SyncGroupRequest请求本身就可以视为消费者客户端仍然存活,因此这里会调用completeAndScheduleNextHeartbeatExpiration(...)方法,将本次心跳延迟任务完成,并开启下一次等待心跳的延迟任务的执行或超时。
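
completeAndScheduleNextHeartbeatExpiration(...)的逻辑大致是:更新Member的latestHeartbeat,尝试完成当前正在等待的DelayedHeartbeat,再以新的超时时间创建下一个DelayedHeartbeat放入heartbeatPurgatory,示意如下(细节以实际源码为准):

  • private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
  •   // 更新最近一次收到心跳的时间戳
  •   member.latestHeartbeat = time.milliseconds()
  •   val memberKey = MemberKey(member.groupId, member.memberId)
  •   // 尝试完成当前正在等待的DelayedHeartbeat延迟任务
  •   heartbeatPurgatory.checkAndComplete(memberKey)
  •   // 以新的deadline创建下一个DelayedHeartbeat,放入heartbeatPurgatory中等待完成或超时
  •   val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
  •   val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
  •   heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
  • }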

5.6. OffsetCommitRequest的处理

消费者客户端在正常消费过程中以及进行Rebalance操作之前,都会进行提交Offset的操作,即将自己消费的每个分区对应的Offset封装成OffsetCommitRequest发送给GroupCoordinator。GroupCoordinator会将这些Offset封装成消息,追加到Offsets Topic对应的分区中。
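
举一个消费者侧的例子(仅为示意,Broker地址、主题名等均为假设):使用assign(...)手动指定分区并调用commitSync(...)时,客户端就会构造OffsetCommitRequest发送给GroupCoordinator;由于这种用法不依赖Kafka进行分区管理,请求中携带的generationId为-1,对应下文handleCommitOffsets(...)中generationId < 0的处理分支:

  • import java.util.{Collections, Properties}
  • import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
  • import org.apache.kafka.common.TopicPartition
  • object ManualCommitExample extends App {
  •   val props = new Properties()
  •   props.put("bootstrap.servers", "localhost:9092") // 假设的Broker地址
  •   props.put("group.id", "test-group") // 假设的Consumer Group ID
  •   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  •   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  •   val consumer = new KafkaConsumer[String, String](props)
  •   val partition = new TopicPartition("test-topic", 0) // 假设的主题与分区
  •   consumer.assign(Collections.singletonList(partition)) // 手动分配分区,不会触发JoinGroup
  •   // 将该分区的消费位置提交为42,客户端内部会封装为OffsetCommitRequest发送给GroupCoordinator
  •   consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(42L)))
  •   consumer.close()
  • }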

OffsetCommitRequest请求由KafkaApis的handleOffsetCommitRequest(...)方法负责处理,源码如下:

kafka.server.KafkaApis#handleOffsetCommitRequest
  • def handleOffsetCommitRequest(request: RequestChannel.Request) {
  • // 获取请求头、转换请求对象类型
  • val header = request.header
  • val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
  • // reject the request if not authorized to the group
  • if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) { // 检查授权
  • val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
  • val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
  • (topicPartition, errorCode)
  • }.toMap
  • val responseHeader = new ResponseHeader(header.correlationId)
  • val responseBody = new OffsetCommitResponse(results.asJava)
  • // 授权失败的响应返回
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • } else {
  • // filter non-existent topics
  • // 过滤不存在的主题
  • val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
  • !metadataCache.contains(topicPartition.topic)
  • }
  • val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
  • // 授权检查是否允许操作目标主题
  • val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
  • case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
  • }
  • // the callback for sending an offset commit response
  • // 回调方法
  • def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
  • val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
  • mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
  • if (errorCode != Errors.NONE.code) {
  • debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
  • s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
  • }
  • }
  • val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
  • val responseHeader = new ResponseHeader(header.correlationId)
  • val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • }
  • if (authorizedRequestInfo.isEmpty) // 通过授权的请求为空,直接调用回调方法,传入空集合参数
  • sendResponseCallback(Map.empty)
  • else if (header.apiVersion == 0) { // apiVersion为0的情况,需要写入到Zookeeper中,新版本才是由主题__consumer_offsets进行管理
  • // for version 0 always store offsets to ZK
  • val responseInfo = authorizedRequestInfo.map { // 遍历处理通过授权的请求,最终获得处理结果
  • case (topicPartition, partitionData) =>
  • // 构造得到封装了/consumers/[group_id]/offsets/[topic_name]和/consumers/[group_id]/owners/[topic_name]路径的ZKGroupTopicDirs对象
  • val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
  • try {
  • if (!metadataCache.hasTopicMetadata(topicPartition.topic)) // 元数据不包含该主题
  • (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) // 返回UNKNOWN_TOPIC_OR_PARTITION错误
  • else if (partitionData.metadata != null && // 对应分区的元数据存在
  • partitionData.metadata.length > config.offsetMetadataMaxSize) // 分区的元数据过大(offset.metadata.max.bytes)
  • (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) // 返回OFFSET_METADATA_TOO_LARGE错误
  • else { // 正常情况
  • // 更新Zookeeper中/consumers/[group_id]/offsets/[topic_name]/[partition_id]的数据为提交的Offset
  • zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
  • (topicPartition, Errors.NONE.code) // 无错误码返回
  • }
  • } catch {
  • case e: Throwable => (topicPartition, Errors.forException(e).code)
  • }
  • }
  • // 执行回调
  • sendResponseCallback(responseInfo)
  • } else { // apiVersion不为0的情况
  • // for version 1 and beyond store offsets in offset manager
  • // compute the retention time based on the request version:
  • // if it is v1 or not specified by user, we can use the default retention
  • // apiVersion <= 1或者没有指定offset的保留时长,使用默认时长(24小时)
  • val offsetRetention =
  • if (header.apiVersion <= 1 ||
  • offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME) // -1
  • coordinator.offsetConfig.offsetsRetentionMs // 默认为24*60*60*1000L
  • else
  • offsetCommitRequest.retentionTime
  • // commit timestamp is always set to now.
  • // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
  • // expire timestamp is computed differently for v1 and v2.
  • // - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
  • // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
  • // - If v2 we use the default expiration timestamp
  • // 时间戳
  • val currentTimestamp = SystemTime.milliseconds
  • // 过期时间
  • val defaultExpireTimestamp = offsetRetention + currentTimestamp
  • // 遍历请求进行格式处理,转换为OffsetAndMetadata对象
  • val partitionData = authorizedRequestInfo.mapValues { partitionData =>
  • val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata;
  • new OffsetAndMetadata(
  • // 消息数据
  • offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
  • // 提交时间
  • commitTimestamp = currentTimestamp,
  • // 过期时间
  • expireTimestamp = {
  • if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
  • defaultExpireTimestamp
  • else
  • offsetRetention + partitionData.timestamp
  • }
  • )
  • }
  • // call coordinator to handle commit offset
  • // 使用GroupCoordinator处理提交的offset消息
  • coordinator.handleCommitOffsets(
  • offsetCommitRequest.groupId,
  • offsetCommitRequest.memberId,
  • offsetCommitRequest.generationId,
  • partitionData,
  • sendResponseCallback)
  • }
  • }
  • }

KafkaApis的handleOffsetCommitRequest(...)会先进行一系列检查操作。在随后处理OffsetCommitRequest请求时,Offset的存储方式存在两种情况:旧版本的Kafka将Offset存储在Zookeeper中,新版本则改为存储到Offsets Topic中,因此handleOffsetCommitRequest(...)方法根据请求头中的apiVersion参数做了区分处理:如果apiVersion为0,就将Zookeeper中/consumers/[group_id]/offsets/[topic_name]/[partition_id]节点的数据更新为提交的Offset后返回响应;如果apiVersion不为0,则将请求中的Offset等信息构造为OffsetAndMetadata对象,交由GroupCoordinator的handleCommitOffsets(...)方法处理,该方法源码如下:

kafka.coordinator.GroupCoordinator#handleCommitOffsets
  • /**
  • * 处理客户端提交的offset
  • * @param groupId 消费者组ID
  • * @param memberId 消费者的Member ID
  • * @param generationId Consumer Group的年代信息
  • * @param offsetMetadata offset提交数据
  • * @param responseCallback 回调函数
  • */
  • def handleCommitOffsets(groupId: String,
  • memberId: String,
  • generationId: Int,
  • offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  • responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
  • var delayedOffsetStore: Option[DelayedStore] = None
  • if (!isActive.get) { // 判断当前GroupCoordinator是否正在运行
  • // 调用回调函数,返回GROUP_COORDINATOR_NOT_AVAILABLE错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
  • } else if (!isCoordinatorForGroup(groupId)) { // 判断当前GroupCoordinator是否负责管理该Consumer Group
  • // 调用回调函数,返回NOT_COORDINATOR_FOR_GROUP错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
  • } else if (isCoordinatorLoadingInProgress(groupId)) { // 判断该Consumer Group对应的Offsets Topic分区是否还处于加载过程中
  • // 调用回调函数,返回GROUP_LOAD_IN_PROGRESS错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
  • } else { // 正常情况
  • // 获取Consumer Group对应的GroupMetadata对象
  • val group = groupManager.getGroup(groupId)
  • if (group == null) { // GroupMetadata不存在
  • // 如果对应的GroupMetadata对象不存在且generationId < 0,则表示GroupCoordinator不维护Consumer Group的分区分配结果,只记录提交的offset信息
  • if (generationId < 0) // generationId小于0,说明该Consumer Group不依赖Kafka做分区管理,允许提交
  • // the group is not relying on Kafka for partition management, so allow the commit
  • delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
  • responseCallback))
  • else
  • // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
  • // or this is a request coming from an older generation. either way, reject the commit
  • // 否则可能是因为请求来自旧的GroupCoordinator年代,调用回调函数,返回ILLEGAL_GENERATION错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
  • } else { // GroupMetadata存在
  • group synchronized {
  • if (group.is(Dead)) { // GroupMetadata状态为Dead
  • // 调用回调函数,返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
  • } else if (group.is(AwaitingSync)) { // GroupMetadata状态为AwaitingSync
  • // 调用回调函数,返回REBALANCE_IN_PROGRESS错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
  • } else if (!group.has(memberId)) { // GroupMetadata不包含该MemberID
  • // 调用回调函数,返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
  • } else if (generationId != group.generationId) { // 年代信息不匹配
  • // 调用回调函数,返回ILLEGAL_GENERATION错误码
  • responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
  • } else {
  • // 获取Member对应的MemberMetadata对象
  • val member = group.get(memberId)
  • // 准备下一次的心跳延迟任务
  • completeAndScheduleNextHeartbeatExpiration(group, member)
  • // 使用GroupMetadataManager提交该offset并在完成后处理回调
  • delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
  • offsetMetadata, responseCallback))
  • }
  • }
  • }
  • }
  • // store the offsets without holding the group lock
  • delayedOffsetStore.foreach(groupManager.store)
  • }

handleCommitOffsets(...)方法在通过各项合法性检查之后,会调用GroupMetadataManager的prepareStoreOffsets(...)方法创建DelayedStore对象,然后交给GroupMetadataManager的store(delayedAppend: DelayedStore)方法将消息数据追加到日志系统;GroupMetadataManager的prepareStoreOffsets(...)与前面讲解的prepareStoreGroup(...)方法非常类似,源码如下:

kafka.coordinator.GroupMetadataManager#prepareStoreOffsets
  • /**
  • * Store offsets by appending it to the replicated log and then inserting to cache
  • * 产生DelayedStore对象,封装待追加的消息集合和追加后需要执行的回调函数
  • */
  • def prepareStoreOffsets(groupId: String,
  • consumerId: String,
  • generationId: Int,
  • offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  • responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
  • // first filter out partitions with offset metadata size exceeding limit
  • /**
  • * 检测OffsetAndMetadata.metadata的长度,该字段默认是空字段
  • * 消费者可以在OffsetCommitRequest中携带除offset之外的额外说明信息
  • * 经过解析后会添加到metadata字段
  • */
  • val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
  • validateOffsetMetadataLength(offsetAndMetadata.metadata)
  • }
  • // construct the message set to append
  • val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
  • // 获取对应Offsets Topic分区使用的消息格式信息
  • val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
  • // 创建记录offset信息的消息,消息的值是offsetAndMetadata中的数据
  • new Message(
  • key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
  • bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
  • timestamp = timestamp,
  • magicValue = magicValue
  • )
  • }.toSeq
  • // 获取Consumer Group对应的Offsets Topic分区;__consumer_offsets主题,指定的Group ID
  • val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
  • // 获取Offsets Topic分区与消息集合的对应关系
  • val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
  • new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
  • // set the callback function to insert offsets into cache after log append completed
  • // 定义回调方法
  • def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
  • // the append response should only contain the topics partition
  • // 检查响应
  • if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
  • throw new IllegalStateException("Append status %s should only have one partition %s"
  • .format(responseStatus, offsetTopicPartition))
  • // construct the commit response status and insert
  • // the offset and metadata to cache if the append status has no error
  • // 获取响应状态
  • val status = responseStatus(offsetTopicPartition)
  • val responseCode =
  • if (status.errorCode == Errors.NONE.code) { // 无错误码
  • filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
  • // 追加消息成功,则更新offsetsCache集合中对应的OffsetAndMetadata对象
  • putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
  • }
  • Errors.NONE.code
  • } else { // 有错误码
  • debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
  • .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
  • // transform the log append error code to the corresponding the commit status error code
  • if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) // 未知主题或分区错误
  • // 返回GROUP_COORDINATOR_NOT_AVAILABLE
  • Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
  • else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) // 对应的Offsets Topic的分区副本不是Leader副本
  • // 返回NOT_COORDINATOR_FOR_GROUP
  • Errors.NOT_COORDINATOR_FOR_GROUP.code
  • else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code // 消息过大
  • || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code // 消息记录列表过大
  • || status.errorCode == Errors.INVALID_FETCH_SIZE.code) // 无效的拉取大小
  • // 返回INVALID_COMMIT_OFFSET_SIZE
  • Errors.INVALID_COMMIT_OFFSET_SIZE.code
  • else
  • // 其他错误,直接返回状态错误码
  • status.errorCode
  • }
  • // compute the final error codes for the commit response
  • // 整理最终的错误状态集
  • val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
  • if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) // 元数据符合大小
  • // 返回上面处理得到的错误码
  • (topicAndPartition, responseCode)
  • else
  • // 否则返回OFFSET_METADATA_TOO_LARGE错误码
  • (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
  • }
  • // finally trigger the callback logic passed from the API layer
  • // 调用回调函数
  • responseCallback(commitStatus)
  • }
  • // 返回DelayedStore对象,其中包装了offset消息数据和回调函数
  • DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
  • }

prepareStoreOffsets(...)方法返回的DelayedStore对象会通过store(...)方法完成消息的追加,并最终调用putCacheCallback(...)回调方法。putCacheCallback(...)中调用的responseCallback()回调函数就是KafkaApis的handleOffsetCommitRequest(...)中定义的sendResponseCallback(...),其主要逻辑是创建OffsetCommitResponse并添加到RequestChannel中等待发送。
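
putCacheCallback()中更新的offsetsCache是GroupMetadataManager内部缓存各分区最近提交offset的Pool结构,putOffset(...)只是简单地向该缓存写入记录,示意如下:

  • // GroupMetadataManager中缓存最近提交offset的结构(示意)
  • private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
  • // 将追加成功的offset写入缓存,供后续处理OffsetFetchRequest时直接读取(示意)
  • private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
  •   offsetsCache.put(key, offsetAndMetadata)
  • }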

5.7. OffsetFetchRequest的处理

当消费者出现故障宕机重新上线后,可以通过向GroupCoordinator发送OffsetFetchRequest获取其最近一次提交的offset,并从此位置重新开始进行消费。OffsetFetchRequest请求首先由KafkaApis的handleOffsetFetchRequest(...)方法进行处理,源码如下:

kafka.server.KafkaApis#handleOffsetFetchRequest
  • /*
  • * Handle an offset fetch request
  • *
  • * Consumer Group向GroupCoordinator发送OffsetFetchRequest请求
  • * 以获取最近一次提交的offset
  • */
  • def handleOffsetFetchRequest(request: RequestChannel.Request) {
  • // 获取请求头,转换请求为OffsetFetchRequest对象
  • val header = request.header
  • val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
  • // 构造响应头
  • val responseHeader = new ResponseHeader(header.correlationId)
  • val offsetFetchResponse =
  • // reject the request if not authorized to the group
  • // 检查Group ID的授权
  • if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) { // 检查授权未通过
  • // 构造封装了INVALID_OFFSET和GROUP_AUTHORIZATION_FAILED的响应对象
  • val unauthorizedGroupResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_AUTHORIZATION_FAILED.code)
  • val results = offsetFetchRequest.partitions.asScala.map { topicPartition => (topicPartition, unauthorizedGroupResponse)}.toMap
  • new OffsetFetchResponse(results.asJava)
  • } else {
  • // 检查分区的授权情况
  • val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
  • authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
  • }
  • // 构造未通过授权的分区的响应集合
  • val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
  • val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
  • // 未知分区的响应,包含INVALID_OFFSET和UNKNOWN_TOPIC_OR_PARTITION错误码
  • val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
  • /**
  • * apiVersion为0表示使用旧版本请求,此时的offset存储在ZooKeeper中,所以其处理逻辑主要是ZooKeeper的读取操作。
  • * 在OffsetCommitRequest的处理过程中也有根据版本号进行不同操作的相关代码,请读者注意
  • * apiVersion为1及以上表示使用新版本的请求,此时提交的offset由GroupCoordinator通过Kafka内部主题__consumer_offsets进行管理
  • */
  • if (header.apiVersion == 0) { // apiVersion == 0的旧版本的处理
  • // version 0 reads offsets from ZK
  • // 遍历授权的主题分区
  • val responseInfo = authorizedTopicPartitions.map { topicPartition =>
  • // 构造得到封装了/consumers/[group_id]/offsets/[topic_name]和/consumers/[group_id]/owners/[topic_name]路径的ZKGroupTopicDirs对象
  • val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
  • try {
  • if (!metadataCache.hasTopicMetadata(topicPartition.topic)) // 元数据不包含该主题
  • (topicPartition, unknownTopicPartitionResponse) // 返回包含INVALID_OFFSET和UNKNOWN_TOPIC_OR_PARTITION错误码的响应
  • else {
  • // 从Zookeeper中读取/consumers/[group_id]/offsets/[topic_name]/[partition_id]路径的数据
  • val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
  • payloadOpt match {
  • case Some(payload) => // 能够读取到数据,根据读取的数据构造响应
  • (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
  • case None => // 未能读取到数据,返回包含INVALID_OFFSET和UNKNOWN_TOPIC_OR_PARTITION错误码
  • (topicPartition, unknownTopicPartitionResponse)
  • }
  • }
  • } catch {
  • case e: Throwable =>
  • (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "",
  • Errors.forException(e).code))
  • }
  • }.toMap
  • // 构造响应对象
  • new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava)
  • } else { // apiVersion == 1的新版本的处理,需要从Kafka的主题__consumer_offsets中读取
  • // version 1 reads offsets from Kafka;
  • // 使用GroupCoordinator读取offset
  • val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
  • // Note that we do not need to filter the partitions in the
  • // metadata cache as the topic partitions will be filtered
  • // in coordinator's offset manager through the offset cache
  • // 构造响应对象
  • new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava)
  • }
  • }
  • trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
  • // 将OffsetFetchResponse放入RequestChannel中等待发送
  • requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, offsetFetchResponse)))
  • }

从上述源码可知,handleOffsetFetchRequest(...)方法在进行一系列检查后,对获取Offset的处理也分为两种情况:针对旧版本的请求,会从Zookeeper的/consumers/[group_id]/offsets/[topic_name]/[partition_id]节点读取Offset信息后返回响应;针对新版本的请求,则委托给GroupCoordinator的handleFetchOffsets(...)方法处理,该方法源码如下:

kafka.coordinator.GroupCoordinator#handleFetchOffsets
  • // 处理获取分区offset操作
  • def handleFetchOffsets(groupId: String,
  • partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
  • if (!isActive.get) { // 判断当前GroupCoordinator是否正在运行
  • // 未运行将记录GROUP_COORDINATOR_NOT_AVAILABLE错误码
  • partitions.map { case topicPartition =>
  • (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
  • } else if (!isCoordinatorForGroup(groupId)) { // 判断当前GroupCoordinator是否负责管理该Consumer Group
  • // 将记录NOT_COORDINATOR_FOR_GROUP错误码
  • partitions.map { case topicPartition =>
  • (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
  • } else if (isCoordinatorLoadingInProgress(groupId)) { // 判断该Consumer Group对应的Offsets Topic分区是否还处于加载过程中
  • // 将记录GROUP_LOAD_IN_PROGRESS错误码
  • partitions.map { case topicPartition =>
  • (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
  • } else { // 验证通过,使用GroupMetadataManager的getOffsets()方法获取对应的offset
  • // return offsets blindly regardless the current group state since the group may be using
  • // Kafka commit storage without automatic group management
  • groupManager.getOffsets(groupId, partitions)
  • }
  • }

在通过各项检查后,GroupCoordinator的handleFetchOffsets(...)方法会直接调用GroupMetadataManager的getOffsets(...)方法获取对应的信息,该方法内部是直接从GroupMetadataManager的offsetsCache缓存中读取的:

kafka.coordinator.GroupMetadataManager#getOffsets
  • /**
  • * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
  • * returns the current offset or it begins to sync the cache from the log (and returns an error code).
  • *
  • * 获取指定主题分区的offset
  • */
  • def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
  • trace("Getting offsets %s for group %s.".format(topicPartitions, group))
  • if (isGroupLocal(group)) { // 检测GroupCoordinator是否是Consumer Group的管理者
  • if (topicPartitions.isEmpty) { // 如果请求的分区为空,则表示请求全部分区对应的最近提交的offset
  • // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
  • offsetsCache.filter(_._1.group == group).map { // 遍历offsetsCache,找到对应的Group ID
  • case(groupTopicPartition, offsetAndMetadata) =>
  • // 获取对应的offset,构造为OffsetFetchResponse响应对象
  • (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
  • }.toMap // 最终会将元组列表转换为Map
  • } else {
  • // 查找指定分区集合的最近提交offset
  • topicPartitions.map { topicPartition =>
  • val groupTopicPartition = GroupTopicPartition(group, topicPartition)
  • (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
  • }.toMap
  • }
  • } else {
  • debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
  • // 错误码NOT_COORDINATOR_FOR_GROUP
  • topicPartitions.map { topicPartition =>
  • val groupTopicPartition = GroupTopicPartition(group, topicPartition)
  • (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))
  • }.toMap
  • }
  • }
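
其中查询指定分区时调用的getOffset(...)方法同样是直接读取offsetsCache,若缓存中不存在对应记录则返回INVALID_OFFSET,示意如下:

  • // 从offsetsCache中读取单个分区最近提交的offset(示意)
  • private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
  •   val offsetAndMetadata = offsetsCache.get(key)
  •   if (offsetAndMetadata == null)
  •     // 没有该分区的提交记录,返回INVALID_OFFSET,错误码为NONE
  •     new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
  •   else
  •     new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
  • }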

5.8. ListGroupsRequest的处理

ListGroupsRequest请求用于获取所有的Consumer Group的信息,该请求一般是通过kafka-consumer-groups.sh脚本产生的,它由KafkaApis的handleListGroupsRequest(...)方法负责处理:

kafka.server.KafkaApis#handleListGroupsRequest
  • // 使用kafka-consumer-groups.sh脚本列出所有的Consumer Group
  • def handleListGroupsRequest(request: RequestChannel.Request) {
  • // 构造响应头
  • val responseHeader = new ResponseHeader(request.header.correlationId)
  • // 构造响应体
  • val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) { // 未通过授权
  • // 返回错误码为CLUSTER_AUTHORIZATION_FAILED的响应
  • ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
  • } else { // 通过授权
  • // 使用GroupCoordinator的handleListGroups()方法来处理
  • val (error, groups) = coordinator.handleListGroups()
  • // 将得到的结果构造为ListGroupsResponse响应集合
  • val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
  • // 构造响应对象
  • new ListGroupsResponse(error.code, allGroups.asJava)
  • }
  • // 将响应对象放入RequestChannel等待发送
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  • }

从源码可以得知,获取所有Consumer Group的操作其实是由GroupCoordinator的handleListGroups()方法实现的,源码如下:

kafka.coordinator.GroupCoordinator#handleListGroups
  • // 获取所有的Consumer Group
  • def handleListGroups(): (Errors, List[GroupOverview]) = {
  • if (!isActive.get) { // 检查GroupCoordinator运行状态
  • (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
  • } else {
  • // 如果存在正在加载的Offset Topic分区,则无法返回某些Consumer Group,直接返回GROUP_LOAD_IN_PROGRESS错误码
  • val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
  • // 否则获取GroupMetadataManager的groupsCache的值集,构造为GroupOverview对象列表进行返回
  • (errorCode, groupManager.currentGroups.map(_.overview).toList)
  • }
  • }

该方法会将GroupMetadataManager的currentGroups进行映射后转换为列表返回,GroupMetadataManager的currentGroups获取的其实是groupsCache的值集,也即是GroupMetadata的集合:

kafka.coordinator.GroupMetadataManager#currentGroups
  • def currentGroups(): Iterable[GroupMetadata] = groupsCache.values

而GroupMetadata的overview方法其实会返回包含Group ID和协议类型(protocolType)的样例类GroupOverview实例:

kafka.coordinator.GroupMetadata#overview
  • def overview: GroupOverview = {
  • GroupOverview(groupId, protocolType)
  • }
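
GroupOverview本身是一个只包含Group ID与协议类型两个字段的样例类,其定义大致如下(示意):

  • // 描述Consumer Group概要信息的样例类(示意)
  • case class GroupOverview(groupId: String, protocolType: String)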