大数据
流式处理
Kafka

Kafka系列 01——基本概念

简介:Kafka是分布式的、分区的且具有副本机制的消息服务器。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是Producer和Consumer都依赖于Zookeeper来保证系统可用性集群保存一些meta信息。

1. Kafka介绍

注:本系列博客根据kafka_2.11-0.10.0.1版本进行编写。

更新历程:

  • 新版的0.9.0版把Consumer的The high-level Consumer API和The SimpleConsumer API结合到了一起。

Kafka是分布式的、分区的且具有副本机制的消息服务器。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是Producer和Consumer都依赖于Zookeeper来保证系统可用性集群保存一些meta信息。

1.Kafka架构.png

1.1. Topics / logs

一个Topic可以认为是一类消息,每个Topic将被分成多个partition(区),每个partition在物理存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息,唯一的标记一条消息。Kafka并没有提供其他额外的索引机制来存储offset,因为在Kafka中几乎不允许对消息进行“随机读写”。正是由于Kafka对log文件实行是线性存储和读写,从而让它的消息系统达到了更高的吞吐性能。

2.Topic消息写入.png

Kafka和JMS(Java Message Service)实现(ActiveMQ)不同的是:即使消息被消费,消息仍然不会被立即删除。日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留7天,那么7天后,文件会被清除,无论其中的消息是否被消费。Kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支。

对于Consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有Consumer来控制;当Consumer正常消费消息时,offset将会”线性“的向前驱动,即消息将依次顺序被消费。事实上Consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值。

Kafka集群几乎不需要维护任何Consumer和Producer状态信息,这些信息有Zookeeper保存;因此Producer和Consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。

partitions的设计目的有多个。最根本原因是Kafka基于文件存储。通过分区,可以将日志内容分散到多个服务器节点上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前服务器节点(Kafka实例)保存;可以将一个Topic切分多任意多个partitions,来消息保存/消费的效率。此外越多的partitions意味着可以容纳更多的Consumer,有效提升并发消费的能力。

1.2. Distribution

一个Topic的多个partitions,被分布在Kafka集群中的多个服务器节点上;每个服务器节点(Kafka实例)负责partitions中消息的读写操作;此外Kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。

基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个服务器节点为”leader”;leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可。。由此可见作为leader的服务器节点承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个”leader”,Kafka会将”leader”均衡的分散在每个实例上,来确保整体的性能稳定。

Producers:Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于”round-robin”方式或者通过其他的一些算法等。

Consumers:本质上Kafka只支持Topic。每个Consumer属于一个Consumer Group;反过来说,每个Group中可以有多个Consumer。发送到Topic的消息,只会被订阅此Topic的每个Group中的一个Consumer消费。

如果所有的Consumer都具有相同的Group,这种情况和queue模式很像;消息将会在Consumers之间负载均衡;如果所有的Consumer都具有不同的Group,那这就是”发布-订阅”;消息将会广播给所有的消费者。

在Kafka中,一个partition中的消息只会被Group中的一个Consumer消费;每个Group中Consumer消息消费互相独立;我们可以认为一个Group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个Consumer消费,不过一个Consumer可以消费多个partitions中的消息。Kafka只能保证一个partition中的消息被某个Consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

Kafka的设计原理决定,对于一个Topic,同一个Group中不能有多于partitions个数的Consumer同时消费,否则将意味着某些Consumer将无法得到消息。

2. Kafka安装

生产环境中,Kafka一般是集群部署,但Kafka也支持单机部署。在下面的例子中将分别介绍单机部署和完全分布式部署的过程。

2.1. Kafka单机模式

单机环境Kakfa的安装方式非常简单,下载安装包解压,配置环境变量即可:

  • ubuntu@s100:~/software$ tar zxf kafka_2.11-0.10.0.1.tgz -C /soft/
  • ubuntu@s100:~/software$ cd /soft/
  • ubuntu@s100:/soft$ ln -s kafka_2.11-0.10.0.1 kafka

配置环境变量如下:

  • KAFKA_HOME=/soft/kafka
  • PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/soft/jdk/bin:/soft/hadoop/bin:/soft/hadoop/sbin:/soft/eclipse:/soft/maven/bin:/soft/hive/bin:/soft/hbase/bin:/soft/zk/bin:/soft/pig/bin:/soft/sqoop/bin:/soft/flume/bin:/soft/kafka/bin"

同时,Kafka是依赖Zookeeper集群的,我们开启s101、s102及s103上的Zookeeper集群后,配置${KAFKA_HOME}/config目录下的server.properties文件的Zookeeper集群项如下:

  • zookeeper.connect=s101:2181,s102:2181

