大数据
流式处理
Kafka
源码解析

Kafka系列 12 - 服务端源码分析 03:日志的存储构成(2)

简介:主要讲解LogSegment、Log的实现

1. LogSegment

LogSegment中封装了一对FileMessageSet和一个OffsetIndex对象,构成对日志文件和索引文件的抽象描述,提供了日志文件和索引文件的读写功能以及其他辅助功能。LogSegment的定义和重要字段如下:

  • /**
  • * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
  • * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
  • * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
  • * any previous segment.
  • *
  • * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
  • *
  • * @param log The message set containing log entries 用于操作对应日志文件的FileMessageSet对象
  • * @param index The offset index 用于操作对应索引文件的OffsetIndex对象
  • * @param baseOffset A lower bound on the offsets in this segment LogSegment中第一条消息的offset值
  • * @param indexIntervalBytes The approximate number of bytes between entries in the index 索引项之间间隔的最小字节数
  • * @param time The time instance
  • */
  • @nonthreadsafe
  • class LogSegment(val log: FileMessageSet,
  • val index: OffsetIndex,
  • val baseOffset: Long,
  • val indexIntervalBytes: Int,
  • val rollJitterMs: Long,
  • time: Time) extends Logging {
  • /**
  • * 标识LogSegment对象创建时间,
  • * 当调用truncateTo()方法将整个日志文件清空时,会将此字段重置为当前时间。
  • * 参与创建新LogSegment的条件判断,在介绍Log类时会详细介绍
  • */
  • var created = time.milliseconds
  • /* the number of bytes since we last added an entry in the offset index */
  • // 记录自从上次添加索引项之后,在日志文件中累计加入的Message集合的字节数,用于判断下次索引项添加的时机
  • private var bytesSinceLastIndexEntry = 0
  • ...
  • }

LogSegment提供了对消息的管理,包括索引和日志;在对消息数据的操作上,包含了对索引文件的操作和对日志文件的操作两个部分,LogSegment通过与之关联的OffsetIndex和FileMessageSet对象来实现,它只提供了相关的辅助功能。

我们需要也别关注的是logindexbaseOffset三个字段,从它们的定义就可以得知,logindex分别表示描述日志文件和索引文件的FileMessageSet和OffsetIndex对象,而baseOffset字段则记录了当前LogSegment中第一条消息的offset值,这个值对于消息的定位来说是比较重要的,另外在后面Log类对LogSegment进行组织管理时,baseOffset字段也有很重要的作用,届时会详解。

1.1. 消息的追加

LogSegment的append(offset: Long, messages: ByteBufferMessageSet)方法用于追加消息,从其定义和参数就可以得知,它所面向的是更高层的消息实体,包括索引和日志两部分;我们来看一下该方法的源码:

  • // kafka.log.LogSegment#append
  • /**
  • * Append the given messages starting with the given offset. Add
  • * an entry to the index if needed.
  • *
  • * It is assumed this method is being called from within a lock.
  • *
  • * 追加消息
  • *
  • * @param offset The first offset in the message set. 表示messages中的第一条消息的offset,如果是压缩消息,则是第一条内层消息的offset
  • * @param messages The messages to append. 待追加的消息集合
  • */
  • @nonthreadsafe
  • def append(offset: Long, messages: ByteBufferMessageSet) {
  • if (messages.sizeInBytes > 0) {
  • trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
  • // append an entry to the index (if needed)
  • // 检查是否满足添加索引项的条件(用于产生稀疏索引,并不是每一条消息都会产生offset记录的,通过这个条件可以达到只有一部分消息记录了offset)
  • if(bytesSinceLastIndexEntry > indexIntervalBytes) {
  • // 添加索引
  • index.append(offset, log.sizeInBytes())
  • // 添加成功后,将bytesSinceLastIndexEntry置为0
  • this.bytesSinceLastIndexEntry = 0
  • }
  • // append the messages
  • // 添加消息数据
  • log.append(messages)
  • // 更新bytesSinceLastIndexEntry,加上添加的消息大小
  • this.bytesSinceLastIndexEntry += messages.sizeInBytes
  • }
  • }

从该方法的实现可以看出,对索引和日志的添加其实是通过OffsetIndex和FileMessageSet的append(...)方法实现的,LogSegment的append(...)方法则只提供了边界控制;另外该方法还给出了创建索引的条件,前面介绍过,Kafka不会为每一条日志数据都建立索引,而是采用稀疏索引的方式为一部分日志建立索引,这里其实是通过indexIntervalBytes字段来控制的,从上一个索引项所对应的消息开始计算,如果累积追加的消息数据大于indexIntervalBytes的值时,就会添加一个索引项;indexIntervalBytes的值是可以通过index.interval.bytes配置项配置的。

1.2. 消息的读取

LogSegment提供了read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo方法读取消息数据,该方法有四个参数:

  • startOffset:指定读取的起始消息的offset。
  • maxOffset:指定读取结束的offset,可以为空。
  • maxSize:指定读取的最大字节数。
  • maxPosition:指定读取的最大物理地址,可选参数,默认值是日志文件的大小。

