写点什么

JUC 并发—Future 模式和异步编程简介

  • 2025-08-05
    福建
  • 本文字数:17434 字

    阅读完需:约 57 分钟

1.Runnable 接口与 Callable 接口


(1)Runnable 接口实现异步任务


也就是通过创建实现了 Runnable 接口的 Thread 线程来实现异步任务。

 

Runnable 接口实现的异步任务存在的问题:

一.Runnable 接口不支持获取返回值

二.Runnable 接口不支持抛出异常

@FunctionalInterfacepublic interface Runnable {    public abstract void run();}
public class Thread implements Runnable { ... private Runnable target; public Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); } @Override public void run() { if (target != null) { target.run(); } } ...}
@RunWith(SpringRunner.class)@SpringBootTestpublic class Test { @Test public void testNewThread() { Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("实现的异步任务"); } }); t1.start(); }}
复制代码


(2)Callable 接口实现异步任务


Callable 接口需要与 Future 和 ExecutorService 结合使用:通过 ExecutorService 的 submit()方法提交一个实现 Callable 接口的任务,然后 ExecutorService 的 submit()方法会返回一个实现 Future 接口的对象,接着调用 Future 接口的 get()方法就可以获取异步任务的结果。

public interface ExecutorService extends Executor {    ...    //Submits a value-returning task for execution and returns a Future representing the pending results of the task.     //The Future's get method will return the task's result upon successful completion.    //@param task the task to submit    //@param <T> the type of the task's result    //@return a Future representing pending completion of the task    <T> Future<T> submit(Callable<T> task);    ...}
public interface Future<V> { ... //Waits if necessary for the computation to complete, and then retrieves its result. V get() throws InterruptedException, ExecutionException; //Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; ...}
复制代码


2.Future 模式


(1)Future 模式的概念


当前线程有一个任务,提交给了 Future,由 Future 来完成这个任务,在此期间当前线程可以处理其他事情了。一段时间后,当前线程就可以从 Future 中获取结果。

 

(2)Future 接口的使用


一.Future 接口源码

Future 就是对实现 Runnable 或 Callable 接口的任务进行查询、中断、获取。

public interface Future<V> {    //用来取消任务,取消成功则返回true,取消失败则返回false    //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务    //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false    //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false    //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true    boolean cancel(boolean mayInterruptIfRunning);
//表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true boolean isCancelled();
//表示任务是否已经完成,若任务完成,则返回true boolean isDone();
//获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果 V get() throws InterruptedException, ExecutionException;
//获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
复制代码


二.普通模式计算 1000 次 1 到 1 亿的和

public class NormalTest {    //普通模式计算1000次1到1亿的和    public static void main(String[] args) {        long start = System.currentTimeMillis();        List<Integer> retList = new ArrayList<>();        //计算1000次1至1亿的和        for(int i = 0; i < 1000; i++) {            retList.add(Calc.cal(100000000));        }        System.out.println("耗时: " + (System.currentTimeMillis() - start));        for (int i = 0; i < 1000; i++) {            try {                Integer result = retList.get(i);                System.out.println("第" + i + "个结果: " + result);            } catch (Exception e) {                      }        }        System.out.println("耗时: " + (System.currentTimeMillis() - start));    }
public static class Calc implements Callable<Integer> { @Override public Integer call() throws Exception { return cal(10000); } public static int cal (int num) { int sum = 0; for (int i = 0; i < num; i++) { sum += i; } return sum; } }}
--------------------------------------------------执行结果:耗时: 43659第0个结果: 887459712第1个结果: 887459712第2个结果: 887459712...第999个结果: 887459712耗时: 43688
复制代码


三.Future 模式计算 1000 次 1 到 1 亿的和

public class FutureTest {    //Future模式计算1000次1到1亿的和    public static void main(String[] args) {        long start = System.currentTimeMillis();        ExecutorService executorService = Executors.newCachedThreadPool();        List<Future<Integer>> futureList = new ArrayList<>();        //计算1000次1至1亿的和        for (int i = 0; i < 1000; i++) {            //调度执行            futureList.add(executorService.submit(new Calc()));        }        System.out.println("耗时: " + (System.currentTimeMillis() - start));        for (int i = 0; i < 1000; i++) {            try {                Integer result = futureList.get(i).get();                System.out.println("第" + i + "个结果: " + result);            } catch (InterruptedException | ExecutionException e) {            }        }        System.out.println("耗时: " + (System.currentTimeMillis() - start));    }
public static class Calc implements Callable<Integer> { @Override public Integer call() throws Exception { return cal(100000000); }
public static int cal (int num) { int sum = 0; for (int i = 0; i < num; i++) { sum += i; } return sum; } }}
--------------------------------------------------执行结果:耗时: 12058第0个结果: 887459712第1个结果: 887459712...第999个结果: 887459712耗时: 12405
复制代码


(3)FutureTask 类的使用


一.FutureTask 类的简介

FutureTask 类实现了 RunnableFuture 接口,而 RunnableFuture 接口又继承了 Runnable 接口和 Future 接口。所以 FutureTask 类既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的 run()方法的返回值。同时,FutureTask 类是 Future 接口的唯一实现类。

public class FutureTask<V> implements RunnableFuture<V> {    ...    ...}
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}
复制代码


二.Callable + Future 获取异步任务的执行结果

//Callable+Future获取执行结果public class FutureTest {    public static void main(String[] args) {    		ExecutorService executor = Executors.newCachedThreadPool();        Task task = new Task();        Future<Integer> result = executor.submit(task);        executor.shutdown();        try {            Thread.sleep(1000);        } catch (InterruptedException e1) {            e1.printStackTrace();        }        System.out.println("主线程在执行任务");        try {            System.out.println("task运行结果" + result.get());        } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();        }        System.out.println("所有任务执行完毕");    }}
class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for (int i = 0; i < 100; i++) { sum += i; } return sum; }}
复制代码


