Kotlin 协程,flutterplugin 打包 aar
协程是编译器的能力,因为协程并不需要操作系统和硬件的支持(线程需要),是编译器为了让开发者写代码更简单方便, 提供了一些关键字, 并在内部自动生成了一些字节码
线程和协程的目的差异
线程的目的是提高 CPU 资源使用率, 使多个任务得以并行的运行,是为了服务于机器的.
协程的目的是为了让多个任务之间更好的协作,主要体现在代码逻辑上,是为了服务开发者 (能提升资源的利用率, 但并不是原始目的)
线程和协程的调度差异
线程的调度是系统完成的,一般是抢占式的,根据优先级来分配
协程的调度是开发者根据程序逻辑指定好的,在不同的时期把资源合理的分配给不同的任务.
协程与线程的关系 协程并不是取代线程,而且抽象于线程之上,线程是被分割的 CPU 资源,协程是组织好的代码流程,协程需要线程来承载运行,线程是协程的资源
2、基本使用
CoroutineScope.launch
launch 函数可以启动新协程而不将结果返回给调用方
代码实现
//获取一个协程作用域用于创建协程 private val mScope = MainScope()
mScope.launch(Dispatchers.IO) {//IO 线程执行 getStringInfo()方法,返回结果 var res = getStringInfo()//获取结果后主线程提示更新 withContext(Dispatchers.Main) {Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(res).show()}}
private suspend fun getStringInfo(): String {return withContext(Dispatchers.IO) {//在这 1000 毫秒内该协程所处的线程不会阻塞 delay(1000)"Coroutine-launch"}}
//在 onDestroy 生命周期方法之中要手动取消 override fun onDestroy() {super.onDestroy()mScope.cancel()}
步骤
获取一个协程作用域用于创建协程
通过协程作用域.launch 方法启动新的协程任务
启动时可以指定执行线程
内部通过 withContext()方法实现切换线程
在 onDestroy 生命周期方法之中要手动取消
协程作用域
MainScope 是协程默认提供的作用域,但是还有其他作用域更为方便
可使用 lifecycleScope 或者 viewModelScope,这两种作用域会自动取消
在 UI 组件中使用 LifecycleOwner.lifecycleScope,在 ViewModel 中使用 ViewModel.viewModelScope
CoroutineScope.async
async 函数实现返回值处理或者并发处理
返回值处理
private fun asyncReturn() {mScope.launch(Dispatchers.Main) {//新开一个协程去执行协程体,父协程的代码会接着往下走 var deferred = async(Dispatchers.IO) {delay(1000)"Coroutine-Async"}//等待 async 执行完成获取返回值,并不会阻塞线程,而是挂起,将线程的执行权交出去//直到 async 的协程体执行完毕后,会恢复协程继续执行 val data = deferred.await()Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(data).show()}}
并发处理
private fun asyncConcurrent() {//coroutineContext 的创建下文会有分析 var coroutineContext = Job() +Dispatchers.Main +CoroutineExceptionHandler { coroutineContext, throwable ->Log.e("CoroutineException","CoroutineExceptionHandler: throwable")} +CoroutineName("asyncConcurrent")mScope.launch(coroutineContext) {val job1 = async(Dispatchers.IO) {delay(1000)"job1-finish"}val job2 = async(Dispatchers.IO) {delay(2000)"job2-finish"}val job3 = async(Dispatchers.IO) {delay(500)"job3-finish"}//等待各job执行完 将结果合并Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText("job1:{job1.await()},job2:{job3.await()}").show()}}
可以看到,这就是同步代码风格编写异步代码的强大之处,下面开始源码解析
3、coroutineContext 解析
CoroutineContext 是一个特殊的集合,同时包含了 Map 和 Set 的特点
集合内部的元素 Element 是根据 key 去对应(Map 特点),但是不允许重复(Set 特点)
Element 之间可以通过+号进行组合
Element 有如下四类,共同组成了 CoroutineContext
Job:协程的唯一标识,用来控制协程的生命周期(new、active、completing、completed、cancelling、cancelled)
CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined)
CoroutineName: 指定协程的名称,默认为 coroutine
CoroutineExceptionHandler: 指定协程的异常处理器,用来处理未捕获的异常
3.1、Job Element
每一个所创建的协程 (通过
launch或者 async),会返回一个Job实例,该实例是协程的唯一标识,并且负责管理协程的生命周期
Job 状态
Job 在执行的过程中,包含了一系列状态,虽然开发者没办法直接获取所有状态,但是 Job 之中有如下三个属性
isActive(是否活动)
isCompleted(是否已完成)
isCancelled(是否已取消)
根据属性就可以推断出 Job 的所处状态,状态如下
新创建 (New)
当一个协程创建后就处于新建(New)状态
活跃 (Active)
当调用 Job 的 start/join 方法后协程就处于活跃(Active)状态
完成中 (Completing)
当协程执行完成后或者调用 CompletableJob(CompletableJob 是 Job 的一个子接口)的 complete 方法都会让当前协程进入完成中(Completing)状态
已完成 (Completed)
处于完成中状态的协程会等所有子协程都完成后才进入完成(Completed)状态
取消中 (Cancelling)
当运行出错或者调用 Job 的 cancel 方法都会将当前协程置为取消中(Cancelling)状态
已取消 (Cancelled)
处于取消中状态的协程会等所有子协程都完成后才进入取消 (Cancelled)状态
wait children+-----+ start +--------+ complete +-------------+ finish +-----------+| New | -----> | Active | ---------> | Completing | -------> | Completed |+-----+ +--------+ +-------------+ +-----------+| cancel / fail || +----------------+| |V V+------------+ finish +-----------+| Cancelling | --------------------------------> | Cancelled |+------------+ +-----------+
Job 方法
fun start(): Boolean
调用该函数来启动这个
Coroutine,如果当前Coroutine还没有执行调用该函数返回true,如果当前Coroutine已经执行或者已经执行完毕,则调用该函数返回false
fun cancel(cause: CancellationException? = null)
通过可选的取消原因取消 Job
fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
通过这个函数可以给
Job设置一个完成通知,当Job执行完成的时候会同步执行这个通知函数。 回调的通知对象类型为:typealias CompletionHandler = (cause: Throwable?) -> Unit.CompletionHandler参数代表了Job是如何执行完成的。cause有下面三种情况:如果
Job是正常执行完成的,则cause参数为null如果
Job是正常取消的,则cause参数为CancellationException对象。这种情况不应该当做错误处理,这是任务正常取消的情形。所以一般不需要在错误日志中记录这种情况。其他情况表示
Job执行失败了。这个函数的返回值为
DisposableHandle对象,如果不再需要监控Job的完成情况了, 则可以调用DisposableHandle.dispose函数来取消监听。如果Job已经执行完了, 则无需调用dispose函数了,会自动取消监听。
suspend fun join()(suspend 函数)
用来在另外一个
Coroutine中等待 job 执行完成后继续执行。
Job 异常传播
协程是有父子级的概念,如果子 Job 在运行过程之中发生异常,那么父 Job 就会感知到并抛出异常。如果要抑制这种行为就需要使用 SupervisorJob
除了 CancellationException 以外的异常
SupervisorJob
fun main(){val parentJob = GlobalScope.launch {//childJob 是一个 SupervisorJobval childJob = launch(SupervisorJob()){throw NullPointerException()}childJob.join()println("parent complete")}Thread.sleep(1000)}
此时 childJob 抛出异常并不会影响 parentJob 的运行,parentJob 会继续运行并输出 parent complete。
3.2、CoroutineDispatcher Element
用于指定协程的运行线程
kotlin 已经内置了 CoroutineDispatcher 的 4 个实现,可以通过 Dispatchers 的 Default、IO、Main、Unconfined 字段分别返回使用
public actual object Dispatchers {@JvmStaticpublic actual val Default: CoroutineDispatcher = createDefaultDispatcher()@JvmStaticpublic val IO: CoroutineDispatcher = DefaultScheduler.IO@JvmStaticpublic actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined@JvmStaticpublic actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher}
Default,IO
Default,IO 其实内部用的是一个线程池,下面逐个解析,看实现原理
default
Default 会根据 useCoroutinesScheduler 属性(默认为 true)去获取对应的线程池
DefaultScheduler(useCoroutinesScheduler=ture):kotlin 自己实现的线程池逻辑
CommonPool(useCoroutinesScheduler=false):java 类库中的 Executor 实现线程池逻辑
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =if (useCoroutinesScheduler) DefaultScheduler else CommonPoolinternal object DefaultScheduler : ExperimentalCoroutineDispatcher() {.....}//委托类 public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {}//java 类库中的 Executor 实现线程池逻辑 internal object CommonPool : ExecutorCoroutineDispatcher() {}//共同父类,定义行为 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {}
ExperimentalCoroutineDispatcher
DefaultScheduler 的主要实现都在它的父类 ExperimentalCoroutineDispatcher 中
public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {public constructor(corePoolSize: Int = CORE_POOL_SIZE,maxPoolSize: Int = MAX_POOL_SIZE,schedulerName: String = DEFAULT_SCHEDULER_NAME) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
...//省略构造
//创建 CoroutineScheduler 实例 private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override val executor: Executorget() = coroutineScheduler
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =try {//dispatch 方法委托到 CoroutineScheduler 的 dispatch 方法 coroutineScheduler.dispatch(block)} catch (e: RejectedExecutionException) {....}
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =try {//dispatchYield 方法委托到 CoroutineScheduler 的 dispatchYield 方法 coroutineScheduler.dispatch(block, tailDispatch = true)} catch (e: RejectedExecutionException) {...}
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {try {//dispatchWithContext 方法委托到 CoroutineScheduler 的 dispatchWithContext 方法 coroutineScheduler.dispatch(block, context, tailDispatch)} catch (e: RejectedExecutionException) {....}}override fun close(): Unit = coroutineScheduler.close()//实现请求阻塞 public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)}//实现请求数量限制public fun limited(parallelism: Int): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size (parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)}
....//省略一些供测试的方法,更好的跟踪同步状态}
IO
IO 的实现其实是 LimitingDispatcher
val IO: CoroutineDispatcher = LimitingDispatcher(this,systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),"Dispatchers.IO",TASK_PROBABLY_BLOCKING)
LimitingDispatcher
IO 的实现类会有一些最大请求限制,以及队列处理
private class LimitingDispatcher(private val dispatcher: ExperimentalCoroutineDispatcher,private val parallelism: Int,private val name: String?,override val taskMode: Int) : ExecutorCoroutineDispatcher(), TaskContext, Executor {//同步阻塞队列 private val queue = ConcurrentLinkedQueue<Runnable>()//cas 计数 private val inFlightTasks = atomic(0)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {var taskToSchedule = blockwhile (true) {
if (inFlight <= parallelism) {//LimitingDispatcher 的 dispatch 方法委托给了 DefaultScheduler 的 dispatchWithContext 方法 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)return}..//省略了一些队列处理逻辑}}}
CoroutineScheduler
Default、IO 其实都是共享 CoroutineScheduler 线程池,Kotlin 实现了一套线程池两种调度策略
通过内部的 mode 区分
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {......if (task.mode == TASK_NON_BLOCKING) {if (skipUnpark) returnsignalCpuWork()} else {signalBlockingWork(skipUnpark = skipUnpark)}}
Mode
处理策略
1.公式:max(corePoolSize, min(CPU 核心数 * 128, 2^21 - 2)),即大于 corePoolSize,小于 2^21 - 2
2.2^21 - 2 是一个很大的数约为 2M,但是 CoroutineScheduler 是不可能创建这么多线程的,所以就需要外部限制提交的任务数
3.Dispatchers.IO 构造时就通过 LimitingDispatcher 默认限制了最大线程并发数 parallelism 为 max(64, CPU 核心数),即最多只能提交 parallelism 个任务到 CoroutineScheduler 中执行,剩余的任务被放进队列中等待。 |
适合场景
2.复杂计算、视频解码等,如果此时线程数太多,超过了 CPU 核心数,那么这些超出来的线程是得不到 CPU 的执行的,只会浪费内存资源
3.因为线程本身也有栈等空间,同时线程过多,频繁的线程切换带来的消耗也会影响线程池的性能
4.对于 CPU 密集型任务,线程池并发线程数等于 CPU 核心数才能让 CPU 的执行效率最大化 || IO | 1.IO 密集型任务的特点是执行任务时 CPU 会处于闲置状态,任务不会消耗大量的 CPU 资源
2.网络请求、IO 操作等,线程执行 IO 密集型任务时大多数处于阻塞状态,处于阻塞状态的线程是不占用 CPU 的执行时间
3.此时 CPU 就处于闲置状态,为了让 CPU 忙起来,执行 IO 密集型任务时理应让线程的创建数量更多一点,理想情况下线程数应该等于提交的任务数,对于这些多创建出来的线程,当它们闲置时,线程池一般会有一个超时回收策略,所以大部分情况下并不会占用大量的内存资源
4.但也会有极端情况,所以对于 IO 密集型任务,线程池并发线程数应尽可能地多才能提高 CPU 的吞吐量,这个尽可能地多的程度并不是无限大,而是根据业务情况设定,但肯定要大于 CPU 核心数。 |
Unconfined
任务执行在默认的启动线程。之后由调用
resume的线程决定恢复协程的线程。
internal object Unconfined : CoroutineDispatcher() {//为 false 为不需要 dispatchoverride fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {// 只有当调用 yield 方法时,Unconfined 的 dispatch 方法才会被调用// yield() 表示当前协程让出自己所在的线程给其他协程运行 val yieldContext = context[YieldContext]if (yieldContext != null) {yieldContext.dispatcherWasUnconfined = truereturn}throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +"isDispatchNeeded and dispatch calls.")}}
每一个协程都有对应的 Continuation 实例,其中的 resumeWith 用于协程的恢复,存在于 DispatchedContinuation
DispatchedContinuation
我们重点看 resumeWith 的实现以及类委托
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {.....override fun
resumeWith(result: Result<T>) {val context = continuation.context











评论