这四个参数用于控制读取的数据量,需要注意的是在实际代码中是根据这四个参数来共同控制的,也就是说,读取的范围一定会同时满足四个参数的所有限制;read(...)方法源码如下:

  • // kafka.log.LogSegment#read
  • /**
  • * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
  • * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
  • *
  • * @param startOffset A lower bound on the first offset to include in the message set we read 指定读取的起始消息的offset
  • * @param maxSize The maximum number of bytes to include in the message set we read 指定读取的最大字节数
  • * @param maxOffset An optional maximum offset for the message set we read 指定读取结束的offset,可以为空
  • * @param maxPosition The maximum position in the log segment that should be exposed for read 指定读取的最大物理地址,可选参数,默认值是日志文件的大小
  • *
  • * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
  • * or null if the startOffset is larger than the largest offset in this log
  • */
  • @threadsafe
  • def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
  • // 检查参数
  • if(maxSize < 0)
  • throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
  • val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  • // 转换startOffset为在消息文件中的物理偏移量
  • val startPosition = translateOffset(startOffset)
  • // if the start position is already off the end of the log, return null
  • if(startPosition == null)
  • return null
  • val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
  • // if the size is zero, still return a log segment but with zero size
  • // 当maxSize为0时,还是会返回一个大小为0的FetchDataInfo对象
  • if(maxSize == 0)
  • return FetchDataInfo(offsetMetadata, MessageSet.Empty)
  • // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  • // 计算需要读取的字节数
  • val length = maxOffset match { // maxOffset通常是Replica的HighWater,消费者最多只能读到HighWater位置的消息
  • case None =>
  • // no max offset, just read until the max position
  • // 没有maxOffset
  • min((maxPosition - startPosition.position).toInt, maxSize)
  • case Some(offset) =>
  • // there is a max offset, translate it to a file position and use that to calculate the max read size;
  • // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
  • // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
  • // offset between new leader's high watermark and the log end offset, we want to return an empty response.
  • if(offset < startOffset)
  • // maxOffset小于startOffset,返回一个大小为0的FetchDataInfo对象
  • return FetchDataInfo(offsetMetadata, MessageSet.Empty)
  • // 将maxOffset转换为物理偏移量
  • val mapping = translateOffset(offset, startPosition.position)
  • // 计算读取数据时的结束偏移量
  • val endPosition =
  • if(mapping == null)
  • // 当转换maxOffset得到的物理偏移量为空时,则读到消息日志文件末尾
  • logSize // the max offset is off the end of the log, use the end of the file
  • else
  • // 否则取maxOffset的物理偏移量
  • mapping.position
  • // 计算读取的字节数
  • min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
  • }
  • /**
  • * FetchDataInfo的第二个参数是一个MessageSet对象
  • * 该对象底层是一个设置了起始位置和长度的FileMessageSet分片对象
  • * 并没有读取消息数据到内存中
  • */
  • FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
  • }

其中translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition方法用于将offset转换为对应的物理地址,在read(...)方法的参数中,startOffsetmaxOffset参数都需要进行转换,我们先了解该方法的源码:

  • // kafka.log.LogSegment#translateOffset
  • /**
  • * Find the physical file position for the first message with offset >= the requested offset.
  • *
  • * The lowerBound argument is an optimization that can be used if we already know a valid starting position
  • * in the file higher than the greatest-lower-bound from the index.
  • *
  • * @param offset The offset we want to translate 想要转换的offset
  • * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and 限定目标position的下限值,默认是0
  • * when omitted, the search will begin at the position in the offset index.
  • *
  • * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
  • */
  • @threadsafe
  • private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  • /**
  • * 通过OffsetIndex在index文件中查找offset对应的OffsetPosition,
  • * 因为offset是稀疏的,所以此时的offset和position不一定是命中的
  • * 但查找到的结果offset和position必然小于目标offset和position
  • */
  • val mapping = index.lookup(offset)
  • /**
  • * 通过FileMessageSet在消息数据文件中查找目标offset和position
  • */
  • log.searchFor(offset, max(mapping.position, startingFilePosition))
  • }

translateOffset(...)方法内部则使用了OffsetIndex和FileMessageSet的lookup(...)searchFor(...)两个方法来定位实际的物理position,这两个方法在前面的内容中已经详细介绍过了;首先通过OffsetIndex的lookup(...)方法找到小于或等于offset的最大的索引项,该索引项中记录了真实的物理position,然后通过FileMessageSet的searchFor(...)找到offset大于或等于offset的最近的日志项,该日志项中也记录了真实的物理position,最终返回由offset和position构成的OffsetPosition对象。其实可以发现,在这三个过程中,lookup(...)方法得到的position其实只是用于限制searchFor(...)方法开始查找的偏移量,以缩小查找范围。

回到前面的read(...)方法,有了对translateOffset(...)方法的理解,其实read(...)方法相对来说是比较好理解的,最终读取的消息范围中,startOffset所对应的position是起始偏移量,maxOffsetmaxPositionmaxSize同时控制了终止偏移量,一般来说,表示的是副本之间的High Water值,消费者最多只能读到High Water位置的消息,在后面讲解副本机制时会回顾这部分代码。

1.3. 消息索引的重建

当日志文件对应的索引文件出现损坏或丢失时,此时会根据日志文件重建索引文件,LogSegment的recover()方法的功能就是根据日志文件重建索引文件,同时验证日志文件中消息的合法性;该方法的源码如下:

  • // kafka.log.LogSegment#recover
  • /**
  • * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
  • * 根据日志文件重建索引文件,同时验证日志文件中消息的合法性
  • *
  • * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
  • * is corrupt.
  • *
  • * @return The number of bytes truncated from the log
  • */
  • @nonthreadsafe
  • def recover(maxMessageSize: Int): Int = {
  • /**
  • * 清空索引文件
  • * 会修改OffsetIndex对象的记录
  • * - 将索引文件中的索引项个数(_entries)置为0
  • * - 将底层MappedByteBuffer对象(mmap)的position置为0
  • * - 将记录的最后一个索引项的offset(_lastOffset)置为baseOffset
  • */
  • index.truncate()
  • // 修改索引文件的大小
  • index.resize(index.maxIndexSize)
  • // 记录已经通过验证的字节数
  • var validBytes = 0
  • // 最后一个索引项对应的物理地址
  • var lastIndexEntry = 0
  • // FileMessageSet的迭代器
  • val iter = log.iterator(maxMessageSize)
  • // 遍历FileMessageSet中的消息
  • try {
  • while(iter.hasNext) {
  • val entry = iter.next
  • // 验证Message是否合法,主要是验证checkSum,不通过时会抛出InvalidMessageException异常
  • entry.message.ensureValid()
  • // 判断是否符合添加索引的条件
  • if(validBytes - lastIndexEntry > indexIntervalBytes) {
  • // we need to decompress the message, if required, to get the offset of the first uncompressed message
  • // 根据是否是压缩消息来决定写入的startOffset
  • val startOffset =
  • entry.message.compressionCodec match {
  • case NoCompressionCodec =>
  • // 不是压缩消息,直接返回消息的offset
  • entry.offset
  • case _ =>
  • // 是压缩消息,需要深层迭代,取压缩消息中第一条消息的offset
  • ByteBufferMessageSet.deepIterator(entry).next().offset
  • }
  • // 添加索引项
  • index.append(startOffset, validBytes)
  • // 记录lastIndexEntry
  • lastIndexEntry = validBytes
  • }
  • // 更新validBytes
  • validBytes += MessageSet.entrySize(entry.message)
  • }
  • } catch {
  • case e: CorruptRecordException =>
  • logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
  • }
  • /**
  • * 当try块抛出异常时,validBytes必然是小于日志文件的总大小的
  • * 此时日志文件可能出现了Message数据损坏
  • * 此处的处理时,只要出现Message损坏,则将其与之后所有的消息数据全部抛弃
  • */
  • // 计算日志文件需要截断的字节数
  • val truncated = log.sizeInBytes - validBytes
  • // 对日志文件进行截断操作,丢弃后面验证失败的Message
  • log.truncateTo(validBytes)
  • // 对索引文件进行截断
  • index.trimToValidSize()
  • // 返回日志文件截断的字节数
  • truncated
  • }
  • // kafka.message.Message#ensureValid
  • /**
  • * Throw an InvalidMessageException if isValid is false for this message
  • * 保证消息是有效的,如果无效则抛出InvalidMessageException异常
  • */
  • def ensureValid() {
  • if(!isValid)
  • throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
  • }

