大数据
流式处理
Kafka
源码解析

Kafka系列 07 - 消费者源码分析 01:KafkaConsumer的创建及主题订阅

简介:本文主要讲解KafkaConsumer的创建、心跳操作Heartbeat和HeartbeatTask、Offset自动提交任务AutoCommitTask及主体订阅相关。

1. 简介

本文将以KafkaConsumer拉取消息的过程为例,介绍KafkaConsumer的工作原理和源码构成。

2. KafkaConsumer的构建过程

下面是构建一个简单的KafkaConsumer的通用方式:

  • Properties properties = new Properties();
  • // 设置broker
  • properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 设置键反序列化器
  • properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  • // 设置值反序列化器
  • properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  • properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.1");
  • // 创建消费者
  • KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

KafkaConsumer的创建与KafkaProducer非常类似,需要提供一些必要的参数,直接通过KafkaConsumer的构造器创建即可。我们看一下这里用到的KafkaConsumer的构造方法:

org.apache.kafka.clients.consumer.KafkaConsumer
  • // KafkaConsumer
  • public KafkaConsumer(Properties properties) {
  • this(properties, null, null);
  • }
  • public KafkaConsumer(Properties properties,
  • Deserializer<K> keyDeserializer,
  • Deserializer<V> valueDeserializer) {
  • this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
  • }
  • private KafkaConsumer(ConsumerConfig config,
  • Deserializer<K> keyDeserializer,
  • Deserializer<V> valueDeserializer) {
  • try {
  • ...
  • // 获取配置:请求超时时间、会话超时时间、拉取操作最大等待时间
  • this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); //(request.timeout.ms)
  • int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); //(session.timeout.ms)
  • int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); //(fetch.max.wait.ms)
  • // 校验配置
  • if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
  • throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
  • // Time工具类
  • this.time = new SystemTime();
  • // 获取消费者客户端ID
  • String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); //(client.id)
  • if (clientId.length() <= 0)
  • // 如果没有指定则自动分配,按"consumer-" + 自增序列的格式
  • clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
  • this.clientId = clientId;
  • ...
  • // 退避时间
  • this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); //(retry.backoff.ms)
  • // 创建Metadata实例
  • this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); //(metadata.max.age.ms)
  • // 获取配置的broker的地址列表
  • List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); //(bootstrap.servers)
  • // 设置强制更新Metadata
  • this.metadata.update(Cluster.bootstrap(addresses), 0);
  • ...
  • // 创建KafkaChannel构建器
  • ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
  • // 创建NetworkClient实例
  • NetworkClient netClient = new NetworkClient(
  • // 创建Selector实例
  • new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), //(connections.max.idle.ms)
  • this.metadata,
  • clientId,
  • 100, // a fixed large enough value will suffice
  • config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), //(reconnect.backoff.ms)
  • config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), //(send.buffer.bytes)
  • config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), //(receive.buffer.bytes)
  • config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time); //(request.timeout.ms)
  • // 使用ConsumerNetworkClient包装创建的NetworkClient实例
  • this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
  • config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); //(request.timeout.ms)
  • // 根据配置获取offset重置策略
  • OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); //(auto.offset.reset)
  • // 根据重置策略创建SubscriptionState对象
  • this.subscriptions = new SubscriptionState(offsetResetStrategy);
  • // 根据配置获取分区分配器列表
  • List<PartitionAssignor> assignors = config.getConfiguredInstances(
  • ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, //(partition.assignment.strategy)
  • PartitionAssignor.class);
  • ...
  • // 根据配置获取拦截器列表
  • List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, //(interceptor.classes)
  • ConsumerInterceptor.class);
  • this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
  • // 根据配置创建ConsumerCoordinator
  • this.coordinator = new ConsumerCoordinator(this.client,
  • config.getString(ConsumerConfig.GROUP_ID_CONFIG), //(group.id)
  • config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), //(session.timeout.ms)
  • config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), //(heartbeat.interval.ms)
  • assignors,
  • this.metadata,
  • this.subscriptions,
  • metrics,
  • metricGrpPrefix,
  • this.time,
  • retryBackoffMs,
  • new ConsumerCoordinator.DefaultOffsetCommitCallback(),
  • config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), //(enable.auto.commit)
  • config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), //(auto.commit.interval.ms)
  • this.interceptors,
  • config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); //(exclude.internal.topics)
  • // 根据配置创建键值序列化器
  • if (keyDeserializer == null) {
  • this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, //(key.deserializer)
  • Deserializer.class);
  • this.keyDeserializer.configure(config.originals(), true);
  • } else {
  • config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); //(key.deserializer)
  • this.keyDeserializer = keyDeserializer;
  • }
  • if (valueDeserializer == null) {
  • this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, //(value.deserializer)
  • Deserializer.class);
  • this.valueDeserializer.configure(config.originals(), false);
  • } else {
  • config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); //(value.deserializer)
  • this.valueDeserializer = valueDeserializer;
  • }
  • // 根据配置创建Fetcher拉取器
  • this.fetcher = new Fetcher<>(this.client,
  • config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), //(fetch.min.bytes)
  • config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), //(fetch.max.wait.ms)
  • config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), //(max.partition.fetch.bytes)
  • config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), //(max.poll.records)
  • config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), //(check.crcs)
  • this.keyDeserializer,
  • this.valueDeserializer,
  • this.metadata,
  • this.subscriptions,
  • metrics,
  • metricGrpPrefix,
  • this.time,
  • this.retryBackoffMs);
  • ...
  • } catch (Throwable t) {
  • // call close methods if internal objects are already constructed
  • // this is to prevent resource leak. see KAFKA-2121
  • close(true);
  • // now propagate the exception
  • throw new KafkaException("Failed to construct kafka consumer", t);
  • }
  • }

