写点什么

分享一个 JDK 批量异步任务工具 Completion Service,超好用

  • 2022 年 4 月 27 日
  • 本文字数:2049 字

    阅读完需:约 7 分钟

本文分享自华为云社区《JDK批量异步任务最强工具Completion Service》,作者: JavaEdge。


如何优化一个查询各个价格接口的代码?若使用“Thread Pool Executor+Future”,可能优化如下:

三个线程异步执行查询价格,通过三次调用 Future 的 get()方法获取结果,之后将查询结果保存在 MySQL。 


若获取 price1 耗时很长,那么即便获取 price2 耗时短,也无法让保存 price2 的操作先执行,因为主线程都阻塞在 f1.get()。这种问题如何解决呢? 


加个阻塞队列! 获取到 price1、2、3 都进入阻塞队列,然后在主线程消费阻塞队列,就能保证先获取到的价格先保存: 


Completion Service 实现查询价格

实际开发推荐 Completion Service,不但能帮你解决先获取到的价格先保存,还能精简代码。

Completion Service 内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果入队,但 Completion Service 是把任务执行结果的 Future 对象入队,而上面 demo 是把任务最终执行结果入队。

创建 Completion Service

Completion Service 接口的实现类是 Executor Completion Service,这个实现类的构造方法有两个,分别是:

  • Executor Completion Service(Executor executor); 

  • Executor Completion Service(Executor executor, Blocking Queue<Future<V>> completionQueue) 

  • 这俩构造器都需要传入一个线程池,若不指定 completion Queue,默认使用无界 Linked Blocking Queue。任务执行结果的 Future 对象就是加入到 completion Queue 中。


让我们试着利用 Completion Service 实现高性能的查询房价系统。 之后通过 Completion Service#submit()提交三个询价操作,这三个询价操作将会被 Completion Service 异步执行。

最后 Completion Service#take()获取一个 Future 对象(加入到阻塞队列的是任务执行结果的 Future 对象),调用 Future#get()就能返回执行结果。 


Completion Service 接口

Completion Service 接口提供的方法 

 submit()相关的方法有两个:

  • 一个方法参数是Callable<V> task

  • 一个方法有两个参数,分别是 Runnable task 和 V result,该方法类似于 ThreadPoolExecutor 的 <T> Future<T> submit(Runnable task, T result) ,

CompletionService 实现 Dubbo#Forking Cluster

Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行调用多个查询服务,只要有一个成功返回结果,整个服务即可返回。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,可并行调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果 r,那么地址转坐标这个服务就可以直接返回 r 了。这种集群模式可以容忍 2 个地图服务商服务异常,但缺点是消耗的资源偏多。

 geocoder(addr) {   // 并行执行以下3个查询服务,    r1=geocoderByS1(addr);   r2=geocoderByS2(addr);   r3=geocoderByS3(addr);   // 只要r1,r2,r3有一个返回   // 则返回   return r1|r2|r3; }
复制代码

利用 CompletionService 可快速实现 Forking 这种集群模式,比如下面示例代码。 首先创建一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>类型的列表 futures,每次通过调用 CompletionService 的 submit()方法提交一个异步任务,会返回一个 Future 对象,把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。

 // 创建线程池 ExecutorService executor =   Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService<Integer> cs =   new ExecutorCompletionService<>(executor); // 用于保存Future对象 List<Future<Integer>> futures =   new ArrayList<>(3); // 提交异步任务,并保存future到futures  futures.add(   cs.submit(()->geocoderByS1())); futures.add(   cs.submit(()->geocoderByS2())); futures.add(   cs.submit(()->geocoderByS3())); // 获取最快返回的任务执行结果 Integer r = 0; try {   // 只要有一个成功返回,则break   for (int i = 0; i < 3; ++i) {     r = cs.take().get();     // 简单地通过判空来检查是否成功返回     if (r != null) {       break;     }   } } finally {   // 取消所有任务   for(Future<Integer> f : futures)     f.cancel(true); } // 返回结果 return r;
复制代码

总结

当需要批量提交异步任务,推荐 CompletionService。CompletionService 将线程池 Executor 和阻塞队列融合,让批量异步任务管理更简单。


CompletionService 能让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用该特性,可以轻松实现后续处理的有序性,避免无谓等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。


CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。


点击关注,第一时间了解华为云新鲜技术~​

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
分享一个JDK批量异步任务工具Completion Service,超好用_jdk_华为云开发者社区_InfoQ写作社区