Kotlin 协程到底是怎么切换线程的?你是否知晓?(1),kotlin 开源项目实战
[](
)1.2 GlobalScope
与ViewModelScope
有什么区别?
public object GlobalScope : CoroutineScope {
/**
返回 [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
public val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(
JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
)
}
两者的代码都挺简单,从上面可以看出
1.GlobalScope
返回的为CoroutineContext
的空实现
2.ViewModelScope
则往CoroutineContext
中添加了Job
与Dispatcher
我们先来看一段简单的代码
fun testOne(){
GlobalScope.launch {
print("1:" + Thread.currentThread().name)
delay(1000)
print("2:" + Thread.currentThread().name)
}
}
//打印结果为:DefaultDispatcher-worker-1
fun testTwo(){
viewModelScope.launch {
print("1:" + Thread.currentThread().name)
delay(1000)
print("2:" + Thread.currentThread().name)
}
}
//打印结果为: main
上面两种Scope
启动协程后,打印当前线程名是不同的,一个是线程池中的一个线程,一个则是主线程
这是因为ViewModelScope
在CoroutineContext
中添加了Dispatchers.Main.immediate
的原因
我们可以得出结论:协程就是通过Dispatchers
调度器来控制线程切换的
[](
)1.3 什么是调度器?
从使用上来讲,调度器就是我们使用的Dispatchers.Main
,Dispatchers.Default
,Dispatcher.IO
等
从作用上来讲,调度器的作用是控制协程运行的线程
从结构上来讲,Dispatchers
的父类是ContinuationInterceptor
,然后再继承于CoroutineContext
它们的类结构关系如下:
这也是为什么Dispatchers
能加入到CoroutineContext
中的原因,并且支持+
操作符来完成增加
[](
)1.4 什么是拦截器
从命名上很容易看出,ContinuationInterceptor
即协程拦截器,先看一下接口
interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
拦截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//...
}
从上面可以提炼出两个信息
1.拦截器的Key
是单例的,因此当你添加多个拦截器时,生效的只会有一个
2.我们都知道,Continuation
在调用其Continuation#resumeWith()
方法,会执行其suspend
修饰的函数的代码块,如果我们提前拦截到,是不是可以做点其他事情?这就是调度器切换线程的原理
上面我们已经介绍了是通过Dispatchers
指定协程运行的线程,通过interceptContinuation
在协程恢复前进行拦截,从而切换线程
带着这些前置知识,我们一起来看下协程启动的具体流程,明确下协程切换线程源码具体实现
[](
)2. 协程线程切换源码分析
[](
)2.1 launch
方法解析
我们首先看一下协程是怎样启动的,传入了什么参数
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
总共有 3 个参数:
1.传入的协程上下文
2.CoroutinStart
启动器,是个枚举类,定义了不同的启动方法,默认是CoroutineStart.DEFAULT
3.block
就是我们传入的协程体,真正要执行的代码
这段代码主要做了两件事:
1.组合新的CoroutineContext
2.再创建一个 Continuation
[](
)2.1.1 组合新的CoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
从上面可以提炼出以下信息:
1.会将launch
方法传入的context
与CoroutineScope
中的context
组合起来
2.如果combined
中没有拦截器,会传入一个默认的拦截器,即Dispatchers.Default
,这也解释了为什么我们没有传入拦截器时会有一个默认切换线程的效果
[](
)2.1.2 创建一个Continuation
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
默认情况下,我们会创建一个StandloneCoroutine
值得注意的是,这个coroutine
其实是我们协程体的complete
,即成功后的回调,而不是协程体本身
然后调用coroutine.start
,这表明协程开始启动了
[](
)2.2 协程的启动
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
接着调用CoroutineStart
的start
来启动协程,默认情况下调用的是CoroutineStart.Default
经过层层调用,最后到达了:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
这里就是协程启动的核心代码,虽然比较短,却包括 3 个步骤:
1.创建协程体Continuation
2.创建拦截 Continuation
,即DispatchedContinuation
3.执行DispatchedContinuation.resumeWith
方法
[](
)2.3 创建协程体Continuation
调用createCoroutineUnintercepted
,会把我们的协程体即suspend block
转换成Continuation
,它是SuspendLambda
,继承自ContinuationImpl
createCoroutineUnintercepted
方法在源码中找不到具体实现,不过如果你把协程体代码反编译后就可以看到真正的实现
详情可见:[字节码反编译](
)
[](
)2.4 创建DispatchedContinuation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
从上可以提炼出以下信息
1.interepted
是个扩展方法,最后会调用到ContinuationImpl.intercepted
方法
2.在intercepted
会利用CoroutineContext
,获取当前的拦截器
3.因为当前的拦截器是CoroutineDispatcher
,因此最终会返回一个DispatchedContinuation
,我们其实也是利用它实现线程切换的
4.我们将协程体的Continuation
传入DispatchedContinuation
,这里其实用到了装饰器模式
,实现功能的增强
这里其实很明显了,通过DispatchedContinuation
装饰原有协程,在DispatchedContinuation
里通过调度器处理线程切换,不影响原有逻辑,实现功能的增强
[](
)2.5 拦截处理
//DispatchedContinuation
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
上面说到了启动时会调用DispatchedContinuation
的resumeCancellableWith
方法
这里面做的事也很简单:
1.如果需要切换线程,调用dispatcher.dispatcher
方法,这里的dispatcher
是通过CoroutineConext
取出来的
2.如果不需要切换线程,直接运行原有线程即可
[](
)2.5.2 调度器的具体实现
我们首先明确下,CoroutineDispatcher
是通过CoroutineContext
取出来的,这也是协程上下文作用的体现
CoroutineDispater
官方提供了四种实现:Dispatchers.Main
,Dispatchers.IO
,Dispatchers.Default
,Dispatchers.Unconfined
我们一起简单看下Dispatchers.Main
的实现
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this
(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
评论