1. LogSegment
LogSegment中封装了一对FileMessageSet和一个OffsetIndex对象,构成对日志文件和索引文件的抽象描述,提供了日志文件和索引文件的读写功能以及其他辅助功能。LogSegment的定义和重要字段如下:
LogSegment提供了对消息的管理,包括索引和日志;在对消息数据的操作上,包含了对索引文件的操作和对日志文件的操作两个部分,LogSegment通过与之关联的OffsetIndex和FileMessageSet对象来实现,它只提供了相关的辅助功能。
我们需要也别关注的是log
、index
和baseOffset
三个字段,从它们的定义就可以得知,log
和index
分别表示描述日志文件和索引文件的FileMessageSet和OffsetIndex对象,而baseOffset
字段则记录了当前LogSegment中第一条消息的offset值,这个值对于消息的定位来说是比较重要的,另外在后面Log类对LogSegment进行组织管理时,baseOffset
字段也有很重要的作用,届时会详解。
1.1. 消息的追加
LogSegment的append(offset: Long, messages: ByteBufferMessageSet)
方法用于追加消息,从其定义和参数就可以得知,它所面向的是更高层的消息实体,包括索引和日志两部分;我们来看一下该方法的源码:
从该方法的实现可以看出,对索引和日志的添加其实是通过OffsetIndex和FileMessageSet的append(...)
方法实现的,LogSegment的append(...)
方法则只提供了边界控制;另外该方法还给出了创建索引的条件,前面介绍过,Kafka不会为每一条日志数据都建立索引,而是采用稀疏索引的方式为一部分日志建立索引,这里其实是通过indexIntervalBytes
字段来控制的,从上一个索引项所对应的消息开始计算,如果累积追加的消息数据大于indexIntervalBytes
的值时,就会添加一个索引项;indexIntervalBytes
的值是可以通过index.interval.bytes
配置项配置的。
1.2. 消息的读取
LogSegment提供了read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo
方法读取消息数据,该方法有四个参数:
startOffset
:指定读取的起始消息的offset。maxOffset
:指定读取结束的offset,可以为空。maxSize
:指定读取的最大字节数。maxPosition
:指定读取的最大物理地址,可选参数,默认值是日志文件的大小。
这四个参数用于控制读取的数据量,需要注意的是在实际代码中是根据这四个参数来共同控制的,也就是说,读取的范围一定会同时满足四个参数的所有限制;read(...)
方法源码如下:
其中translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition
方法用于将offset转换为对应的物理地址,在read(...)
方法的参数中,startOffset
和maxOffset
参数都需要进行转换,我们先了解该方法的源码:
translateOffset(...)
方法内部则使用了OffsetIndex和FileMessageSet的lookup(...)
和searchFor(...)
两个方法来定位实际的物理position,这两个方法在前面的内容中已经详细介绍过了;首先通过OffsetIndex的lookup(...)
方法找到小于或等于offset
的最大的索引项,该索引项中记录了真实的物理position,然后通过FileMessageSet的searchFor(...)
找到offset大于或等于offset
的最近的日志项,该日志项中也记录了真实的物理position,最终返回由offset和position构成的OffsetPosition对象。其实可以发现,在这三个过程中,lookup(...)
方法得到的position其实只是用于限制searchFor(...)
方法开始查找的偏移量,以缩小查找范围。
回到前面的read(...)
方法,有了对translateOffset(...)
方法的理解,其实read(...)
方法相对来说是比较好理解的,最终读取的消息范围中,startOffset
所对应的position是起始偏移量,maxOffset
、maxPosition
和maxSize
同时控制了终止偏移量,一般来说,表示的是副本之间的High Water值,消费者最多只能读到High Water位置的消息,在后面讲解副本机制时会回顾这部分代码。
1.3. 消息索引的重建
当日志文件对应的索引文件出现损坏或丢失时,此时会根据日志文件重建索引文件,LogSegment的recover()
方法的功能就是根据日志文件重建索引文件,同时验证日志文件中消息的合法性;该方法的源码如下:
// TODO 关于压缩消息的结构 We often refer to the nested messages as “inner messages” and the wrapping message as the “outer message.” Note that the key should be null for the outer message and its offset will be the offset of the last inner message.
// TODO 关于压缩消息offset的分配 When receiving recursive version 0 messages, the broker decompresses them and each inner message is assigned an offset individually. In version 1, to avoid server side re-compression, only the wrapper message will be assigned an offset. The inner messages will have relative offsets. The absolute offset can be computed using the offset from the outer message, which corresponds to the offset assigned to the last inner message.
重建索引的过程比较直观,通过迭代FileMessageSet对象以遍历日志文件中的消息数据,根据消息数据存储的offset来重新生成稀疏索引项。在重建的过程中还会通过Message类的ensureValid()
方法检查消息数据的有效性。在碰到压缩消息时,取的是压缩消息集合中的第一条消息的offset。
- MessageSet (Version: 0) => [offset message_size message]
- offset => INT64
- message_size => INT32
- message => crc magic_byte attributes key value
- crc => INT32
- magic_byte => INT8
- attributes => INT8
- bit 0~2:
- 0: no compression
- 1: gzip
- 2: snappy
- bit 3~7: unused
- key => BYTES
- value => BYTES
- MessageSet (Version: 1) => [offset message_size message]
- offset => INT64
- message_size => INT32
- message => crc magic_byte attributes key value
- crc => INT32
- magic_byte => INT8
- attributes => INT8
- bit 0~2:
- 0: no compression
- 1: gzip
- 2: snappy
- 3: lz4
- bit 3: timestampType
- 0: create time
- 1: log append time
- bit 4~7: unused
- timestamp =>INT64
- key => BYTES
- value => BYTES
2. Log
在上一篇文章中我们已经探讨了OffsetIndex、Message、ByteBufferMessageSet、FileMessageSet及LogSegment的结构及内部实现,对日志文件和索引文件的构成有了基本的了解;有了这些基础,我们就可以继续研究Kafka中对日志目录的管理。
前面提到过,Kafka以<topic_name>_<partition_id>
的命名规则在指定存储数据文件的路径下为每个主题的每个分区建立目录,每个分区目录下存放了分区对应的日志文件和索引文件,并使用Log类描述一个主题分区存储目录。也即是说,一个Log类将管理多个LogSegment对象。
我们先观察Log类的定义及其重要的几个字段:
在Log类所定义的字段中,我们主要关注segments
字段,该字段是一个并发安全的字典,键是LogSegment的baseOffset
,而值是LogSegment对象;该字典使用ConcurrentSkipListMap实现,ConcurrentSkipListMap是位于JUC包中的并发安全的字典,它的内部结构实现是一个跳表,提供了快速检索及操作内部元素的功能;跳表是一种典型的以空间换时间的数据结构,其内部的元素是有序的,对于ConcurrentSkipListMap来说,其内部根据键默认为升序排序(也可以指定比较器);关于ConcurrentSkipListMap的具体实现可以看本博客Java多线程系列博文——Java并发安全集合——ConcurrentSkipListMap的实现。
2.1. 加载LogSegment
从上述的Log类片段代码中,最终会调用loadSegments()
加载LogSegment对象,该方法的源码非常多,我们可以将其分为几个步骤分别分析:
- 处理.clean、.deleted、.swap文件;
- 加载全部的日志文件和索引文件(可能需要重建索引);
- 处理可能存在的由于Broker非正常关闭导致的消息异常。
其中第一个步骤涉及到日志的压缩,第三个步骤涉及日志的检查点机制,这两部分内容将在后面讲解,这里先略去这两部分的代码,直接分析独立的第二个步骤的代码:
上述的加载日志文件和索引文件的代码的流程是比较清晰的,通过遍历dir
字段表示目录下的文件,分别进行判断:
- 当遍历到的是索引文件时,判断对应的日志文件是否存在,如果不存在就将索引文件也删除掉;
- 当遍历到的是日志文件时,判断对应的索引文件是否存在,如果不存在就重建索引;如果存在,将会检查索引文件的完整性,若检查过程总出现异常就删除索引文件并进行重建。
最终根据dir
目录、每一对消息文件的baseOffset
来创建相应数量的LogSegment对象,并以baseOffset
为键,LogSegment对象为值存储到ConcurrentSkipListMap类型的字典segments
中。
需要注意的是这里有一个细节,baseOffset
是通过日志文件的文件名(即源码中的start
变量)来确定的。由于segments
跳表内部元素会根据键进行升序排序,因此通过以baseOffset
为键向segments
中添加的LogSegment对象可以保证有序,这也就保证了在通过baseOffset
在segments
查找对应的LogSegment时的高效率。
2.2. 消息的追加
Log类的append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo
方法用于向Log中追加消息数据。在前一篇介绍日志的存储构成的文章中,FileMessageSet和LogSegment都存在追加操作,我们需要清楚的是,Log的追加操作是针对消息数据的追加,它最终依旧会调用LogSegment的追加操作,而LogSegment的追加操作既包含日志数据的追加,也包含索引数据的追加,其中追加日志数据的操作交给了FileMessageSet类,追加索引数据的操作则交给了OffsetIndex类。
在了解Log类的append(...)
方法之前,我们需要先认识Active Segment的概念;在Log类中定义了一个activeSegment
字段:
由于Kafka的顺序读写特性,向Log中追加消息时是顺序写入的,每个Log所管辖的LogSegment中只有最后一个能够进行写入操作,在其之前的所有LogSegment都不能写入数据,activeSegment()
方法就是用于获取这最后一个LogSegment对象的,具体表现为获取segments
集合中最后一个元素的值。activeSegment()
的值是会改变的,因为随着数据的不断写入,当前activeSegment()
表示的日志文件大小会到达一定阈值,此时需要创建新的日志和索引文件,而activeSegment()
也会指向描述这个新的日志文件的的LogSegment对象。
有了对Active Segment的认识,我们来分析Log的append(...)
方法,它的源码如下:
append(...)
方法的过程可以细分为以下几步:
- 首先调用Log类的
analyzeAndValidateMessageSet()
方法,对ByteBufferMessageSet中的Message数据进行验证,并返回LogAppendInfo对象。在LogAppendInfo中封装了ByteBufferMessageSet中第一个消息的offset、最后一个消息的offset、生产者采用的压缩方式、追加到Log的时间戳、服务端用的压缩方式、外层消息的个数、通过验证的总字节数等信息。 - 调用Log类的
trimInvalidBytes()
方法,清除未验证通过的Message。 - 调用ByteBufferMessageSet类的
validateMessagesAndAssignOffsets()
方法,进行内部压缩消息做进一步验证、消息格式转换、调整Magic值、修改时间戳等操作,并为Message分配offset。在ByteBufferMessageSet小节介绍过了,这里就不再赘述了。 - 如果在
validateMessagesAndAssignOffsets()
方法中修改了ByteBufferMessageSet的长度,则重新验证Message的长度是否合法。 - 调用Log类的
maybeRoll()
方法获取Active Segment,此过程可能分配新的Active Segment。 - 将ByteBufferMessageSetSet中的消息追加到Active Segment中,通过调用LogSegment的
append()
方法的实现。 - 更新当前副本的LEO,也就是
Log.nextOffsetMetadata
字段。 - 执行
flush()
操作,将LEO之前的全部Message刷新到磁盘。
这几个步骤中涉及了好几个辅助方法,下面将一一讲解。
2.2.1. 消息验证
Log类的analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo
方法用于对ByteBufferMessageSet中的Message数据进行验证,主要是验证消息的长度、CRC32校验码、消息offset是否单调递增,这些验证都是对外层消息进行的,并不会解压内部的压缩消息。它的源码如下:
2.2.2. 清除验证未通过的消息
trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet
方法会根据analyzeAndValidateMessageSet(...)
方法返回的LogAppendInfo对象,将未通过验证的消息截断,它的实现比较简单,源码如下:
2.2.3. 滚动Active Segment
Log类的maybeRoll()
方法用于检测是否需要创建新Active Segment,条件有下面几个:
- 当前Active Segment的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment的最大长度。
- 当前Active Segment的寿命超过了配置的LogSegment最长存活时间。
- 索引文件满了。
maybeRoll()
方法主要用于判断这些条件,并且在满足的情况下调用roll()
方法进行Active Segment滚动操作,它的源码如下:
真正对Active Segment的滚动操作定义在Log类的roll()
方法中,源码如下:
roll()
方法会根据logEndOffset
的值来创建新的日志文件和索引文件,新创建的文件名的组织方式如下:
从上面的源码可以得知,创建的索引文件和日志文件的文件名是一个20位数字的字符串,logEndOffset
值将作为offset占据低位(也是索引文件的baseOffset
),高位不足的位置以0填充。
在取得对应的File对象后,如果文件已存在就直接将其删除;然后将当前存在的Active Segment的索引文件及日志文件进行截断,保证文件中只保存了有效字节,最后构建一个startOffset
为logEndOffset
的LogSegment对象并将其添加到segments
集合中用于描述新创建的索引文件和日志文件。
调用updateLogEndOffset(nextOffsetMetadata.messageOffset)
方法则会更新nextOffsetMetadata
字段为新的LogOffsetMetadata对象:
nextOffsetMetadata
记录了Active Segment的最后一个offset值(也即是logEndOffset
,Log对象所对应的索引文件中最后一个offset值),Active Segment的baseOffset
以及Active Segment的大小;roll()
方法中调用的updateLogEndOffset(nextOffsetMetadata.messageOffset)
只是修改了Active Segment的baseOffset
和大小,并未修改logEndOffset
的值。
2.2.4. 数据刷盘
roll()
方法的最后还会向Scheduler提交一个flush()
操作将recoveryPoint
至newOffset
之间的数据刷新到磁盘,flush(offset: Long) : Unit
方法的源码如下:
从源码可知,flush(offset: Long) : Unit
方法会将revoceryPoint
至传入的offset之间的消息进行刷盘操作,并且修改revoceryPoint
记录的值;数据的刷盘操作实际上还是会交给FileMessageSet对象和OffsetIndex对象的flush()
方法,最终由通道和缓冲区映射进行实现。
2.3. 消息的读取
Log类也提供了read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo
方法用于读取特定范围的日志数据,起始偏移量由startOffset
参数决定,读取长度由maxLength
和maxOffset
共同决定。read(...)
方法的源码如下:
read(...)
方法是没有进行并发控制的,它会在一开始保存一份nextOffsetMetadata
的副本并从中获取下一条记录的offset,用于检查传入参数的边界是否有效,然后根据startOffset
去segments
跳表中查找offset
仅小于startOffset
的LogSegment对象,然后使用LogSegment对象read(...)
方法读取数据。
这里需要注意的是,在计算maxPosition
时,从跳表中定位到的LogSegment可能是Active Segment,也有可能是普通的Segment;当为Active Segment时,需要考虑读取的同时其他线程并发写入的问题,有可能在当前线程正在读取Active Segment的数据时,其他线程的写入操作出发了Active Segment的滚动,此时Active Segment对象会被改变,这里进行了Double Check以保证maxPosition
的计算是实时的。
另外,这里增加的maxPosition
的计算也可以保证在读取数据时仅凭maxOffset
和maxLength
控制读取长度时由于多个线程之间状态nextOffsetMetadata.messageOffset
状态不同步导致的OffsetOutOfRangeException异常【ISSUE KAFKA-2477】。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...