KafkaConsumer构造过程中创建了几个重要的组件,其中Metadata和NetworkClient在KafkaProducer中已经讲解过了,用于元数据操作以及请求发送和响应处理;ConsumerNetworkClient组件则对NetworkClient进行了包装,提供了更高级的API操作;SubscriptionState组件用于追踪TopicPartition与offset的对应关系;PartitionAssignor指向了具体的分区分配实例;ConsumerInterceptors组件是消息拉取的拦截器集合,与ProducerInterceptors类似;ConsumerCoordinator组件用于KafkaConsumer与服务端的GroupCoordinator的交互;Fetcher组件主要用于处理拉取消息的请求和响应。其中大部分组件的实例化过程都比较简单,除了ConsumerCoordinator,因此我们首先关注ConsumerCoordinator的设计。

3. ConsumerCoordinator类

前面我们介绍过,每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组和组下每个消费者消费的偏移量。每个消费者实例化时,同时实例化一个ConsumerCoordinator消费者协调器对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。消费者通过ConsumerCoordinator向被指派为群组协调器的broker(不同的群组可以有不同的协调器)的GroupCoordinator实例发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。

我们先来观察上面构建KafkaConsumer时实例化ConsumerCoordinator对象时使用的构造方法,如下:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
  • public ConsumerCoordinator(ConsumerNetworkClient client,
  • String groupId,
  • int sessionTimeoutMs,
  • int heartbeatIntervalMs,
  • List<PartitionAssignor> assignors,
  • Metadata metadata,
  • SubscriptionState subscriptions,
  • Metrics metrics,
  • String metricGrpPrefix,
  • Time time,
  • long retryBackoffMs,
  • OffsetCommitCallback defaultOffsetCommitCallback,
  • boolean autoCommitEnabled,
  • long autoCommitIntervalMs,
  • ConsumerInterceptors<?, ?> interceptors,
  • boolean excludeInternalTopics) {
  • // 调用父类AbstractCoordinator的构造器
  • super(client,
  • groupId,
  • sessionTimeoutMs,
  • heartbeatIntervalMs,
  • metrics,
  • metricGrpPrefix,
  • time,
  • retryBackoffMs);
  • this.metadata = metadata;
  • // 设置强制更新集群元数据
  • this.metadata.requestUpdate();
  • // 根据消费者的SubscriptionState实例和集群元数据构建元数据快照
  • this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
  • // 记录消费者的SubscriptionState实例
  • this.subscriptions = subscriptions;
  • // offset提交回调
  • this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
  • // 是否自动提交offset
  • this.autoCommitEnabled = autoCommitEnabled;
  • // 分区分配器集合
  • this.assignors = assignors;
  • // 添加Metadata监听器
  • addMetadataListener();
  • if (autoCommitEnabled) {
  • // 如果设置了自动提交offset,根据配置提交间隔时间,创建AutoCommitTask任务
  • this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
  • // 启动AutoCommitTask
  • this.autoCommitTask.reschedule();
  • } else {
  • this.autoCommitTask = null;
  • }
  • this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
  • // 拦截器集合
  • this.interceptors = interceptors;
  • // 是否排除Kafka内部使用的Topic
  • this.excludeInternalTopics = excludeInternalTopics;
  • }

ConsumerCoordinator继承自AbstractCoordinator类:

org.apache.kafka.clients.consumer.internals.AbstractCoordinator
  • // AbstractCoordinator类
  • public AbstractCoordinator(ConsumerNetworkClient client,
  • String groupId,
  • int sessionTimeoutMs,
  • int heartbeatIntervalMs,
  • Metrics metrics,
  • String metricGrpPrefix,
  • Time time,
  • long retryBackoffMs) {
  • this.client = client;
  • this.time = time;
  • // 年代信息
  • this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; // -1
  • // Consumer Member ID
  • this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; // ""
  • this.groupId = groupId;
  • this.coordinator = null;
  • // 会话超时时间
  • this.sessionTimeoutMs = sessionTimeoutMs;
  • // 心跳操作辅助对象
  • this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
  • // 心跳任务
  • this.heartbeatTask = new HeartbeatTask();
  • this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
  • this.retryBackoffMs = retryBackoffMs;
  • }