// TODO 关于压缩消息的结构 We often refer to the nested messages as “inner messages” and the wrapping message as the “outer message.” Note that the key should be null for the outer message and its offset will be the offset of the last inner message.

// TODO 关于压缩消息offset的分配 When receiving recursive version 0 messages, the broker decompresses them and each inner message is assigned an offset individually. In version 1, to avoid server side re-compression, only the wrapper message will be assigned an offset. The inner messages will have relative offsets. The absolute offset can be computed using the offset from the outer message, which corresponds to the offset assigned to the last inner message.

重建索引的过程比较直观,通过迭代FileMessageSet对象以遍历日志文件中的消息数据,根据消息数据存储的offset来重新生成稀疏索引项。在重建的过程中还会通过Message类的ensureValid()方法检查消息数据的有效性。在碰到压缩消息时,取的是压缩消息集合中的第一条消息的offset。

  • MessageSet (Version: 0) => [offset message_size message]
  • offset => INT64
  • message_size => INT32
  • message => crc magic_byte attributes key value
  • crc => INT32
  • magic_byte => INT8
  • attributes => INT8
  • bit 0~2:
  • 0: no compression
  • 1: gzip
  • 2: snappy
  • bit 3~7: unused
  • key => BYTES
  • value => BYTES
  • MessageSet (Version: 1) => [offset message_size message]
  • offset => INT64
  • message_size => INT32
  • message => crc magic_byte attributes key value
  • crc => INT32
  • magic_byte => INT8
  • attributes => INT8
  • bit 0~2:
  • 0: no compression
  • 1: gzip
  • 2: snappy
  • 3: lz4
  • bit 3: timestampType
  • 0: create time
  • 1: log append time
  • bit 4~7: unused
  • timestamp =>INT64
  • key => BYTES
  • value => BYTES

2. Log

在上一篇文章中我们已经探讨了OffsetIndex、Message、ByteBufferMessageSet、FileMessageSet及LogSegment的结构及内部实现,对日志文件和索引文件的构成有了基本的了解;有了这些基础,我们就可以继续研究Kafka中对日志目录的管理。

前面提到过,Kafka以<topic_name>_<partition_id>的命名规则在指定存储数据文件的路径下为每个主题的每个分区建立目录,每个分区目录下存放了分区对应的日志文件和索引文件,并使用Log类描述一个主题分区存储目录。也即是说,一个Log类将管理多个LogSegment对象。

我们先观察Log类的定义及其重要的几个字段:

  • // kafka.log.Log
  • /**
  • * An append-only log for storing messages.
  • *
  • * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
  • *
  • * New log segments are created according to a configurable policy that controls the size in bytes or time interval
  • * for a given segment.
  • *
  • * @param dir The directory in which log segments are created. Log对应的磁盘目录,此目录下存放了每个LogSegment对应的日志文件和索引文件
  • * @param config The log configuration settings Log相关的配置信息
  • * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk 指定恢复操作的起始offset,recoveryPoint之前的Message已经刷新到磁盘上持久存储,而其后的消息则不一定,出现宕机时可能会丢失。所以只需要恢复recoveryPoint之后的消息即可。
  • * @param scheduler The thread pool scheduler used for background actions 线程池调度器,用于后台操作
  • * @param time The time instance used for checking the clock
  • *
  • */
  • @threadsafe
  • class Log(val dir: File,
  • @volatile var config: LogConfig,
  • @volatile var recoveryPoint: Long = 0L,
  • scheduler: Scheduler,
  • time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  • import kafka.log.Log._
  • /**
  • * A lock that guards all modifications to the log
  • * 可能存在多个Handler线程并发向同一个Log追加消息,所以对Log的修改操作需要进行同步
  • * */
  • private val lock = new Object
  • /**
  • * last time it was flushed
  • * 最后一次刷盘时间
  • * */
  • private val lastflushedTime = new AtomicLong(time.milliseconds)
  • /**
  • * the actual segments of the log
  • * 用于对LogSegment进行管理
  • * 键是LogSegment的baseOffset,值是LogSegment对象
  • * */
  • private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  • // 加载LogSegment
  • loadSegments()
  • /**
  • * Calculate the offset of the next message
  • * LogOffsetMetadata对象。主要用于产生分配给消息的offset,同时也是当前副本的LEO(LogEndOffset)。
  • * - messageOffset字段记录了Log中最后一个offset值
  • * - segmentBaseOffset字段记录了activeSegment的baseOffset
  • * - relativePositionInSegment字段记录了activeSegment的大小
  • * */
  • @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
  • ...
  • }

在Log类所定义的字段中,我们主要关注segments字段,该字段是一个并发安全的字典,键是LogSegment的baseOffset,而值是LogSegment对象;该字典使用ConcurrentSkipListMap实现,ConcurrentSkipListMap是位于JUC包中的并发安全的字典,它的内部结构实现是一个跳表,提供了快速检索及操作内部元素的功能;跳表是一种典型的以空间换时间的数据结构,其内部的元素是有序的,对于ConcurrentSkipListMap来说,其内部根据键默认为升序排序(也可以指定比较器);关于ConcurrentSkipListMap的具体实现可以看本博客Java多线程系列博文——Java并发安全集合——ConcurrentSkipListMap的实现

2.1. 加载LogSegment

