写点什么

Kotlin 协程,flutterplugin 打包 aar

用户头像
Android架构
关注
发布于: 7 小时前


协程是编译器的能力,因为协程并不需要操作系统和硬件的支持(线程需要),是编译器为了让开发者写代码更简单方便, 提供了一些关键字, 并在内部自动生成了一些字节码


线程和协程的目的差异


  • 线程的目的是提高 CPU 资源使用率, 使多个任务得以并行的运行,是为了服务于机器的.

  • 协程的目的是为了让多个任务之间更好的协作,主要体现在代码逻辑上,是为了服务开发者 (能提升资源的利用率, 但并不是原始目的)


线程和协程的调度差异


  • 线程的调度是系统完成的,一般是抢占式的,根据优先级来分配

  • 协程的调度是开发者根据程序逻辑指定好的,在不同的时期把资源合理的分配给不同的任务.


协程与线程的关系 协程并不是取代线程,而且抽象于线程之上,线程是被分割的 CPU 资源,协程是组织好的代码流程,协程需要线程来承载运行,线程是协程的资源

2、基本使用

CoroutineScope.launch

  • launch 函数可以启动新协程而不将结果返回给调用方

代码实现

//获取一个协程作用域用于创建协程 private val mScope = MainScope()


mScope.launch(Dispatchers.IO) {//IO 线程执行 getStringInfo()方法,返回结果 var res = getStringInfo()//获取结果后主线程提示更新 withContext(Dispatchers.Main) {Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(res).show()}}


private suspend fun getStringInfo(): String {return withContext(Dispatchers.IO) {//在这 1000 毫秒内该协程所处的线程不会阻塞 delay(1000)"Coroutine-launch"}}


//在 onDestroy 生命周期方法之中要手动取消 override fun onDestroy() {super.onDestroy()mScope.cancel()}

步骤

  1. 获取一个协程作用域用于创建协程

  2. 通过协程作用域.launch 方法启动新的协程任务

  3. 启动时可以指定执行线程

  4. 内部通过 withContext()方法实现切换线程

  5. 在 onDestroy 生命周期方法之中要手动取消

协程作用域

  • MainScope 是协程默认提供的作用域,但是还有其他作用域更为方便

  • 可使用 lifecycleScope 或者 viewModelScope,这两种作用域会自动取消

  • 在 UI 组件中使用 LifecycleOwner.lifecycleScope,在 ViewModel 中使用 ViewModel.viewModelScope

CoroutineScope.async

  • async 函数实现返回值处理或者并发处理

返回值处理

private fun asyncReturn() {mScope.launch(Dispatchers.Main) {//新开一个协程去执行协程体,父协程的代码会接着往下走 var deferred = async(Dispatchers.IO) {delay(1000)"Coroutine-Async"}//等待 async 执行完成获取返回值,并不会阻塞线程,而是挂起,将线程的执行权交出去//直到 async 的协程体执行完毕后,会恢复协程继续执行 val data = deferred.await()Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(data).show()}}

并发处理

private fun asyncConcurrent() {//coroutineContext 的创建下文会有分析 var coroutineContext = Job() +Dispatchers.Main +CoroutineExceptionHandler { coroutineContext, throwable ->Log.e("CoroutineException","CoroutineExceptionHandler: throwable")} +CoroutineName("asyncConcurrent")mScope.launch(coroutineContext) {val job1 = async(Dispatchers.IO) {delay(1000)"job1-finish"}val job2 = async(Dispatchers.IO) {delay(2000)"job2-finish"}val job3 = async(Dispatchers.IO) {delay(500)"job3-finish"}//等待各job执行完 将结果合并Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText("job1:{job1.await()},job2:{job3.await()}").show()}}


可以看到,这就是同步代码风格编写异步代码的强大之处,下面开始源码解析

3、coroutineContext 解析

  • CoroutineContext 是一个特殊的集合,同时包含了 Map 和 Set 的特点

  • 集合内部的元素 Element 是根据 key 去对应(Map 特点),但是不允许重复(Set 特点)

  • Element 之间可以通过+号进行组合

  • Element 有如下四类,共同组成了 CoroutineContext

  • Job:协程的唯一标识,用来控制协程的生命周期(new、active、completing、completed、cancelling、cancelled)

  • CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined)

  • CoroutineName: 指定协程的名称,默认为 coroutine

  • CoroutineExceptionHandler: 指定协程的异常处理器,用来处理未捕获的异常

3.1、Job Element

  • 每一个所创建的协程 (通过 launch 或者 async),会返回一个 Job实例,该实例是协程的唯一标识,并且负责管理协程的生命周期

Job 状态

Job 在执行的过程中,包含了一系列状态,虽然开发者没办法直接获取所有状态,但是 Job 之中有如下三个属性


  • isActive(是否活动)

  • isCompleted(是否已完成)

  • isCancelled(是否已取消)


根据属性就可以推断出 Job 的所处状态,状态如下


  • 新创建 (New)

  • 当一个协程创建后就处于新建(New)状态

  • 活跃 (Active)

  • 当调用 Job 的 start/join 方法后协程就处于活跃(Active)状态

  • 完成中 (Completing)

  • 当协程执行完成后或者调用 CompletableJob(CompletableJob 是 Job 的一个子接口)的 complete 方法都会让当前协程进入完成中(Completing)状态

  • 已完成 (Completed)

  • 处于完成中状态的协程会等所有子协程都完成后才进入完成(Completed)状态

  • 取消中 (Cancelling)

  • 当运行出错或者调用 Job 的 cancel 方法都会将当前协程置为取消中(Cancelling)状态

  • 已取消 (Cancelled)

  • 处于取消中状态的协程会等所有子协程都完成后才进入取消 (Cancelled)状态



wait children+-----+ start +--------+ complete +-------------+ finish +-----------+| New | -----> | Active | ---------> | Completing | -------> | Completed |+-----+ +--------+ +-------------+ +-----------+| cancel / fail || +----------------+| |V V+------------+ finish +-----------+| Cancelling | --------------------------------> | Cancelled |+------------+ +-----------+

Job 方法

fun start(): Boolean
  • 调用该函数来启动这个 Coroutine,如果当前 Coroutine 还没有执行调用该函数返回 true,如果当前 Coroutine 已经执行或者已经执行完毕,则调用该函数返回 false

fun cancel(cause: CancellationException? = null)
  • 通过可选的取消原因取消 Job

fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
  • 通过这个函数可以给 Job 设置一个完成通知,当 Job 执行完成的时候会同步执行这个通知函数。 回调的通知对象类型为:typealias CompletionHandler = (cause: Throwable?) -> Unit.

  • CompletionHandler 参数代表了 Job 是如何执行完成的。 cause 有下面三种情况:

  • 如果 Job 是正常执行完成的,则 cause 参数为 null

  • 如果 Job 是正常取消的,则 cause 参数为 CancellationException 对象。这种情况不应该当做错误处理,这是任务正常取消的情形。所以一般不需要在错误日志中记录这种情况。

  • 其他情况表示 Job 执行失败了。

  • 这个函数的返回值为 DisposableHandle 对象,如果不再需要监控 Job 的完成情况了, 则可以调用 DisposableHandle.dispose 函数来取消监听。如果 Job 已经执行完了, 则无需调用 dispose 函数了,会自动取消监听。

suspend fun join()(suspend 函数)
  • 用来在另外一个 Coroutine 中等待 job 执行完成后继续执行。

Job 异常传播

  • 协程是有父子级的概念,如果子 Job 在运行过程之中发生异常,那么父 Job 就会感知到并抛出异常。如果要抑制这种行为就需要使用 SupervisorJob


除了 CancellationException 以外的异常

SupervisorJob

fun main(){val parentJob = GlobalScope.launch {//childJob 是一个 SupervisorJobval childJob = launch(SupervisorJob()){throw NullPointerException()}childJob.join()println("parent complete")}Thread.sleep(1000)}


此时 childJob 抛出异常并不会影响 parentJob 的运行,parentJob 会继续运行并输出 parent complete。

3.2、CoroutineDispatcher Element

  • 用于指定协程的运行线程

  • kotlin 已经内置了 CoroutineDispatcher 的 4 个实现,可以通过 Dispatchers 的 Default、IO、Main、Unconfined 字段分别返回使用


public actual object Dispatchers {@JvmStaticpublic actual val Default: CoroutineDispatcher = createDefaultDispatcher()@JvmStaticpublic val IO: CoroutineDispatcher = DefaultScheduler.IO@JvmStaticpublic actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined@JvmStaticpublic actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher}

Default,IO

Default,IO 其实内部用的是一个线程池,下面逐个解析,看实现原理

default
  • Default 会根据 useCoroutinesScheduler 属性(默认为 true)去获取对应的线程池

  • DefaultScheduler(useCoroutinesScheduler=ture):kotlin 自己实现的线程池逻辑

  • CommonPool(useCoroutinesScheduler=false):java 类库中的 Executor 实现线程池逻辑


internal actual fun createDefaultDispatcher(): CoroutineDispatcher =if (useCoroutinesScheduler) DefaultScheduler else CommonPoolinternal object DefaultScheduler : ExperimentalCoroutineDispatcher() {.....}//委托类 public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {}//java 类库中的 Executor 实现线程池逻辑 internal object CommonPool : ExecutorCoroutineDispatcher() {}//共同父类,定义行为 public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {}


ExperimentalCoroutineDispatcher


  • DefaultScheduler 的主要实现都在它的父类 ExperimentalCoroutineDispatcher 中


public open class ExperimentalCoroutineDispatcher(private val corePoolSize: Int,private val maxPoolSize: Int,private val idleWorkerKeepAliveNs: Long,private val schedulerName: String = "CoroutineScheduler") : ExecutorCoroutineDispatcher() {public constructor(corePoolSize: Int = CORE_POOL_SIZE,maxPoolSize: Int = MAX_POOL_SIZE,schedulerName: String = DEFAULT_SCHEDULER_NAME) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)


...//省略构造


//创建 CoroutineScheduler 实例 private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)


override val executor: Executorget() = coroutineScheduler


override fun dispatch(context: CoroutineContext, block: Runnable): Unit =try {//dispatch 方法委托到 CoroutineScheduler 的 dispatch 方法 coroutineScheduler.dispatch(block)} catch (e: RejectedExecutionException) {....}


override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =try {//dispatchYield 方法委托到 CoroutineScheduler 的 dispatchYield 方法 coroutineScheduler.dispatch(block, tailDispatch = true)} catch (e: RejectedExecutionException) {...}


internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {try {//dispatchWithContext 方法委托到 CoroutineScheduler 的 dispatchWithContext 方法 coroutineScheduler.dispatch(block, context, tailDispatch)} catch (e: RejectedExecutionException) {....}}override fun close(): Unit = coroutineScheduler.close()//实现请求阻塞 public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)}//实现请求数量限制public fun limited(parallelism: Int): CoroutineDispatcher {require(parallelism > 0) { "Expected positive parallelism level, but have parallelism" }require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size (parallelism" }return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)}


....//省略一些供测试的方法,更好的跟踪同步状态}

IO
  • IO 的实现其实是 LimitingDispatcher


val IO: CoroutineDispatcher = LimitingDispatcher(this,systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),"Dispatchers.IO",TASK_PROBABLY_BLOCKING)


LimitingDispatcher


  • IO 的实现类会有一些最大请求限制,以及队列处理


private class LimitingDispatcher(private val dispatcher: ExperimentalCoroutineDispatcher,private val parallelism: Int,private val name: String?,override val taskMode: Int) : ExecutorCoroutineDispatcher(), TaskContext, Executor {//同步阻塞队列 private val queue = ConcurrentLinkedQueue<Runnable>()//cas 计数 private val inFlightTasks = atomic(0)


override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)


private fun dispatch(block: Runnable, tailDispatch: Boolean) {var taskToSchedule = blockwhile (true) {


if (inFlight <= parallelism) {//LimitingDispatcher 的 dispatch 方法委托给了 DefaultScheduler 的 dispatchWithContext 方法 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)return}..//省略了一些队列处理逻辑}}}

CoroutineScheduler

  • Default、IO 其实都是共享 CoroutineScheduler 线程池,Kotlin 实现了一套线程池两种调度策略

  • 通过内部的 mode 区分


fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {......if (task.mode == TASK_NON_BLOCKING) {if (skipUnpark) returnsignalCpuWork()} else {signalBlockingWork(skipUnpark = skipUnpark)}}

Mode
处理策略


1.公式:max(corePoolSize, min(CPU 核心数 * 128, 2^21 - 2)),即大于 corePoolSize,小于 2^21 - 2


2.2^21 - 2 是一个很大的数约为 2M,但是 CoroutineScheduler 是不可能创建这么多线程的,所以就需要外部限制提交的任务数


3.Dispatchers.IO 构造时就通过 LimitingDispatcher 默认限制了最大线程并发数 parallelism 为 max(64, CPU 核心数),即最多只能提交 parallelism 个任务到 CoroutineScheduler 中执行,剩余的任务被放进队列中等待。 |

适合场景


2.复杂计算、视频解码等,如果此时线程数太多,超过了 CPU 核心数,那么这些超出来的线程是得不到 CPU 的执行的,只会浪费内存资源


3.因为线程本身也有栈等空间,同时线程过多,频繁的线程切换带来的消耗也会影响线程池的性能


4.对于 CPU 密集型任务,线程池并发线程数等于 CPU 核心数才能让 CPU 的执行效率最大化 || IO | 1.IO 密集型任务的特点是执行任务时 CPU 会处于闲置状态,任务不会消耗大量的 CPU 资源


2.网络请求、IO 操作等,线程执行 IO 密集型任务时大多数处于阻塞状态,处于阻塞状态的线程是不占用 CPU 的执行时间


3.此时 CPU 就处于闲置状态,为了让 CPU 忙起来,执行 IO 密集型任务时理应让线程的创建数量更多一点,理想情况下线程数应该等于提交的任务数,对于这些多创建出来的线程,当它们闲置时,线程池一般会有一个超时回收策略,所以大部分情况下并不会占用大量的内存资源


4.但也会有极端情况,所以对于 IO 密集型任务,线程池并发线程数应尽可能地多才能提高 CPU 的吞吐量,这个尽可能地多的程度并不是无限大,而是根据业务情况设定,但肯定要大于 CPU 核心数。 |

Unconfined

  • 任务执行在默认的启动线程。之后由调用resume的线程决定恢复协程的线程。


internal object Unconfined : CoroutineDispatcher() {//为 false 为不需要 dispatchoverride fun isDispatchNeeded(context: CoroutineContext): Boolean = false


override fun dispatch(context: CoroutineContext, block: Runnable) {// 只有当调用 yield 方法时,Unconfined 的 dispatch 方法才会被调用// yield() 表示当前协程让出自己所在的线程给其他协程运行 val yieldContext = context[YieldContext]if (yieldContext != null) {yieldContext.dispatcherWasUnconfined = truereturn}throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +"isDispatchNeeded and dispatch calls.")}}


  • 每一个协程都有对应的 Continuation 实例,其中的 resumeWith 用于协程的恢复,存在于 DispatchedContinuation

DispatchedContinuation
  • 我们重点看 resumeWith 的实现以及类委托


internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {.....override fun


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


resumeWith(result: Result<T>) {val context = continuation.context

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
Kotlin协程,flutterplugin打包aar