接下来尝试启动Kafka:

  • ubuntu@s100:/soft/kafka$ kafka-server-start.sh config/server.properties
  • ...
  • [2017-07-24 05:36:21,307] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

启动后,我们可以查看Zookeeper上有关Kafka的数据节点:

  • [zk: localhost:2181(CONNECTED) 1] ls /
  • [controller, controller_epoch, brokers, zookeeper, flume, admin, isr_change_notification, consumers, config, hbase]
  • [zk: localhost:2181(CONNECTED) 2] ls /config
  • [changes, clients, topics]

2.1.1. 简单示例

  1. 创建Topic

我们可以创建一个Topic:

  • ubuntu@s100:/soft/kafka$ kafka-topics.sh --create --zookeeper s101:2181 --replication-factor 1 --partitions 1 --topic test
  • Created topic "test".
  • ubuntu@s100:/soft/kafka$ [2017-07-24 06:52:26,630] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
  • [2017-07-24 06:52:26,690] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
  • [2017-07-24 06:52:26,702] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
  • [2017-07-24 06:52:26,706] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)

注1:关于replication副本。broker以消息达到的顺序进行存储。对每个主题可配置分区。副本技术可以保证broker故障,消息仍能被生产和消费(容灾)。Kafka可以支撑n-1的broker故障。
注2:isr(in-sync replica),同步副本,用于leader推选。
注3:kafka支持的副本模式:
1. 同步副本:生产者从zk中确认leader,发送消息。消息写入leader的log,followers从leader pull出消息,一旦消息被follower写入本地,回传确认信息给leader。如果leader收到了所有的确认信息,再发送确认消息给producer;消费者从leader中获取消息。
2. 异步副本:只要leader写入本地log,即刻发送ack信息给producer。

  1. 查看Topic

