大数据
流式处理
Kafka

Kafka系列 03 - 消费者的基本使用

简介:KafkaConsumer对象用于读取消息。创建KafkaConsumer对象与创建KafkaProducer对象非常相似。

1. 消费者

KafkaConsumer对象用于读取消息。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里,它有3个必要的属性:bootstrap.servers、key.deserializer和value.deserializer:

  • bootstrap.servers:指定了Kafka集群的连接字符串,与在KafkaProducer中的用途一样。
  • key.deserializervalue.deserializer:与生产者的Serializer定义也很类似,用于对读取到的消息的键和值进行反序列化。

group.id用于指定KafkaConsumer属于哪一个消费者群组,在大部分场景下,我们一般都会都指定消费者是属于某个群组的。下面是使用Java API创建并使用消费者的示例代码:

  • package com.coderap.customer;
  • import org.apache.kafka.clients.consumer.ConsumerConfig;
  • import org.apache.kafka.clients.consumer.ConsumerRecord;
  • import org.apache.kafka.clients.consumer.ConsumerRecords;
  • import org.apache.kafka.clients.consumer.KafkaConsumer;
  • import java.util.Arrays;
  • import java.util.Properties;
  • public class ConsumerTest {
  • public static void main(String[] args) {
  • Properties properties = new Properties();
  • // 设置broker
  • properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 设置键反序列化器
  • properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  • // 设置值反序列化器
  • properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  • properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.1");
  • // 创建消费者
  • KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • }
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }
  • }
  • }

运行后打印信息如下:

  • [test:2:66666]:key-2 -> value-2
  • [test:2:66667]:key-3 -> value-3
  • [test:2:66668]:key-5 -> value-5
  • [test:2:66669]:key-6 -> value-6
  • [test:0:66667]:key-1 -> value-1
  • [test:0:66668]:key-4 -> value-4
  • [test:0:66669]:key-9 -> value-9
  • [test:1:66667]:key-0 -> value-0
  • [test:1:66668]:key-7 -> value-7
  • [test:1:66669]:key-8 -> value-8

打印信息中,打印出的键值对即是前面讲解生产者时写入test主题的测试数据。在前面写入数据的过程中,数据的顺序是有序的,即消息键后面所标识的数据是顺序递增的,但从消费者拉取的数据打印信息来看,这些数据的顺序并没有顺序,这是由于test主题配置了三个分区,Kafka可以保证在写入分区时的数据有序性。在上述消费者对主题进行消息时,由于只配置了一个消费者和一个消费者组,因此该消费者会消费所有的分区。我们可以观察日志中打印的主题、分区和偏移量信息,可以发现,上述消费者来消费某一个分区内的数据时是保证数据有效性的。

2. 消费者的配置

