1. GroupState
在前面我们有提到过,GroupMetadata中使用GroupState特质来表示其状态,该状态也即是Consumer Group的状态;GroupState有四个实现类分别代表了Consumer Group所处的四种不同状态,源码如下:
这四种状态的含义和解释如下:
- PreparingRebalance:Consumer Group当前正在准备进行Rebalance操作。此时GroupCoordinator可以正常地处理OffsetFetchRequest、LeaveGroupRequest、OffsetCommitRequest,但对于收到的HeartbeatRequest和SyncGroupRequest,则会在其响应中携带REBALANCE_IN_PROGRESS错误码进行标识。当收到JoinGroupRequest时,GroupCoordinator会先创建对应的DelayedJoin,等待条件满足后对其进行响应。
- AwaitingSync:Consumer Group当前正在等待Consumer Group的Consumer Leader将分区的分配结果发送到GroupCoordinator。即表示正在等待Consumer Group的Consumer Leader的SyncGroupRequest。当GroupCoordinator收到OffsetCommitRequest和HeartbeatRequest请求时,会在其响应中携带REBALANCE_IN_PROGRESS错误码进行标识。对于来自Consumer Follower的SyncGroupRequest时仅仅是处理回调相关的操作,不做多余的工作,直到收到Consumer Leader的SyncGroupRequest时一起响应。
- Stable:标识Consumer Group处于正常状态,这也是Consumer Group的初始状态。此时的GroupCoordinator可以处理所有的请求,如:OffsetFetchRequest、HeartbeatRequest、OffsetCommitRequest、JoinGroupRequest、来自Consumer Group中现有Member的SyncGroupRequest。
- Dead:处于此状态的Consumer Group中已经没有Member存在了,其对应的GroupMetadata也将被删除。对于此状态的Consumer Group,除了OffsetCommitRequest,其他请求的响应中都会携带UNKNOWN_MEMBER_ID错误码进行标识。
对于这四种状态之间,有如下的转换示意图:
2. GroupCoordinator概览
GroupCoordinator依赖于前面讲解的GroupMetadataManager,Kafka的每个Broker都会拥有一个GroupCoordinator,它的定义和重要字段如下:
在GroupCoordinator的定义中,groupManager
是当前GroupCoordinator关联的GroupMetadataManager实例;heartbeatPurgatory
和joinPurgatory
是用于处理DelayedHeartbeat和DelayedJoin延迟任务的炼狱对象;我们需要注意的是groupConfig
和offsetConfig
两个字段,它们是用于记录Consumer Group和OffsetMetadata相关配置项的样例类,源码如下:
- GroupConfig源码:
- OffsetConfig源码:
从OffsetConfig的源码可以得知关于__consumer_offsets
主题的各项默认配置。
3. HeartbeatRequest的处理
在前面讲解Kafka消费者的运行原理时我们知道,每个消费者都会通过HeartbeatTask任务定期向GroupCoordinator发送HeartbeatRequest,告知GroupCoordinator自己正常在线。HeartbeatRequest由KafkaApis的handleHeartbeatRequest(...)
方法处理:
该方法会在授权检查通过后将HeartBeatRequest交给GroupCoordinator的handleHeartbeat(...)
方法处理,注意其内部定义的sendResponseCallback(...)
回调函数会作为handleHeartbeat(...)
方法的最后一个参数传入;handleHeartbeat(...)
方法的源码如下:
handleHeartbeat(...)
方法内也会做几项检查,这里主要注意的是,只有在对应的GroupMetadata存在且状态为Stable时才会进行处理,当GroupMetadata为AwaitingSync或PreparingRebalance会返回REBALANCE_IN_PROGRESS错误码。
在检查通过后会直接调用completeAndScheduleNextHeartbeatExpiration(...)
,该方法中会更新收到此Member心跳的时间戳,尝试执行其对应的DelayedHeartbeat,并创建新的DelayedHeartbeat对象放入heartbeatPurgatory
中等待下次心跳到来或DelayedHeartbeat超时:
很明显,心跳检测其实是通过DelayedHeartbeat延迟任务来实现的。我们其实可以追溯到第一次DelayedHeartbeat任务的添加时机,是在消费者向服务端发送SyncGroupRequest被正确处理时就添加了。
另外,completeAndScheduleNextHeartbeatExpiration(...)
会在多个方法中被调用,如处理JoinGroupRequest、SyncGroupRequest、OffsetCommitRequest请求,处理GroupCoordinator迁移等操作中都会调用:
- GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(...)
- ↖ GroupCoordinator.propagateAssignment(...)
- ↖ GroupCoordinator.handleHeartbeat(...)
- ↖ GroupCoordinator.onGroupLoaded(...)
- ↖ GroupCoordinator.doSyncGroup(...)
- ↖ GroupCoordinator.onCompleteJoin(...)
- ↖ GroupCoordinator.handleCommitOffsets(...)
3.1. DelayedHeartbeat延迟任务
接下来我们了解一下DelayedHeartbeat延迟任务的实现,它的源码如下:
DelayedHeartbeat与DelayedJoin一样,将所有操作都委托给了GroupCoordinator的相关方法。
在添加DelayedHeartbeat任务时就会尝试一次执行,调用的是tryComplete()
方法,也即是GroupCoordinator的tryCompleteHeartbeat(...)
方法,该方法会检测下列四个条件,如果满足其中的任意一个条件,则认为DelayedHeartbeat符合执行条件:
- 最后一次收到心跳信息的时间与
heartbeatDeadline
的差距大于sessionTimeout
。 awaitingJoinCallback
不为null,即消费者正在等待JoinGroupResponse。awaitingSyncCallback
不为null,即消费者正在等待SyncGroupResponse。- 消费者已经离开了Consumer Group。
源码如下:
forceComplete()
中调用的是onComplete()
,最终还是调用的GroupCoordinator的onCompleteHeartbeat()
,而该方法是空实现,没有任何动作。
当DelayedHeartbeat到期时会调用onExpiration()
方法,该方法会调用GroupCoordinator的onExpireHeartbeat(...)
方法,它会将其对应的MemberMetadata从GroupMetadata中删除,并按照当前GroupMetadata所处的状态进行分类处理:
4. JoinGroupRequest的处理
回顾前面讲解的Kafka消费者的运行原理中提到过,Consumer Group的Rebalance操作主要通过JoinGroupRequest请求和JoinGroupResponse响应来完成。JoinGroupRequest是消费者发往GroupCoordinator节点用于获取分区信息的请求对象,如果有多个消费者同时发送JoinGroupRequest请求进行获取,最终返回的JoinGroupResponse响应中,会有一个消费者被标识为Consumer Group的Consumer Leader,其他的节点均为Consumer Group的Consumer Follower。JoinGroupRequest在API层是由KafkaApis的handleJoinGroupRequest(...)
方法负责处理的,源码如下:
从源码可知,该方法在经过权限验证之后,会将具体的处理委托给GroupCoordinator的handleJoinGroup(...)
方法,注意此处传递的参数,从JoinGroupRequest中得到的Group ID、Member ID、Client ID、Client Address以及定义的sendResponseCallback
回调函数都会一并传入;handleJoinGroup(...)
方法的源码如下:
GroupCoordinator的handleJoinGroup(...)
方法也会执行一系列的检查,然后根据Group ID尝试从GroupMetadataManager中获取对应的GroupMetadata对象,这里会分三种情况分别进行处理:
- 如果对应的GroupMetadata不存在,但消费者的Member ID不为空,说明消费者发送了错误的JoinGroupRequest,此时会返回UNKNOWN_MEMBER_ID错误码。
- 如果对应的GroupMetadata不存在,且消费者的Member ID为空,说明这个JoinGroupRequest可能是某个新的Consumer Group中的第一个消费者发送的,需要创建新的GroupMetadata,然后交由
doJoinGroup(...)
方法处理。 - 如果对应的GroupMetadata存在,就直接交给
doJoinGroup(...)
方法处理。
从上面三种情况可知,doJoinGroup(...)
方法是处理该请求的主要方法,源码如下:
doJoinGroup(...)
方法需要检测每个消费者支持的PartitionAssignor集合与GroupMetadata中的候选PartitionAssignor集合(即candidateProtocols
字段)是否有交集,以便确定消费者支持的PartitionAssignor,如果无法支持则返回INCONSISTENT_GROUP_PROTOCOL错误码;还会检测消费者的Member ID,如果消费者携带的Member ID不为空,说明该JoinGroupRequest是来自Consumer Goup中已知的Member,此时需要检测Member ID对应的MemberMetadata是否能被GroupMetadata识别,如果无法识别则返回UNKNOWN_MEMBER_ID错误码。
在两项检查通过后,会根据Consumer Group的状态分别进行处理:
Dead状态:直接返回UNKNOWN_MEMBER_ID错误码即可。
PreparingRebalance状态:此时需要根据JoinGroupRequest请求携带的Member ID分别处理;如果携带的Member ID为空,则创建对应的MemberMetadata,分配Member ID,并将MemberMetadata加入GroupMetadata中。也有可能JoinGroupRequest是来自Consumer Goup中已知的Member,请求会携带之前被分配过的Member ID,此时会更新GroupMetadata中记录的Member信息。
AwaitingSync状态:该状态下也会根据Member ID进行分别处理;如果Member ID为空,表示是新Member申请加入,则与PreparingRebalance状态一样调用addMemberAndRebalance(...)
方法进行处理。如果Member ID不为空,表示是已知的Member重新申请加入,则要区分Member支持的PartitionAssignor是否发生了变化,若未发生变化,则将当前GroupMetadata中存储的MemberMetadata集合信息返回给Consumer Group的Consumer Leader。若发生变化,则更新Member对应的MemberMetadata信息,这一步由上面提到的updateMemberAndRebalance(...)
方法完成。
Stable状态:该状态下同样会根据Member ID进行分别处理;如果是未知的新Member申请加入,则创建MemberMetadata对象,分配Member ID,并加入GroupMetadata中。然后调用将Consumer Group状态切换为PreparingRebalance。如果是已知Member重新申请加入,则要区分Member支持的PartitionAssignor是否发生了变化:如果发生变化或者发送JoinGroupRequest请求的是Consumer Group的Consumer Leader,则更新对应的MemberMetadata信息,并将Consumer Group状态切换为PreparingRebalance;如果未发生变化则将GroupMetadata的当前状态返回,消费者会发送SyncGroupRequest继续后面的操作。
doJoinGroup(...)
方法的最后会根据当前的状态决定是否尝试完成相关的DelayedJoin操作。
在上面的四个状态对应的操作中,添加新的MemberMetadata对象和更新已存在的MemberMetadata对象的信息这两个操作分别由addMemberAndRebalance(...)
方法和updateMemberAndRebalance(...)
实现,源码如下:
注:我们注意这两个方法中创建MemberMetadata时赋值给
awaitingJoinCallback
的参数callback
,它其实来自于KafkaApis的handleJoinGroupRequest(...)
中定义的sendResponseCallback(...)
回调函数;同时,MemberMetadata的awaitingJoinCallback
不仅是一个回调函数,还是一个非常重要的标识,它会参与检测所有已知MemberMetadata是否已经发送JoinGroupRequest申请重新加入。
在这两个方法的最后都会调用maybePrepareRebalance(...)
方法,这个方法在前面的文章中提到过,它会判断Consumer Group的状态,如果是Stable或AwaitingSync,则会调用prepareRebalance(...)
方法将状态切换成PreparingRebalance,并创建相应的DelayedJoin:
对于AwaitingSync状态的Consumer Group来说,有的Consumer Group的Consumer Follower已经发送了SyncGroupRequest,GroupCoordinator在等待Consumer Group的Consumer Leader通过SyncGroupRequest将分区的分配结果发送过来。如果此时进行状态切换,需要对这些已经发送SyncGroupRequest的Consumer Follower返回错误码,这部分操作在resetAndPropagateAssignmentError()
方法中完成:
4.1. DelayedJoin延迟任务
在分析DelayedJoin之前,我们先关注一个问题,DelayedJoin什么时候才会被创建?其实这里需要从JoinGroupRequest请求的性质来讨论。在前面的内容中,我们提到JoinGroupRequest请求可能来自未知的消费者,也可能来自已知的消费者。
我们先讨论前一种情况:即JoinGroupRequest请求来自未知的消费者,此时该消费者的意图其实是“新消费者请求加入”;这里有一个极端的情况,即Consumer Group对应的GroupMetadata不存在,所有JoinGroupRequest请求都来自未知的消费者,这种情况下该Consumer Group会被新创建,即需要创建新的GroupMetadata,并从所有发送JoinGroupRequest请求的消费者中选择一个Consumer Group的Consumer Leader(默认为Group内的第一个消费者);注意,创建新GroupMetadata的操作是在GroupCoordinator的handleJoinGroup(...)
方法之前执行的,也就是在调用doJoinGroup(...)
之前,且新创建的GroupMetadata的初始状态是Stable,因此在doJoinGroup(...)
中处理JoinGroupRequest请求时,对于该GroupMetadata中的第一个消费者会调用addMemberAndRebalance(...)
方法为其创建对应的MemberMetadata对象并添加到GroupMetadata中,然后切换GroupMetadata的状态为PreparingRebalance,创建对应的DelayedJoin延迟任务。
在这种极端情况结束后,此时GroupMetadata已经创建且状态切换为PreparingRebalance了,对于在此之后的新的消费者发送的JoinGroupRequest请求,在doJoinGroup(...)
方法中会执行GroupMetadata状态为PreparingRebalance分支,即为该消费者创建对应的MemberMetadata对象并添加到GroupMetadata中,但由于此时GroupMetadata状态已经转换为PreparingRebalance,因此并不会执行prepareRebalance(...)
方法来为消费者端创建DelayedJoin延迟任务,仅仅只是将KafkaApis的handleJoinGroupRequest(...)
中定义的sendResponseCallback(...)
回调函数记录到了对应的MemberMetadata的awaitingJoinCallback
字段上。
不过,在Consumer Group处理完所有Consumer的加入,并转换为Stable状态之后,如果有新的Consumer要求加入时,此时GroupCoordinator也会为新消费者分配Member ID及创建对应的MemberMetadata并添加到对应的GroupMetadata,然后将状态切换为PreparingRebalance,创建对应的DelayedJoin延迟任务。
对于第二种情况:即JoinGroupRequest请求来自已知消费者,这种情况是由于消费者之前出现故障宕机退出了,当再次上线后发送JoinGroupRequest要求重新加入之前的Consumer Group。这里需要先了解GroupMetadata的状态切换:初始化时为Stable,收到Consumer Group的Consumer Leader的JoinGroupRequest后切换为PreparingRebalance,在执行完DelayedJoin之后会切换为AwaitingSync(后面会讲解),在处理完SyncGroupRequest之后又会切换为Stable;因此当收到这种情况下的JoinGroupRequest请求时,其实此时对应的GroupMetadata的状态是Stable,那么这一次的处理其实会根据发送JoinGroupRequest的新消费者所支持的分区分配协议来分别处理,如果新消费者不支持当前Consumer Group已确定的分区分配协议,则调用updateMemberAndRebalance(...)
方法更新消费者对应的MemberMetadata对象,然后切换GroupMetadata的状态为PreparingRebalance,创建对应的DelayedJoin延迟任务;如果新消费者支持当前Consumer Group已确定的分区分配协议,则直接回复JoinGroupResponse,以便让消费者继续发送SyncGroupRequest请求以获取分配给自己的分区。
因此,我们可以总结得到,只有在下面的三种情况下,DelayedJoin才会被创建:
- Consumer Group不存在,在收到该Consumer Group中第一个消费者发送JoinGroupRequest后,会创建新的GroupMetadata,并切换为PreparingRebalance状态,然后创建DelayedJoin延迟任务。
- Consumer Group已存在,对应的GroupMetadata状态为Stable时,此时说明该Consumer Group已经稳定运行了一段时间,当有新的未知消费者要求加入Consumer Group,会将对应的GroupMetadata切换为PreparingRebalance状态,然后创建DelayedJoin延迟任务。
- Consumer Group存在,有已知的消费者要求重新加入Consumer Group,且该消费者不支持当前Consumer Group已确定的分区分配协议,则会将对应的GroupMetadata切换为PreparingRebalance状态,然后创建DelayedJoin延迟任务。
讨论了DelayedJoin什么时候被创建,我们还需要了解DelayedJoin延迟任务什么时候被执行?
其实从上面讲解过的prepareRebalance(...)
方法中我们可以看到,当提交DelayedJoin时就已经尝试了一次执行操作,这也是DelayedJoin被执行的第一种情况。该操作会执行DelayedJoin的tryComplete()
方法,先观察DelayedJoin的源码:如下:
从源码可知,它的三个操作都委托给了GroupCoordinator对象。其中tryComplete()
会调用GroupCoordinator的tryCompleteJoin(...)
,源码如下:
此方法内部使用到GroupMetadata的notYetRejoinedMembers()
方法:
如果notYetRejoinedMembers()
方法返回的列表为空,也即是GroupMetadata中所有的MemberMetadata的awaitingJoinCallback
字段都为空,则执行forceComplete()
尝试强制完成DelayedJoin任务。
对于消费者来说,只要它发送了JoinGroupRequest请求尝试加入Consumer Group,最终该消费者在GroupMetadata中对应的MemberMetadata对象的awaitingJoinCallback
字段就会记录来自于KafkaApis的handleJoinGroupRequest(...)
中定义的sendResponseCallback(...)
回调函数,该awaitingJoinCallback
字段也可以作为判断一个消费者是否已经申请加入Consumer Group的标记;notYetRejoinedMembers()
方法正是通过判断该字段是否为空来判断是否所有的消费者都已经重新申请加入Consumer Group了。
其实这里有一个容易被误解的地方,从notYetRejoinedMembers()
方法的名称来看,它是针对于“Rejoined”的Member而言的,而对于一个新的Consumer Group,企图加入该Consumer Group的所有消费者所对应的awaitingJoinCallback
字段必然不为空,也即是说,对于角色为Consumer Leader的消费者而言,为它创建的DelayedJoin延迟任务会被立即执行(即使此时有其它的消费者发送了JoinGroupRequest请求,这些消费者对应的MemberMetadata的awaitingJoinCallback
字段也必然不为空)。
“Rejoined”的情况,是针对出现宕机恢复后重新加入Consumer Group的消费者而言的。这里有一个细节,awaitingJoinCallback
字段记录的回调函数在执行之后会被置为null,也即是说,如果有消费者宕机退出了Consumer Group,它在GroupMetadata中对应的MemberMetadata的awaitingJoinCallback
字段是null,而当该消费者尝试重新加入Consumer Group而发送JoinGroupRequest请求之后,它对应的MemberMetadata的awaitingJoinCallback
字段又会被赋值为handleJoinGroupRequest(...)
中定义的sendResponseCallback(...)
回调函数,且对应的GroupMetadata会切换为PreparingRebalance状态同时创建对应的DelayedJoin延迟任务;而此时notYetRejoinedMembers()
方法的结果就不是空列表了,因为该Consumer Group其他已存在的多个消费者的MemberMetadata的awaitingJoinCallback
字段是null,因此此时是不会执行forceComplete()
尝试强制完成DelayedJoin任务的。
这也就促成了DelayedJoin任务被执行的第二种情况:超时。在以前的文章中讲解过,对于DelayedJoin这类延迟任务是存在超时时间的,一旦超时也会调用其forceComplete()
方法尝试强制完成。forceComplete()
方法内部会调用onComplete()
方法,也即是GroupCoordinator的onCompleteJoin(...)
方法。
在讲解onCompleteJoin(...)
方法之前,我们还需要明白一个很重要的细节,当某个消费者宕机之后,服务端由于一直收不到它的心跳请求,因此它对应的DelayedHeartbeat延迟任务会超时执行,GroupCoordinator会将该消费者所在的Consumer Group的状态切换为PreparingRebalance;同时,对消费者宕机恢复后重新加入Consumer Group的情况的处理过程中,也定然会触发Rebalance操作重新均衡Consumer Group内的消费者的分区分配,那么组内其他的处于正常状态的消费者如何知道需要进行Rebalance操作呢?这就是由消费者与服务端的心跳操作实现的。在上面我们提到过,GroupMetadata在正常状态下是处于Stable状态的,一旦某个消费者心跳操作超时,或收到某个消费者发送的JoinGroupRequest请求后,如果检查通过会切换为PreparingRebalance状态,而处于PreparingRebalance状态的GroupMetadata在收到消费者的心跳请求后是会返回REBALANCE_IN_PROGRESS错误码的,回顾处理HeartbeatRequest请求的源码片段:
而KafkaConsumer消费者在心跳任务收到REBALANCE_IN_PROGRESS错误码时,会标记rejoinNeeded
为true,重新发送JoinGroupRequest请求尝试重新加入Consumer Group,回顾源码片段:
因此我们可以知道,一旦有消费者心跳操作超时或发送了JoinGroupRequest请求,必然会引起Rebalance操作,此时其它正常运行的消费者也会因为GroupMetadata切换为PreparingRebalance状态而收到携带了REBALANCE_IN_PROGRESS错误码的心跳响应,从而向服务端发送JoinGroupRequest请求。因此这也就能解释notYetRejoinedMembers()
方法的“Rejoined”情况了。
在明白这些内容之后,我们来分析GroupCoordinator的onCompleteJoin(...)
方法源码如下:
该方法中首先将未重新申请加入的已知Member删除,如果GroupMetadata中不再包含任何Member,则将Consumer Grouop转换成Dead状态,删除对应的GroupMetadata对象并向Offsets Topic中对应的分区写入“删除标记”;否则会更新generationId
,切换GroupMetadata的状态为AwaitingSync以等待SyncGroupRequest请求,然后调用所有MemberMetadata的awaitingJoinCallback(...)
回调函数发送JoinGroupResponse响应。
需要注意的是,针对新Consumer Group创建的GroupMetadata而言,显然不会执行if (group.isEmpty || !failedMembers.isEmpty) {...}
分支的代码。
5. LeaveGroupRequest的处理
当消费者离开Consumer Group,会向GroupCoordinator发送LeaveGroupRequest请求,该请求由KafkaApis的handleLeaveGroupRequest(...)
方法处理,源码如下:
该方法在检查授权通过后会交由GroupCoordinator的handleLeaveGroup(...)
方法处理:
handleLeaveGroup(...)
方法会将对应MemberMetadata的isLeaving
字段设置为true,并尝试完成相应的DelayedHeartbeat,之后将对应的MemberMetadata对象从GroupMetadata中删除。最后Consumer Group的状态决定是否进行Consumer Group状态切换,是否尝试完成对应的DelayedJoin。在Consumer Group处于Stable或AwaitingSync的情况下,将切换为PreparingRebalance状态并触发新的Rebalance操作。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...