Java多线程
Java并发
线程池

Java多线程 45 - ScheduledThreadPoolExecutor详解(1)

简介:ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExecutor的基础上实现的任务调度线程池,内部使用延时工作队列DelayedWorkQueue实现对任务的延时调度。DelayedWorkQueue内部使用一个初始容量为16的数组来保存任务,容量不够时会按照现有容量的1.5倍进行扩容,最大容量可达Integer.MAX_VALUE。

注:本文讲解的ScheduledThreadPoolExecutor基于JDK 1.7.0_07版本。在阅读本文之前,读者需要先理解ThreadPoolExecutor的实现,可以阅读ThreadPoolExecutor详解相关的两篇文章:

  1. Java多线程 43—— ThreadPoolExecutor详解(一)
  2. Java多线程 44—— ThreadPoolExecutor详解(二)

ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExecutor的基础上实现的任务调度线程池,内部使用延时工作队列DelayedWorkQueue实现对任务的延时调度。DelayedWorkQueue内部使用一个初始容量为16的数组来保存任务,容量不够时会按照现有容量的1.5倍进行扩容,最大容量可达Integer.MAX_VALUE

在讲解ScheduledThreadPoolExecutor之前,我们先理解ScheduledThreadPoolExecutor线程池中涉及到的任务类ScheduledFutureTask和延时等待队列DelayedWorkQueue。

1. ScheduledFutureTask任务类

ScheduledThreadPoolExecutor中的任务都被包装为了ScheduledFutureTask类型的对象,ScheduledFutureTask在普通任务(Runnable或Callable)的基础上扩展了额外的功能。

1.1. ScheduledFutureTask的结构

我们首先关注ScheduledFutureTask的整体结构;该类继承自FutureTask,同时实现了RunnableScheduledFuture接口,定义如下:

  • private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
  • ...
  • }

其中FutureTask类在之前我们已经介绍过了,因此这里我们主要关注RunnableScheduledFuture接口,它的定义如下:

  • public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
  • /**
  • * Returns true if this is a periodic task. A periodic task may
  • * re-run according to some schedule. A non-periodic task can be
  • * run only once.
  • *
  • * @return true if this task is periodic
  • */
  • boolean isPeriodic();
  • }

该接口也位于java.util.concurrent包下,只提供了一个方法isPeriodic()给实现类实现,用于判断任务是否是周期性的任务。同时该接口还继承了RunnableFuture和ScheduledFuture两个接口,RunnableFuture在前面已经讲解过了,它继承了Runnable和Future接口;这里主要关注ScheduledFuture接口,它的源码如下:

  • public interface ScheduledFuture<V> extends Delayed, Future<V> {
  • }

ScheduledFuture是一个标记接口,没有任何方法的声明,但它继承了Delayed和Future接口,其中Future接口之前讲解过,Delayed接口的源码如下:

  • public interface Delayed extends Comparable<Delayed> {
  • /**
  • * Returns the remaining delay associated with this object, in the
  • * given time unit.
  • *
  • * @param unit the time unit
  • * @return the remaining delay; zero or negative values indicate
  • * that the delay has already elapsed
  • */
  • long getDelay(TimeUnit unit);
  • }

Delayed接口继承了Comparable接口,因此它可以被比较;Delayed接口自己提供的方法仅仅有getDelay(TimeUnit unit),用于换算和获取任务的延迟时间。

理清了这些接口的关系,可以得到下面的类图结构:

1.2. ScheduledFutureTask的成员变量

ScheduledFutureTask在普通任务类的基础上扩展了一些成员变量,用于支撑任务的定时调度,如下:

  • /**
  • * Sequence number to break ties FIFO
  • * 任务在队列中的序号
  • * */
  • private final long sequenceNumber;
  • /**
  • * The time the task is enabled to execute in nanoTime units
  • * 任务应该执行的时间,以纳秒计算
  • * */
  • private long time;
  • /**
  • * Period in nanoseconds for repeating tasks. A positive
  • * value indicates fixed-rate execution. A negative value
  • * indicates fixed-delay execution. A value of 0 indicates a
  • * non-repeating task.
  • *
  • * 任务重复执行的时间间隔,以纳秒计算
  • * 当其为正值时,表示固定速率执行
  • * 当其为负值时,表示固定延迟执行
  • * 当其为0时,表示是非重复任务
  • */
  • private final long period;
  • /** The actual task to be re-enqueued by reExecutePeriodic */
  • RunnableScheduledFuture<V> outerTask = this;
  • /**
  • * Index into delay queue, to support faster cancellation.
  • */
  • int heapIndex;

这里对着几个成员变量进行解释:

  1. sequenceNumber用于标识在等待队列中的顺序,因为可能会碰到多个任务的延时是一样的,所以需要另一个序号进行区分标记;
  2. time字段用于记录计算出来的任务执行时间;
  3. period字段用于标识任务是否是定时任务,当该字段为正值时,表示是周期性重复任务,当该值为负值时,表示是延迟性重复任务,当该值为0时,表示是非重复任务;
  4. outerTask用于记录当前任务,它的默认值就指向自己;该成员变量主要用在重复性任务中,当前一次任务执行结束时,任务会通过outerTask将自己再次加入到等待队列中,以实现重复执行,它的作用会在后面的源码中进行讲解;
  5. heapIndex用于记录任务在队列中的堆排序顺序。ScheduledThreadPoolExecutor内部使用DelayedWorkQueue作为任务的等待队列,该队列的具体实现其实是一个小顶堆结构,heapIndex则记录了每个任务在堆中的排序顺序。

1.3. ScheduledFutureTask的方法

