大数据
流式处理
Kafka
源码解析

Kafka系列 11 - 服务端源码分析 02:日志的存储构成(1)

简介:主要讲解OffsetIndex、Message、ByteBufferMessageSet、FileMessageSet及日志读写操作

1. 存储结构概览

Kafka会将生产者发送给自己的消息存储到日志文件中,生产者在发送消息时需要指定主题和分区(分区也可以不指定),为了提高写入的性能,Kafka的同一个分区中的消息是顺序写入的,避免了随机写入产生的性能问题。主题的分区会根据配置创建对应的副本,当一个分区的副本(无论是Leader 副本还是Follower 副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的日志文件及对应的索引文件,而生产者发送的消息会存储在日志文件中,供消费者拉取后消费。

建立索引文件为了提高消息的检索效率,方便快速定位到指定的消息数据;在一个分区内的索引文件中存储的索引是顺序递增的稀疏索引,所谓稀疏索引的意思是,Kafka并不会为每一条消息数据都建立索引,以降低索引冗余。索引项包含offset和position两个相关联的值,offset值代表了消息顺序的逻辑值,并不是消息实际存放的物理地址,而position值则代表消息存放在日志文件中的物理地址。

Kafka以<topic_name>_<partition_id>的命名规则在指定存储数据文件的路径下为每个主题的每个分区建立目录,每个分区目录下存放了分区对应的日志文件和索引文件,例如拥有3个分区的主题Topic1的日志文件在某个Broker上存储的目录结构如下:

  • ├── Topic1_0
  • │   ├── 00000000000000000000.index
  • │   ├── 00000000000000000000.log
  • │   ├── 00000000000000933028.index
  • │   ├── 00000000000000933028.log
  • │   ├── 00000000000001892314.index
  • │   ├── 00000000000001892314.log
  • │   ├── 00000000000002810012.index
  • │   ├── 00000000000002810012.log
  • │   ...
  • ├── Topic1_1
  • │   ...
  • ├── Topic1_3
  • │   ...

Kafka日志层使用Log类描述一个主题分区存储目录,使用LogSegment描述这些目录下的一对关联的日志文件及索引文件,同时使用FileMessageSet类来描述日志文件,OffsetIndex类来描述索引文件。随着消息数据的不断写入,日志文件的大小到达一个阈值时,就创建新的日志文件和索引文件继续写入后续的消息和索引信息。索引文件和日志文件是同名的,命名规则是[baseOffset].logbaseOffset是日志文件中第一条消息的offset。它们的结构示意图如下:

1.日志存储层架构.png

2. OffsetIndex

索引机制是从Kafka 0.8版本开始引入的,每个日志文件都对应一个文件名相同,后缀名为.index的索引文件,每个索引文件由一个OffsetIndex进行描述。

在索引文件中,每个索引项占用8个字节的长度,其中前4字节表示相对offset,后4字节则表示对应的消息数据在日志文件中的绝对物理偏移量offset。相对offset表示的是消息相对于baseOffset的偏移量,索引文件的命名规则是[baseOffset].log,如果在切分后一个索引文件的baseOffset是9527,它的文件名就是00000000000000009527.index,与之对应的日志文件名为00000000000000009527.log,此时offset为9550的Message在索引文件中的相对offset就是9550 - 9527 = 23

我们先看一下OffsetIndex类的定义和重要字段:

  • /**
  • * @param _file 指向磁盘上的索引文件
  • * @param baseOffset 对应日志文件中第一条消息的offset
  • * @param maxIndexSize 当前索引文件中最多能够保存的索引项个数
  • */
  • class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  • // 在对mmap操作时需要加锁保护
  • private val lock = new ReentrantLock
  • /**
  • * initialize the memory mapping for this index
  • * 用来操作索引文件的MappedByteBuffer
  • * */
  • @volatile
  • private[this] var mmap: MappedByteBuffer = { ... }
  • /**
  • * the number of eight-byte entries currently in the index
  • * 当前索引文件中的索引项个数
  • * */
  • @volatile
  • private[this] var _entries = mmap.position / 8
  • /**
  • * The maximum number of eight-byte entries this index can hold
  • * 当前索引文件中最多能够保存的索引项个数
  • * */
  • @volatile
  • private[this] var _maxEntries = mmap.limit / 8
  • // 最后一个索引项的offset
  • @volatile
  • private[this] var _lastOffset = readLastEntry.offset
  • ...
  • }

OffsetIndex类的lock字段是一个ReentrantLock重入锁,同时_filemmap_entries_maxEntries_lastOffset几个字段都使用@volatile进行修饰,用于控制并发状态下对索引文件的写入操作和线程可见性。

mmap字段是一个MappedByteBuffer类型的对象,用于映射磁盘的.index索引文件以提供对文件的读写功能,它的初始化源码如下:

  • // kafka.log.OffsetIndex类
  • @volatile
  • private[this] var mmap: MappedByteBuffer = {
  • // 如果索引文件不存在,则创建新文件并返回true,反之返回false
  • val newlyCreated = _file.createNewFile()
  • // 根据文件创建RandomAccessFile可读写对象
  • val raf = new RandomAccessFile(_file, "rw")
  • try {
  • /**
  • * pre-allocate the file if necessary
  • * 对于新创建的索引文件,进行扩容
  • * */
  • if (newlyCreated) {
  • if (maxIndexSize < 8)
  • throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
  • // 根据maxIndexSize的值对索引文件进行扩容,扩容结果是小于maxIndexSize的最大的8的倍数
  • raf.setLength(roundToExactMultiple(maxIndexSize, 8))
  • }
  • /**
  • * memory-map the file
  • * 进行内存映射
  • **/
  • val len = raf.length()
  • val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
  • /* set the position in the index for the next entry */
  • if (newlyCreated)
  • // 将新创建的索引文件的position设置为0,从头开始写文件
  • idx.position(0)
  • else
  • // if this is a pre-existing index, assume it is all valid and set position to last entry
  • /**
  • * 对于原来就存在的索引文件,则将position移动到所有索引项的结束为止,防止数据覆盖,
  • * idx是一个Buffer,limit表示目前数据的截止位置
  • */
  • idx.position(roundToExactMultiple(idx.limit, 8))
  • // 返回MappedByteBuffer
  • idx
  • } finally {
  • // 关闭RandomAccessFile对象并吞掉可能会产生的异常
  • CoreUtils.swallow(raf.close())
  • }
  • }
  • /**
  • * Round a number to the greatest exact multiple of the given factor less than the given number.
  • * E.g. roundToExactMultiple(67, 8) == 64
  • * 取小于number的最大的8的倍数
  • */
  • private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

初始化过程不难理解,即为构造OffsetIndex对象传入的File对象_file映射得到MappedByteBuffer对象,在这个过程中会判断_file文件是否是新创建的,如果是新创建的会根据maxIndexSize设置文件的长度,不过需要注意的是,设置的长度永远是8的倍数,这个操作是通过roundToExactMultiple(number: Int, factor: Int)方法控制的。最后还会处理映射得到的MappedByteBuffer对象的position值,如果_file是新创建的文件就将position置为0,如果是已经存在的_file,则将position置为最后一个索引项的结束位置为止,以便新数据的写入。

2.1. 索引项的追加

OffsetIndex的def append(offset: Long, position: Int)方法用于写入指定的offset和position对,它的源码如下:

  • // kafka.log.OffsetIndex类
  • /**
  • * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
  • * 向索引文件中添加索引项
  • */
  • def append(offset: Long, position: Int) {
  • /**
  • * inLock是个柯里化方法,会先进行加锁,执行完后会解锁
  • */
  • inLock(lock) { // 加锁
  • // 检查是否存满了
  • require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
  • if (_entries == 0 || offset > _lastOffset) {
  • // 当前文件内没有索引项,传入的offset大于最后一个索引项
  • debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
  • mmap.putInt((offset - baseOffset).toInt)
  • mmap.putInt(position)
  • _entries += 1
  • _lastOffset = offset
  • require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
  • } else {
  • throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
  • .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
  • }
  • }
  • }
  • /**
  • * True iff there are no more slots available in this index
  • * 索引文件的索引项数量是否达到最大值
  • */
  • def isFull: Boolean = _entries >= _maxEntries
  • // kafka.utils.CoreUtils类
  • /**
  • * Execute the given function inside the lock
  • */
  • def inLock[T](lock: Lock)(fun: => T): T = {
  • // 加锁
  • lock.lock()
  • try {
  • // 执行过程
  • fun
  • } finally {
  • // 解锁
  • lock.unlock()
  • }
  • }

apend(...)方法的实现比较简单,由于可能会有多个线程并发写入,所以在追加操作中进行了加锁;真正的追加过程会首先检查索引文件是否写满了,只有在还有空闲空间的情况下才进行写入。追加操作简单地将传入的offset和position以int值的方式添加到mmap缓冲区中,然后更新_entries_lastOffset字段的值。这里简单地介绍一下使用到的inLock[T](lock: Lock)(fun: => T): T方法,该方法是一个柯里化方法,通过传入的锁对象对传入的具体操作进行了环绕加锁和解锁过程,这种写法值得借鉴。

2.2. 索引文件的扩容