ConsumerCoordinator类继承自AbstractCoordinator抽象类,在上面ConsumerCoordinator类的构造器中,首先调用了父类AbstractCoordinator的有参构造器,从该构造方法的源码可以得知,ConsumerCoordinator通过父类AbstractCoordinator在内部维护了generation年代信息(初始值为-1)、Member ID(初始值为空字符串)、Heartbeat心跳辅助对象以及HeartbeatTask心跳任务对象,这些成员属性在后面都会详细讲解,这里只需要有一个印象即可。在ConsumerCoordinator接下来的构造方法代码中,设置了强制更新集群元数据,根据消费者的SubscriptionState实例和集群元数据构建元数据快照,添加监听元数据更新操作的监听器,如果设置了自动提交Offset操作在此时还会创建并开启AutoCommitTask任务用于周期性提交offset。

从上面的分析得知,ConsumerCoordinator在初始化时涉及到Heartbeat、HeartbeatTask、SubscriptionState及AutoCommitTask实例的相关操作,下面将先对Heartbeat、HeartbeatTask及AutoCommitTask这几个类分别进行简单介绍,SubscriptionState类在后面会介绍。

3.1. Heartbeat和HeartbeatTask类

Heartbeat类管理了心跳操作过程中的各类指标,主要用于辅助心跳操作的进行,相对比较简单,它的定义如下:

org.apache.kafka.clients.consumer.internals.Heartbeat
  • public final class Heartbeat {
  • // 判断是否超时的时间长度
  • private final long timeout;
  • // 两次发送心跳的间隔
  • private final long interval;
  • // 最后发送心跳的时间
  • private long lastHeartbeatSend;
  • // 最后收到心跳响应的时间
  • private long lastHeartbeatReceive;
  • // 心跳任务重置时间
  • private long lastSessionReset;
  • public Heartbeat(long timeout,
  • long interval,
  • long now) {
  • // 心跳间隔必须大于超时时间
  • if (interval >= timeout)
  • throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
  • this.timeout = timeout;
  • this.interval = interval;
  • this.lastSessionReset = now;
  • }
  • public void sentHeartbeat(long now) {
  • this.lastHeartbeatSend = now;
  • }
  • public void receiveHeartbeat(long now) {
  • this.lastHeartbeatReceive = now;
  • }
  • public boolean shouldHeartbeat(long now) {
  • return timeToNextHeartbeat(now) == 0;
  • }
  • public long lastHeartbeatSend() {
  • return this.lastHeartbeatSend;
  • }
  • // 计算下次发送心跳的时间
  • public long timeToNextHeartbeat(long now) {
  • // 当前距离上次发送心跳的时间
  • long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
  • if (timeSinceLastHeartbeat > interval)
  • // 如果间隔时间大于设置的心跳间隔时间,说明时间到了,要发送心跳了,返回0
  • return 0;
  • else
  • // 否则计算还需要等待的时间
  • return interval - timeSinceLastHeartbeat;
  • }
  • // 检测是否过期
  • public boolean sessionTimeoutExpired(long now) {
  • return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout;
  • }
  • public long interval() {
  • return interval;
  • }
  • public void resetSessionTimeout(long now) {
  • this.lastSessionReset = now;
  • }
  • }

HeartbeatTask类则实现了心跳发送主要业务逻辑,它是AbstractCoordinator的私有内部类,源码如下:

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatTask
  • // 定时任务,负责定时发送HeartBeatRequest并处理其响应
  • private class HeartbeatTask implements DelayedTask {
  • private boolean requestInFlight = false;
  • public void reset() {
  • // start or restart the heartbeat task to be executed at the next chance
  • // 获取当前时间
  • long now = time.milliseconds();
  • // 重置最后一次心跳时间lastSessionReset为当前时间
  • heartbeat.resetSessionTimeout(now);
  • // 将当前HeartbeatTask任务对象从delayedTasks队列中移除
  • client.unschedule(this);
  • // 没有正在发送的心跳请求时
  • if (!requestInFlight)
  • // 使用ConsumerNetworkClient重新调度心跳任务
  • client.schedule(this, now);
  • }
  • @Override
  • public void run(final long now) {
  • /**
  • * 检查是否需要发送HeartbeatRequest
  • * 1. GroupCoordinator已确定且已连接
  • * 2. 不处于正在等待Partition分配结果的状态
  • * 3. 之前的HeartbeatRequest请求正常收到响应且没有过期
  • */
  • if (generation < 0 || needRejoin() || coordinatorUnknown()) {
  • // no need to send the heartbeat we're not using auto-assignment or if we are
  • // awaiting a rebalance
  • return;
  • }
  • // 检测HeartbeatResponse是否超时,若超时则认为GroupCoordinator宕机
  • if (heartbeat.sessionTimeoutExpired(now)) {
  • // we haven't received a successful heartbeat in one session interval
  • // so mark the coordinator dead
  • // 清空unsent集合中该GroupCoordinator所在Node对应的请求队列并将这些请求标记为异常
  • coordinatorDead();
  • return;
  • }
  • // 检测HeartbeatTask是否到期
  • if (!heartbeat.shouldHeartbeat(now)) {
  • /**
  • * 如果未到期,更新到期时间,将HeartbeatTask对象重新添加到DelayedTaskQueue中
  • * 注意,此时的时间已经更新为now + heartbeat.timeToNextHeartbeat(now)
  • */
  • // we don't need to heartbeat now, so reschedule for when we do
  • client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
  • } else {
  • // 已到期,更新最近发送HeartbeatRequest请求的时间,即将lastHeartbeatSend更新为当前时间
  • heartbeat.sentHeartbeat(now);
  • // 更新该字段,表示有HeartbeatRequest请求正在发送,还未收到响应,防止重复发送
  • requestInFlight = true;
  • // 发送心跳请求
  • RequestFuture<Void> future = sendHeartbeatRequest();
  • // 在返回的RequestFuture上添加RequestFutureListener监听器
  • future.addListener(new RequestFutureListener<Void>() {
  • @Override
  • public void onSuccess(Void value) {
  • requestInFlight = false;
  • long now = time.milliseconds();
  • heartbeat.receiveHeartbeat(now);
  • long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
  • client.schedule(HeartbeatTask.this, nextHeartbeatTime);
  • }
  • @Override
  • public void onFailure(RuntimeException e) {
  • requestInFlight = false;
  • client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
  • }
  • });
  • }
  • }
  • }

