Java多线程
Java并发
线程池

Java多线程 41 - 线程池基础

简介:从上一篇文章可知,ThreadPoolExecutor是我们最常用的线程池类,在介绍该类之前,我们先研究一下它的父类。

从上一篇文章可知,ThreadPoolExecutor是我们最常用的线程池类,在介绍该类之前,我们先研究一下它的父类。

1. Executor接口介绍

ThreadPoolExecutor继承自AbstractExecutorService,从类名可知该类是一个抽象类,而AbstractExecutorService则实现了ExecutorService接口,ExecutorService接口则继承了Executor接口,我们从顶层的Executor接口开始研究,它的源码如下:

  • package java.util.concurrent;
  • public interface Executor {
  • /**
  • * Executes the given command at some time in the future. The command
  • * may execute in a new thread, in a pooled thread, or in the calling
  • * thread, at the discretion of the <tt>Executor</tt> implementation.
  • *
  • * 用于执行给定的Runnable命令,
  • * 该命令可能会由一个新线程执行,也有可能在一个线程池内的线程执行或者由一个正在被调用的线程中执行
  • * 具体情况由实现类自行决定
  • * 当Runnable命令被拒绝时会抛出异常RejectedExecutionException
  • *
  • * @param command the runnable task
  • * @throws RejectedExecutionException if this task cannot be
  • * accepted for execution.
  • * @throws NullPointerException if command is null
  • */
  • void execute(Runnable command);
  • }

Executor接口非常简单,它只有一个方法execute(Runnable)提供给子类实现,该方法用于执行传入的Runnable命令。

2. ExecutorService接口介绍