首先关注ScheduledFutureTask的构造方法,有以下三个:

  • /**
  • * Creates a one-shot action with given nanoTime-based trigger time.
  • * 根据给定开始时间,创建仅执行一次的任务
  • */
  • ScheduledFutureTask(Runnable r, V result, long ns) {
  • super(r, result);
  • this.time = ns;
  • this.period = 0;
  • this.sequenceNumber = sequencer.getAndIncrement();
  • }
  • /**
  • * Creates a periodic action with given nano time and period.
  • * 根据给定开始时间和间隔时间,创建周期性任务
  • */
  • ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  • super(r, result);
  • this.time = ns;
  • this.period = period;
  • this.sequenceNumber = sequencer.getAndIncrement();
  • }
  • /**
  • * Creates a one-shot action with given nanoTime-based trigger.
  • * 根据给定开始时间,创建仅执行一次的任务
  • */
  • ScheduledFutureTask(Callable<V> callable, long ns) {
  • super(callable);
  • this.time = ns;
  • this.period = 0;
  • this.sequenceNumber = sequencer.getAndIncrement();
  • }

构造方法都比较简单,调用了父类的构造方法后,会对一些成员变量进行赋值,需要注意的是sequenceNumber,它会通过sequencer进行自增获取,sequencer是ScheduledThreadPoolExecutor的常量,是一个Long型的原子类,初值为0,定义如下:

  • private static final AtomicLong sequencer = new AtomicLong(0);

由于sequenceNumber是原子性自增的,因此可以保证每个任务的sequenceNumber会按照初始化顺序严格递增。

在ScheduledFutureTask剩余的方法中,getDelay(TimeUnit)方法用于通过构造方法中得到的time值计算相对于当前时间的延迟时间,实现比较简单:

  • // 获取ScheduledFutureTask任务剩余的延迟时间
  • public long getDelay(TimeUnit unit) {
  • return unit.convert(time - now(), TimeUnit.NANOSECONDS);
  • }

now()方法定义在ScheduledThreadPoolExecutor中,获取的是系统当前的纳秒时间:

  • /**
  • * Returns current nanosecond time.
  • * 返回当前纳秒时间
  • */
  • final long now() {
  • return System.nanoTime();
  • }

getDelay(TimeUnit)方法每次调用都会重新计算新的延迟时间。

setNextRunTime()方法则会根据periodtime的值,计算出下一次的运行时间:

  • /**
  • * Sets the next time to run for a periodic task.
  • * 设置下一次执行时间
  • */
  • private void setNextRunTime() {
  • // 获取周期时间间隔
  • long p = period;
  • if (p > 0)
  • /**
  • * 如果p大于0,表示是周期性执行任务,
  • * 下一次执行任务在本次任务的period时间段后
  • */
  • time += p;
  • else
  • /**
  • * 如果p小于0,表示是间隔性执行任务
  • * 使用triggerTime()计算下一次执行任务的时间
  • */
  • time = triggerTime(-p);
  • }

period大于0时,表示是周期性执行任务,只需要在time的基础上加上period即可得到任务下一次的运行时间;当period小于或等于0时,将用triggerTime(-p)方法来处理时间,该方法定义在ScheduledThreadPoolExecutor中:

  • /**
  • * Returns the trigger time of a delayed action.
  • * 根据传入的延迟时间和当前时间,计算最终触发时间
  • */
  • long triggerTime(long delay) {
  • /**
  • * 延迟时间delay小于Long.MAX_VALUE >> 1,则返回now() + delay
  • * 延迟时间delay大于Long.MAX_VALUE >> 1,则返回overflowFree()处理的结果,处理过程详见该方法
  • */
  • return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  • }

其中overflowFree(long)方法的源码如下:

  • /**
  • * Constrains the values of all delays in the queue to be within
  • * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
  • * This may occur if a task is eligible to be dequeued, but has
  • * not yet been, while some other task is added with a delay of
  • * Long.MAX_VALUE.
  • */
  • private long overflowFree(long delay) {
  • // 查看延迟等待队列(在ScheduledThreadPoolExecutor中使用的是延迟队列DelayedWorkQueue)中的队首任务
  • Delayed head = (Delayed) super.getQueue().peek();
  • if (head != null) {
  • // 如果队首任务不为null,计算队首任务相对于当前时间还需要延迟的时间headDelay
  • long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
  • /**
  • * 如果headDelay小于0,表示此时队首任务应该出队了,但恰好有任务入队
  • * 且delay - headDelay小于0,则表示此时delay的值小于0或大于Long.MAX_VALUE
  • * 而传入该方法的delay一定是大于0的,因此此时delay大于Long.MAX_VALUE
  • */
  • if (headDelay < 0 && (delay - headDelay < 0))
  • /**
  • * 如果delay大于Long.MAX_VALUE,且队首任务应该要出队了(headDelay为负)
  • * 则将delay置为Long.MAX_VALUE + headDelay
  • */
  • delay = Long.MAX_VALUE + headDelay;
  • }
  • // 返回delay
  • return delay;
  • }