此时可以在Zookeeper上查看创建的Topic:

  • [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
  • [test]
  • [zk: localhost:2181(CONNECTED) 4] get /brokers/topics/test
  • {"version":1,"partitions":{"0":[0]}}
  • cZxid = 0x100000001d
  • ctime = Mon Jul 24 06:52:26 PDT 2017
  • mZxid = 0x100000001d
  • mtime = Mon Jul 24 06:52:26 PDT 2017
  • pZxid = 0x1000000021
  • cversion = 1
  • dataVersion = 0
  • aclVersion = 0
  • ephemeralOwner = 0x0
  • dataLength = 36
  • numChildren = 1

使用Kafka查看刚刚创建的Topic:

  • ubuntu@s100:/soft/kafka$ kafka-topics.sh --list --zookeeper s101:2181
  • test

注:Kafka默认的服务端口是9092:

  • ubuntu@s100:/soft/kafka$ netstat -ano | grep 9092
  • tcp6 0 0 :::9092 :::* LISTEN off (0.00/0/0)
  • tcp6 0 0 192.168.127.100:60326 192.168.127.100:9092 ESTABLISHED keepalive (7002.03/0/0)
  • tcp6 0 0 192.168.127.100:9092 192.168.127.100:60326 ESTABLISHED keepalive (7002.03/0/0)
  1. 创建控制台消费者
  • ubuntu@s100:/soft/kafka$ kafka-console-consumer.sh --zookeeper s101:2181 --topic test --from-beginning

此时它会一直阻塞,等待生产者生产内容。

  1. 创建控制台生产者

我们可以创建一个生产者,并且在等待命令行上发送一条消息:

  • ubuntu@s100:/soft/kafka$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • hello world

此时再消费者的控制台上会收到这条消息。Kafka中,生产者发送的消息会源源不断地转发给消费者,即便消费者不在线,Kafka会保存所有的历史消息,等待消费者下次上线之后,可以通过偏移量读取指定的离线消息。

注:在{KAFKA_HOME}/conf/producer.properties文件中可以指定生产消息的压缩方式compression.type=none | gzip | lz4 | snappy,这些内容会在消息的header中以最后两位进行表示(00,01,10,11)。

2.2. Kafka完全分布式

Kafka的完全分布式搭建也相对简单,只需要在多台主机上安装Kafka的安装包,配置相应的server.properties文件即可。我们选择在s101、s102及s103上安装Kafka:

首先Kafka的安装包及环境变量文件到相应的主机:

  • ubuntu@s100:/soft$ xrsync kafka_2.11-0.10.0.1
  • filepath is /soft/kafka_2.11-0.10.0.1
  • ---------------- s101 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s101:/soft/kafka_2.11-0.10.0.1
  • ---------------- s102 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s102:/soft/kafka_2.11-0.10.0.1
  • ---------------- s103 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s103:/soft/kafka_2.11-0.10.0.1
  • ---------------- s104 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s104:/soft/kafka_2.11-0.10.0.1
  • ---------------- s105 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s105:/soft/kafka_2.11-0.10.0.1
  • ---------------- s106 --------------------
  • rsync -lr /soft/kafka_2.11-0.10.0.1 ubuntu@s106:/soft/kafka_2.11-0.10.0.1
  • ubuntu@s100:/soft$ sudo xrsync /etc/environment
  • [sudo] password for ubuntu:
  • filepath is /etc/environment
  • ---------------- s101 --------------------
  • rsync -lr /etc/environment root@s101:/etc/environment
  • ---------------- s102 --------------------
  • rsync -lr /etc/environment root@s102:/etc/environment
  • ---------------- s103 --------------------
  • rsync -lr /etc/environment root@s103:/etc/environment
  • ---------------- s104 --------------------
  • rsync -lr /etc/environment root@s104:/etc/environment
  • ---------------- s105 --------------------
  • rsync -lr /etc/environment root@s105:/etc/environment
  • ---------------- s106 --------------------
  • rsync -lr /etc/environment root@s106:/etc/environment

生成软连接并source环境变量:

  • ubuntu@s100:/soft$ xcalln 'ln -s /soft/kafka_2.11-0.10.0.1 /soft/kafka'
  • ubuntu@s100:/soft$ xcalln 'source /etc/environment'

接下来修改三台主机上的server.properties文件,需要修改的是以下几项:

  • broker.id=101
  • LISTENERS=PLAINTEXT://0.0.0.0:9092
  • zookeeper.connect=s101:2181,s102:2181,s103:2181

其中不同的是broker.id配置项,这个配置项需要不一样,一般配置为主机名。

配置完好后,可以尝试启动三台主机上的Kafka:

  • ubuntu@s101:~$ kafka-server-start.sh /soft/kafka/config/server.properties &
  • ...
  • ubuntu@s102:~$ kafka-server-start.sh /soft/kafka/config/server.properties &
  • ...
  • ubuntu@s103:~$ kafka-server-start.sh /soft/kafka/config/server.properties &

此时我们可以查看Zookeeper上的节点情况:

  • ubuntu@s100:/soft$ zkCli.sh -server s101:2181
  • ...
  • [zk: s101:2181(CONNECTED) 1] ls /brokers
  • [ids, topics, seqid]
  • [zk: s101:2181(CONNECTED) 2] ls /brokers/ids
  • [101, 102, 103]

查看启动的进程:

  • ubuntu@s100:/soft$ xcall jps
  • ----------- local execute ------------
  • 3615 Jps
  • ----------- s101 execute -----------
  • 2409 Jps
  • 1642 QuorumPeerMain
  • 2125 Kafka
  • ----------- s102 execute -----------
  • 1648 QuorumPeerMain
  • 2162 Jps
  • 1882 Kafka
  • ----------- s103 execute -----------
  • 1681 QuorumPeerMain
  • 1954 Kafka
  • 2234 Jps
  • ----------- s104 execute -----------
  • 1873 Jps
  • ----------- s105 execute -----------
  • 1839 Jps
  • ----------- s106 execute -----------
  • 1896 Jps

3. Kafka集群成员

Kafka集群由多个部署有Kafka程序的服务器节点构成,每一个独立的Kafka服务器被称为broker,Kafka使用Zookeeper来维护集群成员的信息。

3.1. Broker

broker是集群的组成部分。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。

Kafka使用Zookeeper来维护broker的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在broker启动的时候,它通过创建临时节点把自己的ID注册到Zookeeper。Kafka组件订阅Zookeeper的/brokers/ids路径(broker在Zookeeper上的注册路径),当有broker加入集群或退出集群时,这些组件就可以获得通知。

如果你要启动另一个具有相同ID的broker,会得到一个错误——新broker会试着进行注册,但不会成功,因为Zookeeper里已经有一个具有相同ID的broker。

在broker停机、出现网络分区或长时间垃圾回收停顿时,broker会从Zookeeper上断开连接,此时broker在启动时创建的临时节点会自动从Zookeeper上移除。监听broker列表的Kafka组件会被告知该broker已移除。

在关闭broker时,它对应的节点也会消失,不过它的ID会继续存在于其他数据结构中。例如,主题的副本列表(下面会介绍)里就可能包含这些ID。

注:在完全关闭一个broker之后,如果使用相同的ID启动另一个全新的broker,它会立即加入集群,并拥有与旧broker相同的分区和主题。

3.2. 控制器

控制器其实就是一个broker,只不过它除了具有一般broker的功能之外,还负责分区首领的选举。

  1. 控制器的产生

集群里第一个启动的broker通过在Zookeeper里创建一个临时节点/controller让自己成为控制器。其他broker在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在。此时其他broker在Zookeeper中控制器节点/controller上创建watch对象,以便获取这个节点的变更通知,以确保集群里一次只有一个控制器存在。

  1. 控制器的变更

如果控制器被关闭或者与Zookeeper断开连接,Zookeeper上的临时节点/controller就会消失。集群里的其他broker通过watch对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper里成功创建控制器节点的broker就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点/controller上再次创建watch对象。每个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的、数值更大的controller epoch。其他broker在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch的消息,就会忽略它们。控制器使用 epoch 来避免“脑裂”。

注:epoch:n. 时期;新纪元;新时代;阶段。

3.2.1. 控制器的作用

  1. 处理首领分区选举

当控制器发现一个broker已经离开集群(通过观察相关的Zookeeper路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。

  1. 处理新broker的加入

当控制器发现一个broker加入集群时,它会使用brokerID来检查新加入的broker是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的broker和其他broker,新broker上的副本开始从首领那里复制消息。

4. 分区

Kafka的基本存储单元是分区。分区无法在多个broker间进行再细分,也无法在同一个broker的多个磁盘上进行再细分。所以分区的大小受到单个挂载点可用空间的限制。

在配置Kafka的时候,管理员指定了一个用于存储分区的目录清单——也就是server.properties文件中的log.dirs参数的值,该参数一般会包含每个挂载点的目录,可以使用,对多个目录进行分隔。

4.1. 分区分配

在创建主题时,Kafka首先会决定如何在broker间分配分区。假设你有3个broker,打算创建一个包含3个分区的主题,并且复制系数为3。那么Kafka就会有9个分区副本,它们可以被分配给3个broker。在进行分区分配时,我们要达到如下的目标:

  • 在broker间平均地分布分区副本,即要保证每个broker可以分到3个副本。
  • 确保每个分区的每个副本分布在不同的broker上。假设分区0的首领副本在broker101上,那么可以把跟随者副本放在broker102和broker103上,但不能放在broker101上,也不能两个都放在broker102上。
  • 如果为broker指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的broker上,以保证一个机架的不可用不会导致整体的分区不可用。

以上述的目标为例,有以下的分配方式:

  1. 没有指定机架信息的情况

在没有指定机架信息时,会先随机选择一个broker(假设是101),然后使用轮询的方式给每个broker分配分区来确定首领分区的位置。于是,首领分区0会在broker101上,首领分区1会在broker102上,首领分区2会在broker103上,并以此类推。然后,我们从分区首领开始,依次分配跟随者副本。如果分区0的首领在broker101上,那么它的第一个跟随者副本会在broker102上,第二个跟随者副本会在broker103上。分区1的首领在broker102上,那么它的第一个跟随者副本在broker103上,第二个跟随者副本在broker101上。

  1. 指定了机架信息的情况

如果配置了机架信息,那么就不是按照数字顺序来选择broker了,而是按照交替机架的方式来选择broker。假设broker101和broker102放置在同一个机架上,broker103放置在其他不同的机架上。我们不是按照从101到103的顺序来选择broker,而是按照101,103,102的顺序来选择,这样每个相邻的broker都在不同的机架上。于是,如果分区0的首领在broker101上,那么第一个跟随者副本会在broker103上,这两个broker在不同的机架上。如果第一个机架下线,还有其他副本仍然活跃着,所以分区仍然可用。这对所有副本来说都是一样的,因此在机架下线时仍然能够保证可用性。

为分区和副本选好合适的broker之后,接下来要决定这些分区应该使用哪个目录。我们单独为每个分区分配目录,规则很简单:计算每个目录里的分区数量,新的分区总是被添加到数量最小的那个目录里。也就是说,如果添加了一个新磁盘,所有新的分区都会被创建到这个磁盘上。因为在完成分配工作之前,新磁盘的分区数量总是最少的。

我们可以在创建主题的时候,指定分区布局的配置:

  • ubuntu@s100:~$ kafka-topics.sh --create --zookeeper s101:2181 --replica-assignment 101:103,101:103,101:103,101:103 --topic test3
  • Created topic "test3".

查看test3的分区布局:

  • [zk: s101:2181(CONNECTED) 19] get /brokers/topics/test3/partitions/0/state
  • {"controller_epoch":3,"leader":101,"version":1,"leader_epoch":0,"isr":[101,103]}
  • ...
  • [zk: s101:2181(CONNECTED) 22] get /brokers/topics/test3/partitions/1/state
  • {"controller_epoch":3,"leader":101,"version":1,"leader_epoch":0,"isr":[101,103]}
  • ...
  • [zk: s101:2181(CONNECTED) 20] get /brokers/topics/test3/partitions/2/state
  • {"controller_epoch":3,"leader":101,"version":1,"leader_epoch":0,"isr":[101,103]}
  • ..
  • [zk: s101:2181(CONNECTED) 21] get /brokers/topics/test3/partitions/3/state
  • {"controller_epoch":3,"leader":101,"version":1,"leader_epoch":0,"isr":[101,103]}
  • ...

5. 副本机制

Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本。副本有以下两种类型:

  • 首领副本:每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
  • 跟随者副本:首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。

首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致,在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失败。如果跟随者在10s内没有请求任何消息,或者虽然在请求消息,但在10s内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟它没有包含全部的消息。相反,持续请求得到的最新消息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领。跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过replica.lag.time.max.ms参数来配置的。这个时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。

除了当前首领之外,每个分区都有一个首选首领——创建主题时选定的首领就是分区的首选首领。在创建分区时,需要在broker之间均衡首领,以便broker间的负载最终会得到均衡。默认情况下,Kafka的auto.leader.rebalance.enable被设为true,它会检查首选首领是不是当前首领,如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领成为当前首领。

例如下面,我们创建了一个分区为3,副本数也为3的主题test,然后可以使用kafka-topics.sh脚本的--describe选项来查看该主题的分区和副本分布情况:

  • ubuntu@s100:~$ kafka-topics.sh --create --zookeeper s101:2181 --replication-factor 3 --partitions 3 --topic test
  • Created topic "test".
  • ubuntu@s100:~$ kafka-topics.sh --describe --zookeeper s101:2181 --topic test
  • Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
  • Topic: test Partition: 0 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
  • Topic: test Partition: 1 Leader: 103 Replicas: 103,101,102 Isr: 103,101,102
  • Topic: test Partition: 2 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103

从打印信息PartitionCount:3可以得知分区数为3,从ReplicationFactor:3可以得知副本数为3。以第0个分区为例,它的打印信息中,Leader: 102表示该分区的首领分区分配在ID为102的节点上,Replicas: 102,103,101表示它的三个副本分别位于ID为102、103、101三个broker上,Isr: 102,103,101表示该分区的三个同步副本的情况,只要出现在该列表中则表示该分区是同步的。

需要注意的是,当在使用kafka-topics.sh脚本的--replica-assignment选项进行手动进行副本分配时,第一个指定的副本就是首选首领,所以要避免让包含了首领的broker负载过重。

6. 生产者

生产者负责把消息发送给它所选择的Topic。

消息可以包含键和值,值用于表示消息内容,而键可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列(使用Kafka自己的散列算法,即使升级Java版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。

生产者也可以使用自定义分区器来具体指定某条消息发给Topic的哪一个partition。

Kafka可以无缝地支持多个生产者,不管客户端在使用单个主题还是多个主题。所以它很适合用来从多个前端系统收集数据,并以统一的格式对外提供数据。例如,一个包含了多个微服务的网站,可以为页面视图创建一个单独的主题,所有服务都以相同的消息格式向该主题写入数据。消费者应用程序会获得统一的页面视图,而无需协调来自不同生产者的数据流。

7. 消费者

消费者Consumer是一个抽象的概念,调用Consumer API的程序都可以称作为一个Consumer,它从broker端订阅某个Topic的消息。如果只有一个Consumer的话,Topic(可能含有多个partition)下所有消息都会被这个Consumer接收。但是在分布式的环境中,我们可能会遇到这样一种情景,对于一个有多个partition的Topic,我们希望启动多个Consumer去消费这些partition(如果发送速度较快,一个Consumer是无法消费完的),并且要求Topic的一条消息只能发给其中一个Consumer,不希望这些Conusmer出现重复接收一条消息的情况。对于这种情况,Kafka提供了Consumer Group机制(如果只有一个Consumer时,是不需要指定Consumer Group,这时Kafka会自动给这个Consumer生成一个Group名)。

在调用Conusmer API时,一般都会指定一个Consumer Group,该Group订阅的Topic的每一条消息都发送到这个Group的某一台机器上。假如Kafka集群有两台broker,集群上有一个Topic,它有4个partition,其中0和1在broker1上,2和3在broker2上,这时有两个Consumer Group同时订阅这个Topic,其中一个Group有2个Consumer,另一个Consumer有4个Consumer,则它们的订阅消息情况如下图所示:

3.Kafka消费者组.png

因为Group A只有两个Consumer,所以一个Consumer会消费两个partition;而Group B有4个Consumer,一个Consumer会去消费一个partition。这里要注意的是,Kafka可以保证一个partition内的数据是有序的,所以Group B中的Consumer收到的数据是可以保证有序的,但是Group A中的Consumer就无法保证了。

Group读取Topic,partition分配机制是:

  • 如果Group中的Consumer数小于Topic中的partition数,那么Group中的Consumer就会消费多个partition;
  • 如果Group中的Consumer数等于Topic中的partition数,那么Group中的一个Consumer就会消费Topic中的一个partition;
  • 如果Group中的Consumer数大于Topic中的partition数,那么Group中就会有一部分的Consumer处于空闲状态。

8. 消息文件管理

保留数据是Kafka的一个基本特性,Kafka不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。开发者可以为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。

因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以Kakfa把分区分成若干个片段。默认情况下,每个片段包含1GB或一周的数据,以较小的那个为准。在broker往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。

当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除,所以如果你要保留数据1天,但片段里包含了5天的数据,那么这些数据会被保留5天,因为在片段被关闭之前这些数据无法被删除。如果你要保留数据一周,而且每天使用一个新片段,那么你就会看到,每天在使用一个新片段的同时会删除一个最老的片段——所以大部分时间该分区会有7个片段存在。broker会为分区里的每个片段打开一个文件句柄,哪怕片段是不活跃的。这样会导致打开过多的文件句柄,所以操作系统必须根据实际情况做一些调优。

9. 消息文件存储

在前面介绍过,Kafka的消息是以Topic为基本单位,不同Topic之间是相互独立的。每个Topic又可分为几个不同的partition,每个partition存储一部的分message。为了保证查找和删除消息的性能,Kafka将每个partition中的数据分成若干个片段进行存储,也即是Segment文件。其中,partition是以文件夹的形式存储在具体Broker本机上,每个partition对应的文件夹中存储了该partition的Segment文件,包括.log和.index两类。

Segment文件由和.index索引文件和.log数据文件构成,这两个文件是一一对应的。partition的第一对Segment文件从0开始,后续每对Segment文件的名称为上一个Segment文件最后一条消息的offset,ofsset的数值最大为64位(long类型),20位数字字符长度,高位不够时以数字0填充:

  • $> ls -lh
  • total 64
  • -rw-r--r-- 1 ubuntu ubuntu 10M 3 11 10:05 00000000000000000000.index
  • -rw-r--r-- 1 ubuntu ubuntu 49B 3 11 10:06 00000000000000000000.log

Segment文件中.log文件和.index文件的名称是一一对应的,同时它们的内容也是存在映射关系的;.index索引文件中存储着大量的元数据,.log数据文件中存储着大量消息数据,索引文件中的元数据指向与之对应的数据文件中的消息数据的物理偏移地址。如图所示:

4.索引与消息的映射.png

在上图的显示中,以索引文件中的(4, 578)为例,其中的4代表某条消息的逻辑offset值,而578则代表该条消息在消息文件中的偏移量,根据这个偏移量就可以在消息文件中定位到该条消息。

图中为了便于理解,索引文件和消息文件中的编号都是顺序化的,而在实际环境中,索引并没有为数据文件中的每条消息都建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小;稀疏索引为数据文件的每个对应消息设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

注:partition中的每条消息由offset来表示它在这个partition中的偏移量,这个offset并不是该消息在partition中实际存储位置,而是逻辑上的一个值,但它却唯一确定了partition中的一条消息(可以认为offset是partition中消息的id)。

10. 消息文件格式

我们把Kafka的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用了相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压和再压缩。

除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法(Snappy、GZip或LZ4)和时间戳(在0.10.0版本里引入的)。时间戳可以是生产者发送消息的时间,也可以是消息到达broker的时间,这个是可配置的。

如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送。于是,broker就会收到一个这样的消息,然后再把它发送给消费者。消费者在解压这个消息之后,会看到整个批次的消息,它们都有自己的时间戳和偏移量。如图所示:

5.消息结构.png

Kafka附带了一个叫DumpLogSegment的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。运行该工具的方法如下:

  • $> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
  • Dumping 00000000000000000000.log
  • Starting offset: 0
  • offset: 0 position: 0 CreateTime: 1552269984954 isvalid: true payloadsize: 15 magic: 1 compresscodec: NONE crc: 937294350

如果使用了--deep-iteration参数,可以显示被压缩到包装消息里的消息。

11. Kafka在Zookeeper中的节点说明

当我们Kafka启动运行以后,就会在Zookeeper上初始化Kafka相关数据,主要包括六大类:

  • consumers:存储消费者相关信息;
  • admin:存储管理信息;
  • config:存储配置信息;
  • controller:中央控制器相关信息;
  • brokers:broker相关信息;
  • controller_epoch:中央控制器选举次数。

示意图如下:

6.Kafka在Zookeeper节点的信息.png

下面将详细介绍这些节点中存储的消息格式和内容。

11.1. /brokers节点结构说明

  1. Broker信息

/brokers/ids/[0...N]:每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "host",
  • "type": "string",
  • "doc": "Broker所在物理节点的主机名或IP地址"
  • },
  • {
  • "name": "port",
  • "type": "int",
  • "doc": "Broker所在物理节点的端口号"
  • },
  • {
  • "name": "jmx_port",
  • "type": "int",
  • "doc": "Broker所在物理节点的JMX端口号,由server.properties中参数port确定"
  • },
  • {
  • "name": "timestamp",
  • "type": "string",
  • "doc": "Broker初始启动的时间戳"
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "host": "127.0.0.1",
  • "jmx_port": 5051,
  • "port": 8081,
  • "timestamp":"1403061000000"
  • }
  1. Topic信息结构

/brokers/topics/[topic]:存储Topic的分区分配信息。

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "partitions",
  • "type": {
  • "type": "map",
  • "key": {
  • "type": "string",
  • "doc": "分区ID"
  • },
  • "value": [
  • {
  • "type": "int",
  • "doc": "分区的AR副本所在的Broker的ID"
  • }
  • ],
  • "doc": "字典结构,键为分区的ID,值为该分区的副本所在Broker的ID的集合"
  • }
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "partitions": {
  • "0": [1, 2],
  • "1": [2, 1],
  • "2": [1, 2]
  • }
  • }
  1. Partition信息

/brokers/topics/[topic]/partitions/[0...N]:其中[0..N]表示partition索引号,/brokers/topics/[topic]/partitions/[partition_id]/state

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "isr",
  • "type": {
  • "type": "array",
  • "items": "int",
  • "doc": "ISR副本集合所在的Broker的ID列表"
  • }
  • },
  • {
  • "name": "leader",
  • "type": "int",
  • "doc": "表示该分区的Leader副本所在的Broker的ID"
  • },
  • {
  • "name": "controller_epoch",
  • "type": "int",
  • "doc": "表示KafkaController的年代信息"
  • },
  • {
  • "name": "leader_epoch",
  • "type": "int",
  • "doc": "该分区的Leader副本的年代信息"
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "isr": [2, 1],
  • "leader": 2,
  • "controller_epoch": 1,
  • "leader_epoch": 0
  • }