扩容操作的实现也比较简单,源码如下:

  • // kafka.log.OffsetIndex类
  • /**
  • * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
  • * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
  • * loading segments from disk or truncating back to an old segment where a new log segment became active;
  • * we want to reset the index size to maximum index size to avoid rolling new segment.
  • */
  • def resize(newSize: Int) {
  • inLock(lock) {
  • val raf = new RandomAccessFile(_file, "rw")
  • // 合法化新大小
  • val roundedNewSize = roundToExactMultiple(newSize, 8)
  • // 当前mmap的position
  • val position = mmap.position
  • /* Windows won't let us modify the file length while the file is mmapped :-( */
  • if (Os.isWindows)
  • // windows下不能直接修改文件大小,需要手动清空底层的缓冲区
  • forceUnmap(mmap)
  • try {
  • // 修改文件大小为新的大小
  • raf.setLength(roundedNewSize)
  • // 重新映射MappedByteBuffer
  • mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
  • // 更新记录值
  • _maxEntries = mmap.limit / 8
  • // 更新position
  • mmap.position(position)
  • } finally {
  • CoreUtils.swallow(raf.close())
  • }
  • }
  • }
  • /**
  • * Forcefully free the buffer's mmap. We do this only on windows.
  • */
  • private def forceUnmap(m: MappedByteBuffer) {
  • try {
  • if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
  • (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
  • } catch {
  • case t: Throwable => warn("Error when freeing index buffer", t)
  • }
  • }

需要注意的是,设置的新的容量大小也必须是8的倍数,扩容操作中会对新容量进行检查。

2.3. 索引项的查找

  • // kafka.log.OffsetIndex类
  • /**
  • * Find the largest offset less than or equal to the given targetOffset
  • * and return a pair holding this offset and its corresponding physical file position.
  • *
  • * @param targetOffset The offset to look up.
  • *
  • * @return The offset found and the corresponding file position for this offset.
  • * If the target offset is smaller than the least entry in the index (or the index is empty),
  • * the pair (baseOffset, 0) is returned.
  • */
  • def lookup(targetOffset: Long): OffsetPosition = {
  • // Windows系统需要加锁
  • maybeLock(lock) {
  • // 创建一个mmap的副本
  • val idx = mmap.duplicate
  • // 查找targetOffset在idx中操作
  • val slot = indexSlotFor(idx, targetOffset)
  • if(slot == -1)
  • // 找不到
  • OffsetPosition(baseOffset, 0)
  • else
  • // 将offset和物理地址封装为offsetPosition对象并返回
  • OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  • }
  • }
  • /**
  • * Execute the given function in a lock only if we are running on windows. We do this
  • * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
  • * and this requires synchronizing reads.
  • */
  • private def maybeLock[T](lock: Lock)(fun: => T): T = {
  • if(Os.isWindows)
  • lock.lock()
  • try {
  • fun
  • } finally {
  • if(Os.isWindows)
  • lock.unlock()
  • }
  • }

lookup(targetOffset: Long)会根据给定的offset查找对应的索引,并且返回由该索引项的绝对offset和position构成的OffsetPosition对象;这里有三个方法比较重要,其中两个方法relativeOffset(buffer: ByteBuffer, n: Int): Intphysical(buffer: ByteBuffer, n: Int): Int分别用于定位position和offset,前者会在索引文件中读取到第n个索引项的position值,后者会在索引文件中读取到第n个索引项的绝对offset值,这里所谓的绝对,意思是读取到的不是单纯的原始offset值,而是会加上baseOffset,它们的源码如下:

  • // kafka.log.OffsetIndex类
  • /**
  • * return the nth physical position
  • * 返回第n个offset所指向的消息数据在log文件中的物理偏移量
  • **/
  • private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
  • /**
  • * return the nth offset relative to the base offset
  • * 返回第n个offset相对于baseOffset的偏移量
  • **/
  • private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

另外一个是indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int方法,它的源码如下:

  • // kafka.log.OffsetIndex类
  • /**
  • * Find the slot in which the largest offset less than or equal to the given
  • * target offset is stored.
  • * 找到小于或等于给定targetOffset的最大offset的槽位(即索引序号)
  • *
  • * @param idx The index buffer 索引项Buffer
  • * @param targetOffset The offset to look for 需要查找的offset
  • *
  • * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
  • */
  • private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
  • // we only store the difference from the base offset so calculate that
  • // 根据baseOffset计算相对offset
  • val relOffset = targetOffset - baseOffset
  • // check if the index is empty
  • // 如果文件中没有索引项,直接返回-1
  • if (_entries == 0)
  • return -1
  • // check if the target offset is smaller than the least offset
  • /**
  • * 找到第0个offset的相对baseOffset的偏移量
  • * 如果第0个索引的相对偏移量都大于给定的targetOffset的相对偏移量
  • * 说明没有符合条件的offset,直接返回-1
  • */
  • if (relativeOffset(idx, 0) > relOffset)
  • return -1
  • // binary search for the entry
  • // 进行二分搜索查找符合的offset
  • var lo = 0
  • var hi = _entries - 1
  • while (lo < hi) {
  • // 找中点
  • val mid = ceil(hi/2.0 + lo/2.0).toInt
  • // 定位中点的相对offset
  • val found = relativeOffset(idx, mid)
  • if (found == relOffset)
  • // 如果中点就是需要找的,则直接返回
  • return mid
  • else if (found < relOffset)
  • // 中点小于要找的,在[mid, hi]之间找
  • lo = mid
  • else
  • // 中点大于要找的,在[lo, mid-1]之间找
  • hi = mid - 1
  • }
  • lo
  • }

indexSlotFor(...)方法可以在索引文件数据中查找到小于或等于给定targetOffset的最大offset的槽位(即索引位),使用了二分查找法。

2.4. 索引文件的裁剪

命名以“truncate”开始的方法用于对索引文件的裁剪,实现都比较简单,源码如下:

  • // kafka.log.OffsetIndex类
  • /**
  • * Truncate the entire index, deleting all entries
  • * 裁剪索引项,删除所有索引
  • */
  • def truncate() = truncateToEntries(0)
  • /**
  • * Remove all entries from the index which have an offset greater than or equal to the given offset.
  • * Truncating to an offset larger than the largest in the index has no effect.
  • * 裁剪索引项,删除offset之后(包括给定的offset)的所有索引项
  • */
  • def truncateTo(offset: Long) {
  • // 加锁
  • inLock(lock) {
  • /**
  • * 复制一份mmap副本,副本和原Buffer的数据是共享的
  • * 但操作副本的指针量时原Buffer的指针量不变
  • */
  • val idx = mmap.duplicate
  • // 在buffer中找到小于或等于offset的槽位(索引序号)
  • val slot = indexSlotFor(idx, offset)
  • /* There are 3 cases for choosing the new size
  • * 1) if there is no entry in the index <= the offset, delete everything
  • * 2) if there is an entry for this exact offset, delete it and everything larger than it
  • * 3) if there is no entry for this offset, delete everything larger than the next smallest
  • *
  • * 三种截断情况:
  • * 1. 如果没有找到小于offset的索引,删除所有索引;
  • * 2. 如果正好找到了offset,则取[0, offset)之间的,其他的删掉;
  • * 3. 如果offset不存在,则取[0, offset + 1]之间的,将(offset + 1, end]之间的全删除
  • */
  • val newEntries =
  • // 如果找到的为-1,表示没找到
  • if(slot < 0)
  • 0
  • else if(relativeOffset(idx, slot) == offset - baseOffset) // 如果slot的相对偏移量就是要找的,表示找到了
  • slot
  • else
  • // 否则取后面一个
  • slot + 1
  • // 删掉[newEntries, end]之间的索引
  • truncateToEntries(newEntries)
  • }
  • }
  • /**
  • * Truncates index to a known number of entries.
  • * 裁剪操作,裁剪给定的entries序号之后的索引项
  • */
  • private def truncateToEntries(entries: Int) {
  • inLock(lock) {
  • // 记录_entries
  • _entries = entries
  • // position到_entries * 8
  • mmap.position(_entries * 8)
  • // 更新_lastOffset
  • _lastOffset = readLastEntry.offset
  • }
  • }

3. Message

在介绍描述日志文件的FileMessageSet类之前,我们先了解描述消息的Message类和消息集合类ByteBufferMessageSet的原理。

首先关注Message类,该类表示具体的消息,内部使用ByteBuffer保存消息数据,消息数据具有特定的组织结构,示意图如下:

2.消息数据结构.png

