大数据
流式处理
Kafka
源码解析

Kafka系列 14 - 服务端源码分析 05:时间轮的实现

简介:主要讲解时间轮算法和Kafka中的时间轮实现

1. 时间轮的基本概念

时间轮是一种非常常见的数据结构,主要用于管理定时任务,其思想应用范围非常广泛,各种操作系统的定时任务调度、Crontab、基于Java的通信框架Netty中都有时间轮的实现,几乎所有的时间任务调度系统采用的都是时间轮的思想。时间轮的实现方式有两种:Round型或分层型。我们先简单认识时间轮的基本概念,然后介绍Kafka中时间轮的实现。

定时任务的需求一般是两种:约定一段时间后执行和约定某个时间点执行;一般来说,二者可以相互转换。例如当前时间是上午9点,任务A要求在1个小时后执行,可以转换为在上午10点时执行;任务B约定在上午11点时执行,也可以转换为约定在2个小时后执行,如图所示:

1.绝对时间和相对时间的相互转换.png

1.1. 重复执行的任务

时间轮管理定时任务的思想是,将定时任务放置到一个刻有时间刻度的轮盘上,然后推进时间指针,指针指到的任务即是此刻要执行的任务。但我们还需要考虑重复执行的任务,这个问题比较好解决,当一个任务定义为重复任务时,只需要在时间指针推进到执行该任务时,执行完任务后重新将任务添加到该时刻上即可,在下次时间指针再次推进到该时刻就又可以执行该任务了;依次重复这个动作,就可以实现任务的重复,当某时需要取消该重复任务,只需要将该任务从时间轮上移除即可;示意图如下:

2.重复执行的任务处理.png

1.2. 同一时刻多个执行任务

对于同一时刻存在多个定时任务的情况,处理方式可以采用时间轮+链表对任务进行分批管理,也就是说,将在每个时刻只放置一个定时任务,转换为每个时间放置一个链表,这个链表里的任务都是需要在该时刻执行的,示意图如下:

3.同一时刻执行多个任务.png

1.3. 时间刻度不足的情况

在现实情况中,定时任务可能不只是按照小时来规划的,也有可能按照天、周、月等时间跨度进行规划的,例如Linux系统中的Crontab是按照分、时、日、月、周五个跨度进行任务的规划。按照我们上面的情况,按照1小时的跨度、共12个刻度进行规划是无法满足的。解决这种缺陷有三种方法:

  1. 增加时间刻度;例如既有按照小时的跨度,也有周的跨度进行任务时,可以将刻度调整为168个(一周有168个小时),刻度间隔为1小时,这样就能满足了。
  2. 使用Round方式;例如时间轮只有24个刻度,刻度间隔依旧是1小时,对于以周为跨度的定时任务,可以增加一个Round记录,每次时间指针推进到该任务所在的时刻说明距离上次已经过了24小时,此时将Round值减1,当Round值为0时,说明到了执行时刻。
  3. 使用多层时间轮;例如总跨度24小时为一个轮,总跨度168小时为一个轮,不同执行时刻的任务放置在不同的轮上,每个轮自己有推进的指针用于执行自己的任务。

对于上面的解释,大家可能非常模糊,并不知道具体是如何实现的,下面将对着三种方式分别进行介绍。先假设有3个定时任务,在一周的时间内:

  1. 任务A需要在每天的9点执行;
  2. 任务B需要在本周二的12点执行;
  3. 任务C需要在本周四的14点执行。

1.3.1. 增加时间刻度

我们可以以一周的小时数作为时间轮的总刻度,一周有7 * 24 = 168个小时,也即是有168个刻度,我们以每周第一天为周一计算,通过时间的换算,可以得到以下结论:

  1. 任务A需要在第9(0 * 24 + 9)、33(1 * 24 + 9)、57(2 * 24 + 9)、81(3 * 24 + 9)、105(4 * 24 + 9)、129(5 * 24 + 9)、153(6 * 24 + 9)小时时刻都要执行一次;
  2. 任务B需要在第36(1 * 24 + 12)个小时时刻执行一次;
  3. 任务C需要在第86(3 * 24 + 14)个小时时刻执行一次。

有以下的示意图:

4.增加时间刻度处理大跨度时间.png

1.3.2. 使用Round

使用Round的方式其实很好理解,与我们日常生活中,以24小时表示一天,如果一周是7天,只需要时钟转7圈就可以达到,这里的“7天”就类似于7个Round。对于上面A、B、C三个任务,时间轮依旧使用总跨度24小时、刻度间隔为1小时,而我们只需要给它们添加一个Round标记用于记录该任务是处于第几个Round才执行即可;每次时间指针推进到任务所在的时刻,Round减1,只有当Round为0时才执行任务:

  1. 对于任务A,因为它每天都会执行,因此它其实是一个重复任务,它的Round为0,只需要每次执行完后重新在该时刻设置新的任务即可;
  2. 对于任务B,它需要在本周二的12点执行,因此它的Round应该是2,放置在12点的时刻,第一次经过任务B,Round减1后为1,不执行任务;第二次经过任务B,Round减1后为0,表示任务B应该执行了;
  3. 对于任务C则与任务B一样,它的Round应该是4,放置在14点的时刻。

Round方式的示意图如下:

5.Round方式处理大跨度时间.png

1.3.3. 多层时间轮

从名称就可以得知,多层时间轮由多个时间轮组成,每个时间轮的跨度不同;例如对于上面的情况,我们可以设置一个天轮和一个周轮,天轮的刻度间隔为1小时,一共有24个跨度,周轮的刻度间隔为1天,一共有7个跨度;当某个任务的执行时间大于天轮的总跨度时,就将其放置在周轮上;当周轮上的某个任务距离执行时间不足一个周轮跨度,就将其移到天轮上等待执行。

针对于上面的A、B、C三个任务,我们有以下的时序示意图:

6.多层时间轮方式处理大跨度时间.png

2. Kafka的时间轮实现

