Java多线程 44 - ThreadPoolExecutor详解(2)
发布于 / 2018-11-27
简介:ThreadPoolExecutor是线程池类,可以通俗的将它理解为存放一定数量线程的一个线程集合。线程池允许若个线程同时运行,同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。
1. ThreadPoolExecutor源码解析
下面我们来详细分析一下ThreadPoolExecutor的源码。先观察其中一组与线程池生命周期相关的重要的变量和方法。
1.1. 线程池生命周期相关
在前面的内容中,我们介绍过线程的状态一共有六种:NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED;线程池则有五种状态,不同于线程,线程池的五种状态分别是:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。在ThreadPoolExecutor中,有一组线程池状态的定义代码如下:
- /**
- * ctl维护两个概念上的参数:runState和workCount
- * runState表示线程池的运行状态,workCount表示有效的线程数量
- */
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 用于计算runState的二进制移位数,即32 - 3 = 29
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 容量是1 << 29 - 1,即低29位用于表示最大容量(最大只能是2^29 - 1)
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- // 高3位用于表示运行状态
- // runState is stored in the high-order bits
- private static final int RUNNING = -1 << COUNT_BITS; // 高三位为111
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位为000
- private static final int STOP = 1 << COUNT_BITS; // 高三位为001
- private static final int TIDYING = 2 << COUNT_BITS; // 高三位为010
- private static final int TERMINATED = 3 << COUNT_BITS; // 高三位为011
ctl
是一个AtomicInteger类型的原子对象,记录了线程池中的任务数量和线程池状态2个信息。ctl
共包括32位。其中,高3位表示线程池状态runState
,低29位表示线程池中的任务数量workerCount
。
ThreadPoolExecutor还提供了相关的操作方法便于对ctl
进行操作:
- // Packing and unpacking ctl
- // 从ctl中获取运行状态
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 从ctl中获取有效的线程数量
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 根据运行状态和有效的线程数量获取ctl
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- /*
- * Bit field accessors that don't require unpacking ctl.
- * These depend on the bit layout and on workerCount being never negative.
- */
- // 比较运行状态
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
- // 判断是否是运行状态
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
- /**
- * Attempt to CAS-increment the workerCount field of ctl.
- * CAS方式自增workCount
- */
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
- /**
- * Attempt to CAS-decrement the workerCount field of ctl.
- * CAS方式自减workCount
- */
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
- /**
- * Decrements the workerCount field of ctl. This is called only on
- * abrupt termination of a thread (see processWorkerExit). Other
- * decrements are performed within getTask.
- *
- * 自减workerCount,如果失败就自旋直至成功
- */
- private void decrementWorkerCount() {
- do {} while (! compareAndDecrementWorkerCount(ctl.get()));
- }
- /**
- * Transitions runState to given target, or leaves it alone if
- * already at least the given target.
- *
- * 将runState转换为给定的targetState
- *
- * @param targetState the desired state, either SHUTDOWN or STOP
- * (but not TIDYING or TERMINATED -- use tryTerminate for that)
- */
- private void advanceRunState(int targetState) {
- for (;;) {
- // 获取ctl的值
- int c = ctl.get();
- // 只有在runState大于或等于targetState,才能转换
- if (runStateAtLeast(c, targetState) ||
- // 通过CAS方式修改ctl的runState为targetState
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
由于ctl
是一个复合变量,同时存储了workerCount
和runState
的值,上面这些方法则用于对ctl
进行各类分拆、合并和比较操作。其中对应的runState
的值如下:
- RUNNING:对应的高3位值是111。
- SHUTDOWN:对应的高3位值是000。
- STOP:对应的高3位值是001。
- TIDYING:对应的高3位值是010。
- TERMINATED:对应的高3位值是011。
线程池各个状态之间的切换如下图所示:
- RUNNING
- 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
- 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态。在
ctl
的初始化代码中,就将它初始化为RUNNING状态,并且任务数量初始化为0,如下代码:
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
SHUTDOWN
- 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
- 状态切换:调用线程池的
shutdown()
接口时,线程池由RUNNING转变为SHUTDOWN。
-
STOP
- 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
- 状态切换:调用线程池的
shutdownNow()
接口时,线程池由(RUNNING or SHUTDOWN)转变为STOP。
-
TIDYING
- 状态说明:当所有的任务已终止,
ctl
记录的任务数量为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()
。terminated()
在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()
函数来实现。 - 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN转变为TIDYING;当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP转变为TIDYING。
- 状态说明:当所有的任务已终止,
-
TERMINATED
- 状态说明:线程池彻底终止,就变成TERMINATED状态。
- 状态切换:线程池处在TIDYING状态时,执行完
terminated()
之后,就会由TIDYING转变为TERMINATED。
运行状态之间并不是随意转换的,大多数状态都只能由固定的状态转换而来,转换关系如下:
- RUNNING -> SHUTDOWN:在调用
shutdown()
时,可能隐含在finalize()
方法。 - RUNNING or SHUTDOWN -> STOP:调用
shutdownNow()
。 - SHUTDOWN -> TIDYING:当队列和线程池都是空的时。
- STOP -> TIDYING:当线程池为空时。
- TIDYING -> TERMINATED:当
terminate()
方法完成时。
1.2. 主要的成员变量
ThreadPoolExecutor还存在一些关键的成员变量,作用于源码的很多地方:
- /**
- * The queue used for holding tasks and handing off to worker
- * threads. We do not require that workQueue.poll() returning
- * null necessarily means that workQueue.isEmpty(), so rely
- * solely on isEmpty to see if the queue is empty (which we must
- * do for example when deciding whether to transition from
- * SHUTDOWN to TIDYING). This accommodates special-purpose
- * queues such as DelayQueues for which poll() is allowed to
- * return null even if it may later return non-null when delays
- * expire.
- *
- * 阻塞队列
- */
- private final BlockingQueue<Runnable> workQueue;
- /**
- * Lock held on access to workers set and related bookkeeping.
- * While we could use a concurrent set of some sort, it turns out
- * to be generally preferable to use a lock. Among the reasons is
- * that this serializes interruptIdleWorkers, which avoids
- * unnecessary interrupt storms, especially during shutdown.
- * Otherwise exiting threads would concurrently interrupt those
- * that have not yet interrupted. It also simplifies some of the
- * associated statistics bookkeeping of largestPoolSize etc. We
- * also hold mainLock on shutdown and shutdownNow, for the sake of
- * ensuring workers set is stable while separately checking
- * permission to interrupt and actually interrupting.
- *
- * 主要的锁对象
- */
- private final ReentrantLock mainLock = new ReentrantLock();
- /**
- * Set containing all worker threads in pool. Accessed only when
- * holding mainLock.
- *
- * 用于存放worker
- */
- private final HashSet<Worker> workers = new HashSet<Worker>();
- /**
- * Wait condition to support awaitTermination
- *
- * 用于支持awaitTermination的等待队列
- */
- private final Condition termination = mainLock.newCondition();
- /**
- * Tracks largest attained pool size. Accessed only under mainLock.
- * 记录最大线程池大小,仅在获取锁的前提下可以访问
- */
- private int largestPoolSize;
- /**
- * Counter for completed tasks. Updated only on termination of
- * worker threads. Accessed only under mainLock.
- * 用于记录已完成的任务数
- * 仅在获取锁的前提下,worker线程终止时进行更新
- */
- private long completedTaskCount;
- /*
- * All user control parameters are declared as volatiles so that
- * ongoing actions are based on freshest values, but without need
- * for locking, since no internal invariants depend on them
- * changing synchronously with respect to other actions.
- */
- /**
- * Factory for new threads. All threads are created using this
- * factory (via method addWorker). All callers must be prepared
- * for addWorker to fail, which may reflect a system or user's
- * policy limiting the number of threads. Even though it is not
- * treated as an error, failure to create threads may result in
- * new tasks being rejected or existing ones remaining stuck in
- * the queue. On the other hand, no special precautions exist to
- * handle OutOfMemoryErrors that might be thrown while trying to
- * create threads, since there is generally no recourse from
- * within this class.
- *
- * 线程工厂。所有线程都是使用线程工厂来创建的
- */
- private volatile ThreadFactory threadFactory;
- /**
- * Handler called when saturated or shutdown in execute.
- * 当线程池饱和或关闭时,导致任务失败时的回调,用于通知调用者
- */
- private volatile RejectedExecutionHandler handler;
- /**
- * Timeout in nanoseconds for idle threads waiting for work.
- * Threads use this timeout when there are more than corePoolSize
- * present or if allowCoreThreadTimeOut. Otherwise they wait
- * forever for new work.
- * 等待线程最大的超时时间
- */
- private volatile long keepAliveTime;
- /**
- * If false (default), core threads stay alive even when idle.
- * If true, core threads use keepAliveTime to time out waiting
- * for work.
- * 当该值为false时,核心线程在空闲时也会保持活跃;这是默认值;
- * 当该值为true时,核心线程在空闲期间会有根据keepAliveTime计算等待任务的超时时间
- */
- private volatile boolean allowCoreThreadTimeOut;
- /**
- * Core pool size is the minimum number of workers to keep alive
- * (and not allow to time out etc) unless allowCoreThreadTimeOut
- * is set, in which case the minimum is zero.
- *
- * 核心池大小,保持存活的worker的最小数量(默认会忽略超时)
- * 但如果设置了allowCoreTimeOut为true,那么当核心线程闲置时会被回收。
- */
- private volatile int corePoolSize;
- /**
- * Maximum pool size. Note that the actual maximum is internally
- * bounded by CAPACITY.
- *
- * 最大线程池尺寸,被CAPACITY限制(2^29 - 1)
- */
- private volatile int maximumPoolSize;
- /**
- * The default rejected execution handler
- * 默认的拒绝执行策略处理器
- */
- private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
- /**
- * Permission required for callers of shutdown and shutdownNow.
- * We additionally require (see checkShutdownAccess) that callers
- * have permission to actually interrupt threads in the worker set
- * (as governed by Thread.interrupt, which relies on
- * ThreadGroup.checkAccess, which in turn relies on
- * SecurityManager.checkAccess). Shutdowns are attempted only if
- * these checks pass.
- *
- * All actual invocations of Thread.interrupt (see
- * interruptIdleWorkers and interruptWorkers) ignore
- * SecurityExceptions, meaning that the attempted interrupts
- * silently fail. In the case of shutdown, they should not fail
- * unless the SecurityManager has inconsistent policies, sometimes
- * allowing access to a thread and sometimes not. In such cases,
- * failure to actually interrupt threads may disable or delay full
- * termination. Other uses of interruptIdleWorkers are advisory,
- * and failure to actually interrupt will merely delay response to
- * configuration changes so is not handled exceptionally.
- */
- private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
对下面这些重要的成员变量有以下的解释:
workers
:线程池通过wokers
实现了允许多个线程同时运行。workers
是HashSet类型,即它是一个装载Worker类型实例的集合。一个Worker对应一个线程,也就是说线程池通过 workers
包含了一个线程集合。当Worker对应的线程启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。workQueue
:线程池通过workQueue
实现了阻塞等待的功能。workQueue
是BlockingQueue类型的阻塞队列。当线程池中的线程数超过它的核心线程池容量的时候,提交的任务会进入阻塞队列进行阻塞等待。mainLock
:mainLock
是互斥锁,通过mainLock
实现了对线程池的互斥访问。corePoolSize
和maximumPoolSize
:corePoolSize
是核心池大小,maximumPoolSize
是最大池大小。它们的作用是调整线程池中实际运行的线程的数量。例如,当新任务提交给线程池时,如果此时,线程池中运行的线程数量小于corePoolSize
,则创建新线程来处理请求;如果此时,线程池中运行的线程数量大于corePoolSize
,但小于maximumPoolSize
,则仅当阻塞队列满时才创建新线程。如果设置的corePoolSize
和maximumPoolSize
相同,则创建了固定大小的线程池。如果将maximumPoolSize
设置为基本类型的无界值(如Integer.MAX_VALUE
),则允许线程池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用setCorePoolSize(int)
和setMaximumPoolSize(int)
方法进行动态更改。poolSize
:poolSize
是当前线程池的实际大小,即线程池中任务的数量。allowCoreThreadTimeOut
和keepAliveTime
:allowCoreThreadTimeOut
表示是否允许核心线程在空闲状态时仍然能够存活,默认为false,表示允许核心线程超时,也即是说,默认情况下,核心线程即使空闲也不会终止,如果该值设置为true,核心线程一旦超时也会被终止;而超时时间则由keepAliveTime
参数来指定,它表示当线程池处于空闲状态的时候,超过keepAliveTime
指定的时间之后,空闲的线程会被终止,在allowCoreThreadTimeOut
取默认值的情况下,超时终止的是非核心线程,如果allowCoreThreadTimeOut
为true,核心线程也会随着超时而被终止。threadFactory
:threadFactory
是ThreadFactory对象。它是一个线程工厂类,线程池通过ThreadFactory创建线程。handler
:handler
是RejectedExecutionHandler类型。它是线程池拒绝策略的句柄,也就是说当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler
进行相应的处理。
在上面提到的成员变量中,涉及了几个重要的概念,这里先简单介绍,后面会有对应的详细解释。
1.2.1. 工作队列
如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。常见的工作队列有以下几种,前三种用的最多。
- ArrayBlockingQueue:列表形式的工作队列,必须要有初始队列大小,有界队列,先进先出。
- LinkedBlockingQueue:链表形式的工作队列,可以选择设置初始队列大小,有界/无界队列,先进先出。
- SynchronousQueue:SynchronousQueue不是一个真正的队列,而是一种在线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取任务。只有当线程池是无解的或者可以拒绝任务时,SynchronousQueue才有实际价值。
- PriorityBlockingQueue:优先级队列,有界队列,根据优先级来安排任务,任务的优先级是通过自然顺序或Comparator(如果任务实现了Comparator)来定义的。
- DelayedWorkQueue:延迟的工作队列,无界队列。ScheduledThreadPoolExecutor默认使用的延迟队列。
1.2.2. 饱和策略
饱和策略也称为拒绝策略;当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler()
来修改(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略)。饱和策略有以下四种,一般使用默认的AbortPolicy。
- AbortPolicy:中止策略。默认的饱和策略,抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
- DiscardPolicy:抛弃策略。当新提交的任务无法保存到队列中等待执行时,该策略会悄悄抛弃该任务。
- DiscardOldestPolicy:抛弃最旧的策略。当新提交的任务无法保存到队列中等待执行时,则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。如果工作队列是一个优先队列,那么该策略将导致抛弃优先级最高的任务,因此最好不要将该策略和优先级队列放在一起使用。
- CallerRunsPolicy:调用者运行策略。该策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者(调用线程池执行任务的主线程),从而降低新任务的流程。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了
execute(Runnable)
的线程中执行该任务。当线程池的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute(Runnable)
时在主线程中执行(调用线程池执行任务的主线程)。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任务,从而使得工作者线程有时间来处理完正在执行的任务。
1.2.3. 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。在ThreadFactory中只定义了一个方法newThread()
,每当线程池需要创建一个新线程时都会调用这个方法。Executors提供的线程工厂有两种,一般使用默认的,当然如果有特殊需求,也可以自己定制。
- DefaultThreadFactory:默认线程工厂,创建一个新的、非守护的线程,并且不包含特殊的配置信息。
- PrivilegedThreadFactory:通过这种方式创建出来的线程,将与创建PrivilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext、ContextClassLoader。如果不使用PrivilegedThreadFactory,线程池创建的线程将从在需要新线程时调用
execute(Runnable)
或submit(Runnable)
的客户程序中继承访问权限。 - 自定义线程工厂:可以自己实现ThreadFactory接口来定制自己的线程工厂方法。
1.2.4. 线程处理流程
ThreadPoolExecutor在提交线程执行时,遵循以下的规则:
- 默认情况下,创建完线程池后并不会立即创建线程,而是等到有任务提交时才会创建线程来进行处理(除非调用
prestartCoreThread()
或prestartAllCoreThreads()
方法)。 - 当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前线程池中有线程处于空闲状态,直到当前线程数达到核心线程数。这些线程会通过
workers
来管理。 - 当前线程数达到核心线程数时,如果这个时候还提交任务,这些任务会被放到工作队列里。
- 当核心线程池处理完了当前的任务后,会来工作队列中取任务处理。
- 当前线程数达到核心线程数并且工作队列也满了,如果这个时候还提交任务,则会继续创建线程来处理,直到线程数达到最大线程数。
- 当前线程数达到最大线程数并且队列也满了,如果这个时候还提交任务,则会触发饱和策略。
- 如果某个线程的空闲时间超过了
keepAliveTime
,那么将被标记为可回收的,并且当前线程池的当前大小超过了核心线程数时,这个线程将被终止。
流程如下图所示:
1.3. Worker类
Worker是ThreadPoolExecutor中用于执行任务的主体内部类,它继承自AbstractQueuedSynchronizer类,并且实现了Runnable接口;定义如下:
- /**
- * Class Worker mainly maintains interrupt control state for
- * threads running tasks, along with other minor bookkeeping.
- * This class opportunistically extends AbstractQueuedSynchronizer
- * to simplify acquiring and releasing a lock surrounding each
- * task execution. This protects against interrupts that are
- * intended to wake up a worker thread waiting for a task from
- * instead interrupting a task being run. We implement a simple
- * non-reentrant mutual exclusion lock rather than use ReentrantLock
- * because we do not want worker tasks to be able to reacquire the
- * lock when they invoke pool control methods like setCorePoolSize.
- *
- * Worker线程类,该类是AQS的子类
- */
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable
- {
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- *
- * 该类不会被序列化,提供serialVersionUID是为了压制警告信息
- */
- private static final long serialVersionUID = 6138294804551838833L;
- /**
- * Thread this worker is running in. Null if factory fails.
- * Worker所运行的线程
- * */
- final Thread thread;
- /**
- * Initial task to run. Possibly null.
- * 运行的初始任务,是一个Runnable对象,可能为null
- * */
- Runnable firstTask;
- /**
- * Per-thread task counter
- * 当前Worker完成的任务数
- * */
- volatile long completedTasks;
- /**
- * Creates with given first task and thread from ThreadFactory.
- * 通过给定的firstTask和从ThreadFactory获取的线程创建一个Worker
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- /**
- * Delegates main run loop to outer runWorker
- * 用于运行Worker
- * */
- public void run() {
- // 将主要的运行逻辑交给runWorker()方法
- runWorker(this);
- }
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
- // 是否拥有独占锁
- protected boolean isHeldExclusively() {
- return getState() == 1;
- }
- // 尝试获取独占锁
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- // 尝试释放独占锁
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- // 获取锁
- public void lock() { acquire(1); }
- // 尝试获取锁
- public boolean tryLock() { return tryAcquire(1); }
- // 释放锁
- public void unlock() { release(1); }
- // 判断是否拥有锁
- public boolean isLocked() { return isHeldExclusively(); }
- }
从该类的定义可知,之所以继承AQS类,是因为Worker类也实现了相应的锁机制,需要通过AQS的state
变量及相关机制来控制并发安全性。另外该类的thread
成员变量正是用于执行任务的线程对象,它由线程工厂的newThread()
方法获取,线程工厂相关的内容会在后面讲解。由于Worker类实现了Runnable接口,说明它也是一个线程类,它的run()
方法简单地调用了ThreadPoolExecutor的runWorker(this)
,将自己作为参数传入,以处理Worker运行的业务。
1.4. 构造方法
下面我们从ThreadPoolExecutor的构造方法开始分析,它的构造方法有多个重载,但最终都调用了同一个参数齐全的重载构造方法:
- // 使用默认的线程工厂和拒绝策略处理器创建线程池
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
- // 使用默认拒绝策略处理器创建线程池
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- threadFactory, defaultHandler);
- }
- // 使用默认的线程工厂创建线程池
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- RejectedExecutionHandler handler) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), handler);
- }
- // 根据传入的参数创建线程池
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- /**
- * 检查各类参数,
- * maximumPoolSize要大于0,
- * corePoolSize不能小于0,且corePoolSize不能大于maximumPoolSize
- * keepAliveTime不能小于0
- */
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- // 工作队列,线程工厂和拒绝处理器不能为null
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- // 对各项参数赋值
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
通过构造方法的定义我们可以得知,ThreadPoolExecutor提供了默认的线程工厂Executors.defaultThreadFactory()
和拒绝策略处理器defaultHandler
供我们选择使用,它们的定义如下:
- // Executors的defaultThreadFactory()方法
- public static ThreadFactory defaultThreadFactory() {
- return new DefaultThreadFactory();
- }
- // DefaultThreadFactory类
- static class DefaultThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
- DefaultThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- // 线程名前缀
- namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
- }
- public Thread newThread(Runnable r) {
- // 创建线程
- Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
- // 设置线程为非守护线程
- if (t.isDaemon())
- t.setDaemon(false);
- // 设置线程的优先级为NORM_PRIORITY
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
- // 默认使用AbortPolicy拒绝策略处理器
- private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
由上面的源码可知,默认的线程工厂会直接根据传入的Runnable对象创建线程,并放在同一个线程组中,同时该线程工厂创建出的线程都是非守护线程,并且使用NORM_PRIORITY优先级。另外默认使用的拒绝策略处理器是AbortPolicy,也即是当线程池饱和后,将对无法执行的任务直接抛出异常。
1.5. 任务提交
在构造了ThreadPoolExecutor线程池对象后,就可以通过execute(Runnable)
或submit(...)
方法向线程池中提交任务了,其中submit(...)
方法继承自父类AbstractExecutorService,定义如下:
- /**
- * 将传入的Runnable包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
- *
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- /**
- * 将传入的Runnable和默认结果值包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
- *
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- /**
- * 将传入的Callable包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
- *
- * @throws RejectedExecutionException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
从AbstractExecutorService的三个重载submit(...)
方法的源码可知,内部依旧是将传入的任务包装为了一个Runnable对象,然后交给execute(Runnable)
方法执行,这部分的代码之前已经分析过了,所以我们只需要重点关注ThreadPoolExecutor的execute(Runnable)
方法即可,源码如下:
- /**
- * Executes the given task sometime in the future. The task
- * may execute in a new thread or in an existing pooled thread.
- *
- * If the task cannot be submitted for execution, either because this
- * executor has been shutdown or because its capacity has been reached,
- * the task is handled by the current {@code RejectedExecutionHandler}.
- *
- * @param command the task to execute
- * @throws RejectedExecutionException at discretion of
- * {@code RejectedExecutionHandler}, if the task
- * cannot be accepted for execution
- * @throws NullPointerException if {@code command} is null
- */
- public void execute(Runnable command) {
- // 参数检查
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps:
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- *
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- *
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- */
- // 获取ctl值
- int c = ctl.get();
- // 计算workerCount是否小于corePoolSize
- if (workerCountOf(c) < corePoolSize) {
- /**
- * 如果小于,就调用addWorker()方法新建一个Worker线程来处理任务
- * 这里传入的core参数为true,表示创建的是核心线程
- */
- if (addWorker(command, true))
- // 如果添加成功,直接返回
- return;
- // 否则重新拿到ctl的值
- c = ctl.get();
- }
- /**
- * 走到这里,说明workerCount是大于等于corePoolSize的
- * 检查当前的runState是否是RUNNING,
- * 如果是,就将传入的command任务放入workQueue等待队列
- */
- if (isRunning(c) && workQueue.offer(command)) {
- // 放入等待队列成功之后,重新获取ctl的值进行检查
- int recheck = ctl.get();
- // 如果runState不是RUNNING状态,则将command任务从workQueue队列移除
- if (! isRunning(recheck) && remove(command))
- // 然后调用reject拒绝任务,底层会使用拒绝策略进行处理
- reject(command);
- // 否则如果workerCount为0,则新建一个线程(firstTask为null,core为false,非核心线程)
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- /**
- * 走到这里说明将任务添加到workQueue失败了,等待队列已满,
- * 则创建一个非核心线程来处理任务
- */
- else if (!addWorker(command, false))
- // 如果创建失败则调用reject拒绝任务
- reject(command);
- }
从execute(Runnable)
方法可知,在添加任务时会分为多种情况:
- 当前的Worker数量小于核心线程数
corePoolSize
时,使用addWorker(Runnable, boolean)
方法根据传入的任务command
添加Worker对象(核心线程),添加成功后直接返回; - 当前的Worker数量不小于核心线程数
corePoolSize
时,判断线程池是否正在运行,如果在运行就将任务command
放入等待队列workQueue
;如果放入等待队列失败,就尝试再次添加Worker对象(非核心线程),如果这次添加依旧失败,就调用reject(command)
使用拒绝策略处理器拒绝任务。
这里我们需要注意任务提交的流程:
- 核心线程数未满时,添加核心线程,即
workerCount
<corePoolSize
; - 核心线程数已满时,放入等待队列,即
workerCount
>=corePoolSize
,workQueue
未满; - 等待队列已满时,添加非核心线程,即
workerCount
>=corePoolSize
,workQueue
已满,workerCount
<maximumPoolSize
; - 非核心线程已满时,拒绝执行,即
workerCount
>=corePoolSize
,workQueue
已满,workerCount
>=maximumPoolSize
。
1.6. 拒绝任务
当线程池饱和之后,会通过reject(Runnable)
方法根据设定的拒绝策略拒绝新提交的任务,该方法源码如下:
- /**
- * Invokes the rejected execution handler for the given command.
- * Package-protected for use by ScheduledThreadPoolExecutor.
- *
- * 拒绝任务
- */
- final void reject(Runnable command) {
- // 使用拒绝任务处理器来拒绝任务的执行
- handler.rejectedExecution(command, this);
- }
从该方法可以得知,其实最终执行拒绝操作的是handler
对象的rejectedExecution(Runnable, ThreadPoolExecutor)
方法;从前面介绍的内容可知,ThreadPoolExecutor默认的拒绝策略处理器是AbortPolicy类,其实ThreadPoolExecutor还定义了其他三种拒绝策略处理器,这四个类的源码如下:
- /* Predefined RejectedExecutionHandlers */
- /**
- * A handler for rejected tasks that runs the rejected task
- * directly in the calling thread of the {@code execute} method,
- * unless the executor has been shut down, in which case the task
- * is discarded.
- *
- * 调用者运行策略
- * 当线程池拒绝接受任务后,会由调用者自行运行任务
- */
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code CallerRunsPolicy}.
- */
- public CallerRunsPolicy() { }
- /**
- * Executes task r in the caller's thread, unless the executor
- * has been shut down, in which case the task is discarded.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- // 如果线程池还未关闭,就直接运行任务
- if (!e.isShutdown()) {
- // 不启动新的线程,直接调用任务的run()方法
- r.run();
- }
- }
- }
- /**
- * A handler for rejected tasks that throws a
- * {@code RejectedExecutionException}.
- *
- * 中止策略,会直接抛出异常
- */
- public static class AbortPolicy implements RejectedExecutionHandler {
- /**
- * Creates an {@code AbortPolicy}.
- */
- public AbortPolicy() { }
- /**
- * Always throws RejectedExecutionException.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- * @throws RejectedExecutionException always.
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task " + r.toString() +
- " rejected from " +
- e.toString());
- }
- }
- /**
- * A handler for rejected tasks that silently discards the
- * rejected task.
- * 抛弃策略,直接抛弃任务,不会做任何处理
- */
- public static class DiscardPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code DiscardPolicy}.
- */
- public DiscardPolicy() { }
- /**
- * Does nothing, which has the effect of discarding task r.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
- /**
- * A handler for rejected tasks that discards the oldest unhandled
- * request and then retries {@code execute}, unless the executor
- * is shut down, in which case the task is discarded.
- *
- * 抛弃最旧策略,会将等待队列中等待最久的任务放弃执行,然后添加被拒绝的任务
- */
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
- /**
- * Creates a {@code DiscardOldestPolicy} for the given executor.
- */
- public DiscardOldestPolicy() { }
- /**
- * Obtains and ignores the next task that the executor
- * would otherwise execute, if one is immediately available,
- * and then retries execution of task r, unless the executor
- * is shut down, in which case task r is instead discarded.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- // 如果线程池没有关闭
- if (!e.isShutdown()) {
- // 将等待最久的的任务移除,放弃执行
- e.getQueue().poll();
- // 然后添加被拒绝的任务
- e.execute(r);
- }
- }
- }
其实这四个类的源码非常简单,上面的注释也讲解得比较清楚了,大家可以自行理解,这里不再赘述。
1.7. 添加Worker
从execute(Runnable)
方法可知,在提交任务后,其实会根据条件调用addWorker(Runnable, boolean)
方法,该方法的源码如下:
- /**
- * Checks if a new worker can be added with respect to current
- * pool state and the given bound (either core or maximum). If so,
- * the worker count is adjusted accordingly, and, if possible, a
- * new worker is created and started running firstTask as its
- * first task. This method returns false if the pool is stopped or
- * eligible to shut down. It also returns false if the thread
- * factory fails to create a thread when asked, which requires a
- * backout of workerCount, and a recheck for termination, in case
- * the existence of this worker was holding up termination.
- *
- * 添加Worker
- *
- * @param firstTask the task the new thread should run first (or
- * null if none). Workers are created with an initial first task
- * (in method execute()) to bypass queuing when there are fewer
- * than corePoolSize threads (in which case we always start one),
- * or when the queue is full (in which case we must bypass queue).
- * Initially idle threads are usually created via
- * prestartCoreThread or to replace other dying workers.
- *
- * @param core if true use corePoolSize as bound, else
- * maximumPoolSize. (A boolean indicator is used here rather than a
- * value to ensure reads of fresh values after checking other pool
- * state). 当core为true时以corePoolSize为上限,否则以maximumPoolSize为上限
- * @return true if successful
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- // 获取runState
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- /**
- * 如果runState大于等于SHUTDOWN(即为SHUTDOWN、STOP、TIDYING或TERMINATED),则返回false,
- * 即如果线程池停止或有资格关闭,则此方法返回false;分为以下五种情况:
- * 1,如果runState为STOP、TIDYING或TERMINATED则直接返回false;
- * 这三个状态表示已经进入最后的清理终止,不接受任务不处理队列任务。
- * 2,如果runState为SHUTDOWN,但firstTask不为null时直接返回false;
- * SHUTDOWN状态表示不接受任务但处理队列任务,因此任务不为null时返回false。
- * 3,如果runState为SHUTDOWN,firstTask为null,但workQueue为空时直接返回false;
- * 此时线程池有资格关闭,因此会直接返回false。
- * 4. 如果runState为SHUTDOWN,firstTask为null,但workQueue不为为空,通过校验;
- * SHUTDOWN状态下可以处理队列任务。
- * 5. runState为RUNNING,通过检验。
- */
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
- return false;
- // 状态符合条件,使用无限for循环
- for (;;) {
- // 获取workerCount
- int wc = workerCountOf(c);
- /**
- * core参数可用于判断添加的是否是核心线程;
- * 如果workerCount大于等于CAPACITY,表示线程池已经满了,无法添加Worker,直接返回false结束;
- * 或者当添加核心线程时,workerCount大于等于corePoolSize,无法添加Worker,直接返回false结束;
- * 当添加非核心线程时,workerCount大于等于maximumPoolSize,无法添加Worker,直接返回false结束;
- */
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 如果成功将workerCount加1,那么跳出外层循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
- /**
- * 走到这里说明尝试workerCount加1失败,
- * 重新获取runState与之前保存的runState值rs对比,如果不同说明runState被改变了,
- * 因此直接开始下一次外层循环重新尝试
- */
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // 使用firstTask创建一个新的Worker
- Worker w = new Worker(firstTask);
- // 获取Worker线程
- Thread t = w.thread;
- // 加锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- // 获取ctl和runState
- int c = ctl.get();
- int rs = runStateOf(c);
- /**
- * 如果创建的Worker线程为null,即线程工厂创建线程失败
- * 或者虽然创建的Worker线程不为null,但runState被关闭了(处于STOP、TIDYING或TERMINATED状态)
- * 且如果runState处于SHUTDOWN,且firstTask不为null
- * 说明无法创建Worker
- */
- if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) {
- // 将workerCount减1
- decrementWorkerCount();
- // 尝试将runState转换为TERMINATED,并返回false表示添加Worker失败
- tryTerminate();
- return false;
- }
- // 符合条件,添加新创建的worker
- workers.add(w);
- // 更新largestPoolSize
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // 启动worker线程
- t.start();
- // It is possible (but unlikely) for a thread to have been
- // added to workers, but not yet started, during transition to
- // STOP, which could result in a rare missed interrupt,
- // because Thread.interrupt is not guaranteed to have any effect
- // on a non-yet-started Thread (see Thread#interrupt).
- // 如果runState为STOP,且worker线程没有被中断,那么尝试中断worker线程
- if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
- t.interrupt();
- return true;
- }
该方法首先做了一系列的判断来决定是否可以添加Worker线程,这些判断过程源码中的注释已经讲解得比较清楚了,我们主要关注添加Worker线程的过程;在将传入的任务作为参数创建了Worker对象后,会维护相应的记录信息,然后将新创建的Worker对象添加到workers
集合中,最后会通过t.start()
启动w.thread
,也即是此时Worker对象的thread
线程;从之前的分析可知,thread
线程其实就是以Worker对象自己来创建的,因此t.start()
方法其实调用的是Worker对象的start()
方法,而Worker对象的run()
方法内部直接调用了runWorker(this)
,因此我们接下来关注ThreadPoolExecutor的runWorker(Worker)
方法。
1.8. 运行Worker
ThreadPoolExecutor的runWorker(Worker)
方法负责运行Worker任务,它的源码如下:
- /**
- * Main worker run loop. Repeatedly gets tasks from queue and
- * executes them, while coping with a number of issues:
- *
- * 1. We may start out with an initial task, in which case we
- * don't need to get the first one. Otherwise, as long as pool is
- * running, we get tasks from getTask. If it returns null then the
- * worker exits due to changed pool state or configuration
- * parameters. Other exits result from exception throws in
- * external code, in which case completedAbruptly holds, which
- * usually leads processWorkerExit to replace this thread.
- *
- * 2. Before running any task, the lock is acquired to prevent
- * other pool interrupts while the task is executing, and
- * clearInterruptsForTaskRun called to ensure that unless pool is
- * stopping, this thread does not have its interrupt set.
- *
- * 3. Each task run is preceded by a call to beforeExecute, which
- * might throw an exception, in which case we cause thread to die
- * (breaking loop with completedAbruptly true) without processing
- * the task.
- *
- * 4. Assuming beforeExecute completes normally, we run the task,
- * gathering any of its thrown exceptions to send to
- * afterExecute. We separately handle RuntimeException, Error
- * (both of which the specs guarantee that we trap) and arbitrary
- * Throwables. Because we cannot rethrow Throwables within
- * Runnable.run, we wrap them within Errors on the way out (to the
- * thread's UncaughtExceptionHandler). Any thrown exception also
- * conservatively causes thread to die.
- *
- * 5. After task.run completes, we call afterExecute, which may
- * also throw an exception, which will also cause thread to
- * die. According to JLS Sec 14.20, this exception is the one that
- * will be in effect even if task.run throws.
- *
- * The net effect of the exception mechanics is that afterExecute
- * and the thread's UncaughtExceptionHandler have as accurate
- * information as we can provide about any problems encountered by
- * user code.
- *
- * @param w the worker
- */
- final void runWorker(Worker w) {
- // 引用worker的firstTask任务,并清除worker的firstTask
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 用于标识worker是不是因异常而死亡
- boolean completedAbruptly = true;
- try {
- // worker取任务执行
- while (task != null || (task = getTask()) != null) {
- // 加锁
- w.lock();
- clearInterruptsForTaskRun();
- try {
- // 执行beforeExecute()钩子方法
- beforeExecute(w.thread, task);
- // 用于记录运行过程中的异常
- Throwable thrown = null;
- try {
- // 执行任务
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 执行afterExecute()钩子方法
- afterExecute(task, thrown);
- }
- } finally {
- // 将执行完的任务清空
- task = null;
- // 将worker的完成任务数加1
- w.completedTasks++;
- // 解锁
- w.unlock();
- }
- }
- // 运行到这里表示运行过程中没有出现异常
- completedAbruptly = false;
- } finally {
- // 调用processWorkerExit()方法处理Worker的后续清理和退出流程
- processWorkerExit(w, completedAbruptly);
- }
- }
其实runWorker(Worker)
方法的整体逻辑是比较简单的,内部使用了一个while循环不断地获取任务对象,当能够获取到任务对象时,直接调用task.run()
来执行任务,同时在任务执行前后都会调用相应的钩子方法beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
,这两个钩子方法的实现是空的,可以交由开发人员进行定制(通过继承)。
另外在调用task.run()
之前还调用了clearInterruptsForTaskRun()
对当前线程的中断状态进行了清除:
- /**
- * Ensures that unless the pool is stopping, the current thread
- * does not have its interrupt set. This requires a double-check
- * of state in case the interrupt was cleared concurrently with a
- * shutdownNow -- if so, the interrupt is re-enabled.
- */
- private void clearInterruptsForTaskRun() {
- /**
- * 判断当前runState是否为RUNNING或SHUTDOWN,
- * 如果是则判断当前线程是否是中断的(判断同时会清除中断标志位为false),
- * 如果当前线程是中断然后再次判断runState是否为TIDYING或TERMINATED,
- * 如果是,则对当前线程进行中断
- */
- if (runStateLessThan(ctl.get(), STOP) &&
- Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))
- Thread.currentThread().interrupt();
- }
除了通过Worker对象的firstTask
属性获取任务对象,还可以通过getTask()
方法获取,且只有在firstTask
为空的情况下才会使用该方法获取,此时表示Worker对应的任务被置为了null,这种情况表明Worker所肩负的任务执行完了,因此可以从等待队列中获取了,而getTask()
方法的作用正是从等待队列中获取任务,该方法的源码如下:
- /**
- * Performs blocking or timed wait for a task, depending on
- * current configuration settings, or returns null if this worker
- * must exit because of any of:
- * 1. There are more than maximumPoolSize workers (due to
- * a call to setMaximumPoolSize).
- * 2. The pool is stopped.
- * 3. The pool is shutdown and the queue is empty.
- * 4. This worker timed out waiting for a task, and timed-out
- * workers are subject to termination (that is,
- * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
- * both before and after the timed wait.
- *
- * @return task, or null if the worker must exit, in which case
- * workerCount is decremented
- */
- private Runnable getTask() {
- // 用于记录poll()方法是否超时
- boolean timedOut = false; // Did the last poll() time out?
- retry:
- // 无限循环
- for (; ; ) {
- // 获取ctl和runState
- int c = ctl.get();
- int rs = runStateOf(c);
- // 检查状态
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- /**
- * 如果runState为STOP、TIDYING或TERMINATED
- * 或者runState为SHUTDOWN,且workerQueue为空
- * 将workerCount减1,返回null结束运行
- */
- decrementWorkerCount();
- return null;
- }
- // 记录worker是否能够被移除
- boolean timed; // Are workers subject to culling?
- // 无限循环
- for (; ; ) {
- // 获取workerCount
- int wc = workerCountOf(c);
- /**
- * 判断当前Worker是否可以被移除,即当前Worker是否可以一直等待任务。
- * 如果allowCoreThreadTimeOut为true,允许回收所有工作线程,包括非核心线程,
- * 如果allowCoreThreadTimeOut为false,但workerCount大于核心线程数,此时允许回收非核心线程,
- * 如果允许回收线程,则当前线程是有超时时间的(keepAliveTime),无法一直等待任务。
- */
- timed = allowCoreThreadTimeOut || wc > corePoolSize;
- /**
- * workerCount小于等于最大核心线程数,且没有超时,则跳出内层循环
- */
- if (wc <= maximumPoolSize && !(timedOut && timed))
- break;
- /**
- * 否则表示已超时,将workerCount减1,如果成功直接返回null,这里有两种情况:
- * 1. workerCount大于最大核心线程数,不管是否超时,都需要清理核心线程。这种情况一般不会出现。
- * 2. workerCount小于等于最大核心线程数,但是Worker超时了,且允许清理核心线程。
- */
- if (compareAndDecrementWorkerCount(c))
- return null;
- // 走到这里说明上一步workerCount减1失败了,重新读取ctl
- c = ctl.get(); // Re-read ctl
- // 如果与之前的runState不同,表示线程池状态发生改变了,跳出到外层循环重试
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- try {
- /**
- * 根据线程是否会超时来分别调用相应的方法,
- * poll()方法附带超时机制;take()方法没有超时机制,并且可能会阻塞等待。
- * 这里的设计很巧妙,如果timed为true,则表示允许核心线程超时被清理,
- * 但是需要达到超时时间,此时就使用poll()方法拉取任务,而poll的超时时间就是keepAliveTime,
- * 当超过keepAliveTime时间poll未能拉取到任务,说明该Worker超时了,此时r就会为null,
- * 而在下一次循环中,会由if (wc <= maximumPoolSize && !(timedOut && timed))这行代码,
- * break出循环,返回到runWorker()方法,而runWorker()方法中如果拉取的Task为空,
- * 也会跳出while循环,此时就会调用processWorkerExit()执行空闲线程清理工作。
- */
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- // 如果获取到的任务不为null则返回
- if (r != null)
- return r;
- // 走到这里表示获取操作超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- // 被中断,可能是超时等待过程中被中断了
- timedOut = false;
- }
- }
- }
通过线程池的配置,Worker从工作队列获取任务,如果allowCoreThreadTimeOut
为false
且workerCount
小于等于corePoolSize
,则这些核心线程永远存活,并且一直在尝试获取工作队列的任务;否则,线程会有超时时间(keepAliveTime
),当在keepAliveTime
时间内使用poll(keepAliveTime, TimeUnit.NANOSECOND)
拉取不到任务,该线程的Worker会被移除。
1.9. 移除空闲Worker
移除Worker操作是在processWorkerExit(Worker, boolean
方法中实现的,当getTask()
方法返回null后,会导致runWorker(Worker)
方法中跳出while循环,调用processWorkerExit(Worker, boolean
方法将Worker移除。注意在返回null的之前,已经将workerCount
进行减1操作,因此在processWorkerExit(Worker, boolean
方法中,completedAbruptly
为false
的情况(即正常超时退出)不需要再将workerCount
减1。processWorkerExit(Worker, boolean
方法的源码如下:
- /**
- * Performs cleanup and bookkeeping for a dying worker. Called
- * only from worker threads. Unless completedAbruptly is set,
- * assumes that workerCount has already been adjusted to account
- * for exit. This method removes thread from worker set, and
- * possibly terminates the pool or replaces the worker if either
- * it exited due to user task exception or if fewer than
- * corePoolSize workers are running or queue is non-empty but
- * there are no workers.
- *
- * @param w the worker
- * @param completedAbruptly if the worker died due to user exception
- * 用于标识Worker线程死亡是否是由于用户产生的异常
- */
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // completedAbruptly为true,此时workerCount还没有更新,因此需要手动减1
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- // 获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 更新完成的任务数量
- completedTaskCount += w.completedTasks;
- // 从workers数组中移除w
- workers.remove(w);
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- // 获取ctl
- int c = ctl.get();
- // 如果runState状态是SHUTDOWN或RUNNING,即还没有停止
- if (runStateLessThan(c, STOP)) {
- // 如果worker处理任务过程中没有出现异常
- if (!completedAbruptly) {
- /**
- * 获取允许的最小核心线程数,逻辑如下:
- * 1. 如果不允许核心线程超时,则min为核心线程数
- * 2. 如果允许核心线程超时,且workQueue不为空,则min为1,
- * 以确保至少有一个worker来处理队列里的任务
- */
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 当workerCount大于等于min时,直接返回,不需要添加新的Worker。
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- /**
- * 添加一个新的Worker线程,走到这里有下面两种情况:
- * 1. worker中的任务因异常而退出了,此时Worker也会因抛出异常而终止,因此需要添加新的Worker;
- * 2. workerCount小于计算出来的最小线程数,有可能是因为核心线程超时了,因此需要添加新的Worker。
- */
- addWorker(null, false);
- }
- }
processWorkerExit(Worker, boolean
方法在统计完传入的Worker所完成的任务数之后将其从workers
集合中移除,然后尝试关闭线程池,tryTerminate()
方法源码如下:
- /**
- * Transitions to TERMINATED state if either (SHUTDOWN and pool
- * and queue empty) or (STOP and pool empty). If otherwise
- * eligible to terminate but workerCount is nonzero, interrupts an
- * idle worker to ensure that shutdown signals propagate. This
- * method must be called following any action that might make
- * termination possible -- reducing worker count or removing tasks
- * from the queue during shutdown. The method is non-private to
- * allow access from ScheduledThreadPoolExecutor.
- *
- * 当runState为SHUTDOWN或者STOP,且线程池为空时,尝试转换runState为TERMINATED
- */
- final void tryTerminate() {
- for (;;) {
- // 获取ctl
- int c = ctl.get();
- /**
- * 判断条件是否满足,如果不满足,直接返回
- * 1. 是RUNNING或SHUTDOWN状态,不满足;
- * 2. 是TIDYING或TERMINATED状态,不满足;
- * 3. 是SHUTDOWN状态,但workQueue不为空,不满足
- */
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 如果worker数量不为0
- if (workerCountOf(c) != 0) { // Eligible to terminate
- /**
- * 尝试中断worker中的线程,
- * ONLY_ONE表示只尝试中断第一个worker中的线程
- */
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- // 获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 先修改runState为TIDYING,workerCount为0
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- // 修改成功后,尝试调用终止钩子方法
- terminated();
- } finally {
- // 最终将runState修改为TERMINATED,workerCount修改为0
- ctl.set(ctlOf(TERMINATED, 0));
- // 唤醒等待在termination上的所有线程
- termination.signalAll();
- }
- return;
- }
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
tryTerminate()
方法在检查环境合适之后,会尝试关闭线程池,流程尝试修改runState
和workerCount
,先修改runState
为TIDYING,workerCount
为0,修改成功后会调用terminated()
钩子方法,然后修改runState
为TERMINATED,并且唤醒阻塞在termination
上的所有线程。termination
的await()
阻塞方法只在awaitTermination(long, TimeUnit)
一个方法中有应用:
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- // 转换超时时间
- long nanos = unit.toNanos(timeout);
- // 获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- // 尝试将runState转换为TERMINATED状态
- if (runStateAtLeast(ctl.get(), TERMINATED))
- // 成功就返回true
- return true;
- // 否则判断是否超时,如果超时就返回false
- if (nanos <= 0)
- return false;
- // 计算超时时间
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- // 解锁
- mainLock.unlock();
- }
- }
tryTerminate()
方法中还调用了interruptIdleWorkers(boolean)
对空闲线程进行中断操作,这类方法一共有三个,分别用于中断所有Worker线程、中断空闲Worker线程,源码如下:
- /**
- * Interrupts all threads, even if active. Ignores SecurityExceptions
- * (in which case some threads may remain uninterrupted).
- *
- * 中断所有worker中的线程
- */
- private void interruptWorkers() {
- // 获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 遍历Worker
- for (Worker w : workers) {
- try {
- // 中断Worker中的线程
- w.thread.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- } finally {
- // 解锁
- mainLock.unlock();
- }
- }
- /**
- * Interrupts threads that might be waiting for tasks (as
- * indicated by not being locked) so they can check for
- * termination or configuration changes. Ignores
- * SecurityExceptions (in which case some threads may remain
- * uninterrupted).
- *
- * 中断空闲的Worker中的线程
- *
- * @param onlyOne If true, interrupt at most one worker. This is
- * called only from tryTerminate when termination is otherwise
- * enabled but there are still other workers. In this case, at
- * most one waiting worker is interrupted to propagate shutdown
- * signals in case all threads are currently waiting.
- * Interrupting any arbitrary thread ensures that newly arriving
- * workers since shutdown began will also eventually exit.
- * To guarantee eventual termination, it suffices to always
- * interrupt only one idle worker, but shutdown() interrupts all
- * idle workers so that redundant workers exit promptly, not
- * waiting for a straggler task to finish.
- */
- private void interruptIdleWorkers(boolean onlyOne) {
- // 获取锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 遍历Worker
- for (Worker w : workers) {
- // 获取worker中的线程
- Thread t = w.thread;
- // 如果t没有中断,尝试获取worker的锁
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- // 尝试中断t
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- // 解锁Worker
- w.unlock();
- }
- }
- /**
- * 如果onlyOne,则直接break
- * 即这个参数表示只尝试中断第一个worker中的线程
- */
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
- /**
- * Common form of interruptIdleWorkers, to avoid having to
- * remember what the boolean argument means.
- * 重载方法,传入的onlyOne参数为false
- * 即将对所有的空闲Worker中的线程进行中断尝试
- */
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
注:
interruptIdleWorkers(boolean onlyOne)
的参数onlyOne
用于控制是否只中断第一个Worker的thread
线程。
1.10. 线程池的关闭
ThreadPoolExecutor中关于线程池关闭的方法有两类,源码如下:
- /**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- *
- * <p>This method does not wait for previously submitted tasks to
- * complete execution. Use {@link #awaitTermination awaitTermination}
- * to do that.
- *
- * 发起线程池的有序关闭,会对所有空闲Worker进行中断
- * 但已经运行的任务还会继续运行
- *
- * @throws SecurityException {@inheritDoc}
- */
- public void shutdown() {
- // 加锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 检查可否关闭的权限
- checkShutdownAccess();
- // 将runState转换为SHUTDOWN
- advanceRunState(SHUTDOWN);
- // 中断空闲的Worker
- interruptIdleWorkers();
- // 调用onShutdown()钩子方法
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- }
- /**
- * Attempts to stop all actively executing tasks, halts the
- * processing of waiting tasks, and returns a list of the tasks
- * that were awaiting execution. These tasks are drained (removed)
- * from the task queue upon return from this method.
- *
- * <p>This method does not wait for actively executing tasks to
- * terminate. Use {@link #awaitTermination awaitTermination} to
- * do that.
- *
- * <p>There are no guarantees beyond best-effort attempts to stop
- * processing actively executing tasks. This implementation
- * cancels tasks via {@link Thread#interrupt}, so any task that
- * fails to respond to interrupts may never terminate.
- *
- * 发起线程池的有序关闭,会对所有的Worker进行中断
- * 已经运行的任务
- *
- * @throws SecurityException {@inheritDoc}
- */
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- // 加锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 检查可否关闭的权限
- checkShutdownAccess();
- // 将runState转换为SHUTDOWN
- advanceRunState(STOP);
- // 中断所有的Worker
- interruptWorkers();
- // 将队列中的任务全部取出赋值给task
- tasks = drainQueue();
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- // 返回保存的任务
- return tasks;
- }
这两类方法的区别在于,shutdown()
方法会将线程池状态切换为SHUTDOWN,中断所有空闲的Worker线程,然后调用tryTerminate()
尝试终止线程池,此时虽然线程池关闭了,正在运行的任务并不会立即结束,而是继续正常运行,但任务队列中的任务将会被直接丢弃。shutdownNow()
方法则不同,它会将线程池状态切换为STOP,中断所有的Worker线程,然后调用tryTerminate()
尝试终止线程池,此时线程池关闭动作会将所有正在运行的任务也一并停止,同时还会将任务队列中的等待执行的任务全部取出放在一个列表中返回给调用者。
1.11. 钩子方法
ThreadPoolExecutor提供了一些钩子方法给开发者进行重写,以获取线程池的不同时刻的状态信息,这些方法如下:
- /**
- * Performs any further cleanup following run state transition on
- * invocation of shutdown. A no-op here, but used by
- * ScheduledThreadPoolExecutor to cancel delayed tasks.
- */
- void onShutdown() {
- }
- /**
- * Method invoked prior to executing the given Runnable in the
- * given thread. This method is invoked by thread {@code t} that
- * will execute task {@code r}, and may be used to re-initialize
- * ThreadLocals, or to perform logging.
- *
- * <p>This implementation does nothing, but may be customized in
- * subclasses. Note: To properly nest multiple overridings, subclasses
- * should generally invoke {@code super.beforeExecute} at the end of
- * this method.
- *
- * 执行前钩子函数
- *
- * @param t the thread that will run task {@code r}
- * @param r the task that will be executed
- */
- protected void beforeExecute(Thread t, Runnable r) { }
- /**
- * Method invoked upon completion of execution of the given Runnable.
- * This method is invoked by the thread that executed the task. If
- * non-null, the Throwable is the uncaught {@code RuntimeException}
- * or {@code Error} that caused execution to terminate abruptly.
- *
- * <p>This implementation does nothing, but may be customized in
- * subclasses. Note: To properly nest multiple overridings, subclasses
- * should generally invoke {@code super.afterExecute} at the
- * beginning of this method.
- *
- * <p><b>Note:</b> When actions are enclosed in tasks (such as
- * {@link FutureTask}) either explicitly or via methods such as
- * {@code submit}, these task objects catch and maintain
- * computational exceptions, and so they do not cause abrupt
- * termination, and the internal exceptions are <em>not</em>
- * passed to this method. If you would like to trap both kinds of
- * failures in this method, you can further probe for such cases,
- * as in this sample subclass that prints either the direct cause
- * or the underlying exception if a task has been aborted:
- *
- * <pre> {@code
- * class ExtendedExecutor extends ThreadPoolExecutor {
- * // ...
- * protected void afterExecute(Runnable r, Throwable t) {
- * super.afterExecute(r, t);
- * if (t == null && r instanceof Future<?>) {
- * try {
- * Object result = ((Future<?>) r).get();
- * } catch (CancellationException ce) {
- * t = ce;
- * } catch (ExecutionException ee) {
- * t = ee.getCause();
- * } catch (InterruptedException ie) {
- * Thread.currentThread().interrupt(); // ignore/reset
- * }
- * }
- * if (t != null)
- * System.out.println(t);
- * }
- * }}</pre>
- *
- * 执行后钩子函数
- *
- * @param r the runnable that has completed
- * @param t the exception that caused termination, or null if
- * execution completed normally
- */
- protected void afterExecute(Runnable r, Throwable t) { }
- /**
- * Method invoked when the Executor has terminated. Default
- * implementation does nothing. Note: To properly nest multiple
- * overridings, subclasses should generally invoke
- * {@code super.terminated} within this method.
- *
- * 线程池已终止的钩子函数
- */
- protected void terminated() { }
我们可以自定义继承于ThreadPoolExecutor的类,重写这些方法以应用。
1.12. 其他方法
ThreadPoolExecutor类中还有提供了很多Getter/Setter方法、工具方法供开发者使用,具体实现也比较简单,这里不在赘述,读者可以在下面完整的ThreadPoolExecutor源码注释中查阅理解。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...