写点什么

深入浅出函数式编程:Stream 流水线的实现原理

用户头像
码农架构
关注
发布于: 2021 年 02 月 14 日

前面我们已经学会如何使用 Stream API

用起来真的很爽,但简洁的方法下面似乎隐藏着无尽的秘密,如此强大的 API 是如何实现的呢?比如 Pipeline 是怎么执行的,每次方法调用都会导致一次迭代吗?自动并行又是怎么做到的,线程个数是多少?本节我们学习 Stream 流水线的原理,这是 Stream 实现的关键所在。


首先回顾一下容器执行 Lambda 表达式的方式,以ArrayList.forEach()方法为例,具体代码如下:


我们看到ArrayList.forEach()方法的主要逻辑就是一个 for 循环,在该 for 循环里不断调用action.accept()回调方法完成对元素的遍历。这完全没有什么新奇之处,回调方法在 Java GUI 的监听器中广泛使用。Lambda 表达式的作用就是相当于一个回调方法,这很好理解。


Stream API 中大量使用 Lambda 表达式作为回调方法,但这并不是关键。理解 Stream 我们更关心的是另外两个问题:流水线和自动并行。使用 Stream 或许很容易写入如下形式的代码:


上述代码求出以字母 A 开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。类库的实现着使用流水线(Pipeline)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。为讲解方便我们汇总了 Stream 的所有操作。

image.png

Stream 上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。


image.png


输出为:A1B1C1 A2B2C2 A3B3C3 中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的 forEach 操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,...,直到最初的第一步。第一次 forEach 执行的时候,会回溯 peek 操作,然后 peek 会回溯更上一步的 limit 操作,然后 limit 会回溯更上一步的 peek 操作,顶层没有操作了,开始自上向下开始执行,输出:A1B1C1 第二次 forEach 执行的时候,然后会回溯 peek 操作,然后 peek 会回溯更上一步的 limit 操作,然后 limit 会回溯更上一步的 peek 操作,顶层没有操作了,开始自上向下开始执行,输出:A2B2C2


... 当第四次 forEach 执行的时候,然后会回溯 peek 操作,然后 peek 会回溯更上一步的 limit 操作,到 limit 的时候,发现 limit(3)这个 job 已经完成,这里就相当于循环里面的 break 操作,跳出来终止循环。

再来看第二段代码:

image.png

输出为:A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9 第一次 forEach 执行的时候,会回溯 peek 操作,然后 peek 会回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,顶层没有操作了,开始自上向下开始执行,执行到 skip 的时候,因为执行到 skip,这个操作的意思就是跳过,下面的都不要执行了,也就是就相当于循环里面的 continue,结束本次循环。输出:A1


第二次 forEach 执行的时候,会回溯 peek 操作,然后 peek 会回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,顶层没有操作了,开始自上向下开始执行,执行到 skip 的时候,发现这是第二次 skip,结束本次循环。输出:A2

...


第七次 forEach 执行的时候,会回溯 peek 操作,然后 peek 会回溯更上一步的 skip 操作,skip 回溯到上一步的 peek 操作,顶层没有操作了,开始自上向下开始执行,执行到 skip 的时候,发现这是第七次 skip,已经大于 6 了,它已经执行完了 skip(6)的 job 了。这次 skip 就直接跳过,继续执行下面的操作。输出:A7B7C7

...直到循环结束。


一种直白的实现方式

仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用filter()方法后立即执行,选出所有以 A 开头的字符串并放到一个列表 list1 中,之后让 list1 传递给mapToInt()方法并立即执行,生成的结果放到 list2 中,最后遍历 list2 找出最大的数字作为最终结果。程序的执行流程如如所示:


image.png


这样做实现起来非常简单直观,但有两个明显的弊端:

  1. 迭代次数多。迭代次数跟函数调用的次数相等。

  2. 频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销无法接受。

这些弊端使得效率底下,根本无法接受。如果不使用 Stream API 我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:


image.png


采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟 Stream API 等价的功能,但问题是 Stream 类库的设计者并不知道用户的意图是什么。如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。


Stream 流水线解决方案

我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:

  1. 用户的操作如何记录?

  2. 操作如何叠加?

  3. 叠加之后的操作如何执行?(后续专门针对问题点分析)

  4. 执行后的结果(如果有)在哪里?(后续专门针对问题点分析)