通过上面的介绍,相信大家对时间轮这种数据结构有了一定的认识,接下来我们开始讲解Kafka中时间轮的实现。Kafka的时间轮是使用多层时间轮来实现的,对同一时刻的任务也是通过链表的形式进行组织的。

2.1. TimingWheel类

TimingWheel类是用于描述时间轮的类,它的底层通过数组实现,数组的每个位置存放一个TimerTaskList对象,TimerTaskList是一个环形双向链表,其中每个链表项TimeTaskEntry中封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimerTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务。TimeTask中的delayMs记录了任务的延迟时间,timerTaskEntry字段记录了对应的TimerTaskEntry对象。这三个对象是TimingWheel实现的基础。

我们先来观察TimingWheel的定义和其中重要的属性:

  • /*
  • * @param tickMs 当前时间轮中一个时间格表示的时间跨度
  • * @param wheelSize 当前时间轮的格数,也是buckets数组的大小
  • * @param startMs 当前时间轮的创建时间
  • * @param taskCounter 各层级时间轮中任务的总数
  • * @param queue DelayQueue类型,整个层级时间轮共用的一个任务队列,泛型为TimerTaskList(实现了Delayed接口)
  • */
  • @nonthreadsafe
  • private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  • /**
  • * 当前时间轮的时间跨度,即tickMs*wheelSize。
  • * 当前时间轮只能处理时间范围在currentTime ~ currentTime+tickMs*WheelSize之间的定时任务,
  • * 超过这个范围,则需要将任务添加到上层时间轮中。
  • */
  • private[this] val interval = tickMs * wheelSize
  • /**
  • * 每一项都对应时间轮中的一个时间格,用于保存TimerTaskList的;
  • * 在TimingWheel中,同一个TimerTaskList中的不同定时任务的到期时间可能不同,但是相差时间在一个时间格的范围内。
  • */
  • private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  • /**
  • * 时间轮的指针,将整个时间轮划分为到期部分和未到期部分。
  • * 在初始化时,currentTime被修剪成tickMs的倍数,近似等于创建时间,但并不是严格的创建时间。
  • */
  • private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
  • // overflowWheel can potentially be updated and read by two concurrent threads through add().
  • // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
  • // 上层时间轮的引用
  • @volatile private[this] var overflowWheel: TimingWheel = null
  • ...
  • }

上面的注释已经解释得比较清楚了,在初始化TimingWheel对象时,可以通过tickMs指定时间格跨度,wheelSize则表示时间轮的总格数,通过tickMs * wheelSize就可以得到时间轮总的时间跨度interval;每个时间格通过buckets的一个元素来组织任务,而每个时间格中都存放了一个TimerTaskList链表,TimerTaskList实现了Delayed接口。queue是整个多层时间轮共用的一个任务队列,类型为DelayQueue[TimerTaskList]。

另外我们需要注意的是,当前时间格的时间在初始化时是通过startMs - (startMs % tickMs)来获取的,也就是说当前时间是会被修建为tickMs的倍数,近似等于创建时间,但并不是严格的创建时间。

由于是多层时间轮,当前时间轮可以通过overflowWheel引用它的上层时间轮,通过提供addOverflowWheel()用于添加上层时间轮:

  • // kafka.utils.timer.TimingWheel#addOverflowWheel
  • // 添加上层时间轮
  • private[this] def addOverflowWheel(): Unit = {
  • // 加锁
  • synchronized {
  • if (overflowWheel == null) {
  • /**
  • * 创建上层时间轮,具有以下特点
  • * 1. 上层时间轮的时间格是本层时间轮的总跨度
  • * 2. 上层时间轮的格数与本层时间轮相同
  • * 3. 上层时间轮的启动时间是当前时间
  • * 4. 上层时间轮的任务总数与本层时间轮相同
  • * 5. 上层时间轮的任务队列是本层时间轮的任务队列
  • *
  • * 注意:这里可以看出,所有的时间轮使用的任务队列其实是同一个,但因为每个时间轮的时间格跨度是下层时间轮的总跨度
  • * 因此任务队列任务的过期时间之间的间隔其实是逐渐递增的,随着任务的过期时间越来越近,任务会从上层时间轮逐步降级到下层时间轮
  • */
  • overflowWheel = new TimingWheel(
  • tickMs = interval,
  • wheelSize = wheelSize,
  • startMs = currentTime,
  • taskCounter = taskCounter,
  • queue
  • )
  • }
  • }
  • }

从源码可以得知创建上层时间轮具有以下几个特点:

  1. 上层时间轮的时间格是本层时间轮的总跨度;
  2. 上层时间轮的格数与本层时间轮相同;
  3. 上层时间轮的启动时间是当前时间;
  4. 上层时间轮的任务总数与本层时间轮相同;
  5. 上层时间轮的任务队列是本层时间轮的任务队列。

这里可以看出,所有的时间轮使用的任务队列其实是同一个,但因为每个时间轮的时间格间隔是下层时间轮的总跨度,因此任务队列中任务的过期时间之间的间隔其实是逐渐递增的,随着任务的过期时间越来越近,任务会从上层时间轮逐步降级到下层时间轮。

