Java多线程 47 - CompletionService详解
发布于 / 2018-12-22
简介:JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionService中,通过维护一个队列保存结束任务的Future,如果有任务结束,任务的Future会保存到队列中,从该队列中一定能拿到任务的返回结果;如果没有已经完成的任务,队列为空,取结果的线程才会进入阻塞等待。
1. 线程池任务执行结果的获取
日常处理中,我们通过会在任务提交到线程池后将返回的任务Future放到一个集合中,然后遍历集合通过Future的get()
相关方法获取执行结果。但如果在遍历过程中get()
方法阻塞,即使位于集合后面的Future已经完成,遍历集合的线程也要继续等待,会影响执行结果的处理效率。好的方案是,在任务执行结束后,返回值能够立即被获取,避免被未完成任务所影响。如下示例:
- package com.coderap.pool;
- import java.util.Random;
- import java.util.concurrent.*;
- import java.util.concurrent.Executors;
- public class ExecuteResultManager {
- public static void main(String[] args) {
- // 队列
- BlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>();
- // 任务提交到线程池执行
- new Thread() {
- @Override
- public void run() {
- // 线程池
- ExecutorService pool = Executors.newCachedThreadPool();
- // 循环添加任务
- for (int i = 0; i < 10; i++) {
- int index = i;
- // 线程池添加任务
- Future<String> future = pool.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- Thread.sleep(new Random().nextInt(3000));
- System.out.println("Task " + index + "done");
- return "Task done" + index;
- }
- });
- try {
- // 将添加任务后返回的Future添加到队列中
- futures.put(future);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }.start();
- // 获取任务执行结果
- new Thread() {
- @Override
- public void run() {
- while (true) {
- // 遍历
- try {
- Future<String> future = futures.take();
- String result = future.get();
- System.out.println("Get Result: " + result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }.start();
- }
- }
运行结果如下:
- Task 7 done
- Task 4 done
- Task 8 done
- Task 5 done
- Task 6 done
- Task 1 done
- Task 9 done
- Task 3 done
- Task 0 done
- Get Result: Task 0 done
- Get Result: Task 1 done
- Task 2 done
- Get Result: Task 2 done
- Get Result: Task 3 done
- Get Result: Task 4 done
- Get Result: Task 5 done
- Get Result: Task 6 done
- Get Result: Task 7 done
- Get Result: Task 8 done
- Get Result: Task 9 done
从运行结果可以看出,其实Task 7等任务都已经执行结束了,但由于Task 0还没结束,因此无法获取它的执行结果,这种结果的处理方式将严重影响程序的吞吐性能。
JDK提供了CompletionService接口用于解决该问题。在它的实现类ExecutorCompletionService中,通过维护一个队列保存结束任务的Future,如果有任务结束,任务的Future会保存到队列中,从该队列中一定能拿到任务的返回结果;如果没有已经完成的任务,队列为空,取结果的线程才会进入阻塞等待。以这种方式可以避免前面提到的问题。
2. CompletionService接口
我们先看下接口CompletionService的定义,如下:
- package java.util.concurrent;
- public interface CompletionService<V> {
- /**
- * Submits a value-returning task for execution and returns a Future
- * representing the pending results of the task. Upon completion,
- * this task may be taken or polled.
- *
- * 提交任务
- *
- * @param task the task to submit
- * @return a Future representing pending completion of the task
- * @throws RejectedExecutionException if the task cannot be
- * scheduled for execution
- * @throws NullPointerException if the task is null
- */
- Future<V> submit(Callable<V> task);
- /**
- * Submits a Runnable task for execution and returns a Future
- * representing that task. Upon completion, this task may be
- * taken or polled.
- *
- * 提交任务
- *
- * @param task the task to submit
- * @param result the result to return upon successful completion
- * @return a Future representing pending completion of the task,
- * and whose <tt>get()</tt> method will return the given
- * result value upon completion
- * @throws RejectedExecutionException if the task cannot be
- * scheduled for execution
- * @throws NullPointerException if the task is null
- */
- Future<V> submit(Runnable task, V result);
- /**
- * Retrieves and removes the Future representing the next
- * completed task, waiting if none are yet present.
- *
- * 取下一个已经结束任务的返回值,如果没有则等待;注意,该方法响应中断
- *
- * @return the Future representing the next completed task
- * @throws InterruptedException if interrupted while waiting
- */
- Future<V> take() throws InterruptedException;
- /**
- * Retrieves and removes the Future representing the next
- * completed task or <tt>null</tt> if none are present.
- *
- * 取下一个已经结束任务的返回值,如果没有返回null
- *
- * @return the Future representing the next completed task, or
- * <tt>null</tt> if none are present
- */
- Future<V> poll();
- /**
- * Retrieves and removes the Future representing the next
- * completed task, waiting if necessary up to the specified wait
- * time if none are yet present.
- *
- * 取下一个已经结束任务的返回值,如果没有最多等待timeout时间
- *
- * @param timeout how long to wait before giving up, in units of
- * <tt>unit</tt>
- * @param unit a <tt>TimeUnit</tt> determining how to interpret the
- * <tt>timeout</tt> parameter
- * @return the Future representing the next completed task or
- * <tt>null</tt> if the specified waiting time elapses
- * before one is present
- * @throws InterruptedException if interrupted while waiting
- */
- Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
- }
该接口很简单,主要定义了任务提交及执行结果获取的相关方法。
3. ExecutorCompletionService类
该类实现了CompletionService,维护一个阻塞队列(默认为LinkedBlockingQueue类型)保存已经完成的任务Future。某个任务执行结束后时会将任务的Future添加到该阻塞队列,阻塞队列按任务的完成顺序保存了任务的Future以便获取。该类的实现如下:
- package java.util.concurrent;
- public class ExecutorCompletionService<V> implements CompletionService<V> {
- // 任务的执行委托给了executor
- private final Executor executor;
- // 如果executor继承自AbstractExecutorService,aes和executor指向同一个对象,否则aes为空
- private final AbstractExecutorService aes;
- // 保存完成任务的Future
- private final BlockingQueue<Future<V>> completionQueue;
- /**
- * FutureTask extension to enqueue upon completion
- * 继承自FutureTask,重写done()方法以扩展保存结果的功能
- * 会在任务结束后将任务的Future加到completionQueue中
- */
- private class QueueingFuture extends FutureTask<Void> {
- QueueingFuture(RunnableFuture<V> task) {
- super(task, null);
- this.task = task;
- }
- // done()是一个钩子方法,会在任务结束时被调用
- protected void done() {
- // 任务结束后将Future加到队列中
- completionQueue.add(task);
- }
- private final Future<V> task;
- }
- // 将任务包装成FutureTask
- private RunnableFuture<V> newTaskFor(Callable<V> task) {
- if (aes == null)
- return new FutureTask<V>(task);
- else
- return aes.newTaskFor(task);
- }
- // 将任务包装成FutureTask
- private RunnableFuture<V> newTaskFor(Runnable task, V result) {
- if (aes == null)
- return new FutureTask<V>(task, result);
- else
- return aes.newTaskFor(task, result);
- }
- /**
- * Creates an ExecutorCompletionService using the supplied
- * executor for base task execution and a
- * {@link LinkedBlockingQueue} as a completion queue.
- *
- * 构造方法
- *
- * @param executor the executor to use 被使用的线程池
- * @throws NullPointerException if executor is {@code null}
- */
- public ExecutorCompletionService(Executor executor) {
- // 检查传入的参数
- if (executor == null)
- throw new NullPointerException();
- // 记录线程池
- this.executor = executor;
- // 如果线程池是AbstractExecutorService会将其用aes进行记录
- this.aes = (executor instanceof AbstractExecutorService) ?
- (AbstractExecutorService) executor : null;
- // 初始化装载完成任务的阻塞队列
- this.completionQueue = new LinkedBlockingQueue<Future<V>>();
- }
- /**
- * Creates an ExecutorCompletionService using the supplied
- * executor for base task execution and the supplied queue as its
- * completion queue.
- *
- * 构造方法,可指定装载任务结果的队列
- *
- * @param executor the executor to use
- * @param completionQueue the queue to use as the completion queue
- * normally one dedicated for use by this service. This
- * queue is treated as unbounded -- failed attempted
- * {@code Queue.add} operations for completed taskes cause
- * them not to be retrievable.
- * @throws NullPointerException if executor or completionQueue are {@code null}
- */
- public ExecutorCompletionService(Executor executor,
- BlockingQueue<Future<V>> completionQueue) {
- // 检查参数
- if (executor == null || completionQueue == null)
- throw new NullPointerException();
- // 记录线程池
- this.executor = executor;
- // 如果线程池是AbstractExecutorService会将其用aes进行记录
- this.aes = (executor instanceof AbstractExecutorService) ?
- (AbstractExecutorService) executor : null;
- // 记录装载任务结果的队列
- this.completionQueue = completionQueue;
- }
- // 提交任务
- public Future<V> submit(Callable<V> task) {
- if (task == null) throw new NullPointerException();
- // 对任务进行包装
- RunnableFuture<V> f = newTaskFor(task);
- // 使用线程池提交任务,提交的任务被QueueingFuture包装了
- executor.execute(new QueueingFuture(f));
- return f;
- }
- // 提交任务
- public Future<V> submit(Runnable task, V result) {
- if (task == null) throw new NullPointerException();
- // 对任务进行包装
- RunnableFuture<V> f = newTaskFor(task, result);
- // 使用线程池提交任务,提交的任务被QueueingFuture包装了
- executor.execute(new QueueingFuture(f));
- return f;
- }
- // 获取任务结果,如果队列为空则阻塞
- public Future<V> take() throws InterruptedException {
- // 从任务结果队列中获取已完成的任务
- return completionQueue.take();
- }
- // 获取任务结果,如果队列为空返回null
- public Future<V> poll() {
- // 从任务结果队列中获取已完成的任务
- return completionQueue.poll();
- }
- // 获取任务结果,如果超时,返回空
- public Future<V> poll(long timeout, TimeUnit unit)
- throws InterruptedException {
- // 从任务结果队列中获取已完成的任务
- return completionQueue.poll(timeout, unit);
- }
- }
ExecutorCompletionService的实现比较简单,它将任务包装为QueueingFuture对象,然后将QueueingFuture作为任务来执行,QueueingFuture继承FutureTask,重写了done()
钩子方法,任务在完成后会调用done()
方法,QueueingFuture任务对象会将任务的Future加到装载执行完成的任务队列completionQueue
中,这样就可以从completionQueue
中及时取得已完成任务的执行结果了,改进示例如下:
- package com.coderap.pool;
- import java.util.Random;
- import java.util.concurrent.*;
- import java.util.concurrent.Executors;
- public class ExecuteResultManager {
- public static void main(String[] args) throws Exception {
- // 经过ExecutorCompletionService包装的线程池
- ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(Executors.newCachedThreadPool());
- // 任务提交到线程池执行
- new Thread() {
- @Override
- public void run() {
- // 循环添加任务
- for (int i = 0; i < 10; i++) {
- int index = i;
- // 线程池添加任务
- Future<String> future = executorCompletionService.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- Thread.sleep(new Random().nextInt(3000));
- System.out.println("Task " + index + " done");
- return "Task " + index + " done";
- }
- });
- }
- }
- }.start();
- // 获取任务执行结果
- new Thread() {
- @Override
- public void run() {
- while (true) {
- // 遍历
- try {
- Future<String> future = executorCompletionService.take();
- String result = future.get();
- System.out.println("Get Result: " + result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }.start();
- }
- }
执行结果如下:
- Task 3 done
- Get Result: Task 3 done
- Task 5 done
- Get Result: Task 5 done
- Task 0 done
- Get Result: Task 0 done
- Task 6 done
- Get Result: Task 6 done
- Task 4 done
- Get Result: Task 4 done
- Task 9 done
- Get Result: Task 9 done
- Task 7 done
- Get Result: Task 7 done
- Task 2 done
- Get Result: Task 2 done
- Task 1 done
- Get Result: Task 1 done
- Task 8 done
- Get Result: Task 8 done
从运行结果可以看出,每个任务运行完成后,结果都能及时得到处理,避免了之前损耗吞吐量的情况。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...