Java多线程
Java并发
线程池

Java多线程 42 - Runnable、Callable和Future详解

简介:对于Runnable接口我们其实是很熟悉的,在前面对线程的使用中经常接触它。Runnable接口是针对单纯的无返回值任务,但在Java的多线程机制中,还提供了有返回值任务相应的接口,接下将详细讨论这些线程执行的任务。

对于Runnable接口我们其实是很熟悉的,在前面对线程的使用中经常接触它。Runnable接口是针对单纯的无返回值任务,但在Java的多线程机制中,还提供了有返回值任务相应的接口,接下将详细讨论这些线程执行的任务。

1. Runnable、Callable和Future

  1. Runnable

Runnable是一个没有返回值的任务,接口的定义如下:

  • public interface Runnable {
  • // 线程执行该任务时将会运行该方法
  • public abstract void run();
  • }
  1. Callable

Callable是一个有返回值的任务,看下该接口的定义:

  • public interface Callable<V> {
  • // 线程执行任务时将会运行该方法
  • V call() throws Exception;
  • }

Callable是一个泛型接口,它的call()方法可以返回定义时规定的泛型值,或者抛出异常;Runnable的run()方法不会抛出检查异常。

  1. Future

Future接口可用于异步获取执行结果,它提供了检查任务是否完成及获取执行结果的方法。其中获取执行结果的方法有两个,都是阻塞式等待的方法,其中一个方法提供了超时机制;在获取执行结果时如果任务没有执行完成就会进入等待状态,直到任务执行完成返回结果。Future还提供了取消任务的方法,如果执行没有返回值(例如Runnable)的任务,并且希望能够取消任务,可以使用该方法。Future接口的源码如下:

  • public interface Future<V> {
  • /**
  • * 尝试取消正在执行的任务,如果任务已经完成该操作就会失败,如果任务还未执行取消就会成功
  • * 如果任务正在执行,方法的参数就会指示线程是否需要中断,从而尝试停止任务的执行
  • */
  • boolean cancel(boolean mayInterruptIfRunning);
  • // 任务完成之前是否已经被取消
  • boolean isCancelled();
  • /**
  • * 任务是否已经完成
  • * 任务正常终止、抛出异常、或者被取消,该方法都会返回true
  • */
  • boolean isDone();
  • /**
  • * 取任务执行的结果,如果任务未完成,取结果的线程会阻塞
  • * 如果任务被取消了,该方法抛出CancellationException
  • * 如果线程被中断,抛出中断异常
  • */
  • V get() throws InterruptedException, ExecutionException;
  • // 支持超时时间的get方法,可能会抛出超时异常
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
  • }
  1. RunnableFuture

RunnableFuture继承了Runnable和Future接口,它是一个具有异步执行结果的任务,并且可以通过Future的方法取该任务的执行结果。定义如下:

  • public interface RunnableFuture<V> extends Runnable, Future<V> {
  • void run();
  • }

2. FutureTask

FutureTask是一个可取消的异步任务,实现了RunnableFuture接口。

2.1. FutureTask整体结构

以JDK 1.7.0_07版本为例,我们先观察FutureTask的构造方法,以确定任务是如何被添加的:

  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Callable</tt>.
  • * <p>
  • * 根据传入的Callable对象创建FutrueTask
  • *
  • * @param callable the callable task
  • * @throws NullPointerException if callable is null
  • */
  • public FutureTask(Callable<V> callable) {
  • // 检查参数
  • if (callable == null)
  • throw new NullPointerException();
  • // 初始化同步器
  • sync = new Sync(callable);
  • }
  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
  • * given result on successful completion.
  • * <p>
  • * 根据传入的Runnable对象创建Futrue
  • * 由于Runnable任务没有返回值,因此可以用result参数指定任务完成后的返回值
  • *
  • * @param runnable the runnable task
  • * @param result the result to return on successful completion. If
  • * you don't need a particular result, consider using
  • * constructions of the form:
  • * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
  • * @throws NullPointerException if runnable is null
  • */
  • public FutureTask(Runnable runnable, V result) {
  • // 初始化同步器
  • sync = new Sync(Executors.callable(runnable, result));
  • }