add(timerTaskEntry: TimerTaskEntry): Boolean方法用于将TimerTaskEntry类型的任务对象添加到当前时间轮中,返回值表示是否添加成功,该方法源码如下:

  • // kafka.utils.timer.TimingWheel#add
  • /**
  • * 添加定时任务
  • * @param timerTaskEntry 任务实体,实现了Ordered特质
  • * @return 是否添加成功
  • */
  • def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  • // 获取任务的过期时间
  • val expiration = timerTaskEntry.expirationMs
  • if (timerTaskEntry.cancelled) {
  • // Cancelled
  • // 任务状态为取消,直接返回false
  • false
  • } else if (expiration < currentTime + tickMs) {
  • // Already expired
  • // 任务已过期,直接返回false(过期时间当在前时间格内)
  • false
  • } else if (expiration < currentTime + interval) {
  • // Put in its own bucket
  • // 任务的过期时间符合时间轮的跨度,可以将其添加到时间轮中
  • // 计算任务应该存放的时间格的序号
  • val virtualId = expiration / tickMs
  • // 得到时间格
  • val bucket = buckets((virtualId % wheelSize.toLong).toInt)
  • // 向时间格中添加任务
  • bucket.add(timerTaskEntry)
  • // Set the bucket expiration time
  • // 将任务的过期时间设置为所在时间格的到期时间
  • if (bucket.setExpiration(virtualId * tickMs)) { // 如果设置失败,将任务移除
  • // The bucket needs to be enqueued because it was an expired bucket
  • // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
  • // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
  • // will pass in the same value and hence return false, thus the bucket with the same expiration will not
  • // be enqueued multiple times.
  • queue.offer(bucket)
  • }
  • // 添加任务成功,返回true
  • true
  • } else {
  • // Out of the interval. Put it into the parent timer
  • // 任务的过期时间超过了当前时间轮的总跨度,需要添加到上层的时间轮中
  • // 上层时间轮不存在,则创建
  • if (overflowWheel == null) addOverflowWheel()
  • // 创建完后添加到上层时间轮
  • overflowWheel.add(timerTaskEntry)
  • }
  • }

该方法在任务已过期或已取消的情况下会直接返回false,不做任何操作;在任务的超时时间距离当前时间的间隔小于当前时间轮总跨度时会将其添加到当前时间轮中,否则将其添加到上层时间轮中;;这里会判断是否存在上层时间轮,如果没有就通过addOverflowWheel()进行创建。

advanceClock(timeMs: Long): Unit方法用于推进当前时间轮的指针向前到timeMs时刻,会将timeMs转换为tickMs倍数,需要注意的是,这个方法会将推进操作逐级传递到上层时间轮,源码较为简单:

  • // kafka.utils.timer.TimingWheel#advanceClock
  • // Try to advance the clock
  • // 尝试推进当前时间轮的currentTime指针
  • def advanceClock(timeMs: Long): Unit = {
  • if (timeMs >= currentTime + tickMs) {
  • // 推进当前时间轮的指针
  • currentTime = timeMs - (timeMs % tickMs)
  • // Try to advance the clock of the overflow wheel if present
  • // 尝试推进上层时间轮的指针
  • if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  • }
  • }

2.2. TimerTaskList、TimerTaskEntry和TimerTask类