从上述的Log类片段代码中,最终会调用loadSegments()加载LogSegment对象,该方法的源码非常多,我们可以将其分为几个步骤分别分析:

  1. 处理.clean、.deleted、.swap文件;
  2. 加载全部的日志文件和索引文件(可能需要重建索引);
  3. 处理可能存在的由于Broker非正常关闭导致的消息异常。

其中第一个步骤涉及到日志的压缩,第三个步骤涉及日志的检查点机制,这两部分内容将在后面讲解,这里先略去这两部分的代码,直接分析独立的第二个步骤的代码:

  • /* Load the log segments from the log files on disk */
  • private def loadSegments() {
  • // create the log directory if it doesn't exist
  • dir.mkdirs()
  • ...
  • // now do a second pass and load all the .log and .index files
  • /**
  • * 加载全部的日志文件及索引文件,
  • * - 如果存在没有对应的日志文件的索引文件,就将该索引文件也删除
  • * - 如果存在没有对应的索引文件的日志文件,则为其重建索引文件
  • */
  • for(file <- dir.listFiles if file.isFile) {
  • val filename = file.getName
  • if(filename.endsWith(IndexFileSuffix)) {
  • // if it is an index file, make sure it has a corresponding .log file
  • // 是.index文件,判断对应的.log文件是否存在,如果不存在就将.index文件也删除
  • val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
  • if(!logFile.exists) {
  • warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
  • file.delete()
  • }
  • } else if(filename.endsWith(LogFileSuffix)) {
  • // if its a log file, load the corresponding log segment
  • // 是.log文件,先创建并加载LogSegment
  • val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
  • val indexFile = Log.indexFilename(dir, start)
  • val segment = new LogSegment(dir = dir,
  • startOffset = start,
  • indexIntervalBytes = config.indexInterval,
  • maxIndexSize = config.maxIndexSize,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time,
  • fileAlreadyExists = true)
  • // 判断对应的.index文件是否存在,如果不存在就根据.log文件重建.index文件
  • if(indexFile.exists()) {
  • // .index文件存在
  • try {
  • // 检查索引文件的完整性
  • segment.index.sanityCheck()
  • } catch {
  • case e: java.lang.IllegalArgumentException =>
  • warn("Found a corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath))
  • indexFile.delete()
  • segment.recover(config.maxMessageSize)
  • }
  • }
  • else {
  • // .index文件不存在
  • error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
  • // 根据.log文件重建.index文件
  • segment.recover(config.maxMessageSize)
  • }
  • // 将LogSegment添加到segments跳表中
  • segments.put(start, segment)
  • }
  • }
  • ...
  • }

上述的加载日志文件和索引文件的代码的流程是比较清晰的,通过遍历dir字段表示目录下的文件,分别进行判断:

  • 当遍历到的是索引文件时,判断对应的日志文件是否存在,如果不存在就将索引文件也删除掉;
  • 当遍历到的是日志文件时,判断对应的索引文件是否存在,如果不存在就重建索引;如果存在,将会检查索引文件的完整性,若检查过程总出现异常就删除索引文件并进行重建。

最终根据dir目录、每一对消息文件的baseOffset来创建相应数量的LogSegment对象,并以baseOffset为键,LogSegment对象为值存储到ConcurrentSkipListMap类型的字典segments中。

需要注意的是这里有一个细节,baseOffset是通过日志文件的文件名(即源码中的start变量)来确定的。由于segments跳表内部元素会根据键进行升序排序,因此通过以baseOffset为键向segments中添加的LogSegment对象可以保证有序,这也就保证了在通过baseOffsetsegments查找对应的LogSegment时的高效率。

2.2. 消息的追加

Log类的append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo方法用于向Log中追加消息数据。在前一篇介绍日志的存储构成的文章中,FileMessageSet和LogSegment都存在追加操作,我们需要清楚的是,Log的追加操作是针对消息数据的追加,它最终依旧会调用LogSegment的追加操作,而LogSegment的追加操作既包含日志数据的追加,也包含索引数据的追加,其中追加日志数据的操作交给了FileMessageSet类,追加索引数据的操作则交给了OffsetIndex类。

在了解Log类的append(...)方法之前,我们需要先认识Active Segment的概念;在Log类中定义了一个activeSegment字段:

  • // kafka.log.Log#activeSegment
  • /**
  • * The active segment that is currently taking appends
  • * 由于Kafka消息是顺序写的,因此只有最后一个日志文件是处于可写状态的
  • * activeSegment用于获取最后一个LogSegment
  • */
  • def activeSegment = segments.lastEntry.getValue

由于Kafka的顺序读写特性,向Log中追加消息时是顺序写入的,每个Log所管辖的LogSegment中只有最后一个能够进行写入操作,在其之前的所有LogSegment都不能写入数据,activeSegment()方法就是用于获取这最后一个LogSegment对象的,具体表现为获取segments集合中最后一个元素的值。activeSegment()的值是会改变的,因为随着数据的不断写入,当前activeSegment()表示的日志文件大小会到达一定阈值,此时需要创建新的日志和索引文件,而activeSegment()也会指向描述这个新的日志文件的的LogSegment对象。

有了对Active Segment的认识,我们来分析Log的append(...)方法,它的源码如下:

  • // kafka.log.Log#append
  • /**
  • * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
  • *
  • * This method will generally be responsible for assigning offsets to the messages,
  • * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
  • *
  • * 向Log追加消息;
  • * Kafka服务端在处理生产者发来的ProducerRequest时,会将请求解析成ByteBufferMessageSet,并最终调用append()方法完成追加消息
  • *
  • * @param messages The message set to append
  • * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
  • *
  • * @throws KafkaStorageException If the append fails due to an I/O error.
  • *
  • * @return Information about the appended messages including the first and last offset.
  • */
  • def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
  • // 对ByteBufferMessageSet类型的messages中的Message数据进行验证
  • val appendInfo = analyzeAndValidateMessageSet(messages) // 返回LogAppendInfo类型的对象
  • // if we have any valid messages, append them to the log
  • // shallowCount为0表示messages中没有消息
  • if (appendInfo.shallowCount == 0)
  • return appendInfo
  • // trim any invalid bytes or partial messages before appending it to the on-disk log
  • // 在添加到磁盘日志文件中之前删除无用的字节或部分消息
  • var validMessages = trimInvalidBytes(messages, appendInfo)
  • try {
  • // they are valid, insert them in the log
  • // synchronized加锁
  • lock synchronized {
  • if (assignOffsets) { // 是否需要分配offset,默认是需要的
  • // assign offsets to the message set
  • /**
  • * 获取Log中最后一个offset,从该offset开始向后分配
  • * LongRef内部封装了一些便捷的方法
  • */
  • val offset = new LongRef(nextOffsetMetadata.messageOffset)
  • // 使用appendInfo的firstOffset记录第一条消息的offset
  • appendInfo.firstOffset = offset.value
  • val now = time.milliseconds
  • val (validatedMessages, messageSizesMaybeChanged) = try {
  • /**
  • * 验证并分配offset,这个是ByteBufferMessageSet的方法,返回值为(ByteBufferMessageSet, Boolean)
  • * 注意,在这个过程中,offset(LongRef类型)的value会根据消息的分配而改变,最终变为最后一条消息的offset + 1
  • * 具体操作可以看ByteBufferMessageSet的validateMessagesAndAssignOffsets()方法
  • */
  • validMessages.validateMessagesAndAssignOffsets(offset,
  • now,
  • appendInfo.sourceCodec,
  • appendInfo.targetCodec,
  • config.compact,
  • config.messageFormatVersion.messageFormatVersion,
  • config.messageTimestampType,
  • config.messageTimestampDifferenceMaxMs)
  • } catch {
  • case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
  • }
  • // 记录已经通过验证并分配了offset的ByteBufferMessageSet
  • validMessages = validatedMessages
  • // 使用appendInfo的lastOffset记录最后一条消息的offset
  • appendInfo.lastOffset = offset.value - 1
  • // 如果需要配置了需要添加日志添加时间,就在appendInfo中添加时间戳
  • if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
  • appendInfo.timestamp = now
  • // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
  • // format conversion)
  • // 如果在ByteBufferMessageSet的validateMessagesAndAssignOffsets()方法中修改了ByteBufferMessageSet的长度,则需要重新检测消息长度
  • if (messageSizesMaybeChanged) {
  • // 遍历消息
  • for (messageAndOffset <- validMessages.shallowIterator) {
  • // 检查每个消息的大小是否大于配置的最大消息大小阈值
  • if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
  • // we record the original message set size instead of the trimmed size
  • // to be consistent with pre-compression bytesRejectedRate recording
  • BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
  • BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
  • throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
  • .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
  • }
  • }
  • }
  • } else {
  • // we are taking the offsets we are given
  • /**
  • * 如果不需要分配offset,检查边界
  • * - 消息offset不是单调递增的
  • * - 第一条消息的offset小于Log中最后一个offset值
  • */
  • if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
  • throw new IllegalArgumentException("Out of order offsets found in " + messages)
  • }
  • // check messages set size may be exceed config.segmentSize
  • // 检查所有消息的总大小是否大于单个文件的大小阈值
  • if (validMessages.sizeInBytes > config.segmentSize) {
  • throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
  • .format(validMessages.sizeInBytes, config.segmentSize))
  • }
  • // maybe roll the log if this segment is full
  • // 可能需要进行滚动操作,即当当前文件剩余大小无法满足本次消息写入时,需要新创建一个activeSegment文件
  • val segment = maybeRoll(validMessages.sizeInBytes)
  • // now append to the log
  • // 追加消息
  • segment.append(appendInfo.firstOffset, validMessages)
  • // increment the log end offset
  • // 更新LEO
  • updateLogEndOffset(appendInfo.lastOffset + 1)
  • trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
  • .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
  • // 检查未刷新到磁盘的数据是否达到一定阈值,如果是则调用flush()刷新
  • if (unflushedMessages >= config.flushInterval)
  • flush()
  • appendInfo
  • }
  • } catch {
  • case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  • }
  • }