对应于Message对象中的源码定义如下:

  • // kafka.message.Message类
  • object Message {
  • /**
  • * The current offset and size for all the fixed-length fields
  • */
  • // CRC偏移量
  • val CrcOffset = 0
  • // CRC长度,4字节
  • val CrcLength = 4
  • // 魔数偏移量
  • val MagicOffset = CrcOffset + CrcLength
  • // 魔数长度,1字节
  • val MagicLength = 1
  • // attributes偏移量
  • val AttributesOffset = MagicOffset + MagicLength
  • // attributes长度,1字节
  • val AttributesLength = 1
  • // Only message format version 1 has the timestamp field.
  • // 时间戳偏移量
  • val TimestampOffset = AttributesOffset + AttributesLength
  • // 时间戳长度,8字节
  • val TimestampLength = 8
  • // V0版本key size的偏移量(V0版本没有时间戳数据)
  • val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  • // V1版本key size的偏移量(V1版本有时间戳数据)
  • val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  • // key size的长度,4字节
  • val KeySizeLength = 4
  • // V0版本Key偏移量
  • val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  • // V1版本Key偏移量
  • val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  • // value size的长度,4字节
  • val ValueSizeLength = 4
  • // 消息的头信息长度
  • private val MessageHeaderSizeMap = Map (
  • // V0版本,没有时间戳
  • (0: Byte) -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
  • // V1版本,有时间戳
  • (1: Byte) -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))
  • /**
  • * The amount of overhead bytes in a message
  • * This value is only used to check if the message size is valid or not. So the minimum possible message bytes is
  • * used here, which comes from a message in message format V0 with empty key and value.
  • * 消息的overhead的大小,不包含键和值数据的长度
  • */
  • val MinMessageOverhead = KeyOffset_V0 + ValueSizeLength
  • /**
  • * The "magic" value
  • * When magic value is 0, the message uses absolute offset and does not have a timestamp field.
  • * When magic value is 1, the message uses relative offset and has a timestamp field.
  • * 魔数,V0版本为0,V1版本为1
  • */
  • val MagicValue_V0: Byte = 0
  • val MagicValue_V1: Byte = 1
  • // 当前魔数为1
  • val CurrentMagicValue: Byte = 1
  • /**
  • * Specifies the mask for the compression code. 3 bits to hold the compression codec.
  • * 0 is reserved to indicate no compression
  • * 取压缩器值的掩码
  • */
  • val CompressionCodeMask: Int = 0x07 // 00000000 00000000 00000000 00000111
  • /**
  • * Specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
  • * 0 for CreateTime, 1 for LogAppendTime
  • * 取时间戳类型的掩码
  • */
  • val TimestampTypeMask: Byte = 0x08 // 00000000 00000000 00000000 00001000
  • val TimestampTypeAttributeBitOffset: Int = 3 // 时间戳类型在attributes数据中的偏移量
  • /**
  • * Compression code for uncompressed messages
  • * 表示无压缩器
  • */
  • val NoCompression: Int = 0
  • /**
  • * To indicate timestamp is not defined so "magic" value 0 will be used.
  • * 表示无时间戳数据
  • */
  • val NoTimestamp: Long = -1
  • /**
  • * Give the header size difference between different message versions.
  • * 比较两个魔数版本的消息的头信息的长度差
  • */
  • def headerSizeDiff(fromMagicValue: Byte, toMagicValue: Byte) : Int =
  • MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
  • }

从源码可以得知,不同魔数版本的消息,在对时间戳的处理上是不同的,当魔数为0时,消息格式中没有时间戳部分;当魔数为1时,消息格式中存在时间戳部分。魔数值不同,消息的长度也有差异。

另外从Message类的注释中我们可以得到一些有关压缩器和时间戳相关的有用信息,源码如下:

  • // kafka.message.Message类
  • /**
  • * A message. The format of an N byte message is the following:
  • *
  • * 1. 4 byte CRC32 of the message
  • * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
  • * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
  • * bit 0 ~ 2 : Compression codec.
  • * 0 : no compression
  • * 1 : gzip
  • * 2 : snappy
  • * 3 : lz4
  • * bit 3 : Timestamp type
  • * 0 : create time
  • * 1 : log append time
  • * bit 4 ~ 7 : reserved
  • * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
  • * 5. 4 byte key length, containing length K
  • * 6. K byte key
  • * 7. 4 byte payload length, containing length V
  • * 8. V byte payload
  • *
  • * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
  • * @param buffer the byte buffer of this message.
  • * @param wrapperMessageTimestamp the wrapper message timestamp, which is only defined when the message is an inner
  • * message of a compressed message.
  • * @param wrapperMessageTimestampType the wrapper message timestamp type, which is only defined when the message is an
  • * inner message of a compressed message.
  • */
  • class Message(val buffer: ByteBuffer,
  • private val wrapperMessageTimestamp: Option[Long] = None,
  • private val wrapperMessageTimestampType: Option[TimestampType] = None) {

从注释可知,attributes部分的数据中:

  • 第0 ~ 2位表示压缩器类型,0表示无压缩器,1表示gzip压缩器,2表示snappy压缩器,3表示lz4压缩器;
  • 第3位表示时间戳类型,0表示创建时间,1表示日志追加时间;
  • 滴4 ~ 7位是保留位。

上面的规则与图中描述的一样。同时对于时间戳来说,只有当魔数大于0时才会有8位长度的时间戳数据。

在Message类的构造方法中,我们可以得到Message初始化过程中对以上各个字段的填充过程,这部分代码非常直观:

  • // kafka.message.Message类
  • /**
  • * A constructor to create a Message
  • * @param bytes The payload of the message 值数据载荷
  • * @param key The key of the message (null, if none) 键
  • * @param timestamp The timestamp of the message. 时间戳
  • * @param timestampType The timestamp type of the message. 时间戳类型
  • * @param codec The compression codec used on the contents of the message (if any) 压缩器
  • * @param payloadOffset The offset into the payload array used to extract payload 读取bytes时的起始offset
  • * @param payloadSize The size of the payload to use 读取bytes的字节数,payloadOffset和payloadSize用于控制量化读取bytes数组
  • * @param magicValue the magic value to use 魔数
  • */
  • def this(bytes: Array[Byte],
  • key: Array[Byte],
  • timestamp: Long,
  • timestampType: TimestampType,
  • codec: CompressionCodec,
  • payloadOffset: Int,
  • payloadSize: Int,
  • magicValue: Byte) = {
  • this(ByteBuffer.allocate(Message.CrcLength +
  • Message.MagicLength +
  • Message.AttributesLength +
  • // 魔数为V0(值为0)时没有时间戳,为V1(值为1)时有时间戳
  • (if (magicValue == Message.MagicValue_V0) 0
  • else Message.TimestampLength) +
  • Message.KeySizeLength +
  • // 键长度,键为空时,键长度为0
  • (if(key == null) 0 else key.length) +
  • Message.ValueSizeLength +
  • // 值长度,bytes为空时,值长度为0
  • // bytes不为空时,如果payloadSize大于等于0则取payloadSize
  • // bytes不为空时,如果payloadSize小于0则计算bytes.length - payloadOffset
  • (if(bytes == null) 0
  • else if(payloadSize >= 0) payloadSize
  • else bytes.length - payloadOffset)))
  • // 验证时间戳和魔数,这里的验证会根据魔数版本来判断时间戳是否合法
  • validateTimestampAndMagicValue(timestamp, magicValue)
  • // skip crc, we will fill that in at the end
  • // 先跳过CRC,CRC部分会在最后填充
  • buffer.position(MagicOffset)
  • // 填入魔数
  • buffer.put(magicValue)
  • val attributes: Byte =
  • if (codec.codec > 0)
  • // 有压缩器,需要写入压缩器类型及时间戳类型
  • timestampType.updateAttributes((CompressionCodeMask & codec.codec).toByte)
  • // 无压缩器,attribute为0
  • else 0
  • // 填入attributes
  • buffer.put(attributes)
  • // Only put timestamp when "magic" value is greater than 0
  • if (magic > MagicValue_V0)
  • // 魔数大于V0时,填入时间戳
  • buffer.putLong(timestamp)
  • if(key == null) { // 键为空
  • // 键长度,键为空时填入-1
  • buffer.putInt(-1)
  • } else { // 键不为空
  • // 键长度,键不为空时填入键的长度
  • buffer.putInt(key.length)
  • // 填入键
  • buffer.put(key, 0, key.length)
  • }
  • // 计算值长度
  • val size = if(bytes == null) -1
  • else if(payloadSize >= 0) payloadSize
  • else bytes.length - payloadOffset
  • // 填入值长度
  • buffer.putInt(size)
  • if(bytes != null)
  • // 值不为空,填入值
  • buffer.put(bytes, payloadOffset, size)
  • // rewind,position移到0,准备填入CRC
  • buffer.rewind()
  • // now compute the checksum and fill it in
  • // 计算并填入CRC
  • Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
  • }
  • /**
  • * Validate the timestamp and "magic" value
  • */
  • private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
  • // 魔数必须是V0或V1版本中的一个
  • if (magic != MagicValue_V0 && magic != MagicValue_V1)
  • throw new IllegalArgumentException(s"Invalid magic value $magic")
  • // 时间戳若小于0,则必须是-1
  • if (timestamp < 0 && timestamp != NoTimestamp)
  • throw new IllegalArgumentException(s"Invalid message timestamp $timestamp")
  • // V0版本的魔数,时间戳必须是-1
  • if (magic == MagicValue_V0 && timestamp != NoTimestamp)
  • throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
  • }
  • /**
  • * Compute the checksum of the message from the message contents
  • * 根据ByteBuffer数据计算checksum值
  • */
  • def computeChecksum: Long =
  • CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)
  • // org.apache.kafka.common.utils.Utils类
  • /**
  • * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
  • *
  • * @param buffer The buffer to write to
  • * @param index The position in the buffer at which to begin writing
  • * @param value The value to write
  • */
  • public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
  • buffer.putInt(index, (int) (value & 0xffffffffL));
  • }

填充过程首先会跳过CRC校验码,先填充后面的数据,最终根据后面的数据计算CRC校验并往回填充。