三.Callable + FutureTask 获取异步任务的结果

//Callable + FutureTask获取执行结果public class FutureTest {    public static void main(String[] args) {        //第一种方式        ExecutorService executor = Executors.newCachedThreadPool();        Task task = new Task();        FutureTask<Integer> futureTask = new FutureTask<>(task);        executor.submit(futureTask);        executor.shutdown();
//第二种方式 //注意这种方式和第一种方式效果是类似的,只不过之前使用的是ExecutorService,现在使用的是Thread //Task task = new Task(); //FutureTask<Integer> futureTask = new FutureTask<Integer>(task); //Thread thread = new Thread(futureTask); //thread.start();
try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果" + futureTask.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); }}
class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; }}
复制代码


3.CompletableFuture 的使用和异步编程


(1)使用 Future 时的问题


一.通过 Future 获取结果演示

如果主线程需要执行一个很耗时的计算任务,那么可通过 Future 把该任务放到异步线程执行,让主线程继续处理其他任务。当这个耗时的任务处理完成后,再让主线程通过 Future 获取计算结果。

 

如下所示,有两个服务:

public class UserInfoService {    public UserInfo getUserInfo(Long userId) throws InterruptedException {        Thread.sleep(300);//模拟调用耗时        return new UserInfo("...");//一般是查数据库,或者远程调用返回    }}
public class OrderService { public OrderInfo getOrderInfo(long userId) throws InterruptedException { Thread.sleep(500);//模拟调用耗时 return new OrderInfo("..."); }}
复制代码


接下来,演示在主线程中是如何使用 Future 来进行异步调用的。

public class FutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        UserInfoService userInfoService = new UserInfoService();        OrderService orderService = new OrderService();        long userId = 1L;        long startTime = System.currentTimeMillis();                //调用用户服务获取用户基本信息        FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {            @Override            public UserInfo call() throws Exception {                return userInfoService.getUserInfo(userId);            }        });        //提交任务给线程池异步执行        executorService.submit(userInfoFutureTask);                //模拟主线程其它操作耗时        Thread.sleep(300);                //调用订单服务获取用户订单信息        FutureTask<OrderInfo> orderInfoFutureTask = new FutureTask<>(new Callable<OrderInfo>() {            @Override            public MedalInfo call() throws Exception {                return medalService.getMedalInfo(userId);            }        });        //提交任务给线程池异步执行        executorService.submit(medalInfoFutureTask);                UserInfo userInfo = userInfoFutureTask.get();//获取用户信息结果        OrderInfo orderInfo = orderInfoFutureTask.get();//获取订单信息结果        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");    }}//总共用时806ms
复制代码


如果不使用 Future 进行并行异步调用,而是在主线程串行执行,那么耗时大约为 300 + 500 + 300 = 1100ms。

 

二.Future 获取结果时存在的问题

可见,Future + 线程池异步配合,提高了程序的执行效率。但是由于根据 Future 获取结果的方式不是很友好,所以只能通过阻塞或者轮询的方式来得到任务的结果。

 

方式一:

通过 Future 提供的 get()方法,进行阻塞调用。在主线程获取到异步任务的执行结果前,get()方法会一直阻塞。

 

方式二:

通过 Future 提供的 isDone()方法,进行轮询调用。可以让主线程在程序中轮询 isDone()方法来查询异步任务的执行结果。

 

阻塞的方式会违背异步编程的理念,轮询的方式又会空耗 CPU 资源,因此 JDK8 设计出了 CompletableFuture。

 

CompletableFuture 提供了一种类似观察者模式的机制,可以让异步任务执行完成后通知主线程。

 

三.使用 Future 的问题总结

首先需要单独创建一个线程池来提交 Callable 任务。然后如果使用 Future 的 get()方法获取结果,那么需要进行阻塞调用。如果使用 Future 的 isDone()方法获取结果,那么需要进行轮询调用。

 

(2)CompletableFuture 的使用例子


如下所示,使用 CompletableFuture,代码简洁了很多。CompletableFuture 的 supplyAsync()方法,提供了异步执行的功能,线程池也不用单独创建了,实际上使用的默认线程池是 ForkJoinPool.commonPool。

public class CompletableFutureTest {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        UserInfoService userInfoService = new UserInfoService();        OrderService orderService = new OrderService();        long userId = 1L;        long startTime = System.currentTimeMillis();                //调用用户服务获取用户基本信息        CompletableFuture<UserInfo> completableUserInfoFuture =            CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));                //模拟主线程其它操作耗时        Thread.sleep(300);                //调用订单服务获取用户订单信息        CompletableFuture<OrderInfo> completableOrderInfoFuture =             CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(userId));
//获取个人信息结果 UserInfo userInfo = completableUserInfoFuture.get(); //获取订单信息结果 OrderInfo orderInfo = completableOrderInfoFuture.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }}
复制代码


(3)CompletableFuture 的使用场景


一.创建异步任务

二.简单任务异步回调

三.多个任务组合处理

 

(4)CompletableFuture 的创建异步任务


CompletableFuture 创建异步任务的方法:supplyAsync()和 runAsync();

一.supplyAsync()方法执行 CompletableFuture 任务,有返回值。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool()     //with the value obtained by calling the given Supplier.    //@param supplier a function returning the value to be used to complete the returned CompletableFuture    //@param <U> the function's return type    //@return the new CompletableFuture    //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {        return asyncSupplyStage(asyncPool, supplier);    }
//Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor //with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletableFuture //使用自定义的线程池,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } ...}
复制代码


二.runAsync()方法执行 CompletableFuture 任务,没有返回值。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool()     //after it runs the given action.    //@param runnable the action to run before completing the returned CompletableFuture    //@return the new CompletableFuture    //使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务    public static CompletableFuture<Void> runAsync(Runnable runnable) {        return asyncRunStage(asyncPool, runnable);    }
//Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor //after it runs the given action. //@param runnable the action to run before completing the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@return the new CompletableFuture //使用自定义的线程池,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } ...}
复制代码


使用示例如下:

public class CompletableFuture {    public static void main(String[] args) {        //自定义线程池        ExecutorService executor = Executors.newCachedThreadPool();                //runAsync的使用,没有返回值        CompletableFuture<Void> runFuture =            CompletableFuture.runAsync(() -> System.out.println("没有返回值"), executor);        //supplyAsync的使用,有返回值        CompletableFuture<String> supplyFuture =            CompletableFuture.supplyAsync(() -> { System.out.print("有返回值"); return "OK"; }, executor);                //runAsync的future没有返回值,输出null        System.out.println(runFuture.join());        //supplyAsync的future,有返回值        System.out.println(supplyFuture.join());                //关闭线程池        executor.shutdown();    }}
复制代码


(5)CompletableFuture 的简单任务异步回调



一.thenRun()和 thenRunAsync()方法

CompletableFuture 的 thenRun()方法就是:执行完第一个任务后,再执行第二个任务。也就是当某个任务执行完成后,会执行设置给该任务的回调方法。但是前后两个任务没有传递参数,第二个任务也没有返回值。

 

thenRun()和 thenRunAsync()方法的区别是:如果执行第一个任务时,传入了一个自定义线程池。当调用 thenRun()方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。当调用 thenRunAsync()方法执行第二个任务时,则第一个任务使用传入的线程池,第二个任务使用 ForkJoin 线程池。也就是说,thenRunAsync()会使用 ForkJoin 线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    ...    public CompletableFuture<Void> thenRun(Runnable action) {        return uniRunStage(null, action);    }    public CompletableFuture<Void> thenRunAsync(Runnable action) {        return uniRunStage(asyncPool, action);    }    ...}
public class FutureThenRunTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("先执行第一个任务"); return "第一个任务执行完成"; }); CompletableFuture thenRunFuture = orgFuture.thenRun(() -> { System.out.println("接着执行第二个任务"); }); System.out.println("输出" + thenRunFuture.get()); }}
//执行程序输出的结果如下://先执行第一个任务//接着执行第二个任务//输出null
复制代码


二.thenAccept()和 thenAcceptAsync()方法

CompletableFuture 的 thenAccept()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,但是第二个任务是没有返回值的。

 

CompletableFuture 的 thenAcceptAsync()方法会使用 ForkJoin 线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    ...    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {        return uniAcceptStage(null, action);    }    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {        return uniAcceptStage(asyncPool, action);    }    ...}
public class FutureThenAcceptTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("执行第一个任务"); return "第一个任务的返回值"; }); CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> { System.out.println("执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); } }); System.out.println("输出" + thenAcceptFuture.get()); }}
//执行程序输出的结果如下://执行第一个任务//执行第二个任务//收到传入的第一个任务的返回值//输出null
复制代码


三.thenApply()和 thenApplyAsync()方法

CompletableFuture 的 thenApply()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,并且第二个任务是有返回值的。

 

CompletableFuture 的 thenApplyAsync()方法会使用 ForkJoin 线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    ...    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {        return uniApplyStage(null, fn);    }    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {        return uniApplyStage(asyncPool, fn);    }    ...}
public class FutureThenApplyTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("执行第一个任务"); return "第一个任务的返回值"; }); CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> { if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); System.out.println("执行第二个任务"); return "第二个任务的返回值"; } return "第二个任务的返回值"; }); System.out.println("输出" + thenApplyFuture.get()); }}
//执行程序输出的结果如下://执行第一个任务//收到传入的第一个任务的返回值//执行第二个任务//输出第二个任务的返回值
复制代码