append(...)方法的过程可以细分为以下几步:

  1. 首先调用Log类的analyzeAndValidateMessageSet()方法,对ByteBufferMessageSet中的Message数据进行验证,并返回LogAppendInfo对象。在LogAppendInfo中封装了ByteBufferMessageSet中第一个消息的offset、最后一个消息的offset、生产者采用的压缩方式、追加到Log的时间戳、服务端用的压缩方式、外层消息的个数、通过验证的总字节数等信息。
  2. 调用Log类的trimInvalidBytes()方法,清除未验证通过的Message。
  3. 调用ByteBufferMessageSet类的validateMessagesAndAssignOffsets()方法,进行内部压缩消息做进一步验证、消息格式转换、调整Magic值、修改时间戳等操作,并为Message分配offset。在ByteBufferMessageSet小节介绍过了,这里就不再赘述了。
  4. 如果在validateMessagesAndAssignOffsets()方法中修改了ByteBufferMessageSet的长度,则重新验证Message的长度是否合法。
  5. 调用Log类的maybeRoll()方法获取Active Segment,此过程可能分配新的Active Segment。
  6. 将ByteBufferMessageSetSet中的消息追加到Active Segment中,通过调用LogSegment的append()方法的实现。
  7. 更新当前副本的LEO,也就是Log.nextOffsetMetadata字段。
  8. 执行flush()操作,将LEO之前的全部Message刷新到磁盘。

这几个步骤中涉及了好几个辅助方法,下面将一一讲解。

2.2.1. 消息验证

