Java Multithreading 43 - ThreadPoolExecutor in Detail (1)

1. Introduction to ThreadPoolExecutor

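ThreadPoolExecutor is Java's thread pool class. Informally, it is a collection that holds a bounded number of worker threads. The pool lets several threads run at once, and the number of threads that may run concurrently is the pool's capacity. When more tasks are submitted than the pool can run at once, the excess tasks wait in a queue instead of running immediately. The pool manages submitted tasks through its scheduling policy and, once it is saturated, through its rejection policy.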
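To make the capacity and rejection behavior concrete, here is a minimal sketch that is not part of the original article. It assumes a pool whose core and maximum sizes are equal, a bounded ArrayBlockingQueue, and the default AbortPolicy handler; the class name PoolSaturationDemo and the method countRejected are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts how many submissions a saturated pool rejects.
class PoolSaturationDemo {

    static int countRejected(int threads, int queueSlots, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // core == max, bounded queue, default handler (AbortPolicy)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSlots));
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // neither a worker nor a queue slot was available
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads, 1 queue slot, 4 tasks
        System.out.println("rejected: " + countRejected(2, 1, 4));
    }
}
```

With 2 threads and 1 queue slot, the first two tasks each start a worker, the third waits in the queue, and the fourth has nowhere to go, so it is the one that gets rejected.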

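A thread pool is closely tied to its work queue, which holds the tasks that are waiting to run. A worker thread's job is simple: take a task from the queue, execute it, then go back to waiting for the next one.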
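The worker loop described above can be sketched in a few lines. This is a toy illustration under stated assumptions, not how ThreadPoolExecutor is implemented: the class TinyPool, its methods, and the poison-pill shutdown are all hypothetical, and a task that throws would simply kill its worker here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy pool: fixed workers that repeatedly take tasks from a shared queue.
class TinyPool {
    // Sentinel task that tells a worker to exit (an assumption of this sketch)
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    TinyPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::workerLoop, "tiny-worker-" + i);
            workers[i].start();
        }
    }

    // Take a task, run it, go back to waiting for the next one
    private void workerLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks while the queue is empty
                if (task == POISON) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void execute(Runnable task) {
        queue.add(task);
    }

    // Queue one sentinel per worker behind the pending tasks, then wait for all workers
    void shutdownAndWait() {
        for (int i = 0; i < workers.length; i++) {
            queue.add(POISON);
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Runs `tasks` counter increments on `threads` workers and returns the final count
    static int runDemo(int threads, int tasks) {
        TinyPool pool = new TinyPool(threads);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdownAndWait();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runDemo(3, 100));
    }
}
```

The real class adds everything this sketch leaves out: dynamic worker counts, idle timeouts, run states, and rejection handling.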

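Running tasks in a pool has several advantages over creating one thread per task. Reusing existing threads amortizes the considerable cost of thread creation and teardown across many requests. Because worker threads usually already exist when a request arrives, the task does not have to wait for a thread to be created, which improves responsiveness. And by sizing the pool appropriately, you can keep enough threads alive to keep the processors busy while preventing so many threads from competing for resources that the application runs out of memory or fails.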
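Thread reuse is easy to observe. The sketch below is an illustration that is not from the original article: it submits many small tasks to a fixed-size pool and records which threads ran them. The class name ThreadReuseDemo and the method distinctThreads are hypothetical, and sizing the pool to the number of available processors is only a common starting point for CPU-bound work, not a rule taken from the source.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that many tasks share a small set of threads.
class ThreadReuseDemo {

    // Returns how many distinct threads executed the submitted tasks
    static int distinctThreads(int poolSize, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Assumption: one thread per processor as a starting pool size
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("1000 tasks ran on " + distinctThreads(cores, 1000)
                + " threads (pool size " + cores + ")");
    }
}
```

However many tasks are submitted, the number of distinct threads never exceeds the pool size, which is where the savings in creation and teardown cost come from.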

The class diagram of ThreadPoolExecutor is shown below:

[Figure: ThreadPoolExecutor class diagram]

2. Fully Annotated ThreadPoolExecutor Source

The complete annotated source of ThreadPoolExecutor, based on JDK 1.7.0_07, is listed below. A detailed walkthrough of the source follows in the next article.
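Before reading the listing, it may help to try the ctl bit packing in isolation. The sketch below copies the constants and the three packing helpers from the listing into a standalone class so they can be run directly; the class name CtlPackingDemo and its main method are hypothetical additions for illustration.

```java
// Hypothetical standalone copy of the ctl packing helpers from the listing.
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // low 29 bits set

    // Run states occupy the high 3 bits and are ordered numerically
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));  // high bits 111, low bits hold 5
        System.out.println(runStateOf(c) == RUNNING);   // state survives the round trip
        System.out.println(workerCountOf(c));           // 5
        // RUNNING is negative, so isRunning(c) can be written as c < SHUTDOWN
        System.out.println(c < SHUTDOWN);
    }
}
```

Because the states are ordered RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, the listing can compare whole ctl values with < and >= instead of unpacking them first.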

  • package java.util.concurrent;
  • import java.util.concurrent.locks.*;
  • import java.util.concurrent.atomic.*;
  • import java.util.*;
  • public class ThreadPoolExecutor extends AbstractExecutorService {
  • /**
  • * ctl packs two conceptual fields: runState and workerCount.
  • * runState is the pool's run state; workerCount is the effective number of threads.
  • */
  • private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  • // Number of low-order bits reserved for workerCount, i.e. the shift applied to runState: 32 - 3 = 29
  • private static final int COUNT_BITS = Integer.SIZE - 3;
  • // CAPACITY is (1 << 29) - 1: the low 29 bits hold workerCount, so it can be at most 2^29 - 1
  • private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  • // runState is stored in the high-order bits
  • private static final int RUNNING = -1 << COUNT_BITS; // high 3 bits: 111
  • private static final int SHUTDOWN = 0 << COUNT_BITS; // high 3 bits: 000
  • private static final int STOP = 1 << COUNT_BITS; // high 3 bits: 001
  • private static final int TIDYING = 2 << COUNT_BITS; // high 3 bits: 010
  • private static final int TERMINATED = 3 << COUNT_BITS; // high 3 bits: 011
  • // Packing and unpacking ctl
  • // Extract the run state from ctl
  • private static int runStateOf(int c) {
  • return c & ~CAPACITY;
  • }
  • // Extract the effective thread count from ctl
  • private static int workerCountOf(int c) {
  • return c & CAPACITY;
  • }
  • // Build ctl from a run state and a worker count
  • private static int ctlOf(int rs, int wc) {
  • return rs | wc;
  • }
  • /*
  • * Bit field accessors that don't require unpacking ctl.
  • * These depend on the bit layout and on workerCount being never negative.
  • */
  • // Compare run states
  • private static boolean runStateLessThan(int c, int s) {
  • return c < s;
  • }
  • private static boolean runStateAtLeast(int c, int s) {
  • return c >= s;
  • }
  • // Whether the pool is in the RUNNING state
  • private static boolean isRunning(int c) {
  • return c < SHUTDOWN;
  • }
  • /**
  • * Attempt to CAS-increment the workerCount field of ctl.
  • * Increments workerCount with a single CAS attempt
  • */
  • private boolean compareAndIncrementWorkerCount(int expect) {
  • return ctl.compareAndSet(expect, expect + 1);
  • }
  • /**
  • * Attempt to CAS-decrement the workerCount field of ctl.
  • * Decrements workerCount with a single CAS attempt
  • */
  • private boolean compareAndDecrementWorkerCount(int expect) {
  • return ctl.compareAndSet(expect, expect - 1);
  • }
  • /**
  • * Decrements the workerCount field of ctl. This is called only on
  • * abrupt termination of a thread (see processWorkerExit). Other
  • * decrements are performed within getTask.
  • * <p>
  • * Decrements workerCount, spinning until the CAS succeeds
  • */
  • private void decrementWorkerCount() {
  • do {
  • } while (!compareAndDecrementWorkerCount(ctl.get()));
  • }
  • /**
  • * The queue used for holding tasks and handing off to worker
  • * threads. We do not require that workQueue.poll() returning
  • * null necessarily means that workQueue.isEmpty(), so rely
  • * solely on isEmpty to see if the queue is empty (which we must
  • * do for example when deciding whether to transition from
  • * SHUTDOWN to TIDYING). This accommodates special-purpose
  • * queues such as DelayQueues for which poll() is allowed to
  • * return null even if it may later return non-null when delays
  • * expire.
  • * <p>
  • * The blocking queue that holds pending tasks
  • */
  • private final BlockingQueue<Runnable> workQueue;
  • /**
  • * Lock held on access to workers set and related bookkeeping.
  • * While we could use a concurrent set of some sort, it turns out
  • * to be generally preferable to use a lock. Among the reasons is
  • * that this serializes interruptIdleWorkers, which avoids
  • * unnecessary interrupt storms, especially during shutdown.
  • * Otherwise exiting threads would concurrently interrupt those
  • * that have not yet interrupted. It also simplifies some of the
  • * associated statistics bookkeeping of largestPoolSize etc. We
  • * also hold mainLock on shutdown and shutdownNow, for the sake of
  • * ensuring workers set is stable while separately checking
  • * permission to interrupt and actually interrupting.
  • * <p>
  • * The main lock, guarding the workers set and related bookkeeping
  • */
  • private final ReentrantLock mainLock = new ReentrantLock();
  • /**
  • * Set containing all worker threads in pool. Accessed only when
  • * holding mainLock.
  • * <p>
  • * Holds all Worker objects
  • */
  • private final HashSet<Worker> workers = new HashSet<Worker>();
  • /**
  • * Wait condition to support awaitTermination
  • * <p>
  • * The condition that threads blocked in awaitTermination wait on
  • */
  • private final Condition termination = mainLock.newCondition();
  • /**
  • * Tracks largest attained pool size. Accessed only under mainLock.
  • * Largest pool size reached so far; accessed only while holding mainLock
  • */
  • private int largestPoolSize;
  • /**
  • * Counter for completed tasks. Updated only on termination of
  • * worker threads. Accessed only under mainLock.
  • * Number of completed tasks.
  • * Updated only when a worker thread terminates, and only while holding mainLock
  • */
  • private long completedTaskCount;
  • /*
  • * All user control parameters are declared as volatiles so that
  • * ongoing actions are based on freshest values, but without need
  • * for locking, since no internal invariants depend on them
  • * changing synchronously with respect to other actions.
  • */
  • /**
  • * Factory for new threads. All threads are created using this
  • * factory (via method addWorker). All callers must be prepared
  • * for addWorker to fail, which may reflect a system or user's
  • * policy limiting the number of threads. Even though it is not
  • * treated as an error, failure to create threads may result in
  • * new tasks being rejected or existing ones remaining stuck in
  • * the queue. On the other hand, no special precautions exist to
  • * handle OutOfMemoryErrors that might be thrown while trying to
  • * create threads, since there is generally no recourse from
  • * within this class.
  • * <p>
  • * The thread factory. All threads are created through it
  • */
  • private volatile ThreadFactory threadFactory;
  • /**
  • * Handler called when saturated or shutdown in execute.
  • * Handler invoked from execute when a task is rejected because the pool is saturated or shut down
  • */
  • private volatile RejectedExecutionHandler handler;
  • /**
  • * Timeout in nanoseconds for idle threads waiting for work.
  • * Threads use this timeout when there are more than corePoolSize
  • * present or if allowCoreThreadTimeOut. Otherwise they wait
  • * forever for new work.
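  • * Idle timeout for threads waiting for work. It applies when there are more than corePoolSize threads or when allowCoreThreadTimeOut is set;
  • * a thread that stays idle longer than this is terminated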
  • */
  • private volatile long keepAliveTime;
  • /**
  • * If false (default), core threads stay alive even when idle.
  • * If true, core threads use keepAliveTime to time out waiting
  • * for work.
  • * <p>
  • * Whether core threads may time out.
  • * If false (the default), core threads stay alive even when idle;
  • * if true, core threads use keepAliveTime as the timeout when waiting for work
  • */
  • private volatile boolean allowCoreThreadTimeOut;
  • /**
  • * Core pool size is the minimum number of workers to keep alive
  • * (and not allow to time out etc) unless allowCoreThreadTimeOut
  • * is set, in which case the minimum is zero.
  • * <p>
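  • * Core pool size: the minimum number of workers kept alive (by default they never time out).
  • * If allowCoreThreadTimeOut is true, idle core threads are reclaimed too, so the minimum is zero.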
  • */
  • private volatile int corePoolSize;
  • /**
  • * Maximum pool size. Note that the actual maximum is internally
  • * bounded by CAPACITY.
  • * <p>
  • * Maximum pool size, bounded by CAPACITY (2^29 - 1)
  • */
  • private volatile int maximumPoolSize;
  • /**
  • * The default rejected execution handler
  • * (AbortPolicy, which throws RejectedExecutionException)
  • */
  • private static final RejectedExecutionHandler defaultHandler =
  • new AbortPolicy();
  • /**
  • * Permission required for callers of shutdown and shutdownNow.
  • * We additionally require (see checkShutdownAccess) that callers
  • * have permission to actually interrupt threads in the worker set
  • * (as governed by Thread.interrupt, which relies on
  • * ThreadGroup.checkAccess, which in turn relies on
  • * SecurityManager.checkAccess). Shutdowns are attempted only if
  • * these checks pass.
  • * <p>
  • * All actual invocations of Thread.interrupt (see
  • * interruptIdleWorkers and interruptWorkers) ignore
  • * SecurityExceptions, meaning that the attempted interrupts
  • * silently fail. In the case of shutdown, they should not fail
  • * unless the SecurityManager has inconsistent policies, sometimes
  • * allowing access to a thread and sometimes not. In such cases,
  • * failure to actually interrupt threads may disable or delay full
  • * termination. Other uses of interruptIdleWorkers are advisory,
  • * and failure to actually interrupt will merely delay response to
  • * configuration changes so is not handled exceptionally.
  • */
  • private static final RuntimePermission shutdownPerm =
  • new RuntimePermission("modifyThread");
  • /**
  • * Class Worker mainly maintains interrupt control state for
  • * threads running tasks, along with other minor bookkeeping.
  • * This class opportunistically extends AbstractQueuedSynchronizer
  • * to simplify acquiring and releasing a lock surrounding each
  • * task execution. This protects against interrupts that are
  • * intended to wake up a worker thread waiting for a task from
  • * instead interrupting a task being run. We implement a simple
  • * non-reentrant mutual exclusion lock rather than use ReentrantLock
  • * because we do not want worker tasks to be able to reacquire the
  • * lock when they invoke pool control methods like setCorePoolSize.
  • * <p>
  • * The Worker class; it is a subclass of AQS (AbstractQueuedSynchronizer)
  • */
  • private final class Worker
  • extends AbstractQueuedSynchronizer
  • implements Runnable {
  • /**
  • * This class will never be serialized, but we provide a
  • * serialVersionUID to suppress a javac warning.
  • * <p>
  • * This class is never serialized; serialVersionUID exists only to suppress the warning
  • */
  • private static final long serialVersionUID = 6138294804551838833L;
  • /**
  • * Thread this worker is running in. Null if factory fails.
  • * The thread this Worker runs in
  • */
  • final Thread thread;
  • /**
  • * Initial task to run. Possibly null.
  • * The initial task to run, a Runnable; may be null
  • */
  • Runnable firstTask;
  • /**
  • * Per-thread task counter
  • * Number of tasks this Worker has completed
  • */
  • volatile long completedTasks;
  • /**
  • * Creates with given first task and thread from ThreadFactory.
  • * Creates a Worker from the given firstTask and a thread obtained from the ThreadFactory
  • *
  • * @param firstTask the first task (null if none)
  • */
  • Worker(Runnable firstTask) {
  • this.firstTask = firstTask;
  • this.thread = getThreadFactory().newThread(this);
  • }
  • /**
  • * Delegates main run loop to outer runWorker
  • * Entry point of the Worker's thread
  • */
  • public void run() {
  • // Hand the main run loop to runWorker()
  • runWorker(this);
  • }
  • // Lock methods
  • //
  • // The value 0 represents the unlocked state.
  • // The value 1 represents the locked state.
  • // Whether the exclusive lock is held
  • protected boolean isHeldExclusively() {
  • return getState() == 1;
  • }
  • // Try to acquire the exclusive lock
  • protected boolean tryAcquire(int unused) {
  • if (compareAndSetState(0, 1)) {
  • setExclusiveOwnerThread(Thread.currentThread());
  • return true;
  • }
  • return false;
  • }
  • // Try to release the exclusive lock
  • protected boolean tryRelease(int unused) {
  • setExclusiveOwnerThread(null);
  • setState(0);
  • return true;
  • }
  • // Acquire the lock, blocking if necessary
  • public void lock() {
  • acquire(1);
  • }
  • // Try to acquire the lock without blocking
  • public boolean tryLock() {
  • return tryAcquire(1);
  • }
  • // Release the lock
  • public void unlock() {
  • release(1);
  • }
  • // Whether the lock is currently held
  • public boolean isLocked() {
  • return isHeldExclusively();
  • }
  • }
  • /*
  • * Methods for setting control state
  • */
  • /**
  • * Transitions runState to given target, or leaves it alone if
  • * already at least the given target.
  • * <p>
  • * Advances runState to the given targetState
  • *
  • * @param targetState the desired state, either SHUTDOWN or STOP
  • * (but not TIDYING or TERMINATED -- use tryTerminate for that)
  • */
  • private void advanceRunState(int targetState) {
  • for (; ; ) {
  • // Read the current ctl value
  • int c = ctl.get();
  • // If runState is already at least targetState, nothing needs to change
  • if (runStateAtLeast(c, targetState) ||
  • // Otherwise CAS runState in ctl to targetState; on failure, loop and retry
  • ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  • break;
  • }
  • }
  • /**
  • * Transitions to TERMINATED state if either (SHUTDOWN and pool
  • * and queue empty) or (STOP and pool empty). If otherwise
  • * eligible to terminate but workerCount is nonzero, interrupts an
  • * idle worker to ensure that shutdown signals propagate. This
  • * method must be called following any action that might make
  • * termination possible -- reducing worker count or removing tasks
  • * from the queue during shutdown. The method is non-private to
  • * allow access from ScheduledThreadPoolExecutor.
  • * <p>
  • * Tries to move runState to TERMINATED when the pool is SHUTDOWN with no workers and an empty queue, or STOP with no workers
  • */
  • final void tryTerminate() {
  • for (; ; ) {
  • // Read ctl
  • int c = ctl.get();
  • /**
  • * Return immediately if termination is not yet possible:
  • * 1. the pool is RUNNING;
  • * 2. the pool is already TIDYING or TERMINATED;
  • * 3. the pool is SHUTDOWN but workQueue is not empty
  • */
  • if (isRunning(c) ||
  • runStateAtLeast(c, TIDYING) ||
  • (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
  • return;
  • // If workers remain
  • if (workerCountOf(c) != 0) { // Eligible to terminate
  • /**
  • * Try to interrupt an idle worker thread;
  • * ONLY_ONE means only the first worker in the set is tried
  • */
  • interruptIdleWorkers(ONLY_ONE);
  • return;
  • }
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // First CAS ctl to runState TIDYING with workerCount 0
  • if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  • try {
  • // On success, invoke the terminated() hook
  • terminated();
  • } finally {
  • // Finally set runState to TERMINATED with workerCount 0
  • ctl.set(ctlOf(TERMINATED, 0));
  • // Wake every thread waiting on termination
  • termination.signalAll();
  • }
  • return;
  • }
  • } finally {
  • // Release the lock
  • mainLock.unlock();
  • }
  • // else retry on failed CAS
  • }
  • }
  • /*
  • * Methods for controlling interrupts to worker threads.
  • */
  • /**
  • * If there is a security manager, makes sure caller has
  • * permission to shut down threads in general (see shutdownPerm).
  • * If this passes, additionally makes sure the caller is allowed
  • * to interrupt each worker thread. This might not be true even if
  • * first check passed, if the SecurityManager treats some threads
  • * specially.
  • * <p>
  • * If a security manager is installed, checks that the caller is permitted to shut down threads
  • */
  • private void checkShutdownAccess() {
  • // The security manager, if any
  • SecurityManager security = System.getSecurityManager();
  • if (security != null) {
  • security.checkPermission(shutdownPerm);
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Check that the caller may access each worker's thread
  • for (Worker w : workers)
  • security.checkAccess(w.thread);
  • } finally {
  • // Release the lock
  • mainLock.unlock();
  • }
  • }
  • }
  • /**
  • * Interrupts all threads, even if active. Ignores SecurityExceptions
  • * (in which case some threads may remain uninterrupted).
  • * <p>
  • * Interrupts the thread of every worker
  • */
  • private void interruptWorkers() {
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Iterate over the workers
  • for (Worker w : workers) {
  • try {
  • // Interrupt the worker's thread
  • w.thread.interrupt();
  • } catch (SecurityException ignore) {
  • }
  • }
  • } finally {
  • // Release the lock
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Interrupts threads that might be waiting for tasks (as
  • * indicated by not being locked) so they can check for
  • * termination or configuration changes. Ignores
  • * SecurityExceptions (in which case some threads may remain
  • * uninterrupted).
  • * <p>
  • * Interrupts the threads of idle workers
  • *
  • * @param onlyOne If true, interrupt at most one worker. This is
  • * called only from tryTerminate when termination is otherwise
  • * enabled but there are still other workers. In this case, at
  • * most one waiting worker is interrupted to propagate shutdown
  • * signals in case all threads are currently waiting.
  • * Interrupting any arbitrary thread ensures that newly arriving
  • * workers since shutdown began will also eventually exit.
  • * To guarantee eventual termination, it suffices to always
  • * interrupt only one idle worker, but shutdown() interrupts all
  • * idle workers so that redundant workers exit promptly, not
  • * waiting for a straggler task to finish.
  • */
  • private void interruptIdleWorkers(boolean onlyOne) {
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Iterate over the workers
  • for (Worker w : workers) {
  • // The worker's thread
  • Thread t = w.thread;
  • // If t is not already interrupted, try to take the worker's lock; success means the worker is idle
  • if (!t.isInterrupted() && w.tryLock()) {
  • try {
  • // Interrupt t
  • t.interrupt();
  • } catch (SecurityException ignore) {
  • } finally {
  • // Release the worker's lock
  • w.unlock();
  • }
  • }
  • /**
  • * If onlyOne is set, stop after the first worker,
  • * so only the first worker in the set is ever tried
  • */
  • if (onlyOne)
  • break;
  • }
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Common form of interruptIdleWorkers, to avoid having to
  • * remember what the boolean argument means.
  • * Overload that passes onlyOne = false,
  • * so an interrupt is attempted on every idle worker's thread
  • */
  • private void interruptIdleWorkers() {
  • interruptIdleWorkers(false);
  • }
  • private static final boolean ONLY_ONE = true;
  • /**
  • * Ensures that unless the pool is stopping, the current thread
  • * does not have its interrupt set. This requires a double-check
  • * of state in case the interrupt was cleared concurrently with a
  • * shutdownNow -- if so, the interrupt is re-enabled.
  • */
  • private void clearInterruptsForTaskRun() {
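  • /**
  • * If runState is RUNNING or SHUTDOWN (below STOP),
  • * test whether the current thread is interrupted (the test also clears the interrupt flag).
  • * If it was interrupted, re-check whether runState has since reached STOP or later
  • * (STOP, TIDYING or TERMINATED); if so, re-interrupt the current thread
  • */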
  • if (runStateLessThan(ctl.get(), STOP) &&
  • Thread.interrupted() &&
  • runStateAtLeast(ctl.get(), STOP))
  • Thread.currentThread().interrupt();
  • }
  • /*
  • * Misc utilities, most of which are also exported to
  • * ScheduledThreadPoolExecutor
  • */
  • /**
  • * Invokes the rejected execution handler for the given command.
  • * Package-protected for use by ScheduledThreadPoolExecutor.
  • * <p>
  • * Rejects the task
  • */
  • final void reject(Runnable command) {
  • // Delegate to the rejected execution handler
  • handler.rejectedExecution(command, this);
  • }
  • /**
  • * Performs any further cleanup following run state transition on
  • * invocation of shutdown. A no-op here, but used by
  • * ScheduledThreadPoolExecutor to cancel delayed tasks.
  • */
  • void onShutdown() {
  • }
  • /**
  • * State check needed by ScheduledThreadPoolExecutor to
  • * enable running tasks during shutdown.
  • * <p>
  • * Returns whether runState is RUNNING, or SHUTDOWN when shutdownOK is set
  • * <p>
  • * shutdownOK says whether true should be returned when the state is SHUTDOWN
  • *
  • * @param shutdownOK true if should return true if SHUTDOWN
  • */
  • final boolean isRunningOrShutdown(boolean shutdownOK) {
  • int rs = runStateOf(ctl.get());
  • return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
  • }
  • /**
  • * Drains the task queue into a new list, normally using
  • * drainTo. But if the queue is a DelayQueue or any other kind of
  • * queue for which poll or drainTo may fail to remove some
  • * elements, it deletes them one by one.
  • * <p>
  • * Moves the tasks in the work queue into a new list
  • */
  • private List<Runnable> drainQueue() {
  • // The work queue
  • BlockingQueue<Runnable> q = workQueue;
  • // The list that will receive the drained tasks
  • List<Runnable> taskList = new ArrayList<Runnable>();
  • // Transfer with BlockingQueue.drainTo()
  • q.drainTo(taskList);
  • // drainTo may fail to remove some elements (for example from a DelayQueue), so handle leftovers
  • if (!q.isEmpty()) {
  • // Copy what remains in q to a new array and iterate over it
  • for (Runnable r : q.toArray(new Runnable[0])) {
  • // Remove the element from q
  • if (q.remove(r))
  • // and, if it was removed, add it to taskList
  • taskList.add(r);
  • }
  • }
  • // Return the collected tasks
  • return taskList;
  • }
  • /*
  • * Methods for creating, running and cleaning up after workers
  • */
  • /**
  • * Checks if a new worker can be added with respect to current
  • * pool state and the given bound (either core or maximum). If so,
  • * the worker count is adjusted accordingly, and, if possible, a
  • * new worker is created and started running firstTask as its
  • * first task. This method returns false if the pool is stopped or
  • * eligible to shut down. It also returns false if the thread
  • * factory fails to create a thread when asked, which requires a
  • * backout of workerCount, and a recheck for termination, in case
  • * the existence of this worker was holding up termination.
  • * <p>
  • * Adds a Worker
  • *
  • * @param firstTask the task the new thread should run first (or
  • * null if none). Workers are created with an initial first task
  • * (in method execute()) to bypass queuing when there are fewer
  • * than corePoolSize threads (in which case we always start one),
  • * or when the queue is full (in which case we must bypass queue).
  • * Initially idle threads are usually created via
  • * prestartCoreThread or to replace other dying workers.
  • * @param core if true use corePoolSize as bound, else
  • * maximumPoolSize. (A boolean indicator is used here rather than a
  • * value to ensure reads of fresh values after checking other pool
  • * state). When core is true the bound is corePoolSize; otherwise it is maximumPoolSize
  • * @return true if successful
  • */
  • private boolean addWorker(Runnable firstTask, boolean core) {
  • retry:
  • for (; ; ) {
  • // Read ctl and derive runState
  • int c = ctl.get();
  • int rs = runStateOf(c);
  • // Check if queue empty only if necessary.
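  • /**
  • * If runState is at least SHUTDOWN (SHUTDOWN, STOP, TIDYING or TERMINATED), return false
  • * unless the single exception in case 4 applies; that is, the method returns false when
  • * the pool is stopped or eligible to shut down. There are five cases:
  • * 1. runState is STOP, TIDYING or TERMINATED: return false.
  • * In these states the pool is in final cleanup; it neither accepts new tasks nor runs queued ones.
  • * 2. runState is SHUTDOWN and firstTask is not null: return false.
  • * SHUTDOWN accepts no new tasks but still runs queued ones, so a non-null task is refused.
  • * 3. runState is SHUTDOWN, firstTask is null, and workQueue is empty: return false.
  • * The pool is eligible to terminate, so no new worker is needed.
  • * 4. runState is SHUTDOWN, firstTask is null, and workQueue is not empty: the check passes.
  • * Queued tasks may still be run in SHUTDOWN.
  • * 5. runState is RUNNING: the check passes.
  • */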
  • if (rs >= SHUTDOWN &&
  • !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
  • return false;
  • // The state check passed; loop until workerCount is incremented
  • for (; ; ) {
  • // Read workerCount
  • int wc = workerCountOf(c);
  • /**
  • * The core parameter says whether a core thread is being added.
  • * If workerCount >= CAPACITY the pool is full: return false.
  • * When adding a core thread, workerCount >= corePoolSize also returns false;
  • * when adding a non-core thread, workerCount >= maximumPoolSize returns false.
  • */
  • if (wc >= CAPACITY ||
  • wc >= (core ? corePoolSize : maximumPoolSize))
  • return false;
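  • // If workerCount was incremented successfully, leave the outer loop
  • if (compareAndIncrementWorkerCount(c))
  • break retry;
  • /**
  • * The CAS on workerCount failed.
  • * Re-read ctl and compare its runState with the saved value rs; if it changed,
  • * restart the outer loop so the state checks run again
  • */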
  • c = ctl.get(); // Re-read ctl
  • if (runStateOf(c) != rs)
  • continue retry;
  • // else CAS failed due to workerCount change; retry inner loop
  • }
  • }
  • // Create a new Worker around firstTask
  • Worker w = new Worker(firstTask);
  • // The Worker's thread
  • Thread t = w.thread;
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Recheck while holding lock.
  • // Back out on ThreadFactory failure or if
  • // shut down before lock acquired.
  • // Read ctl and runState again
  • int c = ctl.get();
  • int rs = runStateOf(c);
  • /**
  • * The Worker cannot be added if
  • * the thread is null (the thread factory failed to create it), or
  • * the pool was shut down in the meantime: runState is STOP, TIDYING or TERMINATED,
  • * or runState is SHUTDOWN and firstTask is not null
  • */
  • if (t == null || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null))) {
  • // Undo the earlier workerCount increment
  • decrementWorkerCount();
  • // Re-check for termination, then return false because no Worker was added
  • tryTerminate();
  • return false;
  • }
  • // Checks passed: register the new worker
  • workers.add(w);
  • // Update largestPoolSize
  • int s = workers.size();
  • if (s > largestPoolSize)
  • largestPoolSize = s;
  • } finally {
  • // Release the lock
  • mainLock.unlock();
  • }
  • // Start the worker thread
  • t.start();
  • // It is possible (but unlikely) for a thread to have been
  • // added to workers, but not yet started, during transition to
  • // STOP, which could result in a rare missed interrupt,
  • // because Thread.interrupt is not guaranteed to have any effect
  • // on a non-yet-started Thread (see Thread#interrupt).
  • // If runState is STOP and the worker thread has not been interrupted, interrupt it
  • if (runStateOf(ctl.get()) == STOP && !t.isInterrupted())
  • t.interrupt();
  • return true;
  • }
  • /**
  • * Performs cleanup and bookkeeping for a dying worker. Called
  • * only from worker threads. Unless completedAbruptly is set,
  • * assumes that workerCount has already been adjusted to account
  • * for exit. This method removes thread from worker set, and
  • * possibly terminates the pool or replaces the worker if either
  • * it exited due to user task exception or if fewer than
  • * corePoolSize workers are running or queue is non-empty but
  • * there are no workers.
  • *
  • * @param w the worker
  • * @param completedAbruptly if the worker died due to user exception
  • * (marks whether the Worker died because of an exception thrown by user code)
  • */
  • private void processWorkerExit(Worker w, boolean completedAbruptly) {
  • // If completedAbruptly is true, workerCount has not been adjusted yet, so decrement it here
  • if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  • decrementWorkerCount();
  • // Acquire the lock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Add this worker's count to the pool-wide completed task count
  • completedTaskCount += w.completedTasks;
  • // Remove w from the workers set
  • workers.remove(w);
  • } finally {
  • // Release the lock
  • mainLock.unlock();
  • }
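  • // Try to terminate the pool
  • tryTerminate();
  • // Read ctl
  • int c = ctl.get();
  • // If runState is RUNNING or SHUTDOWN, i.e. the pool has not stopped
  • if (runStateLessThan(c, STOP)) {
  • // If the worker exited without a task throwing
  • if (!completedAbruptly) {
  • /**
  • * Compute the minimum number of workers to keep:
  • * 1. if core threads may not time out, min is corePoolSize;
  • * 2. if core threads may time out, min is 0, unless workQueue is not empty,
  • * in which case min is 1 so that at least one worker remains to drain the queue
  • */
  • int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  • if (min == 0 && !workQueue.isEmpty())
  • min = 1;
  • // If workerCount is already at least min, return without adding a replacement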
  • if (workerCountOf(c) >= min)
  • return; // replacement not needed
  • }
  • /**
  • * Add a replacement Worker. Execution reaches this point in two cases:
  • * 1. a task threw, so the Worker terminated abruptly and must be replaced;
  • * 2. workerCount is below the computed minimum, so a new Worker is needed.
  • */
  • addWorker(null, false);
  • }
  • }
  • /**
  • * Performs blocking or timed wait for a task, depending on
  • * current configuration settings, or returns null if this worker
  • * must exit because of any of:
  • * 1. There are more than maximumPoolSize workers (due to
  • * a call to setMaximumPoolSize).
  • * 2. The pool is stopped.
  • * 3. The pool is shutdown and the queue is empty.
  • * 4. This worker timed out waiting for a task, and timed-out
  • * workers are subject to termination (that is,
  • * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
  • * both before and after the timed wait.
  • *
  • * @return task, or null if the worker must exit, in which case
  • * workerCount is decremented
  • */
  • private Runnable getTask() {
  • // Records whether the last poll() timed out
  • boolean timedOut = false; // Did the last poll() time out?
  • retry:
  • // Loop until a task is obtained or the worker must exit
  • for (; ; ) {
  • // Read ctl and runState
  • int c = ctl.get();
  • int rs = runStateOf(c);
  • // Check the pool state
  • // Check if queue empty only if necessary.
  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  • /**
  • * If runState is STOP, TIDYING or TERMINATED,
  • * or runState is SHUTDOWN and workQueue is empty,
  • * decrement workerCount and return null so the worker exits
  • */
  • decrementWorkerCount();
  • return null;
  • }
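  • // Whether this worker is subject to idle timeout
  • boolean timed; // Are workers subject to culling?
  • // Loop until the worker may wait for a task or its exit is recorded
  • for (; ; ) {
  • // Read workerCount
  • int wc = workerCountOf(c);
  • /**
  • * Decide whether this Worker may be culled, i.e. whether it may not wait for a task indefinitely.
  • * If allowCoreThreadTimeOut is true, or workerCount exceeds corePoolSize,
  • * the wait is bounded by keepAliveTime
  • */
  • timed = allowCoreThreadTimeOut || wc > corePoolSize;
  • /**
  • * If workerCount is at most maximumPoolSize, and it is not the case that this worker
  • * both timed out and is subject to timeout, leave the inner loop and wait for a task
  • */
  • if (wc <= maximumPoolSize && !(timedOut && timed))
  • break;
  • // Otherwise the worker should exit (too many workers, or it timed out): decrement workerCount and return null on success
  • if (compareAndDecrementWorkerCount(c))
  • return null;
  • // The CAS failed; re-read ctl
  • c = ctl.get(); // Re-read ctl
  • // If runState changed, the pool state has moved on: restart the outer loop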
  • if (runStateOf(c) != rs)
  • continue retry;
  • // else CAS failed due to workerCount change; retry inner loop
  • }
  • try {
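  • /**
  • * Choose the call by whether this worker is subject to timeout:
  • * poll() waits at most keepAliveTime; take() has no timeout and blocks until a task arrives
  • */
  • Runnable r = timed ?
  • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  • workQueue.take();
  • // Return the task if one was obtained
  • if (r != null)
  • return r;
  • // Otherwise poll() timed out
  • timedOut = true;
  • } catch (InterruptedException retry) {
  • // Interrupted while waiting (for example by interruptIdleWorkers); this is not a timeout, so loop and re-check the state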
  • timedOut = false;
  • }
  • }
  • }
  • /**
  • * Main worker run loop. Repeatedly gets tasks from queue and
  • * executes them, while coping with a number of issues:
  • * <p>
  • * 1. We may start out with an initial task, in which case we
  • * don't need to get the first one. Otherwise, as long as pool is
  • * running, we get tasks from getTask. If it returns null then the
  • * worker exits due to changed pool state or configuration
  • * parameters. Other exits result from exception throws in
  • * external code, in which case completedAbruptly holds, which
  • * usually leads processWorkerExit to replace this thread.
  • * <p>
  • * 2. Before running any task, the lock is acquired to prevent
  • * other pool interrupts while the task is executing, and
  • * clearInterruptsForTaskRun called to ensure that unless pool is
  • * stopping, this thread does not have its interrupt set.
  • * <p>
  • * 3. Each task run is preceded by a call to beforeExecute, which
  • * might throw an exception, in which case we cause thread to die
  • * (breaking loop with completedAbruptly true) without processing
  • * the task.
  • * <p>
  • * 4. Assuming beforeExecute completes normally, we run the task,
  • * gathering any of its thrown exceptions to send to
  • * afterExecute. We separately handle RuntimeException, Error
  • * (both of which the specs guarantee that we trap) and arbitrary
  • * Throwables. Because we cannot rethrow Throwables within
  • * Runnable.run, we wrap them within Errors on the way out (to the
  • * thread's UncaughtExceptionHandler). Any thrown exception also
  • * conservatively causes thread to die.
  • * <p>
  • * 5. After task.run completes, we call afterExecute, which may
  • * also throw an exception, which will also cause thread to
  • * die. According to JLS Sec 14.20, this exception is the one that
  • * will be in effect even if task.run throws.
  • * <p>
  • * The net effect of the exception mechanics is that afterExecute
  • * and the thread's UncaughtExceptionHandler have as accurate
  • * information as we can provide about any problems encountered by
  • * user code.
  • *
  • * @param w the worker
  • */
  • final void runWorker(Worker w) {
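  • // Take the worker's firstTask and clear the field
  • Runnable task = w.firstTask;
  • w.firstTask = null;
  • // Marks whether the worker dies because of an exception; stays true unless the loop exits normally
  • boolean completedAbruptly = true;
  • try {
  • // Run the first task, then keep fetching tasks until getTask() returns null
  • while (task != null || (task = getTask()) != null) {
  • // Lock the worker so it is not treated as idle while running a task
  • w.lock();
  • clearInterruptsForTaskRun();
  • try {
  • // Invoke the beforeExecute() hook
  • beforeExecute(w.thread, task);
  • // Holds any exception thrown while running the task
  • Throwable thrown = null;
  • try {
  • // Run the task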
  • task.run();
  • } catch (RuntimeException x) {
  • thrown = x;
  • throw x;
  • } catch (Error x) {
  • thrown = x;
  • throw x;
  • } catch (Throwable x) {
  • thrown = x;
  • throw new Error(x);
  • } finally {
  • // Call the afterExecute() hook
  • afterExecute(task, thrown);
  • }
  • } finally {
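  • // Clear the finished task
  • task = null;
  • // Increment the worker's completed-task count
  • w.completedTasks++;
  • // Unlock
  • w.unlock();
  • }
  • }
  • // Reaching here means the loop ended normally (getTask() returned null), not because of an exception
  • completedAbruptly = false;
  • } finally {
  • // Let processWorkerExit() handle the Worker's cleanup and exit
  • processWorkerExit(w, completedAbruptly);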
  • }
  • }
  • // Public constructors and methods
  • /**
  • * Creates a new {@code ThreadPoolExecutor} with the given initial
  • * parameters and default thread factory and rejected execution handler.
  • * It may be more convenient to use one of the {@link Executors} factory
  • * methods instead of this general purpose constructor.
  • *
  • * @param corePoolSize the number of threads to keep in the pool, even
  • * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • * @param maximumPoolSize the maximum number of threads to allow in the
  • * pool
  • * @param keepAliveTime when the number of threads is greater than
  • * the core, this is the maximum time that excess idle threads
  • * will wait for new tasks before terminating.
  • * @param unit the time unit for the {@code keepAliveTime} argument
  • * @param workQueue the queue to use for holding tasks before they are
  • * executed. This queue will hold only the {@code Runnable}
  • * tasks submitted by the {@code execute} method.
  • * @throws IllegalArgumentException if one of the following holds:<br>
  • * {@code corePoolSize < 0}<br>
  • * {@code keepAliveTime < 0}<br>
  • * {@code maximumPoolSize <= 0}<br>
  • * {@code maximumPoolSize < corePoolSize}
  • * @throws NullPointerException if {@code workQueue} is null
  • */
  • public ThreadPoolExecutor(int corePoolSize,
  • int maximumPoolSize,
  • long keepAliveTime,
  • TimeUnit unit,
  • BlockingQueue<Runnable> workQueue) {
  • this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  • Executors.defaultThreadFactory(), defaultHandler);
  • }
  • /**
  • * Creates a new {@code ThreadPoolExecutor} with the given initial
  • * parameters and default rejected execution handler.
  • *
  • * @param corePoolSize the number of threads to keep in the pool, even
  • * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • * @param maximumPoolSize the maximum number of threads to allow in the
  • * pool
  • * @param keepAliveTime when the number of threads is greater than
  • * the core, this is the maximum time that excess idle threads
  • * will wait for new tasks before terminating.
  • * @param unit the time unit for the {@code keepAliveTime} argument
  • * @param workQueue the queue to use for holding tasks before they are
  • * executed. This queue will hold only the {@code Runnable}
  • * tasks submitted by the {@code execute} method.
  • * @param threadFactory the factory to use when the executor
  • * creates a new thread
  • * @throws IllegalArgumentException if one of the following holds:<br>
  • * {@code corePoolSize < 0}<br>
  • * {@code keepAliveTime < 0}<br>
  • * {@code maximumPoolSize <= 0}<br>
  • * {@code maximumPoolSize < corePoolSize}
  • * @throws NullPointerException if {@code workQueue}
  • * or {@code threadFactory} is null
  • */
  • public ThreadPoolExecutor(int corePoolSize,
  • int maximumPoolSize,
  • long keepAliveTime,
  • TimeUnit unit,
  • BlockingQueue<Runnable> workQueue,
  • ThreadFactory threadFactory) {
  • this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  • threadFactory, defaultHandler);
  • }
  • /**
  • * Creates a new {@code ThreadPoolExecutor} with the given initial
  • * parameters and default thread factory.
  • *
  • * @param corePoolSize the number of threads to keep in the pool, even
  • * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • * @param maximumPoolSize the maximum number of threads to allow in the
  • * pool
  • * @param keepAliveTime when the number of threads is greater than
  • * the core, this is the maximum time that excess idle threads
  • * will wait for new tasks before terminating.
  • * @param unit the time unit for the {@code keepAliveTime} argument
  • * @param workQueue the queue to use for holding tasks before they are
  • * executed. This queue will hold only the {@code Runnable}
  • * tasks submitted by the {@code execute} method.
  • * @param handler the handler to use when execution is blocked
  • * because the thread bounds and queue capacities are reached
  • * @throws IllegalArgumentException if one of the following holds:<br>
  • * {@code corePoolSize < 0}<br>
  • * {@code keepAliveTime < 0}<br>
  • * {@code maximumPoolSize <= 0}<br>
  • * {@code maximumPoolSize < corePoolSize}
  • * @throws NullPointerException if {@code workQueue}
  • * or {@code handler} is null
  • */
  • public ThreadPoolExecutor(int corePoolSize,
  • int maximumPoolSize,
  • long keepAliveTime,
  • TimeUnit unit,
  • BlockingQueue<Runnable> workQueue,
  • RejectedExecutionHandler handler) {
  • this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  • Executors.defaultThreadFactory(), handler);
  • }
  • /**
  • * Creates a new {@code ThreadPoolExecutor} with the given initial
  • * parameters.
  • * <p>
  • * Constructs a ThreadPoolExecutor with every parameter supplied explicitly; the other constructors delegate here.
  • *
  • * @param corePoolSize the number of threads to keep in the pool, even
  • * if they are idle, unless {@code allowCoreThreadTimeOut} is set
  • * @param maximumPoolSize the maximum number of threads to allow in the
  • * pool
  • * @param keepAliveTime when the number of threads is greater than
  • * the core, this is the maximum time that excess idle threads
  • * will wait for new tasks before terminating.
  • * @param unit the time unit for the {@code keepAliveTime} argument
  • * @param workQueue the queue to use for holding tasks before they are
  • * executed. This queue will hold only the {@code Runnable}
  • * tasks submitted by the {@code execute} method.
  • * @param threadFactory the factory to use when the executor
  • * creates a new thread
  • * @param handler the handler to use when execution is blocked
  • * because the thread bounds and queue capacities are reached
  • * @throws IllegalArgumentException if one of the following holds:<br>
  • * {@code corePoolSize < 0}<br>
  • * {@code keepAliveTime < 0}<br>
  • * {@code maximumPoolSize <= 0}<br>
  • * {@code maximumPoolSize < corePoolSize}
  • * @throws NullPointerException if {@code workQueue}
  • * or {@code threadFactory} or {@code handler} is null
  • */
  • public ThreadPoolExecutor(int corePoolSize,
  • int maximumPoolSize,
  • long keepAliveTime,
  • TimeUnit unit,
  • BlockingQueue<Runnable> workQueue,
  • ThreadFactory threadFactory,
  • RejectedExecutionHandler handler) {
  • /**
  • * Validate the arguments:
  • * maximumPoolSize must be greater than 0,
  • * corePoolSize must not be negative and must not exceed maximumPoolSize,
  • * keepAliveTime must not be negative
  • */
  • if (corePoolSize < 0 ||
  • maximumPoolSize <= 0 ||
  • maximumPoolSize < corePoolSize ||
  • keepAliveTime < 0)
  • throw new IllegalArgumentException();
  • // workQueue, threadFactory and handler must not be null
  • if (workQueue == null || threadFactory == null || handler == null)
  • throw new NullPointerException();
  • // Store the parameters; keepAliveTime is converted to nanoseconds
  • this.corePoolSize = corePoolSize;
  • this.maximumPoolSize = maximumPoolSize;
  • this.workQueue = workQueue;
  • this.keepAliveTime = unit.toNanos(keepAliveTime);
  • this.threadFactory = threadFactory;
  • this.handler = handler;
  • }
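  • /*
  • * Usage sketch (annotation, not part of the JDK source). The pool sizes,
  • * the keep-alive time and the queue capacity are arbitrary assumptions
  • * chosen only to show which argument goes where:
  • *
  • *   ThreadPoolExecutor pool = new ThreadPoolExecutor(
  • *           2, 4,                                  // corePoolSize, maximumPoolSize
  • *           60L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
  • *           new ArrayBlockingQueue<Runnable>(16),  // bounded workQueue
  • *           Executors.defaultThreadFactory(),
  • *           new ThreadPoolExecutor.AbortPolicy());
  • *
  • * Passing maximumPoolSize < corePoolSize, or a negative keepAliveTime,
  • * would fail the argument check above with IllegalArgumentException.
  • */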
  • /**
  • * Executes the given task sometime in the future. The task
  • * may execute in a new thread or in an existing pooled thread.
  • * <p>
  • * If the task cannot be submitted for execution, either because this
  • * executor has been shutdown or because its capacity has been reached,
  • * the task is handled by the current {@code RejectedExecutionHandler}.
  • *
  • * @param command the task to execute
  • * @throws RejectedExecutionException at discretion of
  • * {@code RejectedExecutionHandler}, if the task
  • * cannot be accepted for execution
  • * @throws NullPointerException if {@code command} is null
  • */
  • public void execute(Runnable command) {
  • // Argument check
  • if (command == null)
  • throw new NullPointerException();
  • /*
  • * Proceed in 3 steps:
  • *
  • * 1. If fewer than corePoolSize threads are running, try to
  • * start a new thread with the given command as its first
  • * task. The call to addWorker atomically checks runState and
  • * workerCount, and so prevents false alarms that would add
  • * threads when it shouldn't, by returning false.
  • *
  • * 2. If a task can be successfully queued, then we still need
  • * to double-check whether we should have added a thread
  • * (because existing ones died since last checking) or that
  • * the pool shut down since entry into this method. So we
  • * recheck state and if necessary roll back the enqueuing if
  • * stopped, or start a new thread if there are none.
  • *
  • * 3. If we cannot queue task, then we try to add a new
  • * thread. If it fails, we know we are shut down or saturated
  • * and so reject the task.
  • */
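  • // Read the current ctl value
  • int c = ctl.get();
  • // Is workerCount below corePoolSize?
  • if (workerCountOf(c) < corePoolSize) {
  • /**
  • * If so, call addWorker() to start a new Worker thread for the task.
  • * core == true: this is a core thread, so the count is bounded by corePoolSize
  • */
  • if (addWorker(command, true))
  • // Worker added; nothing more to do
  • return;
  • // addWorker() failed (state or count changed meanwhile); re-read ctl
  • c = ctl.get();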
  • }
  • /**
  • * Reaching here means workerCount >= corePoolSize, or addWorker() failed.
  • * If runState is still RUNNING,
  • * try to put the command into the workQueue
  • */
  • if (isRunning(c) && workQueue.offer(command)) {
  • // After queuing successfully, re-read ctl and check again
  • int recheck = ctl.get();
  • // If the pool is no longer RUNNING, remove the command from the workQueue
  • if (!isRunning(recheck) && remove(command))
  • // and reject it; reject() delegates to the rejection handler
  • reject(command);
  • // Otherwise, if workerCount is 0, start a thread (firstTask null, core false) so the queued task gets run
  • else if (workerCountOf(recheck) == 0)
  • addWorker(null, false);
  • }
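  • /**
  • * Reaching here means the pool is not RUNNING or the workQueue is full,
  • * so try to start a non-core thread for the task
  • */
  • else if (!addWorker(command, false))
  • // If that fails too (shut down or saturated), reject the task
  • reject(command);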
  • }
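  • /*
  • * Walk-through sketch (annotation, not part of the JDK source). It assumes
  • * corePoolSize = 1, maximumPoolSize = 2, an ArrayBlockingQueue of capacity 1,
  • * and hypothetical tasks t1..t4 that block long enough that none of them
  • * finishes between the calls:
  • *
  • *   pool.execute(t1); // step 1: workerCount 0 < corePoolSize, addWorker(t1, true)
  • *   pool.execute(t2); // step 2: core threads are busy, workQueue.offer(t2) succeeds
  • *   pool.execute(t3); // step 3: queue is full, addWorker(t3, false) starts a non-core thread
  • *   pool.execute(t4); // step 3 fails: workerCount == maximumPoolSize, so reject(t4)
  • *
  • * With the default AbortPolicy the last call would throw RejectedExecutionException.
  • */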
  • /**
  • * Initiates an orderly shutdown in which previously submitted
  • * tasks are executed, but no new tasks will be accepted.
  • * Invocation has no additional effect if already shut down.
  • *
  • * <p>This method does not wait for previously submitted tasks to
  • * complete execution. Use {@link #awaitTermination awaitTermination}
  • * to do that.
  • * <p>
  • * Starts an orderly shutdown: all idle Workers are interrupted,
  • * while previously submitted tasks, running or queued, are still executed
  • *
  • * @throws SecurityException {@inheritDoc}
  • */
  • public void shutdown() {
  • // Acquire mainLock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Check that the caller has permission to shut down
  • checkShutdownAccess();
  • // Advance runState to SHUTDOWN
  • advanceRunState(SHUTDOWN);
  • // Interrupt idle Workers
  • interruptIdleWorkers();
  • // Call the onShutdown() hook
  • onShutdown(); // hook for ScheduledThreadPoolExecutor
  • } finally {
  • // Release mainLock
  • mainLock.unlock();
  • }
  • // Try to terminate the pool
  • tryTerminate();
  • }
  • /**
  • * Attempts to stop all actively executing tasks, halts the
  • * processing of waiting tasks, and returns a list of the tasks
  • * that were awaiting execution. These tasks are drained (removed)
  • * from the task queue upon return from this method.
  • *
  • * <p>This method does not wait for actively executing tasks to
  • * terminate. Use {@link #awaitTermination awaitTermination} to
  • * do that.
  • *
  • * <p>There are no guarantees beyond best-effort attempts to stop
  • * processing actively executing tasks. This implementation
  • * cancels tasks via {@link Thread#interrupt}, so any task that
  • * fails to respond to interrupts may never terminate.
  • * <p>
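  • * Starts an immediate shutdown: every Worker is interrupted.
  • * Running tasks are stopped on a best-effort basis; queued tasks are drained and returned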
  • *
  • * @throws SecurityException {@inheritDoc}
  • */
  • public List<Runnable> shutdownNow() {
  • List<Runnable> tasks;
  • // Acquire mainLock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Check that the caller has permission to shut down
  • checkShutdownAccess();
  • // Advance runState to STOP
  • advanceRunState(STOP);
  • // Interrupt all Workers
  • interruptWorkers();
  • // Drain the queue and keep the unexecuted tasks in tasks
  • tasks = drainQueue();
  • } finally {
  • // Release mainLock
  • mainLock.unlock();
  • }
  • // Try to terminate the pool
  • tryTerminate();
  • // Return the tasks that never started
  • return tasks;
  • }
  • // Returns true once the pool has left the RUNNING state
  • public boolean isShutdown() {
  • return !isRunning(ctl.get());
  • }
  • /**
  • * Returns true if this executor is in the process of terminating
  • * after {@link #shutdown} or {@link #shutdownNow} but has not
  • * completely terminated. This method may be useful for
  • * debugging. A return of {@code true} reported a sufficient
  • * period after shutdown may indicate that submitted tasks have
  • * ignored or suppressed interruption, causing this executor not
  • * to properly terminate.
  • * <p>
  • * Returns whether the pool is shutting down but has not yet reached TERMINATED
  • *
  • * @return true if terminating but not yet terminated
  • */
  • public boolean isTerminating() {
  • int c = ctl.get();
  • return !isRunning(c) && runStateLessThan(c, TERMINATED);
  • }
  • // Returns whether the pool has reached TERMINATED
  • public boolean isTerminated() {
  • return runStateAtLeast(ctl.get(), TERMINATED);
  • }
  • public boolean awaitTermination(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // Convert the timeout to nanoseconds
  • long nanos = unit.toNanos(timeout);
  • // Acquire mainLock
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • for (; ; ) {
  • // Check whether runState has reached TERMINATED
  • if (runStateAtLeast(ctl.get(), TERMINATED))
  • // It has; return true
  • return true;
  • // Otherwise return false if the timeout has elapsed
  • if (nanos <= 0)
  • return false;
  • // Wait on the termination condition; awaitNanos() returns the remaining time
  • nanos = termination.awaitNanos(nanos);
  • }
  • } finally {
  • // Release mainLock
  • mainLock.unlock();
  • }
  • }
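  • /*
  • * Shutdown idiom sketch (annotation, not part of the JDK source). It combines
  • * shutdown(), awaitTermination() and shutdownNow(); the 30-second timeout is
  • * an arbitrary assumption and "pool" is a hypothetical ThreadPoolExecutor:
  • *
  • *   pool.shutdown();                        // stop accepting new tasks
  • *   try {
  • *       if (!pool.awaitTermination(30, TimeUnit.SECONDS))
  • *           pool.shutdownNow();             // interrupt workers and drain the queue
  • *   } catch (InterruptedException ie) {
  • *       pool.shutdownNow();
  • *       Thread.currentThread().interrupt(); // preserve the interrupt status
  • *   }
  • *
  • * Tasks that ignore interruption may still keep the pool from terminating.
  • */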
  • /**
  • * Invokes {@code shutdown} when this executor is no longer
  • * referenced and it has no threads.
  • */
  • protected void finalize() {
  • shutdown();
  • }
  • /**
  • * Sets the thread factory used to create new threads.
  • * <p>
  • * Sets the thread factory
  • *
  • * @param threadFactory the new thread factory
  • * @throws NullPointerException if threadFactory is null
  • * @see #getThreadFactory
  • */
  • public void setThreadFactory(ThreadFactory threadFactory) {
  • if (threadFactory == null)
  • throw new NullPointerException();
  • this.threadFactory = threadFactory;
  • }
  • /**
  • * Returns the thread factory used to create new threads.
  • * <p>
  • * Returns the thread factory
  • *
  • * @return the current thread factory
  • * @see #setThreadFactory
  • */
  • public ThreadFactory getThreadFactory() {
  • return threadFactory;
  • }
  • /**
  • * Sets a new handler for unexecutable tasks.
  • * <p>
  • * Sets the rejection-policy handler
  • *
  • * @param handler the new handler
  • * @throws NullPointerException if handler is null
  • * @see #getRejectedExecutionHandler
  • */
  • public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
  • if (handler == null)
  • throw new NullPointerException();
  • this.handler = handler;
  • }
  • /**
  • * Returns the current handler for unexecutable tasks.
  • * <p>
  • * Returns the rejection-policy handler
  • *
  • * @return the current handler
  • * @see #setRejectedExecutionHandler
  • */
  • public RejectedExecutionHandler getRejectedExecutionHandler() {
  • return handler;
  • }
  • /**
  • * Sets the core number of threads. This overrides any value set
  • * in the constructor. If the new value is smaller than the
  • * current value, excess existing threads will be terminated when
  • * they next become idle. If larger, new threads will, if needed,
  • * be started to execute any queued tasks.
  • * <p>
  • * Sets the core pool size
  • *
  • * @param corePoolSize the new core size
  • * @throws IllegalArgumentException if {@code corePoolSize < 0}
  • * @see #getCorePoolSize
  • */
  • public void setCorePoolSize(int corePoolSize) {
  • // Argument check
  • if (corePoolSize < 0)
  • throw new IllegalArgumentException();
  • // Compute the change in size
  • int delta = corePoolSize - this.corePoolSize;
  • // Update corePoolSize
  • this.corePoolSize = corePoolSize;
  • // If the current workerCount exceeds the new corePoolSize, interrupt idle Workers
  • if (workerCountOf(ctl.get()) > corePoolSize)
  • interruptIdleWorkers();
  • else if (delta > 0) {
  • // We don't really know how many new threads are "needed".
  • // As a heuristic, prestart enough new workers (up to new
  • // core size) to handle the current number of tasks in
  • // queue, but stop if queue becomes empty while doing so.
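  • /**
  • * If the new corePoolSize is larger than the old one, take the smaller of
  • * the increase and the workQueue size, and start up to that many new Workers
  • */
  • int k = Math.min(delta, workQueue.size());
  • while (k-- > 0 && addWorker(null, true)) {
  • // Stop early if the workQueue becomes empty while adding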
  • if (workQueue.isEmpty())
  • break;
  • }
  • }
  • }
  • /**
  • * Returns the core number of threads.
  • * <p>
  • * Returns the core pool size
  • *
  • * @return the core number of threads
  • * @see #setCorePoolSize
  • */
  • public int getCorePoolSize() {
  • return corePoolSize;
  • }
  • /**
  • * Starts a core thread, causing it to idly wait for work. This
  • * overrides the default policy of starting core threads only when
  • * new tasks are executed. This method will return {@code false}
  • * if all core threads have already been started.
  • * <p>
  • * Prestarts one core thread, i.e. creates it before any task arrives
  • *
  • * @return {@code true} if a thread was started
  • */
  • public boolean prestartCoreThread() {
  • return workerCountOf(ctl.get()) < corePoolSize &&
  • addWorker(null, true);
  • }
  • /**
  • * Same as prestartCoreThread except arranges that at least one
  • * thread is started even if corePoolSize is 0.
  • */
  • void ensurePrestart() {
  • // Read workerCount
  • int wc = workerCountOf(ctl.get());
  • // If workerCount is below corePoolSize
  • if (wc < corePoolSize)
  • // start a core thread
  • addWorker(null, true);
  • else if (wc == 0)
  • // otherwise, if there is no thread at all, start a non-core thread
  • addWorker(null, false);
  • }
  • /**
  • * Starts all core threads, causing them to idly wait for work. This
  • * overrides the default policy of starting core threads only when
  • * new tasks are executed.
  • * <p>
  • * Prestarts all core threads:
  • * keeps starting core threads until workerCount reaches corePoolSize
  • *
  • * @return the number of threads started
  • */
  • public int prestartAllCoreThreads() {
  • int n = 0;
  • while (addWorker(null, true))
  • ++n;
  • // Return the number of core threads started
  • return n;
  • }
  • /**
  • * Returns true if this pool allows core threads to time out and
  • * terminate if no tasks arrive within the keepAlive time, being
  • * replaced if needed when new tasks arrive. When true, the same
  • * keep-alive policy applying to non-core threads applies also to
  • * core threads. When false (the default), core threads are never
  • * terminated due to lack of incoming tasks.
  • * <p>
  • * Returns whether core threads are allowed to time out
  • *
  • * @return {@code true} if core threads are allowed to time out,
  • * else {@code false}
  • * @since 1.6
  • */
  • public boolean allowsCoreThreadTimeOut() {
  • return allowCoreThreadTimeOut;
  • }
  • /**
  • * Sets the policy governing whether core threads may time out and
  • * terminate if no tasks arrive within the keep-alive time, being
  • * replaced if needed when new tasks arrive. When false, core
  • * threads are never terminated due to lack of incoming
  • * tasks. When true, the same keep-alive policy applying to
  • * non-core threads applies also to core threads. To avoid
  • * continual thread replacement, the keep-alive time must be
  • * greater than zero when setting {@code true}. This method
  • * should in general be called before the pool is actively used.
  • * <p>
  • * Sets whether core threads are allowed to time out
  • *
  • * @param value {@code true} if should time out, else {@code false}
  • * @throws IllegalArgumentException if value is {@code true}
  • * and the current keep-alive time is not greater than zero
  • * @since 1.6
  • */
  • public void allowCoreThreadTimeOut(boolean value) {
  • // Argument check
  • if (value && keepAliveTime <= 0)
  • throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  • if (value != allowCoreThreadTimeOut) {
  • allowCoreThreadTimeOut = value;
  • // If timeouts are now allowed, interrupt idle workers so they re-check under the new policy
  • if (value)
  • interruptIdleWorkers();
  • }
  • }
  • /**
  • * Sets the maximum allowed number of threads. This overrides any
  • * value set in the constructor. If the new value is smaller than
  • * the current value, excess existing threads will be
  • * terminated when they next become idle.
  • * <p>
  • * Sets the maximum pool size
  • *
  • * @param maximumPoolSize the new maximum
  • * @throws IllegalArgumentException if the new maximum is
  • * less than or equal to zero, or
  • * less than the {@linkplain #getCorePoolSize core pool size}
  • * @see #getMaximumPoolSize
  • */
  • public void setMaximumPoolSize(int maximumPoolSize) {
  • // Argument check
  • if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
  • throw new IllegalArgumentException();
  • // Update maximumPoolSize
  • this.maximumPoolSize = maximumPoolSize;
  • // If the current workerCount exceeds the new maximum, interrupt idle Workers
  • if (workerCountOf(ctl.get()) > maximumPoolSize)
  • interruptIdleWorkers();
  • }
  • /**
  • * Returns the maximum allowed number of threads.
  • * <p>
  • * Returns the maximum pool size
  • *
  • * @return the maximum allowed number of threads
  • * @see #setMaximumPoolSize
  • */
  • public int getMaximumPoolSize() {
  • return maximumPoolSize;
  • }
  • /**
  • * Sets the time limit for which threads may remain idle before
  • * being terminated. If there are more than the core number of
  • * threads currently in the pool, after waiting this amount of
  • * time without processing a task, excess threads will be
  • * terminated. This overrides any value set in the constructor.
  • * <p>
  • * Sets the maximum time a thread may stay idle (applies to core threads only when allowCoreThreadTimeOut is true)
  • *
  • * @param time the time to wait. A time value of zero will cause
  • * excess threads to terminate immediately after executing tasks.
  • * @param unit the time unit of the {@code time} argument
  • * @throws IllegalArgumentException if {@code time} less than zero or
  • * if {@code time} is zero and {@code allowsCoreThreadTimeOut}
  • * @see #getKeepAliveTime
  • */
  • public void setKeepAliveTime(long time, TimeUnit unit) {
  • // Argument checks
  • if (time < 0)
  • throw new IllegalArgumentException();
  • if (time == 0 && allowsCoreThreadTimeOut())
  • throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  • long keepAliveTime = unit.toNanos(time);
  • long delta = keepAliveTime - this.keepAliveTime;
  • this.keepAliveTime = keepAliveTime;
  • // If the new keep-alive time is shorter than the old one, interrupt idle workers so they pick it up
  • if (delta < 0)
  • interruptIdleWorkers();
  • }
  • /**
  • * Returns the thread keep-alive time, which is the amount of time
  • * that threads in excess of the core pool size may remain
  • * idle before being terminated.
  • * <p>
  • * Returns the maximum time a thread may stay idle
  • *
  • * @param unit the desired time unit of the result
  • * @return the time limit
  • * @see #setKeepAliveTime
  • */
  • public long getKeepAliveTime(TimeUnit unit) {
  • return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
  • }
  • /* User-level queue utilities */
  • /**
  • * Returns the task queue used by this executor. Access to the
  • * task queue is intended primarily for debugging and monitoring.
  • * This queue may be in active use. Retrieving the task queue
  • * does not prevent queued tasks from executing.
  • * <p>
  • * Returns the work queue
  • *
  • * @return the task queue
  • */
  • public BlockingQueue<Runnable> getQueue() {
  • return workQueue;
  • }
  • /**
  • * Removes this task from the executor's internal queue if it is
  • * present, thus causing it not to be run if it has not already
  • * started.
  • *
  • * <p> This method may be useful as one part of a cancellation
  • * scheme. It may fail to remove tasks that have been converted
  • * into other forms before being placed on the internal queue. For
  • * example, a task entered using {@code submit} might be
  • * converted into a form that maintains {@code Future} status.
  • * However, in such cases, method {@link #purge} may be used to
  • * remove those Futures that have been cancelled.
  • * <p>
  • * Removes a task that is still waiting in the queue
  • *
  • * @param task the task to remove
  • * @return true if the task was removed
  • */
  • public boolean remove(Runnable task) {
  • // Remove the task from the workQueue
  • boolean removed = workQueue.remove(task);
  • // Try to terminate the pool
  • tryTerminate(); // In case SHUTDOWN and now empty
  • // Report whether the task was removed
  • return removed;
  • }
  • /**
  • * Tries to remove from the work queue all {@link Future}
  • * tasks that have been cancelled. This method can be useful as a
  • * storage reclamation operation, that has no other impact on
  • * functionality. Cancelled tasks are never executed, but may
  • * accumulate in work queues until worker threads can actively
  • * remove them. Invoking this method instead tries to remove them now.
  • * However, this method may fail to remove tasks in
  • * the presence of interference by other threads.
  • * <p>
  • * Removes cancelled tasks from the work queue
  • */
  • public void purge() {
  • // Get the work queue
  • final BlockingQueue<Runnable> q = workQueue;
  • try {
  • // Get an iterator over the queue
  • Iterator<Runnable> it = q.iterator();
  • while (it.hasNext()) {
  • Runnable r = it.next();
  • // Remove the task if it is a cancelled Future
  • if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
  • it.remove();
  • }
  • } catch (ConcurrentModificationException fallThrough) {
  • // Take slow path if we encounter interference during traversal.
  • // Make copy for traversal and call remove for cancelled entries.
  • // The slow path is more likely to be O(N*N).
  • /**
  • * A ConcurrentModificationException here means the queue was modified during traversal.
  • * Fall back to a snapshot array of the workQueue and remove cancelled entries one by one
  • */
  • for (Object r : q.toArray())
  • if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
  • q.remove(r);
  • }
  • // Try to terminate the pool
  • tryTerminate(); // In case SHUTDOWN and now empty
  • }
  • /* Statistics */
  • /**
  • * Returns the current number of threads in the pool.
  • * <p>
  • * Returns the number of threads currently in the pool
  • *
  • * @return the number of threads
  • */
  • public int getPoolSize() {
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Remove rare and surprising possibility of
  • // isTerminated() && getPoolSize() > 0
  • // Return 0 if the pool is TIDYING or TERMINATED; otherwise return the size of workers
  • return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Returns the approximate number of threads that are actively
  • * executing tasks.
  • * <p>
  • * Returns the approximate number of threads that are running tasks (Workers holding their lock)
  • *
  • * @return the number of threads
  • */
  • public int getActiveCount() {
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • int n = 0;
  • // Count the workers that are locked, i.e. currently running a task
  • for (Worker w : workers)
  • if (w.isLocked())
  • ++n;
  • return n;
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Returns the largest number of threads that have ever
  • * simultaneously been in the pool.
  • * <p>
  • * Returns the largest pool size ever reached (the historical peak).
  • *
  • * @return the number of threads
  • */
  • public int getLargestPoolSize() {
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • return largestPoolSize;
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Returns the approximate total number of tasks that have ever been
  • * scheduled for execution. Because the states of tasks and
  • * threads may change dynamically during computation, the returned
  • * value is only an approximation.
  • * <p>
  • * Returns the total task count: completed, running and queued
  • *
  • * @return the number of tasks
  • */
  • public long getTaskCount() {
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
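  • // Start with the tasks completed by workers that have already exited
  • long n = completedTaskCount;
  • // Add each live worker's completed tasks, plus one if it is running a task
  • for (Worker w : workers) {
  • n += w.completedTasks;
  • if (w.isLocked())
  • ++n;
  • }
  • // Add the tasks still waiting in the queue and return
  • return n + workQueue.size();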
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Returns the approximate total number of tasks that have
  • * completed execution. Because the states of tasks and threads
  • * may change dynamically during computation, the returned value
  • * is only an approximation, but one that does not ever decrease
  • * across successive calls.
  • * <p>
  • * Returns the number of completed tasks
  • *
  • * @return the number of tasks
  • */
  • public long getCompletedTaskCount() {
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • // Start with the tasks completed by workers that have already exited
  • long n = completedTaskCount;
  • for (Worker w : workers)
  • // Add the number of tasks each live worker has completed
  • n += w.completedTasks;
  • return n;
  • } finally {
  • mainLock.unlock();
  • }
  • }
  • /**
  • * Returns a string identifying this pool, as well as its state,
  • * including indications of run state and estimated worker and
  • * task counts.
  • *
  • * @return a string identifying this pool, as well as its state
  • */
  • public String toString() {
  • long ncompleted;
  • int nworkers, nactive;
  • final ReentrantLock mainLock = this.mainLock;
  • mainLock.lock();
  • try {
  • ncompleted = completedTaskCount;
  • nactive = 0;
  • nworkers = workers.size();
  • for (Worker w : workers) {
  • ncompleted += w.completedTasks;
  • if (w.isLocked())
  • ++nactive;
  • }
  • } finally {
  • mainLock.unlock();
  • }
  • int c = ctl.get();
  • String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
  • (runStateAtLeast(c, TERMINATED) ? "Terminated" :
  • "Shutting down"));
  • return super.toString() +
  • "[" + rs +
  • ", pool size = " + nworkers +
  • ", active threads = " + nactive +
  • ", queued tasks = " + workQueue.size() +
  • ", completed tasks = " + ncompleted +
  • "]";
  • }
  • /* Extension hooks */
  • /**
  • * Method invoked prior to executing the given Runnable in the
  • * given thread. This method is invoked by thread {@code t} that
  • * will execute task {@code r}, and may be used to re-initialize
  • * ThreadLocals, or to perform logging.
  • *
  • * <p>This implementation does nothing, but may be customized in
  • * subclasses. Note: To properly nest multiple overridings, subclasses
  • * should generally invoke {@code super.beforeExecute} at the end of
  • * this method.
  • * <p>
  • * Hook invoked before each task runs
  • *
  • * @param t the thread that will run task {@code r}
  • * @param r the task that will be executed
  • */
  • protected void beforeExecute(Thread t, Runnable r) {
  • }
  • /**
  • * Method invoked upon completion of execution of the given Runnable.
  • * This method is invoked by the thread that executed the task. If
  • * non-null, the Throwable is the uncaught {@code RuntimeException}
  • * or {@code Error} that caused execution to terminate abruptly.
  • *
  • * <p>This implementation does nothing, but may be customized in
  • * subclasses. Note: To properly nest multiple overridings, subclasses
  • * should generally invoke {@code super.afterExecute} at the
  • * beginning of this method.
  • *
  • * <p><b>Note:</b> When actions are enclosed in tasks (such as
  • * {@link FutureTask}) either explicitly or via methods such as
  • * {@code submit}, these task objects catch and maintain
  • * computational exceptions, and so they do not cause abrupt
  • * termination, and the internal exceptions are <em>not</em>
  • * passed to this method. If you would like to trap both kinds of
  • * failures in this method, you can further probe for such cases,
  • * as in this sample subclass that prints either the direct cause
  • * or the underlying exception if a task has been aborted:
  • *
  • * <pre> {@code
  • * class ExtendedExecutor extends ThreadPoolExecutor {
  • * // ...
  • * protected void afterExecute(Runnable r, Throwable t) {
  • * super.afterExecute(r, t);
  • * if (t == null && r instanceof Future<?>) {
  • * try {
  • * Object result = ((Future<?>) r).get();
  • * } catch (CancellationException ce) {
  • * t = ce;
  • * } catch (ExecutionException ee) {
  • * t = ee.getCause();
  • * } catch (InterruptedException ie) {
  • * Thread.currentThread().interrupt(); // ignore/reset
  • * }
  • * }
  • * if (t != null)
  • * System.out.println(t);
  • * }
  • * }}</pre>
  • * <p>
  • * Hook invoked after a task has executed.
  • *
  • * @param r the runnable that has completed
  • * @param t the exception that caused termination, or null if
  • * execution completed normally
  • */
  • protected void afterExecute(Runnable r, Throwable t) {
  • }
  • /**
  • * Method invoked when the Executor has terminated. Default
  • * implementation does nothing. Note: To properly nest multiple
  • * overridings, subclasses should generally invoke
  • * {@code super.terminated} within this method.
  • * <p>
  • * Hook invoked once the thread pool has terminated.
  • */
  • protected void terminated() {
  • }
  • /* Predefined RejectedExecutionHandlers */
  • /**
  • * A handler for rejected tasks that runs the rejected task
  • * directly in the calling thread of the {@code execute} method,
  • * unless the executor has been shut down, in which case the task
  • * is discarded.
  • * <p>
  • * Caller-runs policy:
  • * when the pool rejects a task, the thread that called execute() runs the task itself.
  • */
  • public static class CallerRunsPolicy implements RejectedExecutionHandler {
  • /**
  • * Creates a {@code CallerRunsPolicy}.
  • */
  • public CallerRunsPolicy() {
  • }
  • /**
  • * Executes task r in the caller's thread, unless the executor
  • * has been shut down, in which case the task is discarded.
  • *
  • * @param r the runnable task requested to be executed
  • * @param e the executor attempting to execute this task
  • */
  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  • // If the pool has not been shut down, run the task directly
  • if (!e.isShutdown()) {
  • // No new thread is started; the task's run() method is called on the calling thread
  • r.run();
  • }
  • }
  • }
  • /**
  • * A handler for rejected tasks that throws a
  • * {@code RejectedExecutionException}.
  • * <p>
  • * Abort policy: throws an exception immediately.
  • */
  • public static class AbortPolicy implements RejectedExecutionHandler {
  • /**
  • * Creates an {@code AbortPolicy}.
  • */
  • public AbortPolicy() {
  • }
  • /**
  • * Always throws RejectedExecutionException.
  • *
  • * @param r the runnable task requested to be executed
  • * @param e the executor attempting to execute this task
  • * @throws RejectedExecutionException always.
  • */
  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  • throw new RejectedExecutionException("Task " + r.toString() +
  • " rejected from " +
  • e.toString());
  • }
  • }
  • /**
  • * A handler for rejected tasks that silently discards the
  • * rejected task.
  • * <p>
  • * Discard policy: drops the task without any further handling.
  • */
  • public static class DiscardPolicy implements RejectedExecutionHandler {
  • /**
  • * Creates a {@code DiscardPolicy}.
  • */
  • public DiscardPolicy() {
  • }
  • /**
  • * Does nothing, which has the effect of discarding task r.
  • *
  • * @param r the runnable task requested to be executed
  • * @param e the executor attempting to execute this task
  • */
  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  • }
  • }
  • /**
  • * A handler for rejected tasks that discards the oldest unhandled
  • * request and then retries {@code execute}, unless the executor
  • * is shut down, in which case the task is discarded.
  • * <p>
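  • * Discard-oldest policy: drops the task at the head of the wait queue (the one that has waited longest in a FIFO queue), then resubmits the rejected task.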
  • */
  • public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  • /**
  • * Creates a {@code DiscardOldestPolicy} for the given executor.
  • */
  • public DiscardOldestPolicy() {
  • }
  • /**
  • * Obtains and ignores the next task that the executor
  • * would otherwise execute, if one is immediately available,
  • * and then retries execution of task r, unless the executor
  • * is shut down, in which case task r is instead discarded.
  • *
  • * @param r the runnable task requested to be executed
  • * @param e the executor attempting to execute this task
  • */
  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  • // If the pool has not been shut down
  • if (!e.isShutdown()) {
  • // Remove the task at the head of the queue (the longest-waiting one in a FIFO queue) and abandon it
  • e.getQueue().poll();
  • // Then resubmit the rejected task via execute()
  • e.execute(r);
  • }
  • }
  • }
  • }
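
The four predefined rejection handlers at the end of the listing are easiest to compare side by side. The following is a minimal sketch, not part of the JDK source: the class name `RejectionPolicyDemo`, the helper `run`, and the task labels are hypothetical, and the lambdas assume Java 8 or later (the listing itself is based on JDK 1.7). It saturates a pool that has one worker and a one-slot queue, submits a third task, and reports which tasks actually ran under each policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionPolicyDemo {

    /**
     * Hypothetical helper: saturates a 1-thread pool with a 1-slot queue,
     * submits a third task, and returns the sorted labels of the tasks that
     * ran, plus "rejected" if execute() threw RejectedExecutionException.
     */
    public static List<String> run(RejectedExecutionHandler handler)
            throws InterruptedException {
        Set<String> ran = new ConcurrentSkipListSet<>(); // thread-safe, sorted
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        try {
            // task1 is handed to the only worker and blocks until released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                ran.add("task1");
            });
            // task2 fills the single queue slot.
            pool.execute(() -> ran.add("task2"));
            // task3 finds the worker busy and the queue full: the handler decides.
            try {
                pool.execute(() -> ran.add("task3"));
            } catch (RejectedExecutionException ree) {
                ran.add("rejected");
            }
        } finally {
            release.countDown();   // let task1 finish
            pool.shutdown();       // queued tasks still run after shutdown()
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return new ArrayList<>(ran);
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [rejected, task1, task2]
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));
        // prints [task1, task2, task3] (task3 ran on the main thread)
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));
        // prints [task1, task2]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));
        // prints [task1, task3] (task2 was polled from the queue and dropped)
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

The outcome does not depend on timing here because `execute` hands the first task directly to the newly created worker instead of queueing it, so the second task always occupies the queue slot and the third is always rejected. Under `CallerRunsPolicy` the submitting thread is slowed down by running the task itself, which acts as a simple form of back-pressure; under `DiscardPolicy` and `DiscardOldestPolicy` a task is lost without any notification, so these two suit only work that can safely be dropped.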