四.exceptionally()方法

CompletableFuture 的 exceptionally()方法表示:某个任务执行异常时,才执行的回调方法。并且将抛出的异常作为参数,传递到回调方法中。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {        return uniExceptionallyStage(fn);    }    ...}
public class FutureExceptionTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("异步执行任务时抛出异常"); throw new RuntimeException(); }); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "返回处理异步执行任务抛出的异常的结果"; }); System.out.println(exceptionFuture.get()); }}
//执行程序输出的结果如下://异步执行任务时抛出异常//java.util.concurrent.CompletionException: java.lang.RuntimeException//返回处理异步执行任务抛出的异常的结果
复制代码


五.whenComplete()和 whenCompleteAsync()

CompletableFuture 的 whenComplete()方法表示:某个任务执行完成后,紧接着执行的回调方法无返回值。whenComplete()方法返回的 CompletableFuture 的 result 是上个任务的结果。

 

CompletableFuture 的 whenCompleteAsync()方法会使用 ForkJoin 线程池来异步执行回调方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    ...        public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {        return uniWhenCompleteStage(null, action);    }
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } ...}
public class FutureWhenTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("异步执行第一个任务"); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值"; }); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("异步执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); } System.out.println("whenComplete()执行的回调方法没有返回值"); }); System.out.println("输出" + rstFuture.get()); }}
//执行程序输出的结果如下://异步执行第一个任务//异步执行第二个任务//收到传入的第一个任务的返回值//whenComplete()执行的回调方法没有返回值//输出第一个任务的返回值
复制代码