上面讲到过,TimerTaskList是用于存放TimerTaskEntry的双向循环链表;TimerTaskEntry是用于包装TimerTask的类,作为TimerTaskList中的节点;TimerTask则是一个特质,定义了定时任务的公共属性和方法;它们的源码都比较简单,如下:

  • kafka.utils.timer.TimerTaskList类:
  • // kafka.utils.timer.TimerTaskList
  • /**
  • * @param taskCounter 任务数量
  • */
  • @threadsafe
  • private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  • // TimerTaskList forms a doubly linked cyclic list using a dummy root entry
  • // root.next points to the head
  • // root.prev points to the tail
  • // root节点,其TimerTask为null,过期时间为-1
  • private[this] val root = new TimerTaskEntry(null, -1)
  • // 初始化时root节点的前驱和后继都指向自己
  • root.next = root
  • root.prev = root
  • // 过期时间
  • private[this] val expiration = new AtomicLong(-1L)
  • // Set the bucket's expiration time
  • // Returns true if the expiration time is changed
  • // 设置过期时间
  • def setExpiration(expirationMs: Long): Boolean = {
  • expiration.getAndSet(expirationMs) != expirationMs
  • }
  • // Get the bucket's expiration time
  • // 获取过期时间
  • def getExpiration(): Long = {
  • expiration.get()
  • }
  • // Apply the supplied function to each of tasks in this list
  • // 遍历所有的TimerTask对象,对其执行f操作
  • def foreach(f: (TimerTask)=>Unit): Unit = {
  • synchronized {
  • var entry = root.next
  • // 遍历
  • while (entry ne root) {
  • val nextEntry = entry.next
  • // 任务未取消时,对其执行f操作
  • if (!entry.cancelled) f(entry.timerTask)
  • // 后移
  • entry = nextEntry
  • }
  • }
  • }
  • // Add a timer task entry to this list
  • // 添加TimerTaskEntry
  • def add(timerTaskEntry: TimerTaskEntry): Unit = {
  • var done = false
  • // 循环尝试
  • while (!done) {
  • // Remove the timer task entry if it is already in any other list
  • // We do this outside of the sync block below to avoid deadlocking.
  • // We may retry until timerTaskEntry.list becomes null.
  • // 先将其从原TimerTaskList链中移除
  • timerTaskEntry.remove()
  • synchronized {
  • timerTaskEntry.synchronized {
  • if (timerTaskEntry.list == null) {
  • // put the timer task entry to the end of the list. (root.prev points to the tail entry)
  • // 将其链接到链表尾
  • val tail = root.prev
  • timerTaskEntry.next = root
  • timerTaskEntry.prev = tail
  • timerTaskEntry.list = this
  • tail.next = timerTaskEntry
  • root.prev = timerTaskEntry
  • // 更新Task数量计数器
  • taskCounter.incrementAndGet()
  • // 标识完成
  • done = true
  • }
  • }
  • }
  • }
  • }
  • // Remove the specified timer task entry from this list
  • // 从链表中移除TimerTaskEntry
  • def remove(timerTaskEntry: TimerTaskEntry): Unit = {
  • synchronized {
  • timerTaskEntry.synchronized {
  • // TimerTaskEntry中记录了自己所在的TimerTaskList链,判断是否与当前TimerTaskList是同一个
  • if (timerTaskEntry.list eq this) {
  • // 后继节点的前驱改为自己的前驱
  • timerTaskEntry.next.prev = timerTaskEntry.prev
  • // 前驱节点的后继改为自己的后继
  • timerTaskEntry.prev.next = timerTaskEntry.next
  • // 将自己的后继和前驱都设为null
  • timerTaskEntry.next = null
  • timerTaskEntry.prev = null
  • // 将自己引用的当前链表设为null
  • timerTaskEntry.list = null
  • // 维护链表的任务个数
  • taskCounter.decrementAndGet()
  • }
  • }
  • }
  • }
  • // Remove all task entries and apply the supplied function to each of them
  • // 对所有TimerTaskEntry执行f操作并将其从链表中移除
  • def flush(f: (TimerTaskEntry)=>Unit): Unit = {
  • synchronized {
  • // 头节点
  • var head = root.next
  • // 从头向尾遍历链表
  • while (head ne root) {
  • // 先从链表中移除
  • remove(head)
  • // 调用传入的方法参数进行处理
  • f(head)
  • // 移向下一个元素
  • head = root.next
  • }
  • // 重置过期时间
  • expiration.set(-1L)
  • }
  • }
  • // 获取延迟时间
  • def getDelay(unit: TimeUnit): Long = {
  • unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
  • }
  • // 比较两个TimerTaskList链,是以过期时间进行比较的
  • def compareTo(d: Delayed): Int = {
  • // 类型转换
  • val other = d.asInstanceOf[TimerTaskList]
  • // 比较过期时间
  • if(getExpiration < other.getExpiration) -1
  • else if(getExpiration > other.getExpiration) 1
  • else 0
  • }
  • }
  • kafka.utils.timer.TimerTaskEntry类:
  • // kafka.utils.timer.TimerTaskEntry
  • /**
  • * @param timerTask 任务对象
  • * @param expirationMs 过期时间
  • */
  • private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  • @volatile
  • // 当前TimerTaskEntry所在TimerTaskList链
  • var list: TimerTaskList = null
  • // 后继TimerTaskEntry
  • var next: TimerTaskEntry = null
  • // 前驱TimerTaskEntry
  • var prev: TimerTaskEntry = null
  • // if this timerTask is already held by an existing timer task entry,
  • // setTimerTaskEntry will remove it.
  • // 在TimerTask中记录TimerTaskEntry
  • if (timerTask != null) timerTask.setTimerTaskEntry(this)
  • // 任务是否取消
  • def cancelled: Boolean = {
  • // TimerTask中记录TimerTaskEntry不是自己说明取消了
  • timerTask.getTimerTaskEntry != this
  • }
  • // 将当前TimerTaskEntry从所在的TimerTaskList链中移除
  • def remove(): Unit = {
  • var currentList = list
  • // If remove is called when another thread is moving the entry from a task entry list to another,
  • // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
  • // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
  • while (currentList != null) {
  • currentList.remove(this)
  • currentList = list
  • }
  • }
  • // 比较两个TimerTaskEntry,通过过期时间进行比较
  • override def compare(that: TimerTaskEntry): Int = {
  • this.expirationMs compare that.expirationMs
  • }
  • }
  • kafka.utils.timer.TimerTask特质:
  • // kafka.utils.timer.TimerTask
  • // TimerTask特质
  • trait TimerTask extends Runnable {
  • // 延迟时间
  • val delayMs: Long // timestamp in millisecond
  • // 包装TimerTask的TimerTaskEntry
  • private[this] var timerTaskEntry: TimerTaskEntry = null
  • // 取消TimerTask
  • def cancel(): Unit = {
  • synchronized {
  • // 将当前任务的timerTaskEntry从TimerTaskList中移除
  • if (timerTaskEntry != null) timerTaskEntry.remove()
  • // 将所在的TimerTaskEntry置为null
  • timerTaskEntry = null
  • }
  • }
  • private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
  • synchronized {
  • // if this timerTask is already held by an existing timer task entry,
  • // we will remove such an entry first.
  • // 先将旧的TimerTaskEntry从TimerTaskList链移除
  • if (timerTaskEntry != null && timerTaskEntry != entry)
  • timerTaskEntry.remove()
  • // 然后进行设置
  • timerTaskEntry = entry
  • }
  • }
  • // 获取当前TimerTask的TimerTaskEntry
  • private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
  • timerTaskEntry
  • }
  • }

2.3. DelayedOperation类

DelayedOperation类是继承自TimerTask特质的抽象类,它内部实现了某些特定的方法,但也保留了一些抽象方法供其子类实现。常见的DelayedOperation类的子类有DelayedProduce、DelayedFetch、DelayedHeartbeat、DelayedJoin等。我们这里首先了解一下DelayedOperation的源码:

  • // kafka.server.DelayedOperation
  • abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
  • private val completed = new AtomicBoolean(false)
  • /*
  • * Force completing the delayed operation, if not already completed.
  • * This function can be triggered when
  • *
  • * 1. The operation has been verified to be completable inside tryComplete()
  • * 2. The operation has expired and hence needs to be completed right now
  • *
  • * Return true iff the operation is completed by the caller: note that
  • * concurrent threads can try to complete the same operation, but only
  • * the first thread will succeed in completing the operation and return
  • * true, others will still return false
  • */
  • def forceComplete(): Boolean = {
  • // CAS修改completed字段为true,标识延迟操作已完成
  • if (completed.compareAndSet(false, true)) {
  • // cancel the timeout timer
  • // 修改成功,调用TimerTask的cancel()方法将其从TimerTaskList中删除
  • cancel()
  • // 调用onComplete()方法
  • onComplete()
  • true
  • } else {
  • false
  • }
  • }
  • /**
  • * Check if the delayed operation is already completed
  • * 检测任务是否完成
  • */
  • def isCompleted(): Boolean = completed.get()
  • /**
  • * Call-back to execute when a delayed operation gets expired and hence forced to complete.
  • * 抽象方法,DelayedOperation到期时执行的具体逻辑
  • */
  • def onExpiration(): Unit
  • /**
  • * Process for completing an operation; This function needs to be defined
  • * in subclasses and will be called exactly once in forceComplete()
  • * 抽象方法,DelayedOperation的具体业务逻辑
  • * 在DelayedOperation的整个生命周期中只能被调用一次
  • */
  • def onComplete(): Unit
  • /*
  • * Try to complete the delayed operation by first checking if the operation
  • * can be completed by now. If yes execute the completion logic by calling
  • * forceComplete() and return true iff forceComplete returns true; otherwise return false
  • *
  • * This function needs to be defined in subclasses
  • * 抽象方法,在该方法中子类会根据自身的具体类型,检测执行条件是否满足,若满足则会调用forceComplete()完成延迟操作
  • */
  • def tryComplete(): Boolean
  • /*
  • * run() method defines a task that is executed on timeout
  • * DelayedOperation到期时会提交到SystemTimer.taskExecutor线程池中执行。
  • * 其中会调用forceComplete()方法完成延迟操作,然后调用onExpiration()方法执行延迟操作到期执行的相关代码
  • */
  • override def run(): Unit = {
  • if (forceComplete())
  • onExpiration()
  • }
  • }

