Kotlin 协程,flutterplugin 打包 aar
协程是编译器的能力,因为协程并不需要操作系统和硬件的支持(线程需要),是编译器为了让开发者写代码更简单方便, 提供了一些关键字, 并在内部自动生成了一些字节码
线程和协程的目的差异
线程的目的是提高 CPU 资源使用率, 使多个任务得以并行的运行,是为了服务于机器的.
协程的目的是为了让多个任务之间更好的协作,主要体现在代码逻辑上,是为了服务开发者 (能提升资源的利用率, 但并不是原始目的)
线程和协程的调度差异
线程的调度是系统完成的,一般是抢占式的,根据优先级来分配
协程的调度是开发者根据程序逻辑指定好的,在不同的时期把资源合理的分配给不同的任务.
协程与线程的关系 协程并不是取代线程,而且抽象于线程之上,线程是被分割的 CPU 资源,协程是组织好的代码流程,协程需要线程来承载运行,线程是协程的资源
2、基本使用
CoroutineScope.launch
launch 函数可以启动新协程而不将结果返回给调用方
代码实现
//获取一个协程作用域用于创建协程 private val mScope = MainScope()
mScope.launch(Dispatchers.IO) {//IO 线程执行 getStringInfo()方法,返回结果 var res = getStringInfo()//获取结果后主线程提示更新 withContext(Dispatchers.Main) {Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(res).show()}}
private suspend fun getStringInfo(): String {return withContext(Dispatchers.IO) {//在这 1000 毫秒内该协程所处的线程不会阻塞 delay(1000)"Coroutine-launch"}}
//在 onDestroy 生命周期方法之中要手动取消 override fun onDestroy() {super.onDestroy()mScope.cancel()}
步骤
获取一个协程作用域用于创建协程
通过协程作用域.launch 方法启动新的协程任务
启动时可以指定执行线程
内部通过 withContext()方法实现切换线程
在 onDestroy 生命周期方法之中要手动取消
协程作用域
MainScope 是协程默认提供的作用域,但是还有其他作用域更为方便
可使用 lifecycleScope 或者 viewModelScope,这两种作用域会自动取消
在 UI 组件中使用 LifecycleOwner.lifecycleScope,在 ViewModel 中使用 ViewModel.viewModelScope
CoroutineScope.async
async 函数实现返回值处理或者并发处理
返回值处理
private fun asyncReturn() {mScope.launch(Dispatchers.Main) {//新开一个协程去执行协程体,父协程的代码会接着往下走 var deferred = async(Dispatchers.IO) {delay(1000)"Coroutine-Async"}//等待 async 执行完成获取返回值,并不会阻塞线程,而是挂起,将线程的执行权交出去//直到 async 的协程体执行完毕后,会恢复协程继续执行 val data = deferred.await()Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(data).show()}}
并发处理
private fun asyncConcurrent() {//coroutineContext 的创建下文会有分析 var coroutineContext = Job() +Dispatchers.Main +CoroutineExceptionHandler { coroutineContext, throwable ->Log.e("CoroutineException","CoroutineExceptionHandler: throwable")} +CoroutineName("asyncConcurrent")mScope.launch(coroutineContext) {val job1 = async(Dispatchers.IO) {delay(1000)"job1-finish"}val job2 = async(Dispatchers.IO) {delay(2000)"job2-finish"}val job3 = async(Dispatchers.IO) {delay(500)"job3-finish"}//等待各job执行完 将结果合并Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText("job1:{job1.await()},job2:{job3.await()}").show()}}
可以看到,这就是同步代码风格编写异步代码的强大之处,下面开始源码解析
3、coroutineContext 解析
CoroutineContext 是一个特殊的集合,同时包含了 Map 和 Set 的特点
集合内部的元素 Element 是根据 key 去对应(Map 特点),但是不允许重复(Set 特点)
Element 之间可以通过+号进行组合
Element 有如下四类,共同组成了 CoroutineContext
Job:协程的唯一标识,用来控制协程的生命周期(new、active、completing、completed、cancelling、cancelled)
CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined)
CoroutineName: 指定协程的名称,默认为 coroutine
CoroutineExceptionHandler: 指定协程的异常处理器,用来处理未捕获的异常
3.1、Job Element
每一个所创建的协程 (通过
launch
或者 async),会返回一个Job
实例,该实例是协程的唯一标识,并且负责管理协程的生命周期
Job 状态
Job 在执行的过程中,包含了一系列状态,虽然开发者没办法直接获取所有状态,但是 Job 之中有如下三个属性
isActive(是否活动)
isCompleted(是否已完成)
isCancelled(是否已取消)
根据属性就可以推断出 Job 的所处状态,状态如下
新创建 (New)
当一个协程创建后就处于新建(New)状态
活跃 (Active)
当调用 Job 的 start/join 方法后协程就处于活跃(Active)状态
完成中 (Completing)
当协程执行完成后或者调用 CompletableJob(CompletableJob 是 Job 的一个子接口)的 complete 方法都会让当前协程进入完成中(Completing)状态
已完成 (Completed)
处于完成中状态的协程会等所有子协程都完成后才进入完成(Completed)状态
取消中 (Cancelling)
当运行出错或者调用 Job 的 cancel 方法都会将当前协程置为取消中(Cancelling)状态
已取消 (Cancelled)
处于取消中状态的协程会等所有子协程都完成后才进入取消 (Cancelled)状态
wait children+-----+ start +--------+ complete +-------------+ finish +-----------+| New | -----> | Active | ---------> | Completing | -------> | Completed |+-----+ +--------+ +-------------+ +-----------+| cancel / fail || +----------------+| |V V+------------+ finish +-----------+| Cancelling | --------------------------------> | Cancelled |+------------+ +-----------+
Job 方法
fun start(): Boolean
调用该函数来启动这个
Coroutine
,如果当前Coroutine
还没有执行调用该函数返回true
,如果当前Coroutine
已经执行或者已经执行完毕,则调用该函数返回false
fun cancel(cause: CancellationException? = null)
通过可选的取消原因取消 Job
fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
通过这个函数可以给
Job
设置一个完成通知,当Job
执行完成的时候会同步执行这个通知函数。 回调的通知对象类型为:typealias CompletionHandler = (cause: Throwable?) -> Unit
.CompletionHandler
参数代表了Job
是如何执行完成的。cause
有下面三种情况:如果
Job
是正常执行完成的,则cause
参数为null
如果
Job
是正常取消的,则cause
参数为CancellationException
对象。这种情况不应该当做错误处理,这是任务正常取消的情形。所以一般不需要在错误日志中记录这种情况。其他情况表示
Job
执行失败了。这个函数的返回值为
DisposableHandle
对象,如果不再需要监控Job
的完成情况了, 则可以调用DisposableHandle.dispose
函数来取消监听。如果Job
已经执行完了, 则无需调用dispose
函数了,会自动取消监听。
suspend fun join()(suspend 函数)
用来在另外一个
Coroutine
中等待 job 执行完成后继续执行。
Job 异常传播
协程是有父子级的概念,如果子 Job 在运行过程之中发生异常,那么父 Job 就会感知到并抛出异常。如果要抑制这种行为就需要使用 SupervisorJob
除了 CancellationException 以外的异常
SupervisorJob
fun main(){val parentJob = GlobalScope.launch {//childJob 是一个 SupervisorJobval childJob = launch(SupervisorJob()){throw NullPointerException()}childJob.join()println("parent complete")}Thread.sleep(1000)}
此时 childJob 抛出异常并不会影响 parentJob 的运行,parentJob 会继续运行并输出 parent complete。
3.2、CoroutineDispatcher Element
用于指定协程的运行线程
kotlin 已经内置了 CoroutineDispatcher 的 4 个实现,可以通过 Dispatchers 的 Default、IO、Main、Unconfined 字段分别返回使用
public actual object Dispatchers {@JvmStaticpublic actual val Default: CoroutineDispatcher = createDefaultDispatcher()@JvmStaticpublic val IO: CoroutineDispatcher = DefaultScheduler.IO@JvmStaticpublic actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined@JvmStaticpublic actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher}
Default,IO
Default,IO 其实内部用的是一个线程池,下面逐个解析,看实现原理
default
Default 会根据 useCoroutinesScheduler 属性(默认为 true)去获取对应的线程池
DefaultScheduler(useCoroutinesScheduler=ture):kotlin 自己实现的线程池逻辑
CommonPool(useCoroutinesScheduler=false):java 类库中的 Executor 实现线程池逻辑
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =if (useCoroutinesScheduler) DefaultScheduler else CommonPoolinternal object DefaultScheduler : ExperimentalCoroutineDispatcher() {.....}//委托类 public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {}//java 类库中的 Executor 实现线程池逻辑 internal object CommonPool : ExecutorCoroutineDispatcher() {}//共同父类,定义行为 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {}
ExperimentalCoroutineDispatcher
DefaultScheduler 的主要实现都在它的父类 ExperimentalCoroutineDispatcher 中
public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {public constructor(corePoolSize: Int = CORE_POOL_SIZE,maxPoolSize: Int = MAX_POOL_SIZE,schedulerName: String = DEFAULT_SCHEDULER_NAME) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
...//省略构造
//创建 CoroutineScheduler 实例 private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override val executor: Executorget() = coroutineScheduler
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =try {//dispatch 方法委托到 CoroutineScheduler 的 dispatch 方法 coroutineScheduler.dispatch(block)} catch (e: RejectedExecutionException) {....}
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =try {//dispatchYield 方法委托到 CoroutineScheduler 的 dispatchYield 方法 coroutineScheduler.dispatch(block, tailDispatch = true)} catch (e: RejectedExecutionException) {...}
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {try {//dispatchWithContext 方法委托到 CoroutineScheduler 的 dispatchWithContext 方法 coroutineScheduler.dispatch(block, context, tailDispatch)} catch (e: RejectedExecutionException) {....}}override fun close(): Unit = coroutineScheduler.close()//实现请求阻塞 public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)}//实现请求数量限制public fun limited(parallelism: Int): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size (parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)}
....//省略一些供测试的方法,更好的跟踪同步状态}
IO
IO 的实现其实是 LimitingDispatcher
val IO: CoroutineDispatcher = LimitingDispatcher(this,systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),"Dispatchers.IO",TASK_PROBABLY_BLOCKING)
LimitingDispatcher
IO 的实现类会有一些最大请求限制,以及队列处理
private class LimitingDispatcher(private val dispatcher: ExperimentalCoroutineDispatcher,private val parallelism: Int,private val name: String?,override val taskMode: Int) : ExecutorCoroutineDispatcher(), TaskContext, Executor {//同步阻塞队列 private val queue = ConcurrentLinkedQueue<Runnable>()//cas 计数 private val inFlightTasks = atomic(0)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {var taskToSchedule = blockwhile (true) {
if (inFlight <= parallelism) {//LimitingDispatcher 的 dispatch 方法委托给了 DefaultScheduler 的 dispatchWithContext 方法 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)return}..//省略了一些队列处理逻辑}}}
CoroutineScheduler
Default、IO 其实都是共享 CoroutineScheduler 线程池,Kotlin 实现了一套线程池两种调度策略
通过内部的 mode 区分
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {......if (task.mode == TASK_NON_BLOCKING) {if (skipUnpark) returnsignalCpuWork()} else {signalBlockingWork(skipUnpark = skipUnpark)}}
Mode
处理策略
1.公式:max(corePoolSize, min(CPU 核心数 * 128, 2^21 - 2)),即大于 corePoolSize,小于 2^21 - 2
2.2^21 - 2 是一个很大的数约为 2M,但是 CoroutineScheduler 是不可能创建这么多线程的,所以就需要外部限制提交的任务数
3.Dispatchers.IO 构造时就通过 LimitingDispatcher 默认限制了最大线程并发数 parallelism 为 max(64, CPU 核心数),即最多只能提交 parallelism 个任务到 CoroutineScheduler 中执行,剩余的任务被放进队列中等待。 |
适合场景
2.复杂计算、视频解码等,如果此时线程数太多,超过了 CPU 核心数,那么这些超出来的线程是得不到 CPU 的执行的,只会浪费内存资源
3.因为线程本身也有栈等空间,同时线程过多,频繁的线程切换带来的消耗也会影响线程池的性能
4.对于 CPU 密集型任务,线程池并发线程数等于 CPU 核心数才能让 CPU 的执行效率最大化 || IO | 1.IO 密集型任务的特点是执行任务时 CPU 会处于闲置状态,任务不会消耗大量的 CPU 资源
2.网络请求、IO 操作等,线程执行 IO 密集型任务时大多数处于阻塞状态,处于阻塞状态的线程是不占用 CPU 的执行时间
3.此时 CPU 就处于闲置状态,为了让 CPU 忙起来,执行 IO 密集型任务时理应让线程的创建数量更多一点,理想情况下线程数应该等于提交的任务数,对于这些多创建出来的线程,当它们闲置时,线程池一般会有一个超时回收策略,所以大部分情况下并不会占用大量的内存资源
4.但也会有极端情况,所以对于 IO 密集型任务,线程池并发线程数应尽可能地多才能提高 CPU 的吞吐量,这个尽可能地多的程度并不是无限大,而是根据业务情况设定,但肯定要大于 CPU 核心数。 |
Unconfined
任务执行在默认的启动线程。之后由调用
resume
的线程决定恢复协程的线程。
internal object Unconfined : CoroutineDispatcher() {//为 false 为不需要 dispatchoverride fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {// 只有当调用 yield 方法时,Unconfined 的 dispatch 方法才会被调用// yield() 表示当前协程让出自己所在的线程给其他协程运行 val yieldContext = context[YieldContext]if (yieldContext != null) {yieldContext.dispatcherWasUnconfined = truereturn}throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +"isDispatchNeeded and dispatch calls.")}}
每一个协程都有对应的 Continuation 实例,其中的 resumeWith 用于协程的恢复,存在于 DispatchedContinuation
DispatchedContinuation
我们重点看 resumeWith 的实现以及类委托
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {.....override fun
resumeWith(result: Result<T>) {val context = continuation.context
评论