从源码可知,HeartbeatTask类实现了DelayedTask接口,定性为延迟任务,发送心跳操作的具体代码位于run()方法中,具体实现是将HeartbeatTask添加到一个DelayedTaskQueue对象中,该队列内部维护了一个PriorityQueue优先队列,以小顶堆的结构对HeartbeatTask进行排序,每次取出队首的HeartbeatTask进行发送以确保心跳时序;这种顺序任务组织方式与ScheduledThreadPoolExecutor定时任务线程池的实现非常相似。具体的心跳操作将在后面详细介绍。

3.2. AutoCommitTask类

AutoCommitTask类也是ConsumerCoordinator的内部类,它的设计与HeartbeatTask类似,也实现了DelayedTask接口,同时由ConsumerNetworkClient对其进行调度处理;它源码如下:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AutoCommitTask
  • // 定时任务,用于周期性地调用commitOffsetAsync()方法自动提交offset
  • private class AutoCommitTask implements DelayedTask {
  • // 提交间隔时间
  • private final long interval;
  • public AutoCommitTask(long interval) {
  • this.interval = interval;
  • }
  • private void reschedule() {
  • client.schedule(this, time.milliseconds() + interval);
  • }
  • private void reschedule(long at) {
  • client.schedule(this, at);
  • }
  • public void run(final long now) {
  • if (coordinatorUnknown()) {
  • log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
  • reschedule(now + retryBackoffMs);
  • return;
  • }
  • if (needRejoin()) {
  • // skip the commit when we're rejoining since we'll commit offsets synchronously
  • // before the revocation callback is invoked
  • reschedule(now + interval);
  • return;
  • }
  • // 异步提交偏移量
  • commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
  • // 在提交成功的回调中重新规划下一次提交计划
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception == null) {
  • reschedule(now + interval);
  • } else {
  • log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
  • reschedule(now + interval);
  • }
  • }
  • });
  • }
  • }

具体的自动提交操作位于run()方法中,调用了外部类ConsumerCoordinator的commitOffsetsAsync(...)方法,具体实现我们在后面详细介绍。

4. 订阅主题

在了解了KafkaConsumer的构建过程和ConsumerCoordinator类的基本构成后,我们从KafkaConsumer消费消息的动作开始深入讲解其中涉及到的各类组件以及设计思想。在之前的文章中讲到了KafkaConsumer的完整例子,构建KafkaConsumer实例后我们需要使用subscribe(...)方法指定KafkaConsumer订阅哪些主题,然后使用轮询操作产生真正的拉取消息动作,代码如下:

  • ...
  • // 创建消费者
  • KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • ...

KafkaConsumer的subscribe(...)方法有多个重载,定义如下:

org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
  • // 订阅主题,主题以集合形式传入
  • public void subscribe(Collection<String> topics) {
  • // 默认使用NoOpConsumerRebalanceListener重均衡监听器
  • subscribe(topics, new NoOpConsumerRebalanceListener());
  • }
  • // 订阅主题,主题以集合形式传入,并指定特定的重均衡监听器
  • public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
  • // 检查是否多线程调用
  • acquire();
  • try {
  • if (topics.isEmpty()) {
  • // treat subscribing to empty topic list as the same as unsubscribing
  • // topics为空时,表示取消订阅
  • this.unsubscribe();
  • } else {
  • log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
  • // 调用SubscriptionState实例的subscribe()方法,更新状态记录
  • this.subscriptions.subscribe(topics, listener);
  • // 使用metadata已知主题集合记录订阅的主题
  • metadata.setTopics(subscriptions.groupSubscription());
  • }
  • } finally {
  • // 释放重入次数
  • release();
  • }
  • }
  • // 订阅主题,主题以正则表达式的形式指定,并指定特定的重均衡监听器
  • public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
  • // 检查是否多线程调用
  • acquire();
  • try {
  • log.debug("Subscribed to pattern: {}", pattern);
  • // 调用SubscriptionState实例的subscribe()方法,更新状态记录
  • this.subscriptions.subscribe(pattern, listener);
  • // 标记需要更新所有的主题元数据
  • this.metadata.needMetadataForAllTopics(true);
  • // 设置强制更新
  • this.metadata.requestUpdate();
  • } finally {
  • // 释放重入次数
  • release();
  • }
  • }