Message类还提供了大量的方法用于从Buffer中读取数据,这些方法都比较简单,就不多赘述了:

  • /**
  • * Compute the checksum of the message from the message contents
  • * 根据ByteBuffer数据计算checksum值
  • */
  • def computeChecksum: Long =
  • CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)
  • /**
  • * Retrieve the previously computed CRC for this message
  • * 读取消息中的CRC值作为checksum
  • */
  • def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
  • /**
  • * Returns true if the crc stored with the message matches the crc computed off the message contents
  • * 判断消息是否有效,即将消息中的checksum与计算出的checksum进行比较
  • */
  • def isValid: Boolean = checksum == computeChecksum
  • /**
  • * Throw an InvalidMessageException if isValid is false for this message
  • * 保证消息是有效的,如果无效则抛出InvalidMessageException异常
  • */
  • def ensureValid() {
  • if(!isValid)
  • throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
  • }
  • /**
  • * The complete serialized size of this message in bytes (including crc, header attributes, etc)
  • * 消息大小
  • */
  • def size: Int = buffer.limit
  • /**
  • * The position where the key size is stored.
  • * 表示键大小的数据的偏移量
  • */
  • private def keySizeOffset = {
  • // 当魔数为MagicValue_V0为KeySizeOffset_V0,否则为KeySizeOffset_V1
  • if (magic == MagicValue_V0) KeySizeOffset_V0
  • else KeySizeOffset_V1
  • }
  • /**
  • * The length of the key in bytes
  • * 键大小
  • */
  • def keySize: Int = buffer.getInt(keySizeOffset)
  • /**
  • * Does the message have a key?
  • * 消息是否有键
  • */
  • def hasKey: Boolean = keySize >= 0
  • /**
  • * The position where the payload size is stored
  • * 表示值大小的数据的偏移量
  • */
  • private def payloadSizeOffset = {
  • // payLoadSize数据紧跟着键的数据,魔数不同时,键的偏移量不同
  • if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
  • else KeyOffset_V1 + max(0, keySize)
  • }
  • /**
  • * The length of the message value in bytes
  • * 获取值数据的大小
  • */
  • def payloadSize: Int = buffer.getInt(payloadSizeOffset)
  • /**
  • * Is the payload of this message null
  • * 判断值数据是否为空
  • */
  • def isNull: Boolean = payloadSize < 0
  • /**
  • * The magic version of this message
  • * 获取魔数,偏移量是MagicOffset
  • */
  • def magic: Byte = buffer.get(MagicOffset)
  • /**
  • * The attributes stored with this message
  • * 获取消息属性,一个字节长,偏移量是AttributesOffset
  • */
  • def attributes: Byte = buffer.get(AttributesOffset)
  • /**
  • * The timestamp of the message, only available when the "magic" value is greater than 0
  • * When magic > 0, The timestamp of a message is determined in the following way:
  • * 1. wrapperMessageTimestampType = None and wrapperMessageTimestamp is None - Uncompressed message, timestamp and timestamp type are in the message.
  • * 2. wrapperMessageTimestampType = LogAppendTime and wrapperMessageTimestamp is defined - Compressed message using LogAppendTime
  • * 3. wrapperMessageTimestampType = CreateTime and wrapperMessageTimestamp is defined - Compressed message using CreateTime
  • * 获取时间戳,其含义由attribute的第3位确定,0表示创建时间,1表示追加时间;
  • * magic值不同,消息的长度是不同的:
  • * - 当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;
  • * - 当magic为1时,消息的offset使用相对offset且消息格式中存在timestamp部分。
  • */
  • def timestamp: Long = {
  • if (magic == MagicValue_V0)
  • Message.NoTimestamp
  • // Case 2
  • else if (wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) && wrapperMessageTimestamp.isDefined)
  • wrapperMessageTimestamp.get
  • else // case 1, 3
  • buffer.getLong(Message.TimestampOffset)
  • }
  • /**
  • * The timestamp type of the message
  • * 时间戳类型
  • */
  • def timestampType = {
  • if (magic == MagicValue_V0)
  • // 无时间戳
  • TimestampType.NO_TIMESTAMP_TYPE
  • else
  • // 根据消息属性attributes来获取时间戳类型
  • wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
  • }
  • /**
  • * The compression codec used with this message
  • * 获取压缩器类型
  • */
  • def compressionCodec: CompressionCodec =
  • CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
  • /**
  • * A ByteBuffer containing the content of the message
  • * 获取值数据
  • */
  • def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
  • /**
  • * A ByteBuffer containing the message key
  • * 获取键数据
  • */
  • def key: ByteBuffer = sliceDelimited(keySizeOffset)
  • /**
  • * convert the message to specified format
  • * 根据指定的魔数将消息转换为特定格式
  • */
  • def toFormatVersion(toMagicValue: Byte): Message = {
  • if (magic == toMagicValue)
  • // 指定魔数与当前魔数相同,直接返回即可
  • this
  • else {
  • // 否则进行转换
  • // 先创建需要的ByteBuffer
  • val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
  • // Copy bytes from old messages to new message
  • // 使用convertToBuffer()方法进行数据拷贝
  • convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp)
  • // 创建新的Message对象并返回
  • new Message(byteBuffer)
  • }
  • }

Message的结构大致就是这样了,在后面的讲解中对Message的操作其实是透明的,我们大部分时间是无需关注的。

4. ByteBufferMessageSet

// TODO ByteBufferMessageSet的作用需要重审

在上面一节我们已经了解了每条消息是以Message对象的形式组织的,与Kafka生产者具有消息压缩机制一样,当向日志文件中追加消息数据时,Kafka也提供了以压缩状态存储批量消息的功能,ByteBufferMessageSet类就可以根据Message集合以及压缩器类型对追加的消息数据进行压缩,它的主要功能有三个:

  1. 将Message集合按照指定的压缩类型进行压缩,此功能主要用于构建ByteBufferMessageSet对象,通过ByteBufferMessageSet.create()方法完成。
  2. 提供迭代器,实现深层迭代和浅层迭代两种迭代方式。
  3. 提供了消息验证和offset分配的功能。

4.1. 消息集合的压缩

我们先查看第一步提到的ByteBufferMessageSet.create()方法,它的源码如下:

  • // kafka.message.ByteBufferMessageSet类
  • private def create(offsetAssigner: OffsetAssigner,
  • compressionCodec: CompressionCodec,
  • wrapperMessageTimestamp: Option[Long],
  • timestampType: TimestampType,
  • messages: Message*): ByteBuffer = {
  • // 如果传入的消息为空,则返回空的ByteBuffer
  • if (messages.isEmpty)
  • MessageSet.Empty.buffer
  • else if (compressionCodec == NoCompressionCodec) {// 无压缩器
  • // 计算并创建大小可容纳所有传入消息的ByteBuffer
  • val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
  • // 遍历所有传入的消息,并将数据写入到buffer
  • for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
  • buffer.rewind()
  • // 返回buffer
  • buffer
  • } else {
  • // 需要压缩
  • // 得到魔数和时间戳
  • val magicAndTimestamp = wrapperMessageTimestamp match {
  • case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
  • case None => MessageSet.magicAndLargestTimestamp(messages)
  • }
  • var offset = -1L
  • // 底层使用byte数组保存写入的压缩数据
  • val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
  • // write是个柯里化方法
  • messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
  • // 创建指定压缩类型的输入流
  • val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
  • try {
  • // 遍历写入内层压缩消息
  • for (message <- messages) {
  • offset = offsetAssigner.nextAbsoluteOffset()
  • if (message.magic != magicAndTimestamp.magic)
  • throw new IllegalArgumentException("Messages in the message set must have same magic value")
  • // Use inner offset if magic value is greater than 0
  • // 魔数为1,写入的是相对offset;魔数为0,写的是offset
  • if (magicAndTimestamp.magic > Message.MagicValue_V0)
  • output.writeLong(offsetAssigner.toInnerOffset(offset))
  • else
  • output.writeLong(offset)
  • // 写入size
  • output.writeInt(message.size)
  • // 写入Message数据
  • output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
  • }
  • } finally {
  • output.close()
  • }
  • }
  • val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
  • // 按照消息格式写入整个外层消息,外层消息的offset是最后一条内层消息的offset
  • writeMessage(buffer, messageWriter, offset)
  • buffer.rewind()
  • buffer
  • }
  • }
  • // kafka.message.OffsetAssigner类
  • // 用于生成offset值
  • private class OffsetAssigner(offsets: Seq[Long]) {
  • private var index = 0
  • def nextAbsoluteOffset(): Long = {
  • val result = offsets(index)
  • index += 1
  • result
  • }
  • def toInnerOffset(offset: Long): Long = offset - offsets.head
  • }
  • // kafka.message.MessageWriter类
  • class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) {
  • import Message._
  • def write(key: Array[Byte] = null,
  • codec: CompressionCodec,
  • timestamp: Long,
  • timestampType: TimestampType,
  • magicValue: Byte)(writePayload: OutputStream => Unit): Unit = {
  • // 写入CRC信息
  • withCrc32Prefix {
  • // write magic value
  • // 写入魔数
  • write(magicValue)
  • // write attributes
  • // 写入属性
  • var attributes: Byte = 0
  • if (codec.codec > 0)
  • attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
  • // 根据魔数更新属性中的时间戳
  • if (magicValue > MagicValue_V0)
  • attributes = timestampType.updateAttributes(attributes)
  • write(attributes)
  • // Write timestamp
  • // 写入时间戳,当魔数大于MagicValue_V0(0)时才写入
  • if (magicValue > MagicValue_V0)
  • writeLong(timestamp)
  • // write the key
  • // 如果键为null,写入-1的int值
  • if (key == null) {
  • writeInt(-1)
  • } else {
  • // 否则先写入键的长度
  • writeInt(key.length)
  • // 然后写入键值
  • write(key, 0, key.length)
  • }
  • // write the payload with length prefix
  • // 写入值,在该过程中还会写入值长度数据
  • withLengthPrefix {
  • writePayload(this)
  • }
  • }
  • }
  • private def withCrc32Prefix(writeData: => Unit): Unit = {
  • // get a writer for CRC value
  • val crcWriter = reserve(CrcLength)
  • // save current position
  • var seg = currentSegment
  • val offset = currentSegment.written
  • // write data
  • writeData
  • // compute CRC32
  • val crc = new Crc32()
  • if (offset < seg.written) crc.update(seg.bytes, offset, seg.written - offset)
  • seg = seg.next
  • while (seg != null) {
  • if (seg.written > 0) crc.update(seg.bytes, 0, seg.written)
  • seg = seg.next
  • }
  • // write CRC32
  • writeInt(crcWriter, crc.getValue().toInt)
  • }
  • private def withLengthPrefix(writeData: => Unit): Unit = {
  • // get a writer for length value
  • // 获取用于写入值长度的写入器,在这个过程中会跳过ValueSizeLength个字节数
  • val lengthWriter = reserve(ValueSizeLength)
  • // save current size
  • val oldSize = size
  • // write data
  • // 写入值数据
  • writeData
  • // 使用lengthWriter写入值长度
  • // write length value
  • writeInt(lengthWriter, size - oldSize)
  • }
  • ...
  • }