triggerTime(long delay)中传入的delay小于Long.MAX_VALUE >> 1时会直接取delay,否则交由overflowFree(long delay处理,该方法会根据等待队列的队首任务的延时时间来计算最终下一次任务的执行时间。

ScheduledFutureTask另外的isPeriodic()compareTo(Delayed other)两个方法则比较简单,主要用于判断任务是否是重复性任务以及与另一个任务比较执行的先后顺序,这个比较方法主要用于在等待队列DelayedWorkQueue进行堆排序,它们的源码如下:

  • /**
  • * Returns true if this is a periodic (not a one-shot) action.
  • *
  • * 判断是否是周期性任务
  • *
  • * @return true if periodic
  • */
  • public boolean isPeriodic() {
  • return period != 0;
  • }
  • /**
  • * 与其他ScheduledFutureTask任务进行比较
  • * 主要通过time成员变量来比较
  • */
  • public int compareTo(Delayed other) {
  • // 判断二者是否是同一个对象,如果是则返回0
  • if (other == this) // compare zero ONLY if same object
  • return 0;
  • // 判断传入的other对象是否是ScheduledFutureTask类型
  • if (other instanceof ScheduledFutureTask) {
  • // 强转
  • ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  • // 计算二者时间差
  • long diff = time - x.time;
  • // 当前任务在other任务之前执行
  • if (diff < 0)
  • return -1;
  • // 当前任务在other任务之后执行
  • else if (diff > 0)
  • return 1;
  • // 如果两个任务的执行时间相同则比较序号
  • else if (sequenceNumber < x.sequenceNumber)
  • /**
  • * 当前任务与other任务执行时间相同,但序号较小
  • * 因此当前任务先执行
  • */
  • return -1;
  • else
  • /**
  • * 当前任务与other任务执行时间相同,但序号较大
  • * 因此当前任务后执行
  • */
  • return 1;
  • }
  • /**
  • * 走到这里表示传入的other不是ScheduledFutureTask类型
  • * 计算二者的执行时间差,根据该差值决定大小
  • */
  • long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
  • return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
  • }

compareTo(Delayed other)方法中就可以看出成员变量sequenceNumber的用处了。

ScheduledFutureTask对任务的主要操作方法体现在cancel(boolean mayInterruptIfRunning)run()中,这两个会在后面讲解。

2. DelayedWorkQueue等待队列

ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor的线程池,因此也会使用等待队列,ScheduledThreadPoolExecutor中使用的DelayedWorkQueue队列实现了任务的延时执行。

2.1. DelayedWorkQueue基本设计

DelayedWorkQueue是ScheduledThreadPoolExecutor的一个静态内部类,定义如下:

  • /**
  • * Specialized delay queue. To mesh with TPE declarations, this
  • * class must be declared as a BlockingQueue<Runnable> even though
  • * it can only hold RunnableScheduledFutures.
  • * 延时队列,用于装载被执行的任务,然后根据调度配置进行入队和出队操作
  • * 小顶堆模式实现
  • */
  • static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
  • ...
  • }

从定义可知,DelayedWorkQueue是一个实现了BlockingQueue接口的阻塞队列,同时继承了AbstractQueue抽象类;由于泛型定义指定为Runnable,DelayedWorkQueue入队的元素都是Runnable类型的对象。DelayedWorkQueue中定义的成员变量如下:

  • // 初始容量为16
  • private static final int INITIAL_CAPACITY = 16;
  • // 用于装载任务的数组
  • private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
  • // 锁
  • private final ReentrantLock lock = new ReentrantLock();
  • // 大小
  • private int size = 0;
  • /**
  • * Thread designated to wait for the task at the head of the
  • * queue. This variant of the Leader-Follower pattern
  • * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
  • * minimize unnecessary timed waiting. When a thread becomes
  • * the leader, it waits only for the next delay to elapse, but
  • * other threads await indefinitely. The leader thread must
  • * signal some other thread before returning from take() or
  • * poll(...), unless some other thread becomes leader in the
  • * interim. Whenever the head of the queue is replaced with a
  • * task with an earlier expiration time, the leader field is
  • * invalidated by being reset to null, and some waiting
  • * thread, but not necessarily the current leader, is
  • * signalled. So waiting threads must be prepared to acquire
  • * and lose leadership while waiting.
  • *
  • * 试图执行队首任务的正处于等待状态的线程对象
  • */
  • private Thread leader = null;
  • /**
  • * Condition signalled when a newer task becomes available at the
  • * head of the queue or a new thread may need to become leader.
  • * 等待条件
  • */
  • private final Condition available = lock.newCondition();

DelayedWorkQueue内部使用了一个RunnableScheduledFuture[]类型的数组queue装载元素,初始化容量为16(INITIAL_CAPACITY),虽然queue数组有具体大小,但DelayedWorkQueue在添加元素过程中如果queue数组已满就会对其进行扩容,扩容操作后面会讲解。DelayedWorkQueue还是用了可重入锁保证线程并发的安全性,另外还使用了Condition等待条件available来协调元素的入队和出队操作。size表示队列中已有的元素个数,leader变量指向试图执行队首任务的正处于等待状态的线程对象,具体用法会在后面讲解。

我们首先关注DelayedWorkQueue的一些基础方法,分别是indexOf(Object)contains(Object)isEmpty()size(),分别用于查找某个元素在DelayedWorkQueue中的索引、判断DelayedWorkQueue是否包含某个元素、DelayedWorkQueue是否为空以及DelayedWorkQueue的大小;其中contains(Object)内部使用indexOf(Object)实现;它们的源码如下:

  • /**
  • * Find index of given object, or -1 if absent
  • * 查找给定x在数组中的索引
  • */
  • private int indexOf(Object x) {
  • // x不能为null
  • if (x != null) {
  • // x要是ScheduledFutureTask类型的对象
  • if (x instanceof ScheduledFutureTask) {
  • // 获取x的heapIndex为i
  • int i = ((ScheduledFutureTask) x).heapIndex;
  • // Sanity check; x could conceivably be a
  • // ScheduledFutureTask from some other pool.
  • // 如果i大于等于0,且i小于size,且queue中i位置的任务与x相等
  • if (i >= 0 && i < size && queue[i] == x)
  • // i即是x在数组中的索引,直接返回
  • return i;
  • } else {
  • /**
  • * 如果x不是ScheduledFutureTask对象
  • * 遍历queue数组,依次查找
  • */
  • for (int i = 0; i < size; i++)
  • // 当查找到的任务与x相同,则返回此时的索引i
  • if (x.equals(queue[i]))
  • return i;
  • }
  • }
  • // 否则返回-1,表示没找到
  • return -1;
  • }
  • // 判断任务x是否存在于queue中
  • public boolean contains(Object x) {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 使用indexOf()进行查找并判断
  • return indexOf(x) != -1;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 队列是否为空
  • public boolean isEmpty() {
  • return size() == 0;
  • }
  • // 查看队列大小
  • public int size() {
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 直接返回size即可
  • return size;
  • } finally {
  • lock.unlock();
  • }
  • }

