[译] 管中窥豹:RxJava 与 Kotlin 协程的对比,Android 社招面试题
(System.currentTimeMillis() - t0)) // (9)
}
添加的一些输出是用来观察这段代码如何运行的。
通常线性编程的情况下,是没有直接重试某个操作的快捷方法的,因此,我们需要建立一个循环以及重试计数器 i。
通过 async(CommonPool) 来执行异步操作,该函数可以在一些后台线程立即启动并执行函数。该函数会返回一个 Deferred,稍后会用到这个值。 如果用 await() 来得到 v1 作为最终值的话,当前线程将会挂起,另外,对 v2 的计算也不会开始,除非前一个恢复执行。除此以外,我们还需要在超时的情况下取消当前操作的方法。参考步骤 3 和 5。
如果想让两个操作都超时的话,看起来我们只能在另一个异步线程中执行等待操作。launch(CommonPool) 方法会返回一个可以用在这种情况下的 Job 对象。 与 async 的区别是,这样执行无法返回值。之所以保存返回的 Job 是因为先前的异步操作可能及时返回,就不再需要取消操作了。
在超时的任务中,我们用 TimeoutException 来取消 v1 和 v2 ,这将恢复任何已经挂起来等待二者返回的操作。
等待两个函数运行结果。如果超时,await 将重新扔出在第四步中使用的异常。
如果没有异常,则取消不再需要执行的超时任务,并跳出循环。
如果有超时,则走老一套捕获异常并执行状态检查来确定下一步操作。注意任何其他异常都会直接被抛出并退出循环。
万一是第三次或更多次的尝试,直接扔出异常,什么都不做。
如果一切按剧本走,打印运行的总时间,然后退出当前函数。
看起来挺简单的,尽管取消机制可能搞个大新闻:如果 v2 因为其他异常(比如网络原因导致的 IOException)崩溃了呢?当然我们得处理这些情况来确保任务可以在各种情况下被取消(举个栗子,试试 Kotlin 中的资源?)。然而,这种情况发生的背景是 v1 会及时返回,直到尝试 await 之前都无法取消 v1 或检测 v2 的崩溃。
不要在意那些细节,反正程序跑起来了,运行结果如下:
Attempt 1 at T=0Cancelling at T=531Crash at T=2017Attempt 2 at T=2017Cancelling at T=2517Crash at T=4026Attempt 3 at T=40263End a
一共进行了 3 次尝试,最后一次成功了,值是 3。是不是和剧本一模一样的?一点都不快(此处有双关(译者并没有看出来哪里有双关))! 我们可以看到取消事件发生的大概时间,两次不成功的请求之后大约 500 ms ,然而异常捕获发生在大约 2000 ms 之后!我们知道 cancel() 被成功调用是因为我们捕获了异常。然而,看起来函数中的 Thread.sleep() 并没有被打断,或者用协程的说法,没有在打断异常时恢复。这可能是 CommonPool 的一部分,对 Future.cancel(false) 的调用处于基础结构中,抑或只是简单的程序限制。
响应式
接下来我们看看 RxJava 2 是如何实现相同操作的。让人失望的是,如果函数前加了 suspended,就无法通过普通方式调用了,所以我们还得用普通方法重写一下两个函数:
fun f3(i: Int) : Int {Thread.sleep(if (i != 2) 2000L else 200L)return 1}
fun f4(i: Int) : Int {Thread.sleep(if (i != 2) 2000L else 200L)return 2}
为了匹配阻塞外部环境的功能,我们采用 ?[RxJava 2 Extensions](
) 中的 BlockingScheduler 来提供返回到主线程的功能。顾名思义,它阻塞了一开始的调用者/主线程,直到有任务通过调度器来提交并运行。
fun reactiveWay() {RxJavaPlugins.setErrorHandler({ }) // (1)
val sched = BlockingScheduler() // (2)sched.execute {val t0 = System.curre
ntTimeMillis()val count = Array<Int>(1, { 0 }) // (3)
Single.defer({ // (4)val c = count[0]++;println("Attempt " + (c + 1) +" at T=" + (System.currentTimeMillis() - t0))
Single.zip( // (5)Single.fromCallable({ f3(c) }).subscribeOn(Schedulers.io()),Single.fromCallable({ f4(c) }).subscribeOn(Schedulers.io()),BiFunction<Int, Int> { a, b -> a + b } // (6))}).doOnDispose({ // (7)println(" Cancelling at T=" +(System.currentTimeMillis() - t0))}).timeout(500, TimeUnit.MILLISECONDS) // (8).retry({ x, e ->println(" Crash at " +(System.currentTimeMillis() - t0))x < 3 && e is TimeoutException // (9)}).doAfterTerminate { sched.shutdown() } // (10).subscribe({println(it)println("End at T=" +(System.currentTimeMillis() - t0)) // (11)},{ it.printStackTrace() })}}
实现起来有点长,对那些不熟悉 lambda 的人来说看起来可能有点可怕。
众所周知 RxJava 2 无论如何都会传递异常。在 Android 上,无法传递的异常会使应用崩溃,除非使用 RxJavaPlugins.setErrorHandler 来捕获。在此,因为我们知道取消事件会打断 Thread.sleep() ,调用栈打出来的结果只会是一团乱麻,我们也不会去注意这么多的异常。
设置 BlockingScheduler 并分发第一个执行的任务,以及剩下的主线程执行逻辑。 这是由于一旦锁住, start() 将会给主线程增加一个活锁状态,直到有任何随后事件打破锁定,主线程才会继续执行。
设置一个堆变量来记录重试次数。
一旦有通过 Single.defer 的订阅,计数器加一并打印 “Attempt” 字符串。该操作符允许保留每个订阅的状态,这正是我们在下游执行的 retry() 操作符所期望的。
使用 zip 操作符来异步执行两个元素的计算,二者都在后台线程执行自己的函数。
当二者都完成时,将结果相加。
为了让超时取消,使用 doOnDispose 操作符来打印当前状态和时间。
使用?timeout 操作符定义求和的超时。如果超时则会发送 TimeoutException(例如该场景下没有反馈时)。
retry 操作符的重载提供了重试时间以及当前错误。打印错误后,应该返回 true ——也就是说必须执行重试——如果重试次数小于三并且当前错误是 TimeoutException 的话。任何其他错误只会终止而不是触发重试。
一旦完成,我们需要关闭调度器,来让释放主线程并退出 JVM。
当然,在完成前我们需要打印求和结果以及整个操作的耗时。
可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:
Cancelling at T=4527
Attempt 1 at T=72Cancelling at T=587Crash at 587Attempt 2 at T=587Cancelling at T=1089Crash at 1090Attempt 3 at T=1090Cancelling at T=12913End at T=1292
有趣的是,如果在 main 函数中同时调用两个函数的话,Cancelling at T=4527 是在调用 coroutineWay() 方法时打印出来的:尽管最后根本没有时间消耗,取消事件自身就浪费在无法停止的计算问题上,也因此在取消已经完成的任务上增加了额外消耗。
另一方面,RxJava 至少及时地取消和重试了函数。然而,实际上也有几乎没必要的 Cancelling at T=1291 被打印出来了。呐,没办法,写出来就这样了,或者说我懒吧,在 Single.timeout 中是这样实现的:如果没有延时就完成了的话,无论操作符真实情况如何,内部的 CompositeDisposable 代理了上游的 Disposable 并将其和操作符一起取消了。
结论
最后呢,我们通过一个小小的改进来看一下响应式设计的强大之处:如果只需要重试没有响应的函数的话,为什么我们要重试整个过程呢?改进方法也可以很容易地在 RxJava 中找到:将 doOnDispose().timeout().retry()?放到每一个函数调用链中(也许用 transfomer 可以避免代码的重复):
val timeoutRetry = SingleTransformer<Int, Int> {it.doOnDispose({println(" Cancelling at T=" +(System.currentTimeMillis() - t0))}).timeout(500, TimeUnit.MILLISECONDS).retry({ x, e ->println(" Crash at " +(System.currentTimeMillis() - t0))x < 3 && e is TimeoutException})}
// ...
Single.zip(Single.fromCallable({ f3(c) }).subscribeOn(Schedulers.io()).compose(timeoutRetry),Single.fromCallable({ f4(c) })
评论