在上面的三个重载方法中,我们先关注acquire()release()两个方法;这两个方法在KafkaConsumer的很多方法中都会用到,但与AQS并发相关的同名方法的作用不同,它们主要是用于检查是否存在多线程使用KafkaConsumer实例的情况,以及对线程重入次数进行记载和管理,它们的源码如下:

org.apache.kafka.clients.consumer.KafkaConsumer
  • // 标识没有线程使用过当前KafkaConsumer实例
  • private static final long NO_CURRENT_THREAD = -1L;
  • // 记录了当前使用KafkaConsumer的线程的ID,用于防止多线程访问KafkaConsumer
  • private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
  • // refcount is used to allow reentrant access by the thread who has acquired currentThread
  • // 记录当前使用KafkaConsumer的线程的重入次数
  • private final AtomicInteger refcount = new AtomicInteger(0);
  • // 检测是否有其他线程尝试并发操作KafkaConsumer
  • private void acquire() {
  • // 确保KafkaConsumer没有被关闭
  • ensureNotClosed();
  • // 当前线程ID
  • long threadId = Thread.currentThread().getId();
  • /**
  • * 比对当前线程与currentThread保存的线程的ID
  • * 如果不同,尝试用CAS方式更新currentThread为当前线程的ID
  • * 需要注意的是,只有在currentThread的值为NO_CURRENT_THREAD(-1)时才可能修改成功
  • * 也即是,只有在之前没有线程操作KafkaConsumer时,才能修改成功,以防止多线程操作KafkaConsumer
  • */
  • if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
  • // 抛出ConcurrentModificationException异常
  • throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  • // 当前使用KafkaConsumer的线程的重入次数加1
  • refcount.incrementAndGet();
  • }
  • private void ensureNotClosed() {
  • if (this.closed)
  • throw new IllegalStateException("This consumer has already been closed.");
  • }
  • private void release() {
  • // 当前使用KafkaConsumer的线程的重入次数为0时,才表示KafkaConsumer被释放,将currentThread置为NO_CURRENT_THREAD
  • if (refcount.decrementAndGet() == 0)
  • currentThread.set(NO_CURRENT_THREAD);
  • }

KafkaConsumer定义了两个原子对象currentThreadrefcount分别用于记录当前KafkaConsumer所属的线程以及该线程重入几个主要方法的次数,重入次数在每次调用acquire()时会自增,在每次调用release()会自减,代码的实现比较简单。由此可见,KafkaConsumer实例是无法在多线程环境中使用的,如果想要并行消费,只能定义多个KafkaConsumer实例,但需要注意的是,一个分区只能同时被一个KafkaConsumer线程消费,多余的消费者线程将会空闲,这种限制是为了保证同一个分区内消息被消费的时序,同时便于Kafka服务端管理对应的偏移量,因为Kafka的服务器端是以每个Topic的每个Partition的每个Consumer Group对应一个Offset,即(Topic, Partition, Consumer_Group_ID)单独确认对应的Offset,如果多个Consumer并行消费同一个Partition,那Offset的管理就会出问题。

4.1. SubscriptionState类

从前面的源码可知,KafkaConsumer的subscribe(...)方法最终都将具体的订阅操作交给SubscriptionState负责了,SubscriptionState用于追踪TopicPartition与Offset对应关系,KafkaConsumer在构造时就对其进行了实例化:

org.apache.kafka.clients.consumer.KafkaConsumer
  • ...
  • // 根据配置获取offset重置策略
  • OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); //(auto.offset.reset)
  • // 根据重置策略创建SubscriptionState对象
  • this.subscriptions = new SubscriptionState(offsetResetStrategy);
  • ...

OffsetResetStrateg的定义如下:

org.apache.kafka.clients.consumer.OffsetResetStrategy
  • // OffsetResetStrategy枚举
  • public enum OffsetResetStrategy {
  • LATEST, // 从最新的消息记录开始拉取
  • EARLIEST, // 从最早的消息记录开始拉取
  • NONE // 没有指定
  • }

其中OffsetResetStrategy是一个枚举类,只定义了LATEST、EARLIEST、NONE三个枚举值,相信大家对LATEST和EARLIEST都很了解,指定了拉取消息时偏移量的处理策略;我们可以通过auto.offset.reset配置来设置偏移量的重置策略,而SubscriptionState初始化时传入了配置的OffsetResetStrategy实例。

我们先看一下SubscriptionState类的定义和重要的成员属性和方法:

