写点什么

Java 多线程 CompletionService 和 ExecutorCompletionService

作者:Yeats_Liao
  • 2022-10-17
    江西
  • 本文字数:6787 字

    阅读完需:约 1 分钟

一、说明

Future 的不足


  • 当通过 .get() 方法获取线程的返回值时,会导致阻塞

  • 也就是和当前这个 Future 关联的计算任务真正执行完成的时候才返回结果

  • 新任务必须等待已完成任务的结果才能继续进行处理,会浪费很多时间,最好是谁最先执行完成谁最先返回


CompletionService 的引入


  • 解决阻塞的问题

  • 以异步的方式一边处理新的线程任务,一边处理已完成任务的结果,将执行任务与处理任务分开进行处理

二、理解

CompletionService


  • java.util.concurrent包下CompletionService<V>接口,但并不继承Executor接口,仅有一个实现类ExecutorCompletionService用于管理线程对象

  • 更加有效地处理 Future 的返回值,避免阻塞,使用.submit()方法执行任务,使用.take()取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果


public interface CompletionService<V> {    Future<V> submit(Callable<V> task);    Future<V> submit(Runnable task, V result);    Future<V> take() throws InterruptedException;    Future<V> poll();    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;}
复制代码


  • submit()方法用来执行线程任务

  • take()方法从队列中获取完成任务的 Future 对象,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果

  • poll()方法获取并删除代表下一个已完成任务的 Future,如果不存在,则返回 null,此无阻塞的效果

  • poll(long timeout, TimeUnit unti) timeout 表示等待的最长时间,unit 表示时间单位,在指定时间内还没获取到结果,则返回 null


ExecutorCompletionService


  • java.util.concurrent包下ExecutorCompletionService<V>类实现CompletionService<V>接口,方法与接口相同

  • ExecutorService可以更精确和简便地完成异步任务的执行

  • executor执行任务,completionQueue保存异步任务执行的结果


public class ExecutorCompletionService<V> implements CompletionService<V> {    private final Executor executor;    private final AbstractExecutorService aes;    private final BlockingQueue<Future<V>> completionQueue;    ……    Future<V> submit(Callable<V> task)     Future<V> submit(Runnable task, V result)     Future<V> take() throws InterruptedException    Future<V> poll()     Future<V> poll(long timeout, TimeUnit unit)    ……}
复制代码


  • completionQueue初始化了一个LinkedBlockingQueue类型的先进先出阻塞队列


    public ExecutorCompletionService(Executor executor) {        if (executor == null)            throw new NullPointerException();        this.executor = executor;        this.aes = (executor instanceof AbstractExecutorService) ?            (AbstractExecutorService) executor : null;        this.completionQueue = new LinkedBlockingQueue<Future<V>>();    }
复制代码


  • submit()方法中QueueingFutureExecutorCompletionService中的内部类


    public Future<V> submit(Callable<V> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<V> f = newTaskFor(task);        executor.execute(new QueueingFuture<V>(f, completionQueue));        return f;    }
复制代码


  • QueueingFutureRunnableFuture实例对象赋值给了task,内部的done()方法将task添加到已完成阻塞队列中,调用take()poll()方法获取已完成的 Future


    private static class QueueingFuture<V> extends FutureTask<Void> {        QueueingFuture(RunnableFuture<V> task,                       BlockingQueue<Future<V>> completionQueue) {            super(task, null);            this.task = task;            this.completionQueue = completionQueue;        }        private final Future<V> task;        private final BlockingQueue<Future<V>> completionQueue;        protected void done() { completionQueue.add(task); }    }
复制代码

三、实现

1.使用 Future

创建CompletionServiceDemo类,创建好的线程对象,使用Executors工厂类来创建ExecutorService的实例(即线程池),通过ThreadPoolExecutor.submit()方法提交到线程池去执行,线程执行后,返回值 Future 可被拿到


public class CompletionServiceDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 1.创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(5);
// 2.创建Callable子线程对象任务 Callable callable_1 = new Callable() { @Override public String call() throws Exception { Thread.sleep(5000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
// 3.使用Future提交三个任务到线程池 Future future_1 = executorService.submit(callable_1); Future future_2 = executorService.submit(callable_2); Future future_3 = executorService.submit(callable_3);
// 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); System.out.println(future_1.get() + "" + getStringDate()); System.out.println(future_2.get() + "" + getStringDate()); System.out.println(future_3.get() + "" + getStringDate()); System.out.println("结束 " + getStringDate()); // 5.关闭线程池 executorService.shutdown(); }
// 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; }}
复制代码