Log类的analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo方法用于对ByteBufferMessageSet中的Message数据进行验证,主要是验证消息的长度、CRC32校验码、消息offset是否单调递增,这些验证都是对外层消息进行的,并不会解压内部的压缩消息。它的源码如下:

  • // kafka.log.Log#analyzeAndValidateMessageSet
  • /**
  • * Validate the following:
  • * <ol>
  • * <li> each message matches its CRC
  • * <li> each message size is valid
  • * </ol>
  • *
  • * Also compute the following quantities:
  • * <ol>
  • * <li> First offset in the message set
  • * <li> Last offset in the message set
  • * <li> Number of messages
  • * <li> Number of valid bytes
  • * <li> Whether the offsets are monotonically increasing
  • * <li> Whether any compression codec is used (if many are used, then the last one is given)
  • * </ol>
  • */
  • private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
  • var shallowMessageCount = 0
  • var validBytesCount = 0
  • var firstOffset, lastOffset = -1L
  • var sourceCodec: CompressionCodec = NoCompressionCodec
  • var monotonic = true
  • // 遍历messages进行验证
  • for(messageAndOffset <- messages.shallowIterator) { // 每个都是MessageAndOffset对象
  • // update the first offset if on the first message
  • if(firstOffset < 0)
  • // 记录第一条消息额offset
  • firstOffset = messageAndOffset.offset
  • // check that offsets are monotonically increasing
  • // 检查offset是否是单调自增的
  • if(lastOffset >= messageAndOffset.offset)
  • monotonic = false
  • // update the last offset seen
  • // 更新lastOffset为当前消息的offset
  • lastOffset = messageAndOffset.offset
  • // 获取消息
  • val m = messageAndOffset.message
  • // Check if the message sizes are valid.
  • // 检查消息大小是否合法
  • val messageSize = MessageSet.entrySize(m)
  • if(messageSize > config.maxMessageSize) {
  • BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
  • BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
  • // 消息过大,抛出RecordTooLargeException异常
  • throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
  • .format(messageSize, config.maxMessageSize))
  • }
  • // check the validity of the message by checking CRC
  • // 检查CRC
  • m.ensureValid()
  • // 更新浅层迭代数量+1
  • shallowMessageCount += 1
  • // 更新有效字节数
  • validBytesCount += messageSize
  • // 记录压缩器
  • val messageCodec = m.compressionCodec
  • if(messageCodec != NoCompressionCodec)
  • sourceCodec = messageCodec
  • }
  • // Apply broker-side compression if any
  • // 获取压缩器
  • val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
  • // 根据上面的信息构建LogAppendInfo对象并返回
  • LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
  • }
  • // kafka.message.Message
  • /**
  • * Throw an InvalidMessageException if isValid is false for this message
  • * 保证消息是有效的,如果无效则抛出InvalidMessageException异常
  • */
  • def ensureValid() {
  • if(!isValid)
  • throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
  • }
  • /**
  • * Returns true if the crc stored with the message matches the crc computed off the message contents
  • * 判断消息是否有效,即将消息中的checksum与计算出的checksum进行比较
  • */
  • def isValid: Boolean = checksum == computeChecksum
  • /**
  • * Retrieve the previously computed CRC for this message
  • * 读取消息中的CRC值作为checksum
  • */
  • def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
  • /**
  • * Compute the checksum of the message from the message contents
  • * 根据ByteBuffer数据计算checksum值
  • */
  • def computeChecksum: Long =
  • CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)

2.2.2. 清除验证未通过的消息

trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet方法会根据analyzeAndValidateMessageSet(...)方法返回的LogAppendInfo对象,将未通过验证的消息截断,它的实现比较简单,源码如下:

  • // kafka.log.Log#trimInvalidBytes
  • /**
  • * Trim any invalid bytes from the end of this message set (if there are any)
  • * @param messages The message set to trim
  • * @param info The general information of the message set
  • * @return A trimmed message set. This may be the same as what was passed in or it may not.
  • */
  • private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
  • val messageSetValidBytes = info.validBytes
  • // 如果有效字节数为0,抛出CorruptRecordException异常
  • if(messageSetValidBytes < 0)
  • throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
  • if(messageSetValidBytes == messages.sizeInBytes) {
  • // 有效字节数与消息大小相同,说明没有无效字节,直接返回即可
  • messages
  • } else {
  • // trim invalid bytes
  • // 创建一个副本
  • val validByteBuffer = messages.buffer.duplicate()
  • // 只取有效字节,将之后的字节全部丢弃,滤得有效字节
  • validByteBuffer.limit(messageSetValidBytes)
  • // 根据滤得的有效字节创建ByteBufferMessageSet对象并返回
  • new ByteBufferMessageSet(validByteBuffer)
  • }
  • }

2.2.3. 滚动Active Segment

Log类的maybeRoll()方法用于检测是否需要创建新Active Segment,条件有下面几个:

  • 当前Active Segment的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment的最大长度。
  • 当前Active Segment的寿命超过了配置的LogSegment最长存活时间。
  • 索引文件满了。

maybeRoll()方法主要用于判断这些条件,并且在满足的情况下调用roll()方法进行Active Segment滚动操作,它的源码如下:

  • // kafka.log.Log#maybeRoll
  • /**
  • * Roll the log over to a new empty log segment if necessary.
  • *
  • * @param messagesSize The messages set size in bytes
  • * logSegment will be rolled if one of the following conditions met
  • * <ol>
  • * <li> The logSegment is full
  • * <li> The maxTime has elapsed
  • * <li> The index is full
  • * </ol>
  • * @return The currently active segment after (perhaps) rolling to a new segment
  • */
  • private def maybeRoll(messagesSize: Int): LogSegment = {
  • val segment = activeSegment
  • /**
  • * 需要触发roll操作的条件有三个,满足一个即需要roll:
  • * - 当前activeSegment的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment的最大长度。
  • * - 当前activeSegment的寿命超过了配置的LogSegment最长存活时间。
  • * - 索引文件满了。
  • */
  • if (segment.size > config.segmentSize - messagesSize || // 当前activeSegment的日志大小加上本次待追加的消息集合大小,超过配置的LogSegment的最大长度。
  • segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs || // 当前activeSegment的寿命超过了配置的LogSegment最长存活时间。
  • segment.index.isFull) { // 索引文件满了。
  • debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
  • .format(name,
  • segment.size,
  • config.segmentSize,
  • segment.index.entries,
  • segment.index.maxEntries,
  • time.milliseconds - segment.created,
  • config.segmentMs - segment.rollJitterMs))
  • // 调用roll()方法滚动,得到新的activeSegment并返回
  • roll()
  • } else {
  • // 不需要roll,直接返回当前activeSegment
  • segment
  • }
  • }