org.apache.kafka.clients.consumer.internals.SubscriptionState
  • public class SubscriptionState {
  • // 订阅模式枚举
  • private enum SubscriptionType {
  • NONE, // 初始值
  • AUTO_TOPICS, // 根据指定的Topic名字进行订阅,自动分配分区
  • AUTO_PATTERN, // 按照指定的正则表达式匹配Topic进行订阅,自动分配分区
  • USER_ASSIGNED // 用户手动指定消费者消费的Topic及分区编号
  • };
  • /* the type of subscription */
  • // 订阅模式
  • private SubscriptionType subscriptionType;
  • /* the pattern user has requested */
  • // 使用AUTO_PATTERN正则匹配时,该字段记录了正则表达式
  • private Pattern subscribedPattern;
  • /* the list of topics the user has requested */
  • // 使用AUTO_TOPICS或AUTO_PATTERN模式时,使用该集合记录所有订阅的Topic
  • private final Set<String> subscription;
  • /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
  • // Group Leader使用该集合记录Group中所有消费者订阅的Topic,其他Follower只记录了自己订阅的Topic
  • private final Set<String> groupSubscription;
  • /* the list of partitions the user has requested */
  • // 使用USER_ASSIGNED模式时,此集合记录了分配给当前消费者的TopicPartition集合,与subscription集合互斥
  • private final Set<TopicPartition> userAssignment;
  • /* the list of partitions currently assigned */
  • // 此集合记录了每个TopicPartition的消费状态
  • private final Map<TopicPartition, TopicPartitionState> assignment;
  • /* do we need to request a partition assignment from the coordinator? */
  • // 是否需要进行一次分区分配
  • private boolean needsPartitionAssignment;
  • /* do we need to request the latest committed offsets from the coordinator? */
  • /**
  • * 是否需要从GroupCoordinator获取最近提交的offset,
  • * 当出现异步提交offset操作或者Rebalance操作刚完成时会将其设置为true,
  • * 成功获取最近提交的offset之后会设置为false
  • */
  • private boolean needsFetchCommittedOffsets;
  • /* Default offset reset strategy */
  • // 默认的Offset重置策略
  • private final OffsetResetStrategy defaultResetStrategy;
  • /* Listener to be invoked when assignment changes */
  • // 用于监听分区分配操作的监听器
  • private ConsumerRebalanceListener listener;
  • ...
  • }

在SubscriptionState中,SubscriptionType类型的属性subscriptionType记录了当前消费者对主题的订阅模式,SubscriptionType也是一个枚举,定义了NONE、AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED几个枚举值,分别对应了主题的几种订阅模式;Map类型的属性assignment则记录了当前消费者对指定主题及分区的的消费状态。

4.1.1. TopicPartitionState类

assignment属性的键为TopicPartition类型,这个类之前已经讲过了,我们主要观察一下值类型TopicPartitionState,TopicPartitionState类是SubscriptionState的静态内部类,其中维护了消费的主题分区的信息,源码如下:

org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState
  • // TopicPartitionState是SubscriptionState的静态内部类
  • // 表示TopicPartition的消费状态
  • private static class TopicPartitionState {
  • // 记录了下次要从Kafka服务端获取的消息的offset
  • private Long position; // last consumed position
  • // 记录了最近一次提交的offset
  • private OffsetAndMetadata committed; // last committed position
  • // 记录了当前TopicPartition是否处于暂停状态,用于Consumer接口的pause()方法
  • private boolean paused; // whether this partition has been paused by the user
  • // 重置position的策略,该字段是否为空代表是否需要重置position的值
  • private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
  • public TopicPartitionState() {
  • this.paused = false;
  • this.position = null;
  • this.committed = null;
  • this.resetStrategy = null;
  • }
  • private void awaitReset(OffsetResetStrategy strategy) {
  • this.resetStrategy = strategy;
  • this.position = null;
  • }
  • public boolean awaitingReset() {
  • return resetStrategy != null;
  • }
  • // position是否合法
  • public boolean hasValidPosition() {
  • return position != null;
  • }
  • // 设置下次要从Kafka服务端获取的消息的offset
  • private void seek(long offset) {
  • this.position = offset;
  • this.resetStrategy = null;
  • }
  • // 设置position值
  • private void position(long offset) {
  • // 只有在position合法时才可以设置
  • if (!hasValidPosition())
  • throw new IllegalStateException("Cannot set a new position without a valid current position");
  • this.position = offset;
  • }
  • // 设置最近一次提交的offset
  • private void committed(OffsetAndMetadata offset) {
  • this.committed = offset;
  • }
  • // 暂停当前主题分区的消费
  • private void pause() {
  • this.paused = true;
  • }
  • // 重启当前主题分区的消费
  • private void resume() {
  • this.paused = false;
  • }
  • /**
  • * 判断是否可以拉取,isFetchable()为true有两个条件
  • * 1. 对应的TopicPartition未被标记为暂停状态;
  • * 2. 对应的TopicPartitionState的position不为null
  • */
  • private boolean isFetchable() {
  • return !paused && hasValidPosition();
  • }
  • }

从上面的源码可知,TopicPartitionState维护了消费者在消费过程中,对指定的主题分区的消费信息,position记录了下次要从Kafka服务端获取的消息的offset,committed则记录了最近一次提交的offset。paused字段则用于标识当前的主题分区是否正暂停消费,该字段一般由KafkaConsumer的pause(Collection<TopicPartition> partitions)方法调用,用于暂停对特定主题分区的消费:

org.apache.kafka.clients.consumer.KafkaConsumer
  • // kafkaConsumer类
  • public void pause(Collection<TopicPartition> partitions) {
  • acquire();
  • try {
  • // 遍历主题分区
  • for (TopicPartition partition: partitions) {
  • log.debug("Pausing partition {}", partition);
  • // 调用SubscriptionState进行暂停操作
  • subscriptions.pause(partition);
  • }
  • } finally {
  • release();
  • }
  • }

SubscriptionState类的pause()方法如下

org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState
  • public void pause(TopicPartition tp) {
  • // 获取tp对应的TopicPartitionState,调用pause()方法
  • assignedState(tp).pause();
  • }
  • // 获取tp对应的TopicPartitionState对象
  • private TopicPartitionState assignedState(TopicPartition tp) {
  • // 从assignment字典中根据键获取
  • TopicPartitionState state = this.assignment.get(tp);
  • if (state == null)
  • throw new IllegalStateException("No current assignment for partition " + tp);
  • return state;
  • }

paused字段的具体用处则是用于判断当前主题分区是否是可以被拉取的,具体实现在TopicPartitionState的isFetchable()方法中,源码上面已经贴出了。

4.1.2. Subscribe操作

SubscriptionState的subscribe(...)系列方法承载了KafkaConsumer传递过来的订阅操作的实现,其中subscribe(Collection<String> topics, ConsumerRebalanceListener listener)方法的源码如下:

org.apache.kafka.clients.consumer.internals.SubscriptionState#subscribe(java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
  • // 根据主题集合及特定的重均衡监听器来订阅主题
  • public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
  • // 指定的重均衡监听器不可为空
  • if (listener == null)
  • throw new IllegalArgumentException("RebalanceListener cannot be null");
  • // 设置订阅模式
  • setSubscriptionType(SubscriptionType.AUTO_TOPICS);
  • this.listener = listener;
  • // 修改subscription字段,记录订阅的分区
  • changeSubscription(topics);
  • }
  • // 更新subscriptionType字段
  • private void setSubscriptionType(SubscriptionType type) {
  • // 只有在NONE模式下才可以指定为其他格式
  • if (this.subscriptionType == SubscriptionType.NONE)
  • this.subscriptionType = type;
  • else if (this.subscriptionType != type)
  • // 如果已经设置过一次,再次设置为不同的模式会报错
  • throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
  • }
  • // 改变subscription字段
  • public void changeSubscription(Collection<String> topicsToSubscribe) {
  • // 订阅的主题有变化
  • if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
  • // 清空原有的订阅
  • this.subscription.clear();
  • // 添加新的
  • this.subscription.addAll(topicsToSubscribe);
  • this.groupSubscription.addAll(topicsToSubscribe);
  • // 设置需要重新分配分区
  • this.needsPartitionAssignment = true;
  • // Remove any assigned partitions which are no longer subscribed to
  • // 同步assignment,将没有再订阅的主题移除
  • for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
  • TopicPartition tp = it.next();
  • if (!subscription.contains(tp.topic()))
  • it.remove();
  • }
  • }
  • }

从源码可以得知,以传入主题集合的方式订阅主题时,该subscribe(..)方法会调用setSubscriptionType(SubscriptionType type)方法更新subscriptionType字段,这里需要注意的是,subscriptionType字段只有在值为SubscriptionType.NONE时才能够被更新,也就是说如果已经设置过subscriptionType值为AUTO_TOPICS、AUTO_PATTERN或USER_ASSIGNED其中的一种之后就不允许更新了。

另外该subscribe(..)方法还调用changeSubscription(Collection<String> topicsToSubscribe)对订阅主题集合进行处理,如果发现这一次订阅的主题与之前订阅的不一样,会将needsPartitionAssignment置为true,表示需要进行分区重分配操作;最终SubscriptionState实例会使用subscriptiongroupSubscription字段记录订阅的分区,并将assignment记录中没有订阅的主题移除。

以传入正则表达式匹配主题的方式订阅的实现与以主题集合方式略有不同,subscribe(Pattern pattern, ConsumerRebalanceListener listener)的源码如下:

org.apache.kafka.clients.consumer.internals.SubscriptionState#subscribe
  • // 根据正则表达式匹配主题,及特定的重均衡监听器来订阅主题
  • public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
  • // 检查监听器
  • if (listener == null)
  • throw new IllegalArgumentException("RebalanceListener cannot be null");
  • // 设置订阅模式
  • setSubscriptionType(SubscriptionType.AUTO_PATTERN);
  • // 记录参数
  • this.listener = listener;
  • this.subscribedPattern = pattern;
  • }