对于消费者来说,Kafka也提供了大量配置可供开发者自定定制消费者的某些特性和行为,Kafka的文档列出了所有与消费者相关的配置说明,大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。

  1. fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的CPU使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker的工作负载。
  2. fetch.max.wait.ms:我们通过fetch.min.bytes告诉Kafka,等到有足够的数据时才把它返回给消费者。而feth.max.wait.ms则用于指定broker的等待时间,默认是500ms。如果没有足够的数据流入Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。如果要降低潜在的延迟(为了满足SLA),可以把该参数值设置得小一些。如果fetch.max.wait.ms被设为100ms,并且fetch.min.bytes被设为1MB,那么Kafka在收到消费者的请求后,要么返回1MB数据,要么在100ms后返回所有可用的数据,就看哪个条件先得到满足。
  3. max.partition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说,KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须比broker能够接收的最大消息的字节数(通过max.message.size属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll()方法来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把max.partition.fetch.bytes值改小,或者延长会话过期时间。
  4. session.timeout.ms:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。如果session.timeout.ms是3s,那么heartbeat.interval.ms应该是1s。把session.timeout.ms值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
  5. auto.offset.reset:该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
  6. enable.auto.commit:我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
  7. partition.assignment.strategy:我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka有两个默认的分配策略,可以通过设置partition.assignment.strategy来选择分区策略。默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,不过也可以把它改成org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy属性的值就是自定义类的名字。这两个策略的解释如下:
  • Range:该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1和消费者C2同时订阅了主题T1和主题T2,并且每个主题有3个分区。那么消费者C1有可能分配到这两个主题的分区0和分区1,而消费者C2分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
  • RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin策略来给消费者C1和消费者C2分配分区,那么消费者C1将分到主题T1的分区0和分区2以及主题T2的分区1,消费者C2将分配到主题T1的分区1以及主题T2的分区0和分区2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
  1. client.id:该属性可以是任意字符串,broker用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
  2. max.poll.records:该属性用于控制单次调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
  3. receive.buffer.bytes和send.buffer.bytes:socket在读写数据时用到的TCP缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

3. 轮询操作

在上面的示例代码的while无限循环中,有这样一行语句:

  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

我们调用了KafkaConsumer消费者对象的poll(long timeout)拉取消息数据。poll()方法是用于消息轮询的消费者核心API,通过while无限循环以及poll(long timeout)方法所构建的的持续轮询向服务器请求数据。轮询操作会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要关注对所获取数据的处理业务即可。

消费者必须持续对Kafka进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给poll()方法的参数是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为0,poll()会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据。

poll()方法返回一个消息记录列表,其中每条消息记录都包含了消息所属主题的信息、消息所在分区的信息、消息在分区里的偏移量,以及消息的键值对。我们一般会遍历这个列表,逐条处理这些消息记录。poll()方法有一个超时参数,它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。

在退出应用程序之前应该使用close()方法关闭消费者,以关闭网络连接和socket服务,并立即触发一次再均衡(再均衡会在后面介绍)。

轮询不只是获取数据那么简单。在第一次调用新消费者的poll()方法时,它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

4. 偏移量的提交

Kafka在0.9.0.0之后对分区的偏移量的存储位置作了变更,之前的旧版本中,偏移量信息是存储在Zookeeper中,新版本改为存储到一个特殊的Topic中(__consumer_offsets)。每次调用poll()方法,它总是返回由生产者写入Kafka但还没有被消费者读取过的记录,我们因此可以通过偏移量信息来获知哪些记录是被群组里的哪个消费者读取的,而更新分区当前偏移量的操作叫作偏移量的提交。

关于Kafka的分区偏移量有以下几个重要的概念:

1.偏移量相关概念.png

这四个分区偏移量属性中中,Last Committed Offset和Current Position是与消费者的数据消费行为相关的,而High Watermark和Log End Offset则与生产者的数据生产行为及数据副本的同步有关。

  • Last Committed Offset:这个属性表示当前分区最新一次由消费者组提交的Offset,表示该消费者组已经把Last Committed Offset之前的数据都消费成功了;
  • Current Position:消费者组当前正在消费的数据的Offset,也就是说,Last Committed Offset到Current Position之间的数据已经拉取成功,可能正在处理,但还未提交;
  • Log End Offset:生产者写入到Kafka中的最新一条数据的Offset;
  • High Watermark:已经成功备份到其他跟随节点中的最新一条数据的Offset,也就是说High Watermark至Log End Offset之间的数据已经写入到该分区的的首领节点中,但是还未成功备份到其他的跟随节点中,这部分数据被认为是不安全的,是不允许消费者消费的。

在上图的例子中,介绍了两种由Last Committed Offset和Current Position引起的消费问题。这种问题一般出现在消费者发生崩溃或者有新的消费者加入群组情况下,因触发分区再均衡导致消费者可能分配到新的分区,此时消费者需要读取新获取的分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。此时Last Committed Offset和Current Position之间的消息会出现丢失或重复消费的情况,因此处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API提供了很多种方式来提交偏移量。

4.1. 自动提交

最简单的提交方式是让消费者自动提交偏移量。如果enable.auto.commit配置项被设为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms配置项控制,默认值是5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

虽然自动提交简单方便,但这种方式并没有提供相应的接口来处理消息丢失或重复消费。在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,这是Kafka自发的提交行为,无论消息是否被正确处理。

4.2. 同步提交

大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者API提供了同步提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。

auto.commit.offset设为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单也最可靠。这个API会提交由poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

需要注意的是,commitSync()将会提交由poll()返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

使用commitSync()方法提交的例子如下:

  • ...
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • // 同步提交Offset
  • try {
  • kafkaConsumer.commitSync();
  • } catch (CommitFailedException e) {
  • System.out.println("Offset同步提交失败");
  • e.printStackTrace();
  • }
  • }
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }
  • ...