ExecutorService接口继承自Executor接口,声明了一些主要的操作方法,源码如下:

  • package java.util.concurrent;
  • import java.util.List;
  • import java.util.Collection;
  • import java.security.PrivilegedAction;
  • import java.security.PrivilegedExceptionAction;
  • public interface ExecutorService extends Executor {
  • /**
  • * Initiates an orderly shutdown in which previously submitted
  • * tasks are executed, but no new tasks will be accepted.
  • * Invocation has no additional effect if already shut down.
  • *
  • * <p>This method does not wait for previously submitted tasks to
  • * complete execution. Use {@link #awaitTermination awaitTermination}
  • * to do that.
  • *
  • * 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
  • *
  • * @throws SecurityException if a security manager exists and
  • * shutting down this ExecutorService may manipulate
  • * threads that the caller is not permitted to modify
  • * because it does not hold {@link
  • * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  • * or the security manager's <tt>checkAccess</tt> method
  • * denies access.
  • */
  • void shutdown();
  • /**
  • * Attempts to stop all actively executing tasks, halts the
  • * processing of waiting tasks, and returns a list of the tasks
  • * that were awaiting execution.
  • *
  • * <p>This method does not wait for actively executing tasks to
  • * terminate. Use {@link #awaitTermination awaitTermination} to
  • * do that.
  • *
  • * <p>There are no guarantees beyond best-effort attempts to stop
  • * processing actively executing tasks. For example, typical
  • * implementations will cancel via {@link Thread#interrupt}, so any
  • * task that fails to respond to interrupts may never terminate.
  • *
  • * 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
  • *
  • * @return list of tasks that never commenced execution
  • * @throws SecurityException if a security manager exists and
  • * shutting down this ExecutorService may manipulate
  • * threads that the caller is not permitted to modify
  • * because it does not hold {@link
  • * java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  • * or the security manager's <tt>checkAccess</tt> method
  • * denies access.
  • */
  • List<Runnable> shutdownNow();
  • /**
  • * Returns <tt>true</tt> if this executor has been shut down.
  • *
  • * 如果此执行程序已关闭,则返回true
  • *
  • * @return <tt>true</tt> if this executor has been shut down
  • */
  • boolean isShutdown();
  • /**
  • * Returns <tt>true</tt> if all tasks have completed following shut down.
  • * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
  • * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
  • *
  • * 如果关闭后所有任务都已完成,则返回true
  • *
  • * @return <tt>true</tt> if all tasks have completed following shut down
  • */
  • boolean isTerminated();
  • /**
  • * Blocks until all tasks have completed execution after a shutdown
  • * request, or the timeout occurs, or the current thread is
  • * interrupted, whichever happens first.
  • *
  • * 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
  • *
  • * @param timeout the maximum time to wait
  • * @param unit the time unit of the timeout argument
  • * @return <tt>true</tt> if this executor terminated and
  • * <tt>false</tt> if the timeout elapsed before termination
  • * @throws InterruptedException if interrupted while waiting
  • */
  • boolean awaitTermination(long timeout, TimeUnit unit)
  • throws InterruptedException;
  • /**
  • * Submits a value-returning task for execution and returns a
  • * Future representing the pending results of the task. The
  • * Future's <tt>get</tt> method will return the task's result upon
  • * successful completion.
  • *
  • * <p>
  • * If you would like to immediately block waiting
  • * for a task, you can use constructions of the form
  • * <tt>result = exec.submit(aCallable).get();</tt>
  • *
  • * <p> Note: The {@link Executors} class includes a set of methods
  • * that can convert some other common closure-like objects,
  • * for example, {@link java.security.PrivilegedAction} to
  • * {@link Callable} form so they can be submitted.
  • *
  • * 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的Future
  • *
  • * @param task the task to submit
  • * @return a Future representing pending completion of the task
  • * @throws RejectedExecutionException if the task cannot be
  • * scheduled for execution
  • * @throws NullPointerException if the task is null
  • */
  • <T> Future<T> submit(Callable<T> task);
  • /**
  • * Submits a Runnable task for execution and returns a Future
  • * representing that task. The Future's <tt>get</tt> method will
  • * return the given result upon successful completion.
  • *
  • * 提交一个Runnable任务用于执行,并返回一个表示该任务的Future
  • *
  • * @param task the task to submit
  • * @param result the result to return
  • * @return a Future representing pending completion of the task
  • * @throws RejectedExecutionException if the task cannot be
  • * scheduled for execution
  • * @throws NullPointerException if the task is null
  • */
  • <T> Future<T> submit(Runnable task, T result);
  • /**
  • * Submits a Runnable task for execution and returns a Future
  • * representing that task. The Future's <tt>get</tt> method will
  • * return <tt>null</tt> upon <em>successful</em> completion.
  • *
  • * 提交一个Runnable任务用于执行,并返回一个表示该任务的Future
  • *
  • * @param task the task to submit
  • * @return a Future representing pending completion of the task
  • * @throws RejectedExecutionException if the task cannot be
  • * scheduled for execution
  • * @throws NullPointerException if the task is null
  • */
  • Future<?> submit(Runnable task);
  • /**
  • * Executes the given tasks, returning a list of Futures holding
  • * their status and results when all complete.
  • * {@link Future#isDone} is <tt>true</tt> for each
  • * element of the returned list.
  • * Note that a <em>completed</em> task could have
  • * terminated either normally or by throwing an exception.
  • * The results of this method are undefined if the given
  • * collection is modified while this operation is in progress.
  • *
  • * 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的Future列表
  • *
  • * @param tasks the collection of tasks
  • * @return A list of Futures representing the tasks, in the same
  • * sequential order as produced by the iterator for the
  • * given task list, each of which has completed.
  • * @throws InterruptedException if interrupted while waiting, in
  • * which case unfinished tasks are cancelled.
  • * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
  • * @throws RejectedExecutionException if any task cannot be
  • * scheduled for execution
  • */
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException;
  • /**
  • * Executes the given tasks, returning a list of Futures holding
  • * their status and results
  • * when all complete or the timeout expires, whichever happens first.
  • * {@link Future#isDone} is <tt>true</tt> for each
  • * element of the returned list.
  • * Upon return, tasks that have not completed are cancelled.
  • * Note that a <em>completed</em> task could have
  • * terminated either normally or by throwing an exception.
  • * The results of this method are undefined if the given
  • * collection is modified while this operation is in progress.
  • *
  • * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的Future列表
  • *
  • * @param tasks the collection of tasks
  • * @param timeout the maximum time to wait
  • * @param unit the time unit of the timeout argument
  • * @return a list of Futures representing the tasks, in the same
  • * sequential order as produced by the iterator for the
  • * given task list. If the operation did not time out,
  • * each task will have completed. If it did time out, some
  • * of these tasks will not have completed.
  • * @throws InterruptedException if interrupted while waiting, in
  • * which case unfinished tasks are cancelled
  • * @throws NullPointerException if tasks, any of its elements, or
  • * unit are <tt>null</tt>
  • * @throws RejectedExecutionException if any task cannot be scheduled
  • * for execution
  • */
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  • long timeout, TimeUnit unit)
  • throws InterruptedException;
  • /**
  • * Executes the given tasks, returning the result
  • * of one that has completed successfully (i.e., without throwing
  • * an exception), if any do. Upon normal or exceptional return,
  • * tasks that have not completed are cancelled.
  • * The results of this method are undefined if the given
  • * collection is modified while this operation is in progress.
  • *
  • * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
  • *
  • * @param tasks the collection of tasks
  • * @return the result returned by one of the tasks
  • * @throws InterruptedException if interrupted while waiting
  • * @throws NullPointerException if tasks or any element task
  • * subject to execution is <tt>null</tt>
  • * @throws IllegalArgumentException if tasks is empty
  • * @throws ExecutionException if no task successfully completes
  • * @throws RejectedExecutionException if tasks cannot be scheduled
  • * for execution
  • */
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException, ExecutionException;
  • /**
  • * Executes the given tasks, returning the result
  • * of one that has completed successfully (i.e., without throwing
  • * an exception), if any do before the given timeout elapses.
  • * Upon normal or exceptional return, tasks that have not
  • * completed are cancelled.
  • * The results of this method are undefined if the given
  • * collection is modified while this operation is in progress.
  • *
  • * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
  • *
  • * @param tasks the collection of tasks
  • * @param timeout the maximum time to wait
  • * @param unit the time unit of the timeout argument
  • * @return the result returned by one of the tasks.
  • * @throws InterruptedException if interrupted while waiting
  • * @throws NullPointerException if tasks, or unit, or any element
  • * task subject to execution is <tt>null</tt>
  • * @throws TimeoutException if the given timeout elapses before
  • * any task successfully completes
  • * @throws ExecutionException if no task successfully completes
  • * @throws RejectedExecutionException if tasks cannot be scheduled
  • * for execution
  • */
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  • long timeout, TimeUnit unit)
  • throws InterruptedException, ExecutionException, TimeoutException;
  • }