六.handle()和 handleAsync()方法

CompletableFuture 的 handle()方法表示:异步任务执行完成后,紧接着执行的回调方法是有返回值的。handle()方法返回的 CompletableFuture 的 result 是回调方法执行的结果。

 

CompletableFuture 的 handleAsync()方法会使用 ForkJoin 线程池来异步执行回调方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    ...        public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {        return uniHandleStage(null, fn);    }
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(asyncPool, fn); } ...}
public class FutureHandlerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println(""); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值"; }); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); return "第二个任务的返回值"; } return "第二个任务的返回值"; }); System.out.println("输出" + rstFuture.get()); }}
//执行程序输出的结果如下://执行第二个任务//收到传入的第一个任务的返回值//输出第二个任务的返回值
复制代码


(6)CompletableFuture 的多个任务组合处理


一.AND 组合关系

thenCombine()、thenAcceptBoth()、runAfterBoth()都表示:将两个 CompletableFuture 任务组合起来,只有这两个任务都正常执行完后,才会执行后面的回调方法。区别如下:

 

thenCombine()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。

 

thenAcceptBoth()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,但指定的回调方法无返回值。

 

runAfterBoth()方法则不会把两个任务的执行结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。

public class ThenCombineTest {    public static void main(String[] args) throws Exception {        CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {            System.out.println("第一个异步任务要执行3秒");            try {                Thread.sleep(3000L);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("第一个异步任务执行完毕");            return "第一个任务的返回值";        });        ExecutorService executor = Executors.newFixedThreadPool(2);        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {            System.out.println("第二个异步任务要执行2秒");            try {                Thread.sleep(2000L);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("第二个异步任务执行完毕");            return "第二个任务的返回值";        }, executor).thenCombineAsync(firstFuture, (secondResult, firstResult) -> {            System.out.println("两个异步任务都执行完毕后才能执行这里");            System.out.println("接收到" + firstResult);            System.out.println("接收到" + secondResult);            return "两个异步任务都执行完后执行的回调的返回值";        }, executor);        System.out.println("输出" + future.join());        executor.shutdown();    }}
//执行程序输出的结果如下://第一个异步任务要执行3秒//第二个异步任务要执行2秒//第二个异步任务执行完毕//第一个异步任务执行完毕//两个异步任务都执行完毕后才能执行这里//接收到第一个任务的返回值//接收到第二个任务的返回值//输出两个异步任务都执行完后执行的回调的返回值
复制代码


