Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
发布于 / 2018-12-17
简介:ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExecutor的基础上实现的任务调度线程池,内部使用延时工作队列DelayedWorkQueue实现对任务的延时调度。DelayedWorkQueue内部使用一个初始容量为16的数组来保存任务,容量不够时会按照现有容量的1.5倍进行扩容,最大容量可达Integer.MAX_VALUE。
1. ScheduledThreadPoolExecutor详解
有了对ScheduledFutureTask任务类和DelayedWorkQueue延时等待队列实现的了解基础后,我们就可以对ScheduledThreadPoolExecutor进行分析了。在继续阅读之前,读者首先需要对ThreadPoolExecutor有一定的认识,由于ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此很多方法是继承自ThreadPoolExecutor的,在本文的讲解中,对于继承的方法并不会做额外的讲解,如果对ThreadPoolExecutor并不熟悉的读者可以先自行阅读相关的两篇文章:
ScheduledThreadPoolExecutor的类图结构如下:
ScheduledThreadPoolExecutor的方法列表如下:
- // 根据给定的corePoolSize创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize);
- // 根据给定的corePoolSize和线程工厂创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory);
- // 根据给定的corePoolSize和拒绝策略处理器创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler);
- // 根据给定的corePoolSize、线程工厂和拒绝策略处理器创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler);
- // 延迟执行任务command,延迟时间由delay和unit决定;只执行一次
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
- // 延迟执行会产生执行结果任务command,延迟时间由delay和unit决定;只执行一次
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
- /**
- * 该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。
- * 注意,period是从任务开始执行算起的。
- * 开始执行任务后,定时器每隔period时长检查该任务是否完成,
- * 如果完成则再次启动任务,否则等该任务结束后才再次启动任务
- */
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
- /**
- * 该方法在initialDelay时长后第一次执行任务,
- * 以后每当任务执行完成后,等待delay时长,再次执行任务
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
- // 执行任务command
- public void execute(Runnable command);
- // 执行任务task
- public Future<?> submit(Runnable task);
- // 执行任务task
- public <T> Future<T> submit(Runnable task, T result);
- // 执行任务task
- public <T> Future<T> submit(Callable<T> task);
- // 设置周期性任务在线程池关闭状态下是否继续运行的策略
- public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value);
- // 获取周期性任务在线程池关闭状态下是否继续运行的策略
- public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy();
- // 设置延时性任务在线程池关闭状态下是否继续运行的策略
- public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value);
- // 获取延时性任务在线程池关闭状态下是否继续运行的策略
- public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy();
- // 设置在取消任务时是否将任务移除的策略
- public void setRemoveOnCancelPolicy(boolean value);
- // 获取在取消任务时是否将任务移除的策略
- public boolean getRemoveOnCancelPolicy();
- // 关闭线程池
- public void shutdown();
- // 关闭线程池
- public List<Runnable> shutdownNow();
- // 获取任务队列
- public BlockingQueue<Runnable> getQueue();
2. ScheduledThreadPoolExecutor的构造方法
我们先查看ScheduledThreadPoolExecutor的构造方法,主要有以下几个:
- // 根据给定的corePoolSize创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize);
- // 根据给定的corePoolSize和线程工厂创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory);
- // 根据给定的corePoolSize和拒绝策略处理器创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler);
- // 根据给定的corePoolSize、线程工厂和拒绝策略处理器创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler);
从构造方法的声明来看,提供的几个重载无非是让核心线程数corePoolSize
、线程工厂threadFactory
即拒绝策略处理器handler
可以自由配置,这几个构造方法内部无一例外地调用的父类的构造方法,等待队列统一使用DelayedWorkQueue,最大线程数统一为Integer.MAX_VALUE,核心线程空闲时保持活跃的时间统一为0;以最后一个构造方法为例,它的源码如下:
- // 根据给定的corePoolSize、线程工厂和拒绝策略处理器创建一个调度线程池
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
- }
3. 任务的添加
ScheduledThreadPoolExecutor对任务的添加提供了大量的方法,其中有一部分是重写ThreadPoolExecutor的,有一部分是重写AbstractExecutorService类的,还有一部分则是实现ScheduledExecutorService接口提供的,有以下几个:
- 重写ThreadPoolExecutor类的添加方法:
- // 执行任务command
- public void execute(Runnable command) {
- schedule(command, 0, TimeUnit.NANOSECONDS);
- }
- 重写AbstractExecutorService类的添加方法:
- // 执行任务task
- public Future<?> submit(Runnable task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
- }
- // 执行任务task
- public <T> Future<T> submit(Runnable task, T result) {
- return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS);
- }
- // 执行任务task
- public <T> Future<T> submit(Callable<T> task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
- }
- 实现ScheduledExecutorService接口提供的方法:
- // 延迟执行任务command,延迟时间由delay和unit决定,只执行一次
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- /**
- * 使用triggerTime()计算执行时间,创建一个RunnableScheduledFuture对象
- * 默认的decorateTask()方法什么都没做,直接将传入的ScheduledFutureTask返回了
- */
- RunnableScheduledFuture<?> t = decorateTask(command,
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(delay, unit)));
- // 延迟执行任务
- delayedExecute(t);
- return t;
- }
- // 延迟执行会产生执行结果任务command,延迟时间由delay和unit决定;只执行一次
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- // 检查参数
- if (callable == null || unit == null)
- throw new NullPointerException();
- /**
- * 使用triggerTime()计算执行时间,创建一个RunnableScheduledFuture对象
- * 默认的decorateTask()方法什么都没做,直接将传入的ScheduledFutureTask返回了
- */
- RunnableScheduledFuture<V> t = decorateTask(callable,
- new ScheduledFutureTask<V>(callable,
- triggerTime(delay, unit)));
- // 延迟执行任务
- delayedExecute(t);
- return t;
- }
- /**
- * 该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。
- * 注意,period是从任务开始执行算起的。
- * 开始执行任务后,定时器每隔period时长检查该任务是否完成,
- * 如果完成则再次启动任务,否则等该任务结束后才再次启动任务
- */
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- // 使用triggerTime()计算执行时间,创建一个ScheduledFutureTask对象
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(initialDelay, unit), unit.toNanos(period));
- // 使用decorateTask()对ScheduledFutureTask对象进行装饰
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- // 执行任务
- delayedExecute(t);
- return t;
- }
- /**
- * 该方法在initialDelay时长后第一次执行任务,
- * 以后每当任务执行完成后,等待delay时长,再次执行任务
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- if (delay <= 0)
- throw new IllegalArgumentException();
- // 使用triggerTime()计算执行时间,创建一个ScheduledFutureTask对象
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(initialDelay, unit), unit.toNanos(-delay));
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
从源码可以得知,前面的两类重写都调用了ScheduledExecutorService接口提供schedule(Runnable command, long delay, TimeUnit unit)
方法,因此我们主要考察ScheduledExecutorService接口提供四个被重写的方法。
从源码可知,这四个方法的具体代码其实都大同小异,首先都对参数进行了检查,然后根据参数创建了一个ScheduledFutureTask任务,并使用decorateTask(Runnable, RunnableScheduledFuture<V>)
或decorateTask(Callable<V>, RunnableScheduledFuture<V>)
方法对任务进行了装饰,最后调用delayedExecute(t)
执行任务并返回;不同的是,后两个方法中使用sft.outerTask = t
记录了被装饰后的任务。ScheduledFutureTask的outerTask
其实在之前提到过,它主要用于重复任务的辅助执行。正因为这一行代码的不一样,让前面两个方法和后面两个方法的执行策略变得不一样:
schedule(Runnable command, long delay, TimeUnit unit)
和schedule(Callable<V> callable, long delay, TimeUnit unit)
对任务只会执行一次;scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
和scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
会重复执行,除非任务被取消或线程池被关闭。
注:ScheduledThreadPoolExecutor中的
decorateTask(Runnable, RunnableScheduledFuture<V>)
和decorateTask(Callable<V>, RunnableScheduledFuture<V>)
两个装饰方法未做任何处理,直接返回了传入的RunnableScheduledFuture对象;这个方法是提供给开发者重写的,在自定义继承自ScheduledThreadPoolExecutor的定时调度线程池类中,可以通过重写这两个方法对任务进行特定装饰。
弄清了这几个方法的逻辑,接下来需要分析的重心自然是delayedExecute(RunnableScheduledFuture<?> task)
方法了,它的源码如下:
- /**
- * Main execution method for delayed or periodic tasks. If pool
- * is shut down, rejects the task. Otherwise adds task to queue
- * and starts a thread, if necessary, to run it. (We cannot
- * prestart the thread to run the task because the task (probably)
- * shouldn't be run yet,) If the pool is shut down while the task
- * is being added, cancel and remove it if required by state and
- * run-after-shutdown parameters.
- *
- * @param task the task
- */
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- if (isShutdown())
- // 如果线程池关闭了就拒绝任务
- reject(task);
- else {
- // 将任务添加到等待队列中
- super.getQueue().add(task);
- /**
- * 如果线程池关闭,或者当前状态无法执行任务
- * 就将任务从等待队列中移除
- */
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- // 任务从等待队列移除成功后,取消任务
- task.cancel(false);
- else
- // 走到这里表示可以执行,则预启动Worker线程
- ensurePrestart();
- }
- }
delayedExecute(RunnableScheduledFuture<?>)
方法首先判断线程池是否关闭,如果关闭将使用拒绝策略处理器拒绝任务的提交,否则将会把任务添加到等待队列中,然后重新检查线程池状态,以确保可以运行任务,如果无法运行就将任务从队列移除,然后取消任务的执行,如果可以运行任务将预启动Worker线程。
在上面代码中,super.getQueue().add(task)
获取的队列即是在实例化ScheduledThreadPoolExecutor线程池时创建的DelayedWorkQueue队列;从这行代码可以看出,与ThreadPoolExecutor的操作不同,ScheduledThreadPoolExecutor一开始会将任务都放入等待队列中,通过DelayedWorkQueue的特性以保证任务都具有延时执行的特性。
isShutdown()
、remove(Runnable)
和ensurePrestart()
都是父类ThreadPoolExecutor的方法,在ThreadPoolExecutor相关的文章中介绍过;这里主要关注canRunInCurrentRunState(boolean)
方法的用处,它的源码如下:
- /**
- * Returns true if can run a task given current run state
- * and run-after-shutdown parameters.
- *
- * 根据任务类型来判断任务在当前状态下是否可以运行
- *
- * @param periodic true if this task periodic, false if delayed
- */
- boolean canRunInCurrentRunState(boolean periodic) {
- /**
- * continueExistingPeriodicTasksAfterShutdown默认是false,是控制周期性任务的
- * executeExistingDelayedTasksAfterShutdown默认是true,是控制延时性任务的
- */
- return isRunningOrShutdown(periodic ?
- continueExistingPeriodicTasksAfterShutdown :
- executeExistingDelayedTasksAfterShutdown);
- }
这里会根据periodic
参数的不同向isRunningOrShutdown(boolean shutdownOK)
方法传入不同的参数,continueExistingPeriodicTasksAfterShutdown
和executeExistingDelayedTasksAfterShutdown
定义于ScheduledThreadPoolExecutor中:
- /**
- * False if should cancel/suppress periodic tasks on shutdown.
- * 当该值为false时,表示应该在线程池关闭时取消周期性任务的执行
- */
- private volatile boolean continueExistingPeriodicTasksAfterShutdown;
- /**
- * False if should cancel non-periodic tasks on shutdown.
- * 当该值为false时,表示应该在线程池关闭时取消延迟性任务的执行
- */
- private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
这两个参数代表两种策略,分别控制是否应该在线程池关闭时取消周期性任务的执行及是否应该在线程池关闭时取消延迟性任务的执行。isRunningOrShutdown(boolean shutdownOK)
方法继承自父类ThreadPoolExecutor,这里回顾一下:
- /**
- * State check needed by ScheduledThreadPoolExecutor to
- * enable running tasks during shutdown.
- * <p>
- * 判断runState是否是RUNNING或者SHUTDOWN
- * <p>
- * shutdownOK参数用于表示是否在状态为SHUTDOWN是应该返回true
- *
- * @param shutdownOK true if should return true if SHUTDOWN
- */
- final boolean isRunningOrShutdown(boolean shutdownOK) {
- int rs = runStateOf(ctl.get());
- return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
- }
因此canRunInCurrentRunState(boolean)
方法的用处其实很简单,就是用于决定如果线程池处于SHUTDOWN状态,是否可以执行当前的定时任务。
4. 任务的执行
当ScheduledThreadPoolExecutor将添加的任务都放入DelayedWorkQueue队列中后,如果线程池符合运行状态,会调用ensurePrestart()
预启动Worker线程,这个方法位于父类ThreadPoolExecutor中,我们回顾一下:
- /**
- * Same as prestartCoreThread except arranges that at least one
- * thread is started even if corePoolSize is 0.
- */
- void ensurePrestart() {
- // 获取workerCount
- int wc = workerCountOf(ctl.get());
- // 如果workerCount小于corePoolSize
- if (wc < corePoolSize)
- // 添加核心线程
- addWorker(null, true);
- else if (wc == 0)
- // 添加非核心线程
- addWorker(null, false);
- }
该方法会根据当前Worker数量及corePoolSize
的大小添加Worker线程,读过ThreadPoolExecutor讲解的读者应该清楚,在添加Worker线程后,它最终会启动Worker自线程,即调用Worker的run()
方法,而run()
方法则会调用ThreadPoolExecutor的runWorker(Worker w)
方法并将Worker自己传入,回顾以下runWorker(Worker w)
的源码:
- final void runWorker(Worker w) {
- // 引用worker的firstTask任务,并清除worker的firstTask
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 用于标识worker是不是因异常而死亡
- boolean completedAbruptly = true;
- try {
- // worker取任务执行
- while (task != null || (task = getTask()) != null) {
- // 加锁
- w.lock();
- clearInterruptsForTaskRun();
- try {
- // 执行beforeExecute()钩子方法
- beforeExecute(w.thread, task);
- // 用于记录运行过程中的异常
- Throwable thrown = null;
- try {
- // 执行任务
- task.run();
- } catch (RuntimeException x) {
- thrown = x;
- throw x;
- } catch (Error x) {
- thrown = x;
- throw x;
- } catch (Throwable x) {
- thrown = x;
- throw new Error(x);
- } finally {
- // 执行afterExecute()钩子方法
- afterExecute(task, thrown);
- }
- } finally {
- // 将执行完的任务清空
- task = null;
- // 将worker的完成任务数加1
- w.completedTasks++;
- // 解锁
- w.unlock();
- }
- }
- // 运行到这里表示运行过程中没有出现异常
- completedAbruptly = false;
- } finally {
- // 调用processWorkerExit()方法处理Worker的后续清理和退出流程
- processWorkerExit(w, completedAbruptly);
- }
- }
从源码可知,该方法的while循环条件中会不断取出Worker的firstTask
,如果firstTask
为null就会调用getTask()
方法获取Runnable任务对象并调用其run()
方法以实现任务的运行。在ScheduledThreadPoolExecutor中,由于所有Worker都是预创建的,firstTask
都为null,因此会调用getTask()
方法获取任务;而getTask()
方法则实现了从等待队列中获取任务,回顾源码如下:
- private Runnable getTask() {
- // 用于记录poll()方法是否超时
- boolean timedOut = false; // Did the last poll() time out?
- retry:
- // 无限循环
- for (; ; ) {
- // 获取ctl和runState
- int c = ctl.get();
- int rs = runStateOf(c);
- // 检查状态
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- /**
- * 如果runState为STOP、TIDYING或TERMINATED
- * 或者runState为SHUTDOWN,且workerQueue为空
- * 将workerCount减1,返回null结束运行
- */
- decrementWorkerCount();
- return null;
- }
- // 记录worker是否能够被移除
- boolean timed; // Are workers subject to culling?
- // 无限循环
- for (; ; ) {
- // 获取workerCount
- int wc = workerCountOf(c);
- /**
- * 判断当前Worker是否可以被移除,即当前Worker是否可以一直等待任务。
- * 如果allowCoreThreadTimeOut为true,或者workerCount大于核心线程数,
- * 则当前线程是有超时时间的(keepAliveTime),无法一直等待任务
- */
- timed = allowCoreThreadTimeOut || wc > corePoolSize;
- /**
- * workerCount小于等于核心线程数,且没有超时,则跳出内层循环
- */
- if (wc <= maximumPoolSize && !(timedOut && timed))
- break;
- // 否则表示已超时,将workerCount减1,如果成功直接返回null
- if (compareAndDecrementWorkerCount(c))
- return null;
- // 走到这里说明上一步workerCount减1失败了,重新读取ctl
- c = ctl.get(); // Re-read ctl
- // 如果与之前的runState不同,表示线程池状态发生改变了,跳出到外层循环重试
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- try {
- /**
- * 根据线程是否会超时来分别调用相应的方法,
- * poll()方法附带超时机制;take()方法没有超时机制,并且可能会阻塞等待
- */
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- // 如果获取到的任务不为null则返回
- if (r != null)
- return r;
- // 走到这里表示获取操作超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- // 被中断,可能是超时等待过程中被中断了
- timedOut = false;
- }
- }
- }
这个方法在ThreadPoolExecutor的文章中详细讲解过,这里不赘述。我们只需要知道的是,在ScheduledThreadPoolExecutor中,等待队列workQueue
就是之前装入任务的DelayedWorkQueue队列,而此处任务的出队操作,自然是调用了DelayedWorkQueue实例的take()
或poll(long timeout, TimeUnit unit)
方法,其实这里由于keepAliveTime
置为0了,因此并不会调用poll(long timeout, TimeUnit unit)
方法。
而任务的延时执行,正是由DelayedWorkQueue的出队操作take()
实现的,它会根据任务的延迟时间决定是否可以出队,从而达到延迟执行任务,这部分代码在之前讲解DelayedWorkQueue时已经分析过了。
我们知道,要执行的任务在添加到等待队列中时都被包装为了一个ScheduledFutureTask对象,因此在任务出队后真正开始执行的时候,是调用了ScheduledFutureTask的run()
方法的,该方法的源码如下:
- /**
- * Overrides FutureTask version so as to reset/requeue if periodic.
- * 任务的具体执行
- */
- public void run() {
- // 任务是否是周期性的
- boolean periodic = isPeriodic();
- // 判断当前周期性任务是否可以执行
- if (!canRunInCurrentRunState(periodic))
- // 如果不能执行,就取消当前任务
- cancel(false);
- else if (!periodic)
- // 如果不是周期性任务,直接执行即可
- ScheduledFutureTask.super.run();
- else if (ScheduledFutureTask.super.runAndReset()) {
- // 否则执行任务并重置,然后计算并设置下次执行时间
- setNextRunTime();
- // 重新添加下一次任务,即将当前ScheduledFutureTask重新入队
- reExecutePeriodic(outerTask);
- }
- }
该方法会先检查线程池当前的运行状态是否符合执行任务的要求,如果符合就根据任务是否是重复性任务来决定不同的运行策略。
先观察非重复性任务的执行,会使用ScheduledFutureTask.super.run()
直接调用父类的run()
方法,即FutureTask的run()
方法,FutureTask在之前讲解过了,这个方法最终会调用到我们当时添加的任务的run()
方法。
而重复性任务的执行,会调用ScheduledFutureTask.super.runAndReset()
执行任务,这个方法也是FutureTask的,之前也有将结果,最终一样会调用到我们当时添加的任务的run()
方法。不同的是,该方法会在执行完任务后对线程池状态进行重置,如果任务执行成功且线程池状态重置成功会返回true继而执行else if分支内的代码,如果任务执行过程中抛出了异常会返回false,则不会执行。
5. 重复性任务的执行
根据上面的讲解,最后一个else if分支内的代码则是实现重复性任务的新一次的执行,当任务执行完成并线程池状态重置成功后,会调用setNextRunTime()
计算并设置任务的下一次执行执行时间,然后调用reExecutePeriodic(outerTask)
重新添加下一次任务,该方法源码如下:
- /**
- * Requeues a periodic task unless current run state precludes it.
- * Same idea as delayedExecute except drops task rather than rejecting.
- *
- * 周期性任务的重新入队
- *
- * @param task the task
- */
- void reExecutePeriodic(RunnableScheduledFuture<?> task) {
- // 判断当前线程池状态是否可以运行周期性任务
- if (canRunInCurrentRunState(true)) {
- // 任务重新入队
- super.getQueue().add(task);
- /**
- * 重新检查当前线程池状态是否可以运行周期性任务,如果不行就从队列中移除任务
- * 并取消任务的执行
- */
- if (!canRunInCurrentRunState(true) && remove(task))
- task.cancel(false);
- else
- // 如果可以运行就预启动Worker线程
- ensurePrestart();
- }
- }
这里的outerTask
在提交重复性任务的时候直接指向了装饰后的任务本身,大家可以回顾之前的代码。该方法也非常简单,在线程池状态符合条件时将任务重新入队,并进行重新检查,如果符合运行条件将预启动Worker线程,接下来任务的运行就和上面讲解的一样了:DelayedWorkQueue队列中任务的延迟出队、出队后执行任务、任务执行完后再次添加新一次的任务,循环往复以实现任务的重复执行。
6. 任务的取消
在线程池状态不满足任务运行时,任务会从等待队列中移除,并且调用任务的cancel(boolean mayInterruptIfRunning)
方法取消运行,该方法源码如下:
- // 取消任务的执行
- public boolean cancel(boolean mayInterruptIfRunning) {
- // 调用父类方法取消任务的执行
- boolean cancelled = super.cancel(mayInterruptIfRunning);
- // 如果取消成功,且removeOnCancel策略为true,任务的heapIndex大于等于0
- if (cancelled && removeOnCancel && heapIndex >= 0)
- // 则将任务从队列中移除
- remove(this);
- // 返回是否取消成功的结果
- return cancelled;
- }
代码比较简单,调用了父类FutureTask的cancel(boolean mayInterruptIfRunning)
方法,内部会根据传入参数决定是否对任务线程进行中断。在任务成功取消后,会根据removeOnCancel
和heapIndex
决定是否将任务从队列中移除。这里讲解一下removeOnCancel
成员变量,它属于ScheduledThreadPoolExecutor类,用于控制任务在取消时是否从队列移除,默认为false,定义如下:
- /**
- * True if ScheduledFutureTask.cancel should remove from queue
- * 当该值为true时,表示ScheduledFutureTask的cancel()方法应该将ScheduledFutureTask从队列中移除
- */
- private volatile boolean removeOnCancel = false;
7. 线程池关闭
ScheduledThreadPoolExecutor重写了ThreadPoolExecutor提供的钩子方法onShutdown()
,用于在线程池关闭时对任务进行策略性清理,源码如下:
- /**
- * Cancels and clears the queue of all tasks that should not be run
- * due to shutdown policy. Invoked within super.shutdown.
- *
- * 根据关闭策略在线程池关闭时取消及清理队列中的任务
- * 这是ThreadPoolExecutor中的一个钩子方法,会在shutdown()方法中执行
- * 即在线程池关闭后,该方法会被调用
- */
- @Override void onShutdown() {
- // 获取等待队列
- BlockingQueue<Runnable> q = super.getQueue();
- // 获取对延时性任务和周期性任务的处理方式
- boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
- boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
- if (!keepDelayed && !keepPeriodic) {
- /**
- * 如果对延时性任务和周期性任务都不做保留
- * 则遍历整个等待队列,取消队列中的任务
- */
- for (Object e : q.toArray())
- // 如果任务是RunnableScheduledFuture类型
- if (e instanceof RunnableScheduledFuture<?>)
- // 取消任务的执行
- ((RunnableScheduledFuture<?>) e).cancel(false);
- // 清理队列
- q.clear();
- }
- else {
- // Traverse snapshot to avoid iterator exceptions
- /**
- * 走到这里说明keepDelayed和keepPeriodic其中有一个或两个为true
- * 遍历等待队列的所有任务
- */
- for (Object e : q.toArray()) {
- // 如果任务是RunnableScheduledFuture类型
- if (e instanceof RunnableScheduledFuture) {
- // 强转
- RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
- /**
- * 根据任务情况进行相应的判断
- * 如果是周期性任务,根据keepPeriodic来判断
- * 如果是延时性任务,根据keepDelayed来判断
- * 或者任务已经被取消了,也应该从队列中移除
- */
- if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled
- // 从队列中移除任务
- if (q.remove(t))
- // 移除成功后对任务进行取消
- t.cancel(false);
- }
- }
- }
- }
- // 尝试终止线程池
- tryTerminate();
- }
该方法实现中,会根据continueExistingPeriodicTasksAfterShutdown
和executeExistingDelayedTasksAfterShutdown
两个策略决定是否取消和清理相应的任务。
8. 其他方法
至此,其实ScheduledThreadPoolExecutor的核心方法都介绍完了,下面是它的一些辅助方法,非常简单:
- /**
- * Sets the policy on whether to continue executing existing
- * periodic tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow} or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code false}.
- *
- * 设置周期性任务在线程池关闭状态下是否继续运行的策略
- *
- * @param value if {@code true}, continue after shutdown, else don't.
- * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
- */
- public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
- continueExistingPeriodicTasksAfterShutdown = value;
- // 如果设置的为false,且线程池关闭了
- if (!value && isShutdown())
- // 手动调用钩子方法
- onShutdown();
- }
- /**
- * Gets the policy on whether to continue executing existing
- * periodic tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow} or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code false}.
- *
- * 获取周期性任务在线程池关闭状态下是否继续运行的策略
- *
- * @return {@code true} if will continue after shutdown
- * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
- */
- public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
- return continueExistingPeriodicTasksAfterShutdown;
- }
- /**
- * Sets the policy on whether to execute existing delayed
- * tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow}, or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code true}.
- *
- * 设置延时性任务在线程池关闭状态下是否继续运行的策略
- *
- * @param value if {@code true}, execute after shutdown, else don't.
- * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
- */
- public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
- executeExistingDelayedTasksAfterShutdown = value;
- // 如果设置的为false,且线程池关闭了
- if (!value && isShutdown())
- // 手动调用钩子方法
- onShutdown();
- }
- /**
- * Gets the policy on whether to execute existing delayed
- * tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow}, or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code true}.
- *
- * 获取延时性任务在线程池关闭状态下是否继续运行的策略
- *
- * @return {@code true} if will execute after shutdown
- * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
- */
- public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
- return executeExistingDelayedTasksAfterShutdown;
- }
- /**
- * Sets the policy on whether cancelled tasks should be immediately
- * removed from the work queue at time of cancellation. This value is
- * by default {@code false}.
- *
- * 设置在取消任务时是否将任务移除的策略
- *
- * @param value if {@code true}, remove on cancellation, else don't
- * @see #getRemoveOnCancelPolicy
- * @since 1.7
- */
- public void setRemoveOnCancelPolicy(boolean value) {
- removeOnCancel = value;
- }
- /**
- * Gets the policy on whether cancelled tasks should be immediately
- * removed from the work queue at time of cancellation. This value is
- * by default {@code false}.
- *
- * 获取在取消任务时是否将任务移除的策略
- *
- * @return {@code true} if cancelled tasks are immediately removed
- * from the queue
- * @see #setRemoveOnCancelPolicy
- * @since 1.7
- */
- public boolean getRemoveOnCancelPolicy() {
- return removeOnCancel;
- }
- /**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- *
- * <p>This method does not wait for previously submitted tasks to
- * complete execution. Use {@link #awaitTermination awaitTermination}
- * to do that.
- *
- * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
- * has been set {@code false}, existing delayed tasks whose delays
- * have not yet elapsed are cancelled. And unless the {@code
- * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
- * {@code true}, future executions of existing periodic tasks will
- * be cancelled.
- *
- * @throws SecurityException {@inheritDoc}
- */
- public void shutdown() {
- super.shutdown();
- }
- /**
- * Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks
- * that were awaiting execution.
- *
- * <p>This method does not wait for actively executing tasks to
- * terminate. Use {@link #awaitTermination awaitTermination} to
- * do that.
- *
- * <p>There are no guarantees beyond best-effort attempts to stop
- * processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so any task that
- * fails to respond to interrupts may never terminate.
- *
- * @return list of tasks that never commenced execution.
- * Each element of this list is a {@link ScheduledFuture},
- * including those tasks submitted using {@code execute},
- * which are for scheduling purposes used as the basis of a
- * zero-delay {@code ScheduledFuture}.
- * @throws SecurityException {@inheritDoc}
- */
- public List<Runnable> shutdownNow() {
- return super.shutdownNow();
- }
- /**
- * Returns the task queue used by this executor. Each element of
- * this queue is a {@link ScheduledFuture}, including those
- * tasks submitted using {@code execute} which are for scheduling
- * purposes used as the basis of a zero-delay
- * {@code ScheduledFuture}. Iteration over this queue is
- * <em>not</em> guaranteed to traverse tasks in the order in
- * which they will execute.
- *
- * @return the task queue
- */
- public BlockingQueue<Runnable> getQueue() {
- return super.getQueue();
- }
9. ScheduledThreadPoolExecutor源码完整注释
下面是ScheduledThreadPoolExecutor源码的完整注释版本:
- package java.util.concurrent;
- import java.util.concurrent.atomic.*;
- import java.util.concurrent.locks.*;
- import java.util.*;
- public class ScheduledThreadPoolExecutor
- extends ThreadPoolExecutor
- implements ScheduledExecutorService {
- /**
- * False if should cancel/suppress periodic tasks on shutdown.
- * 当该值为false时,表示应该在线程池关闭时取消周期性任务的执行
- */
- private volatile boolean continueExistingPeriodicTasksAfterShutdown;
- /**
- * False if should cancel non-periodic tasks on shutdown.
- * 当该值为false时,表示应该在线程池关闭时取消延迟性任务的执行
- */
- private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
- /**
- * True if ScheduledFutureTask.cancel should remove from queue
- * 当该值为true时,表示ScheduledFutureTask的cancel()方法应该将ScheduledFutureTask从队列中移除
- */
- private volatile boolean removeOnCancel = false;
- /**
- * Sequence number to break scheduling ties, and in turn to
- * guarantee FIFO order among tied entries.
- * 用生成记录周期任务序号的原子类
- */
- private static final AtomicLong sequencer = new AtomicLong(0);
- /**
- * Returns current nanosecond time.
- * 返回当前纳秒时间
- */
- final long now() {
- return System.nanoTime();
- }
- private class ScheduledFutureTask<V>
- extends FutureTask<V>
- implements RunnableScheduledFuture<V> {
- /**
- * Sequence number to break ties FIFO
- * 任务在队列中的序号
- * */
- private final long sequenceNumber;
- /**
- * The time the task is enabled to execute in nanoTime units
- * 任务应该执行的时间,以纳秒计算
- * */
- private long time;
- /**
- * Period in nanoseconds for repeating tasks. A positive
- * value indicates fixed-rate execution. A negative value
- * indicates fixed-delay execution. A value of 0 indicates a
- * non-repeating task.
- *
- * 任务重复执行的时间间隔,以纳秒计算
- * 当其为正值时,表示固定速率执行
- * 当其为负值时,表示固定延迟执行
- * 当其为0时,表示是非重复任务
- */
- private final long period;
- /** The actual task to be re-enqueued by reExecutePeriodic */
- RunnableScheduledFuture<V> outerTask = this;
- /**
- * Index into delay queue, to support faster cancellation.
- */
- int heapIndex;
- /**
- * Creates a one-shot action with given nanoTime-based trigger time.
- * 根据给定开始时间,创建仅执行一次的任务
- */
- ScheduledFutureTask(Runnable r, V result, long ns) {
- super(r, result);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
- /**
- * Creates a periodic action with given nano time and period.
- * 根据给定开始时间和间隔时间,创建周期性任务
- */
- ScheduledFutureTask(Runnable r, V result, long ns, long period) {
- super(r, result);
- this.time = ns;
- this.period = period;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
- /**
- * Creates a one-shot action with given nanoTime-based trigger.
- * 根据给定开始时间,创建仅执行一次的任务
- */
- ScheduledFutureTask(Callable<V> callable, long ns) {
- super(callable);
- this.time = ns;
- this.period = 0;
- this.sequenceNumber = sequencer.getAndIncrement();
- }
- // 获取ScheduledFutureTask任务剩余的延迟时间
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), TimeUnit.NANOSECONDS);
- }
- /**
- * 与其他ScheduledFutureTask任务进行比较
- * 主要通过time成员变量来比较
- */
- public int compareTo(Delayed other) {
- // 判断二者是否是同一个对象,如果是则返回0
- if (other == this) // compare zero ONLY if same object
- return 0;
- // 判断传入的other对象是否是ScheduledFutureTask类型
- if (other instanceof ScheduledFutureTask) {
- // 强转
- ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
- // 计算二者时间差
- long diff = time - x.time;
- // 当前任务在other任务之前执行
- if (diff < 0)
- return -1;
- // 当前任务在other任务之后执行
- else if (diff > 0)
- return 1;
- // 如果两个任务的执行时间相同则比较序号
- else if (sequenceNumber < x.sequenceNumber)
- /**
- * 当前任务与other任务执行时间相同,但序号较小
- * 因此当前任务先执行
- */
- return -1;
- else
- /**
- * 当前任务与other任务执行时间相同,但序号较大
- * 因此当前任务后执行
- */
- return 1;
- }
- /**
- * 走到这里表示传入的other不是ScheduledFutureTask类型
- * 计算二者的执行时间差,根据该差值决定大小
- */
- long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
- return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
- }
- /**
- * Returns true if this is a periodic (not a one-shot) action.
- *
- * 判断是否是周期性任务
- *
- * @return true if periodic
- */
- public boolean isPeriodic() {
- return period != 0;
- }
- /**
- * Sets the next time to run for a periodic task.
- * 设置下一次执行时间
- */
- private void setNextRunTime() {
- // 获取周期时间间隔
- long p = period;
- if (p > 0)
- /**
- * 如果p大于0,表示是周期性执行任务,
- * 下一次执行任务在本次任务的period时间段后
- */
- time += p;
- else
- /**
- * 如果p小于0,表示是间隔性执行任务
- * 如果p等于0,表示是非重复性任务
- * 使用triggerTime()计算下一次执行任务的时间
- */
- time = triggerTime(-p);
- }
- // 取消任务的执行
- public boolean cancel(boolean mayInterruptIfRunning) {
- // 调用父类方法取消任务的执行
- boolean cancelled = super.cancel(mayInterruptIfRunning);
- // 如果取消成功,且removeOnCancel策略为true,任务的heapIndex大于等于0
- if (cancelled && removeOnCancel && heapIndex >= 0)
- // 则将任务从队列中移除
- remove(this);
- // 返回是否取消成功的结果
- return cancelled;
- }
- /**
- * Overrides FutureTask version so as to reset/requeue if periodic.
- * 任务的具体执行
- */
- public void run() {
- // 任务是否是周期性的
- boolean periodic = isPeriodic();
- // 判断当前周期性任务是否可以执行
- if (!canRunInCurrentRunState(periodic))
- // 如果不能执行,就取消当前任务
- cancel(false);
- else if (!periodic)
- // 如果不是周期性任务,直接执行即可
- ScheduledFutureTask.super.run();
- else if (ScheduledFutureTask.super.runAndReset()) {
- // 否则执行任务并重置,然后计算并设置下次执行时间
- setNextRunTime();
- // 重新添加下一次任务,即将当前ScheduledFutureTask重新入队
- reExecutePeriodic(outerTask);
- }
- }
- }
- /**
- * Returns true if can run a task given current run state
- * and run-after-shutdown parameters.
- *
- * 根据任务类型来判断任务在当前状态下是否可以运行
- *
- * @param periodic true if this task periodic, false if delayed
- */
- boolean canRunInCurrentRunState(boolean periodic) {
- /**
- * continueExistingPeriodicTasksAfterShutdown默认是false,是控制周期性任务的
- * executeExistingDelayedTasksAfterShutdown默认是true,是控制延时性任务的
- */
- return isRunningOrShutdown(periodic ?
- continueExistingPeriodicTasksAfterShutdown :
- executeExistingDelayedTasksAfterShutdown);
- }
- /**
- * Main execution method for delayed or periodic tasks. If pool
- * is shut down, rejects the task. Otherwise adds task to queue
- * and starts a thread, if necessary, to run it. (We cannot
- * prestart the thread to run the task because the task (probably)
- * shouldn't be run yet,) If the pool is shut down while the task
- * is being added, cancel and remove it if required by state and
- * run-after-shutdown parameters.
- *
- * @param task the task
- */
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- if (isShutdown())
- // 如果线程池关闭了就拒绝任务
- reject(task);
- else {
- // 将任务添加到等待队列中
- super.getQueue().add(task);
- /**
- * 如果线程池关闭,或者当前状态无法执行任务
- * 就将任务从等待队列中移除
- */
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- // 任务从等待队列移除成功后,取消任务
- task.cancel(false);
- else
- // 走到这里表示可以执行,则预启动Worker线程
- ensurePrestart();
- }
- }
- /**
- * Requeues a periodic task unless current run state precludes it.
- * Same idea as delayedExecute except drops task rather than rejecting.
- *
- * 周期性任务的重新入队
- *
- * @param task the task
- */
- void reExecutePeriodic(RunnableScheduledFuture<?> task) {
- // 判断当前线程池状态是否可以运行周期性任务
- if (canRunInCurrentRunState(true)) {
- // 任务重新入队
- super.getQueue().add(task);
- /**
- * 重新检查当前线程池状态是否可以运行周期性任务,如果不行就从队列中移除任务
- * 并取消任务的执行
- */
- if (!canRunInCurrentRunState(true) && remove(task))
- task.cancel(false);
- else
- // 如果可以运行就预启动Worker线程
- ensurePrestart();
- }
- }
- /**
- * Cancels and clears the queue of all tasks that should not be run
- * due to shutdown policy. Invoked within super.shutdown.
- *
- * 根据关闭策略在线程池关闭时取消及清理队列中的任务
- * 这是ThreadPoolExecutor中的一个钩子方法,会在shutdown()方法中执行
- * 即在线程池关闭后,该方法会被调用
- */
- @Override void onShutdown() {
- // 获取等待队列
- BlockingQueue<Runnable> q = super.getQueue();
- // 获取对延时性任务和周期性任务的处理方式
- boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
- boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
- if (!keepDelayed && !keepPeriodic) {
- /**
- * 如果对延时性任务和周期性任务都不做保留
- * 则遍历整个等待队列,取消队列中的任务
- */
- for (Object e : q.toArray())
- // 如果任务是RunnableScheduledFuture类型
- if (e instanceof RunnableScheduledFuture<?>)
- // 取消任务的执行
- ((RunnableScheduledFuture<?>) e).cancel(false);
- // 清理队列
- q.clear();
- }
- else {
- // Traverse snapshot to avoid iterator exceptions
- /**
- * 走到这里说明keepDelayed和keepPeriodic其中有一个或两个为true
- * 遍历等待队列的所有任务
- */
- for (Object e : q.toArray()) {
- // 如果任务是RunnableScheduledFuture类型
- if (e instanceof RunnableScheduledFuture) {
- // 强转
- RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
- /**
- * 根据任务情况进行相应的判断
- * 如果是周期性任务,根据keepPeriodic来判断
- * 如果是延时性任务,根据keepDelayed来判断
- * 或者任务已经被取消了,也应该从队列中移除
- */
- if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled
- // 从队列中移除任务
- if (q.remove(t))
- // 移除成功后对任务进行取消
- t.cancel(false);
- }
- }
- }
- }
- // 尝试终止线程池
- tryTerminate();
- }
- /**
- * Modifies or replaces the task used to execute a runnable.
- * This method can be used to override the concrete
- * class used for managing internal tasks.
- * The default implementation simply returns the given task.
- *
- * 对任务进行装饰的方法,没有任何处理,可被子类继承以定制
- *
- * @param runnable the submitted Runnable
- * @param task the task created to execute the runnable
- * @return a task that can execute the runnable
- * @since 1.6
- */
- protected <V> RunnableScheduledFuture<V> decorateTask(
- Runnable runnable, RunnableScheduledFuture<V> task) {
- return task;
- }
- /**
- * Modifies or replaces the task used to execute a callable.
- * This method can be used to override the concrete
- * class used for managing internal tasks.
- * The default implementation simply returns the given task.
- *
- * 对任务进行装饰的方法,没有任何处理,可被子类继承以定制
- *
- * @param callable the submitted Callable
- * @param task the task created to execute the callable
- * @return a task that can execute the callable
- * @since 1.6
- */
- protected <V> RunnableScheduledFuture<V> decorateTask(
- Callable<V> callable, RunnableScheduledFuture<V> task) {
- return task;
- }
- /**
- * Creates a new {@code ScheduledThreadPoolExecutor} with the
- * given core pool size.
- *
- * 根据给定的corePoolSize创建一个调度线程池
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @throws IllegalArgumentException if {@code corePoolSize < 0}
- */
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
- new DelayedWorkQueue());
- }
- /**
- * Creates a new {@code ScheduledThreadPoolExecutor} with the
- * given initial parameters.
- *
- * 根据给定的corePoolSize和线程工厂创建一个调度线程池
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- * @throws IllegalArgumentException if {@code corePoolSize < 0}
- * @throws NullPointerException if {@code threadFactory} is null
- */
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
- new DelayedWorkQueue(), threadFactory);
- }
- /**
- * Creates a new ScheduledThreadPoolExecutor with the given
- * initial parameters.
- *
- * 根据给定的corePoolSize和拒绝策略处理器创建一个调度线程池
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached
- * @throws IllegalArgumentException if {@code corePoolSize < 0}
- * @throws NullPointerException if {@code handler} is null
- */
- public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
- new DelayedWorkQueue(), handler);
- }
- /**
- * Creates a new ScheduledThreadPoolExecutor with the given
- * initial parameters.
- *
- * 根据给定的corePoolSize、线程工厂和拒绝策略处理器创建一个调度线程池
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- * @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached
- * @throws IllegalArgumentException if {@code corePoolSize < 0}
- * @throws NullPointerException if {@code threadFactory} or
- * {@code handler} is null
- */
- public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
- new DelayedWorkQueue(), threadFactory, handler);
- }
- /**
- * Returns the trigger time of a delayed action.
- * 处理延迟时间,内部调用了重载方法
- */
- private long triggerTime(long delay, TimeUnit unit) {
- // 将传入的时间进行转换,然后使用重载方法处理
- return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
- }
- /**
- * Returns the trigger time of a delayed action.
- * 根据传入的延迟时间和当前时间,计算最终触发时间
- */
- long triggerTime(long delay) {
- /**
- * 延迟时间delay小于Long.MAX_VALUE >> 1,则返回now() + delay
- * 延迟时间delay大于Long.MAX_VALUE >> 1,则返回overflowFree()处理的结果,处理过程详见该方法
- */
- return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
- }
- /**
- * Constrains the values of all delays in the queue to be within
- * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
- * This may occur if a task is eligible to be dequeued, but has
- * not yet been, while some other task is added with a delay of
- * Long.MAX_VALUE.
- */
- private long overflowFree(long delay) {
- // 查看延迟等待队列(在ScheduledThreadPoolExecutor中使用的是延迟队列DelayedWorkQueue)中的队首任务
- Delayed head = (Delayed) super.getQueue().peek();
- if (head != null) {
- // 如果队首任务不为null,计算队首任务相对于当前时间还需要延迟的时间headDelay
- long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
- /**
- * 如果headDelay小于0,表示此时队首任务应该出队了,但恰好有任务入队
- * 且delay - headDelay小于0,则表示此时delay的值小于0或大于Long.MAX_VALUE
- * 而传入该方法的delay一定是大于0的,因此此时delay大于Long.MAX_VALUE
- */
- if (headDelay < 0 && (delay - headDelay < 0))
- /**
- * 如果delay大于Long.MAX_VALUE,且队首任务应该要出队了(headDelay为负)
- * 则将delay置为Long.MAX_VALUE + headDelay
- */
- delay = Long.MAX_VALUE + headDelay;
- }
- // 返回delay
- return delay;
- }
- /**
- * 延迟执行任务command,延迟时间由delay和unit决定
- * 只执行一次
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- /**
- * 使用triggerTime()计算执行时间,创建一个RunnableScheduledFuture对象
- * 默认的decorateTask()方法什么都没做,直接将传入的ScheduledFutureTask返回了
- */
- RunnableScheduledFuture<?> t = decorateTask(command,
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(delay, unit)));
- // 延迟执行任务
- delayedExecute(t);
- return t;
- }
- /**
- * 延迟执行会产生执行结果任务command,延迟时间由delay和unit决定
- * 只执行一次
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- // 检查参数
- if (callable == null || unit == null)
- throw new NullPointerException();
- /**
- * 使用triggerTime()计算执行时间,创建一个RunnableScheduledFuture对象
- * 默认的decorateTask()方法什么都没做,直接将传入的ScheduledFutureTask返回了
- */
- RunnableScheduledFuture<V> t = decorateTask(callable,
- new ScheduledFutureTask<V>(callable,
- triggerTime(delay, unit)));
- // 延迟执行任务
- delayedExecute(t);
- return t;
- }
- /**
- * 该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。
- * 注意,period是从任务开始执行算起的。
- * 开始执行任务后,定时器每隔period时长检查该任务是否完成,
- * 如果完成则再次启动任务,否则等该任务结束后才再次启动任务
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
- long period, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- // 使用triggerTime()计算执行时间,创建一个ScheduledFutureTask对象
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(initialDelay, unit), unit.toNanos(period));
- // 使用decorateTask()对ScheduledFutureTask对象进行装饰
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- // 执行任务
- delayedExecute(t);
- return t;
- }
- /**
- * 该方法在initialDelay时长后第一次执行任务,
- * 以后每当任务执行完成后,等待delay时长,再次执行任务
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- * @throws IllegalArgumentException {@inheritDoc}
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
- long delay, TimeUnit unit) {
- // 检查参数
- if (command == null || unit == null)
- throw new NullPointerException();
- if (delay <= 0)
- throw new IllegalArgumentException();
- // 使用triggerTime()计算执行时间,创建一个ScheduledFutureTask对象
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(initialDelay, unit), unit.toNanos(-delay));
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
- /**
- * Executes {@code command} with zero required delay.
- * This has effect equivalent to
- * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
- * Note that inspections of the queue and of the list returned by
- * {@code shutdownNow} will access the zero-delayed
- * {@link ScheduledFuture}, not the {@code command} itself.
- *
- * <p>A consequence of the use of {@code ScheduledFuture} objects is
- * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
- * called with a null second {@code Throwable} argument, even if the
- * {@code command} terminated abruptly. Instead, the {@code Throwable}
- * thrown by such a task can be obtained via {@link Future#get}.
- *
- * 执行任务command
- *
- * @throws RejectedExecutionException at discretion of
- * {@code RejectedExecutionHandler}, if the task
- * cannot be accepted for execution because the
- * executor has been shut down
- * @throws NullPointerException {@inheritDoc}
- */
- public void execute(Runnable command) {
- schedule(command, 0, TimeUnit.NANOSECONDS);
- }
- // Override AbstractExecutorService methods
- /**
- * 执行任务task
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public Future<?> submit(Runnable task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
- }
- /**
- * 执行任务task
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <T> Future<T> submit(Runnable task, T result) {
- return schedule(Executors.callable(task, result),
- 0, TimeUnit.NANOSECONDS);
- }
- /**
- * 执行任务task
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <T> Future<T> submit(Callable<T> task) {
- return schedule(task, 0, TimeUnit.NANOSECONDS);
- }
- /**
- * Sets the policy on whether to continue executing existing
- * periodic tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow} or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code false}.
- *
- * 设置周期性任务在线程池关闭状态下是否继续运行的策略
- *
- * @param value if {@code true}, continue after shutdown, else don't.
- * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
- */
- public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
- continueExistingPeriodicTasksAfterShutdown = value;
- // 如果设置的为false,且线程池关闭了
- if (!value && isShutdown())
- // 手动调用钩子方法
- onShutdown();
- }
- /**
- * Gets the policy on whether to continue executing existing
- * periodic tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow} or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code false}.
- *
- * 获取周期性任务在线程池关闭状态下是否继续运行的策略
- *
- * @return {@code true} if will continue after shutdown
- * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
- */
- public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
- return continueExistingPeriodicTasksAfterShutdown;
- }
- /**
- * Sets the policy on whether to execute existing delayed
- * tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow}, or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code true}.
- *
- * 设置延时性任务在线程池关闭状态下是否继续运行的策略
- *
- * @param value if {@code true}, execute after shutdown, else don't.
- * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
- */
- public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
- executeExistingDelayedTasksAfterShutdown = value;
- // 如果设置的为false,且线程池关闭了
- if (!value && isShutdown())
- // 手动调用钩子方法
- onShutdown();
- }
- /**
- * Gets the policy on whether to execute existing delayed
- * tasks even when this executor has been {@code shutdown}.
- * In this case, these tasks will only terminate upon
- * {@code shutdownNow}, or after setting the policy to
- * {@code false} when already shutdown.
- * This value is by default {@code true}.
- *
- * 获取延时性任务在线程池关闭状态下是否继续运行的策略
- *
- * @return {@code true} if will execute after shutdown
- * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
- */
- public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
- return executeExistingDelayedTasksAfterShutdown;
- }
- /**
- * Sets the policy on whether cancelled tasks should be immediately
- * removed from the work queue at time of cancellation. This value is
- * by default {@code false}.
- *
- * 设置在取消任务时是否将任务移除的策略
- *
- * @param value if {@code true}, remove on cancellation, else don't
- * @see #getRemoveOnCancelPolicy
- * @since 1.7
- */
- public void setRemoveOnCancelPolicy(boolean value) {
- removeOnCancel = value;
- }
- /**
- * Gets the policy on whether cancelled tasks should be immediately
- * removed from the work queue at time of cancellation. This value is
- * by default {@code false}.
- *
- * 获取在取消任务时是否将任务移除的策略
- *
- * @return {@code true} if cancelled tasks are immediately removed
- * from the queue
- * @see #setRemoveOnCancelPolicy
- * @since 1.7
- */
- public boolean getRemoveOnCancelPolicy() {
- return removeOnCancel;
- }
- /**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- *
- * <p>This method does not wait for previously submitted tasks to
- * complete execution. Use {@link #awaitTermination awaitTermination}
- * to do that.
- *
- * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
- * has been set {@code false}, existing delayed tasks whose delays
- * have not yet elapsed are cancelled. And unless the {@code
- * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
- * {@code true}, future executions of existing periodic tasks will
- * be cancelled.
- *
- * @throws SecurityException {@inheritDoc}
- */
- public void shutdown() {
- super.shutdown();
- }
- /**
- * Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks
- * that were awaiting execution.
- *
- * <p>This method does not wait for actively executing tasks to
- * terminate. Use {@link #awaitTermination awaitTermination} to
- * do that.
- *
- * <p>There are no guarantees beyond best-effort attempts to stop
- * processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so any task that
- * fails to respond to interrupts may never terminate.
- *
- * @return list of tasks that never commenced execution.
- * Each element of this list is a {@link ScheduledFuture},
- * including those tasks submitted using {@code execute},
- * which are for scheduling purposes used as the basis of a
- * zero-delay {@code ScheduledFuture}.
- * @throws SecurityException {@inheritDoc}
- */
- public List<Runnable> shutdownNow() {
- return super.shutdownNow();
- }
- /**
- * Returns the task queue used by this executor. Each element of
- * this queue is a {@link ScheduledFuture}, including those
- * tasks submitted using {@code execute} which are for scheduling
- * purposes used as the basis of a zero-delay
- * {@code ScheduledFuture}. Iteration over this queue is
- * <em>not</em> guaranteed to traverse tasks in the order in
- * which they will execute.
- *
- * @return the task queue
- */
- public BlockingQueue<Runnable> getQueue() {
- return super.getQueue();
- }
- /**
- * Specialized delay queue. To mesh with TPE declarations, this
- * class must be declared as a BlockingQueue<Runnable> even though
- * it can only hold RunnableScheduledFutures.
- * 延时队列,用于装载被执行的任务,然后根据调度配置进行入队和出队操作
- * 小顶堆模式实现
- */
- static class DelayedWorkQueue extends AbstractQueue<Runnable>
- implements BlockingQueue<Runnable> {
- // 初始容量为16
- private static final int INITIAL_CAPACITY = 16;
- // 用于装载任务的数组
- private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
- // 锁
- private final ReentrantLock lock = new ReentrantLock();
- // 大小
- private int size = 0;
- /**
- * Thread designated to wait for the task at the head of the
- * queue. This variant of the Leader-Follower pattern
- * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
- * minimize unnecessary timed waiting. When a thread becomes
- * the leader, it waits only for the next delay to elapse, but
- * other threads await indefinitely. The leader thread must
- * signal some other thread before returning from take() or
- * poll(...), unless some other thread becomes leader in the
- * interim. Whenever the head of the queue is replaced with a
- * task with an earlier expiration time, the leader field is
- * invalidated by being reset to null, and some waiting
- * thread, but not necessarily the current leader, is
- * signalled. So waiting threads must be prepared to acquire
- * and lose leadership while waiting.
- *
- * 试图执行队首任务的正处于等待状态的线程对象
- */
- private Thread leader = null;
- /**
- * Condition signalled when a newer task becomes available at the
- * head of the queue or a new thread may need to become leader.
- * 等待条件
- */
- private final Condition available = lock.newCondition();
- /**
- * Set f's heapIndex if it is a ScheduledFutureTask.
- * 设置ScheduledFutureTask的heapIndex
- */
- private void setIndex(RunnableScheduledFuture f, int idx) {
- // 当传入的f是ScheduledFutureTask类型时,设置其heapIndex
- if (f instanceof ScheduledFutureTask)
- ((ScheduledFutureTask)f).heapIndex = idx;
- }
- /**
- * Sift element added at bottom up to its heap-ordered spot.
- * Call only when holding lock.
- * 堆上浮操作
- * @param k 添加之前的数组大小
- * @param key 添加的任务
- */
- private void siftUp(int k, RunnableScheduledFuture key) {
- // 循环直到k为0
- while (k > 0) {
- // 获取k位置任务的父任务的索引
- int parent = (k - 1) >>> 1;
- // 取出父任务
- RunnableScheduledFuture e = queue[parent];
- /**
- * 将key与父任务进行比较,如果key大于等于父任务,
- * 表示key要在父任务之后或者与父任务同时执行,则跳出循环
- */
- if (key.compareTo(e) >= 0)
- break;
- /**
- * 否则key小于父任务,表示key要比父任务早执行
- * 因此将k位置替换为父任务
- */
- queue[k] = e;
- // 修改父任务的heapIndex为k
- setIndex(e, k);
- // 将父任务索引赋值给k,进行下一次循环
- k = parent;
- }
- /**
- * 走到这里说明key要在父任务之后或者与父任务同时执行
- * 则已经找到了key所应该在的位置
- * 将key放在k索引位置
- */
- queue[k] = key;
- // 将key的heapIndex设置为k
- setIndex(key, k);
- }
- /**
- * Sift element added at top down to its heap-ordered spot.
- * Call only when holding lock.
- * 堆下沉操作
- */
- private void siftDown(int k, RunnableScheduledFuture key) {
- // 取数组queue中心位置索引half
- int half = size >>> 1;
- /**
- * 循环直到k大于或等于中心索引half时,
- * 由于从half开始,节点就不再有孩子节点
- */
- while (k < half) {
- // 获取左孩子节点的索引
- int child = (k << 1) + 1;
- // 获取左孩子节点
- RunnableScheduledFuture c = queue[child];
- // 获取右孩子节点的索引
- int right = child + 1;
- /**
- * 当右孩子节点的索引小于size时,说明右孩子还存在
- * 比较左孩子与右孩子的大小
- * 如果左孩子比右孩子大,表明右孩子比左孩子的任务更早执行
- * 则将c指向右孩子,child指向right索引
- * 此时c指向的是更早执行的任务所在的那个孩子节点
- */
- if (right < size && c.compareTo(queue[right]) > 0)
- c = queue[child = right];
- /**
- * 比较key与c,如果小于等于0,说明key比c还要更早执行,
- * 说明此时k就是key应该放在的位置,跳出循环
- */
- if (key.compareTo(c) <= 0)
- break;
- /**
- * 否则说明key在c后面执行
- * 将更早执行的c放在k(即c的父节点)位置
- */
- queue[k] = c;
- // 设置c的heapIndex为k
- setIndex(c, k);
- // 将k指向child(此时的右孩子的索引),继续下次循环
- k = child;
- }
- // 将key放在queue的k索引位置上
- queue[k] = key;
- // 设置key的heapIndex为k
- setIndex(key, k);
- }
- /**
- * Resize the heap array. Call only when holding lock.
- * 扩容操作
- */
- private void grow() {
- // 旧的容量
- int oldCapacity = queue.length;
- // 新的容量是旧的容量的1.5倍
- int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
- // 如果溢出就将其设置为Integer.MAX_VALUE
- if (newCapacity < 0) // overflow
- newCapacity = Integer.MAX_VALUE;
- // 拷贝旧数组的任务到新数组,然后将新数组赋值给queue
- queue = Arrays.copyOf(queue, newCapacity);
- }
- /**
- * Find index of given object, or -1 if absent
- * 查找给定x在数组中的索引
- */
- private int indexOf(Object x) {
- // x不能为null
- if (x != null) {
- // x要是ScheduledFutureTask类型的对象
- if (x instanceof ScheduledFutureTask) {
- // 获取x的heapIndex为i
- int i = ((ScheduledFutureTask) x).heapIndex;
- // Sanity check; x could conceivably be a
- // ScheduledFutureTask from some other pool.
- // 如果i大于等于0,且i小于size,且queue中i位置的任务与x相等
- if (i >= 0 && i < size && queue[i] == x)
- // i即是x在数组中的索引,直接返回
- return i;
- } else {
- /**
- * 如果x不是ScheduledFutureTask对象
- * 遍历queue数组,依次查找
- */
- for (int i = 0; i < size; i++)
- // 当查找到的任务与x相同,则返回此时的索引i
- if (x.equals(queue[i]))
- return i;
- }
- }
- // 否则返回-1,表示没找到
- return -1;
- }
- // 判断任务x是否存在于queue中
- public boolean contains(Object x) {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 使用indexOf()进行查找并判断
- return indexOf(x) != -1;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 从queue中移除任务x
- public boolean remove(Object x) {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 查找x在queue中的索引
- int i = indexOf(x);
- // 如果返回值小于0表示没找到,直接返回false
- if (i < 0)
- return false;
- /**
- * 走到此处表示查找到了,i即为x在queue中的索引
- * 将queue中i位置的任务的heapIndex设置为-1(如果x是ScheduledFutureTask类型的话)
- */
- setIndex(queue[i], -1);
- // 计算移除后的size大小
- int s = --size;
- // 获取最后一个任务replacement
- RunnableScheduledFuture replacement = queue[s];
- // 将queue中s位置(即最后有任务的位置)上的任务置为null
- queue[s] = null;
- /**
- * 如果s与i不同,表示移除的不是最后一个任务
- * 此时要将i位置的缺补上
- * 补缺方式是将最后一个任务放在i位置,然后尝试对其进行下沉或上浮操作
- * 先尝试下沉,如果下沉结束发现最后一个任务还处于i位置,则表明该任务没有移动
- * 因此尝试对该任务进行上浮操作
- */
- if (s != i) {
- // 需要将replacement从i位置尝试下沉操作
- siftDown(i, replacement);
- /**
- * 如果下沉操作完成,i位置任务还为replacement
- * 则表示replacement没有下沉,尝试上浮操作
- */
- if (queue[i] == replacement)
- siftUp(i, replacement);
- }
- // 返回true表示移除成功
- return true;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 查看队列大小
- public int size() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 直接返回size即可
- return size;
- } finally {
- lock.unlock();
- }
- }
- // 队列是否为空
- public boolean isEmpty() {
- return size() == 0;
- }
- // 剩余容量是无限的
- public int remainingCapacity() {
- return Integer.MAX_VALUE;
- }
- // 查看队首任务
- public RunnableScheduledFuture peek() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 返回queue数组中第一个任务(并未取出)
- return queue[0];
- } finally {
- lock.unlock();
- }
- }
- // 添加任务
- public boolean offer(Runnable x) {
- // 检查任务
- if (x == null)
- throw new NullPointerException();
- //将任务转换为RunnableScheduledFuture对象
- RunnableScheduledFuture e = (RunnableScheduledFuture)x;
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = size;
- if (i >= queue.length)
- // 如果任务个数大于等于队列数组容量,进行扩容操作
- grow();
- // size加1
- size = i + 1;
- if (i == 0) {
- /**
- * i为0,即原始size为0,没有任务
- * 将e放在第0位置
- */
- queue[0] = e;
- // 设置e的heapIndex为0
- setIndex(e, 0);
- } else {
- // 否则调用siftUp()对e进行添加及上浮操作
- siftUp(i, e);
- }
- /**
- * 如果第0个位置的任务是e,
- * 表示此时入队的是队首元素,可以将leader清空了
- * 同时可以通知其他线程可以竞争成为leader了
- */
- if (queue[0] == e) {
- // 将leader置为null
- leader = null;
- // 唤醒等待在available上的线程
- available.signal();
- }
- } finally {
- // 解锁
- lock.unlock();
- }
- return true;
- }
- // 添加任务,内部调用offer()
- public void put(Runnable e) {
- offer(e);
- }
- // 添加任务,内部调用offer()
- public boolean add(Runnable e) {
- return offer(e);
- }
- // 添加任务,内部调用offer()
- public boolean offer(Runnable e, long timeout, TimeUnit unit) {
- return offer(e);
- }
- /**
- * Performs common bookkeeping for poll and take: Replaces
- * first element with last and sifts it down. Call only when
- * holding lock.
- * 将第一个任务与最后一个任务进行替换,然后进行下沉操作
- * @param f the task to remove and return
- */
- private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
- int s = --size;
- // 取出最后一个任务为x
- RunnableScheduledFuture x = queue[s];
- // 将最后索引位置上置为null
- queue[s] = null;
- // 如果最后索引不是0,表示堆中元素个数大于1
- if (s != 0)
- // 将x从堆顶开始做下沉操作
- siftDown(0, x);
- // 设置f的heapIndex为-1,并将其返回
- setIndex(f, -1);
- return f;
- }
- // 取出队首任务
- public RunnableScheduledFuture poll() {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 获取queue索引为0位置的任务为first
- RunnableScheduledFuture first = queue[0];
- // 如果first任务为null,或者first任务的延迟时间还大于0,就返回null
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
- return null;
- else
- // 否则调用finishPoll()将first返回,并对堆进行调整
- return finishPoll(first);
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 获取队首任务
- public RunnableScheduledFuture take() throws InterruptedException {
- // 加锁,可中断
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- // 无限循环
- for (;;) {
- // 获取queue中索引为0位置的任务(即任务)为first
- RunnableScheduledFuture first = queue[0];
- // 如果first为null,表示此时queue是空的,进入等待
- if (first == null)
- available.await();
- else {
- // 走到这里表示取到了first不为null,获取其延迟时间
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
- // 如果延迟时间小于等于0,表示该任务应该被执行了
- if (delay <= 0)
- // 调用finishPoll()将其返回并对堆进行调整
- return finishPoll(first);
- else if (leader != null)
- /**
- * 否则表示任务还没有到执行时间,判断leader是否为null,
- * 如果leader不为null,表示已经有任务(即leader)在队首进行等待了
- * 那么就无条件进入等待状态
- */
- available.await();
- else {
- /**
- * 否则说明leader为null,表示此时还没有任务在队首等待
- * 使用leader记录当前线程,然后使当前线程进入超时等待
- */
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 然后进入带有超时机制的等待,超时时间为delay
- available.awaitNanos(delay);
- } finally {
- /**
- * 超时等待结束后
- * 如果leader还为当前线程,就将leader置为null
- */
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- /**
- * 最终当某个任务被返回后
- * 如果leader为null(表示此时队首没有任务在超时等待过程中),
- * 且queue首任务不为null(表示还有任务未执行)时,
- * 则唤醒等待在available上的线程
- */
- if (leader == null && queue[0] != null)
- available.signal();
- // 解锁
- lock.unlock();
- }
- }
- // 获取队首任务,带有超时机制
- public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
- throws InterruptedException {
- // 计算超时时间
- long nanos = unit.toNanos(timeout);
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- // 无限循环
- for (;;) {
- // 获取queue中索引为0位置的任务(即任务)为first
- RunnableScheduledFuture first = queue[0];
- /**
- * 如果first为null,表示此时queue是空的,
- * 检查剩余的超时时间,如果剩余时间小于等于0,表示超时了,直接返回null
- * 否则使用available进入超时等待
- */
- if (first == null) {
- if (nanos <= 0)
- return null;
- else
- nanos = available.awaitNanos(nanos);
- } else {
- // 走到这里表示取到了first不为null,获取其延迟时间
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
- // 如果延迟时间小于等于0,表示该任务应该被执行了
- if (delay <= 0)
- // 调用finishPoll()将其返回并对堆进行调整
- return finishPoll(first);
- // 走到这里表示first任务的延迟时间还大于0,即还没有到执行时间
- if (nanos <= 0)
- // 如果此时超时了,就直接返回null
- return null;
- /**
- * 走到这里表示没有超时,
- * 如果超时时间小于延迟执行时间delay,
- * 或者超时时间虽然大于或等于延迟时间,但leader不为null(表示队首已经有任务在等待了),
- * 如果是就进入超时等待,等待时间为nanos
- */
- if (nanos < delay || leader != null)
- nanos = available.awaitNanos(nanos);
- else {
- /**
- * 走到表示超时时间大于等于延迟时间,且leader为null
- * 使用leader记录当前线程
- */
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- // 进入超时等待,超时时间为delay,返回值为未等待的时间(可能中途被中断了,或者被唤醒)
- long timeLeft = available.awaitNanos(delay);
- // 等待从超时时间中减去已等待的时间,即为剩余的超时时间
- nanos -= delay - timeLeft;
- } finally {
- /**
- * 超时等待结束后
- * 如果leader还为当前线程,就将leader置为null
- */
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- /**
- * 最终当某个任务被返回后
- * 如果leader为null(表示此时队首没有任务在超时等待过程中),
- * 且queue首任务不为null(表示还有任务未执行)时,
- * 则唤醒等待在available上的线程
- */
- if (leader == null && queue[0] != null)
- available.signal();
- // 解锁
- lock.unlock();
- }
- }
- // 清除队列任务
- public void clear() {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 遍历queue数组
- for (int i = 0; i < size; i++) {
- // 取出i位置任务为t
- RunnableScheduledFuture t = queue[i];
- if (t != null) {
- // 如果t不为null,将queue的i位置任务置为null
- queue[i] = null;
- // 将t的heapIndex置为-1
- setIndex(t, -1);
- }
- }
- // size置为0
- size = 0;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- /**
- * Return and remove first element only if it is expired.
- * Used only by drainTo. Call only when holding lock.
- * 当队首任务延时过期了,将其移除;一般用于drainTo()方法中
- */
- private RunnableScheduledFuture pollExpired() {
- // 获取队首任务
- RunnableScheduledFuture first = queue[0];
- // 如果队首任务为null,或者不为null但延时时间大于0
- if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
- // 返回null
- return null;
- // 否则满足条件,将其从堆中移除,调整堆并返回该任务
- return finishPoll(first);
- }
- /**
- * 将队列中的延时过期任务依次转移,直到遇到第一个延时未过期的任务停止
- * @param c 集合
- * @return 转移的个数
- */
- public int drainTo(Collection<? super Runnable> c) {
- // 检查参数
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- RunnableScheduledFuture first;
- int n = 0;
- // 不断移除队首延时过期的任务,直到遇到第一个延时未过期的任务停止
- while ((first = pollExpired()) != null) {
- // 将移除的任务添加到集合c中
- c.add(first);
- // 计数
- ++n;
- }
- // 返回转移的个数
- return n;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- /**
- * 将队列中的延时过期任务依次转移,
- * 最多转移maxElements个,或者直到遇到第一个延时未过期的任务停止
- * @param c 集合
- * @param maxElements 最多转移的个数
- * @return 转移的个数
- */
- public int drainTo(Collection<? super Runnable> c, int maxElements) {
- // 检查参数
- if (c == null)
- throw new NullPointerException();
- if (c == this)
- throw new IllegalArgumentException();
- if (maxElements <= 0)
- return 0;
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- RunnableScheduledFuture first;
- int n = 0;
- // 不断移除队首延时过期的任务,直到计数大于等于maxElements,或者遇到第一个延时未过期的任务停止
- while (n < maxElements && (first = pollExpired()) != null) {
- // 将移除的任务添加到集合c中
- c.add(first);
- // 计数
- ++n;
- }
- // 返回转移的个数
- return n;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 转为数组
- public Object[] toArray() {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 进行拷贝并返回
- return Arrays.copyOf(queue, size, Object[].class);
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 转为数组
- @SuppressWarnings("unchecked")
- public <T> T[] toArray(T[] a) {
- // 加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 如果a的容量小于任务总数,将创建一个新的数组,拷贝任务后返回
- if (a.length < size)
- return (T[]) Arrays.copyOf(queue, size, a.getClass());
- // 否则直接进行拷贝,
- System.arraycopy(queue, 0, a, 0, size);
- // 如果a的容量大于任务总数,将a中size位置的元素置为null
- if (a.length > size)
- a[size] = null;
- // 返回a
- return a;
- } finally {
- // 解锁
- lock.unlock();
- }
- }
- // 迭代器
- public Iterator<Runnable> iterator() {
- // 直接以queue转为的数组创建迭代器
- return new Itr(Arrays.copyOf(queue, size));
- }
- /**
- * Snapshot iterator that works off copy of underlying q array.
- * 迭代器类
- */
- private class Itr implements Iterator<Runnable> {
- // 记录需要迭代的数组
- final RunnableScheduledFuture[] array;
- // 当前需要返回的元素的索引游标
- int cursor = 0; // index of next element to return
- // 上一次返回的元素
- int lastRet = -1; // index of last element, or -1 if no such
- // 构造方法
- Itr(RunnableScheduledFuture[] array) {
- // 将传入的数组进行记录
- this.array = array;
- }
- // 是否还有下一个
- public boolean hasNext() {
- return cursor < array.length;
- }
- // 获取下一个元素
- public Runnable next() {
- // 如果游标大于等于数组长度,表示迭代到最末尾了,抛出异常即可
- if (cursor >= array.length)
- throw new NoSuchElementException();
- // 否则使用lastRet记录当前游标
- lastRet = cursor;
- // 然后返回当前游标处的元素
- return array[cursor++];
- }
- // 移除元素
- public void remove() {
- // 如果lastRet小于0说明它还没有指向任何元素
- if (lastRet < 0)
- throw new IllegalStateException();
- // 使用DelayedWorkQueue的移除方法移除元素
- DelayedWorkQueue.this.remove(array[lastRet]);
- // 移除后将lastRet置为-1
- lastRet = -1;
- }
- }
- }
- }
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...