Kafka系列 13 - 服务端源码分析 04:日志的存储管理

简介:主要讲解LogManager的实现、日志压缩机制,以及日志系统的初始化

1. LogManager

LogManager用于管理一个Broker上所有的Log实例,提供加载Log、创建Log集合、删除Log集合、查询Log集合等操作。LogManager还维护了3个定时任务线程及Cleaner线程,分别用于日志刷盘、日志清理、检查点维护及日志压缩等操作。LogManager的定义及其重要字段如下:

注:这里的日志压缩(Log Compaction)与前面讲解的消息压缩并不一样,它用于删除日志中key重复的较旧消息数据,只保留每个key最新的一条,以实现日志文件的瘦身。

  • // kafka.log.LogManager
  • /**
  • * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
  • * All read and write operations are delegated to the individual log instances.
  • *
  • * The log manager maintains logs in one or more directories. New logs are created in the data directory
  • * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
  • * size or I/O rate.
  • *
  • * A background thread handles log retention by periodically truncating excess log segments.
  • *
  • * @param logDirs log目录集合,在server.properties配置文件中指定的
  • * @param topicConfigs 主题配置
  • * @param defaultConfig 默认配置
  • * @param cleanerConfig 日志压缩的配置
  • * @param ioThreads 为完成Log加载的相关操作,每个log目录下分配指定的线程执行加载
  • * @param flushCheckMs kafka-log-flusher定时任务的周期时间
  • * @param flushCheckpointMs kafka-recovery-point-checkpoint定时任务的周期时间
  • * @param retentionCheckMs kafka-log-retention定时任务的周期时间
  • * @param scheduler KafkaScheduler对象,用于执行周期任务的线程池
  • * @param brokerState Broker状态
  • * @param time
  • */
  • @threadsafe
  • class LogManager(val logDirs: Array[File],
  • val topicConfigs: Map[String, LogConfig],
  • val defaultConfig: LogConfig,
  • val cleanerConfig: CleanerConfig,
  • ioThreads: Int,
  • val flushCheckMs: Long,
  • val flushCheckpointMs: Long,
  • val retentionCheckMs: Long,
  • scheduler: Scheduler,
  • val brokerState: BrokerState,
  • private val time: Time) extends Logging {
  • val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  • val LockFile = ".lock"
  • // 定时任务的延迟时间
  • val InitialTaskDelayMs = 30*1000
  • // 创建或删除Log时需要加锁进行同步
  • private val logCreationOrDeletionLock = new Object
  • // 用于管理TopicAndPartition与Log之间的对应关系,底层使用ConcurrentHashMap实现
  • private val logs = new Pool[TopicAndPartition, Log]()
  • ...
  • // FileLock集合
  • private val dirLocks = lockLogDirs(logDirs)
  • /**
  • * 用于管理每个log目录与其下的RecoveryPointCheckpoint文件之间的映射关系,
  • * 在LogManager对象初始化时,会在每个log目录下创建一个对应的RecoveryPointCheckpoint文件。
  • * 此Map的value是OffsetCheckpoint类型的对象,其中封装了对应log目录下的RecoveryPointCheckpoint文件,
  • * 并提供对RecoveryPointCheckpoint文件的读写操作。
  • * RecoveryPointCheckpoint文件中则记录了该log目录下的所有Log的recoveryPoint
  • */
  • private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  • // 加载Log
  • loadLogs()
  • ...
  • }

在构造LogManager时,logDirs参数是通过server.properties文件配置的,表示存放日志的目录集合;Kafka允许配置多数据目录,每个数据目录都可以创建多个分区目录(对应于一个Log对象),而LogManager则管理着所有的数据目录。logCreationOrDeletionLock是一个简单的Object对象,用于在创建或删除Log时进行加锁;logs字段则是用于记录主题分区与Log之间的映射关系,使用Pool对象存储,Pool内部使用ConcurrentHashMap实现并发安全的字典结构。dirLocks字段的值来自lockLogDirs(dirs: Seq[File]): Seq[FileLock]方法的返回值,是所有数据目录的目录锁,该方法的实现如下:

  • /**
  • * Lock all the given directories
  • * 对目录加锁,实现方式是在目录下创建一个.lock文件,然后对其Channel进行加锁
  • * 如果.lock文件的Channel被加锁了,说明该目录被锁了
  • */
  • private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
  • dirs.map { dir =>
  • // 创建一个.lock后缀结尾的文件,并以该文件创建一个FileLock
  • val lock = new FileLock(new File(dir, LockFile))
  • // 对该FileLock尝试进行加锁
  • if(!lock.tryLock())
  • throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
  • ". A Kafka instance in another process or thread is using this directory.")
  • // 返回FileLock
  • lock
  • }
  • }

目录锁的实现其实是通过在数据目录下创建一个.lock文件,并以这个文件创建FileLock作为该数据目录的目录锁;lockLogDirs(...)方法会直接尝试对FileLock上锁,加锁失败则抛出异常,最终返回所有数据目录的锁集合。
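
kafka.utils.FileLock的底层正是JDK NIO提供的FileChannel.tryLock()。下面给出一个基于该JDK API的最小示意(非Kafka源码,方法名tryLockDir为笔者虚构),用来说明"以.lock文件的Channel锁代表目录锁"这一思路:

  • import java.io.{File, RandomAccessFile}
  • import java.nio.channels.FileLock
  • // 示意代码:对单个目录尝试加锁,做法是在目录下创建.lock文件并对其Channel调用tryLock()
  • def tryLockDir(dir: File): Option[FileLock] = {
  •   val lockFile = new File(dir, ".lock")
  •   lockFile.createNewFile() // 文件已存在时返回false,不影响后续加锁
  •   val channel = new RandomAccessFile(lockFile, "rw").getChannel
  •   // tryLock()在锁已被其他进程持有时返回null,对应lockLogDirs()中加锁失败抛异常的分支
  •   Option(channel.tryLock())
  • }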

recoveryPointCheckpoints字段是一个Map[File, OffsetCheckpoint]类型的字典结构,用于管理每个数据目录与其下的RecoveryPointCheckpoint文件之间的映射关系。在LogManager对象初始化时,会在每个数据目录下创建一个对应的RecoveryPointCheckpoint文件,RecoveryPointCheckpoint文件主要用于在Broker异常的情况下进行数据恢复等操作,其中记录了该数据目录下的所有Log的recoveryPoint;RecoveryPointCheckpoint文件最终存储的格式如下:

  • 0
  • 20
  • topic_test1 6 0
  • topic_test2 1 0
  • topic_test1 13 0
  • topic_test2 3 0
  • ...

其中第一行表示当前版本;第二行表示该数据目录下有多少个主题分区目录;后续每行记录一个主题分区的RecoveryPoint信息,格式为"主题 分区编号 recovery-checkpoint值"(以空格分隔),有多少个主题分区目录就有多少行。后面将详细介绍RecoveryPoint Checkpoint机制。

1.1. 定时任务

在LogManager的startup()方法中会启动三个定时任务以及日志压缩任务,源码如下:

  • /**
  • * Start the background threads to flush logs and do log cleanup
  • */
  • def startup() {
  • /* Schedule the cleanup task to delete old logs */
  • if(scheduler != null) {
  • // 启动kafka-log-retention定时任务
  • info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
  • scheduler.schedule("kafka-log-retention",
  • cleanupLogs,
  • delay = InitialTaskDelayMs,
  • period = retentionCheckMs, // log.retention.check.interval.ms
  • TimeUnit.MILLISECONDS)
  • info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
  • // 启动kafka-log-flusher定时任务
  • scheduler.schedule("kafka-log-flusher",
  • flushDirtyLogs,
  • delay = InitialTaskDelayMs,
  • period = flushCheckMs, // log.flush.scheduler.interval.ms
  • TimeUnit.MILLISECONDS)
  • // 启动kafka-recovery-point-checkpoint定时任务
  • scheduler.schedule("kafka-recovery-point-checkpoint",
  • checkpointRecoveryPointOffsets,
  • delay = InitialTaskDelayMs,
  • period = flushCheckpointMs, // log.flush.offset.checkpoint.interval.ms
  • TimeUnit.MILLISECONDS)
  • }
  • if(cleanerConfig.enableCleaner) // log.cleaner.enable
  • // 启动LogCleaner
  • cleaner.startup()
  • }

1.1.1. KafkaScheduler任务调度器

在认识定时任务之前,先了解一下这里用到的Scheduler调度器;Kafka通过包装JDK提供的ScheduledThreadPoolExecutor定时任务线程池,构建了自己的定时任务调度器KafkaScheduler,源码如下:

  • // kafka.utils.KafkaScheduler
  • /**
  • * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
  • *
  • * It has a pool of kafka-scheduler- threads that do the actual work.
  • *
  • * @param threads The number of threads in the thread pool
  • * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
  • * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
  • */
  • @threadsafe
  • class KafkaScheduler(val threads: Int,
  • val threadNamePrefix: String = "kafka-scheduler-",
  • daemon: Boolean = true) extends Scheduler with Logging {
  • // JDK定时任务线程池
  • private var executor: ScheduledThreadPoolExecutor = null
  • private val schedulerThreadId = new AtomicInteger(0)
  • override def startup() {
  • debug("Initializing task scheduler.")
  • this synchronized {
  • if(isStarted)
  • throw new IllegalStateException("This scheduler has already been started!")
  • // 构造器内初始化定时任务线程池
  • // 配置线程数
  • executor = new ScheduledThreadPoolExecutor(threads)
  • // 配置在线程池关闭后放弃执行已存在的周期任务
  • executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  • // 配置在线程池关闭后放弃执行已存在的定时任务
  • executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  • // 配置线程工厂
  • executor.setThreadFactory(new ThreadFactory() {
  • // 默认创建是守护线程
  • def newThread(runnable: Runnable): Thread =
  • Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
  • })
  • }
  • }
  • override def shutdown() {
  • debug("Shutting down task scheduler.")
  • // We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time.
  • val cachedExecutor = this.executor
  • if (cachedExecutor != null) {
  • this synchronized {
  • cachedExecutor.shutdown()
  • this.executor = null
  • }
  • // 等待一天,需要等待剩余已存在的周期任务执行完成
  • cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
  • }
  • }
  • def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
  • debug("Scheduling task %s with initial delay %d ms and period %d ms."
  • .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
  • this synchronized {
  • // 保证线程池已经启动
  • ensureRunning
  • // 封装fun为Runnable对象
  • val runnable = CoreUtils.runnable {
  • try {
  • trace("Beginning execution of scheduled task '%s'.".format(name))
  • // 主要的执行任务代码
  • fun()
  • } catch {
  • case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
  • } finally {
  • trace("Completed execution of scheduled task '%s'.".format(name))
  • }
  • }
  • if(period >= 0)
  • // 提交周期任务
  • executor.scheduleAtFixedRate(runnable, delay, period, unit)
  • else
  • // 提交非周期任务
  • executor.schedule(runnable, delay, unit)
  • }
  • }
  • def isStarted: Boolean = {
  • this synchronized {
  • executor != null
  • }
  • }
  • private def ensureRunning = {
  • if(!isStarted)
  • throw new IllegalStateException("Kafka scheduler is not running.")
  • }
  • }
  • // org.apache.kafka.common.utils.Utils#newThread
  • /**
  • * Create a new thread
  • * @param name The name of the thread
  • * @param runnable The work for the thread to do
  • * @param daemon Should the thread block JVM shutdown?
  • * @return The unstarted thread
  • */
  • public static Thread newThread(String name, Runnable runnable, boolean daemon) {
  • Thread thread = new Thread(runnable, name);
  • thread.setDaemon(daemon);
  • thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  • public void uncaughtException(Thread t, Throwable e) {
  • log.error("Uncaught exception in thread '" + t.getName() + "':", e);
  • }
  • });
  • return thread;
  • }

如果了解ScheduledThreadPoolExecutor的原理,KafkaScheduler的实现是非常简单的。我们主要关注几个要点:KafkaScheduler自定义了线程工厂,生成的线程拥有特定的线程名(默认为"kafka-scheduler-"加自增序号),并通过Utils.newThread()为线程设置了未捕获异常的处理器;同时还会根据构造KafkaScheduler时传入的daemon参数决定是否创建为守护线程。

在线程调度方面,会根据schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit)方法的period参数决定是创建周期任务还是非周期任务,同时还会捕获运行任务产生的异常,仅仅使用Error级别的日志进行记录。

需要注意的是KafkaScheduler的shutdown()方法:它用于关闭线程池,但由于KafkaScheduler调度的可能是耗时非常久的任务,因此在调用线程池的shutdown()方法之后,会通过cachedExecutor.awaitTermination(1, TimeUnit.DAYS)最多等待一天,以尽量让已经提交的任务执行完成。
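
下面是KafkaScheduler的一个最小使用示意(假设代码位于可以访问kafka.utils包的工程中,任务名demo-task与打印内容均为虚构),展示周期任务的提交与关闭:

  • import java.util.concurrent.TimeUnit
  • import kafka.utils.KafkaScheduler
  • // 示意代码:创建并启动一个单线程的KafkaScheduler
  • val scheduler = new KafkaScheduler(threads = 1)
  • scheduler.startup()
  • // period >= 0 时提交周期任务;period < 0 时只执行一次
  • scheduler.schedule("demo-task", () => println("tick"),
  •   delay = 30 * 1000L, period = 60 * 1000L, unit = TimeUnit.MILLISECONDS)
  • // ...
  • scheduler.shutdown() // 内部最多等待一天,让已提交的任务尽量执行完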

1.1.2. Log Retention定时任务

Log Retention任务是用于LogSegment清理工作的,在前面我们已经提到过,Kafka会定期清理过期日志,清理的标准有两个:日志保存的时长(由retention.ms配置决定,默认值7天),及日志文件的大小(由retention.bytes配置决定,默认值-1,即无限制)。Log Retention任务会根据这两项的配置定时清理过期日志。清理的周期时间由配置项log.retention.check.interval.ms决定(默认值300000,即300s),清理工作则主要由LogManager的cleanupLogs()方法完成,该方法的源码如下:

  • // kafka.log.LogManager#cleanupLogs
  • /**
  • * Delete any eligible logs. Return the number of segments deleted.
  • * 按照两个条件进行LogSegment的清理工作:
  • * - LogSegment的存活时长;
  • * - 整个Log的大小。
  • * log-retention任务不仅会将过期的LogSegment删除,还会根据Log的大小决定是否删除最旧的LogSegment,以控制整个Log的大小。
  • */
  • def cleanupLogs() {
  • debug("Beginning log cleanup...")
  • var total = 0
  • val startMs = time.milliseconds
  • // 遍历所有Log对象,只处理cleanup.policy配置为delete的Log
  • for(log <- allLogs; if !log.config.compact) {
  • debug("Garbage collecting '" + log.name + "'")
  • // 使用cleanupExpiredSegments()和cleanupSegmentsToMaintainSize()方法进行清理
  • total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  • }
  • debug("Log cleanup completed. " + total + " files deleted in " +
  • (time.milliseconds - startMs) / 1000 + " seconds")
  • }

cleanupLogs()方法会遍历所有的Log对象,只处理cleanup.policy配置为delete的Log,主要的清理流程由cleanupExpiredSegments(log: Log): Int和cleanupSegmentsToMaintainSize(log: Log): Int两个方法完成,它们的源码如下:

  • // kafka.log.LogManager#cleanupExpiredSegments
  • /**
  • * Runs through the log removing segments older than a certain age
  • * 删除时间层面过期的日志文件
  • */
  • private def cleanupExpiredSegments(log: Log): Int = {
  • // 检查配置的retention.ms是否小于0
  • if (log.config.retentionMs < 0)
  • return 0
  • // 当前时间
  • val startMs = time.milliseconds
  • /**
  • * 计算时间,并将删除操作交给LogSegment类处理
  • * 删除条件是LogSegment的日志文件在最近一段时间(retentionMs)内没有被修改
  • */
  • log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  • }
  • // kafka.log.LogManager#cleanupSegmentsToMaintainSize
  • /**
  • * Runs through the log removing segments until the size of the log
  • * is at least logRetentionSize bytes in size
  • * 删除大小超限的日志文件
  • */
  • private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  • // 若未配置retention.bytes(小于0),或Log总大小尚未超过retention.bytes,则无需删除
  • if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
  • return 0
  • // 根据Log的大小及retentionSize计算需要删除的字节数
  • var diff = log.size - log.config.retentionSize
  • // 判断该LogSegment是否应该被删除
  • def shouldDelete(segment: LogSegment) = {
  • // 如果需要删除的字节数大于segment的字节数,说明该LogSegment可以被删除
  • if(diff - segment.size >= 0) {
  • // 更新需要删除的字节数
  • diff -= segment.size
  • true
  • } else {
  • false
  • }
  • }
  • // 调用Log的deleteOldSegments()方法筛选并删除日志文件
  • log.deleteOldSegments(shouldDelete)
  • }

这两个方法的设计非常巧妙:它们将实际的删除操作委托给Log对象,而将"是否删除"的判断逻辑作为函数类型的参数传递给Log对象的deleteOldSegments(predicate: LogSegment => Boolean): Int方法。从该方法的声明可以得知,predicate是一个函数类型的参数,它接收一个LogSegment类型的参数,返回值为Boolean,用于决定是否删除传入的LogSegment对象;具体实现如下:

  • // kafka.log.Log#deleteOldSegments
  • /**
  • * Delete any log segments matching the given predicate function,
  • * starting with the oldest segment and moving forward until a segment doesn't match.
  • * @param predicate A function that takes in a single log segment and returns true iff it is deletable
  • * @return The number of segments deleted
  • */
  • def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  • lock synchronized { // 加锁
  • //find any segments that match the user-supplied predicate UNLESS it is the final segment
  • //and it is empty (since we would just end up re-creating it)
  • // 获取activeSegment
  • val lastEntry = segments.lastEntry
  • val deletable =
  • if (lastEntry == null) Seq.empty // 没有activeSegment
  • /**
  • * 得到segments跳表中value集合的迭代器
  • * 循环检测LogSegment是否符合删除条件,并将符合条件的LogSegment形成集合返回
  • * takeWhile会根据传入的条件(A => Boolean)筛选元素,条件如下:
  • * - s => predicate(s):使用predicate判断s是否符合条件
  • * - (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0)
  • * - s.baseOffset != lastEntry.getValue.baseOffset:s是否不是activeSegment
  • * - s.size > 0:s的大小大于0
  • * 即:predicate(s)返回true,且s满足"不是activeSegment"或"大小大于0"二者中的至少一个。
  • * 对于按时间清理的场景,predicate判断的是LogSegment的日志文件在最近retention.ms内没有被修改;
  • * 对于按大小清理的场景,predicate判断的是Log总大小已超过retention.bytes,
  • * 且该LogSegment的大小不超过仍需删除的字节数(diff)。
  • */
  • else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
  • // 需要删除的LogSegment数量
  • val numToDelete = deletable.size
  • if (numToDelete > 0) {
  • // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
  • // 全部的LogSegment都需要删除
  • if (segments.size == numToDelete)
  • // 删除前先创建一个新的activeSegment,保证保留一个LogSegment
  • roll()
  • // remove the segments for lookups
  • // 遍历删除LogSegment
  • deletable.foreach(deleteSegment(_))
  • }
  • // 返回删除的数量
  • numToDelete
  • }
  • }

删除操作是加锁控制的,从Log对象的segments跳表中根据参数传入的条件函数筛选出需要删除的LogSegment集合后,通过调用deleteSegment(segment: LogSegment)方法对集合中的每一个LogSegment进行删除,该方法的源码如下:

  • // kafka.log.Log#deleteSegment
  • /**
  • * This method performs an asynchronous log segment delete by doing the following:
  • * <ol>
  • * <li>It removes the segment from the segment map so that it will no longer be used for reads.
  • * <li>It renames the index and log files by appending .deleted to the respective file name
  • * <li>It schedules an asynchronous delete operation to occur in the future
  • * </ol>
  • * This allows reads to happen concurrently without synchronization and without the possibility of physically
  • * deleting a file while it is being read from.
  • *
  • * 删除指定的LogSegment
  • *
  • * @param segment The log segment to schedule for deletion
  • */
  • private def deleteSegment(segment: LogSegment) {
  • info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
  • // 加锁
  • lock synchronized {
  • // 从segments集合中移除
  • segments.remove(segment.baseOffset)
  • // 异步删除
  • asyncDeleteSegment(segment)
  • }
  • }
  • // kafka.log.Log#asyncDeleteSegment
  • /**
  • * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
  • * @throws KafkaStorageException if the file can't be renamed and still exists
  • */
  • private def asyncDeleteSegment(segment: LogSegment) {
  • // 改后缀名,在后面加上.deleted
  • segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  • // 定义一个删除方法,用于在定时任务中执行
  • def deleteSeg() {
  • info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
  • segment.delete()
  • }
  • // 添加定时任务删除文件
  • scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
  • }
  • // kafka.log.LogSegment#changeFileSuffixes
  • /**
  • * Change the suffix for the index and log file for this log segment
  • * 改变LogSegment描述的日志文件和索引文件的后缀
  • */
  • def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
  • def kafkaStorageException(fileType: String, e: IOException) =
  • new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)
  • // 将日志文件的后缀名由oldSuffix改为newSuffix
  • try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
  • catch {
  • case e: IOException => throw kafkaStorageException("log", e)
  • }
  • // 将索引文件的后缀名由oldSuffix改为newSuffix
  • try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
  • catch {
  • case e: IOException => throw kafkaStorageException("index", e)
  • }
  • }
  • // kafka.log.LogSegment#delete
  • /**
  • * Delete this log segment from the filesystem.
  • * @throws KafkaStorageException if the delete fails.
  • */
  • def delete() {
  • // 删除Log日志文件
  • val deletedLog = log.delete()
  • // 删除Index索引文件
  • val deletedIndex = index.delete()
  • // 检查是否删除成功
  • if(!deletedLog && log.file.exists)
  • throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  • if(!deletedIndex && index.file.exists)
  • throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  • }

Log对象的deleteSegment(segment: LogSegment)方法会首先将LogSegment从segments跳表中移除,然后调用asyncDeleteSegment(segment: LogSegment)方法异步删除对应的索引文件和日志文件。文件的删除流程如下:

  1. 首先给LogSegment描述的日志文件及索引文件添加.deleted后缀;
  2. 向KafkaScheduler调度器添加一个“delete-file”任务,延迟一定时间后(由配置项file.delete.delay.ms决定,默认值60000,即1分钟)再真正执行文件删除。

1.1.3. Log Flusher定时任务

Log Flusher定时任务会按照配置的周期(由配置项log.flush.scheduler.interval.ms决定,默认值3000)定时对Log进行刷盘操作,保证数据的持久性;具体的刷盘工作由LogManager的flushDirtyLogs()方法完成,它会在需要时调用各个Log的flush()方法,源码如下:

  • // kafka.log.LogManager#flushDirtyLogs
  • /**
  • * Flush any log which has exceeded its flush interval and has unwritten messages.
  • */
  • private def flushDirtyLogs() = {
  • debug("Checking for dirty logs to flush...")
  • // 遍历logs集合
  • for ((topicAndPartition, log) <- logs) {
  • try {
  • // 计算距离上次刷新的间隔时间
  • val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
  • debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
  • " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
  • // 检查是否到了需要执行flush操作的时间
  • if(timeSinceLastFlush >= log.config.flushMs)
  • // 调用Log的flush()方法刷新,实际刷新到LEO位置
  • log.flush
  • } catch {
  • case e: Throwable =>
  • error("Error flushing topic " + topicAndPartition.topic, e)
  • }
  • }
  • }
  • // kafka.log.Log#flush
  • /**
  • * Flush all log segments
  • */
  • def flush(): Unit = flush(this.logEndOffset)
  • // kafka.log.Log#flush
  • /**
  • * Flush log segments for all offsets up to offset-1
  • * @param offset The offset to flush up to (non-inclusive); the new recovery point
  • */
  • def flush(offset: Long) : Unit = {
  • if (offset <= this.recoveryPoint)
  • return
  • debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
  • time.milliseconds + " unflushed = " + unflushedMessages)
  • // 将 recoveryPoint ~ offset(即LEO)之间的LogSegment数据刷新到磁盘上
  • for(segment <- logSegments(this.recoveryPoint, offset))
  • segment.flush()
  • lock synchronized {
  • if(offset > this.recoveryPoint) {
  • // 更新recoverPoint的值
  • this.recoveryPoint = offset
  • // 更新最后一次刷盘时间记录
  • lastflushedTime.set(time.milliseconds)
  • }
  • }
  • }

从源码可知,最终的刷盘操作还是交给了LogSegment对象,而LogSegment对象会将刷盘操作继续交给内部的FileMessageSet和OffsetIndex完成,这部分源码在前面"1.2.4. 数据刷盘"一节已经介绍过了,这里就不再赘述。需要注意的是,在刷盘操作之后,Log对象的recoveryPoint会更新为传入的offset值,这表明recoveryPoint之前的数据都已经刷盘持久化了。
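
下面用一个简单的数值示意说明recoveryPoint与LEO之间的关系(示意代码,数值为虚构):

  • // 示意代码,数值为虚构
  • val recoveryPoint = 1200L // 已刷盘位置:offset小于1200的消息均已持久化
  • val logEndOffset  = 1350L // LEO:下一条待写入消息的offset
  • val unflushedMessages = logEndOffset - recoveryPoint // 150条消息尚未强制刷盘
  • // 当距离上次刷盘时间超过flush.ms时,flushDirtyLogs()会调用log.flush(),
  • // 将[recoveryPoint, LEO)范围内的LogSegment刷盘,并把recoveryPoint推进到LEO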

1.1.4. RecoveryPoint Checkpoint定时任务

RecoveryPoint Checkpoint定时任务用于定时将每个Log的recoveryPoint写入RecoveryPointCheckpoint文件中,间隔时长由配置项log.flush.offset.checkpoint.interval.ms决定(默认值60000,即1分钟),主要的检查操作由LogManager对象的checkpointRecoveryPointOffsets()方法完成,源码如下:

  • // kafka.log.LogManager#checkpointRecoveryPointOffsets
  • /**
  • * Write out the current recovery point for all logs to a text file in the log directory
  • * to avoid recovering the whole log on startup.
  • */
  • def checkpointRecoveryPointOffsets() {
  • // 对所有的logDirs都调用checkpointLogsInDir方法
  • this.logDirs.foreach(checkpointLogsInDir)
  • }
  • // kafka.log.LogManager#checkpointLogsInDir
  • /**
  • * Make a checkpoint for all logs in provided directory.
  • */
  • private def checkpointLogsInDir(dir: File): Unit = {
  • // 获取指定log目录下的TopicAndPartition信息,以及对应的Log对象
  • val recoveryPoints = this.logsByDir.get(dir.toString)
  • if (recoveryPoints.isDefined) { // recoveryPoints不为空
  • // 更新指定log目录下RecoveryPointCheckpoint文件
  • this.recoveryPointCheckpoints(dir) // OffsetCheckpoint对象
  • .write(recoveryPoints.get.mapValues(_.recoveryPoint))
  • }
  • }
  • // kafka.log.LogManager#logsByDir
  • /**
  • * Map of log dir to logs by topic and partitions in that dir
  • */
  • private def logsByDir = {
  • /**
  • * 对Map[TopicAndPartition, Log]进行目录分组聚合
  • * 最终得到Map[String, Map[TopicAndPartition, Log]]结构的字典数据
  • * 其中键为TopicAndPartition的父目录
  • */
  • this.logsByTopicPartition.groupBy {
  • case (_, log) => log.dir.getParent
  • }
  • }
  • // kafka.log.LogManager#recoveryPointCheckpoints
  • /**
  • * 用于管理每个log目录与其下的RecoveryPointCheckpoint文件之间的映射关系,
  • * 在LogManager对象初始化时,会在每个log目录下创建一个对应的RecoveryPointCheckpoint文件。
  • * 此Map的value是OffsetCheckpoint类型的对象,其中封装了对应log目录下的RecoveryPointCheckpoint文件,
  • * 并提供对RecoveryPointCheckpoint文件的读写操作。
  • * RecoveryPointCheckpoint文件中则记录了该log目录下的所有Log的recoveryPoint
  • */
  • private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  • // kafka.log.LogManager#RecoveryPointCheckpointFile
  • // RecoveryPointCheckpointFile的名称
  • val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"

从源码可知,checkpointRecoveryPointOffsets()方法会对所有数据目录进行遍历并对其执行checkpointLogsInDir(dir: File): Unit方法。

这里先介绍一下logsByDir,它的返回值是通过对LogManager的logsByTopicPartition进行分组得到的,而logsByTopicPartition则是logs字段的不可变Map形式,它们的定义如下:

  • // kafka.log.LogManager#logsByTopicPartition
  • /**
  • * Get a map of TopicAndPartition => Log
  • */
  • def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap
  • // kafka.log.LogManager#logs
  • // 用于管理TopicAndPartition与Log之间的对应关系,底层使用ConcurrentHashMap实现
  • private val logs = new Pool[TopicAndPartition, Log]()

因此logsByTopicPartition中存储了主题分区与Log对象的映射,通过groupBy分组后,会得到Map[String, Map[TopicAndPartition, Log]]结构的字典数据,键为数据目录名,值为TopicAndPartition到Log对象的映射字典。
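
为了更直观地理解这里的groupBy,下面给出一个独立的小例子(示例数据为虚构,仅用于演示按父目录分组的效果):

  • // 示意代码,数据为虚构
  • val logsByTopicPartition = Map(
  •   ("topic_test1", 0) -> "/data/kafka-logs-1/topic_test1-0",
  •   ("topic_test1", 1) -> "/data/kafka-logs-2/topic_test1-1",
  •   ("topic_test2", 0) -> "/data/kafka-logs-1/topic_test2-0")
  • // 以分区目录的父目录(即数据目录)为键分组,得到Map[String, Map[(String, Int), String]]
  • val logsByDir = logsByTopicPartition.groupBy { case (_, path) => new java.io.File(path).getParent }
  • // logsByDir("/data/kafka-logs-1")只包含topic_test1-0和topic_test2-0两项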

checkpointLogsInDir(dir: File): Unit方法中会从logsByDir中获取对应数据目录的TopicAndPartition到Log对象的映射字典,然后将这些Log对象的recoveryPoint值写入到RecoveryPointCheckpointFile中。具体的写入操作是由OffsetCheckpoint对象的write(offsets: Map[TopicAndPartition, Long])方法完成的,源码如下:

  • // kafka.server.OffsetCheckpoint#write
  • /**
  • * 最终存储的格式如下:
  • * 0
  • * 20
  • * topic_test1 6 0
  • * topic_test2 1 0
  • * topic_test1 13 0
  • * topic_test2 3 0
  • * ...
  • *
  • * 第一行表示当前版本,第二行表示该LogDir目录下有多少个partition目录
  • * 后续每行存储了三个信息:topic partition编号 recovery-checkpoint,有多少个partition目录就有多少行
  • *
  • * @param offsets
  • */
  • def write(offsets: Map[TopicAndPartition, Long]) {
  • // 加锁
  • lock synchronized {
  • // write to temp file and then swap with the existing file
  • // 将数据先写入tmp文件
  • val fileOutputStream = new FileOutputStream(tempPath.toFile)
  • val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
  • try {
  • // 写入版本号
  • writer.write(CurrentVersion.toString)
  • writer.newLine()
  • // 写入记录条数
  • writer.write(offsets.size.toString)
  • writer.newLine()
  • // 循环写入topic名称、分区编号以及对应Log的recoveryPoint
  • offsets.foreach { case (topicPart, offset) =>
  • writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
  • writer.newLine()
  • }
  • writer.flush()
  • // 将写入的数据刷新到磁盘
  • fileOutputStream.getFD().sync()
  • } catch {
  • case e: FileNotFoundException =>
  • if (FileSystems.getDefault.isReadOnly) {
  • fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
  • Runtime.getRuntime.halt(1)
  • }
  • throw e
  • } finally {
  • writer.close()
  • }
  • // 使用tmp文件替换原来的RecoveryPointCheckpoint文件
  • Utils.atomicMoveWithFallback(tempPath, path)
  • }
  • }

由源码可知,写入过程是先将数据目录下所有Log的recoveryPoint写到名为"recovery-point-offset-checkpoint.tmp"的临时文件中,然后用该临时文件原子地替换原来的RecoveryPointCheckpoint文件。另外,RecoveryPointCheckpoint文件最终存储的格式如下:

  • 0
  • 20
  • topic_test1 6 0
  • topic_test2 1 0
  • topic_test1 13 0
  • topic_test2 3 0
  • ...

其中第一行表示当前版本;第二行表示该数据目录下有多少个主题分区目录;后续每行记录一个主题分区的RecoveryPoint信息,格式为"主题 分区编号 recovery-checkpoint值"(以空格分隔),有多少个主题分区目录就有多少行,这部分内容在前面已经简单讲解过。对应地,OffsetCheckpoint类还提供了read(): Map[TopicAndPartition, Long]方法用于读取RecoveryPointCheckpoint文件,返回一个Map[TopicAndPartition, Long]类型的字典,实现比较简单,源码如下:

  • // kafka.server.OffsetCheckpoint#read
  • // 读取RecoveryPointCheckpoint文件
  • def read(): Map[TopicAndPartition, Long] = {
  • def malformedLineException(line: String) =
  • new IOException(s"Malformed line in offset checkpoint file: $line'")
  • // 加锁
  • lock synchronized {
  • // 读取器
  • val reader = new BufferedReader(new FileReader(file))
  • var line: String = null
  • try {
  • // 读取版本号
  • line = reader.readLine()
  • if (line == null)
  • return Map.empty
  • val version = line.toInt
  • // 针对版本号进行不同的处理
  • version match {
  • // 当前版本号
  • case CurrentVersion =>
  • // 读取期望大小
  • line = reader.readLine()
  • if (line == null)
  • return Map.empty
  • val expectedSize = line.toInt
  • val offsets = mutable.Map[TopicAndPartition, Long]()
  • /**
  • * 分别读取每行topic、partition、checkpoint offset信息
  • * 并转换为Map[TopicAndPartition, Long]格式
  • */
  • line = reader.readLine()
  • while (line != null) {
  • WhiteSpacesPattern.split(line) match {
  • case Array(topic, partition, offset) =>
  • offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong
  • line = reader.readLine()
  • case _ => throw malformedLineException(line)
  • }
  • }
  • // 检查大小是否匹配
  • if (offsets.size != expectedSize)
  • throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
  • // 返回得到的Map[TopicAndPartition, Long]集合
  • offsets
  • case _ =>
  • throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
  • }
  • } catch {
  • case e: NumberFormatException => throw malformedLineException(line)
  • } finally {
  • reader.close()
  • }
  • }
  • }
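
顺带一提,write()中"先写.tmp临时文件、再替换正式文件"的做法,对应的是JDK提供的原子移动能力(从名字看,Utils.atomicMoveWithFallback也是优先尝试原子移动、失败后再退回普通替换)。下面是该模式的一个独立示意(非Kafka源码,路径与写入内容均为虚构):

  • import java.nio.charset.StandardCharsets
  • import java.nio.file.{Files, Paths, StandardCopyOption}
  • // 示意代码:先写临时文件,再原子替换;读取方要么看到旧文件,要么看到完整的新文件
  • val path     = Paths.get("/data/kafka-logs-1/recovery-point-offset-checkpoint")
  • val tempPath = Paths.get(path.toString + ".tmp")
  • Files.write(tempPath, "0\n1\ntopic_test1 0 42\n".getBytes(StandardCharsets.UTF_8))
  • Files.move(tempPath, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)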

1.2. 日志压缩

Kafka还提供了日志压缩(Log Compaction)功能,通过此功能也可以有效地减小日志文件的大小,缓解磁盘紧张的情况。在很多实践场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据记录会不断被修改一样。如果消费者只关心key对应的最新value值,可以开启Kafka的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同key的消息进行合并,只保留最新的value值。
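
为了直观理解"只保留每个key最新的value",下面给出一段极度简化的示意代码(非Kafka实现,数据为虚构),模拟压缩前后消息的取舍:

  • // 示意代码:对每个key只保留offset最大的一条消息
  • val dirty = Seq(
  •   (0L, "k1", Some("v1")),
  •   (1L, "k2", Some("v1")),
  •   (2L, "k1", Some("v2")),
  •   (3L, "k2", None)) // value为空,即"删除标识"(墓碑消息)
  • // key -> 最大offset,作用类似Cleaner中的OffsetMap
  • val lastOffsetByKey = dirty.groupBy(_._2).mapValues(_.map(_._1).max)
  • // 只保留每个key最新的记录;墓碑消息在delete.retention.ms之后的压缩中也会被丢弃
  • val compacted = dirty.filter { case (offset, key, _) => offset == lastOffsetByKey(key) }
  • // compacted == Seq((2L, "k1", Some("v2")), (3L, "k2", None))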

Log在写入消息时是将消息追加到Active Segment的日志文件末尾。为了避免Active Segment成为热点,Active Segment不参与日志压缩操作,而是只压缩其余的只读的LogSegment。在日志压缩过程中会启动多条Cleaner线程,我们可以通过调整Cleaner线程池中的线程数量,优化并发压缩的性能,减少对整个服务端性能的影响。一般情况下,Log的数据量很大,为了避免Cleaner线程与其他业务线程长时间竞争CPU,并不会将除Active Segment之外的所有LogSegment在一次压缩操作中全部处理掉,而是将这些LogSegment分批进行压缩。

每个Log都可以通过Cleaner Checkpoint切分成Clean和Dirty两部分,Clean部分表示的是之前已经被压缩过的部分,而Dirty部分则表示未压缩的部分。日志压缩操作完成后,Dirty部分消息的offset依然是连续递增的,而Clean部分消息的offset是断断续续的。

(图:Cleaner Checkpoint划分图,Log被Cleaner Checkpoint划分为Clean和Dirty两部分)

1.2.1. LogCleaner类

在LogManager的startup()方法中,会根据cleanerConfig.enableCleaner配置(对应log.cleaner.enable,默认值为true)决定是否启动压缩任务,压缩任务是由LogCleaner对象实现的:

  • // public, so we can access this from kafka.admin.DeleteTopicTest
  • val cleaner: LogCleaner =
  • if(cleanerConfig.enableCleaner)
  • new LogCleaner(cleanerConfig, logDirs, logs, time = time)
  • else
  • null

LogCleaner对象在初始化时,会根据数据目录等参数构建LogCleanerManager对象,并根据CleanerConfig配置初始化对应数量的CleanerThread线程并由cleaners字段进行管理:

  • // kafka.log.LogCleaner
  • /**
  • * @param config Configuration parameters for the cleaner Cleaner的配置
  • * @param logDirs The directories where offset checkpoints reside 数据目录集合
  • * @param logs The pool of logs 主题分区到Log对象的映射字典
  • * @param time A way to control the passage of time 当前时间
  • */
  • class LogCleaner(val config: CleanerConfig,
  • val logDirs: Array[File],
  • val logs: Pool[TopicAndPartition, Log],
  • time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  • /**
  • * for managing the state of partitions being cleaned. package-private to allow access in tests
  • * 负责每个Log的压缩状态管理以及cleaner checkpoint信息维护和更新
  • * */
  • private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
  • ...
  • /**
  • * the threads
  • * 用于管理CleanerThread线程
  • * */
  • private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
  • ...
  • }

在LogCleaner的startup()方法中会启动所有上面创建的CleanerThread线程:

  • // kafka.log.LogCleaner#startup
  • /**
  • * Start the background cleaning
  • * 启动cleaner线程
  • */
  • def startup() {
  • info("Starting the log cleaner")
  • // 遍历调用start()方法
  • cleaners.foreach(_.start())
  • }

1.2.2. CleanerThread和ShutdownableThread类

CleanerThread线程其实是继承自ShutdownableThread抽象类的,该类的源码如下:

  • // kafka.utils.ShutdownableThread
  • abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
  • extends Thread(name) with Logging {
  • this.setDaemon(false)
  • this.logIdent = "[" + name + "], "
  • // 运行状态
  • val isRunning: AtomicBoolean = new AtomicBoolean(true)
  • // 用于控制线程关闭的锁
  • private val shutdownLatch = new CountDownLatch(1)
  • // 关闭线程
  • def shutdown() = {
  • initiateShutdown()
  • awaitShutdown()
  • }
  • def initiateShutdown(): Boolean = {
  • if(isRunning.compareAndSet(true, false)) { // CAS方式修改isRunning为false
  • info("Shutting down")
  • // 修改isRunning为false
  • isRunning.set(false)
  • // 处理中断
  • if (isInterruptible)
  • interrupt()
  • true
  • } else
  • // 修改isRunning失败
  • false
  • }
  • /**
  • * After calling initiateShutdown(), use this API to wait until the shutdown is complete
  • * 阻塞等待线程结束
  • */
  • def awaitShutdown(): Unit = {
  • // 阻塞
  • shutdownLatch.await()
  • info("Shutdown completed")
  • }
  • /**
  • * This method is repeatedly invoked until the thread shuts down or this method throws an exception
  • */
  • def doWork(): Unit
  • override def run(): Unit = {
  • info("Starting ")
  • try{
  • // while循环,处理主要工作
  • while(isRunning.get()){
  • // doWork()方法由子类实现
  • doWork()
  • }
  • } catch{
  • case e: Throwable =>
  • if(isRunning.get())
  • error("Error due to ", e)
  • }
  • // 解阻塞
  • shutdownLatch.countDown()
  • info("Stopped ")
  • }
  • }

ShutdownableThread类的设计是比较简单的,内部通过shutdownLatch来控制线程的真正结束,而在主要的run()方法中,如果线程的isRunning标识一直是true将不断执行doWork()方法,doWork()方法则是一个抽象方法,具体内容由子类实现,因此,我们需要关注CleanerThread类的doWork()方法,源码如下:

  • // kafka.log.LogCleaner.CleanerThread#doWork
  • /**
  • * The main loop for the cleaner thread
  • */
  • override def doWork() {
  • cleanOrSleep()
  • }
  • // kafka.log.LogCleaner.CleanerThread#cleanOrSleep
  • /**
  • * Clean a log if there is a dirty log available, otherwise sleep for a bit
  • */
  • private def cleanOrSleep() {
  • // 通过CleanerManager的grabFilthiestLog()方法获取需要进行日志压缩的Log
  • cleanerManager.grabFilthiestLog() match {
  • case None =>
  • // there are no cleanable logs, sleep a while
  • // 没有需要压缩的Log,退避一段时间
  • backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
  • case Some(cleanable) =>
  • // there's a log, clean it
  • // 有需要压缩的Log
  • // 获取Log的firstDirtyOffset,即dirty log的起始offset,是clean log和dirty log的分界线
  • var endOffset = cleanable.firstDirtyOffset
  • try {
  • // 调用Cleaner对象进行日志压缩
  • endOffset = cleaner.clean(cleanable)
  • // 用于记录状态并输出过程日志
  • recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
  • } catch {
  • case pe: LogCleaningAbortedException => // task can be aborted, let it go.
  • } finally {
  • // 完成清理,进行状态转换,更新cleaner-offset-checkpoint文件
  • cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
  • }
  • }
  • }

1.2.3. LogCleanerManager和Cleaner类

从上面的代码可以看出,清理工作还涉及了LogCleanerManager对象和Cleaner对象,我们先了解这两个对象再回过头来分析。其中LogCleanerManager对象cleanerManager是在LogCleaner中初始化的,由于CleanerThread类是LogCleaner类的内部类,所以可以访问该对象:

  • /**
  • * for managing the state of partitions being cleaned. package-private to allow access in tests
  • * 负责每个Log的压缩状态管理以及cleaner checkpoint信息维护和更新
  • * */
  • private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)

LogCleanerManager主要负责每个Log的压缩状态管理以及Cleaner Checkpoint信息维护和更新,它的定义和重要的字段如下:

  • // kafka.log.LogCleanerManager
  • /**
  • * Manage the state of each partition being cleaned.
  • * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
  • * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
  • * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
  • * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
  • * requested to be resumed.
  • */
  • private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
  • // package-private for testing
  • // Cleaner Checkpoint文件名
  • private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
  • /**
  • * the offset checkpoints holding the last cleaned point for each log
  • * 用于维护data数据目录与cleaner-offset-checkpoint文件之间的对应关系,与LogManager的recoverPointCheckpoints集合类似
  • * */
  • private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
  • /**
  • * a global lock used to control all access to the in-progress set and the offset checkpoints
  • * 用于保护checkpoints集合和inProgress集合的锁
  • * */
  • private val lock = new ReentrantLock
  • /**
  • * for coordinating the pausing and the cleaning of a partition
  • * 用于线程阻塞等待压缩状态由LogCleaningAborted转换为LogCleaningPaused
  • * */
  • private val pausedCleaningCond = lock.newCondition()
  • /**
  • * the set of logs currently being cleaned
  • * 记录正在进行清理的TopicAndPartition的压缩状态
  • * */
  • private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
  • ...
  • }

其中checkpoints用来维护数据目录与Cleaner Checkpoint文件之间的对应关系;lock是用于保护checkpoints集合和inProgress集合的锁;pausedCleaningCond则构成了一个条件队列,用于控制压缩状态的转换;inProgress用于记录正在进行清理的TopicAndPartition的压缩状态,压缩状态都定义为LogCleaningState的子类:

  • // kafka.log.LogCleaningState
  • private[log] sealed trait LogCleaningState
  • private[log] case object LogCleaningInProgress extends LogCleaningState // 刚开始进入压缩任务
  • private[log] case object LogCleaningAborted extends LogCleaningState // 压缩任务被中断
  • private[log] case object LogCleaningPaused extends LogCleaningState // 压缩任务被暂停

日志压缩的生命周期对应着上面三个状态的转换,当开始进行日志压缩任务时会先进入LogCleaningInProgress状态;压缩任务可以被暂停,此时进入LogCleaningPaused;压缩任务若被中断,则先进入LogCleaningAborted状态,等待Cleaner线程将其中的任务中止,然后进入LogCleaningPaused状态。处于LogCleaningPaused状态的TopicAndPartition的日志不会再被压缩,直到有其他线程恢复其压缩状态。
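
三种状态之间的合法流转可以用下面的示意代码来概括(非Kafka源码,事件名为笔者虚构,仅描述上文的状态机):

  • // 示意代码:压缩状态的流转
  • sealed trait State
  • case object InProgress extends State // 正在压缩
  • case object Aborted    extends State // 已请求中断,等待Cleaner线程确认
  • case object Paused     extends State // 已暂停,不会再被选中压缩
  • def transition(state: State, event: String): Option[State] = (state, event) match {
  •   case (InProgress, "abortAndPause")    => Some(Aborted) // 其他线程请求中断并暂停
  •   case (Aborted, "cleanerAcknowledged") => Some(Paused)  // Cleaner线程中止任务之后
  •   case _ => None // 其余流转不在此示意范围内(如恢复压缩会把分区从inProgress中移除)
  • }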

Cleaner对象cleaner是在CleanerThread线程类里初始化的:

  • // kafka.log.LogCleaner.CleanerThread#cleaner
  • // 初始化压缩器
  • val cleaner = new Cleaner(id = threadId,
  • offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
  • hashAlgorithm = config.hashAlgorithm),
  • ioBufferSize = config.ioBufferSize / config.numThreads / 2,
  • maxIoBufferSize = config.maxMessageSize,
  • dupBufferLoadFactor = config.dedupeBufferLoadFactor,
  • throttler = throttler,
  • time = time,
  • checkDone = checkDone)

Cleaner的定义和重要字段如下:

  • /**
  • * This class holds the actual logic for cleaning a log
  • * @param id An identifier used for logging
  • * @param offsetMap The map used for deduplication SkimpyOffsetMap类型的Map,为dirty部分的消息建立key与last_offset的映射关系
  • * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer. 指定了读写LogSegment的ByteBuffer大小
  • * @param maxIoBufferSize The maximum size of a message that can appear in the log 指定的消息的最大长度
  • * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer 指定了SkimpyOffsetMap的最大占用比例
  • * @param throttler The throttler instance to use for limiting I/O rate.
  • * @param time The time instance
  • * @param checkDone Check if the cleaning for a partition is finished or aborted. 用来检测Log的压缩状态
  • */
  • private[log] class Cleaner(val id: Int,
  • val offsetMap: OffsetMap,
  • ioBufferSize: Int,
  • maxIoBufferSize: Int,
  • dupBufferLoadFactor: Double,
  • throttler: Throttler,
  • time: Time,
  • checkDone: (TopicAndPartition) => Unit) extends Logging {
  • /**
  • * buffer used for read i/o
  • * 读缓冲区
  • **/
  • private var readBuffer = ByteBuffer.allocate(ioBufferSize)
  • /**
  • * buffer used for write i/o
  • * 写缓冲区
  • **/
  • private var writeBuffer = ByteBuffer.allocate(ioBufferSize)

在Cleaner类的构造函数声明中有几个重要参数:

  • offsetMap:该参数是一个SkimpyOffsetMap类型的Map,用于为dirty部分的消息建立key与last_offset的映射关系。SkimpyOffsetMap使用消息摘要算法(例如MD2、MD5、SHA-1等)计算key的哈希值,并以此哈希值代表key,在Map中不直接保存key的原始内容(参见本列表后的示意代码)。SkimpyOffsetMap底层使用ByteBuffer实现,哈希冲突通过线性探查法解决。SkimpyOffsetMap只支持put()方法和get()方法,不支持任何删除方法。
  • ioBufferSize:指定读写LogSegment的ByteBuffer大小。
  • maxIoBufferSize:指定的消息的最大长度。
  • dupBufferLoadFactor:指定SkimpyOffsetMap的最大占用比例。
  • checkDone:用来检测Log的压缩状态。
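
下面的示意代码(非Kafka源码,key内容为虚构)演示了"只保存key的摘要"这一步,默认配置下摘要算法为MD5,结果为16字节:

  • import java.nio.ByteBuffer
  • import java.security.MessageDigest
  • // 示意代码:SkimpyOffsetMap不保存原始key,只保存key的摘要
  • val key: ByteBuffer = ByteBuffer.wrap("user-1001".getBytes("UTF-8"))
  • val digest = MessageDigest.getInstance("MD5")
  • digest.update(key.duplicate())
  • val hash: Array[Byte] = digest.digest() // 16字节,作为OffsetMap中代表该key的"指纹"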

1.2.4. 筛选需要压缩的日志

回到CleanerThread类用于压缩日志的主要方法cleanOrSleep(),它内部调用LogCleanerManager对象的grabFilthiestLog()获取了需要进行日志压缩的Log,该方法的源码如下:

  • // kafka.log.LogCleanerManager#grabFilthiestLog
  • /**
  • * Choose the log to clean next and add it to the in-progress set. We recompute this
  • * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
  • * the log manager maintains.
  • * 选取下一个需要进行日志压缩的Log
  • * filthiest
  • * 英 [ˈfɪlθɪɪst],美 [ˈfɪlθɪɪst]
  • * adv. 极其肮脏的;富得流油的
  • * adj. 肮脏的;污秽的;下流的;淫秽的;猥亵的;气愤的
  • * filthy的最高级
  • */
  • def grabFilthiestLog(): Option[LogToClean] = {
  • // 加锁
  • inLock(lock) {
  • // 获取全部Log的cleaner checkpoint
  • val lastClean = allCleanerCheckpoints()
  • val dirtyLogs = logs.filter {
  • // 过滤掉cleanup.policy为delete的Log
  • case (topicAndPartition, log) => log.config.compact // skip any logs marked for delete rather than dedupe
  • }.filterNot {
  • // 过滤掉inProgress中的Log
  • case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
  • }.map {
  • case (topicAndPartition, log) => // create a LogToClean instance for each
  • // if the log segments are abnormally truncated and hence the checkpointed offset
  • // is no longer valid, reset to the log starting offset and log the error event
  • // 获取Log中第一条消息的offset
  • val logStartOffset = log.logSegments.head.baseOffset
  • // 决定最终的压缩开始的位置,firstDirtyOffset的值可能是logStartOffset,也可能是clean checkpoint
  • val firstDirtyOffset = {
  • val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
  • if (offset < logStartOffset) {
  • error("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid."
  • .format(logStartOffset, offset))
  • logStartOffset
  • } else {
  • offset
  • }
  • }
  • // 为每个Log创建一个LogToClean对象,该对象内部维护了每个Log的clean部分字节数、dirty部分字节数以及cleanableRatio
  • LogToClean(topicAndPartition, log, firstDirtyOffset)
  • }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs 过滤掉空Log
  • // 获取dirtyLogs集合中cleanableRatio的最大值
  • this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
  • // and must meet the minimum threshold for dirty byte ratio
  • // 过滤掉cleanableRatio小于配置的minCleanableRatio值的Log
  • val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
  • if(cleanableLogs.isEmpty) {
  • None
  • } else {
  • // 选择要压缩的Log,只会选出最大的那个,内部是通过LogToClean对象的cleanableRatio值来比较
  • val filthiest = cleanableLogs.max
  • // 更新(或添加)此分区对应的压缩状态,将压缩状态置为LogCleaningInProgress
  • inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
  • // 返回要压缩的日志对应的LogToClean对象
  • Some(filthiest)
  • }
  • }
  • }
  • // kafka.log.LogCleanerManager#allCleanerCheckpoints
  • /**
  • * @return the position processed for all logs.
  • */
  • def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
  • // 将所有的checkpoint读取出来
  • checkpoints.values.flatMap(_.read()).toMap

这里需要注意的是,日志压缩时会使用名为"cleaner-offset-checkpoint"的文件记录每个日志中Clean和Dirty两部分的分界offset,该offset之前的为已经压缩过的Clean部分,之后的为Dirty部分。上面提到过,LogCleanerManager的checkpoints字段用于维护数据目录与"cleaner-offset-checkpoint"文件之间的对应关系,该文件的结构与前面讲到的RecoveryPointCheckpoint文件是类似的,它们都使用OffsetCheckpoint类对offset信息进行管理。因此在上面代码的allCleanerCheckpoints(): Map[TopicAndPartition, Long]方法中,同样是使用OffsetCheckpoint类的read()方法读取"cleaner-offset-checkpoint"文件的内容,并将其转换为Map[TopicAndPartition, Long]类型的字典。

grabFilthiestLog()方法会从logs中过滤出可以进行压缩的日志,将其封装为LogToClean对象;过滤的条件有以下几个:

  1. 日志的cleanup.policy配置为compact而非delete,即该日志采用日志压缩策略,而不是基于保留时间/大小的删除策略;
  2. 日志不处于正在压缩的过程中(不在inProgress集合中);
  3. 日志内有数据(totalBytes大于0);
  4. 日志的cleanableRatio大于配置的minCleanableRatio阈值(由min.cleanable.dirty.ratio配置),即Dirty部分占总大小的比例超过阈值才符合条件。

过滤出的日志会将"cleaner-offset-checkpoint"文件记录的Clean Offset与Log中第一个LogSegment的baseOffset进行比对,得出最终的First Dirty Offset(若checkpoint值小于日志起始offset,则以后者为准),该offset就是本次需要压缩的起始位置。

经过过滤得到的会是一个LogToClean对象的集合,对应了所有符合压缩条件的Log对象,然后会从该集合中取出最大的cleanableRatio的值作为LogCleanerManager对象的dirtiestLogCleanableRatio字段的值,这个过程其实是通过LogToClean的compare(that: LogToClean): Int进行的:

  • // kafka.log.LogToClean
  • /**
  • * Helper class for a log, its topic/partition, and the last clean position
  • */
  • private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
  • val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
  • val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
  • val cleanableRatio = dirtyBytes / totalBytes.toDouble
  • def totalBytes = cleanBytes + dirtyBytes
  • // 比较方法,通过cleanableRatio值来比较
  • override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
  • }
  • // scala.collection.TraversableOnce#max
  • def max[B >: A](implicit cmp: Ordering[B]): A = {
  • if (isEmpty)
  • throw new UnsupportedOperationException("empty.max")
  • reduceLeft((x, y) => if (cmp.gteq(x, y)) x else y)
  • }

最终选取要压缩的Log,即从LogToClean集合中选择cleanableRatio值最大的对象作为压缩目标,将其TopicAndPartition作为键添加到inProgress字典中,值为LogCleaningInProgress表示该Log已经进入了压缩过程。
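
这里补充一个cleanableRatio的简单算例(示意代码,数值为虚构),说明一个Log何时会被选中:

  • // 示意代码,数值为虚构
  • val cleanBytes = 400L * 1024 * 1024 // Clean部分共400MB
  • val dirtyBytes = 600L * 1024 * 1024 // Dirty部分共600MB
  • val cleanableRatio = dirtyBytes / (cleanBytes + dirtyBytes).toDouble // 0.6
  • // min.cleanable.dirty.ratio默认值为0.5,0.6 > 0.5,
  • // 因此该Log会进入cleanableLogs,并可能作为cleanableRatio最大者被本轮选中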

1.2.5. 日志压缩过程

在通过grabFilthiestLog()得到过滤结果之后,会对其进行判断:如果结果中没有任何LogToClean对象,就通过backOffWaitLatch退避等待一段时间后再重试;如果结果中有LogToClean对象,就将其传给Cleaner对象的clean(cleanable: LogToClean): Long方法进行压缩。压缩的过程主要有以下几步:

  1. 遍历需要压缩的日志所对应的消息数据,范围从First Dirty Offset至Active Segment的baseOffset,以填充OffsetMap。在OffsetMap中记录每个key应该保留的消息的offset。当OffsetMap被填充满时,就可以确定日志压缩的实际上限endOffset
  2. 根据deleteRetentionMs(由delete.retention.ms配置)计算时间点deleteHorizonMs,用于判断哪些LogSegment中的“删除标识”(即value为空的消息)可以被安全删除。
  3. 对offset位于0(即日志起始位置)至endOffset之间的LogSegment进行分组。
  4. 按照第3步得到的分组分别进行日志压缩。

clean(...)方法的源码如下:

  • // kafka.log.Cleaner#clean
  • /**
  • * Clean the given log
  • *
  • * @param cleanable The log to be cleaned
  • *
  • * @return The first offset not cleaned
  • */
  • private[log] def clean(cleanable: LogToClean): Long = {
  • // 重置状态为这一次的压缩做准备
  • stats.clear()
  • info("Beginning cleaning of log %s.".format(cleanable.log.name))
  • // 获取需要压缩的Log对象
  • val log = cleanable.log
  • // build the offset map
  • info("Building offset map for %s...".format(cleanable.log.name))
  • // activeSegment不参与压缩,所以activeSegment的baseOffset是可以压缩的最大上限
  • val upperBoundOffset = log.activeSegment.baseOffset
  • // 第1步:填充OffsetMap,确定日志压缩的真正上限
  • val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
  • // 维护状态
  • stats.indexDone()
  • // figure out the timestamp below which it is safe to remove delete tombstones
  • // this position is defined to be a configurable time beneath the last modified time of the last clean segment
  • // 第2步:计算可以安全删除"删除标识"(即value为空的消息)的LogSegment
  • val deleteHorizonMs =
  • log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
  • case None => 0L
  • case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
  • }
  • // group the segments and clean the groups
  • info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
  • // 第3步:对要压缩的LogSegment进行分组,按照分组进行压缩
  • for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
  • // 第4步:开始进行压缩
  • cleanSegments(log, group, offsetMap, deleteHorizonMs)
  • // record buffer utilization
  • // 记录buffer使用率
  • stats.bufferUtilization = offsetMap.utilization
  • // 维护状态
  • stats.allDone()
  • // 返回压缩的真正上限
  • endOffset
  • }

下面将分步进行详细介绍。

1.2.5.1. 扫描消息并填充OffsetMap

第一步操作是由buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long方法完成的,该方法从First Dirty Offset开始遍历LogSegment直至Active Segment的baseOffset以填充offsetMap;前面提到过,offsetMap是一个SkimpyOffsetMap类型的Map,用于为dirty部分的消息建立key与last_offset的映射关系;buildOffsetMap(...)方法的源码如下:

  • // kafka.log.Cleaner#buildOffsetMap
  • /**
  • * Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
  • * @param log The log to use
  • * @param start The offset at which dirty messages begin
  • * @param end The ending offset for the map that is being built
  • * @param map The map in which to store the mappings
  • *
  • * @return The final offset the map covers
  • */
  • private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
  • // 清理传入的map
  • map.clear()
  • // 查找从[start, end)之间所有的LogSegment
  • val dirty = log.logSegments(start, end).toBuffer
  • info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
  • // Add all the dirty segments. We must take at least map.slots * load_factor,
  • // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
  • // 起始LogSegment的baseOffset
  • var offset = dirty.head.baseOffset
  • require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
  • // 标识OffsetMap是否被填满
  • var full = false
  • // 在OffsetMap未满时,遍历所有的dirty LogSegment
  • for (segment <- dirty if !full) {
  • /**
  • * 检查LogCleanerManager记录的该分区的压缩状态,在该方法中
  • * 如果清理线程CleanerThread停止了,会抛出ThreadShutdownException异常;
  • * 如果LogSegment的压缩状态为LogCleaningAborted,会抛出LogCleaningAbortedException
  • */
  • checkDone(log.topicAndPartition)
  • // 处理单个LogSegment,将消息的key和offset添加到OffsetMap中
  • val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
  • if (newOffset > -1L) // OffsetMap未满,更新offset为newOffset
  • offset = newOffset
  • else {
  • // OffsetMap已满,此时如果offset大于start,表示OffsetMap中记录了最后一个LogSegment的一部分消息
  • // If not even one segment can fit in the map, compaction cannot happen
  • require(offset > start, "Unable to build the offset map for segment %s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(log.name, segment.log.file.getName))
  • debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
  • // 标记OffsetMap已填满
  • full = true
  • }
  • }
  • info("Offset map for log %s complete.".format(log.name))
  • /**
  • * 返回offset,即本次日志压缩的结尾,
  • * 由于buildOffsetMapForSegment()方法的控制,不会出现半个LogSegment的情况
  • */
  • offset
  • }

buildOffsetMap(...)方法会遍历包含的消息的offset落在startend之间的LogSegment,通过buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long方法建立每个LogSegment对应的符合OffsetMap要求的数据,并填充到offsetMap中,该方法源码如下:

  • // kafka.log.Cleaner#buildOffsetMapForSegment
  • /**
  • * Add the messages in the given segment to the offset map
  • *
  • * @param segment The segment to index
  • * @param map The map in which to store the key=>offset mapping
  • *
  • * @return The final offset covered by the map or -1 if the map is full
  • */
  • private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
  • var position = 0
  • var offset = segment.baseOffset
  • // 计算OffsetMap最大可装载的key和offset对的数量
  • val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
  • // 遍历LogSegment
  • while (position < segment.log.sizeInBytes) {
  • // 检查压缩状态
  • checkDone(topicAndPartition)
  • readBuffer.clear()
  • // 从LogSegment中读取消息数据,并将其构造为ByteBufferMessageSet对象
  • val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
  • // 限速设置
  • throttler.maybeThrottle(messages.sizeInBytes)
  • val startPosition = position
  • // 遍历messages中的消息,可能会进行深层迭代
  • for (entry <- messages) {
  • val message = entry.message
  • if (message.hasKey) {
  • // 只会处理有键的消息
  • if (map.size < maxDesiredMapSize)
  • // 将key和offset放入OffsetMap
  • map.put(message.key, entry.offset)
  • else {
  • // OffsetMap填满了
  • // The map is full, stop looping and return
  • return -1L
  • }
  • }
  • // 更新offset
  • offset = entry.offset
  • // 维护记录处理的消息数量
  • stats.indexMessagesRead(1)
  • }
  • // 移动position,准备下一次读取
  • position += messages.validBytes
  • // 维护记录处理的消息字节数
  • stats.indexBytesRead(messages.validBytes)
  • // if we didn't read even one complete message, our read buffer may be too small
  • /**
  • * 如果position没有移动,还是与startPosition一致,说明没有读取到一个完整的Message,
  • * 可能是由于readBuffer和writeBuffer容量不够用,因此对readBuffer和writeBuffer进行扩容后重新读取
  • * 扩容后的新容量是旧容量的2倍,但不超过maxIoBufferSize
  • */
  • if(position == startPosition)
  • growBuffers()
  • }
  • // 重置readBuffer和writeBuffer的大小
  • restoreBuffers()
  • // 返回LogSegment的最后一个消息的offset
  • offset
  • }

buildOffsetMapForSegment(...)方法在遍历消息的过程中,会把有键消息的key与offset放入SkimpyOffsetMap类型的offsetMap;如果遇到相同的键,新的offset会把旧的offset覆盖掉,这部分的源码如下:

  • // kafka.log.SkimpyOffsetMap#put
  • /**
  • * Associate this offset to the given key.
  • * @param key The key
  • * @param offset The offset
  • */
  • override def put(key: ByteBuffer, offset: Long) {
  • require(entries < slots, "Attempt to add a new entry to a full offset map.")
  • lookups += 1
  • // 对key进行hash操作(MD5加密,16位字节),hash后的数据会放在hash1中
  • hashInto(key, hash1)
  • // probe until we find the first empty slot
  • // 探测直到找到空位
  • var attempt = 0
  • var pos = positionOf(hash1, attempt)
  • while(!isEmpty(pos)) {
  • // 获取该位置的hash值
  • bytes.position(pos)
  • bytes.get(hash2)
  • // 比较两个hash值
  • if(Arrays.equals(hash1, hash2)) {
  • // 对比两个hash如果相同说明之前有key和offset已经保存过了,直接覆盖重写offset
  • // we found an existing entry, overwrite it and return (size does not change)
  • bytes.putLong(offset)
  • return
  • }
  • attempt += 1
  • // 继续查找
  • pos = positionOf(hash1, attempt)
  • }
  • // found an empty slot, update it--size grows by 1
  • // 没有找到,直接添加新的offset,存储的方式是hash与offset连续存储
  • bytes.position(pos)
  • bytes.put(hash1)
  • bytes.putLong(offset)
  • entries += 1
  • }

buildOffsetMapForSegment(...)方法会遍历完LogSegment中所有的消息或直到offsetMap被装满,当offsetMap被装满时会返回-1,否则返回遍历到的最后一条消息的offset。

1.2.5.2. 计算可安全删除的时间段

offsetMap填充完后,回到clean(cleanable: LogToClean): Long方法中,接下来会计算可以安全删除"删除标识"(即value为空的墓碑消息)的时间点deleteHorizonMs:其值等于First Dirty Offset之前最后一个Clean LogSegment的最后修改时间减去delete.retention.ms。在后续压缩时,最后修改时间不晚于deleteHorizonMs的LogSegment,其中的"删除标识"可以直接丢弃;晚于该时间点的LogSegment则仍需保留这些"删除标识"。
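
下面用一个简单算例说明deleteHorizonMs的计算(示意代码,数值为虚构):

  • // 示意代码,数值为虚构
  • val lastCleanSegmentLastModified = 1700000000000L // Clean部分最后一个LogSegment的最后修改时间
  • val deleteRetentionMs = 24L * 60 * 60 * 1000      // delete.retention.ms,默认24小时
  • val deleteHorizonMs = lastCleanSegmentLastModified - deleteRetentionMs
  • // 压缩某个LogSegment时:lastModified <= deleteHorizonMs则丢弃其中的"删除标识",
  • // 否则retainDeletes为true,墓碑消息继续保留,等待后续轮次的压缩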

1.2.5.3. LogSegment分组

接下来要开始遍历消息数据了,先获取当前Log中offset从0至endOffset之间的LogSegment集合,然后使用groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]方法对该LogSegment集合进行分组,即每个LogSegment只属于一组,一个LogSegment不会被切分到多个组中。在分组过程中,除了限制了每个分组中消息的总字节数,还限制了每个分组对应索引的总字节数。该分组方法的源码如下:

  • /**
  • * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
  • * We collect a group of such segments together into a single
  • * destination segment. This prevents segment sizes from shrinking too much.
  • *
  • * 对LogSegment按大小进行分组,保证每组的日志总字节数与索引总字节数都不超过上限
  • *
  • * @param segments The log segments to group 需要分组的LogSegment集合
  • * @param maxSize the maximum size in bytes for the total of all log data in a group 一个分组最大的字节数
  • * @param maxIndexSize the maximum size in bytes for the total of all index data in a group 一个分组最大的索引字节数
  • *
  • * @return A list of grouped segments
  • */
  • private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
  • // 创建容器
  • var grouped = List[List[LogSegment]]()
  • var segs = segments.toList
  • // 遍历LogSegment数组
  • while(!segs.isEmpty) {
  • // 头LogSegment
  • var group = List(segs.head)
  • // 头LogSegment的大小
  • var logSize = segs.head.size
  • // 头LogSegment的索引大小
  • var indexSize = segs.head.index.sizeInBytes
  • // 截除头LogSegment
  • segs = segs.tail
  • while(!segs.isEmpty && // 还剩余有LogSegment
  • logSize + segs.head.size <= maxSize && // 检查logSize是否过大
  • indexSize + segs.head.index.sizeInBytes <= maxIndexSize && // 检查indexSize是否过大
  • /**
  • * 待处理集合中第一个LogSegment的最后一个offset,
  • * 与当前分组中最早的LogSegment的baseOffset之差不超过Int.MaxValue,
  • * 以保证合并后索引中的相对offset仍然可以用Int表示
  • */
  • segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
  • // 将头LogSegment添加到group头部
  • group = segs.head :: group
  • // 维护logSize和indexSize
  • logSize += segs.head.size
  • indexSize += segs.head.index.sizeInBytes
  • // 截除头LogSegment
  • segs = segs.tail
  • }
  • // 此时group已经是分好的一组了,将group反转后添加到grouped中
  • grouped ::= group.reverse
  • }
  • // 将grouped反转后返回
  • grouped.reverse
  • }

1.2.5.4. 进行压缩

分完组后,会通过cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, deleteHorizonMs: Long)方法对每个分组的LogSegment进行压缩处理,压缩操作主要有以下几步:

  1. 为日志文件和索引文件分别创建以原文件名加上“.cleaned”后缀的新的文件,为两个文件分别创建对应的FileMessageSet和OffsetIndex对象,同时根据这两个对象构建LogSegment对象,用于存放压缩后的数据。
  2. 遍历LogSegment集合,计算每个LogSegment的最后修改时间是否位于安全删除时间段内。
  3. 调用cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean, messageFormatVersion: Byte)方法对遍历的LogSegment进行清理,清理后的数据将存放到的新的日志文件和索引文件中(即以.cleaned后缀结尾的文件)。
  4. 裁剪索引文件,删除多余空间;将新的日志文件和索引文件刷盘持久化,并更新它们的最后修改时间。
  5. 替换Log的segments跳表里原来的旧LogSegment为新的LogSegment,流程如下:
    • 首先将两个后缀为.cleaned的新文件修改为.swap后缀;
    • 然后将新的LogSegment加入到Log.segments跳表,从Log.segments跳表中删除旧的LogSegment;
    • 将旧LogSegment对应的日志文件和索引文件删除,将.swap文件的.swap后缀删除。

该方法的源码如下:

  • /**
  • * Clean a group of segments into a single replacement segment
  • *
  • * @param log The log being cleaned
  • * @param segments The group of segments being cleaned
  • * @param map The offset map to use for cleaning segments
  • * @param deleteHorizonMs The time to retain delete tombstones
  • */
  • private[log] def cleanSegments(log: Log,
  • segments: Seq[LogSegment],
  • map: OffsetMap,
  • deleteHorizonMs: Long) {
  • // create a new segment with the suffix .cleaned appended to both the log and index name
  • // 创建新的日志文件和索引,文件名是分组中第一个LogSegment所在的文件的名称后面加上.cleaned后缀为新文件名
  • val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
  • logFile.delete()
  • val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
  • indexFile.delete()
  • // 根据logFile创建FileMessageSet对象
  • val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
  • // 根据indexFile创建OffsetIndex对象
  • val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
  • // 根据FileMessageSet和OffsetIndex创建LogSegment对象
  • val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
  • try {
  • // clean segments into the new destination segment
  • // 遍历一组内的LogSegment
  • for (old <- segments) {
  • // 判断是否需要保留此LogSegment中的"删除标记"(最后修改时间晚于deleteHorizonMs时保留)
  • val retainDeletes = old.lastModified > deleteHorizonMs
  • info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
  • .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
  • // 进行日志压缩
  • cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
  • }
  • // trim excess index
  • // 截除多余的索引项
  • index.trimToValidSize()
  • // flush new segment to disk before swap
  • // 将压缩得到的LogSegment进行刷盘
  • cleaned.flush()
  • // update the modification date to retain the last modified date of the original files
  • // 将新LogSegment的最后修改时间设置为分组中最后一个原LogSegment的最后修改时间
  • val modified = segments.last.lastModified
  • cleaned.lastModified = modified
  • // swap in new segment
  • info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
  • /**
  • * 替换原来的LogSegment
  • * 首先将.cleaned文件改为.swap后缀,将LogSegment加入到Log.segments跳表
  • * 从Log.segments跳表中删除旧的LogSegment(对应的文件也会被删除)
  • * 将.swap文件的.swap后缀删除
  • */
  • log.replaceSegments(cleaned, segments)
  • } catch {
  • case e: LogCleaningAbortedException =>
  • // 如果出错,删除cleaned文件
  • cleaned.delete()
  • throw e
  • }
  • }

cleanSegments(...)方法的源码比较清晰,这里主要讨论第3步中涉及的cleanInto(...)方法,它是真正执行日志压缩的核心方法:

  • /**
  • * Clean the given source log segment into the destination segment using the key=>offset mapping
  • * provided
  • *
  • * @param source The dirty log segment
  • * @param dest The cleaned log segment
  • * @param map The key=>offset mapping
  • * @param retainDeletes Should delete tombstones be retained while cleaning this segment
  • * @param messageFormatVersion the message format version to use after compaction
  • */
  • private[log] def cleanInto(topicAndPartition: TopicAndPartition,
  • source: LogSegment,
  • dest: LogSegment,
  • map: OffsetMap,
  • retainDeletes: Boolean,
  • messageFormatVersion: Byte) {
  • var position = 0
  • // 遍历LogSegment中日志数据
  • while (position < source.log.sizeInBytes) {
  • // 检查状态
  • checkDone(topicAndPartition)
  • // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
  • // 清除readBuffer和writeBuffer
  • readBuffer.clear()
  • writeBuffer.clear()
  • // 将LogSegment的日志数据读到readBuffer中,然后根据该装有数据的buffer创建ByteBufferMessageSet对象
  • val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
  • // 限速
  • throttler.maybeThrottle(messages.sizeInBytes)
  • // check each message to see if it is to be retained
  • var messagesRead = 0
  • // 对读取得到的ByteBufferMessageSet进行遍历,浅层迭代
  • for (entry <- messages.shallowIterator) {
  • // 得到消息大小
  • val size = MessageSet.entrySize(entry.message)
  • stats.readMessage(size)
  • // 判断消息是否使用了压缩器,如果使用了压缩器,需要深层迭代
  • if (entry.message.compressionCodec == NoCompressionCodec) {
  • // 未使用压缩
  • /**
  • * shouldRetainMessage方法会根据OffsetMap和retainDeletes判断entry是否需要保留,判断条件如下:
  • * 1. entry消息是否有键,无键的消息视为无效消息,可以删除;
  • * 2. OffsetMap中是否存在与entry的键相同且offset更大的消息,如果存在则entry是过期消息,可以删除;
  • * 3. entry消息是"删除标记"(tombstone)消息且已超过保留时间(retainDeletes为false),可以删除
  • */
  • if (shouldRetainMessage(source, map, retainDeletes, entry)) {
  • // 需要保留消息
  • ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
  • stats.recopyMessage(size)
  • }
  • messagesRead += 1
  • } else {
  • // 使用了压缩器,需要深层迭代
  • // We use the absolute offset to decide whether to retain the message or not. This is handled by the
  • // deep iterator.
  • val messages = ByteBufferMessageSet.deepIterator(entry)
  • var writeOriginalMessageSet = true
  • val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
  • // 遍历深层消息
  • messages.foreach { messageAndOffset =>
  • messagesRead += 1
  • // 判断消息是否保留
  • if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset))
  • retainedMessages += messageAndOffset
  • else writeOriginalMessageSet = false // 一旦有消息不保留,则置writeOriginalMessageSet为false
  • }
  • // There are no messages compacted out, write the original message set back
  • if (writeOriginalMessageSet)
  • // 如果writeOriginalMessageSet为true,表示内部压缩消息没有需要清理的,直接将Message写出即可
  • ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
  • else
  • // 否则调用compressMessages()重新压缩retainedMessages集合,同时写入到writeBuffer
  • compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages)
  • }
  • }
  • // 维护position位置
  • position += messages.validBytes
  • // if any messages are to be retained, write them out
  • // 如果有需要保留的消息,将其追加到压缩后的LogSegment中
  • if (writeBuffer.position > 0) {
  • writeBuffer.flip()
  • val retained = new ByteBufferMessageSet(writeBuffer)
  • dest.append(retained.head.offset, retained)
  • throttler.maybeThrottle(writeBuffer.limit)
  • }
  • // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
  • // 未读取到完整的消息,readBuffer可能过小,进行扩容
  • if (readBuffer.limit > 0 && messagesRead == 0)
  • growBuffers()
  • }
  • // 重置readBuffer和writeBuffer
  • restoreBuffers()
  • }

该方法的逻辑是比较简单的,遍历LogSegment中的每条消息数据,判断是否需要保留消息,如果需要保留就将消息数据及offset写入到写缓冲区中,最后根据写缓冲区构建ByteBufferMessageSet对象,添加到新的LogSegment中。

可以发现,其中判断消息是否应该保留的方法是shouldRetainMessage(source: kafka.log.LogSegment, map: kafka.log.OffsetMap, retainDeletes: Boolean, entry: kafka.message.MessageAndOffset): Boolean,源码如下:

  • // 返回false表示可以删除消息,true表示保留消息
  • private def shouldRetainMessage(source: kafka.log.LogSegment,
  • map: kafka.log.OffsetMap,
  • retainDeletes: Boolean,
  • entry: kafka.message.MessageAndOffset): Boolean = {
  • // 获取entry的键
  • val key = entry.message.key
  • if (key != null) {
  • // 键不为空,从OffsetMap中查找该键记录的消息的offset
  • val foundOffset = map.get(key)
  • /* two cases in which we can get rid of a message:
  • * 1) if there exists a message with the same key but higher offset
  • * 2) if the message is a delete "tombstone" marker and enough time has passed
  • *
  • */
  • // 如果OffsetMap中获取的offset比entry的offset还要大,说明entry消息是旧消息,可以被删除
  • val redundant = foundOffset >= 0 && entry.offset < foundOffset
  • // 根据LogSegment配置的"删除标记"策略以及消息数据是否为空来决定是否删除
  • val obsoleteDelete = !retainDeletes && entry.message.isNull
  • !redundant && !obsoleteDelete
  • } else {
  • // 键为空,为无效消息,返回false
  • stats.invalidMessage()
  • false
  • }
  • }

在shouldRetainMessage(...)方法中,没有键的消息会被判定为不保留;对于有键的消息,还需要根据键从OffsetMap中查找记录的offset,如果查找到的offset比该消息的offset更大,说明该消息是过期消息,不予保留;同时还会根据retainDeletes参数决定值为空的"删除标记"消息是否需要丢弃。

至此,日志的压缩操作就讲解完了;日志压缩几乎是日志操作中最为复杂的操作,其原理虽然简单,但实现上却相当复杂,整个过程需要对消息数据进行两次读取和一次写出。需要注意的是,日志压缩不仅会删除键重复的旧消息,还会清理键为空的消息,以及值为空("删除标记")且已超过保留时间、可以安全删除的消息。
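
为了更直观地感受压缩效果,下面用一段与源码无关的极简示意模拟"第一遍建立键到最大offset的映射、第二遍按键保留最新消息"的过程(Msg类型、消息内容均为示例假设,OffsetMap用普通的Map代替):

  • object CompactDemo extends App {
  •   // (offset, key, value),value为None表示"删除标记"(tombstone)消息
  •   case class Msg(offset: Long, key: String, value: Option[String])
  •   val log = List(
  •     Msg(0, "k1", Some("v1")),
  •     Msg(1, "k2", Some("v2")),
  •     Msg(2, "k1", Some("v1-new")), // k1的新值,offset=0的旧消息将被清除
  •     Msg(3, "k2", None)            // k2的删除标记
  •   )
  •   // 第一次遍历:构建"键 -> 最大offset"的映射,作用类似源码中的OffsetMap
  •   val offsetMap: Map[String, Long] = log.groupBy(_.key).map { case (k, ms) => k -> ms.map(_.offset).max }
  •   // 第二次遍历:只保留每个键offset最大的消息;retainDeletes为false时"删除标记"也一并清除
  •   def compact(retainDeletes: Boolean): List[Msg] =
  •     log.filter { m =>
  •       val latest = m.offset >= offsetMap(m.key)
  •       val obsoleteDelete = !retainDeletes && m.value.isEmpty
  •       latest && !obsoleteDelete
  •     }
  •   println(compact(retainDeletes = true))  // 保留k1、k2各自最新的消息(包括k2的删除标记)
  •   println(compact(retainDeletes = false)) // 删除标记也被清除,只剩k1的最新消息
  • }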

1.3. 辅助操作

LogManager还提供了几个辅助方法,用于创建、获取和删除Log,其中创建和删除操作需要加锁控制并发;下面分别对它们进行介绍。

1.3.1. 创建Log

创建Log操作由createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log方法实现,需要传入表示主题和分区的TopicAndPartition对象及其配置:

  • // kafka.log.LogManager#createLog
  • /**
  • * Create a log for the given topic and the given partition
  • * If the log already exists, just return a copy of the existing log
  • * 根据指定的Topic和Partition创建Log对象,如果已存在就直接返回存在的
  • * 会选取文件最少的log目录下创建
  • */
  • def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
  • // 加锁
  • logCreationOrDeletionLock synchronized {
  • var log = logs.get(topicAndPartition)
  • // check if the log has already been created in another thread
  • // Log已存在,直接返回
  • if(log != null)
  • return log
  • // if not, create it
  • // 不存在,选择Log最少的目录
  • val dataDir = nextLogDir()
  • // 在选择的log目录下创建分区目录,目录名为"topic名称-partition序号"
  • val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
  • dir.mkdirs()
  • // 根据文件创建Log对象
  • log = new Log(dir,
  • config,
  • recoveryPoint = 0L,
  • scheduler,
  • time)
  • // 将Log放入logs池中
  • logs.put(topicAndPartition, log)
  • info("Created log for partition [%s,%d] in %s with properties {%s}."
  • .format(topicAndPartition.topic,
  • topicAndPartition.partition,
  • dataDir.getAbsolutePath,
  • {import JavaConversions._; config.originals.mkString(", ")}))
  • // 返回log
  • log
  • }
  • }

其中比较重要的是数据目录的选择,由于Kafka支持配置多数据目录,因此在每次创建Log对象对应的目录时需要选择已存放分区目录最少的数据目录,这个功能由nextLogDir()实现,源码比较简单:

  • // kafka.log.LogManager#nextLogDir
  • /**
  • * Choose the next directory in which to create a log. Currently this is done
  • * by calculating the number of partitions in each directory and then choosing the
  • * data directory with the fewest partitions.
  • */
  • private def nextLogDir(): File = {
  • if(logDirs.size == 1) {
  • // 只有一个log目录,直接返回
  • logDirs(0)
  • } else {
  • // count the number of logs in each parent directory (including 0 for empty directories
  • // 有多个log目录
  • // 计算每个log目录中的Log数量
  • val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
  • val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
  • var dirCounts = (zeros ++ logCounts).toBuffer
  • // choose the directory with the least logs in it
  • // 排序,选择Log最少的log目录
  • val leastLoaded = dirCounts.sortBy(_._2).head
  • new File(leastLoaded._1)
  • }
  • }
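
nextLogDir()中zeros ++ logCounts的合并与排序效果,可以用下面这段与源码无关的小例子验证(目录路径和数量均为示例数据):

  • object NextLogDirDemo extends App {
  •   // 每个log目录下已有的Log数量(没有Log的目录不会出现在groupBy的结果中)
  •   val logCounts = Map("/data/kafka-logs-1" -> 3, "/data/kafka-logs-2" -> 1)
  •   val zeros = Map("/data/kafka-logs-1" -> 0, "/data/kafka-logs-2" -> 0, "/data/kafka-logs-3" -> 0)
  •   // zeros ++ logCounts:右侧的统计值覆盖左侧的0,从而空目录也会出现在结果中
  •   val dirCounts = (zeros ++ logCounts).toBuffer
  •   // 按Log数量升序排序,取第一个即为Log最少的目录(这里是kafka-logs-3)
  •   val leastLoaded = dirCounts.sortBy(_._2).head
  •   println(leastLoaded._1)
  • }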

1.3.2. 获取Log

获取Log由getLog(topicAndPartition: TopicAndPartition): Option[Log]方法实现,比较简单:

  • // kafka.log.LogManager#getLog
  • /**
  • * Get the log if it exists, otherwise return None
  • */
  • def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
  • // 从logs集合中根据Topic和Partition获取Log
  • val log = logs.get(topicAndPartition)
  • if (log == null)
  • None
  • else
  • Some(log)
  • }

1.3.3. 删除Log

删除Log由deleteLog(topicAndPartition: TopicAndPartition)方法实现:

  • /**
  • * Delete a log.
  • * 删除Log
  • */
  • def deleteLog(topicAndPartition: TopicAndPartition) {
  • var removedLog: Log = null
  • // 加锁
  • logCreationOrDeletionLock synchronized {
  • // 从logs集合中移除
  • removedLog = logs.remove(topicAndPartition)
  • }
  • if (removedLog != null) {
  • //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
  • if (cleaner != null) {
  • // 停止对此Log的压缩操作,阻塞等待压缩状态的改变
  • cleaner.abortCleaning(topicAndPartition)
  • // 更新cleaner-offset-checkpoint文件
  • cleaner.updateCheckpoints(removedLog.dir.getParentFile)
  • }
  • // 删除相关的日志文件、索引文件及目录
  • removedLog.delete()
  • info("Deleted log for partition [%s,%d] in %s."
  • .format(topicAndPartition.topic,
  • topicAndPartition.partition,
  • removedLog.dir.getAbsolutePath))
  • }
  • }

在删除Log时,会先从logs池中将其移除,然后停止其日志压缩操作,最后删除对应的日志文件、索引文件和目录。

2. 日志系统的初始化

LogManager除了在startup()方法中定义了定时任务及日志压缩任务之外,在构造LogManager对象时还进行了一系列的日志系统的初始化工作,回顾它的源码:

  • @threadsafe
  • class LogManager(val logDirs: Array[File],
  • val topicConfigs: Map[String, LogConfig],
  • val defaultConfig: LogConfig,
  • val cleanerConfig: CleanerConfig,
  • ioThreads: Int,
  • val flushCheckMs: Long,
  • val flushCheckpointMs: Long,
  • val retentionCheckMs: Long,
  • scheduler: Scheduler,
  • val brokerState: BrokerState,
  • private val time: Time) extends Logging {
  • ...
  • // 保证每个log目录都存在且可读
  • createAndValidateLogDirs(logDirs)
  • ...
  • // 加载Log
  • loadLogs()
  • ...
  • }

从源码可知,在构建LogManager对象时还调用了createAndValidateLogDirs(dirs: Seq[File])和loadLogs()方法;createAndValidateLogDirs(dirs: Seq[File])方法用于创建并检查数据目录,源码如下:

  • // kafka.log.LogManager#createAndValidateLogDirs
  • /**
  • * Create and check validity of the given directories, specifically:
  • * <ol>
  • * <li> Ensure that there are no duplicates in the directory list
  • * <li> Create each directory if it doesn't exist
  • * <li> Check that each path is a readable directory
  • * </ol>
  • */
  • private def createAndValidateLogDirs(dirs: Seq[File]) {
  • // getCanonicalPath返回规范化后的绝对路径,用它去重可以发现以不同写法配置的同一目录
  • if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
  • // 存在重复配置的log目录,抛出异常
  • throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
  • // 遍历目录
  • for(dir <- dirs) {
  • // 判断目录是否存在
  • if(!dir.exists) {
  • info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
  • // 不存在则创建目录
  • val created = dir.mkdirs()
  • if(!created)
  • // 如果创建失败则抛出异常
  • throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
  • }
  • // 保证是目录且可读
  • if(!dir.isDirectory || !dir.canRead)
  • throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
  • }
  • }

createAndValidateLogDirs(...)会保证数据目录存在(如果不存在就新创建)并且是可读的。
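
这里去重使用的是getCanonicalPath而不是配置的原始字符串,因为同一目录可能以不同的写法被重复配置(例如包含"./"或多余的分隔符);下面是一个简单的演示(目录路径为示例假设):

  • import java.io.File
  • object CanonicalDemo extends App {
  •   // 两个字符串写法不同,但规范化后指向同一目录
  •   val dirs = Seq(new File("/tmp/kafka-logs"), new File("/tmp/./kafka-logs/"))
  •   println(dirs.map(_.getPath).toSet.size)          // 2:按原始路径字符串无法发现重复
  •   println(dirs.map(_.getCanonicalPath).toSet.size) // 1:按规范路径可以发现重复配置
  • }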

loadLogs()方法的作用主要有以下几个:

  1. 为每个数据目录分配一个有ioThreads条线程的线程池,用来执行恢复操作。
  2. 检测Broker上次关闭是否正常,并设置Broker的状态。在Broker正常关闭时,会创建一个“.kafka_cleanshutdown”的文件,这里就是通过此文件进行判断的。
  3. 读取RecoveryPointCheckpoint文件,载入每个Log的recoveryPoint
  4. 为每个Log创建一个恢复任务,交给线程池处理。主线程阻塞等待所有的恢复任务完成,并关闭所有在第1步中创建的线程池。

该方法的源码如下:

  • // kafka.log.LogManager#loadLogs
  • /**
  • * Recover and load all logs in the given data directories
  • */
  • private def loadLogs(): Unit = {
  • info("Loading logs.")
  • // 用于保存log目录对应的线程池
  • val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
  • val jobs = mutable.Map.empty[File, Seq[Future[_]]]
  • // 遍历log目录,对每个log目录进行操作
  • for (dir <- this.logDirs) {
  • // 创建线程池,线程个数为ioThreads
  • val pool = Executors.newFixedThreadPool(ioThreads)
  • // 将线程池添加到threadPools进行记录
  • threadPools.append(pool)
  • /**
  • * 检测broker上次是否是正常关闭的
  • * 如果是正常关闭的,在目录下会保存有一个.kafka_cleanshutdown的文件
  • * 如果该文件不存在,说明broker上次是非正常关闭的
  • */
  • val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
  • if (cleanShutdownFile.exists) {
  • // 正常关闭
  • debug(
  • "Found clean shutdown file. " +
  • "Skipping recovery for all logs in data directory: " +
  • dir.getAbsolutePath)
  • } else {
  • // log recovery itself is being performed by `Log` class during initialization
  • // 非正常关闭,修改brokerState为RecoveringFromUncleanShutdown
  • brokerState.newState(RecoveringFromUncleanShutdown)
  • }
  • // 读取每个log目录下的RecoveryPointCheckpoint文件并生成TopicAndPartition与recoveryPoint的对应关系
  • var recoveryPoints = Map[TopicAndPartition, Long]()
  • try {
  • // 载入recoveryPoints
  • recoveryPoints = this.recoveryPointCheckpoints(dir).read
  • } catch {
  • case e: Exception => {
  • warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
  • warn("Resetting the recovery checkpoint to 0")
  • }
  • }
  • // 遍历所有的log目录的子文件,将文件过滤,只保留目录
  • val jobsForDir = for { // 这个括号内是遍历条件
  • dirContent <- Option(dir.listFiles).toList
  • logDir <- dirContent if logDir.isDirectory
  • } yield { // yield代码块是for的循环体
  • // 为每个分区目录(即每个Log)创建一个Runnable加载任务
  • CoreUtils.runnable {
  • debug("Loading log '" + logDir.getName + "'")
  • // 从目录名解析出Topic名称和分区编号
  • val topicPartition = Log.parseTopicPartitionName(logDir)
  • // 获取Log对应的配置,也即是主题分区对应的配置
  • val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
  • // 获取Log对应的recoveryPoint
  • val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
  • // 创建Log对象
  • val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
  • // 将Log对象保存到logs集合中;若已存在同一分区对应的Log,说明数据目录中存在重复的分区目录,会抛出异常
  • val previous = this.logs.put(topicPartition, current)
  • if (previous != null) {
  • throw new IllegalArgumentException(
  • "Duplicate log directories found: %s, %s!".format(
  • current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
  • }
  • }
  • }
  • // 将jobsForDir中的所有任务放到线程池中执行,并将Future形成Seq,保存到jobs中
  • jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
  • }
  • try {
  • // 等待jobs中的Runnable完成
  • for ((cleanShutdownFile, dirJobs) <- jobs) {
  • dirJobs.foreach(_.get)
  • // 删除.kafka_cleanshutdown文件
  • cleanShutdownFile.delete()
  • }
  • } catch {
  • case e: ExecutionException => {
  • error("There was an error in one of the threads during logs loading: " + e.getCause)
  • throw e.getCause
  • }
  • } finally {
  • // 关闭全部的线程池
  • threadPools.foreach(_.shutdown())
  • }
  • info("Logs loading complete.")
  • }
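
上面第1、4步中"每个数据目录一个线程池、提交加载任务后统一阻塞等待Future"的并发模式,可以抽象为下面这段与Kafka无关的小示意(目录名、任务内容均为示例假设):

  • import java.util.concurrent.{ExecutorService, Executors, Future => JFuture}
  • import scala.collection.mutable
  • object LoadPatternDemo extends App {
  •   val dataDirs = Seq("/data/kafka-logs-1", "/data/kafka-logs-2")
  •   val ioThreads = 2
  •   val pools = mutable.ArrayBuffer.empty[ExecutorService]
  •   val jobs = mutable.Map.empty[String, Seq[JFuture[_]]]
  •   // 每个数据目录分配一个独立线程池,把"加载该目录下各分区"的任务提交进去
  •   for (dir <- dataDirs) {
  •     val pool = Executors.newFixedThreadPool(ioThreads)
  •     pools += pool
  •     val tasks = (1 to 3).map { partition =>
  •       pool.submit(new Runnable {
  •         override def run(): Unit = println(s"loading $dir/topic-$partition")
  •       })
  •     }
  •     jobs(dir) = tasks
  •   }
  •   try {
  •     // 主线程阻塞等待所有加载任务完成
  •     for ((_, dirJobs) <- jobs; job <- dirJobs) job.get()
  •   } finally {
  •     // 关闭全部线程池
  •     pools.foreach(_.shutdown())
  •   }
  •   println("all logs loaded")
  • }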

loadLogs()的源码在流程上非常清晰,前面第1 ~ 3步的实现都比较简单,重要的操作集中在第4步:这一步会为每个数据目录下的每个分区目录(即每个Log)提交一个相同的加载任务,抽取出来的任务代码如下:

  • // kafka.log.LogManager#loadLogs
  • // 为每个分区目录(即每个Log)创建一个Runnable加载任务
  • CoreUtils.runnable {
  • debug("Loading log '" + logDir.getName + "'")
  • // 从目录名解析出Topic名称和分区编号
  • val topicPartition = Log.parseTopicPartitionName(logDir)
  • // 获取Log对应的配置,也即是主题分区对应的配置
  • val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
  • // 获取Log对应的recoveryPoint
  • val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
  • // 创建Log对象
  • val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
  • // 将Log对象保存到logs集合中;若已存在同一分区对应的Log,说明数据目录中存在重复的分区目录,会抛出异常
  • val previous = this.logs.put(topicPartition, current)
  • if (previous != null) {
  • throw new IllegalArgumentException(
  • "Duplicate log directories found: %s, %s!".format(
  • current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
  • }
  • }

可以发现,在该过程中会根据数据目录、配置信息、对应的RecoveryPoint、调度器等信息创建Log对象,然后将该Log对象添加到logs池中由LogManager进行管理。

而在创建Log对象的过程中,会调用其极为重要的loadSegments()方法,用来加载该Log所管辖的LogSegment对象;这个方法在上一篇文章中已经简单介绍过,这里先贴出源码:

  • // kafka.log.Log#loadSegments
  • /* Load the log segments from the log files on disk */
  • private def loadSegments() {
  • // create the log directory if it doesn't exist
  • dir.mkdirs()
  • var swapFiles = Set[File]()
  • // first do a pass through the files in the log directory and remove any temporary files
  • // and find any interrupted swap operations
  • // 遍历dir目录下的文件,处理.cleaned、.deleted、.swap文件
  • for(file <- dir.listFiles if file.isFile) {
  • // 文件不可读,抛异常
  • if(!file.canRead)
  • throw new IOException("Could not read file " + file)
  • val filename = file.getName
  • if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
  • // if the file ends in .deleted or .cleaned, delete it
  • // 文件是以.deleted或.cleaned后缀结尾,则直接删除;
  • // .deleted表示本就标记为要删除的文件;.cleaned文件表示压缩过程出现宕机
  • file.delete()
  • } else if(filename.endsWith(SwapFileSuffix)) {
  • // we crashed in the middle of a swap operation, to recover:
  • // if a log, delete the .index file, complete the swap operation later
  • // if an index just delete it, it will be rebuilt
  • // 文件是以.swap后缀结尾;.swap文件表示压缩完的完整消息,可以使用,将其后缀去掉
  • val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
  • if(baseName.getPath.endsWith(IndexFileSuffix)) {
  • // 如果该.swap文件是索引文件,直接删除
  • file.delete()
  • } else if(baseName.getPath.endsWith(LogFileSuffix)){
  • // delete the index
  • // 如果该.swap文件是日志文件,则需要进行恢复;获取该日志文件对应的索引文件,将其删除
  • val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
  • index.delete()
  • // 将该文件添加到swapFiles集合
  • swapFiles += file
  • }
  • }
  • }
  • // now do a second pass and load all the .log and .index files
  • /**
  • * 加载全部的日志文件及索引文件,
  • * - 如果存在没有对应的日志文件的索引文件,就将该索引文件也删除
  • * - 如果存在没有对应的索引文件的日志文件,则为其重建索引文件
  • */
  • for(file <- dir.listFiles if file.isFile) {
  • val filename = file.getName
  • if(filename.endsWith(IndexFileSuffix)) {
  • // if it is an index file, make sure it has a corresponding .log file
  • // 是.index文件,判断对应的.log文件是否存在,如果不存在就将.index文件也删除
  • val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
  • if(!logFile.exists) {
  • warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
  • file.delete()
  • }
  • } else if(filename.endsWith(LogFileSuffix)) {
  • // if its a log file, load the corresponding log segment
  • // 是.log文件,先创建并加载LogSegment
  • val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
  • val indexFile = Log.indexFilename(dir, start)
  • val segment = new LogSegment(dir = dir,
  • startOffset = start,
  • indexIntervalBytes = config.indexInterval,
  • maxIndexSize = config.maxIndexSize,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time,
  • fileAlreadyExists = true)
  • // 判断对应的.index文件是否存在,如果不存在就根据.log文件重建.index文件
  • if(indexFile.exists()) {
  • // .index文件存在
  • try {
  • // 检查索引文件的完整性
  • segment.index.sanityCheck()
  • } catch {
  • case e: java.lang.IllegalArgumentException =>
  • warn("Found a corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath))
  • indexFile.delete()
  • segment.recover(config.maxMessageSize)
  • }
  • }
  • else {
  • // .index文件不存在
  • error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
  • // 根据.log文件重建.index文件
  • segment.recover(config.maxMessageSize)
  • }
  • // 将LogSegment添加到segments跳表中
  • segments.put(start, segment)
  • }
  • }
  • // Finally, complete any interrupted swap operations. To be crash-safe,
  • // log files that are replaced by the swap segment should be renamed to .deleted
  • // before the swap file is restored as the new segment file.
  • // 处理前面得到的.swap文件集合
  • for (swapFile <- swapFiles) {
  • // 去掉.swap后缀的文件
  • val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
  • val fileName = logFile.getName
  • // 根据文件名得到baseOffset
  • val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
  • // 获取对应的.index.swap文件
  • val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
  • // 根据上面得到的信息创建OffsetIndex
  • val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
  • // 创建LogSegment
  • val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
  • index = index,
  • baseOffset = startOffset,
  • indexIntervalBytes = config.indexInterval,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time)
  • info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
  • // 重建索引文件并验证日志文件
  • swapSegment.recover(config.maxMessageSize)
  • // 查找swapSegment对应的日志压缩前的LogSegment集合
  • val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
  • /**
  • * 将swapSegment放入segments跳表,
  • * 将oldSegments的LogSegment从segments跳表中删除,同时删除对应的日志文件和索引文件
  • * 将swapSegment对应日志文件的.swap后缀去掉
  • */
  • replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
  • }
  • // 如果segments为空,需要添加一个LogSegment作为activeSegment
  • if(logSegments.size == 0) {
  • // no existing segments, create a new mutable segment beginning at offset 0
  • segments.put(0L, new LogSegment(dir = dir,
  • startOffset = 0,
  • indexIntervalBytes = config.indexInterval,
  • maxIndexSize = config.maxIndexSize,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time,
  • fileAlreadyExists = false,
  • initFileSize = this.initFileSize(),
  • preallocate = config.preallocate))
  • } else {
  • // 进行Log的恢复工作
  • recoverLog()
  • // reset the index size of the currently active log segment to allow more entries
  • activeSegment.index.resize(config.maxIndexSize)
  • }
  • }

回顾一下它的作用有以下3个:

  1. 处理.cleaned、.deleted、.swap文件;
  2. 加载全部的日志文件和索引文件(可能需要重建索引);
  3. 处理可能存在的由于Broker非正常关闭导致的消息异常。

其中第2步的分析在上一篇文章中已经讲解过了,这里需要介绍第1步和第3步。

在第1步的操作中,.cleaned后缀的文件是日志压缩时使用的过渡文件,如果存在该后缀的文件,表示在日志压缩过程中出现过宕机,.cleaned文件中数据的状态不明确,无法用于恢复;.swap后缀的文件表示日志压缩已经完成,但在替换原数据文件的过程中出现宕机,由于.swap文件中保存了日志压缩后的完整消息,因此可用于恢复;.deleted后缀的文件则是本来就标记为要删除的日志文件或索引文件。处理这一步的源码如下,比较简单:

  • // kafka.log.Log#loadSegments
  • ...
  • var swapFiles = Set[File]()
  • // first do a pass through the files in the log directory and remove any temporary files
  • // and find any interrupted swap operations
  • // 遍历dir目录下的文件,处理.cleaned、.deleted、.swap文件
  • for(file <- dir.listFiles if file.isFile) {
  • // 文件不可读,抛异常
  • if(!file.canRead)
  • throw new IOException("Could not read file " + file)
  • val filename = file.getName
  • if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
  • // if the file ends in .deleted or .cleaned, delete it
  • // 文件是以.deleted或.cleaned后缀结尾,则直接删除;
  • // .deleted表示本就标记为要删除的文件;.cleaned文件表示压缩过程出现宕机
  • file.delete()
  • } else if(filename.endsWith(SwapFileSuffix)) {
  • // we crashed in the middle of a swap operation, to recover:
  • // if a log, delete the .index file, complete the swap operation later
  • // if an index just delete it, it will be rebuilt
  • // 文件是以.swap后缀结尾;.swap文件表示压缩完的完整消息,可以使用,将其后缀去掉
  • val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
  • if(baseName.getPath.endsWith(IndexFileSuffix)) {
  • // 如果该.swap文件是索引文件,直接删除
  • file.delete()
  • } else if(baseName.getPath.endsWith(LogFileSuffix)){
  • // delete the index
  • // 如果该.swap文件是日志文件,则需要进行恢复;获取该日志文件对应的索引文件,将其删除
  • val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
  • index.delete()
  • // 将该文件添加到swapFiles集合
  • swapFiles += file
  • }
  • }
  • }
  • ...
  • // Finally, complete any interrupted swap operations. To be crash-safe,
  • // log files that are replaced by the swap segment should be renamed to .deleted
  • // before the swap file is restored as the new segment file.
  • // 处理前面得到的.swap文件集合
  • for (swapFile <- swapFiles) {
  • // 去掉.swap后缀的文件
  • val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
  • val fileName = logFile.getName
  • // 根据文件名得到baseOffset
  • val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
  • // 获取对应的.index.swap文件
  • val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
  • // 根据上面得到的信息创建OffsetIndex
  • val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
  • // 创建LogSegment
  • val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
  • index = index,
  • baseOffset = startOffset,
  • indexIntervalBytes = config.indexInterval,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time)
  • info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
  • // 重建索引文件并验证日志文件
  • swapSegment.recover(config.maxMessageSize)
  • // 查找swapSegment对应的日志压缩前的LogSegment集合
  • val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
  • /**
  • * 将swapSegment放入segments跳表,
  • * 将oldSegments的LogSegment从segments跳表中删除,同时删除对应的日志文件和索引文件
  • * 将swapSegment对应日志文件的.swap后缀去掉
  • */
  • replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
  • }
  • ...
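
上面的清理规则本质上是"按文件后缀决定恢复动作",可以归纳为下面这段与源码无关的简化示意(后缀常量与动作名称均为示例假设,真实逻辑以上面的源码为准):

  • object SuffixDemo extends App {
  •   val DeletedSuffix = ".deleted"
  •   val CleanedSuffix = ".cleaned"
  •   val SwapSuffix    = ".swap"
  •   // 根据文件名决定加载时的处理动作
  •   def recoveryAction(fileName: String): String = {
  •     if (fileName.endsWith(DeletedSuffix) || fileName.endsWith(CleanedSuffix))
  •       "delete"                              // 待删除文件或压缩中断留下的中间文件,直接删除
  •     else if (fileName.endsWith(SwapSuffix)) {
  •       val base = fileName.dropRight(SwapSuffix.length)
  •       if (base.endsWith(".index")) "delete" // 索引可由日志重建,直接删除
  •       else "recover-swap"                   // 日志.swap文件数据完整,重建索引后完成替换
  •     } else "load"                           // 普通的.log/.index文件,正常加载
  •   }
  •   Seq("00000000000000000000.log.deleted",
  •       "00000000000000000000.log.cleaned",
  •       "00000000000000000000.index.swap",
  •       "00000000000000000000.log.swap",
  •       "00000000000000000000.log").foreach(f => println(f + " -> " + recoveryAction(f)))
  • }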

第3步的处理中,针对LogSegment集合为空的Log,需要创建一个LogSegment作为Active Segment,保证Log中至少有一个LogSegment;而对于LogSegment集合非空的Log,则可能需要进行恢复操作:

  • // kafka.log.Log#loadSegments
  • ...
  • // 如果segments为空,需要添加一个LogSegment作为activeSegment
  • if(logSegments.size == 0) {
  • // no existing segments, create a new mutable segment beginning at offset 0
  • segments.put(0L, new LogSegment(dir = dir,
  • startOffset = 0,
  • indexIntervalBytes = config.indexInterval,
  • maxIndexSize = config.maxIndexSize,
  • rollJitterMs = config.randomSegmentJitter,
  • time = time,
  • fileAlreadyExists = false,
  • initFileSize = this.initFileSize(),
  • preallocate = config.preallocate))
  • } else {
  • // 进行Log的恢复工作
  • recoverLog()
  • // reset the index size of the currently active log segment to allow more entries
  • activeSegment.index.resize(config.maxIndexSize)
  • }
  • ...

恢复操作统一交给recoverLog()方法处理,源码如下:

  • // kafka.log.Log#recoverLog
  • /**
  • * 进行Log的恢复工作,主要负责处理Broker非正常关闭时导致的消息异常,
  • * 需要将recoveryPoint ~ activeSegment中的所有消息进行验证,将验证失败的消息截断
  • */
  • private def recoverLog() {
  • // if we have the clean shutdown marker, skip recovery
  • // 如果broker上次是正常关闭的,则不需要恢复
  • if(hasCleanShutdownFile) {
  • // 更新recoveryPoint为activeSegment的nextOffset
  • this.recoveryPoint = activeSegment.nextOffset
  • return
  • }
  • // okay we need to actually recovery this log
  • // 非正常关闭,需要进行恢复操作,得到recoveryPoint点开始的LogSegment集合
  • val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  • // 遍历LogSegment集合
  • while(unflushed.hasNext) {
  • val curr = unflushed.next
  • info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
  • val truncatedBytes =
  • try {
  • // 使用LogSegment的recover()方法重建索引文件并验证日志文件,验证失败的部分会被截除
  • curr.recover(config.maxMessageSize)
  • } catch {
  • case e: InvalidOffsetException =>
  • val startOffset = curr.baseOffset
  • warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
  • "creating an empty one with starting offset " + startOffset)
  • curr.truncateTo(startOffset)
  • }
  • // LogSegment中是否有验证失败的消息
  • if(truncatedBytes > 0) {
  • // we had an invalid message, delete all remaining log
  • warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
  • // 将剩余的LogSegment全部删除
  • unflushed.foreach(deleteSegment)
  • }
  • }
  • }
  • // kafka.log.Log#hasCleanShutdownFile
  • /**
  • * Check if we have the "clean shutdown" file
  • */
  • private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()

该方法会根据hasCleanShutdownFile(即是否存在.kafka_cleanshutdown文件)判断是否需要进行Log的恢复;具体的恢复操作比较简单,只需对recoveryPoint至Active Segment之间的所有消息进行验证,并将验证失败的部分截断即可。
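
recoveryPoint与需要恢复的LogSegment之间的关系可以用下面这段简化示意来理解:恢复时只处理包含recoveryPoint及其之后消息的那些LogSegment(baseOffset与recoveryPoint均为示例数据):

  • object RecoverScopeDemo extends App {
  •   // 每个LogSegment用其baseOffset表示,假设Log中共有4个LogSegment
  •   val baseOffsets = List(0L, 100L, 200L, 300L)
  •   val recoveryPoint = 150L
  •   // 与logSegments(recoveryPoint, Long.MaxValue)的效果类似:
  •   // 从包含recoveryPoint的那个LogSegment(baseOffset=100)开始,之后的都需要重新校验
  •   val floor = baseOffsets.filter(_ <= recoveryPoint).max
  •   val unflushed = baseOffsets.filter(_ >= floor)
  •   println(unflushed) // List(100, 200, 300)
  • }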