写点什么

☕【Java 深层系列】「并发编程系列」让我们一起探索一下 CompletionService 的技术原理和使用指南

作者:浩宇天尚
  • 2022 年 1 月 26 日
  • 本文字数:6109 字

    阅读完需:约 20 分钟

☕【Java深层系列】「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南

CompletionService 基本介绍

  • CompletionService 与 ExecutorService 类似都可以用来执行线程池的任务,ExecutorService 继承了 Executor 接口,而 CompletionService 则是一个接口。

  • 主要是 Executor 的特性决定的,Executor 框架不能完全保证任务执行的异步性,那就是如果需要实现任务(task)的异步性,只要为每个 task 创建一个线程就实现了任务的异步性。


在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。另外,Executor 框架还支持同步任务的执行,就是在 execute 方法中调用提交任务的 run()方法就属于同步调用,当我们采用异步的时候,需要进行的就是获取 Future 对象,之后在需要使用的时候 get 出来结果即可。

异步调用判断机制

一般情况下,如果需要判断任务是否完成,思路是得到 Future 列表的每个 Future,然后反复调用其 get 方法,并将 timeout 参数设为 0,从而通过轮询的方式判断任务是否完成。为了更精确实现任务的异步执行以及更简便的完成任务的异步执行,可以使用 CompletionService

CompletionService 实现原理

CompletionService 实际上可以看做是 Executor 和 BlockingQueue 的结合体。CompletionService 在接收到要执行的任务时,通过类似 BlockingQueue 的 put 和 take 获得任务执行的结果。CompletionService 的一个实现是 ExecutorCompletionService,ExecutorCompletionService 把具体的计算任务交给 Executor 完成。

QueueingFuture 的源码如下

  • ExecutorCompletionService 在构造函数中会创建一个 BlockingQueue(使用的基于链表的无界队列 LinkedBlockingQueue),该 BlockingQueue 的作用是保存 Executor 执行的结果。当计算完成时,调用 FutureTask 的 done 方法。

  • 当提交一个任务到 ExecutorCompletionService 时,首先将任务包装成 QueueingFuture,它是 FutureTask 的一个子类,然后改写 FutureTask 的 done 方法,之后把 Executor 执行的计算结果放入 BlockingQueue 中。


   private class QueueingFuture extends FutureTask<Void> {       QueueingFuture(RunnableFuture<V> task) {           super(task, null);           this.task = task;       }       protected void done() { completionQueue.add(task); }       private final Future<V> task;   }
复制代码


CompletionService 将提交的任务转化为 QueueingFuture,并且覆盖了 done 方法,在 done 方法中就是将任务加入任务队列中。

使用 ExecutorService 实现任务

比如:电商中加载商品详情这一操作,因为商品属性的多样性,将商品的图片显示与商品简介的显示设为两个独立执行的任务。


另外,由于商品的图片可能有许多张,所以图片的显示往往比简介显示更慢。这个时候异步执行能够在一定程度上加快执行的速度提高系统的性能。