11.2. /controller_epoch节点结构说明

/controller_epoch:此值为一个int类型的数字,Kafka集群中第一个Broker第一次启动时为1,以后只要集群中Kafka Controller Leader(中央控制器)所在Broker变更或挂掉,就会重新选举新的Kafka Controller Leader,每次Kafka Controller Leader变更controller_epoch值就会加1。

11.3. /controller节点结构说明

/controller:存储Kafka Controller Leader(中央控制器)所在的Broker的信息。

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "brokerid",
  • "type": "int",
  • "doc": "Kafka集群中Broker唯一编号"
  • },
  • {
  • "name": "timestamp",
  • "type": "string",
  • "doc": "Kafka Controller Leader中央控制器变更时的时间戳"
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "brokerid": 3,
  • "timestamp": "1403061802981"
  • }

这个的意思就说明,当前的Controller所在的Broker机器是哪台,变更时间是多少等。

11.4. /admin节点结构说明

  1. “优先副本”选举信息

/admin/preferred_replica_election:存储了需要进行“优先副本”选举的分区的信息。

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "partitions",
  • "type": {
  • "type": "array",
  • "items": {
  • "fields": [
  • {
  • "name": "topic",
  • "type": "string",
  • "doc": "需要进行优先副本选举的分区所属主题的名称"
  • },
  • {
  • "name": "partition",
  • "type": "int",
  • "doc": "需要进行优先副本选举的分区的ID"
  • }
  • ]
  • },
  • "doc": "需要进行优先副本选举的分区集合"
  • }
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "partitions": [
  • {
  • "topic": "topic-1",
  • "partition": 8
  • },
  • {
  • "topic": "topic-2",
  • "partition": 16
  • }
  • ]
  • }
  1. 分区重分配信息