从源码可以发现,相较于前一种方式,这里省略了changeSubscription(topics)的调用,这是因为在这种方式下主题的更新操作放在了ConsumerCoordinator类的构造方法中,回顾一下这部分代码:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
  • public ConsumerCoordinator(ConsumerNetworkClient client,
  • String groupId,
  • int sessionTimeoutMs,
  • int heartbeatIntervalMs,
  • List<PartitionAssignor> assignors,
  • Metadata metadata,
  • SubscriptionState subscriptions,
  • Metrics metrics,
  • String metricGrpPrefix,
  • Time time,
  • long retryBackoffMs,
  • OffsetCommitCallback defaultOffsetCommitCallback,
  • boolean autoCommitEnabled,
  • long autoCommitIntervalMs,
  • ConsumerInterceptors<?, ?> interceptors,
  • boolean excludeInternalTopics) {
  • ...
  • // 记录消费者的SubscriptionState实例
  • this.subscriptions = subscriptions;
  • ...
  • // 添加Metadata监听器
  • addMetadataListener();
  • ...
  • }

可以发现,在构造ConsumerCoordinator实例时,调用addMetadataListener()添加了一个Metadata元数据更新监听器;正则表达式匹配主题的订阅方式与直接传入主题集合的订阅方式的设计是不同的,这是由于,以主题集合的方式订阅时,订阅的主题是明确的,无论Kafka服务端的主题是否存在;但正则表达式匹配的方式需要先获取所有的主题信息,然后根据正则匹配进行过滤。addMetadataListener()方法的源码如下:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
  • /**
  • * 用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。
  • * 在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事:
  • * - 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,
  • * 得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。
  • * - 如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。
  • * 将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,
  • * 则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。
  • * - 使用metadataSnapshot字段记录变化后的新快照。
  • */
  • private MetadataSnapshot metadataSnapshot;
  • private void addMetadataListener() {
  • this.metadata.addListener(new Metadata.Listener() {
  • @Override
  • public void onMetadataUpdate(Cluster cluster) {
  • // AUTO_PATTERN模式的处理
  • if (subscriptions.hasPatternSubscription()) {
  • // 权限验证
  • Set<String> unauthorizedTopics = new HashSet<String>();
  • for (String topic : cluster.unauthorizedTopics()) {
  • if (filterTopic(topic))
  • unauthorizedTopics.add(topic);
  • }
  • if (!unauthorizedTopics.isEmpty())
  • throw new TopicAuthorizationException(unauthorizedTopics);
  • // 定义一个List装载需要订阅的主题
  • final List<String> topicsToSubscribe = new ArrayList<>();
  • // 遍历Cluster存储的集群元数据中的Topic信息
  • for (String topic : cluster.topics())
  • // 过滤Topic将匹配的Topic加入到topicsToSubscribe
  • if (filterTopic(topic))
  • topicsToSubscribe.add(topic);
  • // 更新subscriptions集合、groupSubscription集合、assignment集合
  • subscriptions.changeSubscription(topicsToSubscribe);
  • // 更新Metadata需要记录元数据的Topic集合
  • metadata.setTopics(subscriptions.groupSubscription());
  • } else if (!cluster.unauthorizedTopics().isEmpty()) {
  • // 当非AUTO_PATTERN模式时,如果非授权的主题不为空,则抛出异常
  • throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
  • }
  • // check if there are any changes to the metadata which should trigger a rebalance
  • // 检测是否为AUTO_PATTERN或AUTO_TOPICS模式
  • if (subscriptions.partitionsAutoAssigned()) {
  • // 根据新的subscriptions和cluster数据创建快照
  • MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
  • // 比较新旧快照,如果不相等,则更新快照,并标识需要重新进行分区分配
  • if (!snapshot.equals(metadataSnapshot)) {
  • // 记录快照
  • metadataSnapshot = snapshot;
  • // 更新needsPartitionAssignment字段为true,表示需要重新进行分区分配
  • subscriptions.needReassignment();
  • }
  • }
  • }
  • });
  • }
  • public boolean hasPatternSubscription() {
  • return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
  • }
  • private boolean filterTopic(String topic) {
  • return subscriptions.getSubscribedPattern().matcher(topic).matches() &&
  • !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic));
  • }

其中用到了SubscriptionState类的相关方法:

org.apache.kafka.clients.consumer.internals.SubscriptionState
  • // SubscriptionState类
  • public boolean partitionsAutoAssigned() {
  • return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
  • }
  • public void needReassignment() {
  • // 去除groupSubscription中除subscription集合所有元素之外的元素
  • this.groupSubscription.retainAll(subscription);
  • this.needsPartitionAssignment = true;
  • }

在添加的Metadata.Listener监听器中,当集群元数据发生更新时,会调用onMetadataUpdate(Cluster cluster)回调方法,该方法分两个阶段对数据进行了处理:首先处理AUTO_PATTERN订阅模式下的主题过滤,同时依旧会调用subscriptions.changeSubscription(topicsToSubscribe)更新SubscriptionState中存储的订阅主题信息;最后如果是AUTO_PATTERN或AUTO_TOPICS订阅模式,还会根据SubscriptionState实例和Cluster集群元数据创建MetadataSnapshot快照,并与当前ConsumerCoordinator实例中保存的旧快照metadataSnapshot进行对比,以判断是否需要进行分区重分配。