这里主要关注indexOf(Object)方法,其它的三个方法都比较简单。从indexOf(Object)的源码可知,当获取索引的元素对象为ScheduledFutureTask类型时,其实ScheduledFutureTask对象的heapIndex成员变量记录了自己位于queue数组中的索引,因此可以直接通过该成员变量辅助获取;但当获取索引的元素对象并不是ScheduledFutureTask类型时,会遍历queue数组逐一进行比较。

上面提到过,DelayedWorkQueue会在queue数组已满时进行扩容操作,具体体现在grow()方法,源码如下:

  • /**
  • * Resize the heap array. Call only when holding lock.
  • * 扩容操作
  • */
  • private void grow() {
  • // 旧的容量
  • int oldCapacity = queue.length;
  • // 新的容量是旧的容量的1.5倍
  • int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
  • // 如果溢出就将其设置为Integer.MAX_VALUE
  • if (newCapacity < 0) // overflow
  • newCapacity = Integer.MAX_VALUE;
  • // 拷贝旧数组的任务到新数组,然后将新数组赋值给queue
  • queue = Arrays.copyOf(queue, newCapacity);
  • }

源码表示,每次扩容的容量将是当前容量的1.5倍,同时最大容量为Integer.MAX_VALUE,这一点从remainingCapacity()方法也可以得知:

  • // 剩余容量是无限的
  • public int remainingCapacity() {
  • return Integer.MAX_VALUE;
  • }

2.2. DelayedWorkQueue的堆结构

DelayedWorkQueue内部其实是以小顶堆来装载元素,以实现对元素进行从小到大的排序的,装载的元素是RunnableScheduledFuture类型对象,在ScheduledThreadPoolExecutor中则体现为ScheduledFutureTask类型对象;ScheduledFutureTask类在上面已经讲解过,它实现了Comparable接口,compareTo(Delayed other)方法中提供了具体的比较过程,读者可以回顾上面的内容。

读者如对堆的结构不了解,可以阅读文章数据结构——堆结构

2.2.1. 任务入队

DelayedWorkQueue中的元素会根据ScheduledFutureTask任务对象执行时间的先后以及其在初始化时获取的sequenceNumber从小到大进行排序,以便实现任务的先后调度。我们首先关注与任务入队的相关的方法:put(Runnable e)add(Runnable e)offer(Runnable e, long timeout, TimeUnit unit)offer(Runnable x);实际前三个方法内部都调用了offer(Runnable x),它们的源码如下:

  • // 添加任务,内部调用offer()
  • public void put(Runnable e) {
  • offer(e);
  • }
  • // 添加任务,内部调用offer()
  • public boolean add(Runnable e) {
  • return offer(e);
  • }
  • // 添加任务,内部调用offer()
  • public boolean offer(Runnable e, long timeout, TimeUnit unit) {
  • return offer(e);
  • }

因此我们主要需要关注offer(Runnable x)的实现,它的源码如下:

  • // 添加任务
  • public boolean offer(Runnable x) {
  • // 检查任务
  • if (x == null)
  • throw new NullPointerException();
  • //将任务转换为RunnableScheduledFuture对象
  • RunnableScheduledFuture e = (RunnableScheduledFuture)x;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • int i = size;
  • if (i >= queue.length)
  • // 如果任务个数大于等于队列数组容量,进行扩容操作
  • grow();
  • // size加1
  • size = i + 1;
  • if (i == 0) {
  • /**
  • * i为0,即原始size为0,没有任务
  • * 将e放在第0位置
  • */
  • queue[0] = e;
  • // 设置e的heapIndex为0
  • setIndex(e, 0);
  • } else {
  • // 否则调用siftUp()对e进行添加及上浮操作
  • siftUp(i, e);
  • }
  • /**
  • * 如果第0个位置的任务是e,
  • * 表示此时入队的是队首元素,可以将leader清空了
  • * 同时可以通知其他线程可以竞争成为leader了
  • */
  • if (queue[0] == e) {
  • // 将leader置为null
  • leader = null;
  • // 唤醒等待在available上的线程
  • available.signal();
  • }
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • return true;
  • }

offer(Runnable x)方法会首先将传入的Runnable对象强转为RunnableScheduledFuture对象,然后检查queue数组是否已满,如果已满调用grow()方法进行扩容。接下来主要的代码中,会判断任务添加到queue数组中时位置的索引是否是0,如果是0表示当前queue中没有任务,只需要将任务放入queue索引为0的位置即可,然后调用setIndex(e, 0)将任务的heapIndex设置为0,setIndex(RunnableScheduledFuture, int)的源码如下:

  • /**
  • * Set f's heapIndex if it is a ScheduledFutureTask.
  • * 设置ScheduledFutureTask的heapIndex
  • */
  • private void setIndex(RunnableScheduledFuture f, int idx) {
  • // 当传入的f是ScheduledFutureTask类型时,设置其heapIndex
  • if (f instanceof ScheduledFutureTask)
  • ((ScheduledFutureTask)f).heapIndex = idx;
  • }

