写点什么

Kotlin 学习手记——协程进阶,嵌入式 android 开发教程

用户头像
Android架构
关注
发布于: 7 小时前

val producer = GlobalScope.launch {


for (i in 0..3) {


log("sending", i)


channel.send(i)


log("sent", i)


}


channel.close()


}


//消费者 收


val consumer = GlobalScope.launch {


while (!channel.isClosedForReceive) {


log("receiving")


val value = channel.receiveOrNull()


log("received", value)


}


}


producer.join()


consumer.join()


}


Channel(Channel.RENDEZVOUS ) 的方式是发一个收一个,边发边收,如果没有接受的,发送者会挂起等待,输出如下:



Channel(Channel.UNLIMITED ) 的方式是全部发送完毕,才会接收到,先发后收,发送者发送完就返回了,不管有没有接受者,输出如下:



Channel(Channel.CONFLATED ) 的方式是不管发了多少个,只能收到最后一个,也是发送完就返回了,不管有没有接受者,输出如下:



Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定 buffer 大小,输出如下:



Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据,




channel 接受数据的时候可以直接当成迭代器使用:


suspend fun iterateChannel() {


val channel = Channel<Int>(Channel.UNLIMITED)


val producer = GlobalScope.launch {


for (i in 0..3) {


log("sending", i)


channel.send(i)


log("sent", i)


}


channel.close()


}


val consumer = GlobalScope.launch {


for (i in channel) {


log("received: ", i)


}


}


producer.join()


consumer.join()


}



suspend fun producer() {


val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) {


for (i in 0..3) {


log("sending", i)


send(i)


log("sent", i)


}


}


val consumer = GlobalScope.launch {


for (i in receiveChannel) {


log("received: ", i)


}


}


consumer.join()


}


suspend fun consumer() {


val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) {


for (i in this) {


log("received: ", i)


}


}


val producer = GlobalScope.launch {


for (i in 0..3) {


log("sending", i)


sendChannel.send(i)


log("sent", i)


}


}


producer.join()


}




suspend fun broadcast() {


//下面几种都可以创建一个 BroadcastChannel


//val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)


//val broadcastChannel = Channel<Int>(Channel.BUFFERED).broadcast()


val broadcastChannel = GlobalScope.broadcast {


for (i in 0..5) {


send(i)


}


}


//启动 5 个接受者,每个都能收到


List(5) { index ->


GlobalScope.launch {


val receiveChannel = broadcastChannel.openSubscription()


for (i in receiveChannel) {


log("[#i")


}


}


}.joinAll()


}


输出:


Task :ChannelsKt.main()


21:07:12:924 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 0


21:07:12:924 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: 0


21:07:12:924 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 0


