写点什么

☕【Java 技术指南】「并发编程专题」CompletionService 框架基本使用和原理探究(基础篇)

发布于: 11 小时前
☕【Java技术指南】「并发编程专题」CompletionService框架基本使用和原理探究(基础篇)

前提概要

在开发过程中在使用多线程进行并行处理一些事情的时候,大部分场景在处理多线程并行执行任务的时候,可以通过 List 添加 Future 来获取执行结果,有时候我们是不需要获取任务的执行结果的,方便后面引出 ExecutorCompletionService。

CompletionService 的介绍

  • CompletionService 接口是一个独立的接口,并没有扩展 ExecutorService 。 其默认实现类是 ExecutorCompletionService。

  • 接口 CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。从而可以将任务的执行与处理任务的执行结果分离开来。

CompletionService 的实现原理

  • CompletionService 就是监视着 Executor 线程池执行的任务,用 BlockingQueue 将完成的任务的结果存储下来。

  • 要不断遍历与每个任务关联的 Future,然后不断去轮询,判断任务是否已经完成,功能比较繁琐。


public interface CompletionService<V> {    Future<V> submit(Callable<V> task);    Future<V> submit(Runnable task, V result);    Future<V> take() throws InterruptedException;    Future<V> poll();    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;}
复制代码

方法摘要

提交一个 Callable 任务;一旦完成,便可以由 take()、poll()方法获取


Future submit(Callable task):
复制代码


提交一个 Runnable 任务,并指定计算结果;


Future submit(Runnable task, V result):
复制代码


获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。


Future take() throws InterruptedException
复制代码


获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。


Future poll()
复制代码


获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。


Future poll(long timeout, TimeUnit unit) throws InterruptedException
复制代码




例子,程序提交了多个任务,但只要有一个任务完成并返回一个非空的结果,并可以忽略掉其余的任务。


 void eample(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {     CompletionService<Result> completionService = new ExecutorCompletionService<Result>(e);     int n = solvers.size();     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);     Result result = null;     try {         //提交多个任务         for (Callable<Result> s : solvers)             futures.add(completionService.submit(s));        //         for (int i = 0; i < n; ++i) {             try {                 //等待获取一个已经完成的任务                 Result r = completionService.take().get();                 //判断返回结果是否为空                 if (r != null) {                     result = r;                     break;                 }             } catch (ExecutionException ignore) {}         }     }     finally {         //取消所有任务         for (Future<Result> f : futures)               f.cancel(true);         }     if (result != null)         use(result); }
复制代码



ExecutorCompletionService 的介绍

  • ExecutorCompletionService 内部有一个先进先出的阻塞队列,用于保存已经执行完成的 Future,通过调用它的 take 方法或 poll 方法可以获取到一个已经执行完成的 Future,进而通过调用 Future 接口实现类的 get 方法获取最终的结果。

  • ExecutorCompletionService 实现了 CompletionService,内部通过 Executor 以及 BlockingQueue 来实现接口提出的规范,ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果,该类实现了接口 CompletionService

构造方法

  • 指定一个 Executor 来执行任务,存储完成的任务的完成队列是 LinkedBlockingQueue ;

  • Executor 由调用者传递进来,而 Blocking 可以使用默认的 LinkedBlockingQueue,也可以由调用者传递。


ExecutorCompletionService(Executor executor):
复制代码


指定了任务执行器 Executor 和已完成的任务队列 completionQueue


ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue)
复制代码
实现构造器
public ExecutorCompletionService(Executor executor) {    if (executor == null)        throw new NullPointerException();    this.executor = executor;    this.aes = (executor instanceof AbstractExecutorService) ?        (AbstractExecutorService) executor : null;    this.completionQueue = new LinkedBlockingQueue<Future<V>>();}
复制代码


  • 该接口定义了一系列方法:提交实现了 Callable 或 Runnable 接口的任务,并获取这些任务的结果。

  • 包装后提交任务的 submit()方法,该类还会将提交的任务封装成 QueueingFuture,这样就可以实现 FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。


public Future<V> submit(Callable<V> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<V> f = newTaskFor(task);        executor.execute(new QueueingFuture(f));        return f;}
复制代码
QueueingFuture 为内部类:

在提交任务时,将任务封装成 QueueingFuture:


private class QueueingFuture extends FutureTask<Void> {        QueueingFuture(RunnableFuture<V> task) {            super(task, null);            this.task = task;        }        protected void done() { completionQueue.add(task); }        private final Future<V> task;}
复制代码


其中,done()方法就是在任务执行完毕后,将任务放入队列中。


  • 在调用 take()、poll()方法时,会从阻塞队列中获取 Future 对象,以取得任务执行的结果。

  • 它继承自 FutureTask,并且重写了 done 方法,其方法把任务放到我们包装线程池创建的堵塞队列里面;就是当任务执行完成后,就会被放到队列里面去了。

  • 调用其 take() 方法,就是阻塞等待,等到的一定是能够获取的结果的 future,然后再调用 get()方法获取执行结果;


最后,如果工作中并行处理任务不需要获取结果的,我们正常使用线程池提交就可以,任务技术只要适合工作的业务场景就是好的。

发布于: 11 小时前阅读数: 7
用户头像

🏆 2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 “任何足够先进的技术都是魔法“

评论

发布
暂无评论
☕【Java技术指南】「并发编程专题」CompletionService框架基本使用和原理探究(基础篇)