Java多线程
Java并发
线程池

Java多线程 47—— CompletionService详解

简介:JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionService中,通过维护一个队列保存结束任务的Future,如果有任务结束,任务的Future会保存到队列中,从该队列中一定能拿到任务的返回结果;如果没有已经完成的任务,队列为空,取结果的线程才会进入阻塞等待。

1. 线程池任务执行结果的获取

日常处理中,我们通过会在任务提交到线程池后将返回的任务Future放到一个集合中,然后遍历集合通过Future的get()相关方法获取执行结果。但如果在遍历过程中get()方法阻塞,即使位于集合后面的Future已经完成,遍历集合的线程也要继续等待,会影响执行结果的处理效率。好的方案是,在任务执行结束后,返回值能够立即被获取,避免被未完成任务所影响。如下示例:

  • package com.coderap.pool;
  • import java.util.Random;
  • import java.util.concurrent.*;
  • import java.util.concurrent.Executors;
  • public class ExecuteResultManager {
  • public static void main(String[] args) {
  • // 队列
  • BlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>();
  • // 任务提交到线程池执行
  • new Thread() {
  • @Override
  • public void run() {
  • // 线程池
  • ExecutorService pool = Executors.newCachedThreadPool();
  • // 循环添加任务
  • for (int i = 0; i < 10; i++) {
  • int index = i;
  • // 线程池添加任务
  • Future<String> future = pool.submit(new Callable<String>() {
  • @Override
  • public String call() throws Exception {
  • Thread.sleep(new Random().nextInt(3000));
  • System.out.println("Task " + index + "done");
  • return "Task done" + index;
  • }
  • });
  • try {
  • // 将添加任务后返回的Future添加到队列中
  • futures.put(future);
  • } catch (InterruptedException e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }.start();
  • // 获取任务执行结果
  • new Thread() {
  • @Override
  • public void run() {
  • while (true) {
  • // 遍历
  • try {
  • Future<String> future = futures.take();
  • String result = future.get();
  • System.out.println("Get Result: " + result);
  • } catch (Exception e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }.start();
  • }
  • }

运行结果如下:

  • Task 7 done
  • Task 4 done
  • Task 8 done
  • Task 5 done
  • Task 6 done
  • Task 1 done
  • Task 9 done
  • Task 3 done
  • Task 0 done
  • Get Result: Task 0 done
  • Get Result: Task 1 done
  • Task 2 done
  • Get Result: Task 2 done
  • Get Result: Task 3 done
  • Get Result: Task 4 done
  • Get Result: Task 5 done
  • Get Result: Task 6 done
  • Get Result: Task 7 done
  • Get Result: Task 8 done
  • Get Result: Task 9 done

从运行结果可以看出,其实Task 7等任务都已经执行结束了,但由于Task 0还没结束,因此无法获取它的执行结果,这种结果的处理方式将严重影响程序的吞吐性能。

JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionService中,通过维护一个队列保存结束任务的Future,如果有任务结束,任务的Future会保存到队列中,从该队列中一定能拿到任务的返回结果;如果没有已经完成的任务,队列为空,取结果的线程才会进入阻塞等待。以这种方式可以避免前面提到的问题。

2. CompletionService接口

我们先看下接口CompletionService的定义,如下:

  • package java.util.concurrent;
  • public interface CompletionService<V> {
  • /**
  • * Submits a value-returning task for execution and returns a Future
  • * representing the pending results of the task. Upon completion,
  • * this task may be taken or polled.
  • *
  • * 提交任务
  • *
  • * @param task the task to submit
  • * @return a Future representing pending completion of the task
  • * @throws RejectedExecutionException if the task cannot be
  • * scheduled for execution
  • * @throws NullPointerException if the task is null
  • */
  • Future<V> submit(Callable<V> task);
  • /**
  • * Submits a Runnable task for execution and returns a Future
  • * representing that task. Upon completion, this task may be
  • * taken or polled.
  • *
  • * 提交任务
  • *
  • * @param task the task to submit
  • * @param result the result to return upon successful completion
  • * @return a Future representing pending completion of the task,
  • * and whose <tt>get()</tt> method will return the given
  • * result value upon completion
  • * @throws RejectedExecutionException if the task cannot be
  • * scheduled for execution
  • * @throws NullPointerException if the task is null
  • */
  • Future<V> submit(Runnable task, V result);
  • /**
  • * Retrieves and removes the Future representing the next
  • * completed task, waiting if none are yet present.
  • *
  • * 取下一个已经结束任务的返回值,如果没有则等待;注意,该方法响应中断
  • *
  • * @return the Future representing the next completed task
  • * @throws InterruptedException if interrupted while waiting
  • */
  • Future<V> take() throws InterruptedException;
  • /**
  • * Retrieves and removes the Future representing the next
  • * completed task or <tt>null</tt> if none are present.
  • *
  • * 取下一个已经结束任务的返回值,如果没有返回null
  • *
  • * @return the Future representing the next completed task, or
  • * <tt>null</tt> if none are present
  • */
  • Future<V> poll();
  • /**
  • * Retrieves and removes the Future representing the next
  • * completed task, waiting if necessary up to the specified wait
  • * time if none are yet present.
  • *
  • * 取下一个已经结束任务的返回值,如果没有最多等待timeout时间
  • *
  • * @param timeout how long to wait before giving up, in units of
  • * <tt>unit</tt>
  • * @param unit a <tt>TimeUnit</tt> determining how to interpret the
  • * <tt>timeout</tt> parameter
  • * @return the Future representing the next completed task or
  • * <tt>null</tt> if the specified waiting time elapses
  • * before one is present
  • * @throws InterruptedException if interrupted while waiting
  • */
  • Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  • }

该接口很简单,主要定义了任务提交及执行结果获取的相关方法。

3. ExecutorCompletionService类

该类实现了CompletionService,维护一个阻塞队列(默认为LinkedBlockingQueue类型)保存已经完成的任务Future。某个任务执行结束后时会将任务的Future添加到该阻塞队列,阻塞队列按任务的完成顺序保存了任务的Future以便获取。该类的实现如下:

  • package java.util.concurrent;
  • public class ExecutorCompletionService<V> implements CompletionService<V> {
  • // 任务的执行委托给了executor
  • private final Executor executor;
  • // 如果executor继承自AbstractExecutorService,aes和executor指向同一个对象,否则aes为空
  • private final AbstractExecutorService aes;
  • // 保存完成任务的Future
  • private final BlockingQueue<Future<V>> completionQueue;
  • /**
  • * FutureTask extension to enqueue upon completion
  • * 继承自FutureTask,重写done()方法以扩展保存结果的功能
  • * 会在任务结束后将任务的Future加到completionQueue中
  • */
  • private class QueueingFuture extends FutureTask<Void> {
  • QueueingFuture(RunnableFuture<V> task) {
  • super(task, null);
  • this.task = task;
  • }
  • // done()是一个钩子方法,会在任务结束时被调用
  • protected void done() {
  • // 任务结束后将Future加到队列中
  • completionQueue.add(task);
  • }
  • private final Future<V> task;
  • }
  • // 将任务包装成FutureTask
  • private RunnableFuture<V> newTaskFor(Callable<V> task) {
  • if (aes == null)
  • return new FutureTask<V>(task);
  • else
  • return aes.newTaskFor(task);
  • }
  • // 将任务包装成FutureTask
  • private RunnableFuture<V> newTaskFor(Runnable task, V result) {
  • if (aes == null)
  • return new FutureTask<V>(task, result);
  • else
  • return aes.newTaskFor(task, result);
  • }
  • /**
  • * Creates an ExecutorCompletionService using the supplied
  • * executor for base task execution and a
  • * {@link LinkedBlockingQueue} as a completion queue.
  • *
  • * 构造方法
  • *
  • * @param executor the executor to use 被使用的线程池
  • * @throws NullPointerException if executor is {@code null}
  • */
  • public ExecutorCompletionService(Executor executor) {
  • // 检查传入的参数
  • if (executor == null)
  • throw new NullPointerException();
  • // 记录线程池
  • this.executor = executor;
  • // 如果线程池是AbstractExecutorService会将其用aes进行记录
  • this.aes = (executor instanceof AbstractExecutorService) ?
  • (AbstractExecutorService) executor : null;
  • // 初始化装载完成任务的阻塞队列
  • this.completionQueue = new LinkedBlockingQueue<Future<V>>();
  • }
  • /**
  • * Creates an ExecutorCompletionService using the supplied
  • * executor for base task execution and the supplied queue as its
  • * completion queue.
  • *
  • * 构造方法,可指定装载任务结果的队列
  • *
  • * @param executor the executor to use
  • * @param completionQueue the queue to use as the completion queue
  • * normally one dedicated for use by this service. This
  • * queue is treated as unbounded -- failed attempted
  • * {@code Queue.add} operations for completed taskes cause
  • * them not to be retrievable.
  • * @throws NullPointerException if executor or completionQueue are {@code null}
  • */
  • public ExecutorCompletionService(Executor executor,
  • BlockingQueue<Future<V>> completionQueue) {
  • // 检查参数
  • if (executor == null || completionQueue == null)
  • throw new NullPointerException();
  • // 记录线程池
  • this.executor = executor;
  • // 如果线程池是AbstractExecutorService会将其用aes进行记录
  • this.aes = (executor instanceof AbstractExecutorService) ?
  • (AbstractExecutorService) executor : null;
  • // 记录装载任务结果的队列
  • this.completionQueue = completionQueue;
  • }
  • // 提交任务
  • public Future<V> submit(Callable<V> task) {
  • if (task == null) throw new NullPointerException();
  • // 对任务进行包装
  • RunnableFuture<V> f = newTaskFor(task);
  • // 使用线程池提交任务,提交的任务被QueueingFuture包装了
  • executor.execute(new QueueingFuture(f));
  • return f;
  • }
  • // 提交任务
  • public Future<V> submit(Runnable task, V result) {
  • if (task == null) throw new NullPointerException();
  • // 对任务进行包装
  • RunnableFuture<V> f = newTaskFor(task, result);
  • // 使用线程池提交任务,提交的任务被QueueingFuture包装了
  • executor.execute(new QueueingFuture(f));
  • return f;
  • }
  • // 获取任务结果,如果队列为空则阻塞
  • public Future<V> take() throws InterruptedException {
  • // 从任务结果队列中获取已完成的任务
  • return completionQueue.take();
  • }
  • // 获取任务结果,如果队列为空返回null
  • public Future<V> poll() {
  • // 从任务结果队列中获取已完成的任务
  • return completionQueue.poll();
  • }
  • // 获取任务结果,如果超时,返回空
  • public Future<V> poll(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • // 从任务结果队列中获取已完成的任务
  • return completionQueue.poll(timeout, unit);
  • }
  • }

ExecutorCompletionService的实现比较简单,它将任务包装为QueueingFuture对象,然后将QueueingFuture作为任务来执行,QueueingFuture继承FutureTask,重写了done()钩子方法,任务在完成后会调用done()方法,QueueingFuture任务对象会将任务的Future加到装载执行完成的任务队列completionQueue中,这样就可以从completionQueue中及时取得已完成任务的执行结果了,改进示例如下:

  • package com.coderap.pool;
  • import java.util.Random;
  • import java.util.concurrent.*;
  • import java.util.concurrent.Executors;
  • public class ExecuteResultManager {
  • public static void main(String[] args) throws Exception {
  • // 经过ExecutorCompletionService包装的线程池
  • ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(Executors.newCachedThreadPool());
  • // 任务提交到线程池执行
  • new Thread() {
  • @Override
  • public void run() {
  • // 循环添加任务
  • for (int i = 0; i < 10; i++) {
  • int index = i;
  • // 线程池添加任务
  • Future<String> future = executorCompletionService.submit(new Callable<String>() {
  • @Override
  • public String call() throws Exception {
  • Thread.sleep(new Random().nextInt(3000));
  • System.out.println("Task " + index + " done");
  • return "Task " + index + " done";
  • }
  • });
  • }
  • }
  • }.start();
  • // 获取任务执行结果
  • new Thread() {
  • @Override
  • public void run() {
  • while (true) {
  • // 遍历
  • try {
  • Future<String> future = executorCompletionService.take();
  • String result = future.get();
  • System.out.println("Get Result: " + result);
  • } catch (Exception e) {
  • e.printStackTrace();
  • }
  • }
  • }
  • }.start();
  • }
  • }

执行结果如下:

  • Task 3 done
  • Get Result: Task 3 done
  • Task 5 done
  • Get Result: Task 5 done
  • Task 0 done
  • Get Result: Task 0 done
  • Task 6 done
  • Get Result: Task 6 done
  • Task 4 done
  • Get Result: Task 4 done
  • Task 9 done
  • Get Result: Task 9 done
  • Task 7 done
  • Get Result: Task 7 done
  • Task 2 done
  • Get Result: Task 2 done
  • Task 1 done
  • Get Result: Task 1 done
  • Task 8 done
  • Get Result: Task 8 done

从运行结果可以看出,每个任务运行完成后,结果都能及时得到处理,避免了之前损耗吞吐量的情况。