Big Data
Stream Processing
Kafka
Source Code Analysis

Kafka Series 23 - Server-Side Source Code Analysis 14: The GroupCoordinator in Detail

Summary: how the GroupCoordinator handles the various types of requests

1. GroupState

As mentioned earlier, GroupMetadata uses the GroupState trait to represent its state, which is also the state of the Consumer Group. GroupState has four implementing case objects, each representing one of the four states a Consumer Group can be in. The source is as follows:

kafka.coordinator

    // Represents the state of a Consumer Group
    private[coordinator] sealed trait GroupState { def state: Byte }

    /**
     * Group is preparing to rebalance
     *
     * The Consumer Group is preparing to perform a rebalance. The GroupCoordinator
     * still handles OffsetFetchRequest, LeaveGroupRequest and OffsetCommitRequest
     * normally, but responds to HeartbeatRequest and SyncGroupRequest with the
     * REBALANCE_IN_PROGRESS error code. When a JoinGroupRequest arrives, the
     * GroupCoordinator first creates a corresponding DelayedJoin and responds only
     * once its completion condition is satisfied.
     *
     * action: respond to heartbeats with REBALANCE_IN_PROGRESS (HeartbeatRequest)
     *         respond to sync group with REBALANCE_IN_PROGRESS (SyncGroupRequest)
     *         remove member on leave group request (LeaveGroupRequest)
     *         park join group requests from new or existing members until all expected members have joined (JoinGroupRequest)
     *         allow offset commits from previous generation (OffsetCommitRequest)
     *         allow offset fetch requests (OffsetFetchRequest)
     * transition: some members have joined by the timeout => AwaitingSync
     *             (triggered when a DelayedJoin times out or all previous members have rejoined)
     *             all members have left the group => Dead
     */
    private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }

    /**
     * Group is awaiting state assignment from the leader
     *
     * The Consumer Group is waiting for the Group Leader to send the partition
     * assignment to the GroupCoordinator, i.e. waiting for the Group Leader's
     * SyncGroupRequest. OffsetCommitRequest and HeartbeatRequest are answered with
     * the REBALANCE_IN_PROGRESS error code. SyncGroupRequests from Group Followers
     * are parked and answered together once the Group Leader's SyncGroupRequest arrives.
     *
     * action: respond to heartbeats with REBALANCE_IN_PROGRESS (HeartbeatRequest)
     *         respond to offset commits with REBALANCE_IN_PROGRESS (OffsetCommitRequest)
     *         park sync group requests from followers until transition to Stable (SyncGroupRequest)
     *         allow offset fetch requests (OffsetFetchRequest)
     * transition: sync group with state assignment received from leader => Stable
     *             join group from new member or existing member with updated metadata => PreparingRebalance
     *             leave group from existing member => PreparingRebalance
     *             member failure detected (heartbeat timeout) => PreparingRebalance
     */
    private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5 }

    /**
     * Group is stable
     *
     * The Consumer Group is in its normal state; this is also the initial state of
     * a Consumer Group. In this state the GroupCoordinator can handle all requests:
     * OffsetFetchRequest, HeartbeatRequest, OffsetCommitRequest, JoinGroupRequests
     * from Group Followers, and SyncGroupRequests from existing members of the group.
     *
     * action: respond to member heartbeats normally (HeartbeatRequest)
     *         respond to sync group from any member with current assignment (SyncGroupRequest)
     *         respond to join group from followers with matching metadata with current group metadata (JoinGroupRequest)
     *         allow offset commits from member of current generation (OffsetCommitRequest)
     *         allow offset fetch requests (OffsetFetchRequest)
     * transition: member failure detected via heartbeat => PreparingRebalance
     *             leave group from existing member => PreparingRebalance
     *             leader join-group received => PreparingRebalance
     *             follower join-group with new metadata => PreparingRebalance
     */
    private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }

    /**
     * Group has no more members
     *
     * A Consumer Group in this state has no members left, and its corresponding
     * GroupMetadata will be deleted. Apart from OffsetFetchRequest, all requests
     * are answered with the UNKNOWN_MEMBER_ID error code.
     *
     * action: respond to join group with UNKNOWN_MEMBER_ID (JoinGroupRequest)
     *         respond to sync group with UNKNOWN_MEMBER_ID (SyncGroupRequest)
     *         respond to heartbeat with UNKNOWN_MEMBER_ID (HeartbeatRequest)
     *         respond to leave group with UNKNOWN_MEMBER_ID (LeaveGroupRequest)
     *         respond to offset commit with UNKNOWN_MEMBER_ID (OffsetCommitRequest)
     *         allow offset fetch requests (OffsetFetchRequest)
     * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
     */
    private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }

The meaning of these four states is summarized below:

  • PreparingRebalance: the Consumer Group is preparing to perform a rebalance. The GroupCoordinator still handles OffsetFetchRequest, LeaveGroupRequest and OffsetCommitRequest normally, but responds to HeartbeatRequest and SyncGroupRequest with the REBALANCE_IN_PROGRESS error code. When a JoinGroupRequest arrives, the GroupCoordinator first creates a corresponding DelayedJoin and responds once its completion condition is satisfied.
  • AwaitingSync: the Consumer Group is waiting for the Group Leader to send the partition assignment to the GroupCoordinator, i.e. waiting for the Group Leader's SyncGroupRequest. OffsetCommitRequest and HeartbeatRequest are answered with the REBALANCE_IN_PROGRESS error code. SyncGroupRequests from Group Followers are parked and answered together once the Group Leader's SyncGroupRequest arrives.
  • Stable: the Consumer Group is in its normal state; this is also the initial state of a Consumer Group. The GroupCoordinator can handle all requests: OffsetFetchRequest, HeartbeatRequest, OffsetCommitRequest, JoinGroupRequests from Group Followers, and SyncGroupRequests from existing members of the group.
  • Dead: a Consumer Group in this state has no members left, and its corresponding GroupMetadata will be deleted. Apart from OffsetFetchRequest, all requests are answered with the UNKNOWN_MEMBER_ID error code.
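
The transition rules above can be condensed into a small lookup table. The following is a simplified sketch (not the Kafka source; the `validTransitions` helper is hypothetical) of the legal transitions described above:

```scala
// Simplified sketch of the four group states and their legal transitions.
sealed trait GroupState
case object PreparingRebalance extends GroupState
case object AwaitingSync extends GroupState
case object Stable extends GroupState
case object Dead extends GroupState

// Hypothetical helper: for each state, the states it may move to.
val validTransitions: Map[GroupState, Set[GroupState]] = Map(
  PreparingRebalance -> Set(AwaitingSync, Dead), // members joined, or all left
  AwaitingSync       -> Set(Stable, PreparingRebalance),
  Stable             -> Set(PreparingRebalance),
  Dead               -> Set.empty // final state: metadata is cleaned up
)

def canTransition(from: GroupState, to: GroupState): Boolean =
  validTransitions(from).contains(to)
```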

The transitions among these four states are illustrated in the following diagram:

Figure: Consumer Group state transition diagram (1.ConsumerGroup状态转换图示.png)

2. GroupCoordinator Overview

GroupCoordinator depends on the GroupMetadataManager discussed earlier. Each Kafka server instance owns one GroupCoordinator. Its definition and important fields are as follows:

kafka.coordinator.GroupCoordinator

    /**
     * GroupCoordinator handles general group membership and offset management.
     *
     * Each Kafka server instantiates a coordinator which is responsible for a set of
     * groups. Groups are assigned to coordinators based on their group names.
     *
     * @param brokerId
     * @param groupConfig Records the minimum and maximum session timeouts allowed for consumers in a group;
     *                    the timeout a consumer requests must fall within this range
     * @param offsetConfig Records OffsetMetadata-related settings (maximum allowed length of the metadata
     *                     field, replication factor of the offsets topic partitions, etc.)
     * @param groupManager
     * @param heartbeatPurgatory Manages DelayedHeartbeat operations
     * @param joinPurgatory Manages DelayedJoin operations
     * @param time
     */
    class GroupCoordinator(val brokerId: Int,
                           val groupConfig: GroupConfig,
                           val offsetConfig: OffsetConfig,
                           val groupManager: GroupMetadataManager,
                           val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
                           val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
                           time: Time) extends Logging {
      ...
      // Callback type for handling JoinGroupRequest
      type JoinCallback = JoinGroupResult => Unit
      // Callback type for handling SyncGroupRequest
      type SyncCallback = (Array[Byte], Short) => Unit
      // Whether this GroupCoordinator is currently running
      private val isActive = new AtomicBoolean(false)
      ...
    }

In this definition, groupManager is the GroupMetadataManager instance associated with the GroupCoordinator, and heartbeatPurgatory and joinPurgatory are the purgatory objects that manage DelayedHeartbeat and DelayedJoin delayed operations. Note the groupConfig and offsetConfig fields: they are case classes recording Consumer Group and OffsetMetadata related configuration, with the following source:

  • GroupConfig source:

kafka.coordinator.GroupConfig

    case class GroupConfig(groupMinSessionTimeoutMs: Int, // minimum session timeout for consumers in a group
                           groupMaxSessionTimeoutMs: Int) // maximum session timeout for consumers in a group

  • OffsetConfig source:

kafka.coordinator.OffsetConfig

    /**
     * Configuration settings for in-built offset management
     * @param maxMetadataSize The maximum allowed metadata for any offset commit.
     * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
     * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
     * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
     * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
     * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
     *                                 log compaction and faster offset loads
     * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
     * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
     *                                     order to achieve "atomic" commits.
     * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
     *                              commit or this timeout is reached. (Similar to the producer request timeout.)
     * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
     *                                 should not be overridden.
     */
    case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize, // max size of the metadata carried by an OffsetMetadata, default 4KB
                            loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize, // read buffer size for loading offsets from the offsets topic, default 5MB
                            offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs, // maximum retention of offsets in the offsets topic, default 24 hours
                            offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs, // interval between offset-expiration checks, default 600 seconds
                            offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions, // number of partitions of the offsets topic, default 50
                            offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes, // segment size of the offsets topic, default 100MB
                            offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor, // replication factor of the offsets topic, default 3
                            offsetsTopicCompressionCodec: CompressionCodec = OffsetConfig.DefaultOffsetsTopicCompressionCodec, // compression codec of the offsets topic, no compression by default
                            offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs, // timeout for handling OffsetCommitRequest, default 5 seconds
                            offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks) // acks required when producing to the offsets topic, default -1

    object OffsetConfig {
      val DefaultMaxMetadataSize = 4096
      val DefaultLoadBufferSize = 5*1024*1024
      val DefaultOffsetRetentionMs = 24*60*60*1000L
      val DefaultOffsetsRetentionCheckIntervalMs = 600000L
      val DefaultOffsetsTopicNumPartitions = 50
      val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
      val DefaultOffsetsTopicReplicationFactor = 3.toShort
      val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
      val DefaultOffsetCommitTimeoutMs = 5000
      val DefaultOffsetCommitRequiredAcks = (-1).toShort
    }

The OffsetConfig source reveals the default settings of the __consumer_offsets topic.
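
Since groups are assigned to coordinators by group name, each group hashes to one of the (by default 50) partitions of __consumer_offsets, and the broker hosting that partition's leader replica acts as the group's GroupCoordinator. A minimal sketch of that mapping (mirroring the idea of GroupMetadataManager's partitionFor; simplified, not the actual source):

```scala
// 50 partitions for __consumer_offsets by default (DefaultOffsetsTopicNumPartitions)
val offsetsTopicNumPartitions = 50

// Maps a group ID to its partition in the offsets topic.
// `hashCode & 0x7fffffff` keeps the value non-negative even for Int.MinValue.
def partitionFor(groupId: String): Int =
  (groupId.hashCode & 0x7fffffff) % offsetsTopicNumPartitions
```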

3. Handling HeartbeatRequest

As covered earlier when discussing how the Kafka consumer works, each consumer periodically sends a HeartbeatRequest to the GroupCoordinator via its HeartbeatTask to signal that it is still alive. HeartbeatRequest is handled by KafkaApis' handleHeartbeatRequest(...) method:

kafka.server.KafkaApis#handleHeartbeatRequest

    // Handle a HeartbeatRequest
    def handleHeartbeatRequest(request: RequestChannel.Request) {
      // Cast the request body to HeartbeatRequest
      val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
      // Build the response header
      val respHeader = new ResponseHeader(request.header.correlationId)

      // the callback for sending a heartbeat response
      def sendResponseCallback(errorCode: Short) {
        val response = new HeartbeatResponse(errorCode)
        trace("Sending heartbeat response %s for correlation id %d to client %s."
          .format(response, request.header.correlationId, request.header.clientId))
        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
      }

      // Check authorization
      if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
        // Authorization failed
        val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
        requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
      }
      else { // Authorization passed
        // let the coordinator to handle heartbeat
        coordinator.handleHeartbeat(
          heartbeatRequest.groupId(), // group ID
          heartbeatRequest.memberId(), // member ID
          heartbeatRequest.groupGenerationId(), // group generation
          sendResponseCallback)
      }
    }

After the authorization check passes, this method hands the HeartbeatRequest to the GroupCoordinator's handleHeartbeat(...) method. Note that the locally defined sendResponseCallback(...) callback is passed in as the last argument. The source of handleHeartbeat(...) is as follows:

kafka.coordinator.GroupCoordinator#handleHeartbeat

    // Handles HeartbeatRequest
    def handleHeartbeat(groupId: String,
                        memberId: String,
                        generationId: Int,
                        responseCallback: Short => Unit) {
      if (!isActive.get) { // Check whether this GroupCoordinator is running
        // Invoke the callback with the GROUP_COORDINATOR_NOT_AVAILABLE error code
        responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
      } else if (!isCoordinatorForGroup(groupId)) { // Check whether this GroupCoordinator manages this Consumer Group
        // Invoke the callback with the NOT_COORDINATOR_FOR_GROUP error code
        responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
      } else if (isCoordinatorLoadingInProgress(groupId)) { // Check whether the group's offsets topic partition is still being loaded
        // the group is still loading, so respond just blindly with the NONE error code
        responseCallback(Errors.NONE.code)
      } else { // All checks passed
        // Fetch the corresponding GroupMetadata
        val group = groupManager.getGroup(groupId)
        if (group == null) { // GroupMetadata is null: the group does not exist
          // Return the UNKNOWN_MEMBER_ID error code
          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
        } else {
          // The group exists
          group synchronized {
            if (group.is(Dead)) { // State is Dead: return UNKNOWN_MEMBER_ID
              // if the group is marked as dead, it means some other thread has just removed the group
              // from the coordinator metadata; this is likely that the group has migrated to some other
              // coordinator OR the group is in a transient unstable phase. Let the member retry
              // joining without the specified member id,
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
            } else if (!group.is(Stable)) { // State is not Stable: return REBALANCE_IN_PROGRESS
              responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
            } else if (!group.has(memberId)) { // The group has no such member: return UNKNOWN_MEMBER_ID
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
            } else if (generationId != group.generationId) { // Wrong group generation: return ILLEGAL_GENERATION
              responseCallback(Errors.ILLEGAL_GENERATION.code)
            } else { // All checks passed
              // Fetch the corresponding MemberMetadata
              val member = group.get(memberId)
              // Schedule the next heartbeat expiration task
              completeAndScheduleNextHeartbeatExpiration(group, member)
              // Invoke the callback
              responseCallback(Errors.NONE.code)
            }
          }
        }
      }
    }

handleHeartbeat(...) performs several checks of its own. The key point is that a heartbeat is only processed when the corresponding GroupMetadata exists and is in the Stable state; when the group is in AwaitingSync or PreparingRebalance, the REBALANCE_IN_PROGRESS error code is returned.

Once the checks pass, completeAndScheduleNextHeartbeatExpiration(...) is called directly. It updates the timestamp of the member's last received heartbeat, tries to complete the member's existing DelayedHeartbeat, and creates a new DelayedHeartbeat placed into heartbeatPurgatory to await either the next heartbeat or its own expiration:

kafka.coordinator.GroupCoordinator#completeAndScheduleNextHeartbeatExpiration

    /**
     * Complete existing DelayedHeartbeats for the given member and schedule the next one
     */
    private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
      // complete current heartbeat expectation
      // Update the last heartbeat time to now
      member.latestHeartbeat = time.milliseconds()
      // Build the key from the group ID and member ID
      val memberKey = MemberKey(member.groupId, member.memberId)
      // Check the Watchers for this key and try to complete its DelayedOperations
      heartbeatPurgatory.checkAndComplete(memberKey)

      // reschedule the next heartbeat expiration deadline
      val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
      // Create a new heartbeat delayed operation and submit it to heartbeatPurgatory
      val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
      heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
    }

Clearly, heartbeat checking is implemented through DelayedHeartbeat delayed operations. Tracing back, the first DelayedHeartbeat for a member is added when the client's SyncGroupRequest is successfully processed by the server.
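
The deadline arithmetic is simple: each received heartbeat pushes the member's session deadline forward by sessionTimeoutMs. A minimal sketch with hypothetical names (not the Kafka source):

```scala
// Hypothetical holder for a member's heartbeat bookkeeping.
final case class MemberSession(latestHeartbeat: Long, sessionTimeoutMs: Long) {
  // Mirrors newHeartbeatDeadline = latestHeartbeat + sessionTimeoutMs
  def deadline: Long = latestHeartbeat + sessionTimeoutMs
  // The session expires if no heartbeat arrives before the deadline
  def isExpired(now: Long): Boolean = now >= deadline
  // Each heartbeat resets the clock, pushing the deadline forward
  def onHeartbeat(now: Long): MemberSession = copy(latestHeartbeat = now)
}
```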

In addition, completeAndScheduleNextHeartbeatExpiration(...) is called from several other methods, for example while handling JoinGroupRequest, SyncGroupRequest and OffsetCommitRequest, and during GroupCoordinator migration:

  • GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(...)
  • ↖ GroupCoordinator.propagateAssignment(...)
  • ↖ GroupCoordinator.handleHeartbeat(...)
  • ↖ GroupCoordinator.onGroupLoaded(...)
  • ↖ GroupCoordinator.doSyncGroup(...)
  • ↖ GroupCoordinator.onCompleteJoin(...)
  • ↖ GroupCoordinator.handleCommitOffsets(...)

3.1. The DelayedHeartbeat Delayed Operation

Next, let's look at the implementation of the DelayedHeartbeat delayed operation; its source is as follows:

kafka.coordinator.DelayedHeartbeat

    /**
     * Delayed heartbeat operations that are added to the purgatory for session timeout checking.
     * Heartbeats are paused during rebalance.
     *
     * @param coordinator The GroupCoordinator; DelayedHeartbeat's methods delegate to the corresponding GroupCoordinator methods
     * @param group The corresponding GroupMetadata
     * @param member The corresponding MemberMetadata
     * @param heartbeatDeadline Timestamp at which this DelayedHeartbeat expires
     * @param sessionTimeout Expiration delay of this DelayedHeartbeat; set by the consumer in its JoinGroupRequest
     *                       and validated against the range configured in GroupConfig
     */
    private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                                group: GroupMetadata,
                                                member: MemberMetadata,
                                                heartbeatDeadline: Long,
                                                sessionTimeout: Long)
      extends DelayedOperation(sessionTimeout) {
      override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
      override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
      override def onComplete() = coordinator.onCompleteHeartbeat()
    }

Like DelayedJoin, DelayedHeartbeat delegates all of its operations to the corresponding GroupCoordinator methods.

When a DelayedHeartbeat is added, completion is attempted once immediately via tryComplete(), i.e. GroupCoordinator's tryCompleteHeartbeat(...). That method checks the following four conditions; if any one of them holds, the DelayedHeartbeat is eligible for completion:

  1. The member's latest heartbeat plus sessionTimeout is later than heartbeatDeadline (i.e. a newer heartbeat has arrived since this deadline was scheduled).
  2. awaitingJoinCallback is not null, i.e. the consumer is waiting for a JoinGroupResponse.
  3. awaitingSyncCallback is not null, i.e. the consumer is waiting for a SyncGroupResponse.
  4. The consumer has already left the Consumer Group.

The source is as follows:

kafka.coordinator.GroupCoordinator

    def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
      group synchronized {
        /**
         * Any one of the four conditions is sufficient:
         * 1. awaitingJoinCallback is not null: the consumer is waiting for a JoinGroupResponse
         * 2. awaitingSyncCallback is not null: the consumer is waiting for a SyncGroupResponse
         * 3. latestHeartbeat + sessionTimeoutMs is later than heartbeatDeadline
         * 4. the consumer has left the Consumer Group
         */
        if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
          forceComplete()
        else false
      }
    }

    // Decides whether the member should be considered alive
    private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
      member.awaitingJoinCallback != null || // waiting for a JoinGroupResponse
      member.awaitingSyncCallback != null || // waiting for a SyncGroupResponse
      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline // a newer heartbeat has arrived

forceComplete() invokes onComplete(), which ultimately calls GroupCoordinator's onCompleteHeartbeat(); that method has an empty body and performs no action.

When a DelayedHeartbeat expires, its onExpiration() method is invoked, which calls GroupCoordinator's onExpireHeartbeat(...). It removes the corresponding MemberMetadata from the GroupMetadata and then acts according to the GroupMetadata's current state:

kafka.coordinator.GroupCoordinator

    // Executed when a DelayedHeartbeat expires
    def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
      group synchronized {
        if (!shouldKeepMemberAlive(member, heartbeatDeadline))
          onMemberFailure(group, member)
      }
    }

    // Takes the member offline
    private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
      trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
      group.remove(member.memberId) // Remove the member from its GroupMetadata
      group.currentState match { // Act according to the GroupMetadata state
        // No action
        case Dead =>
        // The previous partition assignment may now be invalid; switch the GroupMetadata to PreparingRebalance
        case Stable | AwaitingSync => maybePrepareRebalance(group)
        // The group now has fewer members, which may satisfy a DelayedJoin's completion condition; try to complete it
        case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
      }
    }

4. Handling JoinGroupRequest

Recall from the earlier discussion of how the Kafka consumer works that a Consumer Group's rebalance is driven mainly by the JoinGroupRequest and JoinGroupResponse exchange. A JoinGroupRequest is sent by a consumer client to the GroupCoordinator node to obtain partition assignment information; when multiple consumer clients send JoinGroupRequests at the same time, one of them is designated the Group Leader in the resulting JoinGroupResponses, and the others become Group Followers. At the API layer, JoinGroupRequest is handled by KafkaApis' handleJoinGroupRequest(...) method, whose source is as follows:

kafka.server.KafkaApis#handleJoinGroupRequest

    // Handle a JoinGroupRequest
    def handleJoinGroupRequest(request: RequestChannel.Request) {
      import JavaConversions._
      // Cast the request body to JoinGroupRequest
      val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
      // Build the response header
      val responseHeader = new ResponseHeader(request.header.correlationId)

      // the callback for sending a join-group response
      def sendResponseCallback(joinResult: JoinGroupResult) {
        val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
        val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
          joinResult.memberId, joinResult.leaderId, members)
        trace("Sending join group response %s for correlation id %d to client %s."
          .format(responseBody, request.header.correlationId, request.header.clientId))
        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
      }

      if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) { // Check authorization
        val responseBody = new JoinGroupResponse(
          Errors.GROUP_AUTHORIZATION_FAILED.code,
          JoinGroupResponse.UNKNOWN_GENERATION_ID, // -1
          JoinGroupResponse.UNKNOWN_PROTOCOL, // empty string
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId, empty string
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId, empty string
          Map.empty[String, ByteBuffer])
        // Queue the response in the RequestChannel for sending
        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
      } else { // Authorization passed
        // let the coordinator to handle join-group
        // Build the set of acceptable PartitionAssignor protocols
        val protocols = joinGroupRequest.groupProtocols().map(protocol =>
          (protocol.name, Utils.toArray(protocol.metadata))).toList
        // Hand the JoinGroupRequest to GroupCoordinator's handleJoinGroup() method
        coordinator.handleJoinGroup(
          joinGroupRequest.groupId, // group ID
          joinGroupRequest.memberId, // member ID
          request.header.clientId, // client ID
          request.session.clientAddress.toString, // client address
          joinGroupRequest.sessionTimeout, // client-configured session timeout
          joinGroupRequest.protocolType,
          protocols, // PartitionAssignor protocols the client accepts
          sendResponseCallback) // callback
      }
    }

As the source shows, after the authorization check this method delegates the actual handling to GroupCoordinator's handleJoinGroup(...) method. Note the arguments: the group ID, member ID, client ID and client address taken from the JoinGroupRequest, along with the sendResponseCallback callback defined above, are all passed in. The source of handleJoinGroup(...) is as follows:

kafka.coordinator.GroupCoordinator#handleJoinGroup

    // Handles JoinGroupRequest
    def handleJoinGroup(groupId: String,
                        memberId: String,
                        clientId: String,
                        clientHost: String,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback) {
      if (!isActive.get) { // Check whether this GroupCoordinator is running
        // Invoke the callback with the GROUP_COORDINATOR_NOT_AVAILABLE error code
        responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
      } else if (!validGroupId(groupId)) { // Check that a group ID was provided
        // Invoke the callback with the INVALID_GROUP_ID error code
        responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
      } else if (!isCoordinatorForGroup(groupId)) { // Check whether this GroupCoordinator manages this Consumer Group
        // Invoke the callback with the NOT_COORDINATOR_FOR_GROUP error code
        responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
      } else if (isCoordinatorLoadingInProgress(groupId)) { // Check whether the group's offsets topic partition is still being loaded
        // Invoke the callback with the GROUP_LOAD_IN_PROGRESS error code
        responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
      } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
                 sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) { // Check that the session timeout falls within the range configured in GroupConfig
        // Invoke the callback with the INVALID_SESSION_TIMEOUT error code
        responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
      } else { // All checks passed
        // only try to create the group if the group is not unknown AND
        // the member id is UNKNOWN, if member is specified but group does not
        // exist we should reject the request
        // Try to fetch the GroupMetadata for this group ID from GroupMetadataManager's groupsCache
        var group = groupManager.getGroup(groupId)
        if (group == null) { // No GroupMetadata found
          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { // Member ID is non-empty: return UNKNOWN_MEMBER_ID
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
          } else { // Member ID is empty: this is a new group, so create its GroupMetadata
            // Create a new GroupMetadata and store it in GroupMetadataManager's groupsCache
            group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
            // Continue processing
            doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }
        } else { // GroupMetadata found: continue processing directly
          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
      }
    }

GroupCoordinator's handleJoinGroup(...) also performs a series of checks, then tries to fetch the GroupMetadata for the group ID from the GroupMetadataManager, distinguishing three cases:

  1. If the GroupMetadata does not exist but the client's member ID is non-empty, the client has sent an invalid JoinGroupRequest, and the UNKNOWN_MEMBER_ID error code is returned.
  2. If the GroupMetadata does not exist and the client's member ID is empty, this JoinGroupRequest is likely from the first consumer of a new Consumer Group; a new GroupMetadata is created and then handed to doJoinGroup(...).
  3. If the GroupMetadata exists, it is handed directly to doJoinGroup(...).

As the three cases show, doJoinGroup(...) does the main work of handling the request; its source is as follows:

kafka.coordinator.GroupCoordinator#doJoinGroup

    // Handles JoinGroupRequest
    private def doJoinGroup(group: GroupMetadata,
                            memberId: String,
                            clientId: String,
                            clientHost: String,
                            sessionTimeoutMs: Int,
                            protocolType: String,
                            protocols: List[(String, Array[Byte])],
                            responseCallback: JoinCallback) {
      group synchronized { // Lock the group
        /**
         * First branch: check the member's supported PartitionAssignors, i.e. whether
         * each consumer's PartitionAssignor set intersects the group's candidate
         * PartitionAssignor set (the candidateProtocols field of GroupMetadata).
         * Second branch: check the memberId. The JoinGroupRequest may come from a member
         * already known to the Consumer Group, carrying the memberId previously assigned
         * to it; that memberId must then be recognized by the GroupMetadata.
         */
        if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
          // if the new member does not support the group protocol, reject it
          // Invoke the callback with the INCONSISTENT_GROUP_PROTOCOL error code
          responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
        } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
          // if the member trying to register with a un-recognized id, send the response to let
          // it reset its member id and retry
          // Invoke the callback with the UNKNOWN_MEMBER_ID error code
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
        } else {
          // Handle according to the group's current state
          group.currentState match {
            case Dead => // Dead state: return UNKNOWN_MEMBER_ID directly
              // if the group is marked as dead, it means some other thread has just removed the group
              // from the coordinator metadata; this is likely that the group has migrated to some other
              // coordinator OR the group is in a transient unstable phase. Let the member retry
              // joining without the specified member id,
              responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

            case PreparingRebalance =>
              if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                // An unknown new member wants to join: create a MemberMetadata, assign it a memberId and add it to the GroupMetadata
                addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
              } else {
                // A known member is rejoining: update its information recorded in the GroupMetadata
                val member = group.get(memberId)
                updateMemberAndRebalance(group, member, protocols, responseCallback)
              }

            case AwaitingSync =>
              if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
                // An unknown new member wants to join: create a MemberMetadata, assign it a memberId and add it to the GroupMetadata
                addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
              } else { // A known member is rejoining: check whether its supported PartitionAssignors have changed
                // Fetch the corresponding MemberMetadata
                val member = group.get(memberId)
                if (member.matches(protocols)) { // Unchanged: return the current member set to the Group Leader
                  // member is joining with the same metadata (which could be because it failed to
                  // receive the initial JoinGroup response), so just return current group information
                  // for the current generation.
                  responseCallback(JoinGroupResult(
                    members = if (memberId == group.leaderId) { // The sender is the Group Leader
                      // Return the MemberMetadata directly
                      group.currentMemberMetadata
                    } else {
                      // Otherwise return an empty map
                      Map.empty
                    },
                    memberId = memberId, // member ID
                    generationId = group.generationId, // current group generation
                    subProtocol = group.protocol, // current group PartitionAssignor
                    leaderId = group.leaderId, // current group leader ID
                    errorCode = Errors.NONE.code))
                } else {
                  // member has changed metadata, so force a rebalance
                  // Changed: update the member information and switch the state to PreparingRebalance
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                }
              }

            case Stable =>
              if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // An unknown new member wants to join
                // if the member id is unknown, register the member to the group
                // Create a MemberMetadata, assign it a memberId, add it to the GroupMetadata and switch the state to PreparingRebalance
                addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
              } else { // A known member is rejoining: check whether its supported PartitionAssignors have changed
                val member = group.get(memberId)
                if (memberId == group.leaderId || !member.matches(protocols)) { // Protocols changed, or the sender is the Group Leader
                  // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
                  // The latter allows the leader to trigger rebalances for changes affecting assignment
                  // which do not affect the member metadata (such as topic metadata changes for the consumer)
                  // Update the member information and switch the state to PreparingRebalance
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                } else { // Unchanged
                  // for followers with no actual change to their metadata, just return group information
                  // for the current generation which will allow them to issue SyncGroup
                  // Return the GroupMetadata's current state; the consumer will follow up with a SyncGroupRequest
                  responseCallback(JoinGroupResult(
                    members = Map.empty,
                    memberId = memberId, // member ID
                    generationId = group.generationId, // current group generation
                    subProtocol = group.protocol, // current group PartitionAssignor
                    leaderId = group.leaderId, // current group leader ID
                    errorCode = Errors.NONE.code))
                }
              }
          }

          // Depending on the current state, decide whether to try to complete the related DelayedJoin
          if (group.is(PreparingRebalance))
            joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
      }
    }

doJoinGroup(...) checks whether each client's supported PartitionAssignor set intersects the group's candidate PartitionAssignor set (the candidateProtocols field of GroupMetadata) in order to determine a PartitionAssignor the client supports; if none is supported, the INCONSISTENT_GROUP_PROTOCOL error code is returned. It also checks the client's member ID: if the member ID carried by the client is non-empty, the JoinGroupRequest comes from a member already known to the Consumer Group, and the MemberMetadata for that member ID must be recognized by the GroupMetadata; otherwise the UNKNOWN_MEMBER_ID error code is returned.
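
The protocol check boils down to set intersection: the group's candidate protocols are the intersection of all members' supported protocols, and a joining member is acceptable only if its own protocol set intersects that candidate set. A simplified sketch (names follow the source, logic condensed; not the actual implementation):

```scala
// The group's candidate protocols: the intersection of every member's supported set.
def candidateProtocols(memberProtocolSets: List[Set[String]]): Set[String] =
  memberProtocolSets.reduceOption(_ intersect _).getOrElse(Set.empty)

// A joining member is compatible if the group is empty or its protocols
// intersect the candidate set (the idea behind GroupMetadata.supportsProtocols).
def supportsProtocols(existing: List[Set[String]], joining: Set[String]): Boolean =
  existing.isEmpty || (joining intersect candidateProtocols(existing)).nonEmpty
```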

在两项检查通过后,会根据Consumer Group的状态分别进行处理:

Dead状态:直接返回UNKNOWN_MEMBER_ID错误码即可。

PreparingRebalance状态:此时需要根据JoinGroupRequest请求携带的Member ID分别处理。如果携带的Member ID为空,则创建对应的MemberMetadata,分配Member ID,并将MemberMetadata加入GroupMetadata中;如果JoinGroupRequest来自Consumer Group中已知的Member,请求会携带之前被分配过的Member ID,此时会更新GroupMetadata中记录的Member信息。

AwaitingSync状态:该状态下也会根据Member ID进行分别处理;如果Member ID为空,表示是新Member申请加入,则与PreparingRebalance状态一样调用addMemberAndRebalance(...)方法进行处理。如果Member ID不为空,表示是已知的Member重新申请加入,则要区分Member支持的PartitionAssignor是否发生了变化,若未发生变化,则将当前GroupMetadata中存储的MemberMetadata集合信息返回给Group Leader。若发生变化,则更新Member对应的MemberMetadata信息,这一步由上面提到的updateMemberAndRebalance(...)方法完成。

Stable状态:该状态下同样会根据Member ID分别处理。如果是未知的新Member申请加入,则创建MemberMetadata对象,分配Member ID,并加入GroupMetadata中,然后将Consumer Group状态切换为PreparingRebalance。如果是已知Member重新申请加入,则要区分Member支持的PartitionAssignor是否发生了变化:如果发生变化或者发送JoinGroupRequest请求的是Group Leader,则更新对应的MemberMetadata信息,并将Consumer Group状态切换为PreparingRebalance;如果未发生变化,则将GroupMetadata的当前状态返回,消费者会发送SyncGroupRequest继续后面的操作。

doJoinGroup(...)方法的最后会根据当前的状态决定是否尝试完成相关的DelayedJoin操作。
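上述四种状态下的分派逻辑,可以用下面一段简化的Python示意代码勾勒(仅为帮助理解的假设性模型,Group、do_join等命名均为示意,并非Kafka的真实实现,Kafka源码为Scala):

```python
# 示意模型:按GroupState分派JoinGroupRequest的处理,未知Member用空字符串表示Member ID
from dataclasses import dataclass, field

UNKNOWN_MEMBER_ID = ""

@dataclass
class Group:
    state: str = "Stable"  # Dead / PreparingRebalance / AwaitingSync / Stable
    leader_id: str = ""
    members: dict = field(default_factory=dict)  # memberId -> 支持的PartitionAssignor列表

def do_join(group, member_id, protocols):
    """返回一个描述动作的字符串,对应doJoinGroup(...)中的各个分支"""
    if group.state == "Dead":
        return "UNKNOWN_MEMBER_ID"  # Dead状态直接返回错误码
    if member_id == UNKNOWN_MEMBER_ID:
        return "addMemberAndRebalance"  # 新Member:创建MemberMetadata并触发Rebalance
    if group.state == "PreparingRebalance":
        return "updateMemberAndRebalance"  # 已知Member:更新其元数据
    # AwaitingSync / Stable 状态
    if member_id == group.leader_id or group.members.get(member_id) != protocols:
        return "updateMemberAndRebalance"  # 协议发生变化或Leader重新加入:强制Rebalance
    return "returnCurrentGeneration"  # 未发生变化:直接返回当前年代信息
```

例如,对一个处于Stable状态的Group,Leader重新加入或Follower更换PartitionAssignor都会触发Rebalance,而元数据未变化的Follower只会拿到当前年代信息。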

在上面的四个状态对应的操作中,添加新的MemberMetadata对象和更新已存在的MemberMetadata对象信息这两个操作,分别由addMemberAndRebalance(...)和updateMemberAndRebalance(...)方法实现,源码如下:

kafka.coordinator.GroupCoordinator
  • // 负责添加Member信息
  • private def addMemberAndRebalance(sessionTimeoutMs: Int,
  • clientId: String,
  • clientHost: String,
  • protocols: List[(String, Array[Byte])],
  • group: GroupMetadata,
  • callback: JoinCallback) = {
  • // use the client-id with a random id suffix as the member-id
  • // 生成Member ID,格式为"客户端ID-UUID随机字符串"
  • val memberId = clientId + "-" + group.generateMemberIdSuffix
  • // 创建MemberMetadata对象
  • val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
  • // 设置MemberMetadata对象的awaitingJoinCallback为响应回调
  • member.awaitingJoinCallback = callback
  • // 将MemberMetadata添加到GroupMetadata中
  • group.add(member.memberId, member)
  • // 将状态切换为PreparingRebalance
  • maybePrepareRebalance(group)
  • member
  • }
  • // 负责更新Member信息
  • private def updateMemberAndRebalance(group: GroupMetadata,
  • member: MemberMetadata,
  • protocols: List[(String, Array[Byte])],
  • callback: JoinCallback) {
  • // 更新MemberMetadata支持的PartitionAssignor的协议和awaitingJoinCallback回调
  • member.supportedProtocols = protocols
  • member.awaitingJoinCallback = callback
  • // 尝试进行状态切换到PreparingRebalance
  • maybePrepareRebalance(group)
  • }

注:注意这两个方法中记录到awaitingJoinCallback字段的参数callback,它其实来自于KafkaApis的handleJoinGroupRequest(...)中定义的sendResponseCallback(...)回调函数;同时,MemberMetadata的awaitingJoinCallback不仅是一个回调函数,还是一个非常重要的标识,它会参与检测所有已知的MemberMetadata是否已经发送JoinGroupRequest申请重新加入。

在这两个方法的最后都会调用maybePrepareRebalance(...)方法,这个方法在前面的文章中提到过,它会判断Consumer Group的状态,如果是Stable或AwaitingSync,则会调用prepareRebalance(...)方法将状态切换成PreparingRebalance,并创建相应的DelayedJoin:

kafka.coordinator.GroupCoordinator
  • private def maybePrepareRebalance(group: GroupMetadata) {
  • group synchronized { // 加锁
  • // 只有在Stable和AwaitingSync的状态下才会执行prepareRebalance()
  • if (group.canRebalance)
  • prepareRebalance(group)
  • }
  • }
  • // 判断GroupMetadata是否处于Stable或AwaitingSync状态(该方法定义在GroupMetadata中)
  • def canRebalance = state == Stable || state == AwaitingSync
  • // 切换状态PreparingRebalance
  • private def prepareRebalance(group: GroupMetadata) {
  • // if any members are awaiting sync, cancel their request and have them rejoin
  • if (group.is(AwaitingSync)) // 如果是AwaitingSync状态
  • // 重置MemberMetadata.assignment字段,调用awaitingSyncCallback向消费者返回REBALANCE_IN_PROGRESS的错误码
  • resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
  • // 切换为PreparingRebalance状态,表示准备执行Rebalance操作
  • group.transitionTo(PreparingRebalance)
  • info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
  • // DelayedJoin的超时时长是GroupMetadata中所有Member设置的超时时长的最大值
  • val rebalanceTimeout = group.rebalanceTimeout
  • // 创建DelayedJoin对象
  • val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
  • // 创建DelayedJoin的Watcher Key
  • val groupKey = GroupKey(group.groupId)
  • // 尝试立即完成DelayedJoin,如果不能完成就添加到joinPurgatory炼狱
  • joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  • }

对于AwaitingSync状态的Consumer Group来说,有的Group Follower已经发送了SyncGroupRequest,GroupCoordinator在等待Group Leader通过SyncGroupRequest将分区的分配结果发送过来。如果此时进行状态切换,需要对这些已经发送SyncGroupRequest的Group Follower返回错误码,这部分操作在resetAndPropagateAssignmentError()方法中完成:

kafka.coordinator.GroupCoordinator
  • private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
  • // 检查GroupMetadata的状态是否是AwaitingSync
  • assert(group.is(AwaitingSync))
  • // 清空GroupMetadata中的分配情况
  • group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
  • propagateAssignment(group, errorCode)
  • }
  • // 传递分配结果,其实内部调用了KafkaApis中处理SyncGroupRequest时定义的回调函数
  • private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
  • for (member <- group.allMemberMetadata) { // 遍历所有的Member
  • if (member.awaitingSyncCallback != null) {
  • // 调用awaitingSyncCallback回调函数
  • member.awaitingSyncCallback(member.assignment, errorCode)
  • member.awaitingSyncCallback = null
  • // reset the session timeout for members after propagating the member's assignment.
  • // This is because if any member's session expired while we were still awaiting either
  • // the leader sync group or the storage callback, its expiration will be ignored and no
  • // future heartbeat expectations will not be scheduled.
  • // 开启等待下次心跳的延迟任务
  • completeAndScheduleNextHeartbeatExpiration(group, member)
  • }
  • }
  • }
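resetAndPropagateAssignmentError(...)与propagateAssignment(...)的核心逻辑可以用下面的Python示意代码概括(仅为帮助理解的假设性模型,Member等命名为示意,非Kafka实现):

```python
# 示意模型:清空所有Member的分配结果,并通过各自的awaitingSyncCallback回调把错误码传出去
REBALANCE_IN_PROGRESS = 27  # Kafka协议中REBALANCE_IN_PROGRESS对应的错误码编号

class Member:
    def __init__(self):
        self.assignment = b""
        self.awaiting_sync_callback = None  # 收到SyncGroupRequest时记录的回调

def reset_and_propagate_assignment_error(members, error_code):
    for m in members:
        m.assignment = b""  # 清空分配结果
    propagate_assignment(members, error_code)

def propagate_assignment(members, error_code):
    for m in members:
        if m.awaiting_sync_callback is not None:
            # 只响应已经发送了SyncGroupRequest、正在等待结果的Member
            m.awaiting_sync_callback(m.assignment, error_code)
            m.awaiting_sync_callback = None  # 回调只触发一次,之后置空
```

可以看到,只有awaitingSyncCallback不为空的Member(即已发送SyncGroupRequest正在等待的Group Follower)才会收到携带错误码的响应。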

4.1. DelayedJoin延迟任务

在分析DelayedJoin之前,我们先关注一个问题,DelayedJoin什么时候才会被创建?其实这里需要从JoinGroupRequest请求的性质来讨论。在前面的内容中,我们提到JoinGroupRequest请求可能来自未知的客户端,也可能来自已知的客户端。

我们先讨论前一种情况:即JoinGroupRequest请求来自未知的客户端,此时该客户端的意图其实是"新客户端请求加入"。这里有一个极端的情况:Consumer Group对应的GroupMetadata不存在,所有JoinGroupRequest请求都来自未知的客户端,这表明该Consumer Group是新创建的,需要创建新的GroupMetadata,并从所有发送JoinGroupRequest请求的客户端中选择一个Group Leader(默认为Group内的第一个客户端)。注意,创建新GroupMetadata的操作是在GroupCoordinator的handleJoinGroup(...)方法中、调用doJoinGroup(...)之前执行的,且新创建的GroupMetadata处于Stable状态;因此在doJoinGroup(...)中处理该GroupMetadata中第一个客户端的JoinGroupRequest请求时,会调用addMemberAndRebalance(...)方法为其创建对应的MemberMetadata对象并添加到GroupMetadata中,然后将GroupMetadata的状态切换为PreparingRebalance,并创建对应的DelayedJoin延迟任务。

在这种极端情况结束后,此时GroupMetadata已经创建且状态切换为PreparingRebalance了,对于在此之后的新的客户端发送的JoinGroupRequest请求,在doJoinGroup(...)方法中会执行GroupMetadata状态为PreparingRebalance分支,即为该客户端创建对应的MemberMetadata对象并添加到GroupMetadata中,但由于此时GroupMetadata状态已经转换为PreparingRebalance,因此并不会执行prepareRebalance(...)方法,也即是并不会为该客户端创建DelayedJoin延迟任务,仅仅只是将KafkaApis的handleJoinGroupRequest(...)中定义的sendResponseCallback(...)回调函数记录到了对应的MemberMetadata的awaitingJoinCallback字段上。

对于第二种情况:即JoinGroupRequest请求来自已知客户端,这种情况是由于客户端之前出现故障宕机退出了,当再次上线后发送JoinGroupRequest要求重新加入之前的Consumer Group。这里需要先了解GroupMetadata的状态切换:初始化时为Stable,收到JoinGroupRequest并通过检查后切换为PreparingRebalance,在执行完DelayedJoin之后会切换为AwaitingSync(后面会讲解),在处理完SyncGroupRequest之后又会切换为Stable;因此当收到这种情况下的JoinGroupRequest请求时,对应的GroupMetadata的状态是Stable,这一次的处理会调用updateMemberAndRebalance(...)方法更新客户端对应的MemberMetadata对象,然后将GroupMetadata的状态切换为PreparingRebalance,并创建对应的DelayedJoin延迟任务。

因此,我们可以总结得到,只有在下面的两种情况下,DelayedJoin才会被创建:

  1. Consumer Group不存在,在收到该Consumer Group中第一个客户端发送JoinGroupRequest后,会创建新的GroupMetadata,并切换为PreparingRebalance状态,然后创建DelayedJoin延迟任务。
  2. Consumer Group存在,有已知的客户端要求重新加入Consumer Group时,会将对应的GroupMetadata切换为PreparingRebalance状态,然后创建DelayedJoin延迟任务。
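上面两种创建时机最终都收敛到maybePrepareRebalance(...)的状态闸门上,可以用一段简化的Python示意代码概括(假设性模型,非Kafka实现):

```python
# 示意模型:只有Stable或AwaitingSync状态才允许进入PreparingRebalance并创建DelayedJoin
def maybe_prepare_rebalance(state):
    """返回(新状态, 是否创建DelayedJoin),对应canRebalance的判断"""
    if state in ("Stable", "AwaitingSync"):
        return "PreparingRebalance", True  # 切换状态并创建DelayedJoin延迟任务
    return state, False  # Dead或已处于PreparingRebalance时不会重复创建
```

这也解释了为什么在PreparingRebalance状态下后续到达的JoinGroupRequest只会记录回调,而不会再创建新的DelayedJoin。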

讨论了DelayedJoin什么时候被创建,我们还需要了解DelayedJoin延迟任务什么时候会被执行。

其实从上面讲解过的prepareRebalance(...)方法中我们可以看到,当提交DelayedJoin时就已经尝试了一次执行操作,这也是DelayedJoin被执行的第一种情况。该操作会执行DelayedJoin的tryComplete()方法,先观察DelayedJoin的源码,如下:

kafka.coordinator.DelayedJoin
  • /**
  • * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
  • *
  • * Whenever a join-group request is received, check if all known group members have requested
  • * to re-join the group; if yes, complete this operation to proceed rebalance.
  • *
  • * When the operation has expired, any known members that have not requested to re-join
  • * the group are marked as failed, and complete this operation to proceed rebalance with
  • * the rest of the group.
  • *
  • * 等待Consumer Group中所有的消费者发送JoinGroupRequest申请加入。
  • * 每当处理完新收到的JoinGroupRequest时,都会检测相关的DelayedJoin是否能够完成,
  • * 经过一段时间的等待,DelayedJoin也会到期执行。
  • *
  • * @param coordinator GroupCoordinator对象
  • * @param group DelayedJoin对应的GroupMetadata对象
  • * @param sessionTimeout 到期时长,是GroupMetadata中所有Member设置的超时时间的最大值
  • */
  • private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
  • group: GroupMetadata,
  • sessionTimeout: Long)
  • extends DelayedOperation(sessionTimeout) {
  • // 三个方法调用的都是GroupCoordinator的方法
  • override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
  • override def onExpiration() = coordinator.onExpireJoin()
  • // 当已知Member都已申请重新加入或DelayedJoin到期时执行该方法
  • override def onComplete() = coordinator.onCompleteJoin(group)
  • }

从源码可知,它的三个操作都委托给了GroupCoordinator对象。其中tryComplete()会调用GroupCoordinator的tryCompleteJoin(...),源码如下:

kafka.coordinator.GroupCoordinator#tryCompleteJoin
  • def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  • group synchronized {
  • // 判断已知的Member是否已经申请加入
  • if (group.notYetRejoinedMembers.isEmpty) // 不为空表示还有Member没有加入Group
  • // 尝试强制完成
  • forceComplete()
  • else false
  • }
  • }

此方法内部使用到GroupMetadata的notYetRejoinedMembers()方法:

kafka.coordinator.GroupMetadata#notYetRejoinedMembers
  • /**
  • * 还没有加入Group的Member列表,使用其awaitingJoinCallback是否为空进行判断
  • * awaitingJoinCallback是个非常重要的标记,它用来判断一个Member是否已经申请加入
  • */
  • def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList

如果notYetRejoinedMembers()方法返回的列表为空,也即是GroupMetadata中所有的MemberMetadata的awaitingJoinCallback字段都为空,则执行forceComplete()尝试强制完成DelayedJoin任务。

对于消费者客户端来说,只要它发送了JoinGroupRequest请求尝试加入Consumer Group,最终该客户端在GroupMetadata中对应的MemberMetadata对象的awaitingJoinCallback字段就会记录来自于KafkaApis的handleJoinGroupRequest(...)中定义的sendResponseCallback(...)回调函数,该awaitingJoinCallback字段也可以作为判断一个客户端是否已经申请加入Consumer Group的标记;notYetRejoinedMembers()方法正是通过判断该字段是否为空来判断是否所有的客户端都已经重新申请加入Consumer Group了。
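tryCompleteJoin(...)与notYetRejoinedMembers()配合的判断逻辑,可以用一段简化的Python示意代码表达(假设性模型,非Kafka实现,用字典模拟"Member ID到awaitingJoinCallback"的映射):

```python
# 示意模型:以awaitingJoinCallback是否为None判定Member是否已重新申请加入
def not_yet_rejoined(members):
    """members: {memberId: awaitingJoinCallback或None},返回尚未申请加入的Member ID列表"""
    return [mid for mid, cb in members.items() if cb is None]

def try_complete_join(members):
    # 所有已知Member都已带着回调申请加入时,DelayedJoin才能完成(即执行forceComplete)
    return len(not_yet_rejoined(members)) == 0
```

只要还有任何一个已知Member的回调为空,DelayedJoin就无法提前完成,只能等待其余Member重新加入或任务到期。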

其实这里有一个容易被误解的地方:从notYetRejoinedMembers()方法的名称来看,它是针对"Rejoined"的Member而言的;而对于一个新的Consumer Group,企图加入该Consumer Group的所有客户端所对应的awaitingJoinCallback字段必然不为空。也就是说,对于角色为Group Leader的客户端而言,为它创建的DelayedJoin延迟任务会被立即执行(即使此时有其它的客户端发送了JoinGroupRequest请求,这些客户端对应的MemberMetadata的awaitingJoinCallback字段也必然不为空)。

"Rejoined"的情况,是针对宕机恢复后重新加入Consumer Group的客户端而言的。这里有一个细节:awaitingJoinCallback字段记录的回调函数在执行之后,该字段会被置为null。也就是说,如果有客户端宕机退出了Consumer Group,它在GroupMetadata中对应的MemberMetadata的awaitingJoinCallback字段是null;当该客户端为重新加入Consumer Group而发送JoinGroupRequest请求之后,它对应的MemberMetadata的awaitingJoinCallback字段又会被赋值为handleJoinGroupRequest(...)中定义的sendResponseCallback(...)回调函数,对应的GroupMetadata会切换为PreparingRebalance状态,并创建对应的DelayedJoin延迟任务。但此时notYetRejoinedMembers()方法返回的就不是空列表了,因为该Consumer Group中其它已存在客户端的MemberMetadata的awaitingJoinCallback字段还是null,因此不会执行forceComplete()尝试强制完成DelayedJoin任务。

这也就促成了DelayedJoin任务被执行的第二种情况:超时。在以前的文章中讲解过,对于DelayedJoin这类延迟任务是存在超时时间的,一旦超时也会调用其forceComplete()方法尝试强制完成。forceComplete()方法内部会调用onComplete()方法,也即是GroupCoordinator的onCompleteJoin(...)方法。

在讲解onCompleteJoin(...)方法之前,我们还需要明白一个很重要的细节,也就是客户端宕机恢复后重新加入Consumer Group的情况的处理,此时定然会触发Rebalance操作重新均衡Consumer Group内的客户端的分区分配,那么组内其他的处于正常状态的客户端如何知道需要进行Rebalance操作呢?这就是由客户端与服务端的心跳操作实现的。在上面我们提到过,GroupMetadata在正常状态下是处于Stable状态的,一旦收到JoinGroupRequest请求后如果检查通过会切换为PreparingRebalance状态,而处于PreparingRebalance状态的GroupMetadata在收到客户端的心跳请求后是会返回REBALANCE_IN_PROGRESS错误码的,回顾处理HeartbeatRequest请求的源码片段:

kafka.coordinator.GroupCoordinator#handleHeartbeat
  • // 处理HeartbeatRequest请求的方法
  • def handleHeartbeat(groupId: String,
  • memberId: String,
  • generationId: Int,
  • responseCallback: Short => Unit) {
  • ...
  • else if (!group.is(Stable)) { // 状态不为Stable,返回REBALANCE_IN_PROGRESS错误码
  • responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
  • }
  • ...
  • }

而KafkaConsumer客户端在心跳任务收到REBALANCE_IN_PROGRESS错误码时,会标记rejoinNeeded为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group,回顾源码片段:

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatCompletionHandler#handle
  • public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
  • ...
  • else if (error == Errors.REBALANCE_IN_PROGRESS) {
  • // 正在Rebalance过程中
  • log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
  • // 标记rejoinNeeded为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group
  • AbstractCoordinator.this.rejoinNeeded = true;
  • // 抛出异常
  • future.raise(Errors.REBALANCE_IN_PROGRESS);
  • }
  • ...
  • }

因此我们可以知道,一旦有客户端发送了JoinGroupRequest必然会引起Rebalance操作,此时其它正常运行的客户端也会因为GroupMetadata切换为PreparingRebalance状态而收到执行Rebalance的指令,从而向服务端发送JoinGroupRequest请求。因此这也就能解释notYetRejoinedMembers()方法的“Rejoined”情况了。

在明白这些内容之后,我们来分析GroupCoordinator的onCompleteJoin(...)方法源码如下:

kafka.coordinator.GroupCoordinator#onCompleteJoin
  • // 当已知Member都已申请重新加入或DelayedJoin到期时执行该方法
  • def onCompleteJoin(group: GroupMetadata) {
  • group synchronized { // 加锁
  • // 获取未重新加入的已知Member集合
  • val failedMembers = group.notYetRejoinedMembers
  • if (group.isEmpty || !failedMembers.isEmpty) {
  • failedMembers.foreach { failedMember =>
  • group.remove(failedMember.memberId) // 移除未加入的已知Member
  • // TODO: cut the socket connection to the client
  • }
  • // TODO KAFKA-2720: only remove group in the background thread
  • // 如果GroupMetadata中已经没有Member,就将GroupMetadata切换成Dead状态并从groupsCache中移除
  • if (group.isEmpty) {
  • // 切换成Dead状态
  • group.transitionTo(Dead)
  • // 从GroupMetadataManager的groupsCache集合中移除Group
  • groupManager.removeGroup(group)
  • info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
  • }
  • }
  • if (!group.is(Dead)) {
  • // 递增generationId,选择该Consumer Group最终使用的PartitionAssignor
  • group.initNextGeneration() // 此处还会将GroupMetadata的状态转换为AwaitingSync
  • info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
  • // trigger the awaiting join group response callback for all the members after rebalancing
  • // 向GroupMetadata中所有的Member发送JoinGroupResponse响应
  • for (member <- group.allMemberMetadata) {
  • assert(member.awaitingJoinCallback != null)
  • // 构造响应结果,Leader和Follower是不同的
  • val joinResult = JoinGroupResult(
  • // Group Leader会收到相应的分配结果
  • members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
  • memberId=member.memberId,
  • generationId=group.generationId,
  • subProtocol=group.protocol,
  • leaderId=group.leaderId,
  • errorCode=Errors.NONE.code)
  • // 调用回调函数
  • member.awaitingJoinCallback(joinResult)
  • member.awaitingJoinCallback = null
  • // 重置心跳延迟任务
  • completeAndScheduleNextHeartbeatExpiration(group, member)
  • }
  • }
  • }
  • }

该方法中首先将未重新申请加入的已知Member删除;如果GroupMetadata中不再包含任何Member,则将Consumer Group转换成Dead状态,删除对应的GroupMetadata对象并向Offsets Topic中对应的分区写入"删除标记";否则会递增generationId,将GroupMetadata的状态切换为AwaitingSync以等待SyncGroupRequest请求,然后调用所有MemberMetadata的awaitingJoinCallback(...)回调函数发送JoinGroupResponse响应。

需要注意的是,针对新Consumer Group创建的GroupMetadata而言,显然不会执行if (group.isEmpty || !failedMembers.isEmpty) {...}分支的代码。
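onCompleteJoin(...)的主干流程可以用下面的Python示意代码概括(假设性模型,on_complete_join等命名为示意,非Kafka实现;这里用布尔值代替"awaitingJoinCallback是否为空"的判断):

```python
# 示意模型:移除未重新加入的Member;组空则置Dead,否则递增generation并区分Leader/Follower构造响应
def on_complete_join(members, leader_id, generation):
    """members: {memberId: 是否已申请加入};返回(新状态, generation, {memberId: 响应})"""
    alive = {mid for mid, joined in members.items() if joined}  # 剔除未重新加入的Member
    if not alive:
        return "Dead", generation, {}  # 组内已无Member,切换为Dead并移除GroupMetadata
    generation += 1  # 对应initNextGeneration():递增年代并切换为AwaitingSync
    responses = {}
    for mid in alive:
        responses[mid] = {
            "generation": generation,
            # 只有Group Leader能拿到全部Member的元数据,用于执行分区分配
            "members": sorted(alive) if mid == leader_id else [],
        }
    return "AwaitingSync", generation, responses
```

可以看到,Leader与Follower收到的JoinGroupResponse的区别仅在于members字段:Leader拿到全量Member元数据以便执行分配,Follower拿到的是空集合。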

5. LeaveGroupRequest的处理

当消费者离开Consumer Group,会向GroupCoordinator发送LeaveGroupRequest请求,该请求由KafkaApis的handleLeaveGroupRequest(...)方法处理,源码如下:

kafka.server.KafkaApis#handleLeaveGroupRequest
  • // 处理LeaveGroupRequest请求
  • def handleLeaveGroupRequest(request: RequestChannel.Request) {
  • // 转换请求体为LeaveGroupRequest对象
  • val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
  • // 构造响应头
  • val respHeader = new ResponseHeader(request.header.correlationId)
  • // the callback for sending a leave-group response
  • // 定义回调函数
  • def sendResponseCallback(errorCode: Short) {
  • val response = new LeaveGroupResponse(errorCode)
  • trace("Sending leave group response %s for correlation id %d to client %s."
  • .format(response, request.header.correlationId, request.header.clientId))
  • requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
  • }
  • if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) { // 检查授权
  • // 授权未通过
  • val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
  • requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
  • } else {
  • // let the coordinator to handle leave-group
  • // 授权通过,交给GroupCoordinator的handleLeaveGroup()处理
  • coordinator.handleLeaveGroup(
  • leaveGroupRequest.groupId(),
  • leaveGroupRequest.memberId(),
  • sendResponseCallback)
  • }
  • }

该方法在检查授权通过后会交由GroupCoordinator的handleLeaveGroup(...)方法处理:

kafka.coordinator.GroupCoordinator
  • // 处理LeaveGroupRequest
  • def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
  • if (!isActive.get) { // 检查GroupCoordinator是否在运行
  • // 调用回调函数,返回错误码GROUP_COORDINATOR_NOT_AVAILABLE
  • responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  • } else if (!isCoordinatorForGroup(groupId)) { // 判断当前GroupCoordinator是否负责管理该Consumer Group
  • // 调用回调函数,返回错误码NOT_COORDINATOR_FOR_GROUP
  • responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  • } else if (isCoordinatorLoadingInProgress(groupId)) { // 判断该Consumer Group对应的Offsets Topic分区是否还处于加载过程中
  • // 调用回调函数,返回错误码GROUP_LOAD_IN_PROGRESS
  • responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
  • } else { // 检查通过
  • // 获取对应的GroupMetadata
  • val group = groupManager.getGroup(groupId)
  • if (group == null) {
  • // if the group is marked as dead, it means some other thread has just removed the group
  • // from the coordinator metadata; this is likely that the group has migrated to some other
  • // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
  • // joining without specified consumer id,
  • // GroupMetadata为空,调用回调函数,返回错误码UNKNOWN_MEMBER_ID
  • responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
  • } else {
  • // GroupMetadata不为空,要分别处理
  • group synchronized {
  • if (group.is(Dead)) {
  • // GroupMetadata状态为Dead,返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
  • } else if (!group.has(consumerId)) {
  • // GroupMetadata不管理该MemberID,返回UNKNOWN_MEMBER_ID错误码
  • responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
  • } else {
  • // 获取对应的MemberMetadata
  • val member = group.get(consumerId)
  • // 将对应MemberMetadata的isLeaving字段设置为true,尝试完成相应的DelayedHeartbeat
  • removeHeartbeatForLeavingMember(group, member)
  • // 调用onMemberFailure()方法移除对应的MemberMetadata对象并完成状态变化
  • onMemberFailure(group, member)
  • // 调用回调函数
  • responseCallback(Errors.NONE.code)
  • }
  • }
  • }
  • }
  • }
  • // 将对应MemberMetadata的isLeaving字段设置为true,尝试完成相应的DelayedHeartbeat
  • private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
  • // 将MemberMetadata的isLeaving设置为true
  • member.isLeaving = true
  • // 尝试完成相应的DelayedHeartbeat
  • val memberKey = MemberKey(member.groupId, member.memberId)
  • heartbeatPurgatory.checkAndComplete(memberKey)
  • }
  • // 使Member下线
  • private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
  • trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
  • group.remove(member.memberId) // 将Member从对应的GroupMetadata中移除
  • group.currentState match { // 根据GroupMetadata状态进行处理
  • // 无操作
  • case Dead =>
  • // 之前的分区分配可能已经失效了,将GroupMetadata切换成PreparingRebalance状态
  • case Stable | AwaitingSync => maybePrepareRebalance(group)
  • // GroupMetadata中的Member减少,可能满足DelayedJoin的执行条件,尝试执行
  • case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  • }
  • }

handleLeaveGroup(...)方法会将对应MemberMetadata的isLeaving字段设置为true,并尝试完成相应的DelayedHeartbeat,之后将对应的MemberMetadata对象从GroupMetadata中删除;最后根据Consumer Group的状态决定是否进行状态切换,以及是否尝试完成对应的DelayedJoin。
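onMemberFailure(...)中按GroupState分派后续动作的逻辑,可以用一段简化的Python示意代码概括(假设性模型,非Kafka实现):

```python
# 示意模型:Member下线后,按Consumer Group当前状态决定后续动作
def on_member_failure(state):
    if state == "Dead":
        return "noop"  # Dead状态无需任何操作
    if state in ("Stable", "AwaitingSync"):
        return "maybePrepareRebalance"  # 原分区分配可能失效,准备Rebalance
    if state == "PreparingRebalance":
        return "checkAndCompleteJoin"  # Member减少后DelayedJoin可能满足完成条件,尝试执行
    raise ValueError("unknown state: " + state)
```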