FutureTask的构造方法有两个,当传入的参数为Callable对象时会将其作为参数构造Sync对象;当传入对象是Runnable对象时,会使用Executors对Runnable对象进行Callable转换,Executors的callable(Runnable task, T result)方法的源码如下:

  • /**
  • * Returns a {@link Callable} object that, when
  • * called, runs the given task and returns the given result. This
  • * can be useful when applying methods requiring a
  • * <tt>Callable</tt> to an otherwise resultless action.
  • * @param task the task to run
  • * @param result the result to return
  • * @return a callable object
  • * @throws NullPointerException if task null
  • */
  • public static <T> Callable<T> callable(Runnable task, T result) {
  • if (task == null)
  • throw new NullPointerException();
  • return new RunnableAdapter<T>(task, result);
  • }

这个方法最终会将Runnable对象包装为一个RunnableAdapter对象,RunnableAdapter的实现是比较简单的,它是Executors的一个静态内部类:

  • static final class RunnableAdapter<T> implements Callable<T> {
  • final Runnable task;
  • final T result;
  • RunnableAdapter(Runnable task, T result) {
  • this.task = task;
  • this.result = result;
  • }
  • public T call() {
  • task.run();
  • return result;
  • }
  • }

RunnableAdapter会扩展一个call()方法,内部调用传入的Runnable对象taskrun()方法,并将传入的result作为执行结果返回。

FutureTask内部其实是依赖一个继承自AbstractQueuedSynchronized的同步器进行主要的操作。先看FutureTask的自己的方法,源码如下:

  • public class FutureTask<V> implements RunnableFuture<V> {
  • /** Synchronization control for FutureTask */
  • private final Sync sync;
  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Callable</tt>.
  • *
  • * 根据传入的Callable对象创建FutureTask
  • *
  • * @param callable the callable task
  • * @throws NullPointerException if callable is null
  • */
  • public FutureTask(Callable<V> callable) {
  • // 检查参数
  • if (callable == null)
  • throw new NullPointerException();
  • // 初始化同步器
  • sync = new Sync(callable);
  • }
  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
  • * given result on successful completion.
  • *
  • * 根据传入的Runnable对象创建Futrue
  • * 由于Runnable任务没有返回值,因此可以用result参数指定任务完成后的返回值
  • *
  • * @param runnable the runnable task
  • * @param result the result to return on successful completion. If
  • * you don't need a particular result, consider using
  • * constructions of the form:
  • * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
  • * @throws NullPointerException if runnable is null
  • */
  • public FutureTask(Runnable runnable, V result) {
  • // 初始化同步器
  • sync = new Sync(Executors.callable(runnable, result));
  • }
  • // 判断任务是否被取消,内部调用同步器的方法
  • public boolean isCancelled() {
  • return sync.innerIsCancelled();
  • }
  • // 判断任务是否完成,内部调用同步器的方法
  • public boolean isDone() {
  • return sync.innerIsDone();
  • }
  • // 取消任务,内部调用同步器的方法
  • public boolean cancel(boolean mayInterruptIfRunning) {
  • return sync.innerCancel(mayInterruptIfRunning);
  • }
  • /**
  • * 获取任务结果,内部调用同步器的方法
  • * 如果任务没有执行完会进入阻塞等待状态
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get() throws InterruptedException, ExecutionException {
  • return sync.innerGet();
  • }
  • /**
  • * 获取任务结果,内部调用同步器的方法,带有超时机制
  • * 如果任务没有执行完会进入阻塞等待状态,直到超时会抛出TimeoutException
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get(long timeout, TimeUnit unit)
  • throws InterruptedException, ExecutionException, TimeoutException {
  • return sync.innerGet(unit.toNanos(timeout));
  • }
  • /**
  • * Protected method invoked when this task transitions to state
  • * <tt>isDone</tt> (whether normally or via cancellation). The
  • * default implementation does nothing. Subclasses may override
  • * this method to invoke completion callbacks or perform
  • * bookkeeping. Note that you can query status inside the
  • * implementation of this method to determine whether this task
  • * has been cancelled.
  • *
  • * 钩子方法,供开发者使用
  • */
  • protected void done() { }
  • /**
  • * Sets the result of this Future to the given value unless
  • * this future has already been set or has been cancelled.
  • * This method is invoked internally by the <tt>run</tt> method
  • * upon successful completion of the computation.
  • *
  • * 设置任务结果,内部调用同步器的方法
  • *
  • * @param v the value
  • */
  • protected void set(V v) {
  • sync.innerSet(v);
  • }
  • /**
  • * Causes this future to report an <tt>ExecutionException</tt>
  • * with the given throwable as its cause, unless this Future has
  • * already been set or has been cancelled.
  • * This method is invoked internally by the <tt>run</tt> method
  • * upon failure of the computation.
  • *
  • * 设置异常信息,内部调用同步器的方法
  • *
  • * @param t the cause of failure
  • */
  • protected void setException(Throwable t) {
  • sync.innerSetException(t);
  • }
  • // The following (duplicated) doc comment can be removed once
  • //
  • // 6270645: Javadoc comments should be inherited from most derived
  • // superinterface or superclass
  • // is fixed.
  • /**
  • * Sets this Future to the result of its computation
  • * unless it has been cancelled.
  • *
  • * 运行任务,内部调用同步器的方法
  • */
  • public void run() {
  • sync.innerRun();
  • }
  • /**
  • * Executes the computation without setting its result, and then
  • * resets this Future to initial state, failing to do so if the
  • * computation encounters an exception or is cancelled. This is
  • * designed for use with tasks that intrinsically execute more
  • * than once.
  • *
  • * 运行任务,运行完成后会重置同步器,内部调用同步器的方法
  • *
  • * @return true if successfully run and reset
  • */
  • protected boolean runAndReset() {
  • return sync.innerRunAndReset();
  • }
  • ...
  • }