create()方法中会根据压缩器情况对消息进行压缩并分配offset。如果传入的Message集合为空,则返回空ByteBuffer;如果不需要进行压缩,则使用OffsetAssigner分配每个消息的offset,返回写入了消息数据的ByteBuffer;如果需要进行压缩,则先将Message集合按照指定的压缩方式进行压缩并保存到缓冲区,完成offset的分配,然后按照压缩消息的格式写入外层消息,最后将整个外层消息所在的ByteBuffer返回。

4.2. 压缩消息的迭代

由于ByteBufferMessageSet中的消息可能是压缩消息,因此在迭代消息时需要考虑到深层的压缩数据。ByteBufferMessageSet对压缩消息的迭代实现与MemoryRecords非常相似,它提供了internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset]方法用于创建迭代器,其中的isShallow标识了迭代器的类型——深层迭代器(false)或浅层迭代器(true),同时提供了两个方法用于快速创建两个迭代器:

  • // kafka.message.ByteBufferMessageSet类
  • /**
  • * default iterator that iterates over decompressed messages
  • * 默认的迭代器是深层迭代器
  • * */
  • override def iterator: Iterator[MessageAndOffset] = internalIterator()
  • /**
  • * iterator over compressed messages without decompressing
  • * 浅层迭代器
  • * */
  • def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)

internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset]方法的源码如下:

  • // kafka.message.ByteBufferMessageSet类
  • /**
  • * When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages.
  • * isShallow为true时做浅层迭代,仅仅迭代第一层消息
  • */
  • private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  • new IteratorTemplate[MessageAndOffset] {
  • // 顶层迭代器的缓冲区副本
  • var topIter = buffer.slice()
  • // 深层迭代器
  • var innerIter: Iterator[MessageAndOffset] = null
  • // 深层迭代是否完成,当深层迭代器为null或其中没有下一个元素时说明完成了
  • def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)
  • // 浅层迭代
  • def makeNextOuter: MessageAndOffset = {
  • // if there isn't at least an offset and size, we are done
  • if (topIter.remaining < 12) // 判断剩余字节数,最少是包含了offset(Long类型,8字节)和size(Int类型,4字节)字段的大小
  • return allDone()
  • // 获取偏移量
  • val offset = topIter.getLong()
  • // 获取大小
  • val size = topIter.getInt()
  • // 判断大小是否合法
  • if(size < Message.MinMessageOverhead)
  • throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
  • // we have an incomplete message
  • // 判断剩余字节数是否小于size,如果小于说明剩余字节不能凑成一条消息
  • if(topIter.remaining < size)
  • return allDone()
  • // read the current message and check correctness
  • // 获取缓冲区副本
  • val message = topIter.slice()
  • // 设置limit,此时message的position ~ limit之间就是当前这条消息的数据
  • message.limit(size)
  • // 原缓冲区移到下一个消息的起始偏移量
  • topIter.position(topIter.position + size)
  • // 读取当前消息的数据
  • val newMessage = new Message(message)
  • if(isShallow) {
  • // 浅层迭代,直接返回消息数据
  • new MessageAndOffset(newMessage, offset)
  • } else {
  • // 深层迭代,需要根据压缩器类型进行匹配操作
  • newMessage.compressionCodec match {
  • case NoCompressionCodec =>
  • // 无压缩器,直接返回消息
  • innerIter = null
  • new MessageAndOffset(newMessage, offset)
  • case _ =>
  • // 有压缩器,创建深层迭代器
  • innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset))
  • // 使用深层迭代器进行迭代,迭代过程交给了makeNext()方法
  • if(!innerIter.hasNext)
  • innerIter = null
  • makeNext()
  • }
  • }
  • }
  • override def makeNext(): MessageAndOffset = {
  • if(isShallow){
  • // 如果是浅层迭代,返回浅层迭代的下一个元素
  • makeNextOuter
  • } else {
  • // 如果是深层迭代则需要根据情况判断
  • if(innerDone())
  • // 深层迭代已完成,返回浅层迭代的下一个元素
  • makeNextOuter
  • else
  • // 深层迭代未完成,返回深层迭代的下一个元素
  • innerIter.next()
  • }
  • }
  • }
  • }

从源码可以得知,internalIterator(...)方法通过判断浅层消息是否存储了压缩器类型来决定是否创建深层迭代器,ByteBufferMessageSet的deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset]方法会根据传入的浅层消息数据创建深层迭代器,它的源码如下:

  • // kafka.message.ByteBufferMessageSet类
  • /**
  • * Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed.
  • * 返回一个深层迭代器,用于迭代压缩消息,如果有需要会调整时间戳和offset
  • * @param wrapperMessageAndOffset 外层消息的MessageAndOffset对象
  • * @return
  • */
  • def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = {
  • import Message._
  • new IteratorTemplate[MessageAndOffset] {
  • // 解析外层消息的MessageAndOffset对象
  • val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset
  • // 检查外层消息的载荷,因为内层消息存在外层消息的载荷中
  • if (wrapperMessage.payload == null)
  • throw new KafkaException(s"Message payload is null: $wrapperMessage")
  • // 根据魔数处理时间戳和时间戳类型
  • val wrapperMessageTimestampOpt: Option[Long] =
  • if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
  • val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
  • if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
  • // 用于标记最后一个内层消息的offset
  • var lastInnerOffset = -1L
  • val messageAndOffsets = {
  • // 根据内层消息构造ByteBufferBackedInputStream流
  • val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
  • // 根据压缩器,ByteBufferBackedInputStream流构造DataInputStream流,装饰器模式
  • val compressed = try {
  • new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
  • } catch {
  • case ioe: IOException =>
  • throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
  • }
  • // 用于存放内层消息
  • val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
  • try {
  • // 不断循环,读取compressed流中的数据,添加到innerMessageAndOffsets
  • while (true)
  • innerMessageAndOffsets.add(readMessageFromStream(compressed))
  • } catch {
  • // 当读取到末尾时会抛出EOFException异常,结束读取即可
  • case eofe: EOFException =>
  • // we don't do anything at all here, because the finally
  • // will close the compressed input stream, and we simply
  • // want to return the innerMessageAndOffsets
  • case ioe: IOException =>
  • throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe)
  • } finally {
  • CoreUtils.swallow(compressed.close())
  • }
  • // 返回读取到的内层消息集合
  • innerMessageAndOffsets
  • }
  • // 从压缩流中读取数据并转换为MessageAndOffset对象
  • private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = {
  • // 读取offset和消息大小
  • val innerOffset = compressed.readLong()
  • val recordSize = compressed.readInt()
  • // 检查边界,消息大小不可小于最小的消息头信息大小
  • if (recordSize < MinMessageOverhead)
  • throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator")
  • // read the record into an intermediate record buffer (i.e. extra copy needed)
  • // 大小为recordSize字节数组,用于存放消息数据
  • val bufferArray = new Array[Byte](recordSize)
  • // 使用压缩流读取消息数据到bufferArray中
  • compressed.readFully(bufferArray, 0, recordSize)
  • // 使用bufferArray创建一个ByteBuffer
  • val buffer = ByteBuffer.wrap(bufferArray)
  • // Override the timestamp if necessary
  • // 根据buffer创建消息,如果有需要会更新时间戳
  • val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt)
  • // Inner message and wrapper message must have same magic value
  • // 判断得到的消息的魔数是否与外层消息一致,如果不一致抛出IllegalStateException异常
  • if (newMessage.magic != wrapperMessage.magic)
  • throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
  • s"but inner message has magic value ${newMessage.magic}")
  • // 更新lastInnerOffset
  • lastInnerOffset = innerOffset
  • // 返回以消息对象和offset构成的MessageAndOffset对象
  • new MessageAndOffset(newMessage, innerOffset)
  • }
  • // 获取迭代器中下一个元素
  • override def makeNext(): MessageAndOffset = {
  • /**
  • * messageAndOffsets的类型是new ArrayDeque[MessageAndOffset]()
  • * poll出第一个
  • */
  • messageAndOffsets.pollFirst() match {
  • // 为空,说明迭代完了
  • case null => allDone()
  • // 迭代到的元素是MessageAndOffset对象
  • case nextMessage@ MessageAndOffset(message, offset) =>
  • // 根据魔数版本处理offset
  • if (wrapperMessage.magic > MagicValue_V0) {
  • // 当魔数版本为MagicValue_V1时,压缩消息是倒序存储的
  • val relativeOffset = offset - lastInnerOffset
  • val absoluteOffset = wrapperMessageOffset + relativeOffset
  • new MessageAndOffset(message, absoluteOffset)
  • } else {
  • // 当魔数版本为MagicValue_V0时直接返回,不做处理
  • nextMessage
  • }
  • }
  • }
  • }
  • }