/admin/reassign_partitions:存于了将要进行重分配的分区的信息。

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "partitions",
  • "type": {
  • "type": "array",
  • "items": {
  • "fields": [
  • {
  • "name": "topic",
  • "type": "string",
  • "doc": "需要进行重分配的分区所属的主题名称"
  • },
  • {
  • "name": "partition",
  • "type": "int",
  • "doc": "需要进行重分配的分区的ID"
  • },
  • {
  • "name": "replicas",
  • "type": "array",
  • "items": "int",
  • "doc": "需要进行重分配的分区的副本所在的Broker的ID集合"
  • }
  • ]
  • },
  • "doc": "需要进行重分配的分区集合"
  • }
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "partitions": [
  • {
  • "topic": "topic-1",
  • "partition": 1,
  • "replicas": [
  • 1,
  • 2,
  • 3
  • ]
  • }
  • ]
  • }
  1. 待删除主题信息

/admin/delete_topics:存储了标记为待删除的主题的信息。

JSON Schema:

  • {
  • "fields": [
  • {
  • "name": "version",
  • "type": "int",
  • "doc": "版本ID"
  • },
  • {
  • "name": "topics",
  • "type": {
  • "type": "array",
  • "items": "string",
  • "doc": "待删除的主题的名称集合"
  • }
  • }
  • ]
  • }