从FutureTask自己的一些方法可知,它内部都调用了sync对象的相关方法。sync对象会在创建FutureTask的构造方法中初始化,传入了相应的任务。

sync对象的类型是Sync,该类继承自AQS类,实现了共享锁机制。先观察该类的几个成员变量如下:

  • private final class Sync extends AbstractQueuedSynchronizer {
  • private static final long serialVersionUID = -7828117401763700385L;
  • /**
  • * State value representing that task is ready to run
  • * 任务准备运行
  • * */
  • private static final int READY = 0;
  • /**
  • * State value representing that task is running
  • * 任务正在运行
  • * */
  • private static final int RUNNING = 1;
  • /**
  • * State value representing that task ran
  • * 任务运行完毕
  • * */
  • private static final int RAN = 2;
  • /**
  • * State value representing that task was cancelled
  • * 任务取消
  • * */
  • private static final int CANCELLED = 4;
  • /**
  • * The underlying callable
  • * 内部使用的Callable对象
  • * */
  • private final Callable<V> callable;
  • /**
  • * The result to return from get()
  • * 用于get()方法返回的执行结果
  • * */
  • private V result;
  • /**
  • * The exception to throw from get()
  • * 运行过程中出现的异常
  • * */
  • private Throwable exception;
  • /**
  • * The thread running task. When nulled after set/cancel, this
  • * indicates that the results are accessible. Must be
  • * volatile, to ensure visibility upon completion.
  • *
  • * 执行任务的线程,在设置或取消后会为null,表示执行结果是可以访问的
  • */
  • private volatile Thread runner;
  • ...
  • }

其中值得关注的是任务的四种状态:READY、RUNNING、RAN和CANCELLED,分别表示就绪、运行中、运行结束和取消运行四种状态。在Sync中会以CAS方式来控制这些状态的转换以标识任务当前情况。

另外,callable用于记录需要执行的任务,result用于记录任务执行结果,exception用于记录任务执行过程中产生的异常,Thread类型的runner则用于记录运行当前任务的线程。

2.2. 任务执行

FutureTask是一个实现了Runnable接口的类,从它的run()方法实现可知底层其实调用的syncinnerRun()方法,这也是FutureTask主要执行逻辑,该方法的源码如下:

  • void innerRun() {
  • // 尝试将任务状态从READY转换为RUNNING,如果转换失败就直接返回
  • if (!compareAndSetState(READY, RUNNING))
  • return;
  • // 运行到这里表示任务状态已经转换为RUNNING成功了
  • runner = Thread.currentThread();
  • // 重新检查任务状态是否是RUNNING
  • if (getState() == RUNNING) { // recheck after setting thread
  • // 检查通过
  • V result;
  • try {
  • // 尝试获取任务运行结果
  • result = callable.call();
  • } catch (Throwable ex) {
  • /**
  • * 如果出现异常,就使用exception成员变量进行记录
  • * 此操作会将任务状态设置为RAN
  • */
  • setException(ex);
  • // 直接返回结束运行
  • return;
  • }
  • /**
  • * 运行到这里表明任务执行完成且没有发生异常,
  • * 将结果用result记录
  • */
  • set(result);
  • } else {
  • // 释放共享锁
  • releaseShared(0); // cancel
  • }
  • }