3. ScheduledExecutorService接口介绍

ScheduledExecutorService接口是用于声明定时调度任务的接口方法,主要有四种,源码如下:

  • package java.util.concurrent;
  • import java.util.concurrent.atomic.*;
  • import java.util.*;
  • public interface ScheduledExecutorService extends ExecutorService {
  • // 创建延迟调度任务,会在delay时间之后开始执行任务
  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
  • // 创建延迟调度任务,会在delay时间之后开始执行任务
  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
  • /**
  • * 创建延迟周期性调度任务,第一次任务会在initialDelay时刻执行,
  • * 然后每次执行完看执行耗时是否达到period时间段,
  • * 如果达到就直接执行下一次任务,如果小于就等达到period再执行下一次任务
  • */
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
  • // 创建延迟调度任务,第一次任务在initialDelay时刻执行,然后每隔delay时间执行下一次任务
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • }

ScheduledExecutorService在后面的定时任务调度器ScheduledThreadPoolExecutor中会详细介绍。

4. AbstractExecutorService类介绍

我们先观察AbstractExecutorService的源码注释:

  • package java.util.concurrent;
  • import java.util.*;
  • public abstract class AbstractExecutorService implements ExecutorService {
  • /**
  • * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  • * value.
  • *
  • * 通过给定的Runnable和默认结果值返回一个RunnableFuture对象
  • *
  • * @param runnable the runnable task being wrapped
  • * @param value the default value for the returned future
  • * @return a <tt>RunnableFuture</tt> which when run will run the
  • * underlying runnable and which, as a <tt>Future</tt>, will yield
  • * the given value as its result and provide for cancellation of
  • * the underlying task.
  • * @since 1.6
  • */
  • protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  • return new FutureTask<T>(runnable, value);
  • }
  • /**
  • * Returns a <tt>RunnableFuture</tt> for the given callable task.
  • *
  • * 通过给定的Runnable返回一个RunnableFuture对象
  • *
  • * @param callable the callable task being wrapped
  • * @return a <tt>RunnableFuture</tt> which when run will call the
  • * underlying callable and which, as a <tt>Future</tt>, will yield
  • * the callable's result as its result and provide for
  • * cancellation of the underlying task.
  • * @since 1.6
  • */
  • protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  • return new FutureTask<T>(callable);
  • }
  • /**
  • * 将传入的Runnable包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
  • *
  • * @throws RejectedExecutionException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public Future<?> submit(Runnable task) {
  • if (task == null) throw new NullPointerException();
  • RunnableFuture<Void> ftask = newTaskFor(task, null);
  • execute(ftask);
  • return ftask;
  • }
  • /**
  • * 将传入的Runnable和默认结果值包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
  • *
  • * @throws RejectedExecutionException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public <T> Future<T> submit(Runnable task, T result) {
  • if (task == null) throw new NullPointerException();
  • RunnableFuture<T> ftask = newTaskFor(task, result);
  • execute(ftask);
  • return ftask;
  • }
  • /**
  • * 将传入的Callable包装为一个RunnableFuture对象,交由execute(Runnable)方法处理
  • *
  • * @throws RejectedExecutionException {@inheritDoc}
  • * @throws NullPointerException {@inheritDoc}
  • */
  • public <T> Future<T> submit(Callable<T> task) {
  • if (task == null) throw new NullPointerException();
  • RunnableFuture<T> ftask = newTaskFor(task);
  • execute(ftask);
  • return ftask;
  • }
  • /**
  • * the main mechanics of invokeAny.
  • */
  • private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
  • throws InterruptedException, ExecutionException, TimeoutException {
  • // 检查传入的tasks
  • if (tasks == null)
  • throw new NullPointerException();
  • // 获取tasks集合的大小
  • int ntasks = tasks.size();
  • // tasks集合大小不能为0,即tasks里面应该至少有一个Callable对象
  • if (ntasks == 0)
  • throw new IllegalArgumentException();
  • // 根据tasks的大小创建一个装载Future的ArrayList
  • List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
  • // 将本对象作为Executor创建ExecutorCompletionService对象
  • ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
  • // For efficiency, especially in executors with limited
  • // parallelism, check to see if previously submitted tasks are
  • // done before submitting more of them. This interleaving
  • // plus the exception mechanics account for messiness of main
  • // loop.
  • try {
  • // Record exceptions so that if we fail to obtain any
  • // result, we can throw the last exception we got.
  • // 执行异常的记录对象
  • ExecutionException ee = null;
  • // 当前时间
  • long lastTime = timed ? System.nanoTime() : 0;
  • // 获取tasks的迭代器
  • Iterator<? extends Callable<T>> it = tasks.iterator();
  • // Start one task for sure; the rest incrementally
  • // 使用ecs提交task任务,并将结果添加到futures数组
  • futures.add(ecs.submit(it.next()));
  • // tasks数量减一
  • --ntasks;
  • // 设置正在执行的task任务数为1
  • int active = 1;
  • for (;;) {
  • // 获取ecs中第一个执行完任务得到的Future结果对象(非阻塞)
  • Future<T> f = ecs.poll();
  • // 检查结果对象是否为null
  • if (f == null) {
  • // 结果对象为null,代表该task还没有执行完;检查还有没有剩余的task
  • if (ntasks > 0) {
  • // 还有剩余的task,tasks数量减一
  • --ntasks;
  • // 使用ecs提交下一个task任务,并将结果添加到futures数组
  • futures.add(ecs.submit(it.next()));
  • // 正在执行的task任务数加1
  • ++active;
  • } else if (active == 0)
  • // 如果正在执行的task任务数为0,则跳出循环
  • break;
  • else if (timed) {
  • // 如果传入了超时时间,则根据超时时间从ecs中获取第一个执行完任务得到的Future结果对象
  • f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  • // 获取的Future结果对象为null,直接抛出超时异常
  • if (f == null)
  • throw new TimeoutException();
  • // 计算超时时间
  • long now = System.nanoTime();
  • nanos -= now - lastTime;
  • lastTime = now;
  • } else
  • // 使用take()方法获取(阻塞)
  • f = ecs.take();
  • }
  • if (f != null) {
  • // 结果对象不为null,代表该task执行完了,将正在执行的task任务数减1
  • --active;
  • try {
  • // 获取执行结果并返回
  • return f.get();
  • } catch (ExecutionException eex) {
  • // 记录异常
  • ee = eex;
  • } catch (RuntimeException rex) {
  • // 记录异常
  • ee = new ExecutionException(rex);
  • }
  • }
  • }
  • // 如果记录的异常不为null,则抛出
  • if (ee == null)
  • ee = new ExecutionException();
  • throw ee;
  • } finally {
  • // 取消所有没有执行的task任务
  • for (Future<T> f : futures)
  • f.cancel(true);
  • }
  • }
  • public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException, ExecutionException {
  • try {
  • // 调用doInvokeAny()方法
  • return doInvokeAny(tasks, false, 0);
  • } catch (TimeoutException cannotHappen) {
  • assert false;
  • return null;
  • }
  • }
  • public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
  • throws InterruptedException, ExecutionException, TimeoutException {
  • // 调用doInvokeAny()方法
  • return doInvokeAny(tasks, true, unit.toNanos(timeout));
  • }
  • public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException {
  • // 检查参数
  • if (tasks == null)
  • throw new NullPointerException();
  • // 根据tasks大小创建一个装载Future对象的ArrayList
  • List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  • boolean done = false;
  • try {
  • // 循环遍历tasks
  • for (Callable<T> t : tasks) {
  • // 将遍历到的task包装为一个RunnableFuture对象f
  • RunnableFuture<T> f = newTaskFor(t);
  • // 将f放入futures中
  • futures.add(f);
  • // 执行f
  • execute(f);
  • }
  • // 遍历futures
  • for (Future<T> f : futures) {
  • // 如果遍历到的Future还没完成,就使用get()进行获取(阻塞)
  • if (!f.isDone()) {
  • try {
  • f.get();
  • } catch (CancellationException ignore) {
  • } catch (ExecutionException ignore) {
  • }
  • }
  • }
  • // 走到这里说明所有的Future都完成了
  • done = true;
  • // 返回futures
  • return futures;
  • } finally {
  • // 如果还有Future没有完成,就遍历futures将它们取消
  • if (!done)
  • for (Future<T> f : futures)
  • f.cancel(true);
  • }
  • }
  • public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 检查参数
  • if (tasks == null || unit == null)
  • throw new NullPointerException();
  • // 计算超时时间
  • long nanos = unit.toNanos(timeout);
  • // 根据tasks大小创建一个装载Future对象的ArrayList
  • List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  • boolean done = false;
  • try {
  • // 循环遍历tasks
  • for (Callable<T> t : tasks)
  • // 将它们包装为RunnableFuture对象,然后放入futures中
  • futures.add(newTaskFor(t));
  • // 获取当前系统时间
  • long lastTime = System.nanoTime();
  • // Interleave time checks and calls to execute in case
  • // executor doesn't have any/much parallelism.
  • // 迭代futures
  • Iterator<Future<T>> it = futures.iterator();
  • while (it.hasNext()) {
  • // 使用execute(Runnable)方法执行下一个Future
  • execute((Runnable)(it.next()));
  • // 计算超时时间
  • long now = System.nanoTime();
  • nanos -= now - lastTime;
  • lastTime = now;
  • // 如果超时了,返回futures
  • if (nanos <= 0)
  • return futures;
  • }
  • // 遍历futures
  • for (Future<T> f : futures) {
  • // 如果Future没有完成执行
  • if (!f.isDone()) {
  • // 且超时了
  • if (nanos <= 0)
  • // 返回futures
  • return futures;
  • try {
  • // 否则没超时就尝试使用get(0)方法获取
  • f.get(nanos, TimeUnit.NANOSECONDS);
  • } catch (CancellationException ignore) {
  • } catch (ExecutionException ignore) {
  • } catch (TimeoutException toe) {
  • return futures;
  • }
  • // 计算超时时间
  • long now = System.nanoTime();
  • nanos -= now - lastTime;
  • lastTime = now;
  • }
  • }
  • // 标识所有Future执行完成了
  • done = true;
  • // 返回futures
  • return futures;
  • } finally {
  • // 如果还有Future没有完成,就遍历futures将它们取消
  • if (!done)
  • for (Future<T> f : futures)
  • f.cancel(true);
  • }
  • }
  • }