存储示例:

  • {
  • "version": 1,
  • "topics": [
  • "topic-1",
  • "topic-2"
  • ]
  • }

11.5. Consumer及Customer Group

  1. 每个Consumer客户端被创建时,会向Zookeeper注册自己的信息;
  2. 此作用主要是为了“负载均衡”;
  3. 同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer;
  4. Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer;
  5. 一个Consumer Group的多个Consumer的所有线程依次有序地消费一个Topic的所有partitions,如果Consumer Group中所有Consumer总线程大于partitions数量,则会出现Consumer空闲的情况;

举例说明:Kafka集群中创建一个Topic为report-log,共有4个partitions且索引编号为0-3;假如有目前有三个消费者节点:

注:一个Consumer中一个消费线程可以消费一个或多个partition。

  • 如果每个Consumer创建一个Consumer Thread线程,各个节点消费情况是:节点1消费索引编号为0、1分区,节点2消费索引编号为2,节点3消费索引编号为3。
  • 如果每个Consumer创建2个Consumer Thread线程,各个节点消费情况是(从Consumer节点先后启动状态来确定的):节点1消费索引编号为0、1分区;节点2消费索引编号为2、3;节点3为空闲状态。

总结:从以上可知,Consumer Group中各个Consumer是根据先后启动的顺序有序消费一个Topic的所有partitions的。如果Consumer Group中所有Consumer的总线程数大于partitions数量,则可能Consumer Thread或Consumer会出现空闲状态。

