Kafka系列 22 - 服务端源码分析 13:GroupCoordinator相关组件
发布于 / 2019-07-09
简介:主要讲解与GroupCoordinator相关的组件,包括GroupMetadata、MemberMetadata等
1. GroupCoordinator简介
在Kafka服务端,每一个Broker上都会实例化一个GroupCoordinator对象,Kafka按照Consumer Group的名称将其分配给对应的GroupCoordinator进行管理;每个GroupCoordinator只负责管理Consumer Group的一个子集,而非集群中全部的Consumer Group。每个GroupCoordinator内部依赖于一个GroupMetadata实例,用于记录Consumer Group的元数据信息,而每个GroupMetadata内部会维护一个HashMap字典,使用MemberMetadata记录了属于自己维护的Consumer Group的客户端信息,它们之间的关系示意图如下:
GroupCoordinator有几项比较重要的功能:
- 负责处理JoinGroupRequest和SyncGroupRequest,完成Consumer Group中分区的分配工作;
- 通过GroupMetadataManager和内部主题
__consumer_offsets
维护Offset信息,以便消费者获知分区的消费位置; - 记录Consumer Group的相关信息,即使Broker宕机导致Consumer Group由新的GroupCoordinator进行管理,新GroupCoordinator也可以知道Consumer Group中每个消费者负责处理哪个分区等信息;
- 通过心跳消息检测消费者的状态。
这些功能是通过GroupCoordinator与MemberMetadata、GroupMetadata及GroupMetadataManager等组件相互配合完成的,下面我们先分别了解这些依赖组件。
2. MemberMetadata
MemberMetadata是用于记录消费者元数据的类,它的定义和重要字段如下:
在MemberMetadata类中,将客户端都称作为Member,assignment
将记录分配给当前客户端的分区信息;awaitingJoinCallback
和awaitingSyncCallback
分别是处理JoinGroupRequest和SyncGroupRequest时会用到的回调函数,后面会详细介绍;latestHeartbeat
则是GroupCoordinator最后一次收到该客户端心跳的时间,用于判断客户端是否存活。
MemberMetadata类提供了从给定候选PartitionAssignor中选择消费者支持的PartitionAssignor的方法vote(candidates: Set[String]): String
,源码如下:
另外MemberMetadata还提供了一些辅助方法:
MemberMetadata类的整体实现比较简单,关于它的使用也将在后面讲解。
3. GroupMetadata
GroupMetadata记录了Consumer Group的元数据信息,它的定义和重要字段如下:
member
字段是一个HashMap,用于维护当前GroupMetadata管理的MemberMetadata对象,它的键为分配给客户端的Member ID,值为表示该Member元数据的MemberMetadata对象,这里先了解一下Member ID的生成方式,它对应于多处源码,这里将其过程单独抽取出来:
- // 生产Member ID为"客户端ID-UUID随机字符串"
- val memberId = clientId + "-" + group.generateMemberIdSuffix
- ...
- def generateMemberIdSuffix = UUID.randomUUID().toString
可见,Member ID其实是以客户端ID和UUID随机字符串拼接而成的。
GroupMetadata的generationId
标识了当前Consumer Group的年代信息;leaderId
则记录了当前Consumer Group中的Leader消费者的memberId
;protocol
用于记录当前Consumer Group选择的PartitionAssignor。
state
字段是一个GroupState类型的对象,用于表示Consumer Group的状态;GroupState其实是一个特质:
GroupState有四个子类,分别代表GroupCoordinator的四种状态:PreparingRebalance、AwaitingSync、Stable和Dead。关于GroupState的内容后面会详细介绍。
GroupMetadata提供了Member的添加和移除,源码如下:
我们可以发现,在添加Member时会将第一个加入的Member作为Group Leader,而在移除Member时,如果移除了角色为Group Leader的Member,也会重新选择Member集合中的第一个Member作为Group Leader。
GroupMetadata需要根据当前Consumer Group中所有Consumer支持的PartitionAssignor来选择合适的PartitionAssignor,这个过程由它的selectProtocol: String
方法完成:
4. GroupTopicPartition和OffsetAndMetadata
GroupCoordinator需要知道每个Consumer Group对应着主题的哪些分区,同时还要记录每个分区的消费情况,它会使用GroupTopicPartition维护Consumer Group与分区的消费关系,使用OffsetAndMetadata记录offset的相关信息,它们都是样例类,源码如下:
5. GroupMetadataManager
GroupMetadataManager是GroupCoordinator中负责管理Consumer Group元数据以及其对应Offset信息的组件。GroupMetadataManager底层使用Offsets Topic(即内部主题__consumer_offsets
),以消息的形式存储Consumer Group的GroupMetadata信息以及其消费的每个分区的Offset;为了提高查询的效率,GroupMetadataManager同时还会将Consumer Group的GroupMetadata信息和Offset信息在内存中维护一份相同的副本,并与Offsets Topic中存储的信息进行同步。
我们先关注GroupMetadataManager的定义和重要字段:
从上述的字段中可以得知,GroupMetadataManager使用offsetsCache
和groupsCache
两个类型为Pool的池对象存储OffsetAndMetadata及GroupMetadata对象;同时其内部维护了loadingPartitions
和ownedPartitions
两个集合分别记录Offsets Topic中正在加载及已经加载的分区。
默认情况下,Offsets Topic主题有50个分区,副本因子为3,每个分区存储了特定的Consumer Group的信息,计算分区的方式由Consumer Group的Group ID进行哈希后与总分区数量进行取模,同一Consumer Group对应的这GroupMetadata信息和Offset信息会被分配到同一个Offsets Topic分区中,相关的实现源码如下:
Offsets Topic中对消息的存储是混合连续存储的,它会将Consumer Group对应的GroupMetadata和Offset信息存储在Offsets Topic的分区中,以键进行区分二者的区别,有如下示意图:
GroupMetadata和Offset信息的键是不同的,GroupMetadataManager分别提供了groupMetadataKey(...)
和offsetCommitKey(...)
两个方法分别生成二者的键:
由源码可知,GroupMetadataKey键通过Consumer Group的ID即可生成,而OffsetCommitKey则需要通过Group ID、消费的主题、分区ID确定。
5.1. GroupMetadata池管理
groupsCache
作为GroupMetadata缓存池,存储了每个Consumer Group对应的GroupMetadata对象,GroupMetadataManager提供了对groupsCache
的各类操作,下面我们分析一下这些操作的具体实现。
5.1.1. 获取GroupMetadata
获取GroupMetadata的操作非常简单,直接从groupsCache
中按Group ID为键取即可:
5.1.2. 添加GroupMetadata
添加GroupMetadata的操作也一样,直接向groupsCache
中以Group ID为键,GroupMetadata对象为值添加即可即可:
5.1.3. 移除GroupMetadata
移除GroupMetadata的操作则相对复杂一点,不仅需要从groupsCache
池中将对应的GroupMetadata移除,还要向Offsets Topic中写入“删除标记”消息——也即是值为空的消息标记该GroupMetadata已被删除;移除GroupMetadata的操作由GroupMetadataManager的removeGroup(group: GroupMetadata)
方法实现,源码如下:
注:大家可能会疑惑,为什么添加Group的时候没有向Offsets Topic中写入消息的操作,而移除Group时有;其实添加Group时写入消息的操作是由DelayedStore延迟任务完成的,这部分会在后面讲解。
5.2. OffsetAndMetadata池管理
offsetsCache
作为OffsetAndMetadata缓存池,存储了Group中的消费者对指定分区消费时提交的Offset,GroupMetadataManager同样提供了对offsetsCache
的各类操作,下面我们分析一下这些操作的具体实现。
5.2.1. 获取OffsetAndMetadata
获取OffsetAndMetadata的操作非常简单,直接从offsetsCache
中按GroupTopicPartition对象为键获取即可,需要注意的是该方法返回的是OffsetFetchResponse.PartitionData对象:
5.2.2. 添加OffsetAndMetadata
添加OffsetAndMetadata的操作也非常简单,直接以GroupTopicPartition对象为键、OffsetAndMetadata对象为值向offsetsCache
中添加即可:
5.2.3. 定时清理OffsetAndMetadata
GroupMetadataManager没有提供对offsetsCache
池中的OffsetAndMetadata对象进行手动移除的操作,这是因为对于客户端来说,是没有移除OffsetAndMetadata的需求场景的,但GroupMetadataManager会在初始化时创建并启动一个KafkaScheduler调度器,提交名为“delete-expired-consumer-offsets”的定时任务,该任务它会周期性地调用deleteExpiredOffsets()
方法进行过期Offset的清理:
注:OffsetAndMetadata消息的过期时间是在Offset被提交时指定的,默认为提交时间后的24 * 60 * 60 * 1000L毫秒,即保留一天。
该任务的默认执行周期为600000毫秒(即600秒),deleteExpiredOffsets()
方法除了需要删除offsetsCache
池中对应的OffsetAndMetadata对象,还会向Offsets Topic中追加“删除标记”消息:
5.3. GroupCoordinatorRequest的处理
在讲解消费者的实现的时候,我们提到过,消费者在于GroupCoordinator进行交互之前,首先会发送GroupCoordinatorRequest请求到负载较小的Broker,查询管理其所在Consumer Group对应的GroupCoordinator的网络位置,然后消费者才会与该GroupCoordinator所在的Broker建立连接,发送JoinGroupRequest和SyncGroupRequest请求。
GroupCoordinatorRequest请求到达服务端之后,会由KafkaApis的handleGroupCoordinatorRequest(...)
方法负责处理,主要分为以下两步:
- 使用
partitionFor(...)
方法得到负责保存对应Consumer Group信息的Offsets Topic分区。 - 查找其MetadataCache,得到此Offsets Topic分区的Leader副本所在的Broker,其上的GroupCoordinator就负责管理该Consumer Group。
KafkaApis的handleGroupCoordinatorRequest(...)
方法的源码如下:
从源码中我们可以得知,分配给消费者的GroupCoordinator所在的Broker,选择的是该消费者所在Consumer Group所对应的__consumer_offsets
主题的分区的Leader副本所在的Broker;这也比较好理解,因为消费者需要向__consumer_offsets
中提交分区分配信息(Group内Leader消费者的操作)、获取分区分配(Group内所有消费者的操作)、提交Offset信息(Group内所有消费者的操作),因此需要与__consumer_offsets
对应分区的Leader副本进行拉取和写入数据的交互,选择该Leader副本所在Broker上的GroupCoordinator自然会省掉不必要的网络通信开销。
另外,在handleGroupCoordinatorRequest(...)
方法中获取__consumer_offsets
主题涉及到的方法的实现如下:
从上述过程的源码可知,如果获取不到__consumer_offsets
则会尝试创建,该主题的分区数可由offsets.topic.num.partitions
参数配置,副本因子则由offsets.topic.replication.factor
参数和存活的Broker数量共同决定,取二者较小的值。
5.4. GroupCoordinator的迁移
当某个Broker发生故障出现宕机之后,该Broker上的Offsets Topic相关分区的Leader副本以及GroupCoordinator和Consumer Group相关的信息会发生迁移,我们先回顾一下Broker出现故障宕机下线后会触发的操作:当Broker下线后,会删除Zookeeper中/brokers/ids
节点下对应的临时子节点,从而触发KafkaController Leader上的BrokerChangeListener监听器进行下面的流程:
- 首先从出现故障的Broker集合中移除正在正常关闭的Broker。
- 过滤得到Leader副本存在于故障Broker上的分区,将它们的状态转换为OfflinePartition。
- 再次将第2步中的分区的状态转换为OnlinePartition状态,这将触发新Leader副本的选举操作(使用OfflinePartitionLeaderSelector选举器),并发送向可用的Broker发送LeaderAndIsrRequest和UpdateMetaRequest以更新相关信息。
- 将故障Broker上的副本转换为OfflineReplica状态,并将其从对应分区的ISR集合中删除。
- 检查故障Broker上是否有待删除的主题,如果有则将其标记为不可删除。
- 向所有可用的Broker发送UpdateMetadataRequest请求以更新元数据。
从其中第3步我们可以得知,当Broker下线后,其它可用的Broker会收到KafkaController发送的LeaderAndIsrRequest请求以更新选举得到的新的Leader副本及ISR集合相关信息;LeaderAndIsrRequest请求由KafkaApis的handleLeaderAndIsrRequest(...)
方法处理,我们回顾其源码:
在该方法中定义了onLeadershipChange(...)
回调函数,会在正确处理完Leader副本选举之后完成GroupCoordinator的迁移操作。这里存在两种情况:
- 当前Broker成为Offsets Topic主题的某个分区的Leader副本时,会调用GroupCoordinator的
handleGroupImmigration(...)
方法进行处理。 - 当前Broker成为Offsets Topic主题的某个分区的Follower副本时,会调用GroupCoordinator的
handleGroupEmigration(...)
方法进行处理。
我们首先讨论第一种情况,GroupCoordinator的handleGroupImmigration(...)
方法将具体实现直接交给了GroupMetadataManager的loadGroupsForPartition(...)
方法:
GroupMetadataManager的loadGroupsForPartition(...)
方法的源码如下:
GroupMetadataManager的loadGroupsForPartition(...)
方法内部会向KafkaScheduler调度器scheduler
提交一个线程任务,任务功能由其内部函数loadGroupsAndOffsets()
实现,用于从Offsets Topic中加载GroupMetadata及OffsetAndMetadata数据,具体流程如下:
- 检测Offsets Topic中当前的分区是否正在加载。如果是,则结束本次加载操作,否则将其加入
loadingPartitions
集合,标识该分区正在进行加载。 - 通过ReplicaManager组件得到当前分区对应的Log对象。
- 从Log对象中的第一个LogSegment开始加载,加载过程中可能会碰到记录了Offset信息的消息,也有可能碰到记录GroupMetadata信息的消息,还有可能是“删除标记”消息,需要区分处理:
- 如果是记录Offset信息的消息且是“删除标记”,则删除offsetsCache集合中对应的OffsetAndMetadata对象。
- 如果是记录Offset信息的消息且不是“删除标记”,则解析消息形成OffsetAndMetadata对象,添加到offsetsCache集合中。
- 如果是记录GroupMetadata信息的消息,则统计是否为“删除标记”,在第4步中进行处理。
- 根据第3步的第三种情况中的统计,将需要加载的GroupMetadata信息加载到
groupsCache
集合中,并检测需要删除的GroupMetadata信息是否还在groupsCache
集合中。 - 将当前Offset Topic分区的ID从
loadingPartitions
集合移入ownedPartitions
集合,标识该分区加载完成,当前GroupCoordinator开始正式负责管理其对应的Consumer Group。
注意,此处传递给loadGroupsForPartition(...)
方法的第二个参数为回调函数,它会在第4步中被调用,用于更新所有加载的GroupMetadata中MemberMetadata的心跳操作:
第二种情况中,GroupCoordinator的handleGroupEmigration(...)
方法将具体实现直接交给了GroupMetadataManager的removeGroupsForPartition(...)
方法:
GroupMetadataManager的removeGroupsForPartition(...)
方法的源码如下:
GroupMetadataManager的removeGroupsForPartition(...)
与loadGroupsForPartition(...)
方法非常类似,内部也会向KafkaScheduler调度器scheduler
提交一个线程任务,任务功能由其内部函数removeGroupsAndOffsets()
实现,用于从groupsCache
和offsetsCache
中移除对应的GroupMetadata及OffsetAndMetadata数据,具体流程如下:
- 从
ownedPartitions
集合中将对应的Offsets Topic分区删除,标识当前GroupCoordinator不再管理其对应Consumer Group。 - 遍历
offsetsCache
集合,将此分区对应的OffsetAndMetadata全部清除。 - 遍历
groupsCache
集合,将此分区对应的GroupMetadata全部清除
其中,第2步和第3步仅仅是清理offsetsCache
和groupsCache
集合,并没有向对应的Offsets Topic分区中追加“删除标记”消息,因为其他Broker会成为此Offsets Topic分区的Leader副本,还需要使用其中记录的GroupMetadata信息和Offset信息。
同时,此处传递给removeGroupsForPartition(...)
方法的第二个参数为回调函数,它会在第3步中被调用,用于在GroupMetadata被删除前,将Consumer Group状态转换成Dead,并根据之前的Consumer Group状态进行相应的清理操作:
在上述两个迁移涉及的过程中,loadingPartitions
集合用于记录正在迁移Offsets Topic分区,ownedPartitions
集合用于记录当前GroupCoordinator管理的Offsets Topic的分区。GroupMetadataManager中提供了两个检测方法:isGroupLocal()
和isGroupLoading()
。其中isGroupLocal()
用于检测当前GroupCoordinator是否管理指定的Consumer Group;isGroupLoading()
方法用于检测指定的Consumer Group对应的Offsets Topic分区是否还处于上述加载过程之中:
在处理JoinGroupRequest、OffsetFetchRequest、OffsetCommitRequest及HeartbeatRequest这些请求之前,都会调用这两个方法进行检测,如果检测失败,则直接返回异常响应。
5.5. SyncGroupRequest的处理
在讲解消费者的实现时,我们提到过,Consumer Group中的Leader消费者会通过发送SyncGroupRequest请求将Consumer Group内的分区消费的分配结果报告给GroupCoordinator,然后由GroupCoordinator对分配结果进行解析后构造为SyncGroupResponse返回给所有该Consumer Group内的消费者,消费者通过解析SyncGroupResponse请求即可确认自己被分配了哪些分区。
SyncGroupRequest请求由KafkaApis的handleSyncGroupRequest(...)
方法负责处理,源码如下:
KafkaApis的handleSyncGroupRequest(...)
方法在经过授权检查通过后,直接将处理委托给了GroupCoordinator的handleSyncGroup(...)
方法,该方法的源码如下:
handleSyncGroup(...)
方法也会进行一系列的检查,然后使用doSyncGroup(...)
方法进行处理,该方法源码如下:
doSyncGroup(...)
方法在经过一些检查后,会根据对应的GroupMetadata的状态进行处理,关于GroupState的内容我们后面会讨论,这里主要关注在AwaitingSync状态下才能够正常处理SyncGroupRequest请求。
注:关于Dead、PreparingRebalance和Stable的三种情况的处理,Dead状态下说明GroupMetadata是不可用的,直接返回UNKNOWN_MEMBER_ID错误码即可;PreparingRebalance表示当前Consumer Group正准备进行Group内消费者的重新均衡操作,因此也无法接受SyncGroupRequest请求以确定分区分配方案,返回是REBALANCE_IN_PROGRESS错误码;当GroupMetadata确定了分区分配方案后,会转换为Stable状态,在该状态下只会在出现了消费者客户端退出或新消费者客户端加入两种情况下,转换为PreparingRebalance状态,准备对Group内消费者进行重新均衡操作,对于SyncGroupRequest它会将已确定的分区分配方案直接返回。
这里我们注意跟踪一下传递给doSyncGroup(...)
方法的responseCallback
参数,是由KafkaApis的handleSyncGroupRequest(...)
一路传递过来的,最终它会赋值给发送SyncGroupRequest请求的消费者客户端在GroupMetadata中对应的MemberMetadata的awaitingSyncCallback
字段进行记录。
另外从源码中也可以得知,SyncGroupRequest请求只有是来自于Consumer Group的Leader消费者时,才会对其进行处理,对于非Leader消费者发送的SyncGroupRequest请求,仅仅是将从KafkaApis的handleSyncGroupRequest(...)
一路传递过来的responseCallback
记录到对应的MemberMetadata的awaitingSyncCallback
字段上,以便在后续操作中进行处理;doSyncGroup(...)
方法其实将具体的操作委托给了GroupMetadataManager的prepareStoreGroup(...)
方法,该方法会根据来自Consumer Group Leader消费者携带的分区分配结果在Offsets Topic中创建日志记录,但该过程和对应的回调其实是被封装为了一个DelayedStore延迟任务返回给了上层的doSyncGroup(...)
,由它来进行实际的处理。GroupMetadataManager的prepareStoreGroup(...)
方法如下:
真正实现追加消息操作的是GroupMetadataManager的store(...)
方法,其中会调用ReplicaManager的appendMessages(...)
方法向__consumer_offsets
主题中追加记录了Consumer Group中分区分配结果的消息,该方法在追加消息数据时,如果requiredAcks
参数为-1表示分区的所有副本都同步了消息后才可以返回响应,该过程会创建DelayedProduce延迟任务放入时间轮中执行。
在GroupMetadataManager的store(...)
方法中传递给appendMessages(...)
方法中的第二个参数requiredAcks
默认为-1,第三个参数是internalTopicsAllowed
为true,表示可以向Kafka内部Topic追加消息,第四个参数对应于prepareStoreGroup(...)
中的putCacheCallback()
回调方法,该方法会在DelayedProduce延迟任务满足后被调用:
putCacheCallback()
回调方法会处理追加消息的结果,整理得到错误码,最后会调用prepareStoreGroup(...)
的responseCallback
回调函数参数并将错误码传递给它,而responseCallback
回调函数的定义其实是位于GroupMetadataManager的store(...)
方法中的:
这里涉及到了三个方法,其中resetAndPropagateAssignmentError(...)
和maybePrepareRebalance(...)
用于处理有错误的情况,它们会清空分区的分配结果,发送异常响应,并将GroupMetadata切换为PreparingRebalance状态,源码如下:
setAndPropagateAssignment(...)
处理消息追加成功的情况,它会发送正常的SyncGroupResponse,并将对应的GroupMetadata切换为Stable状态,源码如下:
可以发现,上述两个过程都调用了GroupCoordinator的propagateAssignment(...)
处理分配结果,只是传入的错误码不同:
这里调用的MemberMetadata对象的awaitingSyncCallback
回调方法,其实就是由KafkaApis的handleSyncGroupRequest(...)
一路传递过来的,调用该回调方法即是在向RequestChannel的相关队列中添加SyncGroupResponse等待发送。收到了SyncGroupRequest请求,其实就可以视为消费者客户端是存活的,因此会completeAndScheduleNextHeartbeatExpiration(...)
方法用于将本次心跳延迟任务完成并开启下一次等待心跳的延迟任务的执行或超时。
5.6. OffsetCommitRequest的处理
消费者客户端在进行正常的消费过程以及Rebalance操作之前,都会进行提交Offset的操作,主要用于将消费者消费的每个分区对应的Offset封装成OffsetCommitRequest发送给GroupCoordinator。GroupCoordinator会将这些Offset封装成消息,追加到Offsets Topic主题对应的分区中。
OffsetCommitRequest请求由KafkaApis的handleOffsetCommitRequest(...)
方法负责处理,源码如下:
KafkaApis的handleOffsetCommitRequest(...)
会进行一系列的检查操作,在随后处理OffsetCommitRequest请求时,对Offset的存储方式存在两种情况:在旧版本的Kafka中,Offset是存储在Zookeeper中的,新版本改为存储到Offsets Topic中,因此handleOffsetCommitRequest(...)
方法做了区别处理,即根据请求头中的apiVersion
参数决定,如果apiVersion
参数为0,就更新Zookeeper中/consumers/[group_id]/offsets/[topic_name]/[partition_id]
节点的数据为提交的Offset后返回响应;如果apiVersion
参数不为0,则将请求中的Offset等信息构造为OffsetAndMetadata对象,交由GroupCoordinator的handleCommitOffsets(...)
方法处理,该方法源码如下:
handleCommitOffsets(...)
方法在经过各类检查合法之后,会调用GroupMetadataManager的prepareStoreOffsets(...)
方法创建DelayedStore对象,然后交给GroupMetadataManager的store(delayedAppend: DelayedStore)
方法进行消息数据向日志系统的追加;GroupMetadataManager的prepareStoreOffsets(...)
与前面讲解的prepareStoreGroup(...)
方法非常类似,源码如下:
prepareStoreOffsets(...)
方法返回的DelayedStore对象会通过store(...)
方法完成消息的追加,最终会调用putCacheCallback(...)
回调方法,而putCacheCallback(...)
中调用的responseCallback()
回调函数就是KafkaApis的handleOffsetCommitRequest(...)
中定义的sendResponseCallback(...)
,主要逻辑就是创建OffsetCommitResponse并添加到RequestChannel中等待发送。
5.7. OffsetFetchRequest的处理
当消费者出现故障宕机重新上线后,可以通过向GroupCoordinator发送OffsetFetchRequest获取其最近一次提交的offset,并从此位置重新开始进行消费。OffsetFetchRequest请求首先由KafkaApis的handleOffsetFetchRequest(...)
方法进行处理,源码如下:
从上述的源码可知,handleOffsetFetchRequest(...)
方法在进行一系列的检查后,针对获取Offset的处理也分为了两种情况:针对旧版本的Kafka将从Zookeeper的/consumers/[group_id]/offsets/[topic_name]/[partition_id]
节点读取Offset信息后返回响应;而针对于新版本则委托给GroupCoordinator的handleFetchOffsets(...)
方法,该方法源码如下:
经过各类检查合法后,GroupCoordinator的handleFetchOffsets(...)
方法会直接调用GroupMetadataManager的getOffsets(...)
方法获取对应的信息,该方法内部是直接从GroupMetadataManager的offsetsCache
中进行获取的:
5.8. ListGroupsRequest的处理
ListGroupsRequest请求用于获取所有的Consumer Group的信息,该请求一般是通过kafka-consumer-groups.sh脚本产生的,它由KafkaApis的handleListGroupsRequest(...)
方法负责处理:
从源码可以得知,获取所有Consumer Group的操作其实是由GroupCoordinator的handleListGroups()
方法实现的,源码如下:
该方法会将GroupMetadataManager的currentGroups
进行映射后转换为列表返回,GroupMetadataManager的currentGroups
获取的其实是groupsCache
的值集,也即是GroupMetadata的集合:
而GroupMetadata的overview()
方法其实会返回包含了Group ID的样例类GroupOverview实例:
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...