响应式编程的复杂度和简化
作者:蒋文豪
响应式系统不是今天的主题,我们要讨论更具体的话题,即响应式代码的编写会有哪些复杂度,应该如何简化。
一、什么是响应式编程
什么是响应式编程,它是一种编程范式?还是一种设计模式?抑或是其他?响应式系统和响应式编程有什么关系?又比如,响应式编程它适用于什么场景?解决什么问题?
微软于 2011 年率先建设了.Net 上的 Rx 库,以简化容易出错的异步和事件驱动编程,迈出了响应式编程的第一步,随后业界为许多编程语言提供了对应的实现。
.Net 上的 Rx 库地址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)
什么是响应式,我们从一个例子开始。
在上面的表格中,建立了单元格之间的关系:A1 = B1 + C1
,建立关系之后 A1 将响应任何对于 B1 和 C1 的变化,毫无疑问,这就是一种响应式行为。
我觉得这个例子很棒的地方在于,它显然很简单,同时又足够深刻,首先,它充分的体现了响应式的概念,其次,变化发生时,肯定触发了某些过程的执行,说明背后存在关系的建立和沿着关系传播的变化,再次,稍微深入一点看,B1 和 C1 的变化可以是一系列的变化,可以很自然的引申到流的概念,最后,它有一个很高级的抽象,对使用方来说,整个过程是声明式的。
当然,举例子来说明一个概念的时候,本质上是用一个外延在解释一个概念的内涵,往往是会将内涵缩小的,所以我们可以尝试推广这个外延。列出这个例子的特征:
描述了一个单元格里的整数等于另外两个单元格里整数相加,当后者每次发生变化时,变化都会传播到第一个单元格,并进行求值。
关键词为整数、等于、相加、变化、每次、传播、求值。前三个关键词仅仅和例子相关,可以直接去掉。变化可以推广为数据,每次可以在逻辑上等价于流的概念,流可以有 0 个、1 个或多个数据,传播可以推广为通信(在这个意义上,函数调用、RPC、socket、MQ 都是通信),求值推广为执行一个过程。所以我们可以得出响应式编程的定义:
通过声明式的通信定义,将数据流与过程组合起来,从而实现数据驱动过程的一种复合编程范式。
时至今日,业界对于响应式的定义仍然是不统一的,因此这是我自己的理解。响应式的基础概念是数据流,理念是过程的执行是通过响应数据来驱动的,核心是构造数据和过程的响应关系,并且能够让数据沿着关系传播驱动过程,因此响应式编程本质上是一种对通信的抽象,说它是一种编程范式,是因为它提供一种对于数据与过程组合方式的看法,说它是复合范式而不是基本范式,是因为它不像 OOP 或者 FP 一样提供的是对于数据和过程的看法,而是以两者为基础,所以可以有对象响应式和函数响应式。
当我们基于响应式构建系统时,就是响应式系统,响应式系统的构建原则可以参考此处(地址:https://www.reactiveprinciples.org/patterns/communicate-facts.html),总的来说,系统会分割成一个一个的分区,分区内部对状态进行本地化,分区之间通过通信进行异步解耦,可以通过控制这个通信的过程,实现系统的弹性扩缩容和部分组件失败的回弹性。
响应式系统不是今天的主题,我们要讨论更具体的话题,即响应式代码的编写会有哪些复杂度,应该如何简化。
二、响应式编程的复杂度
响应式编程的复杂度来自于 4 个方面:
可以有 0 次、1 次或多次数据产生,也就是数据流;
除了数据之外,还有能够标识错误和完成(正常结束);
数据流和数据流、数据流和过程的组合复杂度很高;
在上面的基础上,需要处理整个过程中线程切换、并发同步、数据缓冲等问题。
为了支持数据流的概念,可以产生 0 次、1 次或多次数据产生,API 设计需要把数据回调和结果回调分开,通常也会把错误回调和完成回调分开,这种接口被称为流式接口,一个标准的流式接口设计如下所示:
显然,流式接口是普通异步接口将一次结果向多次结果的推广,这种推广同时也增加了逻辑的复杂度。
我们可以通过一个逻辑上简单的例子来看一下流式接口的使用过程,为了关注于核心的复杂度,只会体现前 3 个方面,一方面是由于加入第 4 点的话会导致代码过于冗长混淆关注点,另一方面相信各位对第 4 点本身的复杂度和它引起的众多问题已经非常熟悉了。
这个例子很简单,只有三步:
1)假设需要为一个店铺提供一个订单展示页面,这些订单来自两个不同的平台“鹅鹅鹅”和“鸭鸭鸭”,他们各自提供了查询的接口(listOrders,为了简单假设他们提供的模型和接口完全一致);
2)订单列表需要展示用户的昵称等信息,需要通过对应平台的另外一个接口(queryUserInfo)查询;
3)由于 SDK 缓存、持久化、网络请求策略,数据无法一次性获取,这两个接口可能存在多次数据回调。
进一步简化问题,我们忽略变更处理、UI 渲染和用户交互处理,仅仅考虑数据加载,这需要组合 2 个阶段的 4 次接口调用,先分别请求两个平台的订单,使用订单请求对应平台的 userInfo,最后合并成完整数据:
在这个接口的实现中,数据回调最简单,在没有结束的情况下,多次回调的数据可以直接回调,问题是如何保证错误和完成有且仅有一次回调,且结果回调后不再回调数据,即:
什么时候回调错误?
什么时候回调完成?
如果我们认为一个接口出错,就回调错误,这是最简单的错误处理,只需要检查和设置结束状态,在没有结束时的第一个错误进行回调即可,注意,我们需要在 userInfo 的请求中也做类似的处理,并保证错误回调后不再执行任何回调。
完成的回调要比错误复杂的多,我们可以来思考一下:
首先,我们不能在 listOrders 的 onComplete 里面取回调完成,因为这里不能代表 queryUserInfo 这个接口也完成了;
其次,我们也不能简单的通过所有 queryUserInfo 都完成了就回调完成,因为 listOrders 在完成前仍然有可能返回新的订单数据。
也就是说,这里的完成需要在 queryUserInfo 进行判断,并且也需要考虑外层请求的完成情况,比普通异步接口的级联要多了两个维度。这仅仅是 2 种接口 4 次请求,在真实的编程中,接口数量会多得多,并且需要把第 4 点加进来,线程/队列、并发、同步、缓冲区,还要处理新数据推送响应,再考虑调试、监控、排查,复杂度显然会继续大幅增长,保证这个过程的正确性是一件痛苦的事情。
三、响应式编程的复杂度使用 Rx/Combine 简化响应式编程
为了解决这些问题,业界搞出了 Reactive Streams 规范(地址:https://www.reactive-streams.org/),也出现了若干的实现,都以工具库的形式提供,包括 Rx 系列、Reactor,以及苹果功能类似的 Combine。作为一个 iOS 开发,我对 RxSwift 和 Combine 比较了解,两者主要的区别在于 Combine 多了一个 Subscription 的抽象来协调 Publisher 和 Subscriber 之间的行为,尤其是 Back Pressure 相关的控制,但总的来说,都提供了对于异步数据流的抽象和组合能力,用法上也很类似,这里以 RxSwift 为例来重写上面的过程。
第一步,实现一个将流式函数转换成 Observable 的工具类,这个是通用的,非常直观:
第二步,针对这个例子,将 listOrder 和 queryUserInfo 转换成 StreamFunc 形式,listOrder 本来就是 StreamFunc,对 queryUserInfo 进行偏应用也可以转换为 StreamFunc 形式,这是具体接口相关的:
第三步,这样就可以将 load 方法简化为:
可以看到,第一步是通用的,实际代码中只需要做第二步和第三步,这就对上面的接口进行了大量的简化,并且库以统一的方式处理掉了合并、级联、多数据返回的复杂逻辑,我们有相当的把握来保证正确性。当然,除了学习成本较高以外,也还是有缺点的,主要是使用方式仍然是异步形式,在部分环节仍然需要处理异步带来的复杂度:
Rx 确实大大简化了异步编程,但是还不够,因为它的使用仍然是异步形式。
四、使用 AsyncSequence 简化响应式编程
4.1 迭代器与序列
迭代器是很多语言都有的一个概念,一个迭代器的核心是 next()函数,每次调用都会返回下一个数据,这些数据构成了一个序列(Sequence),迭代器也意味着序列可以被遍历。
4.2 异步序列
如果让迭代器的 next()方法支持异步,就产生了异步序列。Swift 对此提供了一个 AsyncSequence 的协议,并对它提供了语言级别的支持,使得开发者可以以同步的形式遍历一个异步序列:
实际上,Swift 在 Combine 中支持了 Publisher 的同步遍历:
4.3 CPS 变换
如果能将流式接口转换为异步序列,那么就可以实现响应式代码的同步编写,这个转换过程可以通过 CPS 变换实现。
CPS 变换全称 Continuation-Pass-Style,这个概念来自 Lisp 语系,是一种显式传递控制流的编程风格,其传递控制流的载体就是 continuation。continuation 可以理解为当前代码执行的后续,如果一个函数 f 有一个 continuation 参数,我们就可以把当前的 continuation 传递进去,当函数产生结果时,通过 continuation 回到函数 f 外,继续执行,这种函数调用方式成为 call/cc(call with current continuation)。
这种变换,称为 CPS 变换。
作为一个类比,我觉得可以将 continuation 理解为 return 的在两个方面的推广形式,首先,continuation 是 first-class 的,可以作为变量存储,可以作为函数的参数和返回值,其次,continuation 可以多次使用,而 return 只能有一次。
4.4 响应式编程的同步形式
回头看最原始的代码,当我们调用 orderService.listOrders 时,传进去的 callback,其实就相当于一个弱化版的 continuation。这意味着,如果我们可以将使用 continuation 将数据表示为 AsyncSequence,那么就可以将响应式代码写成同步形式,从而大幅简化响应式编程。
Swift 提供了 continuation 的概念,提供了 AsyncStream 和 AsyncThrowingStream 来实现这个过程,对上节 Rx 的实现稍作改动即可。
第一步,实现一个将流式函数转换成 AsyncThrowingStream 的工具类,这个是通用的:
第二步,由于 AsyncSequence 还不支持 merge,需要自己实现一个 merge 工具方法来实现多个流的组合,这个也是通用的:
第三步,将 listOrder 和 queryUserInfo 转换成 StreamFunc 形式,与 Rx 中的第二步实现完全相同;
第四步,这样就可以将 load 方法简化为:
可以发现,代码与 RxSwift 几乎是完全相同的,所以我们仍然有对于代码正确性的信心,不同的是,现在使用方也得以获得同样的信心:
五、总结
同步是编程中的田园世界,而流式接口作为异步接口最复杂的形态,我们通过 CPS 变换的控制流技术,将流式接口表示为 AsyncSequence,实现了对异步序列遍历的同步形式,从而将响应式编程在形式上统一回了田园世界。
上面的第一步和第二步实现了 AsyncSequence 和 StreamFunc 的相互转换,所以实际上我们证明了它们是同构的,更进一步的,我们可以证明它们与 Rx、Combine 也是同构的。换言之,它们是同一个概念的不同形式,理论上它们的表达能力是等价的,这个概念就是数据流,这个概念在 Rx 中叫做 Observable,在 Combine 中叫做 Publisher。
在实际实现上,Rx 和 Combine 提供了大量的操作符,因此目前它们的能力远远强于 AsyncSequence 和 StreamFunc,比如 AsyncSequence 居然不支持 merge。
AsyncSequence 的优势是可以支持同步写法,在我看来这个优势是很大的。看到社区有过 AsyncSequence 替换 Combine 的相关的讨论,我认为逻辑上是讲得通的。
AsyncSequence 替换 Combine 的相关讨论地址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370
版权声明: 本文为 InfoQ 作者【阿里技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/d821b9fbb1bfdb04d9e067e8b】。文章转载请联系作者。
评论