4.3. 异步提交

同步提交偏移量时,commitSync()方法在broker对提交请求作出回应之前会一直阻塞,这样会降低应用程序的吞吐量。Kafka也提供了异步提交的API方法commitAsync(),该提交操作无需等待broker的响应:

  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • try {
  • // 同步提交Offset
  • kafkaConsumer.commitAsync();
  • } catch (CommitFailedException e) {
  • System.out.println("Offset同步提交失败");
  • e.printStackTrace();
  • }
  • }
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }

在成功提交或碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会,但它提供了回调操作,在broker作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标:

  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • // 异步提交Offset
  • kafkaConsumer.commitAsync(new OffsetCommitCallback() {
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception != null) {
  • System.out.println("Offset异步提交失败");
  • }
  • }
  • });
  • }
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }

由于偏移量的提交是异步的,因此每一次的提交顺序并不能得到保证,在异步提交后收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。如果较小的偏移量覆盖了较大的偏移量,此时发生分区再均衡就会出现重复消息。我们可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在发送提交错误进行重试前,可以先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试;如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

4.4. 同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

因此,在消费者关闭前一般会组合使用commitAsync()commitSync()

  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • }
  • // 异步提交Offset
  • kafkaConsumer.commitAsync(new OffsetCommitCallback() {
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception != null) {
  • System.out.println("Offset异步提交失败");
  • }
  • }
  • });
  • }
  • } catch (Exception e) {
  • e.printStackTrace();
  • } finally {
  • try {
  • // 同步提交Offset
  • kafkaConsumer.commitSync();
  • } catch (CommitFailedException e) {
  • System.out.println("Offset同步提交失败");
  • e.printStackTrace();
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }
  • }

4.5. 提交特定的偏移量

简单调用commitSync()commitAsync()只会提交每个poll()方法拉取批次消息的最后一个偏移量,如果我们想要在消费过程中提交特定的偏移量,可以使用带有参数的commitSync()commitAsync()方法,传入希望提交的分区和偏移量的Map对象。但由于消费者可能不只读取一个分区,我们需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

  • // 计数器,记录消费了多少条消息
  • private static int count = 0;
  • // 存储分区和偏移量信息
  • private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"));
  • try {
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • // 将分区和偏移量信息添加到Map中
  • currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
  • // 每消费1000条提交一次Offset
  • if (count % 1000 == 0)
  • // 异步提交,也可以使用同步提交,需要处理可能发生的错误
  • kafkaConsumer.commitAsync(currentOffsets, null);
  • count++;
  • }
  • }
  • } catch (Exception e) {
  • e.printStackTrace();
  • } finally {
  • try {
  • // 同步提交Offset
  • kafkaConsumer.commitSync();
  • } catch (CommitFailedException e) {
  • System.out.println("Offset同步提交失败");
  • e.printStackTrace();
  • } finally {
  • // 关闭消费者
  • kafkaConsumer.close();
  • }
  • }

5. 分区再均衡

Kafka消费者可以以群组的方式进行组织,每个群组中的消费者消费一个主题的所有分区,分区会根据群组中的消费者数量进行均衡分配,且能够保证整个主题的消费不会被群组中的多个消费者重复消费。

Kafka允许动态地改变群组中的消费者数量,往群组里增加或减少消费者是横向伸缩消费能力的主要方式。Kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。

一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

5.1. GroupCoordinator

每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组和组下每个消费者消费的偏移量。每个消费者实例化时,同时实例化一个ConsumerCoordinator消费者协调器对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。消费者通过ConsumerCoordinator向被指派为群组协调器的broker(不同的群组可以有不同的协调器)的GroupCoordinator实例发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

注:在0.10.1版本里,Kafka引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样一来,发送心跳的频率(也就是消费者群组用于检测发生崩溃的消费者或不再发送心跳的消费者的时间)与消息轮询的频率(由处理消息所花费的时间来确定)之间就是相互独立的。在新版本的Kafka里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livelock),比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配置与session.timeout.ms是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。

当消费者要加入群组时,它会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。