Consumer均衡算法:当一个group中,有Consumer加入或者离开时,会触发partitions均衡,均衡的最终目的是提升Topic的并发消费能力:

  1. 假如topic1,具有如下partitions:P0,P1,P2,P3;
  2. 加入group中,有如下Consumer:C0,C1;
  3. 首先根据partition索引号对partitions排序:P0,P1,P2,P3;
  4. 根据(consumer.id + ‘-‘ + thread序号)排序:C0,C1;
  5. 计算倍数:
M = [P0,P1,P2,P3].size / [C0,C1].size

本例值M=2(向上取整);

  1. 然后依次分配partitions:
C0 = [P0,P1],C1=[P2,P3]

即:

Ci = [P(i * M),P((i + 1) * M -1)]

11.5.1. Consumer注册信息

/consumers/[group_id]/ids/[consumer_id_string]:每个Consumer都有一个唯一的ID(Consumer ID可以通过配置文件指定,也可以由系统生成),此ID用来标记消费者信息。是一个临时的Znode,此节点的值请看consumerIdString产生规则,即表示此Consumer目前所消费的topic + partitions列表。consumerId产生规则:

  • StringconsumerUuid = null;
  • if(config.consumerId!=null && config.consumerId) {
  • consumerUuid = consumerId;
  • } else {
  • String uuid = UUID.randomUUID();
  • consumerUuid = "%s-%d-%s".format(
  • InetAddress.getLocalHost.getHostName, System.currentTimeMillis());
  • uuid.getMostSignificantBits().toHexString.substring(0,8));
  • }
  • String consumerIdString = config.groupId + "_" + consumerUuid;

Schema:

  • {
  • "version": 版本编号默认为1,
  • "subscription": { // 订阅topic列表
  • },
  • "topic名称": Consumer中Topic消费者线程数,
  • "pattern": "static",
  • "timestamp": "Consumer启动时的时间戳"
  • }

存储示例:

  • {
  • "version": 1,
  • "subscription": {
  • "open_platform_opt_push_plus1": 5
  • },
  • "pattern": "static",
  • "timestamp": "1411294187842"
  • }

11.5.2. Consumer Owner:

consumers/[group_id]/owners/[topic]/[partition_id]: consumerIdString + threadId索引编号;Consumer启动时,所触发的操作:

  1. 进行Consumer ID注册;
  2. 在Consumer ID注册节点下注册一个Watch用来监听当前group中其他Consumer的退出和加入;只要此znode path下节点列表变更,都会触发此group下Consumer的负载均衡。(比如一个Consumer失效,那么其他Consumer接管partitions);
  3. 在broker id注册节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发组下所有的Consumer重新balance。

11.6. Consumer Offset信息

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset):用来跟踪每个Consumer目前所消费的partition中最大的offset。此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(Consumer Group)中一个消费者失效,重新触发balance,其他Consumer可以继续消费。