从该方法的源码可知,任务在READY状态下,并成功将其转换为RUNNING状态才表示任务开始执行了。执行任务的主要逻辑其实是调用callable.call()方法,同时会捕获运行过程中产生的异常并且用exception进行记录,最后会使用result记录callable.call()的执行结果。其中setException(Throwable)set(V)方法属于FutureTask类,内部调用了sync.innerSetException(t)sync.innerSet(v),Sync类的这两个方法源码如下:

  • // 设置异常
  • void innerSetException(Throwable t) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果任务状态是RAN,则直接返回,因此此时任务已经结束了
  • if (s == RAN)
  • return;
  • // 如果任务状态是CANCELLED,需要释放共享锁,以将runner设置为null
  • if (s == CANCELLED) {
  • // aggressively release to set runner to null,
  • // in case we are racing with a cancel request
  • // that will try to interrupt runner
  • releaseShared(0);
  • return;
  • }
  • // 尝试将任务状态设置为RAN
  • if (compareAndSetState(s, RAN)) {
  • // 如果设置成功,则将exception设置为t
  • exception = t;
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回
  • return;
  • }
  • }
  • }
  • // 设置结果
  • void innerSet(V v) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果任务状态是RAN,则直接返回,因此此时任务已经结束了
  • if (s == RAN)
  • return;
  • // 如果任务状态是CANCELLED,需要释放共享锁,以将runner设置为null
  • if (s == CANCELLED) {
  • // aggressively release to set runner to null,
  • // in case we are racing with a cancel request
  • // that will try to interrupt runner
  • releaseShared(0);
  • // 无结果返回
  • return;
  • }
  • // 尝试将任务状态设置为RAN
  • if (compareAndSetState(s, RAN)) {
  • // 如果设置成功,则将result设置为v
  • result = v;
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回
  • return;
  • }
  • }
  • }

这两个方法的整体代码很相似,都处于一个无限循环中,同时首先会检查任务状态,如果为RAN或CANCELLED就不进行处理。检查通过后会尝试将任务状态转换为RAN,唯一的区别在于转换状态成功后,innerSetException(Throwable)使用exception = t对异常进行记录,而innerSet(V)使用result = v对结果进行记录。两个方法在记录成功后都释放了共享锁,并且调用了done()钩子方法。

除此之外,FutureTask还提供了一个runAndReset()方法可以运行任务并重置同步器状态,该方法使用protected修饰,并不对外公开,源码如下:

  • /**
  • * Executes the computation without setting its result, and then
  • * resets this Future to initial state, failing to do so if the
  • * computation encounters an exception or is cancelled. This is
  • * designed for use with tasks that intrinsically execute more
  • * than once.
  • * <p>
  • * 运行任务,运行完成后会重置同步器,内部调用同步器的方法
  • *
  • * @return true if successfully run and reset
  • */
  • protected boolean runAndReset() {
  • return sync.innerRunAndReset();
  • }

调用的Sync的innerRunAndReset()方法源码如下:

  • boolean innerRunAndReset() {
  • // 尝试将任务状态从READY转换为RUNNING,如果转换失败就直接返回false
  • if (!compareAndSetState(READY, RUNNING))
  • return false;
  • // 运行到这里表示任务状态已经转换为RUNNING成功了
  • try {
  • runner = Thread.currentThread();
  • // 重新检查任务状态是否是RUNNING
  • if (getState() == RUNNING)
  • // 如果是RUNNING,则运行任务,但不记录任务结果
  • callable.call(); // don't set result
  • /**
  • * 运行到这里有两种情况:
  • * 1. 任务运行完成
  • * 2. 任务状态不是RUNNING
  • * 将runner置为null
  • */
  • runner = null;
  • // 如果任务状态为RUNNING,则将任务状态设置为READY
  • return compareAndSetState(RUNNING, READY);
  • } catch (Throwable ex) {
  • /**
  • * 如果出现异常,就使用exception成员变量进行记录
  • * 此操作会将任务状态设置为RAN
  • */
  • setException(ex);
  • // 直接返回false结束运行
  • return false;
  • }
  • }

innerRunAndReset()中,任务的执行将不会记录执行结果,仅仅是单纯执行,执行完后会将任务状态置为READY,同时将runner成员变量变量置为null。

2.3. 任务取消

FutureTask对任务的取消操作也调用了Sync的相关方法:

  • // 取消任务,内部调用同步器的方法
  • public boolean cancel(boolean mayInterruptIfRunning) {
  • return sync.innerCancel(mayInterruptIfRunning);
  • }