如果任务添加到queue数组中时位置的索引不是0,表示当前queue数组中已有任务,则调用siftUp(i, e)进行添加;siftUp(int, RunnableScheduledFuture)主要作用是将新的任务添加到queue数组中,然后根据小顶堆的排序方式进行上浮操作,具体代码如下:

  • /**
  • * Sift element added at bottom up to its heap-ordered spot.
  • * Call only when holding lock.
  • * 堆上浮操作
  • * @param k 添加之前的数组大小
  • * @param key 添加的任务
  • */
  • private void siftUp(int k, RunnableScheduledFuture key) {
  • // 循环直到k为0
  • while (k > 0) {
  • // 获取k位置任务的父任务的索引
  • int parent = (k - 1) >>> 1;
  • // 取出父任务
  • RunnableScheduledFuture e = queue[parent];
  • /**
  • * 将key与父任务进行比较,如果key大于等于父任务,
  • * 表示key要在父任务之后或者与父任务同时执行,则跳出循环
  • */
  • if (key.compareTo(e) >= 0)
  • break;
  • /**
  • * 否则key小于父任务,表示key要比父任务早执行
  • * 因此将k位置替换为父任务
  • */
  • queue[k] = e;
  • // 修改父任务的heapIndex为k
  • setIndex(e, k);
  • // 将父任务索引赋值给k,进行下一次循环
  • k = parent;
  • }
  • /**
  • * 走到这里说明key要在父任务之后或者与父任务同时执行
  • * 则已经找到了key所应该在的位置
  • * 将key放在k索引位置
  • */
  • queue[k] = key;
  • // 将key的heapIndex设置为k
  • setIndex(key, k);
  • }

小顶堆的结构是以二叉树构成的,从树的根节点从上往下节点的大小依次递减。当添加了新任务后,它会被放在queue队列的末尾,此时新任务的位置并不是确定的,需要根据任务的执行时间在堆中调整它的位置;调整方式是获取该节点的父节点,然后比较其与父节点的大小,如果比父节点小,就将其与父节点交换位置,重复执行这个操作,直到父节点比自己小,则说明找到合适的位置了。siftUp(int, RunnableScheduledFuture)对堆的操作过程上面的源码的注释讲解得非常清楚了。

2.2.2. 任务出队

DelayedWorkQueue的任务出队操作则相对比较麻烦,它提供了poll()poll(long timeout, TimeUnit unit)take()三个出队方法。其中poll()方法最简单,它是无阻塞模式的出队操作,如果能获取到符合要求的任务就进行出队,否则直接返回null,源码如下:

  • // 取出队首任务
  • public RunnableScheduledFuture poll() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 获取queue索引为0位置的任务为first
  • RunnableScheduledFuture first = queue[0];
  • // 如果first任务为null,或者first任务的延迟时间还大于0,就返回null
  • if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  • return null;
  • else
  • // 否则调用finishPoll()将first返回,并对堆进行调整
  • return finishPoll(first);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }

出队的永远是队首任务,因此先获取队首任务为first,然后判断其延迟的时间是否大于0,如果大于0表示还未到执行时间,直接返回null即可。如果first符合出队条件(即已到达执行时间),就调用finishPoll(first)做具体的出队操作,finishPoll(RunnableScheduledFuture f)方法源码如下:

  • /**
  • * Performs common bookkeeping for poll and take: Replaces
  • * first element with last and sifts it down. Call only when
  • * holding lock.
  • * 将第一个任务与最后一个任务进行替换,然后进行下沉操作
  • * @param f the task to remove and return
  • */
  • private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
  • int s = --size;
  • // 取出最后一个任务为x
  • RunnableScheduledFuture x = queue[s];
  • // 将最后索引位置上置为null
  • queue[s] = null;
  • // 如果最后索引不是0,表示堆中元素个数大于1
  • if (s != 0)
  • // 将x从堆顶开始做下沉操作
  • siftDown(0, x);
  • // 设置f的heapIndex为-1,并将其返回
  • setIndex(f, -1);
  • return f;
  • }

finishPoll(RunnableScheduledFuture)方法主要用于对堆中剩余元素的顺序进行调整,首先会取得queue数组中的最后一个元素为x,并将数组的该索引位置置为null,当堆中元素个数大于1时(即s != 0),就调用siftDown(0, x)进行堆下沉调整,最后将要出队的任务fheapIndex置为-1后返回。siftDown(int, RunnableScheduledFuture)堆下沉操作的源码如下:

  • /**
  • * Sift element added at top down to its heap-ordered spot.
  • * Call only when holding lock.
  • * 堆下沉操作
  • */
  • private void siftDown(int k, RunnableScheduledFuture key) {
  • // 取数组queue中心位置索引half
  • int half = size >>> 1;
  • /**
  • * 循环直到k大于或等于中心索引half时,
  • * 由于从half开始,节点就不再有孩子节点
  • */
  • while (k < half) {
  • // 获取左孩子节点的索引
  • int child = (k << 1) + 1;
  • // 获取左孩子节点
  • RunnableScheduledFuture c = queue[child];
  • // 获取右孩子节点的索引
  • int right = child + 1;
  • /**
  • * 当右孩子节点的索引小于size时,说明右孩子还存在
  • * 比较左孩子与右孩子的大小
  • * 如果左孩子比右孩子大,表明右孩子比左孩子的任务更早执行
  • * 则将c指向右孩子,child指向right索引
  • * 此时c指向的是更早执行的任务所在的那个孩子节点
  • */
  • if (right < size && c.compareTo(queue[right]) > 0)
  • c = queue[child = right];
  • /**
  • * 比较key与c,如果小于等于0,说明key比c还要更早执行,
  • * 说明此时k就是key应该放在的位置,跳出循环
  • */
  • if (key.compareTo(c) <= 0)
  • break;
  • /**
  • * 否则说明key在c后面执行
  • * 将更早执行的c放在k(即c的父节点)位置
  • */
  • queue[k] = c;
  • // 设置c的heapIndex为k
  • setIndex(c, k);
  • // 将k指向child(此时的右孩子的索引),继续下次循环
  • k = child;
  • }
  • // 将key放在queue的k索引位置上
  • queue[k] = key;
  • // 设置key的heapIndex为k
  • setIndex(key, k);
  • }