上面讲到过,DelayedOperation的父类TimerTask是一个继承自Runnable接口的特质,因此我们应该关注DelayedOperation的run()方法,从上面的源码可知,DelayedOperation的run()方法只做了一件事:调用forceComplete()如果得到返回值为true就调用onExpiration()forceComplete(): Boolean方法内部使用CAS方式修改原子类标识变量completed为true,如果修改成功就调用cancel()方法将其从TimerTaskList链中移除,然后调用onComplete(): Unit方法,因此可知,onComplete(): Unit方法才是DelayedOperation类型任务的主要代码所在处。

2.4. SystemTimer类

Kafka中使用SystemTimer实现了定时器功能,它的底层使用的即是TimeWheel时间轮,添加了执行到期任务、阻塞等待最近到期任务的功能;我们先来观察一下它的定义及重要属性:

  • @threadsafe
  • class SystemTimer(executorName: String,
  • tickMs: Long = 1,
  • wheelSize: Int = 20,
  • startMs: Long = System.currentTimeMillis) extends Timer {
  • // timeout timer
  • // JDK线程池,固定线程数为1,自定义了线程工厂
  • private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
  • def newThread(runnable: Runnable): Thread =
  • Utils.newThread("executor-"+executorName, runnable, false)
  • })
  • // 各个层级的时间轮共用的DelayQueue队列,主要作用是阻塞推进时间轮表针的线程(ExpiredOperationReaper),等待最近到期任务到期
  • private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  • // 各个层级时间轮共用的任务个数计数器
  • private[this] val taskCounter = new AtomicInteger(0)
  • // 层级时间轮中最底层的时间轮
  • private[this] val timingWheel = new TimingWheel(
  • tickMs = tickMs,
  • wheelSize = wheelSize,
  • startMs = startMs,
  • taskCounter = taskCounter,
  • delayQueue
  • )
  • // Locks used to protect data structures while ticking
  • // 用来同步时间轮指针currentTime修改的读写锁
  • private[this] val readWriteLock = new ReentrantReadWriteLock()
  • private[this] val readLock = readWriteLock.readLock()
  • private[this] val writeLock = readWriteLock.writeLock()
  • ...
  • }

从源码可以得知,SystemTimer继承自Timer,Timer是一个特质,其中定义了几个基础的方法,比较简单,这里就不贴源码了。SystemTimer中使用JDK提供的ThreadPoolExecutor创建执行任务的线程池taskExecutor,固定线程池大小为1,并且自定义了线程工厂以产生满足需求的线程。delayQueue用于存放延迟任务,这个队列将在各个层级的时间轮中共享。timingWheel则是SystemTimer内部根据相关参数创建的时间轮对象了。另外SystemTimer还使用读写锁来控制延迟任务的添加和时间指针推进操作。

SystemTimer提供了add(timerTask: TimerTask): Unit添加定时任务TimerTask对象,源码如下:

  • // kafka.utils.timer.SystemTimer#add
  • // 添加定时任务
  • def add(timerTask: TimerTask): Unit = {
  • // 加读锁
  • readLock.lock()
  • try {
  • // 进行添加
  • addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
  • } finally {
  • readLock.unlock()
  • }
  • }
  • // kafka.utils.timer.SystemTimer#addTimerTaskEntry
  • // 添加定时任务
  • private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  • // 向时间轮中添加任务
  • if (!timingWheel.add(timerTaskEntry)) {
  • // Already expired or cancelled
  • // 如果向时间轮中添加失败,可能是因为任务过期或已经取消了
  • if (!timerTaskEntry.cancelled)
  • // 如果任务是因为过期被拒,则直接放置到线程池中执行
  • taskExecutor.submit(timerTaskEntry.timerTask)
  • }
  • }

add(...)方法会将TimerTask对象包装为TimerTaskEntry对象,然后交给addTimerTaskEntry(...)方法进行添加;而addTimerTaskEntry(...)方法则直接调用timingWheeladd(timerTaskEntry: TimerTaskEntry): Boolean方法尝试将TimerTaskEntry对象添加到时间轮中,前面我们讲到过,这个添加方法在任务过期或取消的情况下会直接返回false,在addTimerTaskEntry(...)方法中,调用add(timerTaskEntry: TimerTaskEntry): Boolean方法返回false的任务会先判断是否取消了,如果不是取消(则为超时)就直接将其提交给taskExecutor线程池进行执行。

reinsert是一个函数字段,字面意思用于将TimerTaskEntry重新添加到时间轮中:

  • // kafka.utils.timer.SystemTimer#reinsert
  • // 将timerTaskEntry重新添加到时间轮中
  • private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

