当程序员多年了,现在竟然还有人只会多线程 -,不会异步编程!- 我惊呆了
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> rFuture = CompletableFuture.runAsync(() -> System.out.println("hello siting"), executor);//supplyAsync 的使用 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.print("hello ");return "siting";}, executor);
//阻塞等待,runAsync 的 future 无返回值,输出 nullSystem.out.println(rFuture.join());//阻塞等待 String name = future.join();System.out.println(name);executor.shutdown(); // 线程池需要关闭--------输出结果--------hello sitingnullhello siting
常量值作为 CompletableFuture 返回
//有时候是需要构建一个常量的 CompletableFuturepublic static <U> CompletableFuture<U> completedFuture(U value)2 、线程串行执行精通多线程,却不会异步编程?任务完成则运行 action,不关心上一个任务的结果,无返回值 public CompletableFuture<Void> thenRun(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
使用示例
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello siting", executor).thenRunAsync(() -> System.out.println("OK"), executor);executor.shutdown();--------输出结果--------OK
任务完成则运行 action,依赖上一个任务的结果,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
使用示例
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello siting", executor).thenAcceptAsync(System.out::println, executor);executor.shutdown();--------输出结果--------hello siting
任务完成则运行 fn,依赖上一个任务的结果,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
使用示例
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello world", executor).thenApplyAsync(data -> {System.out.println(data); return "OK";}, executor);System.out.println(future.join());executor.shutdown();--------输出结果--------hello worldOK
thenCompose - 任务完成则运行 fn,依赖上一个任务的结果,有返回值
类似 thenApply(区别是 thenCompose 的返回值是 CompletionStage,thenApply 则是返回 U),提供该方法为了和其他 CompletableFuture 任务更好地配套组合使用 public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
使用示例
//第一个异步任务,常量任务 CompletableFuture<String> f = CompletableFuture.completedFuture("OK");//第二个异步任务 ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello world", executor).thenComposeAsync(data -> {System.out.println(data); return f; //使用第一个任务作为返回}, executor);System.out.println(future.join());executor.shutdown();--------输出结果--------hello worldOK
3 、线程并行执行
两个 CompletableFuture[并行]执行完,然后执行 action,不依赖上两个任务的结果,无返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
使用示例
//第一个异步任务,常量任务 CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture//第二个异步任务.supplyAsync(() -> "hello siting", executor)// () -> System.out.println("OK") 是第三个任务.runAfterBothAsync(first, () -> System.out.println("OK"), executor);executor.shutdown();--------输出结果--------OK
两个 CompletableFuture[并行]执行完,然后执行 action,依赖上两个任务的结果,无返回值
//第一个任务完成再运行 other,fn 再依赖消费两个任务的结果,无返回值 public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)//两个任务异步完成,fn 再依赖消费两个任务的结果,无返回值
public <U> CompletableFuture<Void> thenAcceptBo
thAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
//两个任务异步完成(第二个任务用指定线程池执行),fn 再依赖消费两个任务的结果,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
使用示例
//第一个异步任务,常量任务 CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture//第二个异步任务.supplyAsync(() -> "hello siting", executor)// (w, s) -> System.out.println(s) 是第三个任务.thenAcceptBothAsync(first, (s, w) -> System.out.println(s), executor);executor.shutdown();--------输出结果--------hello siting
两个 CompletableFuture[并行]执行完,然后执行 action,依赖上两个任务的结果,有返回值
//第一个任务完成再运行 other,fn 再依赖消费两个任务的结果,有返回值 public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)//两个任务异步完成,fn 再依赖消费两个任务的结果,有返回值 public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
//两个任务异步完成(第二个任务用指定线程池执行),fn 再依赖消费两个任务的结果,有返回值
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
使用示例
//第一个异步任务,常量任务 CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture//第二个异步任务.supplyAsync(() -> "hello siting", executor)// (w, s) -> System.out.println(s) 是第三个任务.thenCombineAsync(first, (s, w) -> {System.out.println(s);return "OK";}, executor);System.out.println(future.join());executor.shutdown();--------输出结果--------hello sitingOK
4 、线程并行执行,谁先执行完则谁触发下一任务(二者选其最快)
上一个任务或者 other 任务完成, 运行 action,不依赖前一任务的结果,无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action, Executor executor)
使用示例
//第一个异步任务,休眠 1 秒,保证最晚执行晚 CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{try{ Thread.sleep(1000); }catch (Exception e){}System.out.println("hello world");return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture//第二个异步任务.supplyAsync(() ->{System.out.println("hello siting");return "hello siting";} , executor)//() -> System.out.println("OK") 是第三个任务.runAfterEitherAsync(first, () -> System.out.println("OK") , executor);executor.shutdown();--------输出结果--------hello sitingOK
上一个任务或者 other 任务完成, 运行 action,依赖最先完成任务的结果,无返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action)public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action, Executor executor)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action, Executor executor)
使用示例
//第一个异步任务,休眠 1 秒,保证最晚执行晚 CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{try{ Thread.sleep(1000); }catch (Exception e){}return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture//第二个异步任务.supplyAsync(() -> "hello siting", executor)// data -> System.out.println(data) 是第三个任务.acceptEitherAsync(first, data -> System.out.println(data) , executor);executor.shutdown();--------输出结果--------hello siting
上一个任务或者 other 任务完成, 运行 fn,依赖最先完成任务的结果,有返回值
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn)public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn, Executor executor)
使用示例
//第一个异步任务,休眠 1 秒,保证最晚执行晚 CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{try{ Thread.sleep(1000); }catch (Exception e){}return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture//第二个异步任务.supplyAsync(() -> "hello siting", executor)// data -> System.out.println(data) 是第三个任务.applyToEitherAsync(first, data -> {System.out.println(data);return "OK";} , executor);System.out.println(future);executor.shutdown();--------输出结果--------hello sitingOK
5 、处理任务结果或者异常
exceptionally-处理异常
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)如果之前的处理环节有异常问题,则会触发 exceptionally 的调用相当于 try...catch
使用示例
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("main error!");}return "hello world";}).thenApply(data -> 1).exceptionally(e -> {e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获 return 0;});
handle-任务完成或者异常时运行 fn,返回值为 fn 的返回
相比 exceptionally 而言,即可处理上一环节的异常也可以处理其正常返回值 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor)
使用示例
CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {if (true) { throw new RuntimeException("main error!"); }return "hello world";}).thenApply(data -> 1).handleAsync((data,e) -> {e.printStackTrace(); // 异常捕捉处理 return data;});System.out.println(first.join());--------输出结果--------java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!... 5 morenull
whenComplete-任务完成或者异常时运行 action,有返回值
whenComplete 与 handle 的区别在于,它不参与返回结果的处理,把它当成监听器即可
即使异常被处理,在 CompletableFuture 外层,异常也会再次复现
*使用 whenCompleteAsync 时,返回结果则需要考虑多线程操作问题,毕竟会出现两个线程同时操作一个结果
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor)
使用示例
CompletableFuture<AtomicBoolean> first = CompletableFuture
评论