deepIterator(...)方法的实现中,readMessageFromStream(compressed: DataInputStream): MessageAndOffset方法会从流中读取消息数据并构建为MessageAndOffset对象,然后将其存放到队列中进行保存,在每次迭代时从队列中取即可。需要注意的是,这里处理流的方式依旧使用了装饰者模式,即根据消息数据创建ByteBufferBackedInputStream流,然后通过CompressionFactory工厂根据ByteBufferBackedInputStream流创建压缩流(GZIPInputStream、SnappyInputStream或KafkaLZ4BlockInputStream)、最后使用DataInputStream装饰压缩输入流。

4.3. 消息验证和offset分配

ByteBufferMessageSet类的validateMessagesAndAssignOffsets()方法实现了验证消息并分配offset的功能,它的源码如下:

  • // kafka.message.ByteBufferMessageSet#validateMessagesAndAssignOffsets
  • /**
  • * Update the offsets for this message set and do further validation on messages including:
  • * 更新集合中Message的offset,同时验证消息
  • * 1. Messages for compacted topics must have keys 压缩的消息必须有键
  • * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
  • * starting from 0. 如果魔数为1,内层压缩消息的offset必须是从offset单调递增的
  • * 3. When magic value = 1, validate and maybe overwrite timestamps of messages. 当魔数为1,验证时间戳,同时在必要条件下重写时间戳
  • *
  • * This method will convert the messages in the following scenarios:
  • * 在下面的两类情况下会进行消息转换(即消息魔数与messageFormatVersion不一致时)
  • * A. Magic value of a message = 0 and messageFormatVersion is 1 魔数为0,messageFormatVersion为1
  • * B. Magic value of a message = 1 and messageFormatVersion is 0 模式为1,messageFormatVersion为0
  • *
  • * If no format conversion or value overwriting is required for messages, this method will perform in-place
  • * operations and avoid re-compression.
  • * 如果不需要进行消息格式转换,且不需要重写消息的部分数据,将会复用当前的ByteBufferMessageSet以避免重新压缩
  • *
  • * Returns the message set and a boolean indicating whether the message sizes may have changed.
  • * 用于验证消息并分配offset
  • */
  • private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
  • now: Long,
  • sourceCodec: CompressionCodec,
  • targetCodec: CompressionCodec,
  • compactedTopic: Boolean = false,
  • messageFormatVersion: Byte = Message.CurrentMagicValue,
  • messageTimestampType: TimestampType,
  • messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
  • if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
  • // check the magic value
  • // 检查所有Message的魔数与指定的魔数是否一致
  • if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
  • // Message format conversion
  • /**
  • * 因为存在Message的魔数不一致问题,需要进行统一,这可能会导致消息总长度变化,需要创建新的ByteBufferMessageSet。
  • * 同时还会进行offset的分配,验证并更新CRC32、时间戳等信息
  • */
  • (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
  • messageFormatVersion), true)
  • } else {
  • // Do in-place validation, offset assignment and maybe set timestamp
  • /**
  • * 处理非压缩消息且魔数值统一的情况,由于魔数确定,长度不会发生改变。
  • * 主要是进行offset分配,验证并更新CRC32、时间戳等信息。
  • */
  • (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
  • messageTimestampDiffMaxMs), false)
  • }
  • } else {
  • /**
  • * 处理压缩消息的情况。
  • * inPlaceAssignment标识是否可以直接复用当前ByteBufferMessage对象。
  • * 下面4种情况不能复用:
  • * 1. 消息当前压缩类型与此broker指定的压缩类型不一致,需要重新压缩
  • * 2. 魔数为0时,需要重写消息的offset为绝对的offset
  • * 3. 魔数大于0,但内部压缩消息某些字段需要修改,例如时间戳
  • * 4. 消息格式需要转换
  • */
  • // Deal with compressed messages
  • // We cannot do in place assignment in one of the following situations:
  • // 1. Source and target compression codec are different
  • // 2. When magic value to use is 0 because offsets need to be overwritten
  • // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
  • // 4. Message format conversion is needed.
  • // No in place assignment situation 1 and 2
  • var inPlaceAssignment = sourceCodec == targetCodec && // 检查情况1
  • messageFormatVersion > Message.MagicValue_V0 // 检查情况2
  • var maxTimestamp = Message.NoTimestamp
  • val expectedInnerOffset = new LongRef(0)
  • val validatedMessages = new mutable.ArrayBuffer[Message]
  • // 遍历内层压缩消息,此步骤会解压
  • this.internalIterator(isShallow = false).foreach { messageAndOffset =>
  • val message = messageAndOffset.message
  • // 检查消息的键
  • validateMessageKey(message, compactedTopic)
  • if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
  • // No in place assignment situation 3
  • // Validate the timestamp
  • // 检查时间戳
  • validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
  • // Check if we need to overwrite offset
  • // 检查情况3,检查内部offset是否正常
  • if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
  • inPlaceAssignment = false
  • maxTimestamp = math.max(maxTimestamp, message.timestamp)
  • }
  • if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
  • throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
  • s"compression attribute set: $message")
  • // No in place assignment situation 4
  • // 检查情况4
  • if (message.magic != messageFormatVersion)
  • inPlaceAssignment = false
  • // 保存通过上述检测和转换的Message的集合
  • validatedMessages += message.toFormatVersion(messageFormatVersion)
  • }
  • if (!inPlaceAssignment) {
  • // 不能复用当前的ByteBufferMessage对象的场景
  • // Cannot do in place assignment.
  • val wrapperMessageTimestamp = {
  • if (messageFormatVersion == Message.MagicValue_V0)
  • Some(Message.NoTimestamp)
  • else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
  • Some(maxTimestamp)
  • else // Log append time
  • Some(now)
  • }
  • // 创建新ByteBufferMessageSet对象,重新压缩。此时调用上面介绍的create()方法进行压缩
  • (new ByteBufferMessageSet(compressionCodec = targetCodec,
  • offsetCounter = offsetCounter,
  • wrapperMessageTimestamp = wrapperMessageTimestamp,
  • timestampType = messageTimestampType,
  • messages = validatedMessages: _*), true)
  • } else {
  • // 复用当前ByteBufferMessageSet对象,这样可以减少一次压缩操作
  • // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
  • // 更新外层消息的offset,将其offset更新为内部最后一条压缩消息的offset
  • buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
  • // validate the messages
  • validatedMessages.foreach(_.ensureValid())
  • var crcUpdateNeeded = true
  • val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
  • val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
  • val timestamp = buffer.getLong(timestampOffset)
  • val attributes = buffer.get(attributeOffset)
  • if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
  • // We don't need to recompute crc if the timestamp is not updated.
  • crcUpdateNeeded = false
  • else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
  • // Set timestamp type and timestamp
  • // 更新外层消息的时间戳
  • buffer.putLong(timestampOffset, now)
  • // 更新外层消息的attribute
  • buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
  • }
  • if (crcUpdateNeeded) {
  • // need to recompute the crc value
  • buffer.position(MessageSet.LogOverhead)
  • val wrapperMessage = new Message(buffer.slice())
  • // 更新外层消息的CRC32
  • Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
  • }
  • buffer.rewind()
  • (this, false)
  • }
  • }
  • }
  • // We create this method to avoid a memory copy. It reads from the original message set and directly
  • // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each
  • // individual message during message format conversion.
  • /**
  • * Message的魔数不一致问题,需要进行统一,这可能会导致消息总长度变化,需要创建新的ByteBufferMessageSet。
  • * 同时还会进行offset的分配,验证并更新CRC32、时间戳等信息。
  • * @param offsetCounter offset自增器
  • * @param compactedTopic 是否是压缩主题
  • * @param now 当前时间,用于在时间戳类型为CREATE_TIME时验证消息时间戳
  • * @param timestampType 时间戳类型
  • * @param messageTimestampDiffMaxMs
  • * @param toMagicValue 转换后的魔数
  • * @return 转换后的ByteBufferMessageSet对象
  • *
  • * ByteBufferMessageSet中单条消息格式
  • * |------- | ---- | ----- | ------|------------|-----------|----------|-----|------------|------ |
  • * | offset | size | CRC32 | magic | attributes | timestamp | key size | key | value size | value |
  • * |------- | ---- | ----- | ------|------------|-----------|----------|-----|------------|------ |
  • */
  • private def convertNonCompressedMessages(offsetCounter: LongRef,
  • compactedTopic: Boolean,
  • now: Long,
  • timestampType: TimestampType,
  • messageTimestampDiffMaxMs: Long,
  • toMagicValue: Byte): ByteBufferMessageSet = {
  • // 计算转换后需要的字节大小
  • val sizeInBytesAfterConversion = shallowValidBytes + // 所有消息的载荷大小
  • this.internalIterator(isShallow = true).map { messageAndOffset =>
  • Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
  • }.sum // 所有消息头信息的大小
  • // 根据转换后的字节大小创建ByteBuffer
  • val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
  • var newMessagePosition = 0
  • // 因为无压缩,直接浅层迭代消息
  • this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) =>
  • // 验证消息的键,如果是压缩主题,消息必须要有键
  • validateMessageKey(message, compactedTopic)
  • // 验证时间戳
  • validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
  • newBuffer.position(newMessagePosition)
  • // 写入偏移量
  • newBuffer.putLong(offsetCounter.getAndIncrement())
  • // 新的消息大小 = 消息数据大小 + 消息头信息大小
  • val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue)
  • // 写入新的消息大小
  • newBuffer.putInt(newMessageSize)
  • // 从当前的position位置开始创建缓冲区副本
  • val newMessageBuffer = newBuffer.slice()
  • // 重设limit为新消息大小,此时position ~ limit之间的空间是用于装载消息数据的
  • newMessageBuffer.limit(newMessageSize)
  • // 转换消息并写入到newMessageBuffer缓冲区
  • message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
  • // 更新position
  • newMessagePosition += MessageSet.LogOverhead + newMessageSize
  • }
  • // position置0
  • newBuffer.rewind()
  • // 返回以newBuffer创建ByteBufferMessageSet对象
  • new ByteBufferMessageSet(newBuffer)
  • }
  • /**
  • * 验证非压缩消息并分配offset
  • * ByteBufferMessageSet中单条消息格式
  • * |------- | ---- | ----- | ------|------------|-----------|----------|-----|------------|------ |
  • * | offset | size | CRC32 | magic | attributes | timestamp | key size | key | value size | value |
  • * |------- | ---- | ----- | ------|------------|-----------|----------|-----|------------|------ |
  • * 这个方法只会更新offset,验证并更新CRC32、时间戳等信息
  • * @param offsetCounter offset自增器
  • * @param now 当前时间戳
  • * @param compactedTopic 是否是压缩主题
  • * @param timestampType 时间戳类型
  • * @param timestampDiffMaxMs
  • * @return
  • */
  • private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
  • now: Long,
  • compactedTopic: Boolean,
  • timestampType: TimestampType,
  • timestampDiffMaxMs: Long): ByteBufferMessageSet = {
  • // do in-place validation and offset assignment
  • var messagePosition = 0
  • buffer.mark()
  • while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
  • buffer.position(messagePosition)
  • // 写入offset
  • buffer.putLong(offsetCounter.getAndIncrement())
  • // 获取消息大小
  • val messageSize = buffer.getInt()
  • // 得到标识消息数据的Buffer切片
  • val messageBuffer = buffer.slice()
  • // 设置limit为当前遍历到的Message数据尾部
  • messageBuffer.limit(messageSize)
  • // 创建Message对象
  • val message = new Message(messageBuffer)
  • // 验证消息的键,如果是压缩主题,消息必须要有键
  • validateMessageKey(message, compactedTopic)
  • if (message.magic > Message.MagicValue_V0) {
  • // 当魔数大于V0时,还需要验证消息的时间戳
  • validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
  • if (timestampType == TimestampType.LOG_APPEND_TIME) { // 时间戳类型为追加类型
  • // 写入当前时间戳
  • message.buffer.putLong(Message.TimestampOffset, now)
  • // 更新attributes中的时间戳类型标识位
  • message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes))
  • // 写入CRC校验码
  • Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
  • }
  • }
  • // 更新messagePosition
  • messagePosition += MessageSet.LogOverhead + messageSize
  • }
  • buffer.reset()
  • this
  • }