SystemTimer也有一个advanceClock(timeoutMs: Long): Boolean方法用于推进时间指针,源码如下:

  • // kafka.utils.timer.SystemTimer#advanceClock
  • /*
  • * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
  • * waits up to timeoutMs before giving up.
  • * 推进时间轮指针,同时对到期的TimerTaskList中的任务进行处理。
  • * 如果TimerTaskList到期,但是其中的某些任务未到期,会将未到期任务进行降级,添加到低层次的时间轮中继续等待;
  • * 如果任务到期了,则提交到taskExecutor线程池中执行。
  • */
  • def advanceClock(timeoutMs: Long): Boolean = {
  • /**
  • * 从队列中取出第一个TimerTaskList元素,会阻塞,等待超时时间为timeoutMs
  • * 注意,这里取出的TimerTaskList的过期时间是到期的,也就是说该链表中的任务已经到执行时间了
  • * 如果没有到期的TimerTaskList将在阻塞等待超时后返回null
  • */
  • var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  • if (bucket != null) { // 取出TimerTaskList元素不为空
  • // 加写锁
  • writeLock.lock()
  • try {
  • while (bucket != null) { // 重新检查取出的TimerTaskList元素
  • // 根据TimerTaskList的过期时间推进时间轮指针
  • timingWheel.advanceClock(bucket.getExpiration())
  • /**
  • * 调用TimerTaskList对象的flush()方法重新处理其中的任务
  • * 该操作会将bucket中所有元素移除来然后重新添加到时间轮
  • * 在重新添加的操作中如果遇到到期的任务会交给taskExecutor线程池执行
  • * 也有可能对其进行降级到下层时间轮
  • * 注意,在这个重新添加的过程中,到期的任务是会被直接提交到线程池执行的
  • * 可以回顾 addTimerTaskEntry() 方法
  • */
  • bucket.flush(reinsert)
  • // 再次从delayQueue中取一个TimerTaskList元素,不会阻塞
  • bucket = delayQueue.poll()
  • }
  • } finally {
  • writeLock.unlock()
  • }
  • true
  • } else {
  • false
  • }
  • }

advanceClock(...)方法内部其实不止有推进时钟指针的功能,它虽然将推进指针的功能交给了底层的TimerWheel时间轮,但其实是根据是否有到期任务来决定是否推进时间轮;通过从delayQueue延迟队列中尝试取出到期的TimerTaskList任务链,如果取到了说明有任务超时,则进行时间轮推进并将任务链中的任务重新添加到时间轮中,在这个添加过程中是会将超时任务提交到线程池执行的。

2.5. DelayedOperationPurgatory类

Purgatory单词的中文意思是【n. 炼狱;受难的处所(或状态);惩戒所;折磨;磨难;】,正如其名,DelayedOperationPurgatory是对DelayedOperation类型任务的控制类,我们先看一下它的object定义:

  • // kafka.server.DelayedOperationPurgatory
  • object DelayedOperationPurgatory {
  • /**
  • * @param purgatoryName 炼狱名称
  • * @param brokerId broker的ID
  • * @param purgeInterval 清理阈值;当DelayedOperationPurgatory中存储的DelayedOperation数量与时间轮中的DelayedOperation数量差距大于该阈值时会出发清理操作
  • * @tparam T 泛型,必须是DelayedOperation子类
  • * @return
  • */
  • def apply[T <: DelayedOperation](purgatoryName: String,
  • brokerId: Int = 0,
  • purgeInterval: Int = 1000): DelayedOperationPurgatory[T] = {
  • // 创建SystemTimer
  • val timer = new SystemTimer(purgatoryName)
  • new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
  • }
  • }

从object的apply(...)方法的定义可知,DelayedOperationPurgatory接收的元素类型必须是DelayedOperation的子类,同时DelayedOperationPurgatory默认使用SystemTimer作为定时器。

DelayedOperationPurgatory的类定义及重要字段如下:

  • /**
  • * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
  • * @param purgatoryName
  • * @param timeoutTimer
  • * @param brokerId
  • * @param purgeInterval 清理阈值;当DelayedOperationPurgatory中存储的DelayedOperation数量与时间轮中的DelayedOperation数量差距大于该阈值时会出发清理操作
  • * @param reaperEnabled
  • * @tparam T
  • */
  • class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
  • timeoutTimer: Timer,
  • brokerId: Int = 0,
  • purgeInterval: Int = 1000,
  • reaperEnabled: Boolean = true)
  • extends Logging with KafkaMetricsGroup {
  • /** a list of operation watching keys
  • * 用于管理DelayedOperation对象
  • * key表示的是Watchers中的DelayedOperation关心的对象,
  • * value是Watchers类型的对象,
  • * Watchers是DelayedOperationPurgatory的内部类,表示一个DelayedOperation的集合,底层使用LinkedList实现
  • */
  • private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
  • // 对watchersForKey进行同步的读写锁操作
  • private val removeWatchersLock = new ReentrantReadWriteLock()
  • // the number of estimated total operations in the purgatory
  • // 记录了该DelayedOperationPurgatory中的DelayedOperation的个数
  • private[this] val estimatedTotalOperations = new AtomicInteger(0)
  • /**
  • * background thread expiring operations that have timed out
  • * ExpiredOperationReaper继承了ShutdownableThread类,是一个线程对象
  • * 主要有两个功能:
  • * 1. 推进时间轮表针;
  • * 2. 定期清理watchersForKey中已完成的DelayedOperation,清理条件由purgeInterval字段指定。
  • * 在DelayedOperationPurgatory初始化时会启动此线程。
  • **/
  • private val expirationReaper = new ExpiredOperationReaper()
  • ...
  • }

这里我们需要关注watchersForKeyexpirationReaper两个字段;先看watchersForKey字段,它是一个键为Any类型值为Watchers类型Pool池,Watchers类是DelayedOperationPurgatory的一个私有内部类,内部有一个LinkedList类型的链表operations,其内部存储的元素的泛型与构建DelayedOperationPurgatory时指定的一样,定义如下:

  • // kafka.server.DelayedOperationPurgatory.Watchers
  • /**
  • * A linked list of watched delayed operations based on some key
  • */
  • private class Watchers(val key: Any) {
  • // 用于管理DelayedOperation的队列
  • private[this] val operations = new LinkedList[T]()
  • // 已经参与监听操作的操作的个数
  • def watched: Int = operations synchronized operations.size
  • ...
  • }

因此watchersForKey池中存储的结构是一个对象(键)对应于一个LinkedList链表(值),LinkedList链表中存储的元素都是DelayedOperation的子类型。watchersForKey池的作用其实是用于监听机制,位于LinkedList链表(值)中的DelayedOperation都监听了对应键对象的变化,相关的操作将在后面详细介绍。Watchers的示意图如下:

7.Watcher监听池结构.png

expirationReaper的类型是ExpiredOperationReaper,意为“过期操作收割机”,它也是DelayedOperationPurgatory的私有内部类,定义非常简单:

  • // kafka.server.DelayedOperationPurgatory.ExpiredOperationReaper
  • /**
  • * A background reaper to expire delayed operations that have timed out
  • */
  • private class ExpiredOperationReaper extends ShutdownableThread(
  • "ExpirationReaper-%d".format(brokerId),
  • false) {
  • // 主要方法
  • override def doWork() {
  • // 调用了DelayedOperationPurgatory的advanceClock()方法
  • advanceClock(200L)
  • }
  • }
  • // DelayedOperationPurgatory初始化时会调用
  • // 如果开启了收割功能,就启动收割机
  • if (reaperEnabled)
  • expirationReaper.start()

ExpiredOperationReaper继承了ShutdownableThread类,也即是说它是一个线程类,ShutdownableThread类在前面介绍过,ExpiredOperationReaper也重写了最重要的doWork()方法,内部调用了DelayedOperationPurgatory的advanceClock(timeoutMs: Long)方法尝试推进DelayedOperationPurgatory内部SystemTimer定时器;在DelayedOperationPurgatory初始化时,如果开启了收割功能就会启动该收割机。

我们就来关注一下DelayedOperationPurgatory的advanceClock(timeoutMs: Long)方法,源码如下:

  • // kafka.server.DelayedOperationPurgatory#advanceClock
  • def advanceClock(timeoutMs: Long) {
  • // 尝试推进时间轮的指针
  • timeoutTimer.advanceClock(timeoutMs)
  • // Trigger a purge if the number of completed but still being watched operations is larger than
  • // the purge threshold. That number is computed by the difference btw the estimated total number of
  • // operations and the number of pending delayed operations.
  • /**
  • * DelayedOperation到期后被SystemTimer.taskExecutor完成后,并不会通知
  • * DelayedOperationPurgatory删除DelayedOperation
  • * 当DelayedOperationPurgatory与SystemTimer中的DelayedOperation数量相差到一个阈值时,
  • * 会调用purgeCompleted()方法清理已完成的DelayedOperation
  • * stimatedTotalOperations.get:当前DelayedOperationPurgatory中的DelayedOperation的个数
  • * delayed:时间轮中DelayedOperation的个数
  • * purgeInterval:阈值
  • */
  • if (estimatedTotalOperations.get - delayed > purgeInterval) {
  • // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
  • // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
  • // a little overestimated total number of operations.
  • // 将当前DelayedOperationPurgatory中DelayedOperation的个数改为delayed
  • estimatedTotalOperations.getAndSet(delayed)
  • debug("Begin purging watch lists")
  • // 对所有Watchers调用purgeCompleted()方法清理已完成的DelayedOperation
  • val purged = allWatchers.map(_.purgeCompleted()).sum
  • debug("Purged %d elements from watch lists.".format(purged))
  • }
  • }
  • // the number of estimated total operations in the purgatory
  • // 记录了该DelayedOperationPurgatory中的DelayedOperation的个数
  • private[this] val estimatedTotalOperations = new AtomicInteger(0)
  • /**
  • * Return the number of delayed operations in the expiry queue
  • * 时间轮中有效DelayedOperation的个数
  • */
  • def delayed() = timeoutTimer.size

该方法调用了SystemTimer的advanceClock(timeoutMs: Long): Boolean方法,这个方法前面讲到过,它会阻塞等待以取出超时的任务链,推进底层时间轮,同时会将链上的TimerTaskEntry重新添加到时间轮中,这个重新添加的过程中会执行到期的任务。也即是说,SystemTimer每一次的时间推进都将尝试执行到期的任务。

estimatedTotalOperations是DelayedOperationPurgatory中用于记录DelayedOperation个数的字段,但由于时间轮中某些已完成的DelayedOperation会从时间轮中移除,但不会主动从DelayedOperationPurgatory的watchersForKey中移除,因此这里根据阈值purgeInterval来决定是否要对watchersForKey中的DelayedOperation进行净化,净化操作由Watchers类的purgeCompleted()方法完成,源码如下:

  • // kafka.server.DelayedOperationPurgatory.Watchers#purgeCompleted
  • // traverse the list and purge elements that are already completed by others
  • /**
  • * 清理operations队列,将已完成的DelayedOperation从operations队列中移除
  • * 如果operations队列为空,则将Watchers从DelayedOperationPurgatory.watchersForKey中移除
  • */
  • def purgeCompleted(): Int = {
  • var purged = 0
  • operations synchronized {
  • // 遍历operations队列
  • val iter = operations.iterator()
  • while (iter.hasNext) {
  • val curr = iter.next()
  • if (curr.isCompleted) {
  • // 如果DelayedOperation已完成就将其移除
  • iter.remove()
  • // 维护计数器
  • purged += 1
  • }
  • }
  • }
  • if (operations.size == 0)
  • /** operations队列为空,将Watchers从DelayedOperationPurgatory.watchersForKey中移除
  • * removeKeyIfEmpty()方法是DelayedOperationPurgatory的方法
  • */
  • removeKeyIfEmpty(key, this)
  • // 返回已移除的DelayedOperation的数量
  • purged
  • }
  • // kafka.server.DelayedOperationPurgatory#removeKeyIfEmpty
  • /*
  • * Remove the key from watcher lists if its list is empty
  • * 如果watchersForKey中某个键对应的Watchers中没有参与监听的DelayedOperation
  • * 就将该键和Watchers从watchersForKey中移除
  • */
  • private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
  • inWriteLock(removeWatchersLock) {
  • // if the current key is no longer correlated to the watchers to remove, skip
  • if (watchersForKey.get(key) != watchers)
  • return
  • // 执行移除
  • if (watchers != null && watchers.watched == 0) {
  • watchersForKey.remove(key)
  • }
  • }
  • }