future_1.get()会等待执行时间阻塞 5 秒再获取到结果,而在这 5 秒内future_2future_3的任务已完成,所以会立马得到结果


2.使用 ExecutorCompletionService

创建一个ExecutorCompletionService放入线程池实现CompletionService接口,将创建好的线程对象通过CompletionService提交任务和获取结果


public class CompletionServiceDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 1.创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(5);                // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口        CompletionService completionService = new ExecutorCompletionService(executorService);                // 3.创建Callable子线程对象任务        Callable callable_1 = new Callable() {            @Override            public String call() throws Exception {                Thread.sleep(5000);                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );            }        };
Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
// 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3);
// 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println("结束 " + getStringDate());
// 5.关闭线程池 executorService.shutdown(); }
// 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; }}
复制代码


提交顺序是 1-2-3,按照完成这些任务的时间顺序处理它们的结果,返回顺序是 3-2-1


3.take()方法

take()方法从队列中获取完成任务的 Future 对象,会阻塞,一直等待线程池中返回一个结果,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果


如果调用take()方法的次数大于任务数,会因为等不到有任务返回结果而阻塞,只有三个任务,第四次 take 等不到结果而阻塞


4.poll()方法

poll()方法不会去等结果造成阻塞,没有结果则返回 null,接着程序继续往下运行


直接用completionService.poll().get()会引发 NullPointerException



创建一个循环,连续调用poll()方法,每次隔 1 秒调用,没有结果则返回 null



public class CompletionServiceDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 1.创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(5);        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口        CompletionService completionService = new ExecutorCompletionService(executorService);        // 3.创建Callable子线程对象任务        Callable callable_1 = new Callable() {            @Override            public String call() throws Exception {                Thread.sleep(5000);                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );            }        };
Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
// 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3);
// 4.获取返回值 System.out.println("开始获取结果 " + getStringDate());
// 5.创建一个循环,连续调用poll()方法,间隔1秒 for (int i = 0; i < 8; i++) { Future future = completionService.poll(); if (future!=null){ System.out.println(future.get() + getStringDate()); }else { System.out.println(future+" "+getStringDate()); } Thread.sleep(1000); } System.out.println("结束 " + getStringDate());
// 6.关闭线程池 executorService.shutdown(); }
// 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; }}
复制代码

5.poll(long timeout, TimeUnit unit)方法

poll(long timeout, TimeUnit unit)方法设置了等待时间,等待超时还没有结果就返回 null


不使用 Thread.sleep(1000),将等待时间设置成 0.5 秒,由于只有 8 次循环,也就是 4 秒执行时间,而callable_1需要执行 5 秒,获取不到结果则返回 null



public class CompletionServiceDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 1.创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(5);        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口        CompletionService completionService = new ExecutorCompletionService(executorService);        // 3.创建Callable子线程对象任务        Callable callable_1 = new Callable() {            @Override            public String call() throws Exception {                Thread.sleep(5000);                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );            }        };
Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } };
// 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3);
// 4.获取返回值 System.out.println("开始获取结果 " + getStringDate());
// 5.创建一个循环,连续调用poll()方法,间隔1秒 for (int i = 0; i < 8; i++) { Future future = completionService.poll(500, TimeUnit.MILLISECONDS); if (future!=null){ System.out.println(future.get() + getStringDate()); }else { System.out.println(future+" "+getStringDate()); } } System.out.println("结束 " + getStringDate());
// 6.关闭线程池 executorService.shutdown(); }
// 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; }}
复制代码


发布于: 刚刚阅读数: 3
用户头像

Yeats_Liao

关注

Hello,World! 2022-10-02 加入

这里更多的是记录个人学习,如果有侵权内容请联系我! 个人邮箱是:yeats_liao@foxmail.com

评论

发布
暂无评论
Java多线程 CompletionService和ExecutorCompletionService_后端_Yeats_Liao_InfoQ写作社区