二.OR 组合关系

applyToEither()、acceptEither()、runAfterEither()都表示:将两个 CompletableFuture 组合起来,只要其中一个执行完了,就会执行某个任务。区别如下:

 

applyToEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。

 

acceptEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法无返回值。

 

runAfterEither()方法不会把已经执行完成的任务的结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。

public class AcceptEitherTest {    public static void main(String[] args) {        //第一个异步任务,休眠2秒,保证它执行晚点        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {            try{                Thread.sleep(2000L);                System.out.println("执行完第一个任务");            } catch (Exception e) {                return "执行第一个任务异常";            }            return "返回第一个任务的结果";        });        ExecutorService executor = Executors.newSingleThreadExecutor();        //第二个异步任务        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {            System.out.println("执行完第二个任务");            return "返回第二个任务的结果";        }, executor).acceptEitherAsync(first, (lastResult) -> {            System.out.println("执行完第一个或第二个任务后的回调");            System.out.println("获取到传入的先执行完的任务的返回结果是:" + lastResult);        }, executor);        executor.shutdown();    }}
//执行程序输出的结果如下://执行完第二个任务//执行完第一个或第二个任务后的回调//获取到传入的先执行完的任务的返回结果是:返回第二个任务的结果
复制代码


三.anyOf

任意一个任务执行完,就执行 anyOf()方法返回的 CompletableFuture。如果执行的任务异常,anyOf()方法返回的 CompletableFuture 在执行 get()方法时,会抛出异常。

public class AnyOfFutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {            try {                Thread.sleep(3000L);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("任务A执行完了");        });        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {            System.out.println("任务B执行完了");        });        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m, k) -> {            System.out.println("finish");        });        anyOfFuture.join();    }}
//执行程序输出的结果如下://任务B执行完了//finish
复制代码