真正对Active Segment的滚动操作定义在Log类的roll()方法中,源码如下:

  • // kafka.log.Log#roll
  • /**
  • * Roll the log over to a new active segment starting with the current logEndOffset.
  • * This will trim the index to the exact size of the number of entries it currently contains.
  • * 进行滚动操作
  • * @return The newly rolled segment
  • */
  • def roll(): LogSegment = {
  • // 记录开始时间
  • val start = time.nanoseconds
  • // 加锁
  • lock synchronized {
  • // 获取LEO
  • val newOffset = logEndOffset
  • // 根据LEO创建新的日志文件,名称为LEO值.log
  • val logFile = logFilename(dir, newOffset)
  • // 根据LEO创建新的索引文件,名称为LEO值.index
  • val indexFile = indexFilename(dir, newOffset)
  • // 判断需要创建的文件是否已存在,如果已存在就删除
  • for(file <- List(logFile, indexFile); if file.exists) {
  • warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
  • file.delete()
  • }
  • // 当前最后一个LogSegment,也即是旧的activeSegment
  • segments.lastEntry() match {
  • case null =>
  • case entry => {
  • // 对索引文件和日志文件都进行截断,保证文件中只保存了有效字节,这对预分配的文件尤其重要
  • entry.getValue.index.trimToValidSize()
  • entry.getValue.log.trim()
  • }
  • }
  • // 根据创建的文件创建新的LogSegment对象
  • val segment = new LogSegment(dir,
  • startOffset = newOffset,
  • indexIntervalBytes = config.indexInterval,
  • maxIndexSize = config.maxIndexSize,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time,
  • fileAlreadyExists = false,
  • initFileSize = initFileSize,
  • preallocate = config.preallocate)
  • // 将新创建的LogSegment对象添加到segments集合中
  • val prev = addSegment(segment)
  • if(prev != null) // ConcurrentSkipListMap的put方法在添加时,如果键已经存在,会把旧值返回,并放弃插入;否则成功插入后返回null
  • // 之前就存在对应的baseOffset的LogSegment,抛出异常
  • throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
  • // We need to update the segment base offset and append position data of the metadata when log rolls.
  • // The next offset should not change.
  • // 更新nextOffsetMetadata,这次更新的目的是为了更新其中记录的activeSegment.baseOffset和activeSegment.size,而LEO并不会改变
  • updateLogEndOffset(nextOffsetMetadata.messageOffset)
  • // schedule an asynchronous flush of the old segment
  • // 提交执行flush()操作的定时任务
  • scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
  • info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
  • // 返回新建的activeSegment
  • segment
  • }
  • }
  • /**
  • * The offset of the next message that will be appended to the log
  • * LEO
  • */
  • def logEndOffset: Long = nextOffsetMetadata.messageOffset

roll()方法会根据logEndOffset的值来创建新的日志文件和索引文件,新创建的文件名的组织方式如下:

  • /**
  • * Construct a log file name in the given dir with the given base offset
  • * 根据offset(一般是logEndOffset)来创建日志文件
  • * @param dir The directory in which the log will reside
  • * @param offset The base offset of the log file
  • */
  • def logFilename(dir: File, offset: Long) =
  • new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
  • /**
  • * Construct an index file name in the given dir using the given base offset
  • * 根据offset(一般是logEndOffset)来创建索引文件
  • * @param dir The directory in which the log will reside
  • * @param offset The base offset of the log file
  • */
  • def indexFilename(dir: File, offset: Long) =
  • new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
  • /**
  • * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
  • * so that ls sorts the files numerically.
  • * 该方法用于生成日志文件或索引文件的文件名
  • * 文件名一共20位,offset值占据低位,高位不足的补0
  • * @param offset The offset to use in the file name
  • * @return The filename
  • */
  • def filenamePrefixFromOffset(offset: Long): String = {
  • val nf = NumberFormat.getInstance()
  • nf.setMinimumIntegerDigits(20)
  • nf.setMaximumFractionDigits(0)
  • nf.setGroupingUsed(false)
  • nf.format(offset)
  • }

从上面的源码可以得知,创建的索引文件和日志文件的文件名是一个20位数字的字符串,logEndOffset值将作为offset占据低位(也是索引文件的baseOffset),高位不足的位置以0填充。

在取得对应的File对象后,如果文件已存在就直接将其删除;然后将当前存在的Active Segment的索引文件及日志文件进行截断,保证文件中只保存了有效字节,最后构建一个startOffsetlogEndOffset的LogSegment对象并将其添加到segments集合中用于描述新创建的索引文件和日志文件。

调用updateLogEndOffset(nextOffsetMetadata.messageOffset)方法则会更新nextOffsetMetadata字段为新的LogOffsetMetadata对象:

  • /**
  • * Calculate the offset of the next message
  • * LogOffsetMetadata对象。主要用于产生分配给消息的offset,同时也是当前副本的LEO(LogEndOffset)。
  • * - messageOffset字段记录了Log中最后一个offset值
  • * - segmentBaseOffset字段记录了activeSegment的baseOffset
  • * - relativePositionInSegment字段记录了activeSegment的大小
  • * */
  • @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
  • private def updateLogEndOffset(messageOffset: Long) {
  • // 每次更新都会创建新的LogOffsetMetadata对象,LogOffsetMetadata是个不可变对象
  • nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
  • }

nextOffsetMetadata记录了Active Segment的最后一个offset值(也即是logEndOffset,Log对象所对应的索引文件中最后一个offset值),Active Segment的baseOffset以及Active Segment的大小;roll()方法中调用的updateLogEndOffset(nextOffsetMetadata.messageOffset)只是修改了Active Segment的baseOffset和大小,并未修改logEndOffset的值。

2.2.4. 数据刷盘

