大数据
流式处理
Kafka

Kafka系列 02——生产者的基本使用

简介:Kafka生产者是提供给开发者用于向Kafka中写入消息的组件。

1. 生产者

要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性:

  • bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port,多个broker地址之间用,分隔。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他broker的信息。不过建议至少要提供两个broker的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
  • key.serializer:broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把Java对象作为键和值发送给broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此,如果你只使用常见的几种Java对象类型,那么就没必要实现自己的序列化器。要注意,key.serializer是必须设置的,就算你打算只发送值内容。
  • value.serializer:与key.serializer一样,value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

生产者发送消息主要有以下三种方式:

  1. 发送并忘记(fire-and-forget):我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为Kafka是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
  2. 同步发送:我们使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功。
  3. 异步发送:我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。

使用Java代码实现生产者主要会使用到Kafka包中的以下三个类:

  1. org.apache.kafka.clients.producer.ProducerConfig:该类用于配置生产者,其中归纳了生产者所有的配置项的键名。
  2. org.apache.kafka.clients.producer.KafkaProducer:Kafka生产者的消息发送类,主要用于发送消息。
  3. org.apache.kafka.clients.producer.ProducerRecord:用于包装定义要发送的消息,可以传入主题、分区、键和值等信息来定制需要发送的消息。

注:这里演示的Kafka生产者的代码都是基于新版本API使用的,位于org.apache.kafka.clients包内,新版本API具有更高的灵活性。Kafka.producer包内还提供了两个旧版本的生产者API:SyncProducer(根据acks参数的具体配置情况,在发送更多的消息之前,它会等待服务器对已发消息或批次进行确认)和AsyncProducer(在后台将消息分为不同的批次,使用单独的线程发送这些批次,不为客户端提供发送结果)。这两个生产者属于Kafka的核心模块。

1.1. 同步发送消息