操作如何记录

注意这里使用的是“操作(operation)”一词,指的是“Stream 中间操作”的操作,很多 Stream 操作会需要一个回调函数(Lambda 表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。Stream 中使用 Stage 的概念来描述一个完整的操作,并用某种实例化后的 PipelineHelper 来代表 Stage,将具有先后顺序的各个 Stage 连到一起,就构成了整个流水线。跟 Stream 相关类和接口的继承关系图示。


还有 IntPipeline, LongPipeline, DoublePipeline 没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟 ReferencePipeline 是并列关系。图中 Head 用于表示第一个 Stage,即调用调用诸如 Collection.stream()方法产生的 Stage,很显然这个 Stage 里不包含任何操作;StatelessOp 和 StatefulOp 分别表示无状态和有状态的 Stage,对应于无状态和有状态的中间操作。


Stream 流水线组织结构示意图如下:


图中通过Collection.stream()方法得到 Head 也就是 stage0,紧接着调用一系列的中间操作,不断产生新的 Stream。这些 Stream 对象以双向链表的形式组织在一起,构成整个流水线,由于每个 Stage 都记录了前一个 Stage 和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是 Stream 记录操作的方式。


操作如何叠加

以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的 head 开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的 Stage 并不知道后面 Stage 到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前 Stage 本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻 Stage 之间的调用关系。


这种协议由 Sink 接口完成,Sink 接口包含的方法如下表所示:



有了上面的协议,相邻 Stage 之间调用就很方便了,每个 Stage 都会将自己的操作封装到一个 Sink 里,前一个 Stage 只需调用后一个 Stage 的accept()方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink 的begin()end()方法也是必须实现的。比如 Stream.sorted()是一个有状态的中间操作,其对应的 Sink.begin()方法可能创建一个盛放结果的容器,而 accept()方法负责将元素添加到该容器,最后 end()负责对容器进行排序。对于短路操作,Sink.cancellationRequested()也是必须实现的,比如 Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回 true,以便调用者尽快结束查找。Sink 的四个接口方法常常相互协作,共同完成计算任务。实际上 Stream API 内部实现的的本质,就是如何重写 Sink 的这四个接口方法


有了 Sink 对操作的包装,Stage 之间的调用问题就解决了,执行时只需要从流水线的 head 开始对数据源依次调用每个 Stage 对应的 Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的 Sink.accept()方法流程是这样的:


Sink 接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看 Stream 的中间操作是如何将自身的操作包装成 Sink 以及 Sink 是如何将处理结果转发给下一个 Sink 的。先看 Stream.map()方法:


上述代码看似复杂,其实逻辑很简单,就是将回调函数 mapper 包装到一个 Sink 当中。由于 Stream.map()是一个无状态的中间操作,所以 map()方法返回了一个 StatelessOp 内部类对象(一个新的 Stream),调用这个新 Stream 的 opWripSink()方法将得到一个包装了当前回调函数的 Sink。


再来看一个复杂一点的例子。Stream.sorted()方法将对 Stream 中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成 Sink 的呢?sorted()一种可能封装的 Sink 代码如下:


上述代码完美的展现了 Sink 的四个接口方法是如何协同工作的:

  1. 首先 begin()方法告诉 Sink 参与排序的元素个数,方便确定中间结果容器的的大小;

  2. 之后通过 accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;

  3. 最后 end()方法告诉 Sink 所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的 Sink;

  4. 如果下游的 Sink 是短路操作,将结果传递给下游时不断询问下游 cancellationRequested()是否可以结束处理。

总结

本文详细介绍了 Stream 流水线的组织方式,后续会持续针对 Stream 流水线的执行过程进行详细介绍。学习本文将有助于理解原理并写出正确的 Stream 代码,同时打消你对 Stream API 效率方面的顾虑。如你所见,Stream API 实现如此巧妙,即使我们使用外部迭代手动编写等价代码,也未必更加高效。


发布于: 2021 年 02 月 14 日阅读数: 26
用户头像

码农架构

关注

公众号:码农架构 2018.03.22 加入

专注于系统架构、高可用、高性能、高并发类技术分享

评论

发布
暂无评论
深入浅出函数式编程:Stream流水线的实现原理