finishPoll(RunnableScheduledFuture)方法中调用的siftDown(0, x)为例,x元素是queue数组的最后一个元素,调用siftDown(0, x)即是将x移到队列头,然后逐步下沉。先关注siftDown(int, RunnableScheduledFuture)的while循环体代码,它会根据k获取k位置节点的左孩子节点和右孩子节点的索引,同时找出左孩子节点和右孩子节点中小的那个记为c,然后将传入的RunnableScheduledFuture对象keyc进行对比,如果keyc小,表示key的位置k是合理的,跳出循环即可,否则将c放在索引k位置上,然后将k指向child,继续下一次循环;这种操作能够一直往下直到找到key合适的位置。

poll(long timeout, TimeUnit unit)出队操作则在poll()方法上附加了超时机制,它的源码如下:

  • // 获取队首任务,带有超时机制
  • public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // 无限循环
  • for (;;) {
  • // 获取queue中索引为0位置的任务(即任务)为first
  • RunnableScheduledFuture first = queue[0];
  • /**
  • * 如果first为null,表示此时queue是空的,
  • * 检查剩余的超时时间,如果剩余时间小于等于0,表示超时了,直接返回null
  • * 否则使用available进入超时等待
  • */
  • if (first == null) {
  • if (nanos <= 0)
  • return null;
  • else
  • nanos = available.awaitNanos(nanos);
  • } else {
  • // 走到这里表示取到了first不为null,获取其延迟时间
  • long delay = first.getDelay(TimeUnit.NANOSECONDS);
  • // 如果延迟时间小于等于0,表示该任务应该被执行了
  • if (delay <= 0)
  • // 调用finishPoll()将其返回并对堆进行调整
  • return finishPoll(first);
  • // 走到这里表示first任务的延迟时间还大于0,即还没有到执行时间
  • if (nanos <= 0)
  • // 如果此时超时了,就直接返回null
  • return null;
  • /**
  • * 走到这里表示没有超时,
  • * 如果超时时间小于延迟执行时间delay,
  • * 或者超时时间虽然大于或等于延迟时间,但leader不为null(表示队首已经有任务在等待了),
  • * 如果是就进入超时等待,等待时间为nanos
  • */
  • if (nanos < delay || leader != null)
  • nanos = available.awaitNanos(nanos);
  • else {
  • /**
  • * 走到表示超时时间大于等于延迟时间,且leader为null
  • * 使用leader记录当前线程
  • */
  • Thread thisThread = Thread.currentThread();
  • leader = thisThread;
  • try {
  • // 进入超时等待,超时时间为delay,返回值为未等待的时间(可能中途被中断了,或者被唤醒)
  • long timeLeft = available.awaitNanos(delay);
  • // 等待从超时时间中减去已等待的时间,即为剩余的超时时间
  • nanos -= delay - timeLeft;
  • } finally {
  • /**
  • * 超时等待结束后
  • * 如果leader还为当前线程,就将leader置为null
  • */
  • if (leader == thisThread)
  • leader = null;
  • }
  • }
  • }
  • }
  • } finally {
  • /**
  • * 最终当某个任务被返回后
  • * 如果leader为null(表示此时队首没有任务在超时等待过程中),
  • * 且queue首任务不为null(表示还有任务未执行)时,
  • * 则唤醒等待在available上的线程
  • */
  • if (leader == null && queue[0] != null)
  • available.signal();
  • // 解锁
  • lock.unlock();
  • }
  • }

在附带超时机制的出队操作中,首先也会获取queue数组索引0位置的任务为first,当first为null时,会判断是否超时,如果超时将直接返回null,否则调用available.awaitNanos(nanos)进行超时等待。当first不为null时,如果first的延迟时间小于等于0表示first任务已经到达执行时间,直接将其出队即可,如果还未到达执行时间,将判断出队操作是否超时(通过nanos <= 0),如果超时了就直接返回null,否则会尝试进行等待。

在这一次的等待过程中,会根据超时时间nanos、延迟时间delayleader来判断如何进入等待状态:

  1. nanos小于delay时,直接进入时间长度为nanos的超时等待即可,不会影响任务的延时执行;
  2. nanos大于或等于delay时,如果此时leader不为null,表示其实已经有任务在等待了,此时进入时间长度为nanos的超时等待即可;注意这里虽然等待时长为nanos,可能会大于delay,但leader所在的线程也处于等待状态,也就是说其实前一个任务还没有被执行,当前一个任务解除阻塞顺利出队后,它会唤醒等待在available的线程,所以此处不用担心由于等待时间为nanos而错过了任务的正常执行时间。
  3. nanos大于或等于delay的同时,leader也为null,则使用leader记录当前线程,并且尝试进入时间长度为delay的超时等待(此时等待时长还未nanos就不合理了,会错过任务的正常执行时间),在超时等待结束后,会计算新的超时时间,如果leader还指向当前线程就将leader置为null,并进入下一次循环。