21:07:12:925 [DefaultDispatcher-worker-4 @coroutine#5] [#3] received: 0


21:07:12:925 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 0


21:07:12:944 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 1


21:07:12:943 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 1


21:07:12:943 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: 1


21:07:12:944 [DefaultDispatcher-worker-4 @coroutine#5] [#3] received: 1


21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 1


21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 2


21:07:12:945 [DefaultDispatcher-worker-8 @coroutine#3] [#1] received: 2


21:07:12:945 [DefaultDispatcher-worker-8 @coroutine#3] [#1] received: 3


21:07:12:945 [DefaultDispatcher-worker-7 @coroutine#4] [#2] received: 3


21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 2


21:07:12:946 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 3


21:07:12:946 [DefaultDispatcher-worker-8 @coroutine#5] [#3] received: 2


21:07:12:946 [DefaultDispatcher-worker-8 @coroutine#5] [#3] received: 3


21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 2


21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 3


21:07:12:946 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 4


21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 4


21:07:12:946 [DefaultDispatcher-worker-1 @coroutine#6] [#4] received: 4


21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#5] [#3] received: 4


21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#5] [#3] received: 5


21:07:12:947 [DefaultDispatcher-worker-2 @coroutine#3] [#1] received: 5


21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#2] [#0] received: 5


21:07:12:947 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 5


21:07:12:947 [DefaultDispatcher-worker-3 @coroutine#4] [#2] received: 4


21:07:12:947 [DefaultDispatcher-worker-3 @coroutine#4] [#2] received: 5




Select 的使用场景是多个协程异步执行时,获取最先结束的那个协程结果返回,比如加载图片时,可能从网络获取,也可能从本地获取,这两种可能同时异步执行,使用 Select 就会优先获取返回比较快的本地结果展示,然后我们再去获取网络最新的更新即可。



使用例子:


val localDir = File("localCache").also { it.mkdirs() }


val gson = Gson()


fun CoroutineScope.getUserFromApi(login: String) = async(Dispatchers.IO){


gitHubServiceApi.getUserSuspend(login)


}


fun CoroutineScope.getUserFromLocal(login:String) = async(Dispatchers.IO){


File(localDir, login).takeIf { it.exists() }?.readText()?.let { gson.fromJson(it, User::class.java) }


}


fun cacheUser(login: String, user: User){


File(localDir, login).writeText(gson.toJson(user))


}


data class Response<T>(val value: T, val isLocal: Boolean)


suspend fun main() {


val login = "test"


GlobalScope.launch {


val localDeferred = getUserFromLocal(login)


val remoteDeferred = getUserFromApi(login)


//val userResponse = Response(localDeferred.await(), true)


//select 选择优先返回的结果


val userResponse = select<Response<User?>> {


localDeferred.onAwait { Response(it, true) }


remoteDeferred.onAwait { Response(it, false) }


}


userResponse.value?.let { log(it) } //获取结果显示 输出


//如果是本地的结果,重新请求,并缓存本地


userResponse.isLocal.takeIf { it }?.let {


val userFromApi = remoteDeferred.await()


cacheUser(login, userFromApi)


log(userFromApi)


}


}.join()


}


如果有多个异步请求同时返回,select 会按顺序取第一个,想要随机的取可以使用 selectUnbiased


![在这里插入图片描述](https://img-blog.csdnimg.cn/2021012921120296.jpg?x-oss-proc


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


ess=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5YWJjMTIzNDU2,size_16,color_FFFFFF,t_70#pic_center)


select 大括号中 onAwait 的写法等价于 await() 的写法,localDeferred.await(),还有很多操作join send等都是一样的前面加on



例子:使用 channel 和 select 实现统计代码行数


val KotlinFileFilter = { file: File -> file.isDirectory || file.name.endsWith(".kt") }


data class FileLines(val file: File, val lines: Int) {


override fun toString(): String {


return "lines"


}


}


suspend fun main() {


val result = lineCounter(File("."))


log(result)


}


suspend fun lineCounter(root: File): HashMap<File, Int> {


return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1).asCoroutineDispatcher()


//使用 use 自动关闭资源


.use {


//withContext 是一个挂起函数 返回值是最后一行表达式的值


withContext(it){


val fileChannel = walkFile(root)


//定义 5 个同时读取


val fileLinesChannels = List(5){


fileLineCounter(fileChannel)


}


resultAggregator(fileLinesChannels)


}


}


}


//创建生产者返回 ReceiveChannel


fun CoroutineScope.walkFile(root: File): ReceiveChannel<File> {


return produce(capacity = Channel.BUFFERED) {


fileWalker(root)


}


}


//递归过滤 kotlin 文件并发送文件


suspend fun SendChannel<File>.fileWalker(file: File) {


if(file.isDirectory){


file.listFiles()?.filter(KotlinFileFilter)?.forEach { fileWalker(it) }


} else {


send(file)


}


}


//输入 File 返回 FileLines 对象


fun CoroutineScope.fileLineCounter(input: ReceiveChannel<File>): ReceiveChannel<FileLines> {


return produce(capacity = Channel.BUFFERED) {


for (file in input){


//统计行数


file.useLines {


send(FileLines(file, it.count())) //发送结果


}


}


}


}


suspend fun CoroutineScope.resultAggregator(channels: List<ReceiveChannel<FileLines>>): HashMap<File, Int> {


val map = HashMap<File, Int>()


channels.aggregate {


filteredChannels ->


//使用 select 返回最快统计的那一个


select<FileLines?> {


filteredChannels.forEach {


it.onReceiveOrNull {


log("received: $it")


it


}


}


} ?.let {


map[it.file] = it.lines


}


}


return map


}


//tailrec-递归优化 定义 List<ReceiveChannel<FileLines>>的扩展函数,过滤掉已完成的


tailrec suspend fun List<ReceiveChannel<FileLines>>.aggregate(block: suspend (List<ReceiveChannel<FileLines>>) -> Unit) {


block(this)//消费一次


//从当前 list 中过掉 isClosedForReceive=true 的 ReceiveChannel


filter { !it.isClosedForReceive }.takeIf { it.isNotEmpty() }?.aggregate(block)//递归


}



Sequence 中不能调用其他挂起函数,不能设置调度器,只能单线程中使用。而 Flow 可以支持:




Flow 中调用 delay 会把后面的代码切换到默认调度器上执行,也可以显示的指定调度器:




suspend fun flows(){


val intFlow = flow {


emit(1)


delay(100)


emit(2)


emit(3)


}


val dispatcher = Executors.newSingleThreadExecutor { Thread(it, "MyThread").also { it.isDaemon = true } }.asCoroutineDispatcher()


GlobalScope.launch(dispatcher) {


intFlow.flowOn(Dispatchers.IO)


.collect { log(it) }


}.join()


}



对比 RxJava 的线程切换方式很像,flowOn 传递的调度器指定 flow 里面的代码执行在哪个线程上,而 launch 传递的调度器指定 flow 执行完后 resume 恢复执行在哪个线程上。




flow-catch-onCompletion 和 java 的 try-catch-finally 基本类似,onCompletion 中的代码是一定会执行的,不同的是有异常发生的时候,会携带一个异常参数。


suspend fun exception(){


flow<Int> {


emit(1)


throw ArithmeticException("Div 0")


}.catch {t: Throwable ->


log("caught error: $t")


}.onCompletion { t: Throwable? ->


log("finally.")


}.flowOn(Dispatchers.IO)


.collect { log(it) }


// flow { // bad!!!


// try {


// emit(1)


// throw ArithmeticException("Div 0")


// } catch (t: Throwable){


// log("caught error: $t")


// } finally {


// log("finally.")


// }


// }


}


flow 的异常捕获使用 flow 自己的 api 处理就行,不需要内部再进行 try-catch.



Flow 没有提供单独的取消方法,要取消 Flow 只需要取消 flow.collect { } 所在的协程即可。





Flow 内部不能再去切换线程,如果需要这样做可以使用 channelFlow



由于是流的概念 flow 也有背压的问题,也就是接受端来不及消费,发送端会累积大量的数据,感觉 kotlin 抄 RxJava 也抄了不少啊啊啊。。。背压解决办法,要么采用只保留最新 conflate,要么取消之前发送的值 collectLatest



suspend fun backPressure(){


flow {


emit(1)


delay(50)


emit(2)


}.collectLatest { value ->


println("Collecting $value")


delay(100) // Emulate work


println("$value collected")


}


}


上面的例子 collectLatest 当中 100 毫秒之后只能接受到 2,因为延时 100 的过程中发送 2 的时候会把 1 取消掉。

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
Kotlin学习手记——协程进阶,嵌入式android开发教程