根据内置的分配策略(Range或RoundRobin)将分区分配给相应的消费者之后,群主会把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

我们在前面提到过,新版本中Kafka将分区的偏移量信息存储在__consumer_offsets主题上,该主题是Kafka内部使用的一个主题,专门用来存储Group消费的情况,默认情况下有50个partition,每个partition有三个副本,而具体Group的消费情况要存储到哪一个partition上,是根据公式abs(GroupId.hashCode()) % NumPartitions来计算,其中,NumPartitions__consumer_offsets的partition数,默认是50个。同时,该partition的首领分区所在的broker即为该Group所对应的GroupCoordinator,GroupCoordinator会存储与该Group相关的所有的Meta信息。

在broker启动时,每个broker都会启动一个GroupCoordinator服务,但只有__consumer_offsets的partition的首领分区所在节点的GroupCoordinator服务才会直接与消费者客户端进行交互,也就是其Group的GroupCoordinator,其他的GroupCoordinator只是作为备份,在首领分区所在的broker挂掉之后及时进行替代。

Server端,Consumer的Group共定义了五个状态:

  • Empty:Group没有任何成员,如果与之相关的所有的offsets都过期的话就会转变成Dead状态;当Group新创建时是Empty状态,也有可能这个Group仅仅用于偏移量提交,并没有任何消费者成员;
  • PreparingRebalance:Group正在准备进行Rebalance在均衡;
  • AwaitingSync:Group正在等待GroupCoordinator的分配;
  • Stable:稳定的状态;
  • Dead:Group内已经没有成员,并且它的Meta信息已经被GroupCoordinator移除。

其各个状态的定义及转换都在GroupMetadata中定义,状态转移图如下所示:

2.Group状态转移图示.png

5.2. 再均衡监听器

消费者在退出和进行分区再均衡之前,会做一些清理工作。可以在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录,可能还需要关闭文件句柄、数据库连接等。在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码;subscribe()方法允许开发者传入一个ConsumerRebalanceListener监听器,它有两个需要实现的方法:

  1. onPartitionsRevoked(Collection<TopicPartition> partitions):该方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  2. onPartitionsAssigned(Collection<TopicPartition> partitions):该方法会在重新分配分区之后和消费者开始读取消息之前被调用。

下面的例子中演示了如何使用ConsumerRebalanceListener监听器:

  • // 存储分区和偏移量信息
  • private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  • ...
  • // 创建消费者
  • final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
  • @Override
  • public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  • System.out.println("发送分区再均衡,需要提交当前Offset:" + currentOffsets);
  • // 提交记录的偏移量信息
  • kafkaConsumer.commitSync(currentOffsets);
  • }
  • @Override
  • public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  • // TODO
  • }
  • });
  • ...
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • // 将分区和偏移量信息添加到Map中
  • currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
  • }
  • // 异步提交Offset
  • kafkaConsumer.commitAsync(new OffsetCommitCallback() {
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception != null) {
  • System.out.println("Offset异步提交失败");
  • }
  • }
  • });
  • }
  • ...

需要注意的是在监听器中提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。

6. 从指定偏移量开始消费

有时候我们也需要从分区特定的偏移量处开始读取消息,或者直接跳到分区的末尾开始读取消息,seekToBeginning(Collection<TopicPartition> tp)seekToEnd(Collection<TopicPartition> tp)提供了跳到分区开头或末尾进行读取。同时,Kafka也提供了用于查找特定偏移量的API方法seek()定位到特定的偏移量后进行读取。