另外需要注意的是,在整个方法最外层的try … finally的的finally块中,会判断leader是否为null,如果leader为null且queue数组中还有任务,就会调用available.signal()唤醒在available上等待最久的线程,也就是说,总会有一个合适的leader会唤醒处于等待状态的线程。

我们再来看看take()方法,它的源码如下:

  • // 获取队首任务
  • public RunnableScheduledFuture take() throws InterruptedException {
  • // 加锁,可中断
  • final ReentrantLock lock = this.lock;
  • lock.lockInterruptibly();
  • try {
  • // 无限循环
  • for (;;) {
  • // 获取queue中索引为0位置的任务(即任务)为first
  • RunnableScheduledFuture first = queue[0];
  • // 如果first为null,表示此时queue是空的,进入等待
  • if (first == null)
  • available.await();
  • else {
  • // 走到这里表示取到了first不为null,获取其延迟时间
  • long delay = first.getDelay(TimeUnit.NANOSECONDS);
  • // 如果延迟时间小于等于0,表示该任务应该被执行了
  • if (delay <= 0)
  • // 调用finishPoll()将其返回并对堆进行调整
  • return finishPoll(first);
  • else if (leader != null)
  • /**
  • * 否则表示任务还没有到执行时间,判断leader是否为null,
  • * 如果leader不为null,表示已经有任务(即leader)在队首进行等待了
  • * 那么就无条件进入等待状态
  • */
  • available.await();
  • else {
  • /**
  • * 否则说明leader为null,表示此时还没有任务在队首等待
  • * 使用leader记录当前线程,然后使当前线程进入超时等待
  • */
  • Thread thisThread = Thread.currentThread();
  • leader = thisThread;
  • try {
  • // 然后进入带有超时机制的等待,超时时间为delay
  • available.awaitNanos(delay);
  • } finally {
  • /**
  • * 超时等待结束后
  • * 如果leader还为当前线程,就将leader置为null
  • */
  • if (leader == thisThread)
  • leader = null;
  • }
  • }
  • }
  • }
  • } finally {
  • /**
  • * 最终当某个任务被返回后
  • * 如果leader为null(表示此时队首没有任务在超时等待过程中),
  • * 且queue首任务不为null(表示还有任务未执行)时,
  • * 则唤醒等待在available上的线程
  • */
  • if (leader == null && queue[0] != null)
  • available.signal();
  • // 解锁
  • lock.unlock();
  • }
  • }

其实理解了poll(long timeout, TimeUnit unit)的实现,take()方法就不在话下了,take()方法就是无超时机制的出队操作,没有符合条件的出队任务,就一直阻塞。需要注意的是,当线程在进入阻塞时,如果leader为null,则会采取超时阻塞,超时时间为任务的延迟时间。具体实现大家可以阅读源码注释,这里不再赘述。

2.2.3. 任务移除

DelayedWorkQueue提供了remove(Object x)方法可以将队列中的任务进行移除,该方法源码如下:

  • // 从queue中移除任务x
  • public boolean remove(Object x) {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 查找x在queue中的索引
  • int i = indexOf(x);
  • // 如果返回值小于0表示没找到,直接返回false
  • if (i < 0)
  • return false;
  • /**
  • * 走到此处表示查找到了,i即为x在queue中的索引
  • * 将queue中i位置的任务的heapIndex设置为-1(如果x是ScheduledFutureTask类型的话)
  • */
  • setIndex(queue[i], -1);
  • // 计算移除后的size大小
  • int s = --size;
  • // 获取最后一个任务replacement
  • RunnableScheduledFuture replacement = queue[s];
  • // 将queue中s位置(即最后有任务的位置)上的任务置为null
  • queue[s] = null;
  • /**
  • * 如果s与i不同,表示移除的不是最后一个任务
  • * 此时要将i位置的缺补上
  • * 补缺方式是将最后一个任务放在i位置,然后尝试对其进行下沉或上浮操作
  • * 先尝试下沉,如果下沉结束发现最后一个任务还处于i位置,则表明该任务没有移动
  • * 因此尝试对该任务进行上浮操作
  • */
  • if (s != i) {
  • // 需要将replacement从i位置尝试下沉操作
  • siftDown(i, replacement);
  • /**
  • * 如果下沉操作完成,i位置任务还为replacement
  • * 则表示replacement没有下沉,尝试上浮操作
  • */
  • if (queue[i] == replacement)
  • siftUp(i, replacement);
  • }
  • // 返回true表示移除成功
  • return true;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }

移除操作的逻辑比较简单,直接通过indexOf(Object)方法找出xqueue中的索引为i,如果能找到表示x的确在queue数组中,只需要将xheapIndex置为-1,然后将queue中最后一个元素放在i位置进行下沉或上浮操作,调整堆顺序即可。源码注释讲解地比较清楚,大家可以自行阅读。

2.3. 超时任务导出