四.allOf

所有任务都执行完成后,才执行 allOf()方法返回的 CompletableFuture。如果任意一个任务异常,allOf()方法返回的 CompletableFuture 在执行 get()方法时,会抛出异常。

public class allOfFutureTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {            System.out.println("任务A执行完了");        });        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {            System.out.println("任务B执行完了");        });        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m, k) -> {            System.out.println("finish:" + m + "," + k);        });    }}
//执行程序输出的结果如下://任务A执行完了//任务B执行完了//finish: null,null
复制代码


五.thenCompose

thenCompose()方法会在某个任务执行完成后,将该任务的执行结果作为方法入参去执行指定的方法。thenCompose()方法会返回一个新的 CompletableFuture 实例。

public class ThenComposeTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");        ExecutorService executor = Executors.newSingleThreadExecutor();        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {            System.out.println("执行第二个任务");            return "返回第二个任务的结果";        }, executor).thenComposeAsync(data -> {            System.out.println("执行第三个任务");            System.out.println("收到传入的:" + data);            return f;        }, executor);        System.out.println(future.join());        executor.shutdown();    }}
//执行程序输出的结果如下://执行第二个任务//执行第三个任务//收到传入的:返回第二个任务的结果//第一个任务
复制代码


(7)CompletableFuture 的使用注意事项


一.Future 需要获取返回值,才能获取异常信息

public class ThenComposeTest {    public static void main(String[] args) throws ExecutionException, InterruptedException {        ExecutorService executorService = newThreadPoolExecutor(            5,            10,            5L,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(10)        );        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {            int a = 0;            int b = 666;            int c = b / a;            return true;        },executorService).thenAccept((a) -> {            System.out.println(a);        });
//如果如下这一行get()方法,是看不到异常信息的 //future.get(); }}
复制代码


二.CompletableFuture 的 get()方法是阻塞的

CompletableFuture 的 get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

 

三.默认线程池的注意点

CompletableFuture 代码中使用了默认的线程池,处理的线程个数是机器 CPU 核数 - 1。在大量请求过来时,如果处理逻辑复杂,那么响应就会很慢。所以一般建议使用自定义线程池,优化线程池配置参数。

 

四.自定义线程池时注意饱和策略

由于 CompletableFuture 的 get()方法是阻塞的,所以一般建议使用类似 future.get(3, TimeUnit.SECONDS),并且一般建议使用自定义线程池。

 

但如果线程池拒绝策略是 DiscardPolicy 或者 DiscardOldestPolicy,那么当线程池饱和时,会直接丢弃任务,不会抛出异常。

 

因此建议 CompletableFuture 线程池的拒绝策略最好使用 AbortPolicy,然后对耗时的异步线程做好线程池隔离。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18737024

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
JUC并发—Future模式和异步编程简介_Java_不在线第一只蜗牛_InfoQ写作社区