下面的代码结合ConsumerRebalanceListener和seek()方法演示如何从指定的偏移量位置开始处理消息:

  • // 存储分区和偏移量信息
  • private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  • ...
  • // 创建消费者
  • final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 订阅主题
  • kafkaConsumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
  • @Override
  • public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  • System.out.println("发送分区再均衡,需要提交当前Offset:" + currentOffsets);
  • // 提交记录的偏移量信息
  • kafkaConsumer.commitSync(currentOffsets);
  • }
  • @Override
  • public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  • // 从特定的偏移量读取信息
  • for (TopicPartition partition : partitions)
  • // 定位到特定的偏移量
  • kafkaConsumer.seek(partition, getOffsetForPartition(partition));
  • }
  • private long getOffsetForPartition(TopicPartition partition) {
  • // 从记录的偏移量信息中读取偏移量
  • return currentOffsets.get(partition).offset();
  • }
  • });
  • ...
  • while (true) {
  • // 拉取消息
  • ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  • // 遍历拉取到的消息
  • for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  • System.out.println("[" + consumerRecord.topic() + ":" + consumerRecord.partition() + ":" + consumerRecord.offset() + "]:" + consumerRecord.key() + " -> " + consumerRecord.value());
  • // 将分区和偏移量信息添加到Map中
  • currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
  • }
  • // 异步提交Offset
  • kafkaConsumer.commitAsync(new OffsetCommitCallback() {
  • @Override
  • public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  • if (exception != null) {
  • System.out.println("Offset异步提交失败");
  • }
  • }
  • });
  • }
  • ...

7. 反序列化器

在前面的例子里,我们假设每个消息的键值对都是字符串,所以我们使用了默认的String Deserializer。与自定义序列化器一样,Kafka允许我们自定义反序列化器,只需要继承自org.apache.kafka.common.serialization.Deserializer类即可;下面是自定义反序列化器的例子:

  • package com.coderap.customer.deserializer;
  • import com.coderap.customer.model.Person;
  • import org.apache.kafka.common.errors.SerializationException;
  • import org.apache.kafka.common.serialization.Deserializer;
  • import java.nio.ByteBuffer;
  • import java.util.Map;
  • public class PersonDeserializer implements Deserializer<Person> {
  • @Override
  • public void configure(Map<String, ?> configs, boolean isKey) {
  • // 可选配置
  • }
  • @Override
  • public Person deserialize(String topic, byte[] data) {
  • int id;
  • String name;
  • int age;
  • try {
  • if (data == null) {
  • return null;
  • }
  • if (data.length < 12) {
  • // 由于有两个int字段,以及表示name字段长度的int值,因此最少有8个字节长度
  • throw new SerializationException("Data size is to short");
  • }
  • ByteBuffer byteBuffer = ByteBuffer.wrap(data);
  • // 读取id
  • id = byteBuffer.getInt();
  • // 读取name长度
  • int nameSize = byteBuffer.getInt();
  • // 创建byte数组
  • byte[] nameBytes = new byte[nameSize];
  • // 读取相应长度的数据到nameBytes
  • byteBuffer.get(nameBytes);
  • // 将nameBytes转换为字符串,即为name字段
  • name = new String(nameBytes, "UTF-8");
  • // 读取age
  • age = byteBuffer.getInt();
  • // 返回Person对象
  • return new Person(id, name, age);
  • } catch (Exception e) {
  • throw new SerializationException("Deserializer Error");
  • }
  • }
  • @Override
  • public void close() {
  • // 可选操作
  • }
  • }

这个反序列化器与前面的序列化器是一对的,都是对Person实例进行操作。如果要使用反序列化器也比较简单,只需要在创建消费者时进行设置即可:

  • // 设置值反序列化器
  • properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.coderap.customer.deserializer.PersonDeserializer");

8. 消费者的退出

在前面的例子中,我们都是在一个无限循环中不断地进行poll操作以拉取消息的,如果确定要退出循环,则可以通过另一个线程调用kafkaConsumer.wakeup()方法。如果循环运行在主线程里,可以在Runtime.getRuntime().addShutdownHook(...)里调用该方法。kafkaConsumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。调用kafkaConsumer.wakeup()可以退出poll(),并抛出WakeupException异常,或者如果调用kafkaConsumer.wakeup()时线程没有等待轮询,那么异常将在下一轮调用poll()时抛出。我们不需要处理WakeupException,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用kafkaConsumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

下面是使用kafkaConsumer.wakeup()退出消费者的例子:

  • ...
  • // 创建消费者
  • final KafkaConsumer<String, Person> kafkaConsumer = new KafkaConsumer<>(properties);
  • // 添加shutdownHook
  • final Thread mainThread = Thread.currentThread();
  • Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  • @Override
  • public void run() {
  • // 停止消费者
  • kafkaConsumer.wakeup();
  • try {
  • // 主线程join
  • mainThread.join();
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }));
  • ...