drainTo(Collection<? super Runnable> c)drainTo(Collection<? super Runnable> c, int maxElements)方法用于将队列中的任务导出到集合c中,需要注意的是,这里的导出的并不是全部元素,而只会导出延时时间超时的任务,它们的源码如下:

  • /**
  • * 将队列中的延时过期任务依次转移,直到遇到第一个延时未过期的任务停止
  • * @param c 集合
  • * @return 转移的个数
  • */
  • public int drainTo(Collection<? super Runnable> c) {
  • // 检查参数
  • if (c == null)
  • throw new NullPointerException();
  • if (c == this)
  • throw new IllegalArgumentException();
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • RunnableScheduledFuture first;
  • int n = 0;
  • // 不断移除队首延时过期的任务,直到遇到第一个延时未过期的任务停止
  • while ((first = pollExpired()) != null) {
  • // 将移除的任务添加到集合c中
  • c.add(first);
  • // 计数
  • ++n;
  • }
  • // 返回转移的个数
  • return n;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • /**
  • * 将队列中的延时过期任务依次转移,
  • * 最多转移maxElements个,或者直到遇到第一个延时未过期的任务停止
  • * @param c 集合
  • * @param maxElements 最多转移的个数
  • * @return 转移的个数
  • */
  • public int drainTo(Collection<? super Runnable> c, int maxElements) {
  • // 检查参数
  • if (c == null)
  • throw new NullPointerException();
  • if (c == this)
  • throw new IllegalArgumentException();
  • if (maxElements <= 0)
  • return 0;
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • RunnableScheduledFuture first;
  • int n = 0;
  • // 不断移除队首延时过期的任务,直到计数大于等于maxElements,或者遇到第一个延时未过期的任务停止
  • while (n < maxElements && (first = pollExpired()) != null) {
  • // 将移除的任务添加到集合c中
  • c.add(first);
  • // 计数
  • ++n;
  • }
  • // 返回转移的个数
  • return n;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }

其中pollExpired()正是用于从队首移除超时任务,源码如下:

  • /**
  • * Return and remove first element only if it is expired.
  • * Used only by drainTo. Call only when holding lock.
  • * 当队首任务延时过期了,将其移除;一般用于drainTo()方法中
  • */
  • private RunnableScheduledFuture pollExpired() {
  • // 获取队首任务
  • RunnableScheduledFuture first = queue[0];
  • // 如果队首任务为null,或者不为null但延时时间大于0
  • if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  • // 返回null
  • return null;
  • // 否则满足条件,将其从堆中移除,调整堆并返回该任务
  • return finishPoll(first);
  • }

2.4. DelayedWorkQueue的遍历

DelayedWorkQueue提供了迭代器进行遍历,实现也比较简单,源码如下:

  • // 迭代器
  • public Iterator<Runnable> iterator() {
  • // 直接以queue转为的数组创建迭代器
  • return new Itr(Arrays.copyOf(queue, size));
  • }
  • /**
  • * Snapshot iterator that works off copy of underlying q array.
  • * 迭代器类
  • */
  • private class Itr implements Iterator<Runnable> {
  • // 记录需要迭代的数组
  • final RunnableScheduledFuture[] array;
  • // 当前需要返回的元素的索引游标
  • int cursor = 0; // index of next element to return
  • // 上一次返回的元素
  • int lastRet = -1; // index of last element, or -1 if no such
  • // 构造方法
  • Itr(RunnableScheduledFuture[] array) {
  • // 将传入的数组进行记录
  • this.array = array;
  • }
  • // 是否还有下一个
  • public boolean hasNext() {
  • return cursor < array.length;
  • }
  • // 获取下一个元素
  • public Runnable next() {
  • // 如果游标大于等于数组长度,表示迭代到最末尾了,抛出异常即可
  • if (cursor >= array.length)
  • throw new NoSuchElementException();
  • // 否则使用lastRet记录当前游标
  • lastRet = cursor;
  • // 然后返回当前游标处的元素
  • return array[cursor++];
  • }
  • // 移除元素
  • public void remove() {
  • // 如果lastRet小于0说明它还没有指向任何元素
  • if (lastRet < 0)
  • throw new IllegalStateException();
  • // 使用DelayedWorkQueue的移除方法移除元素
  • DelayedWorkQueue.this.remove(array[lastRet]);
  • // 移除后将lastRet置为-1
  • lastRet = -1;
  • }
  • }

2.5. DelayedWorkQueue的其他方法

DelayedWorkQueue还提供了一些其他的方法方便对队列进行操作,主要有以下几个:

  • // 查看队首任务
  • public RunnableScheduledFuture peek() {
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 返回queue数组中第一个任务(并未取出)
  • return queue[0];
  • } finally {
  • lock.unlock();
  • }
  • }
  • // 清除队列任务
  • public void clear() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 遍历queue数组
  • for (int i = 0; i < size; i++) {
  • // 取出i位置任务为t
  • RunnableScheduledFuture t = queue[i];
  • if (t != null) {
  • // 如果t不为null,将queue的i位置任务置为null
  • queue[i] = null;
  • // 将t的heapIndex置为-1
  • setIndex(t, -1);
  • }
  • }
  • // size置为0
  • size = 0;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 转为数组
  • public Object[] toArray() {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 进行拷贝并返回
  • return Arrays.copyOf(queue, size, Object[].class);
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }
  • // 转为数组
  • @SuppressWarnings("unchecked")
  • public <T> T[] toArray(T[] a) {
  • // 加锁
  • final ReentrantLock lock = this.lock;
  • lock.lock();
  • try {
  • // 如果a的容量小于任务总数,将创建一个新的数组,拷贝任务后返回
  • if (a.length < size)
  • return (T[]) Arrays.copyOf(queue, size, a.getClass());
  • // 否则直接进行拷贝,
  • System.arraycopy(queue, 0, a, 0, size);
  • // 如果a的容量大于任务总数,将a中size位置的元素置为null
  • if (a.length > size)
  • a[size] = null;
  • // 返回a
  • return a;
  • } finally {
  • // 解锁
  • lock.unlock();
  • }
  • }

这些方法都比较简单,也不做赘述了。