异步编程利器:CompletableFuture 深度解析
本文已收录至 Github,推荐阅读 👉 Java随想录
微信公众号:Java随想录
摘要
在异步编程中,我们经常需要处理各种异步任务和操作。Java 8 引入的 CompletableFuture 类为我们提供了一种强大而灵活的方式来处理异步编程需求。CompletableFuture 类提供了丰富的方法和功能,能够简化异步任务的处理和组合。
本文将深入解析 CompletableFuture,希望对各位读者能有所帮助。
CompletableFuture 适用于以下场景
并发执行多个异步任务,等待它们全部完成或获取其中任意一个的结果。
对已有的异步任务进行进一步的转换、组合和操作。
异步任务之间存在依赖关系,需要按照一定的顺序进行串行执行。
需要对异步任务的结果进行异常处理、超时控制或取消操作。
如何使用
下面是一个演示 CompletableFuture 如何使用的代码示例:
结果输出:
首先,我们创建了一个CompletableFuture
对象future
。在future
中,我们使用supplyAsync
方法定义了一个异步任务,其中 lambda 表达式 中的代码会在另一个线程中执行。在这个例子中,我们模拟了一个耗时操作,通过TimeUnit.SECONDS.sleep(2)
暂停了 2 秒钟。
然后,我们添加了一个回调方法resultFuture
。在这个回调方法中,将前一个异步任务的结果作为参数进行处理,并返回处理后的新结果。在这个例子中,我们将前一个任务的结果与字符串 "World!" 连接起来,形成新的结果。
接下来,我们使用thenCombine
方法组合了两个CompletableFuture
对象:future
和resultFuture
。在这个组合任务中,我们将两个任务的结果进行组合处理,返回最终的结果。在这个例子中,我们将前两个任务的结果与字符串 " Welcome to the CompletableFuture world!" 连接起来。
此外,我们还处理了异常情况。通过exceptionally
方法,我们定义了一个异常处理回调方法。如果在任务执行过程中发生了异常,我们可以在这里对异常进行处理,并返回一个默认值作为结果。
最后,我们使用get
方法等待并获取最终的任务结果。需要注意的是,get
方法可能会阻塞当前线程,直到任务完成并返回结果。在这个例子中,我们使用try-catch
块捕获可能的异常情况,并打印出最终的任务结果。
这个例子只是部分展示了CompletableFuture
的功能,实际上它比你想象的还要强大!
源码解析
CompletableFuture 的源码非常庞大和复杂,涉及到并发、线程池、同步机制等多方面的知识。在这里,我们只重点介绍 CompletableFuture 的核心实现原理。
基本结构
CompletableFuture 的作者是大名鼎鼎的 Doug Lea。CompletableFuture 类是实现了 Future 和 CompletionStage 接口的一个关键类。它可以表示异步计算的结果,并提供了一系列方法来操作和处理这些结果。
CompletableFuture 内部使用了一个属性result
来保存计算结果,以及若干个属性waiters
来保存等待结果的任务。当计算完成后,CompletableFuture 将会通知所有等待结果的任务,并将结果传递给它们。
为了实现链式操作,CompletableFuture 还定义了内部类:Completion
, UniCompletion
, 和 BiCompletion
。
Completion
, UniCompletion
, 和 BiCompletion
是 CompletableFuture
内部用于处理异步任务完成的辅助类。
Completion
是一个通用的辅助类,它包含了任务完成后的回调方法,以及处理异常的方法。UniCompletion
是Completion
的子类,是一元依赖的基类,用于处理单个任务的完成情况,并提供了更多的方法来处理结果和异常。BiCompletion
是UniCompletion
的子类,是二元依赖的基类,同时也是多元依赖的基类,用于处理两个任务的完成情况,并提供了更多的方法来组合和处理这两个任务的结果和异常。
这些辅助类在 CompletableFuture
的内部被使用,以实现异步任务的执行、结果的处理和组合等操作。它们提供了一种灵活的方式来处理异步任务的完成情况,并通过回调方法或其他一些方法来处理任务的结果和异常。
内部原理
CompletableFuture 中包含两个字段:result 和 stack。result 用于存储当前 CF 的结果,stack (Completion)表示当前 CF 完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的 CF 的计算,依赖动作可以有多个(表示有多个依赖它的 CF),以栈(Treiber stack)的形式存储,stack 表示栈顶元素。
CompletableFuture 在设计思想上类似 “观察者模式,每个 CompletableFuture 都可以被看作一个被观察者,其内部有一个 Completion 类型的链表成员变量 stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈 stack 属性,依次通知注册到其中的观察者。
执行流程
CompletableFuture 的执行流程如下:
创建 CompletableFuture 对象:通过调用
CompletableFuture
类的构造方法或静态工厂方法创建一个新的 CompletableFuture 对象。定义异步任务:使用
supplyAsync()
、runAsync()
等方法定义需要在后台线程中执行的异步任务,这些方法接受一个 lambda 表达式 或 Supplier/Runnable 接口作为参数。启动异步任务:一旦 CompletableFuture 对象创建并定义了异步任务,任务会立即在后台线程中开始执行,并返回一个代表异步计算结果的 CompletableFuture 对象。
异步任务执行过程:
当异步任务完成时,它会设置自己的结果值,将状态标记为已完成。
如果有其他线程在此之前调用了
complete()
、completeExceptionally()
、cancel()
等方法,可能会影响任务的最终状态。注册回调方法:
使用
thenApply()
,thenAccept()
,thenRun()
等方法来注册回调函数,当异步任务完成或异常时,这些回调函数会被触发。回调函数也可以是异步的,通过
thenApplyAsync()
,thenAcceptAsync()
,thenRunAsync()
等方法注册。组合多个 CompletableFuture:
使用
thenCompose()
,thenCombine()
,allOf()
,anyOf()
等方法,可以将多个 CompletableFuture 对象进行组合,形成更复杂的异步任务处理流程。处理异常:
通过使用
exceptionally()
,handle()
,whenComplete()
等方法,可以注册异常处理函数,当异步任务出现异常时,这些处理函数会被触发。等待结果:
使用
get()
或join()
方法来阻塞当前线程,并等待 CompletableFuture 对象的完成并获取最终的结果。get()
方法会抛出可能的异常(InterruptedException, ExecutionException)。join()
方法与get()
类似,但不会抛出 checked 异常。取消任务:通过调用 CompletableFuture 对象的
cancel()
方法取消异步任务的执行。
请注意,以上步骤的顺序和具体实现可能略有不同,但大致上反映了 CompletableFuture 的执行流程。在实际应用中,我们可以根据需求选择适合的方法来处理异步任务的完成情况、结果、异常以及任务之间的关系。
方法介绍
CompletableFuture 类提供了一系列用于处理和组合异步任务的方法。以下是这些方法的介绍:
创建对象
创建一个 CompletableFuture
对象有以下几种方法:
使用
CompletableFuture
的构造方法
使用
CompletableFuture
的静态工厂方法
使用转换方法
直接创建一个已完成状态的 CompletableFuture
toCompletableFuture
用于将当前的 CompletionStage
对象转换为一个 CompletableFuture
对象。
异步执行任务
以下是在 CompletableFuture
对象上异步执行任务的一些方法示例:
supplyAsync(Supplier<U> supplier)
:异步执行一个有返回值的供应商(Supplier)任务。
runAsync(Runnable runnable)
:异步执行一个没有返回值的任务。
链式操作
CompletableFuture 提供了不同的方式来对异步任务进行链式操作。
thenRun
thenRun
方法用于在 CompletableFuture 完成后执行一个 Runnable 任务。它返回一个新的 CompletableFuture 对象,该对象没有返回值。
thenAccept
thenAccept
方法用于在 CompletableFuture 完成后对结果进行处理。它接收一个 Consumer 函数作为参数,并返回一个新的 CompletableFuture 对象。
thenApply
thenApply
方法用于在 CompletableFuture 完成后对结果进行转换。它接收一个 Function 函数作为参数,并返回一个新的 CompletableFuture 对象。
thenCompose
用于对异步任务的结果进行处理,并返回一个新的异步任务。
whenComplete
用于在异步任务完成后执行指定的动作。它允许你在任务完成时处理结果或处理异常。
thenCompose()
用于对异步任务的结果进行处理,并返回一个新的异步任务。它接受一个函数式接口参数,根据原始任务的结果创建并返回一个新的CompletionStage
对象。whenComplete()
用于在异步任务完成后执行指定的动作。它接受一个消费者函数式接口参数,用于处理任务的结果或异常,但没有返回值。
异步任务组合
CompletableFuture 还提供了一系列方法来组合和处理多个异步任务的结果。
allOf
allOf
方法接收一组 CompletableFuture 对象作为参数,并返回一个新的 CompletableFuture 对象,该对象在所有给定的 CompletableFuture 都完成时完成。这样我们可以等待所有任务都完成后再进行下一步操作。
anyOf
anyOf
方法与allOf
类似,不同之处在于它返回的 CompletableFuture 对象在任何一个给定的 CompletableFuture 完成时就完成。这样我们可以获取最先完成的任务的结果。
thenCombine
thenCombine
方法接收两个 CompletableFuture 对象和一个函数作为参数,用于指定当这两个 CompletableFuture 都完成时如何处理它们的结果。返回的新的 CompletableFuture 对象将接收到计算后的结果。
applyToEither
applyToEither
方法用于获取两个 CompletableFuture 中任意一个完成的结果,并对该结果进行处理。它接收一个 Function 函数作为参数,并返回一个新的 CompletableFuture 对象。
acceptEither
用于在两个 CompletableFuture
对象中任意一个完成时执行指定的操作。该方法接收两个参数:另一个 CompletableFuture
对象和一个消费者函数(Consumer
)。当其中任何一个 CompletableFuture
完成时,将其结果作为参数传递给消费者函数进行处理。
runAfterBoth
用于在两个异步任务都完成后执行指定的动作,需要注意的是,runAfterBoth()
方法是一个非阻塞方法,动作将在两个异步任务都完成后立即执行。
runAfterEither
用于在两个异步任务中任意一个完成后执行指定的动作。
thenAcceptBoth
用于在两个异步任务都完成后执行指定的动作。它的作用是接收两个异步任务的结果,并将结果作为参数传递给指定的消费者函数。
异常处理
CompletableFuture 提供了多种方式来处理异步任务的异常情况。
exceptionally
通过exceptionally
方法,我们可以对 CompletableFuture 的异常情况进行处理。它接收一个 Function 函数作为参数,用于处理异常并返回一个新的 CompletableFuture 对象。
handle
handle
方法可以同时处理正常结果和异常情况。它接收一个 BiFunction 函数作为参数,用于处理结果和异常,并返回一个新的 CompletableFuture 对象。
completeExceptionally
异常地完成 CompletableFuture
,将结果设置为一个异常。
isCompletedExceptionally
该方法返回一个布尔值,表示当前异步任务是否已经异常完成。
obtrudeException
用于强制将指定的异常作为异步任务的结果,调用 obtrudeException(Throwable ex)
方法后,异步任务将立即完成,并将指定的异常作为结果返回。
取值与状态
join
join()
方法不会抛出已检查异常,因为它是基于 CompletableFuture
类设计的,如果异步任务抛出异常,join()
方法会将该异常包装在 CompletionException
中并抛出。
get
get()
方法会抛出一个 InterruptedException
异常和一个 ExecutionException
异常,前者表示获取结果时被中断,后者表示获取结果时任务本身抛出了异常。
有异常则抛出异常,最长等待一个小时,一个小时之后,如果还没有数据,则异常。
getNow
getNow(T value)
是 CompletableFuture
类的一个方法,用于获取异步操作的结果,如果异步操作尚未完成,则返回给定的默认值,该方法会立即返回结果,不会阻塞当前线程。
超时控制与取消操作
CompletableFuture 也支持超时控制和取消操作,以便更好地管理异步任务的执行。
completeOnTimeout
completeOnTimeout
方法在指定的超时时间内等待 CompletableFuture 的完成,如果超时则将其设置为默认结果。它返回一个新的 CompletableFuture 对象。
cancel
cancel
方法可用于取消 CompletableFuture 的执行。它接收一个 boolean 参数,指示是否中断正在执行的任务。返回值表示是否成功取消了任务。
isCancelled
isCancelled()
是 CompletableFuture
类的一个方法,用于判断当前异步任务是否已被取消。如果异步任务已被取消,则返回 true
;否则返回 false
。
依赖
getNumberOfDependents
getNumberOfDependents()
用于获取当前 CompletableFuture
对象所依赖的其他异步任务的数量。如果没有任何依赖任务,或者所有依赖任务已经完成,则返回的数量为 0。
完成
complete
complete(T value)
:该方法返回布尔值,表示是否成功地将结果设置到 CompletableFuture
中。如果 CompletableFuture
未完成,则将结果设置,并返回 true
;如果 CompletableFuture
已经完成,则不进行任何操作并返回 false
。
obtrudeValue
用于强制将指定的值作为异步任务的结果,调用 obtrudeValue(T value)
方法后,异步任务将立即完成,并将指定的值作为结果返回。
与 complete()
不同,obtrudeValue()
必须在任务已经完成的情况下调用,否则会引发 IllegalStateException
异常。并且complete()
方法对于已经完成的任务会忽略额外的完成操作,并返回 false
。而obtrudeValue()
方法即使任务已经完成,仍然会强制使用新的结果值,并返回 true
。
isDone
用于判断当前异步任务是否已经完成(无论是正常完成还是异常完成)。
并发限制
CompletableFuture 也支持并发限制,以控制同时执行的异步任务数量。
我们可以通过使用线程池来限制 CompletableFuture 的并发执行数量。通过创建一个固定大小的线程池,并将其作为参数传递给 CompletableFuture,就可以控制并发执行任务的数量。
记忆窍门
CompletableFuture 类提供了许多方法,但实际上常用的方法只有几个。为了方便记忆,以下是一些总结的规律:
方法名带 Async 的都是异步方法,对应的没有 Async 则是同步方法,比如
thenAccept
与thenAcceptAsync
。方法名带 run 的入参为 Runnable,且无返回值。
方法名带 supply 的入参为 Supplier,且有返回值。
方法名带 Accept 的入参为 Consumer,且无返回值。
方法名带 Apply 的入参为 Function,且有返回值。
方法名带 Either 的方法表示谁先完成就消费谁。
方法名带 Both 的方法表示两个任务都完成才消费。
掌握以上规律后,就可以基本记住大部分方法,剩下的其他方法可以单独记忆。
总结
本文详细探讨了 CompletableFuture 的原理和方法,学习了如何在任务完成后执行操作、处理结果和转换结果。
CompletableFuture 是 Java 中强大的异步编程工具之一,合理利用它的方法和策略可以更好地处理异步任务和操作。
希望本文对读者有所启发和帮助。
版权声明: 本文为 InfoQ 作者【码农BookSea】的原创文章。
原文链接:【http://xie.infoq.cn/article/ed4def6d12da202fe0eb0a762】。文章转载请联系作者。
评论