vivo 互联网客户端团队- Ruan Wen
本文是 Kotlin 协程解析系列文章的开篇,主要介绍 Kotlin 协程的创建、协程调度与协程挂起相关的内容
一、协程引入
Kotlin 中引入 Coroutine(协程) 的概念,可以帮助编写异步代码。
在使用和分析协程前,首先要了解一下:
协程是什么?
为什么需要协程?
协程最为人称道的就是可以用看起来同步的方式写出异步的代码,极大提高了代码的可读性。在实际开发中最常见的异步操作莫过于网络请求。通常我们需要通过各种回调的方式去处理网络请求,很容易就陷入到地狱回调中。
WalletHttp.target(VCoinTradeSubmitResult.class).setTag(tag)
.setFullUrl(Constants.VCOIN_TRADE_SUBMIT_URL).setParams(params)
.callback(new HttpCallback<VCoinTradeSubmitResult>() {
@Override
public void onSuccess(VCoinTradeSubmitResult vCoinTradeSubmitResult) {
super.onSuccess(vCoinTradeSubmitResult);
if (mView == null) {
return;
}
//......
}
}).post();
复制代码
上述示例是一个项目开发中常见的一个网络请求操作,通过接口回调的方式去获取网络请求结果。实际开发中也会经常遇到连续多个接口请求的情况,例如我们项目中的个人中心页的逻辑就是先去异步获取。
本地缓存,获取失败的话就需要异步刷新一下账号 token,然后网络请求相关个人中心的其他信息。这里简单举一个支付示例,进行支付时,可能要先去获取账号 token,然后依赖该 token 再去做支付。
请求操作,根据支付返回数据再去查询支付结果,这种情况通过回调就可能演变为“地狱回调”。
//获取账号token
WalletHttp.target(Account.class).setTag(tag)
.setFullUrl(Constants.ACCOUNT_URL).setParams(params)
.callback(new HttpCallback<Account>() {
@Override
public void onSuccess(Account account) {
super.onSuccess(account);
//根据账号token进行支付操作
WalletHttp.target(Pay.class).setFullUrl(Constants.PAY_URL).addToken(account.getToken()).callback(new HttpCallback<Pay>() {
@Override
public void onSuccess(Pay pay){
super.onSuccess(pay);
//根据支付操作返回查询支付结果
WalletHttp.target(PayResult.class).setFullUrl(Constants.RESULT_URL).addResultCode(pay.getResultCode()).callback(new HttpCallback<PayResult>() {
@Override
public void onSuccess(PayResult result){
super.onSuccess(result);
//......
}
}).post();
}
}).post();
}
}).post();
复制代码
对于这种场景,kotlin 协程“同步方式写出异步代码”的这个特性就可以很好的解决上述问题。若上述场景用 kotlin 协程代码实现呢,可能就为:
fun postItem(tag: String, params: Map<String, Any?>) = viewModelScope.launch {
// 获取账号信息
val account = repository.queryAccount(tag, params)
// 进行支付操作
val pay = repository.paySubmit(tag,account.token)
//查询支付结果
val result = repository.queryPayResult(tag,pay.resultCode)
//......
}
复制代码
可以看出,协程代码非常简洁,以顺序的方式书写异步代码,代码可读性极强。
如果想要将原先的网络回调请求也改写成这种同步模式呢,只需要对原先请求回调用协程提供的 suspendCancellableCoroutine 等方法进行封装处理,即可让早期的异步代码也享受上述“同步代码”的丝滑。
协程:
一种非抢占式或者协作式的计算机程序并发调度实现,程序可以主动挂起或者恢复执行,其核心点是函数或一段程序能够被挂起,稍后再在挂起的位置恢复,通过主动让出运行权来实现协作,程序自己处理挂起和恢复来实现程序执行流程的协作调度。
协程本质上是轻量级线程。
协程的特点有:
Kotlin 协程实现层次:
基础设施层:标准库的协程 API,主要对协程提供了概念和语义上最基本的支持;
业务框架层:协程的上层框架支持,基于标准库实现的封装,也是我们日常开发使用的协程扩展库。
二、协程启动
具体在使用协程前,首先要配置对 Kotlin 协程的依赖。
(1)项目根目录 build.gradle
buildscript {
...
ext.kotlin_coroutines = 'xxx'
...
}
复制代码
(2)Module 下 build.gradle
dependencies {
...
//协程标准库
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_coroutines"
//依赖协程核心库,包含协程公共API部分
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"
//依赖android支持库,协程Android平台的具体实现方式
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
...
}
复制代码
2.1 Thread 启动
在 Java 中,可以通过 Thread 开启并发操作:
new Thread(new Runnable() {
@Override
public void run() {
//... do what you want
}
}).start();
复制代码
在 Kotlin 中,使用线程更为便捷:
val myThread = thread {
//.......
}
复制代码
这个 Thread 方法有个参数 start 默认为 true,即创造出来的线程默认启动,你可以自定义启动时机:
val myThread = thread(start = false) {
//......
}
myThread.start()
复制代码
2.2 协程启动
动协程需要三部分:上下文、启动模式、协程体。
启动方式一般有三种,其中最简单的启动协程的方式为:
GlobalScope.launch {
//......
}
复制代码
GlobalScope.launch()属于协程构建器 Coroutine builders,Kotlin 中还有其他几种 Builders,负责创建协程:
runBlocking:T
使用 runBlocking 顶层函数创建,会创建一个新的协程同时阻塞当前线程,直到协程结束。适用于 main 函数和单元测试
launch
创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。它返回的是一个该协程任务的引用,即 Job 对象。这是最常用的启动协程的方式。
async
创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用,并返回 Deffer 对象。可通过调用 Deffer.await()方法等待该子协程执行完成并获取结果。常用于并发执行-同步等待和获取返回值的情况。
2.2.1 runBlocking
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
复制代码
runBlocking 是一个顶层函数,可以在任意地方独立使用。它能创建一个新的协程同时阻塞当前线程,直到其内部所有逻辑以及子协程所有逻辑全部执行完成。常用于 main 函数和测试中。
//main函数中应用
fun main() = runBlocking {
launch { // 创建一个新协程,runBlocking会阻塞线程,但内部运行的协程是非阻塞的
delay(1000L)
println("World!")
}
println("Hello,")
delay(2000L) // 延时2s,保证JVM存活
}
//测试中应用
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking {
// ......
}
}
复制代码
2.2.2 launch
launch 是最常用的用于启动协程的方式,会在不阻塞当前线程的情况下启动一个协程,并返回对该协程任务的引用,即 Job 对象。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
复制代码
协程需要运行在协程上下文环境中,在非协程环境中的 launch 有两种:GlobalScope 与 CoroutineScope 。
在应用范围内启动一个新协程,不会阻塞调用线程,协程的生命周期与应用程序一致。
fun launchTest() {
print("start")
GlobalScope.launch {
delay(1000)//1秒无阻塞延迟
print("GlobalScope.launch")
}
print("end")
}
/** 打印结果
start
end
GlobalScope.launch
*/
复制代码
这种启动的协程存在组件被销毁但协程还存在的情况,一般不推荐。其中 GlobalScope 本身就是一个作用域,launch 属于其子作用域。
启动一个新的协程而不阻塞当前线程,并返回对协程的引用作为一个 Job。
fun launchTest2() {
print("start")
val job = CoroutineScope(Dispatchers.IO).launch {
delay(1000)
print("CoroutineScope.launch")
}
print("end")
}
复制代码
协程上下文控制协程生命周期和线程调度,使得协程和该组件生命周期绑定,组件销毁时,协程一并销毁,从而实现安全可靠地协程调用。这是在应用中最推荐的协程使用方式。
关于 launch,根据业务需求需要创建一个或多个协程,则可能就需要在一个协程中启动子协程。
fun launchTest3() {
print("start")
GlobalScope.launch {
delay(1000)
print("CoroutineScope.launch")
//在协程内创建子协程
launch {
delay(1500)
print("launch 子协程")
}
}
print("end")
}
/**** 打印结果
start
end
CoroutineScope.launch
launch 子协程
*/
复制代码
2.2.3 async
async 类似于 launch,都是创建一个不会阻塞当前线程的新的协程。区别在于:async 的返回是 Deferred 对象,可通过 Deffer.await()等待协程执行完成并获取结果,而 launch 不行。常用于并发执行-同步等待和获取返回值的情况。
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码
注意:
await() 不能在协程之外调用,因为它需要挂起直到计算完成,而且只有协程可以以非阻塞的方式挂起。所以把它放到协程中。
如果 Deferred 不执行 await()则 async 内部抛出的异常不会被 logCat 或 try Catch 捕获,但是依然会导致作用域取消和异常崩溃; 但当执行 await 时异常信息会重新抛出
如果将 async 函数中的启动模式设置为 CoroutineStart.LAZY 懒加载模式时则只有调用 Deferred 对象的 await 时(或者执行 async.satrt())才会开始执行异步任务。
三、协程补充知识
在叙述协程启动内容,涉及到了 Job、Deferred、启动模式、作用域等概念,这里补充介绍一下上述概念。
3.1 Job
Job 是协程的句柄,赋予协程可取消,赋予协程以生命周期,赋予协程以结构化并发的能力。
Job 是 launch 构建协程返回的一个协程任务,完成时是没有返回值的。可以把 Job 看成协程对象本身,封装了协程中需要执行的代码逻辑,协程的操作方法都在 Job 身上。Job 具有生命周期并且可以取消,它也是上下文元素,继承自 CoroutineContext。
在日常 Android 开发过程中,协程配合 Lifecycle 可以做到自动取消。
Job 生命周期
Job 的生命周期分为 6 种状态,分为
New
Active
Completing
Cancelling
Cancelled
Completed
通常外界会持有 Job 接口作为引用被协程调用者所持有。Job 接口提供 isActive、isCompleted、isCancelled 3 个变量使外界可以感知 Job 内部的状态。
val job = launch(start = CoroutineStart.LAZY) {
println("Active")
}
println("New")
job.join()
println("Completed")
/**打印结果**/
New
Active
Completed
/**********
* 1. 以 lazy 方式创建出来的协程 state 为 New
* 2. 对应的 job 调用 join 函数后,协程进入 Active 状态,并开始执行协程对应的具体代码
* 3. 当协程执行完毕后,由于没有需要等待的子协程,协程直接进入 Completed 状态
*/
复制代码
关于 Job,常用的方法有:
//活跃的,是否仍在执行
public val isActive: Boolean
//启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为false
public fun start(): Boolean
//取消Job,可通过传入Exception说明具体原因
public fun cancel(cause: CancellationException? = null)
//挂起协程直到此Job完成
public suspend fun join()
//取消任务并等待任务完成,结合了[cancel]和[join]的调用
public suspend fun Job.cancelAndJoin()
//给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
复制代码
Job 父子层级
对于 Job,还需要格外关注的是 Job 的父子层级关系。
一个 Job 可以包含多个子 Job。
当父 Job 被取消后,所有的子 Job 也会被自动取消。
当子 Job 被取消或者出现异常后父 Job 也会被取消。
具有多个子 Job 的父 Job 会等待所有子 Job 完成(或者取消)后,自己才会执行完成。
3.2 Deferred
Deferred 继承自 Job,具有与 Job 相同的状态机制。
它是 async 构建协程返回的一个协程任务,可通过调用 await()方法等待协程执行完成并获取结果。其中 Job 没有结果值,Deffer 有结果值。
public interface Deferred<out T> : Job
复制代码
3.3 作用域
协程作用域(CoroutineScope):协程定义的作用范围,本质是一个接口。
确保所有的协程都会被追踪,Kotlin 不允许在没有使用 CoroutineScope 的情况下启动新的协程。CoroutineScope 可被看作是一个具有超能力的 ExecutorService 的轻量级版本。它能启动新的协程,同时这个协程还具备 suspend 和 resume 的优势。
每个协程生成器 launch、async 等都是 CoroutineScope 的扩展,并继承了它的 coroutineContext,自动传播其所有元素和取消。
启动协程需要作用域,但是作用域又是在协程创建过程中产生的。
public interface CoroutineScope {
/**
* 此域的上下文。Context被作用域封装,用于在作用域上扩展的协程构建器的实现。
*/
public val coroutineContext: CoroutineContext
}
复制代码
官方提供的常用作用域:
顶层函数,可启动协程,但会阻塞当前线程
全局协程作用域。通过 GlobalScope 创建的协程不会有父协程,可以把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限制,且不能取消,在运行时会消耗一些内存资源,这可能会导致内存泄露,不适用于业务开发。
创建一个独立的协程作用域,直到所有启动的协程都完成后才结束自身。
它是一个挂起函数,需要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其他的子协程都被取消。
与 coroutineScope 类似,不同的是子协程的异常不会影响父协程,也不会影响其他子协程。(作用域本身的失败(在 block 或取消中抛出异常)会导致作用域及其所有子协程失败,但不会取消父协程。)
为 UI 组件创建主作用域。一个顶层函数,上下文是 SupervisorJob() + Dispatchers.Main,说明它是一个在主线程执行的协程作用域,通过 cancel 对协程进行取消。
fun scopeTest() {
GlobalScope.launch {//父协程
launch {//子协程
print("GlobalScope的子协程")
}
launch {//第二个子协程
print("GlobalScope的第二个子协程")
}
}
val mainScope = MainScope()
mainScope.launch {//启动协程
//todo
}
}
复制代码
Jetpack 的 Lifecycle 相关组件提供了已经绑定 UV 声明周期的作用域供我们直接使用:
Lifecycle Ktx 库提供的具有生命周期感知的协程作用域,与 Lifecycle 绑定生命周期,生命周期被销毁时,此作用域将被取消。会与当前的 UI 组件绑定生命周期,界面销毁时该协程作用域将被取消,不会造成协程泄漏,推荐使用。
与 lifecycleScope 类似,与 ViewModel 绑定生命周期,当 ViewModel 被清除时,这个作用域将被取消。推荐使用。
3.4 启动模式
前述进行协程创建启动时涉及到了启动模式 CoroutineStart,其是一个枚举类,为协程构建器定义启动选项。在协程构建的 start 参数中使用。
DEFAULT 模式
DEFAULT 是饿汉式启动,launch 调用后,会立即进入待调度状态,一旦调度器 OK 就可以开始执行。
suspend fun main() {
log(1)
val job = GlobalScope.launch{
log(2)
}
log(3)
Thread.sleep(5000) //防止程序退出
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
复制代码
前述示例代码采用默认的启动模式和默认的调度器,,运行结果取决于当前线程与后台线程的调度顺序。
/**可能的运行结果一****/
[main]:1
[main]:3
[main]:2
/**可能的运行结果二****/
[main]:1
[main]:2
[main]:3
复制代码
LAZY 模式
LAZY 是懒汉式启动,launch 后并不会有任何调度行为,协程体不会进入执行状态,直到我们需要他的运行结果时进行执行,其 launch 调用后会返回一个 Job 实例。
对于这种情况,可以:
调用 Job.start,主动触发协程的调度执行
调用 Job.join,隐式的触发协程的调度执行
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.LAZY){
log(2)
}
log(3)
job.join()
log(4)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
复制代码
对于 join,一定要等待协程执行完毕,所以其运行结果一定为:
[main]:1
[main]:3
[DefaultDispatcher-worker-1]:2
[main]:4
复制代码
如果把 join()换为 start(),则输出结果不一定。
ATOMIC 模式
ATOMIC 只有涉及 cancel 的时候才有意义。调用 cancel 的时机不同,结果也有差异。
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC){
log(2)
}
job.cancel()
log(3)
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
复制代码
前述代码示例创建协程后立即 cancel,由于是 ATOMIC 模式,因此协程一定会被调度,则 log 1、2、3 一定都会被打印输出。如果将模式改为 DEFAULT 模式,则 log 2 有可能打印输出,也可能不会。
其实 cancel 调用一定会将该 job 的状态置为 cancelling,只不过 ATOMIC 模式的协程在启动时无视了这一状态。
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
log(2)
delay(1000)
log(3)
}
job.cancel()
log(4)
job.join()
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
/**打印输出结果可能如下****/
[main]:1
[DefaultDispatcher-worker-1]:2
[main]:4
复制代码
前述代码中,2 和 3 中加了一个 delay,delay 会使得协程体的执行被挂起,1000ms 之后再次调度后面的部分。对于 ATOMIC 模式,它一定会被启动,实际上在遇到第一个挂起点之前,它的执行是不会停止的,而 delay 是一个 suspend 函数,这时我们的协程迎来了自己的第一个挂起点,恰好 delay 是支持 cancel 的,因此后面的 3 将不会被打印。
UNDISPATCHED 模式
协程在这种模式下会直接开始在当前线程下执行,直到第一个挂起点。
与 ATOMIC 的不同之处在于 UNDISPATCHED 不经过任何调度器即开始执行协程体。遇到挂起点之后的执行就取决于挂起点本身的逻辑以及上下文当中的调度器了。
suspend fun main() {
log(1)
val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
log(2)
delay(100)
log(3)
}
log(4)
job.join()
log(5)
Thread.sleep(2000)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
复制代码
协程启动后会立即在当前线程执行,因此 1、2 会连续在同一线程中执行,delay 是挂起点,因此 3 会等 100ms 后再次调度,这时候 4 执行,join 要求等待协程执行完,因此等 3 输出后再执行 5。
结果如下:
[main]:1
[main]:2
[main]:4
[DefaultDispatcher-worker-1]:3
[DefaultDispatcher-worker-1]:5
复制代码
3.5 withContext
withContext {}不会创建新的协程。在指定协程上运行挂起代码块,放在该块内的任何代码都始终通过 IO 调度器执行,并挂起该协程直至代码块运行完成。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T
复制代码
withContext 会使用新指定的上下文的 dispatcher,将 block 的执行转移到指定的线程中。
它会返回结果, 可以和当前协程的父协程存在交互关系, 主要作用为了来回切换调度器。
coroutineScope{
launch(Dispatchers.Main) { // 在 UI 线程开始
val image = withContext(Dispatchers.IO) { // 切换到 IO 线程,并在执行完成后切回 UI 线程
getImage(imageId) // 将会运行在 IO 线程
}
avatarIv.setImageBitmap(image) // 回到 UI 线程更新 UI
}
}
复制代码
四、协程调度
4.1 协程上下文
在协程启动部分提到,启动协程需要三个部分,其中一个部分就是上下文,其接口类型是 CoroutineContext,通常所见的上下文类型是 CombinedContext 或者 EmptyCoroutineContext,一个表示上下文组合,另一个表示空。
协程上下文是 Kotlin 协程的基本结构单元,主要承载着资源获取,配置管理等工作,是执行环境的通用数据资源的统一管理者。除此之外,也包括携带参数,拦截协程执行等,是实现正确的线程行为、生命周期、异常以及调试的关键。
协程使用以下几种元素集定义协程行为,他们均继承自 CoroutineContext:
【Job】:协程的句柄,对协程的控制和管理生命周期。
【CoroutineName】:协程的名称,用于调试
【CoroutineDispatcher】:调度器,确定协程在指定的线程执行
【CoroutineExceptionHandler】:协程异常处理器,处理未捕获的异常
这里回顾一下 launch 和 async 两个函数签名。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码
两个函数第一个参数都是 CoroutineContext 类型。
所有协程构建函数都是以 CoroutineScope 的扩展函数的形式被定义的,而 CoroutineScope 的接口唯一成员就是 CoroutineContext 类型。
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
复制代码
简而言之,协程上下文是协程必备组成部分,管理了协程的线程绑定、生命周期、异常处理和调试。
4.1.1 协程上下文结构
看一下 CoroutineContext 的接口方法:
public interface CoroutineContext {
//操作符[]重载,可以通过CoroutineContext[Key]这种形式来获取与Key关联的Element
public operator fun <E : Element> get(key: Key<E>): E?
//提供遍历CoroutineContext中每一个Element的能力,并对每一个Element做operation操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
//操作符+重载,可以CoroutineContext + CoroutineContext这种形式把两个CoroutineContext合并成一个
public operator fun plus(context: CoroutineContext): CoroutineContext = .......
//返回一个新的CoroutineContext,这个CoroutineContext删除了Key对应的Element
public fun minusKey(key: Key<*>): CoroutineContext
//Key定义,空实现,仅仅做一个标识
public interface Key<E : Element>
///Element定义,每个Element都是一个CoroutineContext
public interface Element : CoroutineContext {
//每个Element都有一个Key实例
public val key: Key<*>
......
}
}
复制代码
Element:协程上下文的一个元素,本身就是一个单例上下文,里面有一个 key,是这个元素的索引。
可知,Element 本身也实现了 CoroutineContext 接口。
这里我们再看一下官方解释:
/**
Persistent context for the coroutine. It is an indexed set of [Element] instances.
An indexed set is a mix between a set and a map.
Every element in this set has a unique [Key].*/
从官方解释可知,CoroutineContext 是一个 Element 的集合,这种集合被称为 indexed set,介于 set 和 map 之间的一种结构。set 意味着其中的元素有唯一性,map 意味着每个元素都对应一个键。
如果将协程上下文内部的一系列上下文称为子上下文,上下文为每个子上下文分配了一个 Key,它是一个带有类型信息的接口。
这个接口通常被实现为 companion object。
//Job
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job>
}
//拦截器
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}
//协程名
public data class CoroutineName(
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
/**
* Key for [CoroutineName] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineName>
}
//异常处理器
public interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
* Key for [CoroutineExceptionHandler] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
}
复制代码
源码中定义的子上下文,都会在内部声明一个静态的 Key,类内部的静态变量意味着被所有类实例共享,即全局唯一的 Key 实例可以对应多个子上下文实例。
在一个类似 map 的结构中,每个键必须是唯一的,因为对相同的键 put 两次值,新值会代替旧值。通过上述方式,通过键的唯一性保证了上下文中的所有子上下文实例都是唯一的。
我们按照这个格式仿写一下然后反编译。
class MyElement :AbstractCoroutineContextElement(MyElement) {
companion object Key : CoroutineContext.Key<MyElement>
}
//反编译的java文件
public final class MyElement extends AbstractCoroutineContextElement {
@NotNull
public static final MyElement.Key Key = new MyElement.Key((DefaultConstructorMarker)null);
public MyElement() {
super((kotlin.coroutines.CoroutineContext.Key)Key);
}
public static final class Key implements kotlin.coroutines.CoroutineContext.Key {
private Key() {
}
// $FF: synthetic method
public Key(DefaultConstructorMarker $constructor_marker) {
this();
}
}
}
复制代码
对比 kt 和 Java 文件,可以看到 Key 就是一个静态变量,且其实现类未做处理,作用与 HashMap 中的 Key 类似。
Key 是静态变量,全局唯一,为 Element 提供唯一性保障。
前述内容总结如下:
协程上下文是一个元素的集合,单个元素本身也是一个上下文,其定义是递归的,自己包含若干个自己。
协程上下文这个集合有点像 set 结构,其中的元素都是唯一的,不重复的。其通过给每一个元素配有一个静态的键实例,构成一组键值对的方式实现。这使其类似 map 结构。这种介于 set 和 map 之间的结构称为 indexed set。
CoroutineContext.get()获取元素
关于 CoroutineContext,我们先看一下其是如何取元素的。
这里看一下 Element、CombinedContext、EmptyCoroutineContext 的内部实现,其中 CombinedContext 就是 CoroutineContext 集合结构的实现,EmptyCoroutineContext 就表示一个空的 CoroutineContext,它里面是空实现。
@SinceKotlin("1.3")
internal class CombinedContext(
//左上下文
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
//如果输入 key 和右元素的 key 相同,则返回右元素
cur.element[key]?.let { return it }
// 若右元素不匹配,则向左继续查找
val next = cur.left
if (next is CombinedContext) {
cur = next
} else { // 若左上下文不是混合上下文,则结束递归
return next[key]
}
}
}
......
}
public interface Element : CoroutineContext {
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
// 如果给定键和元素本身键相同,则返回当前元素,否则返回空
if (this.key == key) this as E else null
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
//返回空
public override fun <E : Element> get(key: Key<E>): E? = null
}
复制代码
通过 Key 检索 Element,返回值只能是 Element 或 null,链表节点中的元素值,其中 CombinedContext 利用 while 循环实现了类似递归的效果,其中较早被遍历到的元素自然具有较高的优先级。
//使用示例
println(coroutineContext[CoroutineName])
println(Dispatchers.Main[CoroutineName])
复制代码
CoroutineContext.minusKey()删除元素
同理看一下 Element、CombinedContext、EmptyCoroutineContext 的内部实现。
internal class CombinedContext(
//左上下文
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
public override fun minusKey(key: Key<*>): CoroutineContext {
//如果element就是要删除的元素,返回left,否则说明要删除的元素在left中,继续从left中删除对应的元素
element[key]?.let { return left }
//在左上下文中去掉对应元素
val newLeft = left.minusKey(key)
return when {
//如果left中不存在要删除的元素,那么当前CombinedContext就不存在要删除的元素,直接返回当前CombinedContext实例
newLeft === left -> this
//如果left中存在要删除的元素,删除了这个元素后,left变为了空,那么直接返回当前CombinedContext的element就行
newLeft === EmptyCoroutineContext -> element
//如果left中存在要删除的元素,删除了这个元素后,left不为空,那么组合一个新的CombinedContext返回
else -> CombinedContext(newLeft, element)
}
}
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
public override fun minusKey(key: Key<*>): CoroutineContext = this
......
}
public interface Element : CoroutineContext {
//如果key和自己的key匹配,那么自己就是要删除的Element,返回EmptyCoroutineContext(表示删除了自己),否则说明自己不需要被删除,返回自己
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
......
}
复制代码
如果把 CombinedContext 和 Element 结合来看,那么 CombinedContext 的整体结构如下:
其结构类似链表,left 就是指向下一个结点的指针,get、minusKey 操作大体逻辑都是先访问当前 element,不满足,再访问 left 的 element,顺序都是从 right 到 left。
CoroutineContext.fold()元素遍历
internal class CombinedContext(
//左上下文
private val left: CoroutineContext,
//右元素
private val element: Element
) : CoroutineContext, Serializable {
//先对left做fold操作,把left做完fold操作的的返回结果和element做operation操作
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(left.fold(initial, operation), element)
......
}
public object EmptyCoroutineContext : CoroutineContext, Serializable {
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
......
}
public interface Element : CoroutineContext {
//对传入的initial和自己做operation操作
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
......
}
复制代码
fold 也是递归的形式操作,fold 的操作大体逻辑是:先访问 left,直到递归到最后的 element,然后再从 left 到 right 的返回,从而访问了所有的 element。
CoroutineContext.plus()添加元素
关于 CoroutineContext 的元素添加方法,直接看其 plus()实现,也是唯一没有被重写的方法。
public operator fun plus(context: CoroutineContext): CoroutineContext =
//如果要相加的CoroutineContext为空,那么不做任何处理,直接返回
if (context === EmptyCoroutineContext) this else
//如果要相加的CoroutineContext不为空,那么对它进行fold操作,可以把acc理解成+号左边的CoroutineContext,element理解成+号右边的CoroutineContext的某一个element
context.fold(this) { acc, element ->
//首先从左边CoroutineContext中删除右边的这个element
val removed = acc.minusKey(element.key)
//如果removed为空,说明左边CoroutineContext删除了和element相同的元素后为空,那么返回右边的element即可
if (removed === EmptyCoroutineContext) element else {
//如果removed不为空,说明左边CoroutineContext删除了和element相同的元素后还有其他元素,那么构造一个新的CombinedContext返回
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
复制代码
plus 方法大部分情况下返回一个 CombinedContext,即我们把两个 CoroutineContext 相加后,返回一个 CombinedContext,在组合成 CombinedContext 时,+号右边的 CoroutineContext 中的元素会覆盖+号左边的 CoroutineContext 中的含有相同 key 的元素。
这个覆盖操作就在 fold 方法的参数 operation 代码块中完成,通过 minusKey 方法删除掉重复元素。
plus 方法中可以看到里面有个对 ContinuationInterceptor 的处理,目的是让 ContinuationInterceptor 在每次相加后都能变成 CoroutineContext 中的最后一个元素。
ContinuationInterceptor 继承自 Element,称为协程上下文拦截器,作用是在协程执行前拦截它,从而在协程执行前做出一些其他的操作。通过把 ContinuationInterceptor 放在最后面,协程在查找上下文的 element 时,总能最快找到拦截器,避免了递归查找,从而让拦截行为前置执行。
4.1.2 CoroutineName
public data class CoroutineName(
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
复制代码
CoroutineName 是用户用来指定的协程名称的,用于方便调试和定位问题。
GlobalScope.launch(CoroutineName("GlobalScope")) {
launch(CoroutineName("CoroutineA")) {//指定协程名称
val coroutineName = coroutineContext[CoroutineName]//获取协程名称
print(coroutineName)
}
}
/** 打印结果
CoroutineName(CoroutineA)
*/
复制代码
协程内部可以通过 coroutineContext 这个全局属性直接获取当前协程的上下文。
4.1.3 上下文组合
如果要传递多个上下文元素,CoroutineContext 可以使用"+"运算符进行合并。由于 CoroutineContext 是由一组元素组成的,所以加号右侧的元素会覆盖加号左侧的元素,进而组成新创建的 CoroutineContext。
GlobalScope.launch {
//通过+号运算添加多个上下文元素
var context = CoroutineName("协程1") + Dispatchers.Main
print("context == $context")
context += Dispatchers.IO //添加重复Dispatchers元素,Dispatchers.IO 会替换 ispatchers.Main
print("context == $context")
val contextResult = context.minusKey(context[CoroutineName]!!.key)//移除CoroutineName元素
print("contextResult == $contextResult")
}
/**打印结果
context == [CoroutineName(协程1), Dispatchers.Main]
context == [CoroutineName(协程1), Dispatchers.IO]
contextResult == Dispatchers.IO
*/
复制代码
如果有重复的元素(key 一致)则右边的会代替左边的元素,相关原理参看协程上下文结构章节。
4.1.4 CoroutineScope 构建
CoroutineScope 实际上是一个 CoroutineContext 的封装,当我们需要启动一个协程时,会在 CoroutineScope 的实例上调用构建函数,如 async 和 launch。
在构建函数中,一共出现了 3 个 CoroutineContext。
查看协程构建函数 async 和 launch 的源码,其第一行都是如下代码:
val newContext = newCoroutineContext(context)
复制代码
进一步查看:
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context //CoroutineContext拼接组合
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
复制代码
构建器内部进行了一个 CoroutineContext 拼接操作,plus 左值是 CoroutineScope 内部的 CoroutineContext,右值是作为构建函数参数的 CoroutineContext。
抽象类 AbstractCoroutineScope 实现了 CoroutineScope 和 Job 接口。大部分 CoroutineScope 的实现都继承自 AbstractCoroutineScope,意味着他们同时也是一个 Job。
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
public final override val context: CoroutineContext = parentContext + this
//重写了父类的coroutineContext属性
public override val coroutineContext: CoroutineContext get() = context
}
复制代码
从上述分析可知:coroutine context = parent context + coroutine job
4.1.5 典型用例
全限定 Context
launch( Dispatchers.Main + Job() + CoroutineName("HelloCoroutine") + CoroutineExceptionHandler { _, _ -> /* ... */ }) {
/* ... */
}
复制代码
全限定 Context,即全部显式指定具体值的 Elements。不论你用哪一个 CoroutineScope 构建该协程,它都具有一致的表现,不会受到 CoroutineScope 任何影响。
CoroutineScope Context
基于 Activity 生命周期实现一个 CoroutineScope
abstract class ScopedAppActivity:
AppCompatActivity(),
CoroutineScope
{
protected lateinit var job: Job
override val coroutineContext: CoroutineContext
get() = job + Dispatchers.Main // 注意这里使用+拼接CoroutineContext
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
job.cancel()
}
}
复制代码
Dispatcher:使用 Dispatcher.Main,以在 UI 线程进行绘制
Job:在 onCreate 时构建,在 onDestroy 时销毁,所有基于该 CoroutineContext 创建的协程,都会在 Activity 销毁时取消,从而避免 Activity 泄露的问题
临时指定参数
CoroutineContext 的参数主要有两个来源:从 scope 中继承+参数指定。我们可以用 withContext 便捷地指定某个参数启动子协程,例如我们想要在协程内部执行一个无法被取消的子协程:
withContext(NonCancellable) {
/* ... */
}
复制代码
读取协程上下文参数
通过顶级挂起只读属性 coroutineContext 获取协程上下文参数,它位于 kotlin-stdlib / kotlin.coroutines / coroutineContext
println("Running in ${coroutineContext[CoroutineName]}")
复制代码
Nested Context 内嵌上下文
内嵌上下文切换:在协程 A 内部构建协程 B 时,B 会自动继承 A 的 Dispatcher。
可以在调用 async 时加入 Dispatcher 参数,切换到工作线程
// 错误的做法,在主线程中直接调用async,若耗时过长则阻塞UI
GlobalScope.launch(Dispatchers.Main) {
val deferred = async {
/* ... */
}
/* ... */
}
// 正确的做法,在工作线程执行协程任务
GlobalScope.launch(Dispatchers.Main) {
val deferred = async(Dispatchers.Default) {
/* ... */
}
/* ... */
}
复制代码
4.2 协程拦截器
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//......
}
复制代码
无论在 CoroutineContext 后面 放了多少个拦截器,Key 为 ContinuationInterceptor 的拦截器只能有一个。
Continuation 在调用其 Continuation#resumeWith() 方法,会执行其 suspend 修饰的函数的代码块,如果我们提前拦截到,可以做点其他事情,比如说切换线程,这是 ContinuationInterceptor 的主要作用。
协程的本质就是回调,这个回调就是被拦截的 Continuation。OkHttp 用拦截器做缓存,打日志,模拟请求等,协程拦截器同理。
我们通过 Dispatchers 来指定协程发生的线程,Dispatchers 实现了 ContinuationInterceptor 接口。
这里我们自定义一个拦截器放到协程上下文,看一下会发生什么。
class MyContinuationInterceptor: ContinuationInterceptor{
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}
class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
override val context = continuation.context
override fun resumeWith(result: Result<T>) {
log("<MyContinuation> $result" )
continuation.resumeWith(result)
}
}
suspend fun main(args: Array<String>) { // start main coroutine
GlobalScope.launch(MyContinuationInterceptor()) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}
fun log(o: Any?) {
println("[${Thread.currentThread().name}]:$o")
}
复制代码
/******打印结果******/
[main]:<MyContinuation> Success(kotlin.Unit) //11
[main]:1
[main]:<MyContinuation> Success(kotlin.Unit) //22
[main]:2
[main]:4
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(kotlin.Unit) //33
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:<MyContinuation> Success(Hello)
[kotlinx.coroutines.DefaultExecutor]:5. Hello
[kotlinx.coroutines.DefaultExecutor]:6
复制代码
前述分析 CoroutineContext 的 plus 方法涉及到了 ContinuationInterceptor,plus 每次都会将 ContinuationInterceptor 添加到拼接链的尾部,这里再详细解释一下原因。
public operator fun plus(context: CoroutineContext): CoroutineContext =
//如果要相加的CoroutineContext为空,那么不做任何处理,直接返回
if (context === EmptyCoroutineContext) this else
//如果要相加的CoroutineContext不为空,那么对它进行fold操作,可以把acc理解成+号左边的CoroutineContext,element理解成+号右边的CoroutineContext的某一个element
context.fold(this) { acc, element ->
//首先从左边CoroutineContext中删除右边的这个element
val removed = acc.minusKey(element.key)
//如果removed为空,说明左边CoroutineContext删除了和element相同的元素后为空,那么返回右边的element即可
if (removed === EmptyCoroutineContext) element else {
//如果removed不为空,说明左边CoroutineContext删除了和element相同的元素后还有其他元素,那么构造一个新的CombinedContext返回
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
复制代码
原因一:CombinedContext 的结构决定。
其有两个元素,left 是一个前驱集合,element 为一个纯粹 CoroutineContext,它的 get 方法每次都是从 element 开始进行查找对应 Key 的 CoroutineContext 对象;没有匹配到才会去 left 集合中进行递归查找。为了加快查找 ContinuationInterceptor 类型的实例,才将它加入到拼接链的尾部,对应的就是 element。
原因二:ContinuationInterceptor 使用很频繁
每次创建协程都会去尝试查找当前协程的 CoroutineContext 中是否存在 ContinuationInterceptor。这里我们用 launch 来验证一下
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.()
->
Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
复制代码
如果使用的 launch 使用的是默认参数,此时 Coroutine 就是 StandaloneCoroutine,然后调用 start 方法启动协程。
start(block, receiver, this)
}
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
复制代码
如果我们使用默认参数,看一下默认参数对应执行的 block.startCoroutineCancellable(completion)
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
复制代码
首先通过 createCoroutineUnintercepted 来创建一个协程
然后再调用 intercepted 方法进行拦截操作
最后调用 resumeCancellable,即 Continuation 的 resumeWith 方法,启动协程,所以每次启动协程都会自动回调一次 resumeWith 方法
这里看一下 intercepted
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
复制代码
看其在 ContinuationImpl 的 intercepted 方法实现
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
复制代码
首先获取到 ContinuationInterceptor 实例
然后调用它的 interceptContinuation 方法返回一个处理过的 Continuation(多次调用 intercepted,对应的 interceptContinuation 只会调用一次)
至此可知,ContinuationInterceptor 的拦截是通过 interceptContinuation 方法进行
下面再看一个 ContinuationInterceptor 的典型示例
val interceptor = object : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}
println(Thread.currentThread().name)
lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}
launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}
println("launch end. current thread: ${Thread.currentThread().name}")
}
复制代码
/******打印结果******/
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
复制代码
首先程序运行在 main 线程,启动协程时将自定义的 interceptor 加入到上下文中,协程启动时进行拦截,将在 main 线程运行的程序切换到新的 thread 线程
withContext 没有拦截成功,具体原因在下面的调度器再详细解释,简单来说就是我们自定义的 interceptor 被替换了。
launch start 与 launch end 所处的线程不一样,因为 withContext 结束之后,它内部还会进行一次线程恢复,将自身所处的 main 线程切换到之前的线程。协程每一个挂起后恢复都是通过回调 resumeWith 进行的,然而外部 launch 协程我们进行了拦截,在它返回的 Continuation 的 resumeWith 回调中总是会创建新的 thread。
4.3 调度器
CoroutineDispatcher 调度器指定指定执行协程的目标载体,它确定了相关的协程在哪个线程或哪些线程上执行。可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//将可运行块的执行分派到给定上下文中的另一个线程上
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//......
}
复制代码
协程需要调度的位置就是挂起点的位置,只有当挂起点正在挂起的时候才会进行调度,实现调度需要使用协程的拦截器。
调度的本质就是解决挂起点恢复之后的协程逻辑在哪里运行的问题。调度器也属于协程上下文一类,它继承自拦截器。
【val Default】: CoroutineDispatcher
【val Main】: MainCoroutineDispatcher
【val Unconfined】: CoroutineDispatcher
IO 仅在 Jvm 上有定义,它基于 Default 调度器背后的线程池,并实现了独立的队列和限制,因此协程调度器从 Default 切换到 IO 并不会触发线程切换
关于调度器介绍到这里,还没有详细解释前述协程拦截器中的 withContext 为什么拦截失败。这里针对这个详细看一下源码实现。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
复制代码
其返回类型为 MainCoroutineDispatcher,继承自 CoroutineDispatcher。
public abstract class MainCoroutineDispatcher : CoroutineDispatcher()
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
......
}
复制代码
CoroutineDispatch 实现了 ContinuationInterceptor,根据前述解释的 CoroutineContext 结构,可知我们自定义的拦截器没有生效是因为被替换了。
CoroutineDispatch 中的 isDispatchNeeded 就是判断是否需要分发,然后 dispatch 就是执行分发。
ContinuationInterceptor 重要的方法就是 interceptContinuation,在 CoroutineDispatcher 中直接返回了 DispatchedContinuation 对象,它是一个 Continuation 类型,看一下其 resumeWith 实现。
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
//判断是否需要分发
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
//不需要分发,直接使用原先的continuation对象的resumewith
continuation.resumeWith(result)
}
}
}
}
复制代码
那么分发的判断逻辑是怎么实现的?这要根据具体的 dispatcher 来看。
如果我们拿的是 Dispatchers.Main,其 dispatcher 为 HandlerContext。
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
......
复制代码
其中 HandlerContext 继承于 HandlerDispatcher,而 HandlerDispatcher 继承于 MainCoroutineDispatcher
Dispatcher 的基本实现原理大致为:
首先在协程进行启动的时候通过拦截器的方式进行拦截,对应的方法是 interceptContinuation
然后返回一个具有切换线程功能的 Continuation
在每次进行 resumeWith 的时候,内部再通过 isDispatchNeeded 进行判断当前协程的运行是否需要切换线程。
如果需要则调用 dispatch 进行线程的切换,保证协程的正确运行。如果要自定义协程线程的切换,可以通过继承 CoroutineDispatcher 来实现。
这里再简单看一下 WithContext,我们都知道其不仅可以接受 CoroutineDispatcher 来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。那这是怎么实现的呢?
withContext 的线程恢复原理是它内部生成了一个 DispatchedCoroutine,保存切换线程时的 CoroutineContext 与切换之前的 Continuation,最后在 onCompletionInternal 进行恢复。我们简单翻一翻其源码实现。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 创建新的CoroutineContext
val oldContext = uCont.context
val newContext = oldContext + context
......
//使用新的Dispatcher,覆盖外层
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
复制代码
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
//在complete时会会回调
override fun afterCompletion(state: Any?) {
// Call afterResume from afterCompletion and not vice-versa, because stack-size is more
// important for afterResume implementation
afterResume(state)
}
override fun afterResume(state: Any?) {
////uCont就是父协程,context仍是老版context,因此可以切换回原来的线程上
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
......
}
复制代码
对于 withContext,传入的 context 会覆盖外层的拦截器并生成一个 newContext,因此可以实现线程切换。
DispatchedCoroutine 作为 complete 传入协程体的创建函数中,因此协程体执行完成后会回调到 afterCompletion 中。
DispatchedCoroutine 中传入的 uCont 是父协程,它的拦截器仍是外层的拦截器,因此会切换回原来的线程中。
4.3.1 典型用例
例如:点击一个按钮,进行异步操作后再回调刷新 UI
getUserBtn.setOnClickListener {
getUser { user ->
handler.post {
userNameView.text = user.name
}
}
}
typealias Callback = (User) -> Unit
fun getUser(callback: Callback){
...
}
复制代码
由于 getUser 函数需要切到其他线程执行,因此回调通常也会在这个非 UI 的线程中调用,所以为了确保 UI 正确被刷新,我们需要用 handler.post 切换到 UI 线程。
如果要用协程实现呢?
suspend fun getUserCoroutine() = suspendCoroutine<User> {
continuation ->
getUser {
continuation.resume(it)
}
}
getUserBtn.setOnClickListener {
GlobalScope.launch(Dispatchers.Main) {
userNameView.text = getUserCoroutine().name
}
}
复制代码
suspendCoroutine 这个方法并不是帮我们启动协程的,它运行在协程当中并且帮我们获取到当前协程的 Continuation 实例,也就是拿到回调,方便后面我们调用它的 resume 或者 resumeWithException 来返回结果或者抛出异常。
4.3.2 线程绑定
调度器的目的就是切线程,我们只要提供线程,调度器就应该很方便的创建出来。
suspend fun main() {
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(myDispatcher) {
log(1)
}.join()
log(2)
}
复制代码
由于这个线程池是我们自己创建的,因此我们需要在合适的时候关闭它。
除了上述的方法,kotlin 协程还给出了更简单的 api,如下:
GlobalScope.launch(newSingleThreadContext("Dispather")) {
//......
}.join()
复制代码
前述我们是通过线程的方式,同理可以通过线程池转为调度器实现。
Executors.newScheduledThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
GlobalScope.launch(dispatcher) {
//......
}.join
复制代码
五、协程挂起
在前述协程时,经常会出现 suspend 关键字和挂起的说法,其含义和用法是什么?一起深入看一下。
5.1 概述
suspend 翻译过来就是中断、挂起,用在函数声明前,起到挂起协程的标识,本质作用是代码调用时为方法添加一个 Continuation 类型的参数,保证协程中 Continuation 的上下传递。
挂起函数只能在协程或另一个挂起函数中被调用,如果你在非协程中使用到了挂起函数,会报错。
阻塞:
函数 A 必须在函数 B 之前完成执行,线程被锁定以便函数 A 能够完成其执行
挂起:
函数 A 虽然已经启动,但可以暂停,让函数 B 执行,然后只在稍后恢复。线程没有被函数 A 锁定。
“挂起”是指协程从它当前线程脱离,切换到另一个线程运行。当线程运行到 suspend 函数时,会暂时挂起这个函数及后续代码的执行。简而言之,挂起函数是一个可以启动、暂停和恢复的函数。
协程运行的时候每遇到被 suspend 修饰的方法时,都可能会挂起当前协程,不是必会挂起,例如如下方法就不会被挂起。
private suspend fun a() {
println("aa")
}
复制代码
这是因为这种方法不会返回 COROUTINE_SUSPENDED 类型,这在后面详细解释。
5.2 suspend 本质
Kotlin 使用堆栈帧来管理要运行哪个函数以及所有局部变量。
协程在常规函数基础上添加了 suspend 和 resume 两项操作用于处理长时间运行的任务。
【suspend】:挂起或暂停,用于挂起执行当前协程,并保存所有局部变量
【resume】:恢复,用于让已挂起的协程从挂起处恢复继续执行
挂起(暂停)协程时,会复制并保存当前的堆栈帧以供稍后使用,将信息保存到 Continuation 对象中。
恢复协程时,会将堆栈帧从其保存位置复制回来,对应的 Continuation 通过调用 resumeWith 函数才会恢复协程的执行,然后函数再次开始运行。同时返回 Result 类型的成功或者异常的结果。
public interface Continuation<in T> {
//对应这个Continuation的协程上下文
public val context: CoroutineContext
//恢复相应协程的执行,传递一个成功或失败的结果作为最后一个挂起点的返回值。
public fun resumeWith(result: Result<T>)
}
//将[value]作为最后一个挂起点的返回值,恢复相应协程的执行。
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//恢复相应协程的执行,以便在最后一个挂起点之后重新抛出[异常]。
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
复制代码
Continuation 类有一个 resumeWith 函数可以接收 Result 类型的参数。
在结果成功获取时,调用 resumeWith(Result.success(value))或者调用拓展函数 resume(value);出现异常时,调用 resumeWith(Result.failure(exception))或者调用拓展函数 resumeWithException(exception)。这就是 Continuation 的恢复调用。
@FormUrlEncoded
@POST("/api/common/countryList")
suspend fun fetchCountryList(@FieldMap params: Map<String, String?>): CountryResponse
复制代码
前述挂起函数解析后反编译如下:
@FormUrlEncoded
@POST("/api/common/countryList")
@Nullable
Object fetchCountryList(@FieldMap @NotNull Map var1, @NotNull Continuation var2);
复制代码
挂起函数反编译后,发现多了一个 Continuation 参数,有编译器传递,说明调用挂起函数需要 Continuation。
只有挂起函数或者协程中才有 Continuation,所以挂起函数只能在协程或者其他挂起函数中执行。
5.2.1 Continuation
这里看一下该 Continuation 的传递来源。
这个函数只能在协程或者挂起函数中执行,说明 Continuation 很有可能是从协程中传入来的,查看协程构建源码。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
复制代码
通过 launch 启动一个协程时,其通过 coroutine 的 start 方法启动协程:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
复制代码
然后 start 方法里面调用了 CoroutineStart 的 invoke,这个时候我们发现了 Continuation:
//CoroutineStart的invoke方法出现了Continuation
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
复制代码
最终回调到 Continuation 的 resumeWith()恢复函数中。
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
复制代码
我们再深入 kotlin 源码看一下其内部实现。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)//1
}
}
}
复制代码
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// label == 0 when coroutine is not started yet (initially) or label == 1 when it was
return if (context === EmptyCoroutineContext)
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
else
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
}
复制代码
createCoroutineUnintercepted(receiver, completion)方法在 Kotlin 源码中是通过 suspend 关键字修饰的扩展方法。
suspend 关键字修饰(suspend R.() -> T)对象实际被编译成为一个 Function2<r, continuation, Any?>接口对象,而关键字 suspend 实际编译成了 Continuation 接口。
所以:
协程体本身就是 Continuation,即必须在协程内调用 suspend 挂起函数。
suspend 关键字并不具备暂停、挂起代码块或者函数方法功能。
5.2.2 状态机 CPS
协程实际挂起是如何实现的?
这里首先通过一个示例来演示一下状态机。
suspend fun main() {
log(1)
log(returnSuspended())
log(2)
delay(1000)
log(3)
log(returnImmediately())
log(4)
}
suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
continuation ->
thread {
Thread.sleep(1000)
continuation.resume("Return suspended.")
}
COROUTINE_SUSPENDED
}
suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
log(5)
"Return immediately."
}
复制代码
这里我们定义了两个挂起函数,一个会真正挂起,一个会直接返回结果,其运行结果为:
[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4
复制代码
前述代码的实际实现情况大致如下:
public class ContinuationImpl implements Continuation<Object> {
private int label = 0;
private final Continuation<Unit> completion;
public ContinuationImpl(Continuation<Unit> completion) {
this.completion = completion;
}
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object o) {
try {
Object result = o;
switch (label) {
case 0: {
LogKt.log(1);
result = SuspendFunctionsKt.returnSuspended( this);
label++;
if (isSuspended(result)) return;
}
case 1: {
LogKt.log(result);
LogKt.log(2);
result = DelayKt.delay(1000, this);
label++;
if (isSuspended(result)) return;
}
case 2: {
LogKt.log(3);
result = SuspendFunctionsKt.returnImmediately( this);
label++;
if (isSuspended(result)) return;
}
case 3:{
LogKt.log(result);
LogKt.log(4);
}
}
completion.resumeWith(Unit.INSTANCE);
} catch (Exception e) {
completion.resumeWith(e);
}
}
private boolean isSuspended(Object result) {
return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
复制代码
首先定义了一个 ContinuationImpl,即一个 Continuation 的实现。
可以在 Kotlin 的标准库当中找到一个名叫 ContinuationImpl 的类,其 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是我们的协程体,通常也就是一个 Lambda 表达式。
通过 launch 启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。
public class RunSuspend implements Continuation<Unit> {
private Object result;
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object result) {
synchronized (this){
this.result = result;
notifyAll(); // 协程已经结束,通知下面的 wait() 方法停止阻塞
}
}
public void await() throws Throwable {
synchronized (this){
while (true){
Object result = this.result;
if(result == null) wait(); // 调用了 Object.wait(),阻塞当前线程,在 notify 或者 notifyAll 调用时返回
else if(result instanceof Throwable){
throw (Throwable) result;
} else return;
}
}
}
}
复制代码
接着,定义了一个 RunSuspend,用来接收结果。
public static void main(String... args) throws Throwable {
RunSuspend runSuspend = new RunSuspend();
ContinuationImpl table = new ContinuationImpl(runSuspend);
table.resumeWith(Unit.INSTANCE);
runSuspend.await();
}
复制代码
作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImpl 的 resumeWtih 的最后才会被调用,因此它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转完毕才会停止阻塞,此时进程也就运行完毕正常退出了。
这段代码的运行结果为:
/******打印结果******/
[main]:1
[Thread-2]:Return suspended.
[Thread-2]:2
[kotlinx.coroutines.DefaultExecutor]:3
[kotlinx.coroutines.DefaultExecutor]:5
[kotlinx.coroutines.DefaultExecutor]:Return immediately.
[kotlinx.coroutines.DefaultExecutor]:4
复制代码
协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像我们前面例子中的 label 不断的自增来实现状态流转一样
状态机即代码中每一个挂起点和初始挂起点对应的 Continuation 都会转化为一种状态,协程恢复只是跳转到下一种状态。
挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的方式保证各个片段是顺序执行的,所以异步逻辑也可以用顺序的代码来实现。
5.3 协程运行原理
前述相关示例更多是为了验证分析协程的一些特性,这里从协程的创建、启动、恢复、线程调度,协程切换等详细解析协程的实现。
5.3.1 协程创建与启动
首先创建一个协程并启动,最常见的莫过于 CoroutineScope.launch{},其源码实现为:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
复制代码
我们如果不指定 start 参数,所以它会使用默认的 CoroutineStart.DEFAULT,最终 coroutine 会得到一个 StandaloneCoroutine。其实现自 AbstractCoroutine,实现了 Continuation。
前述分析 suspend 本质时已知,其最终会调用到 createCoroutineUnintercepted,主要是创建了一个新的可挂起计算,通过调用 resume(Unit)启动协程,返回值为 Continuation,Continuation 提供了 resumeWith 恢复协程的接口,用以实现协程恢复,Continuation 封装了协程的代码运行逻辑和恢复接口。
将协程代码进行反编译,再看一下其字节码和 java 实现,例如
suspend fun test() {
CoroutineScope(Dispatchers.IO).launch {
delay(11)
}
}
复制代码
查看其字节码实现时,可知其编译生成内部类。
协程的计算逻辑封装在 invokeSuspend 方法中,而 SuspendLambda 的继承关系为 ,
SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
其中 BaseContinuationImpl 部分关键源码如下:
internal abstract class BaseContinuationImpl(...) {
// 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由编译生成的协程相关类来实现
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
复制代码
前述的协程示例代码反编译为:
public static final Object test(@NotNull Continuation $completion) {
Job var10000 = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//挂起标识
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
//设置挂起后恢复,进入的状态
this.label = 1;
if (DelayKt.delay(11L, this) == var2) {
return var2;
}
break;
case 1:
// 是否需要抛出异常
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
复制代码
所以,协程的启动流程为:resume(Unit)->resumeWith()->invokeSuspend()。
协程的挂起通过 suspend 挂起函数实现,协程的恢复通过 Continuation.resumeWith 实现。
5.3.2 协程线程调度
协程的线程调度是通过拦截器实现的,前面提到了协程启动调用到了 startCoroutineCancellable,关于协程调度在前述的协程调度器部分已详细介绍了,这里再简单过一下。
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
复制代码
看一下其 intercepted()的具体实现:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
internal abstract class ContinuationImpl(
......
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// context[ContinuationInterceptor] 就是协程的 CoroutineDispatcher
......
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
......
}
复制代码
intercepted()最终会使用协程的 CoroutineDispatcher 的 interceptContinuation 方法包装原来的 Continuation,拦截所有的协程运行操作。
DispatchedContinuation 拦截了协程的启动和恢复,分别是 resumeCancellableWith 和重写的 resumeWith(Result)。
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//判断是否需要线程调度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//将协程运算分发到另一个线程
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//直接在当前线程执行协程运算
resumeUndispatchedWith(result)
}
}
}
}
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
//判断是否需要线程调度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
//将协程的运算分发到另一个线程
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
//直接在当前线程执行协程运算
continuation.resumeWith(result)
}
}
}
}
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask(){
public final override fun run() {
//封装了 continuation.resume 逻辑
}
......
}
复制代码
5.3.3 协程挂起与恢复
编译器会生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在 invokeSuspend 中。这里我们先再次回到 startCoroutineCancellable 函数中。
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
复制代码
看一下其中的 resumeCancellableWith 方法。
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
复制代码
这是 Continuation 的扩展方法,最后都会调用到 Continuation 的 resumeWith,这里的 Continuation 就是前述所说的 SuspendLambda,它继承了 BaseContinuationImpl
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//执行invokeSuspend内的代码块
val outcome = invokeSuspend(param)
//如果代码块内执行了挂起方法,协程挂起,resumeWith执行结束,再次调用resumeWith时协程挂起点之后的代码才能继续执行
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 如果完成的completion也是BaseContinuationImpl,就会进入循环
current = completion
param = outcome
} else {
// 执行completion resumeWith方法
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
.....
}
复制代码
下面看一下 invokeSuspend 的实现逻辑。
fun main(args: Array<String>) {
val coroutineDispatcher = newSingleThreadContext("ctx")
// 启动协程 1
GlobalScope.launch(coroutineDispatcher) {
println("the first coroutine")
async (Dispatchers.IO) {
println("the second coroutine 11111")
delay(100)
println("the second coroutine 222222")
}.await()
println("the first coroutine end end end")
}
// 保证 main 线程存活,确保上面两个协程运行完成
Thread.sleep(500)
}
复制代码
前述示例编译成 SuspendLambda 子类的 invokeSuspend 方法为:
public final Object invokeSuspend(@NotNull Object $result) {
//挂起函数返回标识SUSPEND_FLAG
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var3;
boolean var4;
//label默认初始值为0
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineScope $this$launch = (CoroutineScope)this.L$0;
var3 = "the first coroutine";
var4 = false;
System.out.println(var3);
//新建并启动 async 协程
Deferred var10000 = BuildersKt.async$default($this$launch, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//挂起标识
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
boolean var3;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "the second coroutine 11111";
var3 = false;
System.out.println(var2);
this.label = 1;
//判断是否执行delay挂起函数
if (DelayKt.delay(100L, this) == var4) {
//挂起,跳出该方法
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
// 恢复协程后再执行一次 resumeWith(),然后无异常的话执行最后的 println()
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "the second coroutine 222222";
var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
//设置挂起后恢复时,进入的状态
this.label = 1;
//调用await()挂起函数
if (var10000.await(this) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var3 = "the first coroutine end end end";
var4 = false;
System.out.println(var3);
return Unit.INSTANCE;
}
复制代码
如果 async 线程未执行完成,await()返回为 IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,launch 协程的 invokeSuspend 方法执行完成,协程所在线程继续往下运行,此时 launch 协程处于挂起状态。
所以协程的挂起在代码层面来说就是跳出协程执行的方法体,或者说跳出协程当前状态机下的对应状态,然后等待下一个状态来临时在进行执行。
关于协程挂起有三点注意事项:
启动其他协程并不会挂起当前协程,所以 launch 和 async 启动线程时,除非新协程运行在当前线程,则当前协程只能在新协程运行完成后继续执行,否则当前协程都会马上继续运行。
协程挂起并不会阻塞线程,因为协程挂起时相当于执行完协程的方法,线程继续执行其他之后的逻辑。
挂起函数并一定都会挂起协程,例如 await()挂起函数如果返回值不等于 IntrinsicsKt.getCOROUTINE_SUSPENDED(),则协程继续执行挂起点之后逻辑。
看完 invokeSuspend,我们再次回到 startCoroutineCancellable 函数中,其调用的 createCoroutineUnintercepted 方法中创建的 SuspendLambda 实例是 BaseContinuationImpl 的子类对象,其 completion 参数为下:
launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine
上面这几个类都是 AbstractCoroutine 的子类。而根据 completion 的类型会执行不同的逻辑:
BaseContinuationImpl: 执行协程逻辑
其它: 调用 resumeWith 方法,处理协程的状态,协程挂起后的恢复即与它有关
前述的示例中 async 启动的协程,也会调用其 invokeSuspend 方法执行 async 协程,假设 async 返回的结果已经可用时,即非 COROUTINE_SUSPENDED 值,此时 completion 是 DeferredCoroutine 对象,因此就会调用 DeferredCoroutine.resumeWith 方法,然后返回,父协程的恢复逻辑便是在这里。
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
复制代码
在 makeCompletingOnce 方法中,会根据 state 去处理协程状态,这里最终会走到 ResumeAwaitOnCompletion.invoke 来恢复父协程,必要的话还会把 async 的结果给它。
private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
// Resume with with the corresponding exception to preserve it
continuation.resumeWithException(state.cause)
} else {
// resume 被挂起的协程
@Suppress("UNCHECKED_CAST")
continuation.resume(state.unboxState() as T)
}
}
}
复制代码
这里的 continuation 就是 launch 协程体,也就是 SuspendLambda 对象,于是 invoke 方法会再一次调用到 BaseContinuationImpl.resumeWith 方法,接着调用 SuspendLambda.invokeSuspend, 然后根据 label 取值继续执行接下来的逻辑!
launch 协程恢复的过程,从 async 协程的 SuspendLambda 的子类的 completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 节点里面通过调用 resume(result)恢复协程。
await()挂起函数恢复协程的原理:
将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点添加到 aynsc 协程的 state.list
然后在 async 协程完成时会通知 handler 节点调用 launch 协程的 resume(result) 方法将结果传给 launch 协程,并恢复 launch 协程继续执行 await 挂起点之后的逻辑。
5.3.4 协程三层封装
通过前述的一系列分析可知,协程有三层封装:
常用的 launch 和 async 返回的 Job、Deferred,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自 AbstractCoroutine,它是协程的第一层包装。
第二层包装是编译器生成的 SuspendLambda 的子类,封装了协程的真正运算逻辑,继承自 BaseContinuationImpl,其中 completion 属性就是协程的第一层包装。
第三层包装是前面分析协程的线程调度时提到的 DispatchedContinuation,封装了线程调度逻辑,包含了协程的第二层包装。
协程其实就是一段可以挂起和恢复执行的运算逻辑,而协程的挂起通过挂起函数实现,挂起函数用状态机的方式用挂起点将协程的运算逻辑拆分成不同的片段,每次运行协程执行不同的逻辑片段。
所以协程有两个很大的好处:
简化异步编程,支持异步返回;
挂起不阻塞线程,提供线程利用率
六、总结
本文通过为什么使用协程,协程如何创建启动,协程的调度原理和协程的挂起原理几个方面对协程进行了初步剖析,下面一起回顾一下全文重点内容,对全文内容进行一个总结
协程引入:
协程可以让异步代码同步化,降低程序涉及的复杂度
协程本质是轻量级线程,单个线程可以运行多个协程,协程的运行不会导致线程阻塞
协程启动:
协程启动需要三部分:上下文、启动模式、协程体。创建协程的方式有 runBlocking、launch 和 async,推荐使用 CoroutineScope.launch 的方式创建协程,使用 async 的方式创建并发执行,同步等待获取返回值的情况。
Job 是 launch 构建协程返回的一个协程任务,完成时没有返回值,可看成协程对象本身。其提供相关方法可用于观察协程执行情况。Deferred 继承自 Job,是 async 构建协程返回的一个协程任务,可通过调用 await()方法等待执行完成获取结果。
启动协程需要作用域,作用域在协程创建过程中产生,常见的协程作用域有 GlobalScope、coroutineScope 等,协程配合 Jetpack Lifecycle 相关组件提供的 lifecycleScope 等作用域进行使用,异常丝滑好用。
协程的启动模式有 DEFAULT、ATOMIC、UNDISPATCHED、LAZY 四种,注意不同启动模式的区别。
如果要在父协程中进行子协程切换操作,可以使用 withContext。
协程调度:
协程上下文是一个元素的集合,其定义是递归的,自己包含若干个自己,其结构介于 set 和 map 之间。
协程实现的本质是回调,这个回调即 Continuation。协程拦截器的实现就是拦截 Continuation,可在此处进行缓存、日志打印等拦截处理
调度器即确认相关协程在哪个线程上执行,调度的本质是解决挂起恢复后协程逻辑在哪里运行的问题,其继承自拦截器。
调度器的是实现原理即在协程启动时通过拦截器进行拦截,返回一个 Continuation,再在协程恢复进行 resumeWith 操作时,进行线程切换判断和线程切换。
协程挂起:
挂起函数是一个可启动、暂停和恢复的函数,被 suspend 修饰的函数在协程运行时不是一定会被挂起的。
挂起函数的挂起实现原理就是状态机的状态转移。协程体的执行就是一个状态机,每遇到一次挂起函数就是一次状态转移,而协程的恢复不过是从一种状态跳转到下一种状态。挂起函数将整个执行过程划分为多个 Continuation 片段,利用状态机的方式保证各个片段时顺序执行的,从而实现了用顺序的代码实现异步逻辑。
参考资料:
【1】破解Kotlin协程
【2】Kotlin Jetpack 实战 | 09.图解协程原理
【3】一文看透 Kotlin 协程本质
【4】抽丝剥茧Kotlin - 协程
【5】Kotlin协程实现原理
【6】kotlin 协程-Android实战
【7】kotlin 协程 官方指导文档
评论