1. LogManager
LogManager用于管理一个Broker上所有的Log实例,提供加载Log、创建Log集合、删除Log集合、查询Log集合等操作。LogManager还维护了3个定时任务线程及Cleaner线程,分别用于日志刷盘、日志清理、检查点维护及日志压缩等操作。LogManager的定义及其重要字段如下:
注:这里的日志压缩与前面讲解的消息压缩并不一样,这里的日志压缩是用于删除日志中因消息键重复的较旧的消息数据,以实现日志文件的瘦身。
在构造LogManager时,logDirs
参数是通过server.properties文件配置的,表示存放日志的目录集合;Kafka允许配置多数据目录,每个数据目录都可以创建多个分区目录(对应于一个Log对象),而LogManager则管理着所有的数据目录。logCreationOrDeletionLock
是一个简单的Object对象,用于在创建或删除Log时进行加锁;logs
字段则是用于记录主题分区与Log之间的映射关系,使用Pool对象存储,Pool内部使用ConcurrentHashMap实现并发安全的字典结构。dirLocks
字段的值来自lockLogDirs(dirs: Seq[File]): Seq[FileLock]
方法的返回值,是所有数据目录的目录锁,该方法的实现如下:
目录锁的实现其实是通过在数据目录下创建一个.lock文件并且以这个文件创建FileLock作为当前文件的目录锁,lockLogDirs(...)
方法会直接尝试对FileLock上锁,最终返回所有数据目录的锁集合。
recoveryPointCheckpoints
字段是一个Map[File, OffsetCheckpoint]类型的字典结构,用于管理每个数据目录与其下的RecoveryPointCheckpoint文件之间的映射关系。在LogManager对象初始化时,会在每个数据目录下创建一个对应的RecoveryPointCheckpoint文件,RecoveryPointCheckpoint文件主要用于在Broker异常的情况下进行数据恢复等操作,其中记录了该数据目录下的所有Log的recoveryPoint
;RecoveryPointCheckpoint文件最终存储的格式如下:
- 0
- 20
- topic_test1 6 0
- topic_test2 1 0
- topic_test1 13 0
- topic_test2 3 0
- ...
其中第一行表示当前版本;第二行表示该数据目录下有多少个主题分区目录;后续每行存储了每个主题分区的RecoveryCheckpoint信息,以主题 分区编号 recovery-checkpoint值
格式存储,有多少个主题分区目录就有多少行。在后面将详细介绍RecoveryCheckpoint机制。
1.1. 定时任务
在LogManager的startup()
方法中会启动三个定时任务以及日志压缩任务,源码如下:
1.1.1. KafkaScheduler任务调度器
在认识定时任务之前,先了解一下这里用到的Scheduler调度器;Kafka通过包装JDK提供的ScheduledThreadPoolExecutor定时任务线程池,构建了自己的定时任务调度器KafkaScheduler,源码如下:
如果了解ScheduledThreadPoolExecutor的原理,KafkaScheduler的实现是非常简单的;我们主要关注几个要点:KafkaScheduler自定义了线程工厂,生成的线程将拥有特定的线程名标识(默认为”kafka-scheduler-“ + 自增序列),添加了对未知异常的捕获,同时还会根据初始化KafkaScheduler传入的参数配置是否为守护线程等。
在线程调度方面,会根据schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit)
方法的period
参数决定是创建周期任务还是非周期任务,同时还会捕获运行任务产生的异常,仅仅使用Error级别的日志进行记录。
需要注意的是KafkaScheduler的shutdown()
方法,它用于关闭线程池,但由于KafkaScheduler调度的可能是耗时非常久的任务,因此在调用了线程池shutdown()
方法关闭后采用cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
的方式等待一天,以尽量完成剩余已存在的周期任务的执行。
1.1.2. Log Retention定时任务
Log Retention任务是用于LogSegment清理工作的,在前面我们已经提到过,Kafka会定期清理过期日志,清理的标准有两个:日志保存的时长(由retention.ms
配置决定,默认值7天),及日志文件的大小(由retention.bytes
配置决定,默认值-1,即无限制)。Log Retention任务会根据这两项的配置定时清理过期日志。清理的周期时间由配置项log.retention.check.interval.ms
决定(默认值300000,即300s),清理工作则主要由LogManager的cleanupLogs()
方法完成,该方法的源码如下:
cleanupLogs()
方法会遍历所有的Log对象,只处理cleanup.policy
配置为delete
的Log,主要的清理流程由cleanupExpiredSegments(log: Log): Int
和cleanupSegmentsToMaintainSize(log: Log): Int
两个方法完成,它们的源码如下:
这两个放的设计非常巧妙,它们将实际的删除操作委托给Log对象,但将是否删除的决定功能作为函数类型的参数传递给Log对象的deleteOldSegments(predicate: LogSegment => Boolean): Int
方法,从该方法的声明可以得知,predicate
是一个函数类型的参数,该函数接收一个LogSegment类型的参数,返回值为Boolean,用于决定是否删除传入的LogSegment对象;具体实现如下:
删除操作是加锁控制的,从Log对象的segments
跳表中根据参数传入的条件函数筛选出需要删除的LogSegment集合后,通过调用deleteSegment(segment: LogSegment)
方法对集合中的每一个LogSegment进行删除,该方法的源码如下:
Log对象的deleteSegment(segment: LogSegment)
方法会首先将LogSegment从segments
跳表中移除,然后调用asyncDeleteSegment(segment: LogSegment)
方法异步删除对应的索引文件和日志文件。文件的删除流程如下:
- 首先给LogSegment描述的日志文件及索引文件添加.deleted后缀;
- 向KafkaScheduler调度器添加一个”delete-file“任务,延迟一定时间后(由配置项
file.delete.delay.ms
决定,默认60000,即1分钟)再执行”delete-file“任务进行删除。
1.1.3. Log Flusher定时任务
Log Flusher定时任务会根据配置的时长(由配置项log.flush.scheduler.interval.ms
决定,默认值3000)定时对Log进行刷盘操作,保证数据的持久性;具体的刷盘工作由Log对象的flushDirtyLogs()
方法完成,源码如下:
从源码可知,最终的刷盘操作还是交给了LogSegment对象,而LogSegment对象会将刷盘操作继续交给内部的FileMessageSet和OffsetIndex完成,这部分源码在前面1.2.4. 数据刷盘
节已经介绍过了,这里就不再赘述。仅仅需要注意的是,在刷盘操作之后,Log对象的recoveryPoint
会更新为传入的offset
的值,这表明,recoveryPoint
的数据都已经刷盘持久化了。
1.1.4. RecoveryPoint Checkpoint定时任务
RecoveryPoint Checkpoint定时任务用于定时将每个Log的recoveryPoint
写入RecoveryPointCheckpoint文件中,间隔时长由配置项log.flush.offset.checkpoint.interval.ms
决定(默认值60000,即1分钟),主要的检查操作由LogManager对象的checkpointRecoveryPointOffsets()
方法完成,源码如下:
从源码可知,checkpointRecoveryPointOffsets()
方法会对所有数据目录进行遍历并对其执行checkpointLogsInDir(dir: File): Unit
方法。
这里先介绍一下logsByDir
字段,该字段的值是通过对LogManager的logsByTopicPartition
字段进行分组得到的,logsByTopicPartition
是logs
字段的字典形式,它们的定义如下:
因此logsByTopicPartition
中存储了主题分区与Log对象的映射,通过groupBy分组后,会得到Map[String, Map[TopicAndPartition, Log]]结构的字典数据,键为数据目录名,值为TopicAndPartition到Log对象的映射字典。
在checkpointLogsInDir(dir: File): Unit
方法中会从logsByDir
中获取对应数据目录的TopicAndPartition到Log对象的映射字典,然后将这些Log对象的recoveryPoint
值写入到RecoveryPointCheckpointFile中。具体的写入操作是由OffsetCheckpoint对象的write(offsets: Map[TopicAndPartition, Long])
方法完成的,源码如下:
由源码可知,写入过程是先将数据目录下的所有Log的recoveryPoint
写到名为”recovery-point-offset-checkpoint.tmp“临时文件中,然后用该临时文件替换原来的RecoveryPointCheckpoint文件。另外,RecoveryPointCheckpoint文件最终存储的格式如下:
- 0
- 20
- topic_test1 6 0
- topic_test2 1 0
- topic_test1 13 0
- topic_test2 3 0
- ...
其中第一行表示当前版本;第二行表示该数据目录下有多少个主题分区目录;后续每行存储了每个主题分区的RecoveryCheckpoint信息,以主题 分区编号 recovery-checkpoint值
格式存储,有多少个主题分区目录就有多少行。这部分内容在前面简单讲解过。对应的,OffsetCheckpoint类还提供了read(): Map[TopicAndPartition, Long]
用于读取RecoveryPointCheckpoint文件,返回一个Map[TopicAndPartition, Long]类型的字典,比较简单,源码如下:
1.2. 日志压缩
Kafka还提供了日志压缩(Log Compaction)功能,通过此功能也可以有效地减小日志文件的大小,缓解磁盘紧张的情况。在很多实践场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据记录会不断被修改一样。如果消费者只关心key对应的最新value值,可以开启Kafka的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同key的消息进行合并,只保留最新的value值。
Log在写入消息时是将消息追加到Active Segment的日志文件末尾。为了避免Active Segment成为热点,Active Segment不参与日志压缩操作,而是只压缩其余的只读的LogSegment。在日志压缩过程中会启动多条Cleaner线程,我们可以通过调整Cleaner线程池中的线程数量,优化并发压缩的性能,减少对整个服务端性能的影响。一般情况下,Log的数据量很大,为了避免Cleaner线程与其他业务线程长时间竞争CPU,并不会将除Active Segment之外的所有LogSegment在一次压缩操作中全部处理掉,而是将这些LogSegment分批进行压缩。
每个Log都可以通过Cleaner Checkpoint切分成Clean和Dirty两部分,Clean部分表示的是之前已经被压缩过的部分,而Dirty部分则表示未压缩的部分。日志压缩操作完成后,Dirty部分消息的offset依然是连续递增的,而Clean部分消息的offset是断断续续的。
1.2.1. LogCleaner类
在LogManager的startup()
方法中,会根据cleanerConfig.enableCleane
配置(log.cleaner.enable
,默认值为true)决定是否启动压缩任务,压缩任务是由LogCleaner对象实现的:
LogCleaner对象在初始化时,会根据数据目录等参数构建LogCleanerManager对象,并根据CleanerConfig配置初始化对应数量的CleanerThread线程并由cleaners
字段进行管理:
在LogCleaner的startup()
方法中会启动所有上面创建的CleanerThread线程:
1.2.2. CleanerThread和ShutdownableThread类
CleanerThread线程其实是继承自ShutdownableThread抽象类的,该类的源码如下:
ShutdownableThread类的设计是比较简单的,内部通过shutdownLatch
来控制线程的真正结束,而在主要的run()
方法中,如果线程的isRunning
标识一直是true将不断执行doWork()
方法,doWork()
方法则是一个抽象方法,具体内容由子类实现,因此,我们需要关注CleanerThread类的doWork()
方法,源码如下:
1.2.3. LogCleanerManager和Cleaner类
从上面的代码可以看出,清理工作还涉及了LogCleanerManager对象和Cleaner对象,我们先了解这两个对象再回过头来分析。其中LogCleanerManager对象cleanerManager
是在LogCleaner中初始化的,由于CleanerThread类是LogCleaner类的内部类,所以可以访问该对象:
LogCleanerManager主要负责每个Log的压缩状态管理以及Cleaner Checkpoint信息维护和更新,它的定义和重要的字段如下:
其中checkpoints
用来维护data数据目录与Cleaner Checkpoint文件之间的对应关系;lock
是用于保护checkpoints
集合和inProgress
集合的锁;pausedCleaningCond
则构成了一个条件队列,用于控制压缩状态的装换;inProgress
用于记录正在进行清理的TopicAndPartition的压缩状态,压缩状态都定义为了LogCleaningState的子类:
日志压缩的生命周期对应着上面三个状态的转换,当开始进行日志压缩任务时会先进入LogCleaningInProgress状态;压缩任务可以被暂停,此时进入LogCleaningPaused;压缩任务若被中断,则先进入LogCleaningAborted状态,等待Cleaner线程将其中的任务中止,然后进入LogCleaningPaused状态。处于LogCleaningPaused状态的TopicAndPartition的日志不会再被压缩,直到有其他线程恢复其压缩状态。
Cleaner对象cleaner
是在CleanerThread线程类里初始化的:
Cleaner的定义和重要字段如下:
在Cleaner类的构造函数声明中有几个重要参数:
offsetMap
:该参数是一个SkimpyOffsetMap类型的Map,用于为dirty部分的消息建立key与last_offset
的映射关系。SkimpyOffsetMap使用加密算法(例如MD2、MD5、SHA-1等)计算key的hash值,并以此hash值表示key,在Map中不直接保存key的值。SkimpyOffsetMap底层使用ByteBuffer实现,冲突解决方式使用的是线性探查法。SkimpyOffsetMap只支持put()
方法和get()
方法,不支持任何删除方法。ioBufferSize
:指定读写LogSegment的ByteBuffer大小。maxIoBufferSize
:指定的消息的最大长度。dupBufferLoadFactor
:指定SkimpyOffsetMap的最大占用比例。checkDone
:用来检测Log的压缩状态。
1.2.4. 筛选需要压缩的日志
回到CleanerThread类用于压缩日志的主要方法cleanOrSleep()
,它内部调用LogCleanerManager对象的grabFilthiestLog()
获取了需要进行日志压缩的Log,该方法的源码如下:
这里需要注意的是,日志压缩时会使用名为”cleaner-offset-checkpoint“的文件记录每个日志的Clean和Dirty两个部分的分界offset,该offset之前的为已经压缩过的Clean部分,后面为Dirty部分。上面提到过,LogCleanerManager的checkpoints
字段用于维护数据目录与”cleaner-offset-checkpoint“文件之间的对应关系,该文件的结构与前面讲到的RecoveryPointCheckpoint文件是类似的,它们都使用OffsetCheckpoint类对offset信息进行管理。因此在上面代码的allCleanerCheckpoints(): Map[TopicAndPartition, Long]
方法中同样是使用OffsetCheckpoint类的read()
方法读取了”cleaner-offset-checkpoint“文件的内容并转换为类型为Map[TopicAndPartition, Long]的字典的。
grabFilthiestLog()
方法会从logs
中过滤出可以进行压缩的日志,将其封装为LogToClean对象;过滤的条件有以下几个:
- 日志主题未被标记为删除,即其
cleanup.policy
配置不可为delete
;日志即将被删除了自然没有压缩的必要了; - 日志并不处于正压缩的过程中;
- 日志内有数据;
- 日志的
cleanableRatio
大于配置的minCleanableRatio
阈值(由min.cleanable.dirty.ratio
配置),即只有Dirty部分的内容大于阈值才符合过滤条件。
过滤出的日志会根据”cleaner-offset-checkpoint“文件记录的Clean Offset及其中第一条消息的baseOffset
比对得出最终的Dirty Offset,该Dirty Offset就是需要压缩的起始offset。
经过过滤得到的会是一个LogToClean对象的集合,对应了所有符合压缩条件的Log对象,然后会从该集合中取出最大的cleanableRatio
的值作为LogCleanerManager对象的dirtiestLogCleanableRatio
字段的值,这个过程其实是通过LogToClean的compare(that: LogToClean): Int
进行的:
最终选取要压缩的Log,即从LogToClean集合中选择cleanableRatio
值最大的对象作为压缩目标,将其TopicAndPartition作为键添加到inProgress
字典中,值为LogCleaningInProgress表示该Log已经进入了压缩过程。
1.2.5. 日志压缩过程
在通过grabFilthiestLog()
得到过滤结果之后,会对其进行判断,如果得到的结果中并没有包含任何LogToClean对象,就backOffWaitLatch
退避锁退避一段时间后再重试,如果结果中有LogToClean对象,就将其传给Cleaner对象的clean(cleanable: LogToClean): Long
方法进行压缩,压缩的过程主要有以下几步:
- 遍历需要压缩的日志所对应的消息数据,范围从First Dirty Offset至Active Segment的
baseOffset
,以填充OffsetMap。在OffsetMap中记录每个key应该保留的消息的offset。当OffsetMap被填充满时,就可以确定日志压缩的实际上限endOffset
。 - 根据
deleteRetentionMs
(由delete.retention.ms
配置)配置,计算可以安全删除的“删除标识”(即value为空的消息)的LogSegment。 - 将
logStartOffset
到endOffset
之间的LogSegmment进行分组。 - 按照第3步得到的分组分别进行日志压缩。
clean(...)
方法的源码如下:
下面将分步进行详细介绍。
1.2.5.1. 扫描消息并填充OffsetMap
第一步操作是由buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long
方法完成的,该方法从First Dirty Offset开始遍历LogSegment直至Active Segment的baseOffset
以填充offsetMap
;前面提到过,offsetMap
是一个SkimpyOffsetMap类型的Map,用于为dirty部分的消息建立key与last_offset
的映射关系;buildOffsetMap(...)
方法的源码如下:
buildOffsetMap(...)
方法会遍历包含的消息的offset落在start
至end
之间的LogSegment,通过buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long
方法建立每个LogSegment对应的符合OffsetMap要求的数据,并填充到offsetMap
中,该方法源码如下:
在方法buildOffsetMapForSegment(...)
遍历消息的过程中是需要读取消息数据的,在向SkimpyOffsetMap类型的offsetMap
中添加数据时,如果遇到相同的键,新的offset会把旧的offset覆盖掉,这部分的源码如下:
buildOffsetMapForSegment(...)
方法会遍历完LogSegment中所有的消息或直到offsetMap
被装满,当offsetMap
被装满时会返回-1,否则返回遍历到的最后一条消息的offset。
1.2.5.2. 计算可安全删除的时间段
在offsetMap
填充完后,回到clean(cleanable: LogToClean): Long
方法中,此时会计算可以安全删除”删除标识“的LogSegment,得到的计算结果称为deleteHorizonMs
,这个值表示,从deleteHorizonMs
的这个时间点至First Dirty Offset点的LogSegment的最后修改时间点的这段时间内的消息数据中,如果消息的value为空可以直接删除,如下图所示:
1.2.5.3. LogSegment分组
接下来要开始遍历消息数据了,先获取当前Log中offset从0至endOffset
之间的LogSegment集合,然后使用groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
方法对该LogSegment集合进行分组,即每个LogSegment只属于一组,一个LogSegment不会被切分到多个组中。在分组过程中,除了限制了每个分组中消息的总字节数,还限制了每个分组对应索引的总字节数。该分组方法的源码如下:
1.2.5.4. 进行压缩
分完组后,会通过cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, deleteHorizonMs: Long)
方法对每个分组的LogSegment进行压缩处理,压缩操作主要有以下几步:
- 为日志文件和索引文件分别创建以原文件名加上“.cleaned”后缀的新的文件,为两个文件分别创建对应的FileMessageSet和OffsetIndex对象,同时根据这两个对象构建LogSegment对象,用于存放压缩后的数据。
- 遍历LogSegment集合,计算每个LogSegment的最后修改时间是否位于安全删除时间段内。
- 调用
cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean, messageFormatVersion: Byte)
方法对遍历的LogSegment进行清理,清理后的数据将存放到的新的日志文件和索引文件中(即以.cleaned后缀结尾的文件)。 - 裁剪索引文件,删除多余空间;将新的日志文件和索引文件刷盘持久化,并更新它们的最后修改时间。
- 替换Log的
segments
跳表里原来的旧LogSegment为新的LogSegment,流程如下:- 首先将两个后缀为.cleaned的新文件修改为.swap后缀;
- 然后将新的LogSegment加入到Log.segments跳表,从Log.segments跳表中删除旧的LogSegment;
- 将旧LogSegment对应的日志文件和索引文件删除,将.swap文件的.swap后缀删除。
该方法的源码如下:
cleanSegments(...)
方法的源码是比较清晰的,这里主要讨论一下第3步中涉及到的`cleanInto(...)
方法,该方法也是负责对日志进行压缩的主要方法:
该方法的逻辑是比较简单的,遍历LogSegment中的每条消息数据,判断是否需要保留消息,如果需要保留就将消息数据及offset写入到写缓冲区中,最后根据写缓冲区构建ByteBufferMessageSet对象,添加到新的LogSegment中。
可以发现,其中判断消息是否应该保留的方法是shouldRetainMessage(source: kafka.log.LogSegment, map: kafka.log.OffsetMap, retainDeletes: Boolean, entry: kafka.message.MessageAndOffset): Boolean
,源码如下:
在shouldRetainMessage(...)
方法中,没有键的消息会设为不保留;对有键的消息还需要根据键从offsetMap
中查找对应记录的offset,如果查找到的offset比该消息的offset还要大,说明该消息是过期消息,将不予保留;同时还会根据retainDeletes
参数来决定当前消息如果没有值是否需要丢弃。
至此,日志的压缩操作就讲解完了;日志压缩几乎是日志操作中最为复杂的操作,其原理虽然简单,但实现上面却非常复杂,整个过程需要对消息数据进行两次读取和一次写出。需要注意的是,日志压缩不仅删除了旧的键重复的消息,还会对键为空、值为空且处于安全删除期内的消息一并进行清理。
1.3. 辅助操作
LogManager还提供了几个辅助的方法用于创建、获取和删除Log,其中创建和删除操作时需要加锁控制并发的;下面分别对它们进行介绍。
1.3.1. 创建Log
创建Log操作由createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log
方法实现,需要传入表示主题和分区的TopicAndPartition对象及其配置:
其中比较重要的是数据目录的选择,由于Kafka支持配置多数据目录,因此在每次创建Log对象对应的目录时需要选择已存放分区目录最少的数据目录,这个功能由nextLogDir()
实现,源码比较简单:
1.3.2. 获取Log
获取Log由getLog(topicAndPartition: TopicAndPartition): Option[Log]
方法实现,比较简单:
1.3.3. 删除Log
删除Log由deleteLog(topicAndPartition: TopicAndPartition)
方法实现:
在删除Log时,会先从logs
池中将其移除,然后停止其日志压缩操作,最后删除对应的日志文件、索引文件和目录。
2. 日志系统的初始化
LogManager除了在startup()
方法中定义了定时任务及日志压缩任务之外,在构造LogManager对象时还进行了一系列的日志系统的初始化工作,回顾它的源码:
从源码可知,在LogManager构建时还调用了createAndValidateLogDirs(dirs: Seq[File])
和loadLogs()
方法;createAndValidateLogDirs(dirs: Seq[File])
方法用于创建并检查数据目录,源码如下:
createAndValidateLogDirs(...)
会保证数据目录存在(如果不存在就新创建)并且是可读的。
loadLogs()
方法的作用主要有以下几个:
- 为每个数据目录分配一个有
ioThreads
条线程的线程池,用来执行恢复操作。 - 检测Broker上次关闭是否正常,并设置Broker的状态。在Broker正常关闭时,会创建一个“.kafka_cleanshutdown”的文件,这里就是通过此文件进行判断的。
- 读取RecoveryPointCheckpoint文件,载入每个Log的
recoveryPoint
。 - 为每个Log创建一个恢复任务,交给线程池处理。主线程阻塞等待所有的恢复任务完成,并关闭所有在第1步中创建的线程池。
该方法的源码如下:
loadLogs()
的源码在流程上是非常清晰的,前面的第1 ~ 3步的实现都比较简单,重要的操作全在第4步中,这一步会对每个数据目录做同样的操作,抽取出来的操作代码如下:
可以发现,在该过程中会根据数据目录、配置信息、对应的RecoveryPoint、调度器等信息创建Log对象,然后将该Log对象添加到logs
池中由LogManager进行管理。
而创建Log对象的过程,会调用它极其重要的方法loadSegments()
,用来加载它所管辖的LogSegment对象;这个方法在上一篇文章中已经简单介绍过了,先贴出源码:
回顾一下它的作用有以下3个:
- 处理.clean、.deleted、.swap文件;
- 加载全部的日志文件和索引文件(可能需要重建索引);
- 处理可能存在的由于Broker非正常关闭导致的消息异常。
其中第2步的分析在上一篇文章中已经讲解过了,这里需要介绍第1步和第3步。
在第1步的操作中,.cleaned后缀的文件是日志压缩时使用的过渡文件,如果有该后缀的文件存在,表示在日志压缩过程出现过宕机,.cleaned文件中数据的状态不明确,无法进行恢复;.swap后缀的文件表示日志压缩已经完成了,但在替换原数据文件的过程中出现宕机,不过.swap文件中保存了日志压缩后的完整消息,可进行恢复;.deleted后缀的文件则是本来就要删除的日志文件或索引文件。处理这一步的源码如下,比较简单:
第3步的处理中,针对LogSegment集合为空Log,需要创建Active Segment,保证Log中至少有一个LogSegment;而对于LogSegment集合非空的Log,则可能需要进行恢复操作:
恢复操作统一交给recoverLog()
方法处理,源码如下:
该方法会根据hasCleanShutdownFile
来判断(也即是根据是否存在.kafka_cleanshutdown文件进行判断)是否需要进行Log的恢复,具体的恢复操作则比较简单,只需要对recoveryPoint
至Active Segment中的所有消息进行验证,将验证失败的消息截断即可。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...