下面的代码片段演示了如何创建一个新的生产者,并使用同步方式向test主题发送消息;这里只指定了必要的属性,其他使用默认设置:

  • package com.coderap.producer;
  • import org.apache.kafka.clients.producer.BufferExhaustedException;
  • import org.apache.kafka.clients.producer.KafkaProducer;
  • import org.apache.kafka.clients.producer.ProducerRecord;
  • import org.apache.kafka.clients.producer.RecordMetadata;
  • import org.apache.kafka.common.errors.InterruptException;
  • import org.apache.kafka.common.errors.SerializationException;
  • import org.apache.kafka.common.errors.TimeoutException;
  • import java.util.Properties;
  • import java.util.concurrent.ExecutionException;
  • import java.util.concurrent.Future;
  • public class SyncProducerTest {
  • public static void main(String[] args) {
  • // 生产者需要的配置信息
  • Properties properties = new Properties();
  • // broker列表
  • properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 键序列化器
  • properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • // 值序列化器
  • properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • KafkaProducer<String, String> kafkaProducer = null;
  • try {
  • // 根据配置信息创建生产者
  • kafkaProducer = new KafkaProducer<>(properties);
  • // 循环发送消息
  • for (int i = 0; i < 10; i++) {
  • ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "key-" + i, "value-" + i);
  • try {
  • System.out.println("发送消息" + "key-" + i + ", value-" + i);
  • // 发送消息
  • Future<RecordMetadata> sendResult = kafkaProducer.send(producerRecord);
  • // 获取发送结果
  • RecordMetadata recordMetadata = sendResult.get();
  • System.out.println("发送结果:Topic -> " + recordMetadata.topic() + ", Partition -> " + recordMetadata.partition() + ", Offset -> " + recordMetadata.offset());
  • } catch (SerializationException e) {
  • System.out.println("序列化错误:" + e.getMessage());
  • e.printStackTrace();
  • } catch (BufferExhaustedException e) {
  • System.out.println("缓冲区已满:" + e.getMessage());
  • e.printStackTrace();
  • } catch (TimeoutException e) {
  • System.out.println("超时:" + e.getMessage());
  • e.printStackTrace();
  • } catch (InterruptException e) {
  • System.out.println("线程被中断:" + e.getMessage());
  • e.printStackTrace();
  • } catch (InterruptedException e) {
  • System.out.println("获取发送结果被中断:" + e.getMessage());
  • e.printStackTrace();
  • } catch (ExecutionException e) {
  • e.printStackTrace();
  • }
  • }
  • } finally {
  • if (kafkaProducer != null) {
  • try {
  • kafkaProducer.close();
  • } catch (Exception e) {
  • System.out.println("关闭生产者出错:" + e.getMessage());
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • }

运行上述代码,没有出现异常的话,打印的信息如下:

  • 发送消息key-0, value-0
  • 发送结果:Topic -> test, Partition -> 1, Offset -> 2020
  • 发送消息key-1, value-1
  • 发送结果:Topic -> test, Partition -> 0, Offset -> 1935
  • 发送消息key-2, value-2
  • 发送结果:Topic -> test, Partition -> 2, Offset -> 2043
  • 发送消息key-3, value-3
  • 发送结果:Topic -> test, Partition -> 2, Offset -> 2044
  • 发送消息key-4, value-4
  • 发送结果:Topic -> test, Partition -> 0, Offset -> 1936
  • 发送消息key-5, value-5
  • 发送结果:Topic -> test, Partition -> 2, Offset -> 2045
  • 发送消息key-6, value-6
  • 发送结果:Topic -> test, Partition -> 2, Offset -> 2046
  • 发送消息key-7, value-7
  • 发送结果:Topic -> test, Partition -> 1, Offset -> 2021
  • 发送消息key-8, value-8
  • 发送结果:Topic -> test, Partition -> 1, Offset -> 2022
  • 发送消息key-9, value-9
  • 发送结果:Topic -> test, Partition -> 0, Offset -> 1937

注:上述代码中,在填写Properties对象的配置信息时,使用的都是字符串,这种方式是比较容易出错的。Kafka将Producer的配置项的键都统一归纳到org.apache.kafka.clients.producer.ProducerConfig类中,我们可以使用直接使用该类,如:

  • // broker列表
  • properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 键序列化器
  • properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  • // 值序列化器
  • properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.coderap.producer.serializer.PersonSerializer");

1.2. 异步发送消息

上面的例子演示的是同步模式的消息发送方式,Kafka还提供了异步发送模式,以避免处理消息发送的过程影响发送过程的吞吐性能。在异步发送模式中,Kafka为生产者提供了回调支持,下面的代码是异步发送的例子:

  • package com.coderap.producer;
  • import org.apache.kafka.clients.producer.*;
  • import org.apache.kafka.common.errors.InterruptException;
  • import org.apache.kafka.common.errors.SerializationException;
  • import org.apache.kafka.common.errors.TimeoutException;
  • import java.util.Properties;
  • public class AsyncProducerTest {
  • public static void main(String[] args) {
  • // 生产者需要的配置信息
  • Properties properties = new Properties();
  • // broker列表
  • properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
  • // 键序列化器
  • properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  • // 值序列化器
  • properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.coderap.producer.serializer.PersonSerializer");
  • KafkaProducer<String, String> kafkaProducer = null;
  • try {
  • // 根据配置信息创建生产者
  • kafkaProducer = new KafkaProducer<>(properties);
  • // 循环发送消息
  • for (int i = 0; i < 10; i++) {
  • ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "key-" + i, "value-" + i);
  • try {
  • System.out.println("发送消息" + "key-" + i + ", value-" + i);
  • // 发送消息
  • kafkaProducer.send(producerRecord, new Callback() {
  • @Override
  • public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  • if (e != null) {
  • System.out.println("获取发送结果出错:" + e.getMessage());
  • } else {
  • // 异步获取发送结果
  • System.out.println("发送结果:Topic -> " + recordMetadata.topic() + ", Partition -> " + recordMetadata.partition() + ", Offset -> " + recordMetadata.offset());
  • }
  • }
  • });
  • } catch (SerializationException e) {
  • System.out.println("序列化错误:" + e.getMessage());
  • e.printStackTrace();
  • } catch (BufferExhaustedException e) {
  • System.out.println("缓冲区已满:" + e.getMessage());
  • e.printStackTrace();
  • } catch (TimeoutException e) {
  • System.out.println("超时:" + e.getMessage());
  • e.printStackTrace();
  • } catch (InterruptException e) {
  • System.out.println("线程被中断:" + e.getMessage());
  • e.printStackTrace();
  • }
  • }
  • } finally {
  • if (kafkaProducer != null) {
  • try {
  • kafkaProducer.close();
  • } catch (Exception e) {
  • System.out.println("关闭生产者出错:" + e.getMessage());
  • e.printStackTrace();
  • }
  • }
  • }
  • }
  • }

从代码中可以看出,异步与同步的区别在于,调用KafkaProducer对象的send()方法时传入了第二个参数,该参数是一个org.apache.kafka.clients.producer.Callback类型对象;Callback是一个接口,我们需要实现其中的onCompletion(RecordMetadata, Exception)方法获取最终返回的发送结果以及可能产生的异常。

2. 生产者相关配置

Kafka生产者提供了一些可配置的参数,大部分都有合理的默认值,有几个参数在内存使用、性能和可靠性方面对生产者影响比较大:

  1. acks:该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。
  • 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  • 如果acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future对象的get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
  • 如果acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。
  1. buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full参数(在0.9.0.0版本里被替换成了max.block.ms,表示在抛出异常之前可以阻塞一段时间)。
  2. compression.type:默认情况下,消息发送时不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。
  3. retries:retries参数的值决定了生产者在遇到临时性错误时可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过retry.backoff.ms参数来改变这个时间间隔。
  4. batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
  5. linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
  6. client.id:该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
  7. max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
  8. timeout.msrequest.timeout.msmetadata.fetch.timeout.msrequest.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker就会返回一个错误。
  9. max.block.ms:该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms`时,生产者会抛出超时异常。
  10. max.request.size:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息大小为1KB。另外,broker对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。
  11. receive.buffer.bytessend.buffer.bytes:这两个参数分别指定了TCP Socket接收和发送数据包的缓冲区大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

3. 消息顺序性

Kafka可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。

如果把retries设为非零整数,同时把max.in.flight.requests.per.connection设为比1大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries设为0。可以把max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

4. 序列化器

Kafka发送的消息对象必须是可以序列化的对象,因此在使用生产者向Kafka发送消息时要保证发送的键和值支持序列化操作,可以通过实现Serialization接口来使用Java内置的序列化机制。在实际的生产环境中,为了达到更好的序列化和反序列性能,可以使用序列化框架来对消息进行序列化,如Avro、Thrift或Protobuf,或者使用自定义序列化器。下面将介绍如何自定义一个序列化器。

假设我们需要发送的消息值是一个Person对象,代码如下:

  • package com.coderap.producer.model;
  • public class Person {
  • private int id;
  • private String name;
  • private int age;
  • public Person(int id, String name, int age) {
  • this.id = id;
  • this.name = name;
  • this.age = age;
  • }
  • public int getId() {
  • return id;
  • }
  • public void setId(int id) {
  • this.id = id;
  • }
  • public String getName() {
  • return name;
  • }
  • public void setName(String name) {
  • this.name = name;
  • }
  • public int getAge() {
  • return age;
  • }
  • public void setAge(int age) {
  • this.age = age;
  • }
  • }

我们可以通过实现org.apache.kafka.common.serialization.Serializer接口来自定义序列化器,代码如下:

  • package com.coderap.producer.serializer;
  • import com.coderap.producer.model.Person;
  • import org.apache.kafka.common.errors.SerializationException;
  • import org.apache.kafka.common.serialization.Serializer;
  • import java.nio.ByteBuffer;
  • import java.util.Map;
  • public class PersonSerializer implements Serializer<Person> {
  • @Override
  • public void configure(Map<String, ?> configs, boolean isKey) {
  • // 可选配置
  • }
  • @Override
  • public byte[] serialize(String topic, Person data) {
  • try {
  • // 序列化数组
  • byte[] serializedName;
  • // 名字长度
  • int nameSize;
  • // 如果data为空,直接返回null
  • if (data == null)
  • return null;
  • else {
  • // 否则先计算name字段需要占几个字节
  • if (data.getName() != null) {
  • serializedName = data.getName().getBytes("UTF-8");
  • nameSize = serializedName.length;
  • } else {
  • serializedName = new byte[0];
  • nameSize = 0;
  • }
  • // id占4字节,表示name的长度的nameSize占4字节,name占nameSize个字节,age占4字节
  • ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize + 4);
  • // 放入id
  • buffer.putInt(data.getId());
  • // 放入name长度
  • buffer.putInt(nameSize);
  • // 放入name
  • buffer.put(serializedName);
  • // 放入age
  • buffer.putInt(data.getAge());
  • return buffer.array();
  • }
  • } catch (Exception e) {
  • throw new SerializationException("Error when serializing Person to byte[] " + e);
  • }
  • }
  • @Override
  • public void close() {
  • // 可选操作
  • }
  • }

在自定义序列化器中,需要实现的主要方法是serialize(String topic, Person data),该方法会传入消息写入的主题和消息对象,主要的序列化逻辑则开发者实现,最后只需要返回一个byte类型的数组即可。

使用序列化器则只需要在构建KafkaProducer时传入的Properties对象中,配置相应的键值即可,如:properties.put("value.serializer", "com.coderap.producer.serializer.PersonSerializer");

5. 消息分区

以上述的示例代码为例,每次我们需要发送的消息都被封装为一个ProducerRecord对象,这里使用到的ProducerRecord类的构造方法有三个参数:目标主题、键和值。ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。

如果键值为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(RoundRobin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列(使用Kafka自己的散列算法,即使升级Java版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。

5.1. 实现自定义分区策略

Kafka默认提供的分区器可能并不能满足某些场景,因此Kafka提供了org.apache.kafka.clients.producer.Partitioner接口,我们可以通过实现该接口来自定义分区器,以达到手动分区的效果,示例代码如下:

  • package com.coderap.producer.partitioner;
  • import org.apache.kafka.clients.producer.Partitioner;
  • import org.apache.kafka.common.Cluster;
  • import org.apache.kafka.common.PartitionInfo;
  • import org.apache.kafka.common.record.InvalidRecordException;
  • import org.apache.kafka.common.utils.Utils;
  • import java.util.List;
  • import java.util.Map;
  • public class CustomPartitioner implements Partitioner {
  • @Override
  • public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  • List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  • int numPartitions = partitions.size();
  • if ((keyBytes == null) || !(key instanceof String)) {
  • throw new InvalidRecordException("消息缺少键,无法自定义分区");
  • }
  • // 散列
  • return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
  • }
  • @Override
  • public void close() {
  • // 可选操作
  • }
  • @Override
  • public void configure(Map<String, ?> configs) {
  • // 可选配置
  • }
  • }

如果需要应用自定义分区器,只需要在构造KafkaProducer时传入的Properties对象中配置partitioner.class项为自定义分区类的全限定名即可,如properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.coderap.producer.partitioner.CustomPartitioner");

推荐阅读

Java虚拟机06——垃圾收集器之CMS

Java
Java虚拟机

2015-01-29 0 328

Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域都有各自的用途,以及创建和...

目录