public class DisplayProductInfoWithExecutorService {    //线程池    private final ExecutorService executorService = Executors.newFixedThreadPool(2);    //日期格式器    private final DateFormat format = new SimpleDateFormat("HH:mm:ss");    // 由于可能商品的图片可能会有很多张,所以显示商品的图片往往会有一定的延迟    // 除了商品的详情外还包括商品简介等信息的展示,由于这里信息主要的是文字为    // 主,所以能够比图片更快显示出来。下面的代码就以执行这两个任务为主线,完    // 成这两个任务的执行。由于这两个任务的执行存在较大差距,所以想到的第一个    // 思路就是异步执行,首先执行图像的下载任务,之后(不会很久)开始执行商品    // 简介信息的展示,如果网络足够好,图片又不是很大的情况下,可能在开始展示    // 商品的时候图像就下载完成了,所以自然想到使用Executor和Callable完成异    // 步任务的执行。     public void renderProductDetail() {        final List<ProductInfo>  productInfos = loadProductImages();        //异步下载图像的任务        Callable<List<ProductImage>> task = new Callable<List<ProductImage>>() {            @Override            public List<ProductImage> call() throws Exception {                List<ProductImage> imageList = new ArrayList<>();                for (ProductInfo info : productInfos){                    imageList.add(info.getImage());                }                return imageList;            }        };        //提交给线程池执行        Future<List<ProductImage>> listFuture = executorService.submit(task);        //展示商品简介的信息        renderProductText(productInfos);        try {            //显示商品的图片            List<ProductImage> imageList = listFuture.get();            renderProductImage(imageList);        } catch (InterruptedException e) {            // 如果显示图片发生中断异常则重新设置线程的中断状态            // 这样做可以让wait中的线程唤醒            Thread.currentThread().interrupt();            // 同时取消任务的执行,参数false表示在线程在执行不中断            listFuture.cancel(true);        } catch (ExecutionException e) {            try {                throw new Throwable(e.getCause());            } catch (Throwable throwable) {                throwable.printStackTrace();            }        }     }     private void renderProductImage(List<ProductImage> imageList ) {        for (ProductImage image : imageList){            try {                TimeUnit.SECONDS.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + " display products images! "            + format.format(new Date()));    }     private void renderProductText(List<ProductInfo> productInfos) {        for (ProductInfo info : productInfos){            try {                Thread.sleep(50);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + " display products description! "            + format.format(new Date()));    }     private List<ProductInfo> loadProductImages() {        List<ProductInfo> list = new ArrayList<>();        try {            TimeUnit.SECONDS.sleep(5);        } catch (InterruptedException e) {            e.printStackTrace();        }        ProductInfo info = new ProductInfo();        info.setImage(new ProductImage());        list.add(info);        System.out.println(Thread.currentThread().getName() + " load products info! "                + format.format(new Date()));        return list;    }     /**     * 商品     */    private static class ProductInfo{        private ProductImage image;         public ProductImage getImage() {            return image;        }         public void setImage(ProductImage image) {            this.image = image;        }    }     private static class ProductImage{}     public static void main(String[] args){        DisplayProductInfoWithExecutorService cd = new DisplayProductInfoWithExecutorService();        cd.renderProductDetail();        System.exit(0);    }}
复制代码

CompletionService 实现任务

使用 CompletionService 的一大改进就是把多个图片的加载分发给多个工作单元进行处理,这样通过分发的方式就缩小了商品图片的加载与简介信息的加载的速度之间的差距,让这些小任务在线程池中执行,这样就大大降低了下载所有图片的时间,所以在这个时候可以认为这两个任务是同构的。使用 CompletionService 完成最合适不过了。


public class DisplayProductInfoWithCompletionService {     //线程池    private final ExecutorService executorService;    //日期格式器    private final DateFormat format = new SimpleDateFormat("HH:mm:ss");     public DisplayProductInfoWithCompletionService(ExecutorService executorService) {        this.executorService = executorService;    }     public void renderProductDetail() {        final List<ProductInfo> productInfos = loadProductInfos();        CompletionService<ProductImage> completionService = new ExecutorCompletionService<ProductImage>(executorService);        //为每个图像的下载建立一个工作任务        for (final ProductInfo info : productInfos) {            completionService.submit(new Callable<ProductImage>() {                @Override                public ProductImage call() throws Exception {                    return info.getImage();                }            });        }        //展示商品简介的信息        renderProductText(productInfos);        try {            //显示商品图片            for (int i = 0, n = productInfos.size(); i < n; i++){                Future<ProductImage> imageFuture = completionService.take();                ProductImage image = imageFuture.get();                renderProductImage(image);            }        } catch (InterruptedException e) {            // 如果显示图片发生中断异常则重新设置线程的中断状态            // 这样做可以让wait中的线程唤醒            Thread.currentThread().interrupt();        } catch (ExecutionException e) {            try {                throw new Throwable(e.getCause());            } catch (Throwable throwable) {                throwable.printStackTrace();            }        }    }    private void renderProductImage(ProductImage image) {        try {            Thread.sleep(100);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println(Thread.currentThread().getName() + " display products images! "                + format.format(new Date()));    }    private void renderProductText(List<ProductInfo> productInfos) {        for (ProductInfo info : productInfos) {            try {                Thread.sleep(50);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + " display products description! "                + format.format(new Date()));    }    private List<ProductInfo> loadProductInfos() {        List<ProductInfo> list = new ArrayList<>();        try {            TimeUnit.SECONDS.sleep(3);        } catch (InterruptedException e) {            e.printStackTrace();        }        ProductInfo info = new ProductInfo();        info.setImage(new ProductImage());        list.add(info);        System.out.println(Thread.currentThread().getName() + " load products info! "                + format.format(new Date()));        return list;    }    /**     * 商品     */    private static class ProductInfo {        private ProductImage image;         public ProductImage getImage() {            return image;        }         public void setImage(ProductImage image) {            this.image = image;        }    }     private static class ProductImage {    }     public static void main(String[] args) {        DisplayProductInfoWithCompletionService cd = new DisplayProductInfoWithCompletionService(Executors.newCachedThreadPool());        cd.renderProductDetail();    }}
复制代码


执行结果与上面的一样。因为多个 ExecutorCompletionService 可以共享一个 Executor,因此可以创建一个特定某个计算的私有的,又能共享公共的 Executor 的 ExecutorCompletionService。

CompletionService 解决 Future 的 get 方法阻塞问题

解决方法:

CompletionService 的 take()方法获取最先执行完的线程的 Future 对象。

测试方法

public static void main(String[] args) throws Exception {    CallableDemo callable = new CallableDemo(1,100000);    CallableDemo callable2 = new CallableDemo(1,100);    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 5L,TimeUnit.SECONDS, new LinkedBlockingDeque());    CompletionService csRef = new ExecutorCompletionService(executor);    System.out.println("main 1 " +System.currentTimeMillis());    csRef.submit(callable);    csRef.submit(callable2);    System.out.println("main 2 " +System.currentTimeMillis());    System.out.println(csRef.take().get());    System.out.println("main 3 " +System.currentTimeMillis());    System.out.println(csRef.take().get());    System.out.println("main 4 " +System.currentTimeMillis());}
复制代码

线程类

import java.util.concurrent.Callable;public class CallableDemo implements Callable<String> {    private int begin;    private int end;    private int sum;   public CallableDemo(int begin, int end) {     super();     this.begin = begin;     this.end = end;  }   public String call() throws Exception {       for(int i=begin;i<=end;i++){           for(int j=begin;j<=end;j++){              sum+=j;           }      }      Thread.sleep(8000);    return begin+"-" +end+"的和:"+ sum;   }}
复制代码

CompletionService 小结

相比 ExecutorService,CompletionService 可以更精确和简便地完成异步任务的执行 CompletionService 的一个实现是 ExecutorCompletionService,它是 Executor 和 BlockingQueue 功能的融合体,Executor 完成计算任务,BlockingQueue 负责保存异步任务的执行结果在执行大量相互独立和同构的任务时,可以使用 CompletionServiceCompletionService 可以为任务的执行设置时限,主要是通过 BlockingQueue 的 poll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务.

发布于: 刚刚阅读数: 2
用户头像

浩宇天尚

关注

🏆InfoQ写作平台-签约作者🏆 2020.03.25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
☕【Java深层系列】「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南