DelayedOperationPurgatory的核心方法有两个,其中checkAndComplete(key: Any): Int方法会根据传入的key尝试执行对应的Watchers中的DelayedOperation,通过调用Watchers的tryCompleteWatched(): Int方法实现,源码比较简单,如下:

  • // kafka.server.DelayedOperationPurgatory#checkAndComplete
  • /**
  • * Check if some some delayed operations can be completed with the given watch key,
  • * and if yes complete them.
  • * 根据传入的key尝试执行对应的Watchers中的DelayedOperation
  • *
  • * @return the number of completed operations during this process
  • */
  • def checkAndComplete(key: Any): Int = {
  • // 获取Watchers
  • val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
  • if(watchers == null)
  • 0
  • else
  • // 对Watchers调用tryCompleteWatched()方法
  • watchers.tryCompleteWatched()
  • }
  • // kafka.server.DelayedOperationPurgatory.Watchers#tryCompleteWatched
  • // traverse the list and try to complete some watched elements
  • /**
  • * 遍历operations队列,对于未完成DelayedOperation尝试调用tryComplete()方法,将已完成的DelayedOperation移除
  • * 如果operations队列为空,则将Watchers从DelayedOperationPurgatory.watchersForKey中移除
  • * @return
  • */
  • def tryCompleteWatched(): Int = {
  • var completed = 0
  • operations synchronized {
  • // 遍历operations
  • val iter = operations.iterator()
  • while (iter.hasNext) {
  • val curr = iter.next()
  • if (curr.isCompleted) {
  • // another thread has completed this operation, just remove it
  • // 如果DelayedOperation已完成,将其从operations移除
  • iter.remove()
  • } else if (curr synchronized curr.tryComplete()) { // 未完成,尝试调用tryComplete()方法
  • // tryComplete()尝试完成成功,更新计数器,并将DelayedOperation从operations移除
  • completed += 1
  • iter.remove()
  • }
  • }
  • }
  • if (operations.size == 0)
  • /** operations队列为空,将Watchers从DelayedOperationPurgatory.watchersForKey中移除
  • * removeKeyIfEmpty()方法是DelayedOperationPurgatory的方法
  • */
  • removeKeyIfEmpty(key, this)
  • // 返回已完成的DelayedOperation的数量
  • completed
  • }

tryCompleteElseWatch()方法则用于检测DelayedOperation是否已经完成,若未完成则添加到watchersForKey以及SystemTimer中:

  • // kafka.server.DelayedOperationPurgatory#tryCompleteElseWatch
  • /**
  • * Check if the operation can be completed, if not watch it based on the given watch keys
  • *
  • * Note that a delayed operation can be watched on multiple keys. It is possible that
  • * an operation is completed after it has been added to the watch list for some, but
  • * not all of the keys. In this case, the operation is considered completed and won't
  • * be added to the watch list of the remaining keys. The expiration reaper thread will
  • * remove this operation from any watcher list in which the operation exists.
  • *
  • * 检测DelayedOperation是否完成,未完成则添加到watchersForKey以及SystemTimer中
  • *
  • * @param operation the delayed operation to be checked
  • * @param watchKeys keys for bookkeeping the operation
  • * @return true iff the delayed operations can be completed by the caller
  • */
  • def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  • // 检查watchKeys的大小必须大于0
  • assert(watchKeys.size > 0, "The watch key list can't be empty")
  • // The cost of tryComplete() is typically proportional to the number of keys. Calling
  • // tryComplete() for each key is going to be expensive if there are many keys. Instead,
  • // we do the check in the following way. Call tryComplete(). If the operation is not completed,
  • // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
  • // the operation is still not completed, we are guaranteed that it won't miss any future triggering
  • // event since the operation is already on the watcher list for all keys. This does mean that
  • // if the operation is completed (by another thread) between the two tryComplete() calls, the
  • // operation is unnecessarily added for watch. However, this is a less severe issue since the
  • // expire reaper will clean it up periodically.
  • // 尝试完成operation操作,如果执行完成则直接返回true
  • var isCompletedByMe = operation synchronized operation.tryComplete()
  • if (isCompletedByMe)
  • return true
  • var watchCreated = false
  • for(key <- watchKeys) {
  • // If the operation is already completed, stop adding it to the rest of the watcher list.
  • if (operation.isCompleted()) // 如果operation已完成,则放弃添加,直接返回false
  • return false
  • // 否则将operation添加到watchersForKey对应的Watchers中
  • watchForOperation(key, operation)
  • if (!watchCreated) {
  • // 更新标记
  • watchCreated = true
  • // 更新计数器estimatedTotalOperations
  • estimatedTotalOperations.incrementAndGet()
  • }
  • }
  • // 再次尝试执行operation,如果成功则直接返回
  • isCompletedByMe = operation synchronized operation.tryComplete()
  • if (isCompletedByMe)
  • return true
  • // if it cannot be completed by now and hence is watched, add to the expire queue also
  • /**
  • * 由于已经将operation添加到了对应的Watchers中,因此该operation可以响应出发的checkAndComplete()操作
  • * 将operation提交到SysteTimer
  • */
  • if (! operation.isCompleted()) {
  • timeoutTimer.add(operation)
  • if (operation.isCompleted()) {
  • // cancel the timer task
  • // 如果任务已完成,则将其从SystemTimer中移除
  • operation.cancel()
  • }
  • }
  • false
  • }
  • /*
  • * Return the watch list of the given key, note that we need to
  • * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
  • */
  • private def watchForOperation(key: Any, operation: T) {
  • inReadLock(removeWatchersLock) {
  • val watcher = watchersForKey.getAndMaybePut(key)
  • watcher.watch(operation)
  • }
  • }

注意tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean方法的参数,它的第一个参数标识要添加的DelayedOperation对象,而第二个参数watchKeys是一个Seq[Any]类型的序列,因此该方法会遍历watchKeys中所有的键对象,然后将operation添加到watchersForKey中这些键对应的LinkedList链表中会。

关于Kafka时间轮及定时任务控制的实现就先讲解这么多了,下一篇文章将会以两个实际的操作演示时间轮是如何运转的。