因为涉及到不同版本魔数的消息之间的转换以及需要根据压缩情况分别处理,因此该方法内部判断非常多。需要验证的部分大致有以下几个:

  • 检查魔数;
  • 检查时间戳与时间戳类型;
  • 对于压缩消息需要检查它是否有key;
  • 可以重新设定时间戳类型和时间戳;
  • 进行offset分配;
  • 如果消息压缩类型与Broker指定压缩类型不一致,需要进行重新压缩。

5. FileMessageSet

Kafka中使用FileMessageSet管理上文介绍的日志文件,它对应磁盘上的一个真正的日志文件。它的定义和重要的几个字段如下:

  • /**
  • * An on-disk message set. An optional start and end position can be applied to the message set
  • * which will allow slicing a subset of the file.
  • * @param file The file name for the underlying log data 指向磁盘上对应的日志文件
  • * @param channel the underlying file channel used 用于读写对应的日志文件
  • * @param start A lower bound on the absolute position in the file from which the message set begins 在表示日志文件分片时,分片的起始位置偏移量
  • * @param end The upper bound on the absolute position in the file at which the message set ends 在表示日志文件分片时,分片的结束位置偏移量
  • * @param isSlice Should the start and end parameters be used for slicing? 是否是日志分片文件
  • */
  • @nonthreadsafe
  • class FileMessageSet private[kafka](@volatile var file: File,
  • private[log] val channel: FileChannel,
  • private[log] val start: Int,
  • private[log] val end: Int,
  • isSlice: Boolean) extends MessageSet with Logging {
  • /**
  • * the size of the message set in bytes
  • * 消息文件的大小,单位为字节
  • * 因为可能有多个Handler线程并发向同一个分区写入消息,_size是AtomicInteger类型。
  • **/
  • private val _size =
  • if(isSlice)
  • // 如果是分片,即为end - start
  • new AtomicInteger(end - start) // don't check the file size if this is just a slice view
  • else
  • // 否则表示整个日志文件的大小,需要进行计算
  • new AtomicInteger(math.min(channel.size.toInt, end) - start)
  • /* if this is not a slice, update the file pointer to the end of the file */
  • if (!isSlice)
  • /**
  • * set the file position to the last byte in the file
  • * 如果不是分片,则将FileChannel的position置为文件末尾,即从文件末尾开始写入数据
  • **/
  • channel.position(math.min(channel.size.toInt, end))
  • ...
  • }

FileMessageSet在构造时会获取时传入的File对象的FileChannel对象,然后通过该FileChannel实现对底层文件的操作,以其中的一个构造方法为例:

  • // kafka.log.FileMessageSet类
  • /**
  • * Create a file message set with no slicing, and with initFileSize and preallocate.
  • * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
  • * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
  • * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue.
  • * @param file 文件
  • * @param fileAlreadyExists 文件是否存在
  • * @param initFileSize 初始化文件大小
  • * @param preallocate 是否采用预分配
  • */
  • def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
  • // 调用重载构造方法
  • this(file,
  • channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
  • start = 0,
  • // 当是新创建的文件,且采用了预分配时,end会被置为0,因此当该文件开始写入数据时,其实是从开头开始写入的
  • end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
  • isSlice = false)
  • /**
  • * Open a channel for the given file
  • * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
  • * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
  • * @param file File path
  • * @param mutable mutable
  • * @param fileAlreadyExists File already exists or not
  • * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
  • * @param preallocate Pre allocate file or not, gotten from configuration.
  • */
  • def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
  • // 根据mutable参数决定文件是否可写
  • if (mutable) {
  • if (fileAlreadyExists)
  • // 如果文件已经存在,构建RandomAccessFile对象,获取其channel并返回
  • new RandomAccessFile(file, "rw").getChannel()
  • else {
  • // 否则根据preallocate来决定如何创建
  • if (preallocate) {
  • // 进行文件预分配
  • val randomAccessFile = new RandomAccessFile(file, "rw")
  • randomAccessFile.setLength(initFileSize)
  • randomAccessFile.getChannel()
  • }
  • else
  • // 创建可读写的FileChannel并返回
  • new RandomAccessFile(file, "rw").getChannel()
  • }
  • }
  • else
  • // 创建只读的FileChannel并返回
  • new FileInputStream(file).getChannel()
  • }

FileMessageSet提供了分片功能,可以,由isSlice字段决定是否使用分片:

  • // kafka.log.FileMessageSet类
  • /**
  • * the size of the message set in bytes
  • * 消息文件的大小,单位为字节
  • * 因为可能有多个Handler线程并发向同一个分区写入消息,_size是AtomicInteger类型。
  • **/
  • private val _size =
  • if(isSlice)
  • // 如果是分片,即为end - start,不对文件大小进行检查
  • new AtomicInteger(end - start) // don't check the file size if this is just a slice view
  • else
  • // 否则需要对文件大小进行检查
  • new AtomicInteger(math.min(channel.size.toInt, end) - start)
  • /* if this is not a slice, update the file pointer to the end of the file */
  • if (!isSlice)
  • /**
  • * set the file position to the last byte in the file
  • * 如果不是分片,则将FileChannel的position置为文件末尾,即从文件末尾开始写入数据
  • * 会检查文件大小,取文件大小和end参数的最小值
  • **/
  • channel.position(math.min(channel.size.toInt, end))

针对是否使用分片,在初始化_size字段及设置FileChannel的position值的操作也会不同。

5.1. 日志的追加

