高并发编程 / 一张图精通 CompletableFuture 整体执行流程与设计 (高手篇)
CompletableFuture
是 Java 8 引入的异步编程工具,它极大地丰富了并发编程的解决方案。作为 Future
的增强版,它不仅支持异步操作的结果管理,还提供了强大的链式调用能力,允许开发者以声明式的方式编排复杂的异步逻辑。CompletableFuture
的出现,使得代码更加简洁、清晰,同时提高了程序的响应性和吞吐量,是现代 Java 并发编程中不可或缺的一部分。
肖哥弹架构 跟大家“弹弹” 高并发锁, 关注公号回复 'mvcc' 获得手写数据库事务代码
欢迎 点赞,关注,评论。
关注公号 Solomon 肖哥弹架构获取更多精彩内容
历史热点文章
0、CompletableFuture 执行流程设计
0、组件设计需求
CompletableFuture
是 Java 8 引入的一个强大的异步编程工具,它位于 java.util.concurrent
包中。以下是 CompletableFuture
的设计背景和主要特点:
设计原有
在 Java 8 之前,异步编程通常依赖于 Future
和 Callable
接口,但这些工具的使用相对繁琐,且不支持链式调用和组合操作。CompletableFuture
的引入旨在提供一种更简洁、更强大的异步编程模型,以支持复杂的异步逻辑和提高代码的可读性及可维护性。
设计特点
异步执行:
CompletableFuture
可以异步执行任务,不会阻塞当前线程,允许任务在另一个线程上执行,而主线程可以继续执行其他任务。组合性:多个
CompletableFuture
实例可以组合在一起,形成复杂的异步流程。例如,一个CompletableFuture
的结果可以作为另一个CompletableFuture
的输入。回调机制:提供了多种方法来注册回调函数,这些回调函数会在未来的某个时刻(如异步操作完成时)被自动调用。
异常处理:提供了一种处理异步任务中抛出的异常的方式,例如使用
exceptionally
方法来指定异常发生时的处理逻辑。可定制的执行器:可以通过指定
Executor
来控制异步任务的执行线程,提供了更好的控制和资源管理能力。支持同步和异步方法:
CompletableFuture
提供了同步和异步两种方法,例如thenApply
和thenApplyAsync
,允许开发者根据需要选择使用同步或异步方式处理结果。
2、CompletableFuture 功能范围
2.1. 异步执行
CompletableFuture
提供了 supplyAsync
和 runAsync
方法,允许异步执行任务。这些方法会将任务提交给 ForkJoinPool
(默认的公共线程池)或其他指定的 Executor
来执行。
2.2. 组合异步任务
CompletableFuture
支持通过 thenApply
、thenAccept
、thenRun
等方法将异步任务的结果传递给后续的任务,实现任务之间的数据流和控制流的组合。
2.3. 错误处理
通过 exceptionally
和 handle
方法,CompletableFuture
允许开发者定义异常处理逻辑,以优雅地处理异步任务中发生的异常。
2.4. 组合多个异步任务
CompletableFuture
提供了 thenCombine
、thenAcceptBoth
、thenCompose
等方法,允许将多个异步任务的结果合并处理。
2.5. 任意一个或全部完成
CompletableFuture
提供了 anyOf
和 allOf
方法,用于处理多个异步任务中的任何一个完成或全部完成的场景。
2.6. 超时和取消
CompletableFuture
支持通过 orTimeout
方法设置超时,以及通过 cancel
方法取消异步任务。
2.7. 结果获取
CompletableFuture
提供了 get
和 join
方法,用于在需要时阻塞当前线程以等待异步任务的结果。
2.8. 非阻塞结果处理
CompletableFuture
的设计允许开发者以非阻塞的方式处理异步任务的结果,这是通过提供回调函数来实现的,这些回调函数会在异步任务完成时被调用。
3、CompletableFuture 设计原则
非阻塞性:
CompletableFuture
的设计避免了在等待异步任务结果时阻塞主线程。可组合性:
CompletableFuture
允许通过链式调用来组合多个异步任务,使得复杂的异步逻辑可以被分解为一系列简单的步骤。灵活性:
CompletableFuture
提供了多种方法来处理异步任务的不同方面,包括执行、组合、异常处理等。健壮性:
CompletableFuture
的设计考虑了错误处理和异常情况,提供了丰富的 API 来处理这些情况。
4、CompletableFuture 方法定义
4.1 常规方法相关执行流程图
4.2. 创建和启动异步计算
CompletableFuture<Void>
runAsync(Runnable runnable)
: 在默认的ForkJoinPool
中异步执行一个任务。CompletableFuture<Void>
runAsync(Runnable runnable, Executor executor)
: 在指定的Executor
中异步执行一个任务。CompletableFuture<U>
supplyAsync(Supplier<U> supplier)
: 在默认的ForkJoinPool
中异步执行一个供应者。CompletableFuture<U>
supplyAsync(Supplier<U> supplier, Executor executor)
: 在指定的Executor
中异步执行一个供应者。
4.2. 完成时的回调
CompletableFuture<T>
whenComplete(BiConsumer<? super T, ? super Throwable> action)
: 当任务完成时(无论成功或失败)执行一个回调。CompletableFuture<T>
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
: 当任务完成时(无论成功或失败)异步执行一个回调。CompletableFuture<T>
whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
: 在指定的Executor
中异步执行一个回调。
4.3. 正常完成时的回调
CompletableFuture<T>
thenAccept(Consumer<? super T> action)
: 当任务正常完成时执行一个回调。CompletableFuture<T>
thenAcceptAsync(Consumer<? super T> action)
: 当任务正常完成时异步执行一个回调。CompletableFuture<T>
thenAcceptAsync(Consumer<? super T> action, Executor executor)
: 在指定的Executor
中异步执行一个回调。
4.4. 正常完成时的转换
CompletableFuture<U>
thenApply(Function<? super T, ? extends U> fn)
: 当任务正常完成时,将一个函数应用于结果。CompletableFuture<U>
thenApplyAsync(Function<? super T, ? extends U> fn)
: 当任务正常完成时,异步应用一个函数于结果。CompletableFuture<U>
thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
: 在指定的Executor
中异步应用一个函数于结果。
4.5. 组合 CompletableFuture
CompletableFuture<T>
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
: 当两个任务都完成时,消费它们的结果。CompletableFuture<T>
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
: 异步消费两个任务的结果。CompletableFuture<T>
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
: 当两个任务都完成时,组合它们的结果。CompletableFuture<T>
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
: 当任务完成时,以它的结果为输入启动另一个CompletableFuture
。CompletableFuture<T>
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
: 异步启动另一个CompletableFuture
。
4.6. 异常处理
CompletableFuture<T>
exceptionally(Function<Throwable, ? extends T> fn)
: 当任务异常完成时,应用一个函数来计算结果。
4.7. 完成时的回调(无论成功或失败)
CompletableFuture<T>
handle(BiFunction<? super T, Throwable, ? extends U> fn)
: 当任务完成时(无论成功或失败)执行一个回调。
4.8. 等待其他 CompletableFuture
CompletableFuture<T>
thenRun(Runnable action)
: 当任务完成时执行一个动作。CompletableFuture<T>
thenRunAsync(Runnable action)
: 当任务完成时异步执行一个动作。CompletableFuture<T>
thenRunAsync(Runnable action, Executor executor)
: 在指定的Executor
中异步执行一个动作。
4.9. 任意一个或全部完成
CompletableFuture<T>
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
: 当当前或另一个任务完成时消费结果。CompletableFuture<T>
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
: 异步消费结果。CompletableFuture<T>
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
: 当当前或另一个任务完成时应用函数。CompletableFuture<T>
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
: 异步应用函数。CompletableFuture<Void>
runAfterBoth(CompletionStage<?> other, Runnable action)
: 当当前和另一个任务都完成时执行动作。CompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action)
: 异步执行动作。CompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
: 在指定的Executor
中异步执行动作。CompletableFuture<T>
thenCompose(CompletionStage<Function<? super T, ? extends U>> fn)
: 当任务完成时,以它的结果为输入启动另一个CompletableFuture
。CompletableFuture<U>
thenComposeAsync(CompletionStage<Function<? super T, ? extends U>> fn)
: 异步启动另一个CompletableFuture
。CompletableFuture<Void>
allOf(CompletableFuture<?>... cfs)
: 当所有给定的任务都完成时完成。CompletableFuture<T>
anyOf(CompletableFuture<? extends T>... cfs)
: 当任何一个给定的任务完成时完成。
4.10. 超时和取消
CompletableFuture<T>
orTimeout(long timeout, TimeUnit unit)
: 如果在给定的超时时间内没有完成,则异常完成。CompletableFuture<T>
completeOnTimeout(T value, long timeout, TimeUnit unit)
: 如果在给定的超时时间内没有完成,则以给定的值完成。boolean
cancel(boolean mayInterruptIfRunning)
: 尝试取消执行。
4.11. 结果获取
T get()
: 等待任务完成并获取结果。T join()
: 等待任务完成并获取结果。T getNow(T valueIfAbsent)
: 获取当前结果,如果未完成则返回给定的值。T getThenApplyAsync(Function<? super T, ? extends U> fn)
: 获取结果并异步应用函数。
5、CompletableFuture 应用案例
代码解释:
创建和启动异步计算:使用
supplyAsync
检查库存。正常完成时的转换:使用
thenApplyAsync
创建订单。组合 CompletableFuture:使用
thenComposeAsync
处理支付。异常处理:使用
exceptionally
处理支付过程中的异常。完成时的回调(无论成功或失败) :使用
handle
处理最终结果。等待其他 CompletableFuture:使用
allOf
等待所有任务完成。任意一个或全部完成:使用
anyOf
等待任一任务完成。超时和取消:使用
orTimeout
设置超时。结果获取:使用
thenAccept
打印最终结果。
版权声明: 本文为 InfoQ 作者【肖哥弹架构】的原创文章。
原文链接:【http://xie.infoq.cn/article/a5fba6f851578942329d4189e】。文章转载请联系作者。
评论