Sync的innerCancel(boolean)方法源码如下:

  • // 将任务状态设置为CANCELLED
  • boolean innerCancel(boolean mayInterruptIfRunning) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果状态是RAN或CANCELLED,表示此时任务已经结束了,无法设置,返回false
  • if (ranOrCancelled(s))
  • return false;
  • // 尝试设置任务状态为CANCELLED,设置成功就跳出for循环
  • if (compareAndSetState(s, CANCELLED))
  • break;
  • }
  • /**
  • * 运行到此处表示设置任务状态为CANCELLED已经成功
  • * 此时如果需要中断任务线程,则将其中断
  • */
  • if (mayInterruptIfRunning) {
  • Thread r = runner;
  • if (r != null)
  • r.interrupt();
  • }
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回true表示设置状态成功
  • return true;
  • }

从源码可以得知,只有在任务状态不为RAN或CANCELLED时任务才可以被取消;同时如果传入的参数mayInterruptIfRunning为false,对任务的取消仅仅是将任务状态转换为CANCELLED,而任务还是会继续执行,当mayInterruptIfRunning为true时,会对执行任务的线程进行中断操作。

2.3. 执行结果获取

在FutureTask中,对任务的获取有两种方式,在任务还未执行完成时,这两种方式都会进入阻塞等待的状态,其中一种的不同之处在于它还带有超时机制,源码如下:

  • /**
  • * 获取任务结果,内部调用同步器的方法
  • * 如果任务没有执行完会进入阻塞等待状态
  • *
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get() throws InterruptedException, ExecutionException {
  • return sync.innerGet();
  • }
  • /**
  • * 获取任务结果,内部调用同步器的方法,带有超时机制
  • * 如果任务没有执行完会进入阻塞等待状态,直到超时会抛出TimeoutException
  • *
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  • return sync.innerGet(unit.toNanos(timeout));
  • }

从源码可知,内部也是调用的Sync的相关方法。我们先关注不带超时机制的innerGet(),源码如下:

  • // 获取结果,可能会进入阻塞等待
  • V innerGet() throws InterruptedException, ExecutionException {
  • // 尝试获取共享锁,如果没获取到,会阻塞等待(可中断)
  • acquireSharedInterruptibly(0);
  • // 已获取到共享锁
  • if (getState() == CANCELLED)
  • // 如果状态是已取消,则抛出异常
  • throw new CancellationException();
  • // 状态不是已取消
  • if (exception != null)
  • // 如果任务状态是已取消,则没有运行结果,直接抛出异常
  • throw new ExecutionException(exception);
  • // 返回运行结果
  • return result;
  • }

实现比较简单,首先会使用acquireSharedInterruptibly(int)尝试获取共享锁,当任务还未执行完成时,这里会进入阻塞等待状态;已获取到共享锁后会根据执行的情况依次判断处理方式,当任务被取消时,获取结果将会得到CancellationException异常;而当任务执行过程中发生异常时,获取结果操作将会抛出这个异常(使用ExecutionException进行了包装);只有在任务顺利执行完成时才能正确地获取到结果。

带有超时机制的获取结果方法innerGet(long)仅仅是将获取共享锁操作换成了AQStryAcquireSharedNanos(int arg, long nanosTimeout)方法进行处理,该方法自带超时机制;其他的实现代码与innerGet()一样;源码如下:

  • // 获取结果,可能会进入带有超时机制的阻塞等待
  • V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
  • // 尝试获取共享锁,带有超时机制,如果等待超时会抛出TimeoutException异常
  • if (!tryAcquireSharedNanos(0, nanosTimeout))
  • throw new TimeoutException();
  • // 已获取到共享锁
  • if (getState() == CANCELLED)
  • // 如果任务状态是已取消,则没有运行结果,直接抛出异常
  • throw new CancellationException();
  • // 状态不是已取消
  • if (exception != null)
  • // 如果exception不为null,则抛出exception异常
  • throw new ExecutionException(exception);
  • // 返回运行结果
  • return result;
  • }

至此FutureTask的主要方法都介绍完了。

FutureTask源码完整注释

下面是FutureTask源码的完整注释,基于JDK 1.7.0_07:

  • package java.util.concurrent;
  • import java.util.concurrent.locks.*;
  • public class FutureTask<V> implements RunnableFuture<V> {
  • /**
  • * Synchronization control for FutureTask
  • */
  • private final Sync sync;
  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Callable</tt>.
  • * <p>
  • * 根据传入的Callable对象创建FutrueTask
  • *
  • * @param callable the callable task
  • * @throws NullPointerException if callable is null
  • */
  • public FutureTask(Callable<V> callable) {
  • // 检查参数
  • if (callable == null)
  • throw new NullPointerException();
  • // 初始化同步器
  • sync = new Sync(callable);
  • }
  • /**
  • * Creates a <tt>FutureTask</tt> that will, upon running, execute the
  • * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
  • * given result on successful completion.
  • * <p>
  • * 根据传入的Runnable对象创建Futrue
  • * 由于Runnable任务没有返回值,因此可以用result参数指定任务完成后的返回值
  • *
  • * @param runnable the runnable task
  • * @param result the result to return on successful completion. If
  • * you don't need a particular result, consider using
  • * constructions of the form:
  • * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
  • * @throws NullPointerException if runnable is null
  • */
  • public FutureTask(Runnable runnable, V result) {
  • // 初始化同步器
  • sync = new Sync(Executors.callable(runnable, result));
  • }
  • // 判断任务是否被取消,内部调用同步器的方法
  • public boolean isCancelled() {
  • return sync.innerIsCancelled();
  • }
  • // 判断任务是否完成,内部调用同步器的方法
  • public boolean isDone() {
  • return sync.innerIsDone();
  • }
  • // 取消任务,内部调用同步器的方法
  • public boolean cancel(boolean mayInterruptIfRunning) {
  • return sync.innerCancel(mayInterruptIfRunning);
  • }
  • /**
  • * 获取任务结果,内部调用同步器的方法
  • * 如果任务没有执行完会进入阻塞等待状态
  • *
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get() throws InterruptedException, ExecutionException {
  • return sync.innerGet();
  • }
  • /**
  • * 获取任务结果,内部调用同步器的方法,带有超时机制
  • * 如果任务没有执行完会进入阻塞等待状态,直到超时会抛出TimeoutException
  • *
  • * @throws CancellationException {@inheritDoc}
  • */
  • public V get(long timeout, TimeUnit unit)
  • throws InterruptedException, ExecutionException, TimeoutException {
  • return sync.innerGet(unit.toNanos(timeout));
  • }
  • /**
  • * Protected method invoked when this task transitions to state
  • * <tt>isDone</tt> (whether normally or via cancellation). The
  • * default implementation does nothing. Subclasses may override
  • * this method to invoke completion callbacks or perform
  • * bookkeeping. Note that you can query status inside the
  • * implementation of this method to determine whether this task
  • * has been cancelled.
  • * <p>
  • * 钩子方法,供开发者使用
  • */
  • protected void done() {
  • }
  • /**
  • * Sets the result of this Future to the given value unless
  • * this future has already been set or has been cancelled.
  • * This method is invoked internally by the <tt>run</tt> method
  • * upon successful completion of the computation.
  • * <p>
  • * 设置任务结果,内部调用同步器的方法
  • *
  • * @param v the value
  • */
  • protected void set(V v) {
  • sync.innerSet(v);
  • }
  • /**
  • * Causes this future to report an <tt>ExecutionException</tt>
  • * with the given throwable as its cause, unless this Future has
  • * already been set or has been cancelled.
  • * This method is invoked internally by the <tt>run</tt> method
  • * upon failure of the computation.
  • * <p>
  • * 设置异常信息,内部调用同步器的方法
  • *
  • * @param t the cause of failure
  • */
  • protected void setException(Throwable t) {
  • sync.innerSetException(t);
  • }
  • // The following (duplicated) doc comment can be removed once
  • //
  • // 6270645: Javadoc comments should be inherited from most derived
  • // superinterface or superclass
  • // is fixed.
  • /**
  • * Sets this Future to the result of its computation
  • * unless it has been cancelled.
  • * <p>
  • * 运行任务,内部调用同步器的方法
  • */
  • public void run() {
  • sync.innerRun();
  • }
  • /**
  • * Executes the computation without setting its result, and then
  • * resets this Future to initial state, failing to do so if the
  • * computation encounters an exception or is cancelled. This is
  • * designed for use with tasks that intrinsically execute more
  • * than once.
  • * <p>
  • * 运行任务,运行完成后会重置同步器,内部调用同步器的方法
  • *
  • * @return true if successfully run and reset
  • */
  • protected boolean runAndReset() {
  • return sync.innerRunAndReset();
  • }
  • /**
  • * Synchronization control for FutureTask. Note that this must be
  • * a non-static inner class in order to invoke the protected
  • * <tt>done</tt> method. For clarity, all inner class support
  • * methods are same as outer, prefixed with "inner".
  • * <p>
  • * Uses AQS sync state to represent run status
  • */
  • private final class Sync extends AbstractQueuedSynchronizer {
  • private static final long serialVersionUID = -7828117401763700385L;
  • /**
  • * State value representing that task is ready to run
  • * 任务准备运行
  • */
  • private static final int READY = 0;
  • /**
  • * State value representing that task is running
  • * 任务正在运行
  • */
  • private static final int RUNNING = 1;
  • /**
  • * State value representing that task ran
  • * 任务运行完毕
  • */
  • private static final int RAN = 2;
  • /**
  • * State value representing that task was cancelled
  • * 任务取消
  • */
  • private static final int CANCELLED = 4;
  • /**
  • * The underlying callable
  • * 内部使用的Callable对象
  • */
  • private final Callable<V> callable;
  • /**
  • * The result to return from get()
  • * 用于get()方法返回的执行结果
  • */
  • private V result;
  • /**
  • * The exception to throw from get()
  • * 运行过程中出现的异常
  • */
  • private Throwable exception;
  • /**
  • * The thread running task. When nulled after set/cancel, this
  • * indicates that the results are accessible. Must be
  • * volatile, to ensure visibility upon completion.
  • * <p>
  • * 执行任务的线程,在设置或取消后会为null,表示执行结果是可以访问的
  • */
  • private volatile Thread runner;
  • // 构造方法,会将传入的Callable对象进行引用
  • Sync(Callable<V> callable) {
  • this.callable = callable;
  • }
  • // 判断状态是否是已运行完毕或取消
  • private boolean ranOrCancelled(int state) {
  • return (state & (RAN | CANCELLED)) != 0;
  • }
  • /**
  • * Implements AQS base acquire to succeed if ran or cancelled
  • * <p>
  • * 尝试获取共享锁
  • * 如果任务已执行完(处于RAN或CANCELLED状态,且runner为null),则获取成功
  • * 否则获取失败
  • */
  • protected int tryAcquireShared(int ignore) {
  • return innerIsDone() ? 1 : -1;
  • }
  • /**
  • * Implements AQS base release to always signal after setting
  • * final done status by nulling runner thread.
  • * <p>
  • * 尝试释放共享锁
  • * 将runner置为null,直接返回true
  • */
  • protected boolean tryReleaseShared(int ignore) {
  • runner = null;
  • return true;
  • }
  • // 判断任务是否已被取消
  • boolean innerIsCancelled() {
  • return getState() == CANCELLED;
  • }
  • // 判断任务是否执行完毕,必须是处于RAN或CANCELLED状态,并且runner为null
  • boolean innerIsDone() {
  • return ranOrCancelled(getState()) && runner == null;
  • }
  • // 获取结果,可能会进入阻塞等待
  • V innerGet() throws InterruptedException, ExecutionException {
  • // 尝试获取共享锁,如果没获取到,会阻塞等待(可中断)
  • acquireSharedInterruptibly(0);
  • // 已获取到共享锁
  • if (getState() == CANCELLED)
  • // 如果状态是已取消,则抛出异常
  • throw new CancellationException();
  • // 状态不是已取消
  • if (exception != null)
  • // 如果任务状态是已取消,则没有运行结果,直接抛出异常
  • throw new ExecutionException(exception);
  • // 返回运行结果
  • return result;
  • }
  • // 获取结果,可能会进入带有超时机制的阻塞等待
  • V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
  • // 尝试获取共享锁,带有超时机制,如果等待超时会抛出TimeoutException异常
  • if (!tryAcquireSharedNanos(0, nanosTimeout))
  • throw new TimeoutException();
  • // 已获取到共享锁
  • if (getState() == CANCELLED)
  • // 如果任务状态是已取消,则没有运行结果,直接抛出异常
  • throw new CancellationException();
  • // 状态不是已取消
  • if (exception != null)
  • // 如果exception不为null,则抛出exception异常
  • throw new ExecutionException(exception);
  • // 返回运行结果
  • return result;
  • }
  • // 设置结果
  • void innerSet(V v) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果任务状态是RAN,则直接返回,因此此时任务已经结束了
  • if (s == RAN)
  • return;
  • // 如果任务状态是CANCELLED,需要释放共享锁,以将runner设置为null
  • if (s == CANCELLED) {
  • // aggressively release to set runner to null,
  • // in case we are racing with a cancel request
  • // that will try to interrupt runner
  • releaseShared(0);
  • // 无结果返回
  • return;
  • }
  • // 尝试将任务状态设置为RAN
  • if (compareAndSetState(s, RAN)) {
  • // 如果设置成功,则将result设置为v
  • result = v;
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回
  • return;
  • }
  • }
  • }
  • // 设置异常
  • void innerSetException(Throwable t) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果任务状态是RAN,则直接返回,因此此时任务已经结束了
  • if (s == RAN)
  • return;
  • // 如果任务状态是CANCELLED,需要释放共享锁,以将runner设置为null
  • if (s == CANCELLED) {
  • // aggressively release to set runner to null,
  • // in case we are racing with a cancel request
  • // that will try to interrupt runner
  • releaseShared(0);
  • return;
  • }
  • // 尝试将任务状态设置为RAN
  • if (compareAndSetState(s, RAN)) {
  • // 如果设置成功,则将exception设置为t
  • exception = t;
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回
  • return;
  • }
  • }
  • }
  • // 将任务状态设置为CANCELLED
  • boolean innerCancel(boolean mayInterruptIfRunning) {
  • // 无限循环
  • for (; ; ) {
  • // 获取任务状态
  • int s = getState();
  • // 如果状态是RAN或CANCELLED,表示此时任务已经结束了,无法设置,返回false
  • if (ranOrCancelled(s))
  • return false;
  • // 尝试设置任务状态为CANCELLED,设置成功就跳出for循环
  • if (compareAndSetState(s, CANCELLED))
  • break;
  • }
  • /**
  • * 运行到此处表示设置任务状态为CANCELLED已经成功
  • * 此时如果需要中断任务线程,则将其中断
  • */
  • if (mayInterruptIfRunning) {
  • Thread r = runner;
  • if (r != null)
  • r.interrupt();
  • }
  • // 释放共享锁
  • releaseShared(0);
  • // 调用done()钩子方法
  • done();
  • // 返回true表示设置状态成功
  • return true;
  • }
  • void innerRun() {
  • // 尝试将任务状态从READY转换为RUNNING,如果转换失败就直接返回
  • if (!compareAndSetState(READY, RUNNING))
  • return;
  • // 运行到这里表示任务状态已经转换为RUNNING成功了
  • runner = Thread.currentThread();
  • // 重新检查任务状态是否是RUNNING
  • if (getState() == RUNNING) { // recheck after setting thread
  • // 检查通过
  • V result;
  • try {
  • // 尝试获取任务运行结果
  • result = callable.call();
  • } catch (Throwable ex) {
  • /**
  • * 如果出现异常,就使用exception成员变量进行记录
  • * 此操作会将任务状态设置为RAN
  • */
  • setException(ex);
  • // 直接返回结束运行
  • return;
  • }
  • /**
  • * 运行到这里表明任务执行完成且没有发生异常,
  • * 将结果用result记录
  • */
  • set(result);
  • } else {
  • // 释放共享锁
  • releaseShared(0); // cancel
  • }
  • }
  • boolean innerRunAndReset() {
  • // 尝试将任务状态从READY转换为RUNNING,如果转换失败就直接返回false
  • if (!compareAndSetState(READY, RUNNING))
  • return false;
  • // 运行到这里表示任务状态已经转换为RUNNING成功了
  • try {
  • runner = Thread.currentThread();
  • // 重新检查任务状态是否是RUNNING
  • if (getState() == RUNNING)
  • // 如果是RUNNING,则运行任务,但不记录任务结果
  • callable.call(); // don't set result
  • /**
  • * 运行到这里有两种情况:
  • * 1. 任务运行完成
  • * 2. 任务状态不是RUNNING
  • * 将runner置为null
  • */
  • runner = null;
  • // 如果任务状态为RUNNING,则将任务状态设置为READY
  • return compareAndSetState(RUNNING, READY);
  • } catch (Throwable ex) {
  • /**
  • * 如果出现异常,就使用exception成员变量进行记录
  • * 此操作会将任务状态设置为RAN
  • */
  • setException(ex);
  • // 直接返回false结束运行
  • return false;
  • }
  • }
  • }
  • }