FileMessageSet在使用append(messages: ByteBufferMessageSet)追加日志,注意该方法接收的参数是ByteBufferMessageSet类型,源码如下:

  • // kafka.log.FileMessageSet类
  • /**
  • * Append these messages to the message set
  • */
  • def append(messages: ByteBufferMessageSet) {
  • // 写文件
  • val written = messages.writeFullyTo(channel)
  • // 增加消息文件的大小记录值
  • _size.getAndAdd(written)
  • }

其内部其实调用了ByteBufferMessageSet的writeFullyTo(channel: GatheringByteChannel): Int方法将数据写入到指定的Channel中:

  • // kafka.message.ByteBufferMessageSet类
  • /** Write the messages in this set to the given channel */
  • def writeFullyTo(channel: GatheringByteChannel): Int = {
  • // mark一下
  • buffer.mark()
  • var written = 0
  • // 将ByteBufferMessageSet中的数据全部写入文件,并记录写入的字节数
  • while (written < sizeInBytes)
  • written += channel.write(buffer)
  • // reset
  • buffer.reset()
  • // 返回写入的字节数
  • written
  • }
  • /**
  • * The total number of bytes in this message set, including any partial trailing messages
  • */
  • def sizeInBytes: Int = buffer.limit

writeFullyTo(...)方法中的buffer是ByteBufferMessageSet内部用于存放消息数据的ByteBuffer缓冲区,由此可见写入操作是比较简单的,最终是由FileChannel管道完成的。

5.2. 日志的查找

查找指定消息的功能由searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition方法实现,使用该方法时需要给定一个targetOffset表示想要查找的消息的offset,而startingPosition用于指定从那个offset开始查找。FileMessageSet会从startingPosition开始逐条遍历映射的消息,并将每个消息的offsettargetOffset进行比较,直到offset大于等于targetOffset则表示找到了,返回查找到的offset即可。需要注意的是,在遍历过程中只会读取消息的LogOverhead(即offset和size),并通过size定位到下一条消息的开始位置,并不会读取消息的载荷;该方法的源码如下:

  • // kafka.log.FileMessageSet#searchFor
  • /**
  • * Search forward for the file position of the last offset that is greater than or equal to the target offset
  • * and return its physical position. If no such offsets are found, return null.
  • * @param targetOffset The offset to search for. 需要查找的偏移量
  • * @param startingPosition The starting position in the file to begin searching from. 开始查找的偏移量
  • */
  • def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
  • var position = startingPosition
  • // 创建大小为LogOverHead的ByteBuffer,用于读取LogOverHead
  • val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
  • // 当前消息文件的大小
  • val size = sizeInBytes()
  • // 从position位置开始逐条读取消息
  • while(position + MessageSet.LogOverhead < size) {
  • // rewind指针,开始向ByteBuffer写入数据
  • buffer.rewind()
  • channel.read(buffer, position)
  • // 没有剩余数据,抛出异常
  • if(buffer.hasRemaining)
  • throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
  • .format(targetOffset, startingPosition, file.getAbsolutePath))
  • // rewind指针,开始从ByteBuffer读取数据
  • buffer.rewind()
  • // 读取偏移量
  • val offset = buffer.getLong()
  • // 如果偏移量大于等于targetOffset,表示读到了
  • if(offset >= targetOffset)
  • // 返回OffsetPosition偏移量对象
  • return OffsetPosition(offset, position)
  • // 获取读取到的消息数据的大小
  • val messageSize = buffer.getInt()
  • // 如果消息数据大小小于消息最小大小,说明读取出错,抛出异常
  • if(messageSize < Message.MinMessageOverhead)
  • throw new IllegalStateException("Invalid message size: " + messageSize)
  • // 否则后移position,准备下次读取
  • position += MessageSet.LogOverhead + messageSize
  • }
  • // 找不到targetOffset偏移量对应的消息,返回null
  • null
  • }
  • // kafka.log.FileMessageSet#sizeInBytes
  • /**
  • * The number of bytes taken up by this file set
  • */
  • def sizeInBytes(): Int = _size.get()

5.3. 将日志写出到指定Channel

FileMessageSet的writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int方法可以将指定的日志写出到指定的Channel中,第一个参数即为目标Channel,第二个参数为写出日志的偏移量,第三个参数为写出日志的大小;该方法的源码如下:

  • // kafka.log.FileMessageSet#writeTo
  • /**
  • * Write some of this set to the given channel.
  • * 将消息数据写入到指定的其他的FileChannel中
  • * @param destChannel The channel to write to.
  • * @param writePosition The position in the message set to begin writing from. 写入数据的起始偏移量
  • * @param size The maximum number of bytes to write 写入数据的大小
  • * @return The number of bytes actually written. 写入数据的字节数
  • */
  • def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
  • // Ensure that the underlying size has not changed.
  • // 计算可写数据的长度
  • val newSize = math.min(channel.size.toInt, end) - start
  • // 当可写数据长度小于消息文件长度,说明可能在写入过程中文件被截断了,抛出异常
  • if (newSize < _size.get()) {
  • throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
  • .format(file.getAbsolutePath, _size.get(), newSize))
  • }
  • // 获取写入起始偏移量
  • val position = start + writePosition
  • // 计算可写数据大小
  • val count = math.min(size, sizeInBytes)
  • // 进行写入,使用Channel的transfer方法(零拷贝),并记录写入的数据量字节数
  • val bytesTransferred = (destChannel match {
  • case tl: TransportLayer => tl.transferFrom(channel, position, count)
  • case dc => channel.transferTo(position, count, dc)
  • }).toInt
  • trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
  • + " bytes requested for transfer : " + math.min(size, sizeInBytes))
  • // 返回写入数据量字节数
  • bytesTransferred
  • }

写出实现是比较简单的,需要注意的是,这里的写出操作其实使用了Channel的”零拷贝“实现,直接将FileMessageSet的channel通道中的数据Transfer到destChannel通道中。

5.4. 日志的读取

FileMessageSet还提供了read(position: Int, size: Int): FileMessageSetdef readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer用于读取日志文件数据;前一个方法会得到一个FileMessageSet分片,后一个方法则会得到一个装有指定消息数据的ByteBuffer。

5.4.1. 读取为FileMessageSet分片

read(position: Int, size: Int): FileMessageSet方法的源码非常简单,在检查完传入的positionsize参数后直接根据它们构造了一个FileMessageSet对象并返回:

  • // kafka.log.FileMessageSet#read
  • /**
  • * Return a message set which is a view into this set starting from the given position and with the given size limit.
  • *
  • * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
  • *
  • * If this message set is already sliced, the position will be taken relative to that slicing.
  • *
  • * @param position The start position to begin the read from 读取数据的起始偏移量
  • * @param size The number of bytes after the start position to include 读取数据的大小
  • *
  • * @return A sliced wrapper on this message set limited based on the given position and size
  • */
  • def read(position: Int, size: Int): FileMessageSet = {
  • // 检查参数
  • if(position < 0)
  • throw new IllegalArgumentException("Invalid position: " + position)
  • if(size < 0)
  • throw new IllegalArgumentException("Invalid size: " + size)
  • // 返回一个日志文件分片
  • new FileMessageSet(file,
  • channel,
  • start = this.start + position,
  • end = math.min(this.start + position + size, sizeInBytes()))
  • }

5.4.2. 读取到指定的Buffer

与读取为FileMessageSet分片不同,def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer方法会从FileMessageSet的buffer当前start所指的位置开始,读取relativePosition字节到传入的ByteBuffer中,源码如下:

  • // kafka.log.FileMessageSet#readInto
  • /**
  • * Read from the underlying file into the buffer starting at the given position
  • */
  • def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
  • channel.read(buffer, relativePosition + this.start)
  • buffer.flip()
  • buffer
  • }

5.5. 日志文件的裁剪

与描述索引文件的OffsetIndex类类似,FileMessageSet也提供了裁剪日志文件的方法,实际裁剪功能使用FileChannel的truncate(long size)方法完成,将保留前targetSize个字节的数据,其他数据将会裁掉,源码如下:

  • // kafka.log.FileMessageSet#truncateTo
  • /**
  • * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
  • * given size falls on a valid message boundary.
  • * In some versions of the JDK truncating to the same size as the file message set will cause an
  • * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
  • * size of the underlying FileChannel.
  • * It is expected that no other threads will do writes to the log when this function is called.
  • * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes. 裁剪的目标大小
  • * @return The number of bytes truncated off 裁剪掉的字节数
  • */
  • def truncateTo(targetSize: Int): Int = {
  • // 获取日志文件原始大小
  • val originalSize = sizeInBytes
  • // 检查targetSize的有效性
  • if(targetSize > originalSize || targetSize < 0)
  • throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
  • " size of this log segment is " + originalSize + " bytes.")
  • if (targetSize < channel.size.toInt) {
  • // 裁剪文件
  • channel.truncate(targetSize)
  • // 移动position
  • channel.position(targetSize)
  • // 修改_size
  • _size.set(targetSize)
  • }
  • // 返回裁剪掉的字节数
  • originalSize - targetSize
  • }

5.6. 日志文件的删除

FileMessageSet提供的删除方法则更简单了,它会直接将底层文件删除:

  • // kafka.log.FileMessageSet#delete
  • /**
  • * Delete this message set from the filesystem
  • * 删除文件
  • * @return True iff this message set was deleted.
  • */
  • def delete(): Boolean = {
  • CoreUtils.swallow(channel.close())
  • // 删除文件
  • file.delete()
  • }