roll()方法的最后还会向Scheduler提交一个flush()操作将recoveryPointnewOffset之间的数据刷新到磁盘,flush(offset: Long) : Unit方法的源码如下:

  • // kafka.log.Log#flush
  • /**
  • * Flush log segments for all offsets up to offset-1
  • * @param offset The offset to flush up to (non-inclusive); the new recovery point
  • */
  • def flush(offset: Long) : Unit = {
  • if (offset <= this.recoveryPoint)
  • return
  • debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
  • time.milliseconds + " unflushed = " + unflushedMessages)
  • // 将所有LogSegment的 revoceryPoint ~ LEO 之间的消息刷新到磁盘上
  • for(segment <- logSegments(this.recoveryPoint, offset))
  • segment.flush()
  • lock synchronized {
  • if(offset > this.recoveryPoint) {
  • // 更新recoverPoint的值
  • this.recoveryPoint = offset
  • // 更新最后一次刷盘时间记录
  • lastflushedTime.set(time.milliseconds)
  • }
  • }
  • }
  • // kafka.log.Log#logSegments
  • /**
  • * Get all segments beginning with the segment that includes "from" and ending with the segment
  • * that includes up to "to-1" or the end of the log (if to > logEndOffset)
  • */
  • def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
  • import JavaConversions._
  • lock synchronized {
  • // 找出key仅大于from的最近的元素的键,也即是baseOffset仅大于from的最近的LogSegment对象的baseOffset
  • val floor = segments.floorKey(from)
  • if(floor eq null)
  • // 如果floor为null,则取baseOffset小于to的LogSegment对象
  • segments.headMap(to).values
  • else
  • // 否则取floor到to之间的LogSegment对象
  • segments.subMap(floor, true, to, false).values
  • }
  • }
  • // kafka.log.LogSegment#flush
  • /**
  • * Flush this log segment to disk
  • */
  • @threadsafe
  • def flush() {
  • LogFlushStats.logFlushTimer.time {
  • log.flush()
  • index.flush()
  • }
  • }
  • // kafka.log.FileMessageSet#flush
  • /**
  • * Commit all written data to the physical disk
  • */
  • def flush() = {
  • channel.force(true)
  • }
  • // kafka.log.OffsetIndex#flush
  • /**
  • * Flush the data in the index to disk
  • */
  • def flush() {
  • inLock(lock) {
  • mmap.force()
  • }
  • }

从源码可知,flush(offset: Long) : Unit方法会将revoceryPoint至传入的offset之间的消息进行刷盘操作,并且修改revoceryPoint记录的值;数据的刷盘操作实际上还是会交给FileMessageSet对象和OffsetIndex对象的flush()方法,最终由通道和缓冲区映射进行实现。

2.3. 消息的读取

Log类也提供了read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo方法用于读取特定范围的日志数据,起始偏移量由startOffset参数决定,读取长度由maxLengthmaxOffset共同决定。read(...)方法的源码如下:

  • /**
  • * Read messages from the log.
  • *
  • * @param startOffset The offset to begin reading at 开始读取的startOffset
  • * @param maxLength The maximum number of bytes to read 最大读取的数据字节数
  • * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set) 最大读取到的offset(不包含)
  • *
  • * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
  • * @return The fetch data information including fetch starting offset metadata and messages read.
  • */
  • def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
  • trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
  • // Because we don't use lock for reading, the synchronization is a little bit tricky.
  • // We create the local variables to avoid race conditions with updates to the log.
  • /**
  • * read操作不会加锁,因此将nextOffsetMetadata拷贝一份为局部变量避免线程竞争更新了nextOffsetMetadata
  • * nextOffsetMetadata每次在updateLogEndOffset()方法的代码中更新的时候,都是创建新的LogOffsetMetadata对象,
  • * 而且LogOffsetMetadata中也没有提供任何修改属性的方法,可见LogOffsetMetadata对象是个不可变对象
  • */
  • val currentNextOffsetMetadata = nextOffsetMetadata
  • // 下一条记录的offset
  • val next = currentNextOffsetMetadata.messageOffset
  • // 如果起始offset等于下一条记录的offset,说明读取的数据其实是空的,返回一个MessageSet为空的FetchDataInfo对象
  • if(startOffset == next)
  • return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
  • // 查找offset仅小于startOffset的LogSegment对象
  • var entry = segments.floorEntry(startOffset)
  • // attempt to read beyond the log end offset is an error
  • // 检查边界
  • if(startOffset > next || entry == null)
  • throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
  • // Do the read on the segment with a base offset less than the target offset
  • // but if that segment doesn't contain any messages with an offset greater than that
  • // continue to read from successive segments until we get some messages or we reach the end of the log
  • while(entry != null) {
  • // 当查找到的entry不为null
  • // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
  • // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
  • // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
  • // end of the active segment.
  • /**
  • * 如果读取的entry是activeSegment,可能会有并发线程在读取期间添加了消息数据导致activeSegment发生改变
  • */
  • val maxPosition = {
  • if (entry == segments.lastEntry) { // 是否是segments集合中最后一个元素,即读取的是activeSegment
  • // 获取activeSegment表示的物理日志存储的偏移量
  • val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
  • // Check the segment again in case a new segment has just rolled out.
  • // 再次检查当前的activeSegment是否被改变
  • if (entry != segments.lastEntry)
  • // New log segment has rolled out, we can read up to the file end.
  • // 写线程并发进行了roll()操作,此时activeSegment改变了,变成了一个新的LogSegment
  • entry.getValue.size
  • else
  • // activeSegment没有改变,直接返回exposedPos
  • exposedPos
  • } else {
  • // 读取的是非activeSegment的情况,可以直接读取到LogSegment的结尾
  • entry.getValue.size
  • }
  • }
  • // 使用LogSegment的read()方法读取消息数据
  • val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
  • if(fetchInfo == null) {
  • // 如果在这个LogSegment中没有读取到数据,就继续读取下一个LogSegment
  • entry = segments.higherEntry(entry.getKey)
  • } else {
  • // 否则直接返回
  • return fetchInfo
  • }
  • }
  • // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
  • // this can happen when all messages with offset larger than start offsets have been deleted.
  • // In this case, we will return the empty set with log end offset metadata
  • // 找不到startOffset之后的消息,返回一个MessageSet为空的FetchDataInfo对象
  • FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
  • }

read(...)方法是没有进行并发控制的,它会在一开始保存一份nextOffsetMetadata的副本并从中获取下一条记录的offset,用于检查传入参数的边界是否有效,然后根据startOffsetsegments跳表中查找offset仅小于startOffset的LogSegment对象,然后使用LogSegment对象read(...)方法读取数据。

这里需要注意的是,在计算maxPosition时,从跳表中定位到的LogSegment可能是Active Segment,也有可能是普通的Segment;当为Active Segment时,需要考虑读取的同时其他线程并发写入的问题,有可能在当前线程正在读取Active Segment的数据时,其他线程的写入操作出发了Active Segment的滚动,此时Active Segment对象会被改变,这里进行了Double Check以保证maxPosition的计算是实时的。

另外,这里增加的maxPosition的计算也可以保证在读取数据时仅凭maxOffsetmaxLength控制读取长度时由于多个线程之间状态nextOffsetMetadata.messageOffset状态不同步导致的OffsetOutOfRangeException异常【ISSUE KAFKA-2477】。