ExecutorService中的下面几个方法值得关注:

  1. doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)实现了附带超时等待Callable任务调用方法,它内部通过ExecutorCompletionService实例来实现对所有提交的任务执行完成时返回结果的存储和获取;ExecutorCompletionService实现了CompletionService接口,内部拥有一个Executor类型的成员变量executor,在构造方法中它将传入的Executor对象赋值给成员变量executor
  • public ExecutorCompletionService(Executor executor) {
  • if (executor == null)
  • throw new NullPointerException();
  • // 引用传入的Executor对象
  • this.executor = executor;
  • this.aes = (executor instanceof AbstractExecutorService) ?
  • (AbstractExecutorService) executor : null;
  • this.completionQueue = new LinkedBlockingQueue<Future<V>>();
  • }
  • public Future<V> submit(Callable<V> task) {
  • if (task == null) throw new NullPointerException();
  • // 将Callable任务包装为一个RunnableFuture
  • RunnableFuture<V> f = newTaskFor(task);
  • // 使用构造方法传入的Executor的execute()方法提交任务
  • executor.execute(new QueueingFuture(f));
  • return f;
  • }
  1. invokeAny(Collection<? extends Callable<T>> tasks)相关的方法没有对Callable任务进行显示地包装,因为在ExecutorCompletionService的submit(Callable<V> task)方法提交任务时,实际上调用了newTaskFor(Callable<V> task)方法将Callable任务包装为RunnableFuture对象,然后调用了本对象的execute(Runnable command)方法提交任务,并返回异步计算结果对象;ExecutorCompletionService的submit(Callable<V> task)方法源码如下:
  • public Future<V> submit(Callable<V> task) {
  • if (task == null) throw new NullPointerException();
  • RunnableFuture<V> f = newTaskFor(task);
  • executor.execute(new QueueingFuture(f));
  • return f;
  • }
  1. invokeAny(Collection<? extends Callable<T>> tasks)相关的方法使用了ExecutorCompletionService对象对任务执行完成时结果的存取,隐含了对任务是否完成的判断;所以对返回结果就不用通过isDone()方法判断是否任务已经完成了;ExecutorCompletionService部分源码如下:
  • // 用来存储已经完成任务的阻塞队列
  • private final BlockingQueue<Future<V>> completionQueue;
  • // 拓展FutureTask在完成时将任务入队的功能
  • private class QueueingFuture extends FutureTask<Void> {
  • QueueingFuture(RunnableFuture<V> task) {
  • super(task, null);
  • this.task = task;
  • }
  • // 此方法在FutureTask任务run()方法完成时调用,这里是将完成的任务入队
  • protected void done() {
  • completionQueue.add(task);
  • }
  • private final Future<V> task;
  • }
  • public Future<V> poll() {
  • return completionQueue.poll();
  • }
  • public Future<V> poll(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • return completionQueue.poll(timeout, unit);
  • }
  1. invokeAll(Collection<? extends Callable<T>> tasks)方法也实现了无超时和超时两个版本;无超时版本首先将所有任务包装后提交给本对象的执行器(调用execute(Runnable)方法)执行,并将返回的结果添加到futures集合中,然后对futures集合进行遍历判断是否已经完成,如果没有完成则使用get()方法阻塞获取,等待结果的返回;无超时版本对异常进行了封装处理,并在finally代码块对是否全部完成的标记变量done进行检查,取消已经提交但没有完成执行的任务。超时版本和无超时版本基本一致,但是加了超时逻辑。在添加执行任务时超时判断,如果超时则立刻返回futures集合,同时每次对结果进行判断时都会进行超时判断。