写点什么

一种新的流:为 Java 加入生成器 (Generator) 特性

  • 2023-04-25
    浙江
  • 本文字数:20678 字

    阅读完需:约 68 分钟

作者:文镭(依来)

前言

这篇文章不是工具推荐,也不是应用案例分享。其主题思想,是介绍一种全新的设计模式。它既拥有抽象的数学美感,仅仅从一个简单接口出发,就能推演出庞大的特性集合,引出许多全新概念。同时也有扎实的工程实用价值,由其实现的工具,性能均可显著超过同类的头部开源产品。


这一设计模式并非因 Java 而生,而是诞生于一个十分简陋的脚本语言。它对语言特性的要求非常之低,因而其价值对众多现代编程语言都是普适的。

关于 Stream

首先大概回顾下 Java 里传统的流式 API。自 Java8 引入 lambda 表达式和 Stream 以来,Java 的开发便捷性有了质的飞跃,Stream 在复杂业务逻辑的处理上让人效率倍增,是每一位 Java 开发者都应该掌握的基础技能。但排除掉 parallelStream 也即并发流之外,它其实并不是一个好的设计。


第一、封装过重,实现过于复杂,源码极其难读。我能理解这或许是为了兼容并发流所做的妥协,但毕竟耦合太深,显得艰深晦涩。每一位初学者被源码吓到之后,想必都会产生流是一种十分高级且实现复杂的特性的印象。实际上并不是这样,流其实可以用非常简单的方式构建


第二、API 过于冗长。冗长体现在 stream.collect 这一部分。作为对比,Kotlin 提供的 toList/toSet/associate(toMap)等等丰富操作是可以直接作用在流上的。Java 直到 16 才抠抠索索加进来一个 Stream 可以直接调用的 toList,他们甚至不肯把 toSet/toMap 一起加上。


第三、API 功能简陋。对于链式操作,在最初的 Java8 里只有 map/filter/skip/limit/peek/distinct/sorted 这七个,Java9 又加上了 takeWhile/dropWhile。然而在 Kotlin 中,除了这几个之外人还有许多额外的实用功能。


例如:


mapIndexed,mapNotNull,filterIndexed,filterNotNull,onEachIndexed,distinctBy, sortedBy,sortedWith,zip,zipWithNext 等等,翻倍了不止。这些东西实现起来并不复杂,就是个顺手的事,但对于用户而言有和没有的体验差异可谓巨大。


在这篇文章里,我将提出一种全新的机制用于构建流。这个机制极其简单,任何能看懂 lambda 表达式(闭包)的同学都能亲手实现,任何支持闭包的编程语言都能利用该机制实现自己的流。也正是由于这个机制足够简单,所以开发者可以以相当低的成本撸出大量的实用 API,使用体验甩开 Stream 两条街,不是问题。

关于生成器

生成器(Generator)[1]是许多现代编程语言里一个广受好评的重要特性,在 Python/Kotlin/C#/Javascript 等等语言中均有直接支持。它的核心 API 就是一个 yield 关键字(或者方法)。


有了生成器之后,无论是 iterable/iterator,还是一段乱七八糟的闭包,都可以直接映射为一个流。举个例子,假设你想实现一个下划线字符串转驼峰的方法,在 Python 里你可以利用生成器这么玩


def underscore_to_camelcase(s):    def camelcase():        yield str.lower        while True:            yield str.capitalize
return ''.join(f(sub) for sub, f in zip(s.split('_'), camelcase()))
复制代码


这短短几行代码可以说处处体现出了 Python 生成器的巧妙。首先,camelcase 方法里出现了 yield 关键字,解释器就会将其看作是一个生成器,这个生成器会首先提供一个 lower 函数,然后提供无数的 capitalize 函数。由于生成器的执行始终是 lazy 的,所以用 while true 的方式生成无限流是十分常见的手段,不会有性能或者内存上的浪费。其次,Python 里的流是可以和 list 一起进行 zip 的,有限的 list 和无限的流 zip 到一起,list 结束了流自然也会结束。


这段代码中,末尾那行 join()括号里的东西,Python 称之为生成器推导(Generator Comprehension)[2],其本质上依然是一个流,一个 zip 流被 map 之后的 string 流,最终通过 join 方法聚合为一个 string。


以上代码里的操作, 在任何支持生成器的语言里都可以轻易完成,但是在 Java 里你恐怕连想都不敢想。Java 有史以来,无论是历久弥新的 Java8,还是最新的引入了 Project Loom[3]的 OpenJDK19,连协程都有了,依然没有直接支持生成器。


本质上,生成器的实现要依赖于 continuation[4]的挂起和恢复,所谓 continuation 可以直观理解为程序执行到指定位置后的断点,协程就是指在这个函数的断点挂起后跳到另一个函数的某个断点继续执行,而不会阻塞线程,生成器亦如是。


Python 通过栈帧的保存与恢复实现函数重入以及生成器[5],Kotlin 在编译阶段利用 CPS(Continuation Passing Style)[6]技术对字节码进行了变换,从而在 JVM 上模拟了协程[7]。其他的语言要么大体如此,要么有更直接的支持。


那么,有没有一种办法,可以在没有协程的 Java 里,实现或者至少模拟出一个 yield 关键字,从而动态且高性能地创建流呢。答案是,有。

正文

Java 里的流叫 Stream,Kotlin 里的流叫 Sequence。我实在想不出更好的名字了,想叫 Flow 又被用了,简单起见姑且叫 Seq。

概念定义

首先给出 Seq 的接口定义


public interface Seq<T> {    void consume(Consumer<T> consumer);}
复制代码


它本质上就是一个 consumer of consumer,其真实含义我后边会讲。这个接口看似抽象,实则非常常见,java.lang.Iterable 天然自带了这个接口,那就是大家耳熟能详的 forEach。利用方法推导,我们可以写出第一个 Seq 的实例


List<Integer> list = Arrays.asList(1, 2, 3);Seq<Integer> seq = list::forEach;
复制代码


可以看到,在这个例子里 consume 和 forEach 是完全等价的,事实上这个接口我最早就是用 forEach 命名的,几轮迭代之后才改成含义更准确的 consume。


利用单方法接口在 Java 里会自动识别为 FunctionalInteraface 这一伟大特性,我们也可以用一个简单的 lambda 表达式来构造流,比如只有一个元素的流。


static <T> Seq<T> unit(T t) {    return c -> c.accept(t);}
复制代码


这个方法在数学上很重要(实操上其实用的不多),它定义了 Seq 这个泛型类型的单位元操作,即 T -> Seq<T>的映射。

map 与 flatMap

map

从 forEach 的直观角度出发,我们很容易写出 map[8],将类型为 T 的流,转换为类型为 E 的流,也即根据函数 T -> E 得到 Seq<T> -> Seq<E>的映射。


default <E> Seq<E> map(Function<T, E> function) {  return c -> consume(t -> c.accept(function.apply(t)));}
复制代码

flatMap

同理,可以继续写出 flatMap,即将每个元素展开为一个流之后再合并。


default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {    return c -> consume(t -> function.apply(t).consume(c));}
复制代码


大家可以自己在 IDEA 里写写这两个方法,结合智能提示,写起来其实非常方便。如果你觉得理解起来不太直观,就把 Seq 看作是 List,把 consume 看作是 forEach 就好。

filter 与 take/drop

map 与 flatMap 提供了流的映射与组合能力,流还有几个核心能力:元素过滤与中断控制。

filter

过滤元素,实现起来也很简单


default Seq<T> filter(Predicate<T> predicate) {    return c -> consume(t -> {        if (predicate.test(t)) {            c.accept(t);        }    });}
复制代码

take

流的中断控制有很多场景,take 是最常见的场景之一,即获取前 n 个元素,后面的不要——等价于 Stream.limit。


由于 Seq 并不依赖 iterator,所以必须通过异常实现中断。为此需要构建一个全局单例的专用异常,同时取消这个异常对调用栈的捕获,以减少性能开销(由于是全局单例,不取消也没关系)


public final class StopException extends RuntimeException {    public static final StopException INSTANCE = new StopException();
@Override public synchronized Throwable fillInStackTrace() { return this; }}
复制代码


以及相应的方法


static <T> T stop() {    throw StopException.INSTANCE;}
default void consumeTillStop(C consumer) { try { consume(consumer); } catch (StopException ignore) {}}
复制代码


然后就可以实现 take 了:


default Seq<T> take(int n) {    return c -> {        int[] i = {n};        consumeTillStop(t -> {            if (i[0]-- > 0) {                c.accept(t);            } else {                stop();            }        });    };}
复制代码

drop

drop 是与 take 对应的概念,丢弃前 n 个元素——等价于 Stream.skip。它并不涉及流的中断控制,反而更像是 filter 的变种,一种带有状态的 filter。观察它和上面 take 的实现细节,内部随着流的迭代,存在一个计数器在不断刷新状态,但这个计数器并不能为外界感知。这里其实已经能体现出流的干净特性,它哪怕携带了状态,也丝毫不会外露。


default Seq<T> drop(int n) {    return c -> {        int[] a = {n - 1};        consume(t -> {            if (a[0] < 0) {                c.accept(t);            } else {                a[0]--;            }        });    };}
复制代码

其他 API

onEach

对流的某个元素添加一个操作 consumer,但是不执行流——对应 Stream.peek。


default Seq<T> onEach(Consumer<T> consumer) {    return c -> consume(consumer.andThen(c));}
复制代码

zip

流与一个 iterable 元素两两聚合,然后转换为一个新的流——在 Stream 里没有对应,但在 Python 里有同名实现。


default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {    return c -> {        Iterator<E> iterator = iterable.iterator();        consumeTillStop(t -> {            if (iterator.hasNext()) {                c.accept(function.apply(t, iterator.next()));            } else {                stop();            }        });    };}
复制代码

终端操作

上面实现的几个方法都是流的链式 API,它们将一个流映射为另一个流,但流本身依然是 lazy 或者说尚未真正执行的。真正执行这个流需要使用所谓终端操作,对流进行消费或者聚合。在 Stream 里,消费就是 forEach,聚合就是 Collector。对于 Collector,其实也可以有更好的设计,这里就不展开了。不过为了示例,可以先简单快速实现一个 join。


default String join(String sep) {    StringJoiner joiner = new StringJoiner(sep);    consume(t -> joiner.add(t.toString()));    return joiner.toString();}
复制代码


以及 toList。


default List<T> toList() {    List<T> list = new ArrayList<>();    consume(list::add);    return list;}
复制代码


至此为止,我们仅仅只用几十行代码,就实现出了一个五脏俱全的流式 API。在大部分情况下,这些 API 已经能覆盖百分之八九十的使用场景。你完全可以依样画葫芦,在其他编程语言里照着玩一玩,比如 Go(笑)。

生成器的推导

本文虽然从标题开始就在讲生成器,甚至毫不夸张的说生成器才是最核心的特性,但等到把几个核心的流式 API 写完了,依然没有解释生成器到底是咋回事——其实倒也不是我在卖关子,你只要仔细观察一下,生成器早在最开始讲到 Iterable 天生就是 Seq 的时候,就已经出现了。


List<Integer> list = Arrays.asList(1, 2, 3);Seq<Integer> seq = list::forEach;
复制代码


没看出来?那把这个方法推导改写为普通 lambda 函数,有


Seq<Integer> seq = c -> list.forEach(c);
复制代码


再进一步,把这个 forEach 替换为更传统的 for 循环,有


Seq<Integer> seq = c -> {    for (Integer i : list) {        c.accept(i);    }};
复制代码


由于已知这个 list 就是[1, 2, 3],所以以上代码可以进一步等价写为


Seq<Integer> seq = c -> {    c.accept(1);    c.accept(2);    c.accept(3);};
复制代码


是不是有点眼熟?不妨看看 Python 里类似的东西长啥样:


def seq():    yield 1    yield 2    yield 3
复制代码


二者相对比,形式几乎可以说一模一样——这其实就已经是生成器了,这段代码里的 accept 就扮演了 yield 的角色,consume 这个接口之所以取这个名字,含义就是指它是一个消费操作,所有的终端操作都是基于这个消费操作实现的。功能上看,它完全等价于 Iterable 的 forEach,之所以又不直接叫 forEach,是因为它的元素并不是本身自带的,而是通过闭包内的代码块临时生成的


这种生成器,并非传统意义上利用 continuation 挂起的生成器,而是利用闭包来捕获代码块里临时生成的元素,哪怕没有挂起,也能高度模拟传统生成器的用法和特性。其实上文所有链式 API 的实现,本质上也都是生成器,只不过生成的元素来自于原始的流罢了。


有了生成器,我们就可以把前文提到的下划线转驼峰的操作用 Java 也依样画葫芦写出来了。


static String underscoreToCamel(String str) {    // Java没有首字母大写方法,随便现写一个    UnaryOperator<String> capitalize = s -> s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();     // 利用生成器构造一个方法的流    Seq<UnaryOperator<String>> seq = c -> {        // yield第一个小写函数        c.accept(String::toLowerCase);        // 这里IDEA会告警,提示死循环风险,无视即可        while (true) {            // 按需yield首字母大写函数            c.accept(capitalize);        }    };    List<String> split = Arrays.asList(str.split("_"));    // 这里的zip和join都在上文给出了实现    return seq.zip(split, (f, sub) -> f.apply(sub)).join("");}
复制代码


大家可以把这几段代码拷下来跑一跑,看它是不是真的实现了其目标功能。

生成器的本质

虽然已经推导出了生成器,但似乎还是有点摸不着头脑,这中间到底发生了什么,死循环是咋跳出的,怎么就能生成元素了。为了进一步解释,这里再举一个大家熟悉的例子。

生产者-消费者模式

生产者与消费者的关系不止出现在多线程或者协程语境下,在单线程里也有一些经典场景。比如 A 和 B 两名同学合作一个项目,分别开发两个模块:A 负责产出数据,B 负责使用数据。A 不关心 B 怎么处理数据,可能要先过滤一些,进行聚合后再做计算,也可能是写到某个本地或者远程的存储;B 自然也不关心 A 的数据是怎么来的。这里边唯一的问题在于,数据条数实在是太多了,内存一次性放不下。在这种情况下,传统的做法是让 A 提供一个带回调函数 consumer 的接口,B 在调用 A 的时候传入一个具体的 consumer。


public void produce(Consumer<String> callback) {    // do something that produce strings    // then use the callback consumer to eat them}
复制代码


这种基于回调函数的交互方式实在是过于经典了,原本没啥可多说的。但是在已经有了生成器之后,我们不妨胆子放大一点稍微做一下改造:仔细观察上面这个 produce 接口,它输入一个 consumer,返回 void——咦,所以它其实也是一个 Seq 嘛!


Seq<String> producer = this::produce;
复制代码


接下来,我们只需要稍微调整下代码,就能对这个原本基于回调函数的接口进行一次升级,将它变成一个生成器。


public Seq<String> produce() {    return c -> {        // still do something that produce strings        // then use the callback consumer to eat them    };}
复制代码


基于这一层抽象,作为生产者的 A 和作为消费者的 B 就真正做到完全的、彻底的解耦了。A 只需要把数据生产过程放到生成器的闭包里,期间涉及到的所有副作用,例如 IO 操作等,都被这个闭包完全隔离了。B 则直接拿到一个干干净净的流,他不需要关心流的内部细节,当然想关心也关心不了,他只用专注于自己想做的事情即可。


更重要的是,A 和 B 虽然在操作逻辑上完全解耦,互相不可见,但在 CPU 调度时间上它们却是彼此交错的,B 甚至还能直接阻塞、中断 A 的生产流程——可以说没有协程,胜似协程。


至此,我们终于成功发现了 Seq 作为生成器的真正本质 :consumer of callback。明明是一个回调函数的消费者,摇身一变就成了生产者,实在是有点奇妙。不过仔细一想倒也合理:能够满足消费者需求(callback)的家伙,不管这需求有多么奇怪,可不就是生产者么。


容易发现,基于 callback 机制的生成器,其调用开销完全就只有生成器闭包内部那堆代码块的执行开销,加上一点点微不足道的闭包创建开销。在诸多涉及到流式计算与控制的业务场景里,这将带来极为显著的内存与性能优势。后面我会给出展现其性能优势的具体场景实例。


另外,观察这段改造代码,会发现 produce 输出的东西,根本就还是个函数,没有任何数据被真正执行和产出。这就是生成器作为一个匿名接口的天生优势:惰性计算——消费者看似得到了整个流,实际那只是一张爱的号码牌,可以涂写,可以废弃,但只有在拿着货真价实的 callback 去兑换的那一刻,才会真正的执行流。


生成器的本质,正是人类本质的反面:鸽子克星——没有任何人可以鸽它

IO 隔离与流输出

Haskell 发明了所谓 IO Monad[9]来将 IO 操作与纯函数的世界隔离。Java 利用 Stream,勉强做到了类似的封装效果。以 java.io.BufferedReader 为例,将本地文件读取为一个 Stream<String>,可以这么写:


Stream<String> lines = new BufferedReader(new InputStreamReader(new FileInputStream("file"))).lines();
复制代码


如果你仔细查看一下这个 lines 方法的实现,会发现它使用了大段代码去创建了一个 iterator,而后才将其转变为 stream。暂且不提它的实现有多么繁琐,这里首先应该注意的是 BufferedReader 是一个 Closeable,安全的做法是在使用完毕后 close,或者利用 try-with-resources 语法包一层,实现自动 close。但是 BufferedReader.lines 并没有去关闭这个源,它是一个不那么安全的接口——或者说,它的隔离是不完整的。Java 对此也打了个补丁,使用 java.nio.file.Files.lines,它会添加加一个 onClose 的回调 handler,确保 stream 耗尽后执行关闭操作。


那么有没有更普适做法呢,毕竟不是所有人都清楚 BufferedReader.lines 和 Files.lines 会有这种安全性上的区别,也不是所有的 Closeable 都能提供类似的安全关闭的流式接口,甚至大概率压根就没有流式接口。


好在现在我们有了 Seq,它的闭包特性自带隔离副作用的先天优势。恰巧在涉及大量数据 IO 的场景里,利用 callback 交互又是极为经典的设计方式——这里简直就是它大展拳脚的最佳舞台。


用生成器实现 IO 的隔离非常简单,只需要整个包住 try-with-resources 代码即可,它同时就包住了 IO 的整个生命周期。


Seq<String> seq = c -> {    try (BufferedReader reader = Files.newBufferedReader(Paths.get("file"))) {        String s;        while ((s = reader.readLine()) != null) {            c.accept(s);        }    } catch (Exception e) {        throw new RuntimeException(e);    }};
复制代码


核心代码其实就 3 行,构建数据源,挨个读数据,然后 yield(即 accept)。后续对流的任何操作看似发生在创建流之后,实际执行起来都被包进了这个 IO 生命周期的内部,读一个消费一个,彼此交替,随用随走。


换句话讲,生成器的 callback 机制,保证了哪怕 Seq 可以作为变量四处传递,但涉及到的任何副作用操作,都是包在同一个代码块里惰性执行的。它不需要像 Monad 那样,还得定义诸如 IOMonad,StateMonad 等等花样众多的 Monad。


与之类似,这里不妨再举个阿里中间件的例子,利用 Tunnel 将大家熟悉的 ODPS 表数据下载为一个流:


public static Seq<Record> downloadRecords(TableTunnel.DownloadSession session) {    return c -> {        long count = session.getRecordCount();        try (TunnelRecordReader reader = session.openRecordReader(0, count)) {            for (long i = 0; i < count; i++) {                c.accept(reader.read());            }        } catch (Exception e) {            throw new RuntimeException(e);        }    };}
复制代码


有了 Record 流之后,如果再能实现出一个 map 函数,就可以非常方便的将 Record 流 map 为带业务语义的 DTO 流——这其实就等价于一个 ODPS Reader。

异步流

基于 callback 机制的生成器,除了可以在 IO 领域大展拳脚,它天然也是亲和异步操作的。毕竟一听到回调函数这个词,很多人就能条件反射式的想到异步,想到 Future。一个 callback 函数,它的命运就决定了它是不会在乎自己被放到哪里、被怎么使用的。比方说,丢给某个暴力的异步逻辑:


public static Seq<Integer> asyncSeq() {    return c -> {        CompletableFuture.runAsync(() -> c.accept(1));        CompletableFuture.runAsync(() -> c.accept(2));    };}
复制代码


这就是一个简单而粗暴的异步流生成器。对于外部使用者来说,异步流除了不能保证元素顺序,它和同步流没有任何区别,本质上都是一段可运行的代码,边运行边产生数据。 一个 callback 函数,谁给用不是用呢。

并发流

既然给谁用不是用,那么给 ForkJoinPool 用如何?——Java 大名鼎鼎的 parallelStream 就是基于 ForkJoinPool 实现的。我们也可以拿来搞一个自己的并发流。具体做法很简单,把上面异步流示例里的 CompletableFuture.runAsync 换成 ForkJoinPool.submit 即可,只是要额外注意一件事:parallelStream 最终执行后是要阻塞的(比如最常用的 forEach),它并非单纯将任务提交给 ForkJoinPool,而是在那之后还要做一遍 join。


对此我们不妨采用最为暴力而简单的思路,构造一个 ForkJoinTask 的 list,依次将元素提交 forkJoinPool 后,产生一个 task 并添加进这个 list,等所有元素全部提交完毕后,再对这个 list 里的所有 task 统一 join。


default Seq<T> parallel() {    ForkJoinPool pool = ForkJoinPool.commonPool();    return c -> map(t -> pool.submit(() -> c.accept(t))).cache().consume(ForkJoinTask::join);}
复制代码


这就是基于生成器的并发流,它的实现仅仅只需要两行代码——正如本文开篇所说,流可以用非常简单的方式构建。哪怕是 Stream 费了老大劲的并发流,换一种方式,实现起来可以简单到令人发指。


这里值得再次强调的是,这种机制并非 Java 限定,而是任何支持闭包的编程语言都能玩。事实上,这种流机制的最早验证和实现,就是我在 AutoHotKey_v2[10]这个软件自带的简陋的脚本语言上完成的。

再谈生产者-消费者模式

前面为了解释生成器的 callback 本质,引入了单线程下的生产者-消费者模式。那在实现了异步流之后,事情就更有意思了。


回想一下,Seq 作为一种中间数据结构,能够完全解耦生产者与消费者,一方只管生产数据交给它,另一方只管从它那里拿数据消费。这种构造有没有觉得有点眼熟?不错,正是 Java 开发者常见的阻塞队列,以及支持协程的语言里的通道(Channel) ,比如 Go 和 Kotlin。


通道某种意义上也是一种阻塞队列,它和传统阻塞队列的主要区别,在于当通道里的数据超出限制或为空时,对应的生产者/消费者会挂起而不是阻塞,两种方式都会暂停生产/消费,只是协程挂起后能让出 CPU,让它去别的协程里继续干活。


那 Seq 相比 Channel 有什么优势呢?优势可太多了:首先,生成器闭包里 callback 的代码块,严格确保了生产和消费必然交替执行,也即严格的先进先出、进了就出、不进不出,所以不需要单独开辟堆内存去维护一个队列,那没有队列自然也就没有锁,没有锁自然也就没有阻塞或挂起。其次,Seq 本质上是消费监听生产,没有生产自然没有消费,如果生产过剩了——啊,生产永远不会过剩,因为 Seq 是惰性的,哪怕生产者在那儿 while 死循环无限生产,也不过是个司空见惯的无限流罢了。


这就是生成器的另一种理解方式,一个无队列、无锁、无阻塞的通道。Go 语言 channel 常被诟病的死锁和内存泄露问题,在 Seq 身上压根就不存在;Kotlin 搞出来的异步流 Flow 和同步流 Sequence 这两套大同小异的 API,都能被 Seq 统一替换。


可以说,没有比 Seq 更安全的通道实现了,因为根本就没有安全问题。生产了没有消费?Seq 本来就是惰性的,没有消费,那就啥也不会生产。消费完了没有关闭通道?Seq 本来就不需要关闭——一个 lambda 而已有啥好关闭的。


为了更直观的理解,这里给一个简单的通道示例。先随便实现一个基于 ForkJoinPool 的异步消费接口,该接口允许用户自由选择消费完后是否 join。


default void asyncConsume(Consumer<T> consumer) {    ForkJoinPool pool = ForkJoinPool.commonPool();    map(t -> pool.submit(() -> consumer.accept(t))).cache().consume(ForkJoinTask::join);}
复制代码


有了异步消费接口,立马就可以演示出 Seq 的通道功能。


@Testpublic void testChan() {    // 生产无限的自然数,放入通道seq,这里流本身就是通道,同步流还是异步流都无所谓    Seq<Long> seq = c -> {        long i = 0;        while (true) {            c.accept(i++);        }    };    long start = System.currentTimeMillis();    // 通道seq交给消费者,消费者表示只要偶数,只要5个    seq.filter(i -> (i & 1) == 0).take(5).asyncConsume(i -> {        try {            Thread.sleep(1000);            System.out.printf("produce %d and consume\n", i);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }    });    System.out.printf("elapsed time: %dms\n", System.currentTimeMillis() - start);}
复制代码


运行结果


produce 0 and consumeproduce 8 and consumeproduce 6 and consumeproduce 4 and consumeproduce 2 and consumeelapsed time: 1032ms
复制代码


可以看到,由于消费是并发执行的,所以哪怕每个元素的消费都要花 1 秒钟,最终总体耗时也就比 1 秒多一点点。当然,这和传统的通道模式还是不太一样,比如实际工作线程就有很大区别。更全面的设计是在流的基础上加上无锁非阻塞队列实现正经 Channel,可以附带解决 Go 通道的许多问题同时提升性能,后面我会另写文章专门讨论。

生成器的应用场景

上文介绍了生成器的本质特性,它是一个 consumer of callback,它可以以闭包的形式完美封装 IO 操作,它可以无缝切换为异步流和并发流,并在异步交互中扮演一个无锁的通道角色。除去这些核心特性带来的优势外,它还有非常多有趣且有价值的应用场景。

树遍历

一个 callback 函数,它的命运就决定了它是不会在乎自己被放到哪里、被怎么使用的,比如说,放进递归里。而递归的一个典型场景就是树遍历。作为对比,不妨先看看在 Python 里怎么利用 yield 遍历一棵二叉树的:


def scan_tree(node):    yield node.value    if node.left:        yield from scan_tree(node.left)    if node.right:        yield from scan_tree(node.right)
复制代码


对于 Seq,由于 Java 不允许函数内部套函数,所以要稍微多写一点。核心原理其实很简单,把 callback 函数丢给递归函数,每次递归记得捎带上就行。


//static <T> Seq<T> of(T... ts) {//    return Arrays.asList(ts)::forEach;//}
// 递归函数public static <N> void scanTree(Consumer<N> c, N node, Function<N, Seq<N>> sub) { c.accept(node); sub.apply(node).consume(n -> { if (n != null) { scanTree(c, n, sub); } });}
// 通用方法,可以遍历任何树public static <N> Seq<N> ofTree(N node, Function<N, Seq<N>> sub) { return c -> scanTree(c, node, sub);}
// 遍历一个二叉树public static Seq<Node> scanTree(Node node) { return ofTree(node, n -> Seq.of(n.left, n.right));}
复制代码


这里的 ofTree 就是一个非常强大的树遍历方法。遍历树本身并不是啥稀罕东西,但把遍历的过程输出为一个流,那想象空间就很大了。在编程语言的世界里树的构造可以说到处都是。比方说,我们可以十分简单的构造出一个遍历 JSONObject 的流。


static Seq<Object> ofJson(Object node) {    return Seq.ofTree(node, n -> c -> {        if (n instanceof Iterable) {            ((Iterable<?>)n).forEach(c);        } else if (n instanceof Map) {            ((Map<?, ?>)n).values().forEach(c);        }    });}
复制代码


然后分析 JSON 就会变得十分方便,比如你想校验某个 JSON 是否存在 Integer 字段,不管这个字段在哪一层。使用流的 any/anyMatch 这样的方法,一行代码就能搞定:


boolean hasInteger = ofJson(node).any(t -> t instanceof Integer);
复制代码


这个方法的厉害之处不仅在于它足够简单,更在于它是一个短路操作。用正常代码在一个深度优先的递归函数里执行短路,要不就抛出异常,要不就额外添加一个上下文参数参与递归(只有在返回根节点后才能停止),总之实现起来都挺麻烦。但是使用 Seq,你只需要一个 any/all/none。


再比如你想校验某个 JSON 字段里是否存在非法字符串“114514”,同样也是一行代码:


boolean isIllegal = ofJson(node).any(n -> (n instanceof String) && ((String)n).contains("114514"));
复制代码


对了,JSON 的前辈 XML 也是树的结构,结合众多成熟的 XML 的解析器,我们也可以实现出类似的流式扫描工具。比如说,更快的 Excel 解析器?

更好用的笛卡尔积

笛卡尔积对大部分开发而言可能用处不大,但它在函数式语言中是一种颇为重要的构造,在运筹学领域构建最优化模型时也极其常见。此前 Java 里若要利用 Stream 构建多重笛卡尔积,需要多层 flatMap 嵌套。


public static Stream<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {    return list1.stream().flatMap(i1 ->        list2.stream().flatMap(i2 ->            list3.stream().map(i3 ->                 i1 + i2 + i3)));}
复制代码


对于这样的场景,Scala 提供了一种语法糖,允许用户以 for 循环+yield[11]的方式来组合笛卡尔积。不过 Scala 的 yield 就是个纯语法糖,与生成器并无直接关系,它会在编译阶段将代码翻译为上面 flatMap 的形式。这种糖形式上等价于 Haskell 里的 do annotation[12]。


好在现在有了生成器,我们有了更好的选择,可以在不增加语法、不引入关键字、不麻烦编译器的前提下,直接写个嵌套 for 循环并输出为流。且形式更为自由——你可以在 for 循环的任意一层随意添加代码逻辑。


public static Seq<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {    return c -> {        for (Integer i1 : list1) {            for (Integer i2 : list2) {                for (Integer i3 : list3) {                    c.accept(i1 + i2 + i3);                }            }        }    };}
复制代码


换言之,Java 不需要这样的糖。Scala 或许原本也可以不要。

可能是 Java 下最快的 CSV/Excel 解析器

我在前文多次强调生成器将带来显著的性能优势,这一观点除了有理论上的支撑,也有明确的工程实践数据,那就是我为 CSV 家族所开发的架构统一的解析器。所谓 CSV 家族除了 CSV 以外,还包括 Excel 与阿里云的 ODPS,其实只要形式符合其统一范式,就都能进入这个家族。


但是对于 CSV 这一家子的处理其实一直是 Java 语言里的一个痛点。ODPS 就不说了,好像压根就没有。CSV 的库虽然很多,但好像都不是很让人满意,要么 API 繁琐,要么性能低下,没有一个的地位能与 Python 里的 Pandas 相提并论。其中相对知名一点的有 OpenCSV[13],Jackson 的 jackson-dataformat-csv[14],以及号称最快的 univocity-parsers[15]。


Excel 则不一样,有集团开源软件 EasyExcel[16]珠玉在前,我只能确保比它快,很难也不打算比它功能覆盖全。


对于其中的 CsvReader 实现,由于市面上类似产品实在太多,我也没精力挨个去比,我只能说反正它比公开号称最快的那个还要快不少——大概一年前我实现的 CsvReader 在我办公电脑上的速度最多只能达到 univocity-parsers 的 80%~90%,不管怎么优化也死活拉不上去。直到后来我发现了生成器机制并对其重构之后,速度直接反超前者 30%到 50% ,成为我已知的类似开源产品里的最快实现。


对于 Excel,在给定的数据集上,我实现的 ExcelReader 比 EasyExcel 快 50%~55% ,跟 POI 就懒得比了。测试详情见以上链接。


注:最近和 Fastjson 作者高铁有很多交流,在暂未正式发布的 Fastjson2 的 2.0.28-SNAPSHOT 版本上,其 CSV 实现的性能在多个 JDK 版本上已经基本追平我的实现。出于严谨,我只能说我的实现在本文发布之前可能是已知最快的哈哈。

改造 EasyExcel,让它可以直接输出流

上面提到的 EasyExcel 是阿里开源的知名产品,功能丰富,质量优秀,广受好评。恰好它本身又一个利用回调函数进行 IO 交互的经典案例,倒是也非常适合拿来作为例子讲讲。根据官网示例,我们可以构造一个最简单的基于回调函数的 excel 读取方法


public static <T> void readEasyExcel(String file, Class<T> cls, Consumer<T> consumer) {    EasyExcel.read(file, cls, new PageReadListener<T>(list -> {        for (T person : list) {            consumer.accept(person);        }    })).sheet().doRead();}
复制代码


EasyExcel 的使用是通过回调监听器来捕获数据的。例如这里的 PageReadListener,内部有一个 list 缓存。缓存满了,就喂给回调函数,然后继续刷缓存。这种基于回调函数的做法的确十分经典,但是难免有一些不方便的地方:


  1. 消费者需要关心生产者的内部缓存,比如这里的缓存就是一个 list。

  2. 消费者如果想拿走全部数据,需要放一个 list 进去挨个 add 或者每次 addAll。这个操作是非惰性的。

  3. 难以把读取过程转变为 Stream,任何流式操作都必须要用 list 存完并转为流后,才能再做处理。灵活性很差。

  4. 消费者不方便干预数据生产过程,比如达到某种条件(例如个数)后直接中断,除非你在实现回调监听器时把这个逻辑 override 进去[17]。


利用生成器,我们可以将上面示例中读取 excel 的过程完全封闭起来,消费者不需要传入任何回调函数,也不需要关心任何内部细节——直接拿到一个流就好。改造起来也相当简单,主体逻辑原封不动,只需要把那个 callback 函数用一个 consumer 再包一层即可:


public static <T> Seq<T> readExcel(String pathName, Class<T> head) {    return c -> {        ReadListener<T> listener = new ReadListener<T>() {            @Override            public void invoke(T data, AnalysisContext context) {                c.accept(data);            }
@Override public void doAfterAllAnalysed(AnalysisContext context) {} }; EasyExcel.read(pathName, head, listener).sheet().doRead(); };}
复制代码


这一改造我已经给 EasyExcel 官方提了 PR[18],不过不是输出 Seq,而是基于生成器原理构建的 Stream,后文会有构建方式的具体介绍。


更进一步的,完全可以将对 Excel 的解析过程改造为生成器方式,利用一次性的 callback 调用避免内部大量状态的存储与修改,从而带来可观的性能提升。这一工作由于要依赖上文 CsvReader 的一系列 API,所以暂时没法提交给 EasyExcel。

用生成器构建 Stream

生成器作为一种全新的设计模式,固然可以提供更为强大的流式 API 特性,但是毕竟不同于大家最为熟悉 Stream,总会有个适应成本或者迁移成本。对于既有的已经成熟的库而言,使用 Stream 依然是对用户最为负责的选择。值得庆幸的是,哪怕机制完全不同,Stream 和 Seq 仍是高度兼容的。


首先,显而易见,就如同 Iterable 那样,Stream 天然就是一个 Seq:


Stream<Integer> stream = Stream.of(1, 2, 3);Seq<Integer> seq = stream::forEach;
复制代码


那反过来 Seq 能否转化为 Stream 呢?在 Java Stream 提供的官方实现里,有一个 StreamSupport.stream 的构造工具,可以帮助用户将一个 iterator 转化为 stream。针对这个入口,我们其实可以用生成器来构造一个非标准的 iterator:不实现 hastNext 和 next,而是单独重载 forEachRemaining 方法,从而 hack 进 Stream 的底层逻辑——在那迷宫一般的源码里,有一个非常隐秘的角落,一个叫 AbstractPipeline.copyInto 的方法,会在真正执行流的时候调用 Spliterator 的 forEachRemaining 方法来遍历元素——虽然这个方法原本是通过 next 和 hasNext 实现的,但当我们把它重载之后,就可以做到假狸猫换真太子。


public static <T> Stream<T> stream(Seq<T> seq) {    Iterator<T> iterator = new Iterator<T>() {        @Override        public boolean hasNext() {            throw new NoSuchElementException();        }
@Override public T next() { throw new NoSuchElementException(); }
@Override public void forEachRemaining(Consumer<? super T> action) { seq.consume(action::accept); } }; return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);}
复制代码


也就是说,咱现在甚至能用生成器来构造 Stream 了!比如:


public static void main(String[] args) {    Stream<Integer> stream = stream(c -> {        c.accept(0);        for (int i = 1; i < 5; i++) {            c.accept(i);        }    });    System.out.println(stream.collect(Collectors.toList()));}
复制代码


图灵在上,感谢 Stream 的作者没有偷这个懒,没有用 while hasNext 来进行遍历,不然这操作咱还真玩不了。


当然由于这里的 Iterator 本质已经发生了改变,这种操作也会有一些限制,没法再使用 parallel 方法将其转为并发流,也不能用 limit 方法限制数量。不过除此以外,像 map, filter, flatMap, forEach, collect 等等方法,只要不涉及流的中断,都可以正常使用。

无限递推数列

实际应用场景不多。Stream 的 iterate 方法可以支持单个种子递推的无限数列,但两个乃至多个种子的递推就无能为力了,比如最受程序员喜爱的炫技专用斐波那契数列:


public static Seq<Integer> fibonaaci() {    return c -> {        int i = 1, j = 2;        c.accept(i);        c.accept(j);        while (true) {            c.accept(j = i + (i = j));        }    };}
复制代码


另外还有一个比较有意思的应用,利用法里树的特性,进行丢番图逼近[22],简而言之,就是用有理数逼近实数。这是一个非常适合拿来做 demo 的且足够有趣的例子,限于篇幅原因我就不展开了,有机会另写文章讨论。

流的更多特性

流的聚合

如何设计流的聚合接口是一个很复杂的话题,若要认真讨论几乎又可以整出大几千字,限于篇幅这里简单提几句好了。在我看来,好的流式 API 应该要让流本身能直接调用聚合函数,而不是像 Stream 那样,先用 Collectors 构造一个 Collector,再用 stream 去调用 collect。可以对比下以下两种方式,孰优孰劣一目了然:


Set<Integer> set1 = stream.collect(Collectors.toSet());String string1 = stream.map(Integer::toString).collect(Collectors.joinning(","));
Set<Integer> set2 = seq.toSet();String string2 = seq.join(",", Integer::toString);
复制代码


这一点上,Kotlin 做的比 Java 好太多。不过有利往往也有弊,从函数接口而非用户使用的角度来说,Collector 的设计其实更为完备,它对于流和 groupBy 是同构的:所有能用 collector 对流直接做到的事情,groupBy 之后用相同的 collector 也能做到,甚至 groupBy 本身也是一个 collector。


所以更好的设计是既保留函数式的完备性与同构性,同时也提供由流直接调用的快捷方式。为了说明,这里举一个 Java 和 Kotlin 都没有实现但需求很普遍的例子,求加权平均:


public static void main(String[] args) {    Seq<Integer> seq = Seq.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
double avg1 = seq.average(i -> i, i -> i); // = 6.3333 double avg2 = seq.reduce(Reducer.average(i -> i, i -> i)); // = 6.3333 Map<Integer, Double> avgMap = seq.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)); // = {0=6.0, 1=6.6} Map<Integer, Double> avgMap2 = seq.reduce(Reducer.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)));}
复制代码


上面代码里的 average,Reducer.average,以及用在 groupBy 里的 average 都是完全同构的,换句话说,同一个 Reducer,可以直接用在流上,也可以对流进行分组之后用在每一个子流上。这是一套类似 Collector 的 API,既解决了 Collector 的一些问题,同时也能提供更丰富的特性。重点是,这玩意儿是开放的,且机制足够简单,谁都能写。

流的分段处理

分段处理其实是一直以来各种流式 API 的一个盲点,不论是 map 还是 forEach,我们偶尔会希望前半截和后半截采取不同的处理逻辑,或者更直接一点的说希望第一个元素特殊处理。对此,我提供了三种 API,元素替换 replace,分段 map,以及分段消费 consume。


还是以前文提到的下划线转驼峰的场景作为一个典型例子:在将下划线字符串 split 之后,对第一个元素使用 lowercase,对剩下的其他元素使用 capitalize。使用分段的 map 函数,可以更快速的实现这一个功能。


static String underscoreToCamel(String str, UnaryOperator<String> capitalize) {    // split=>分段map=>join    return Seq.of(str.split("_")).map(capitalize, 1, String::toLowerCase).join("");}
复制代码


再举个例子,当你解析一个 CSV 文件的时候,对于存在表头的情况,在解析时就要分别处理:利用表头信息对字段重排序,剩余的内容则按行转为 DTO。使用适当的分段处理逻辑,这一看似麻烦的操作是可以在一个流里一次性完成的。

一次性流还是可重用流?

熟悉 Stream 的同学应该清楚,Stream 是一种一次性的流,因为它的数据来源于一个 iterator,二次调用一个已经用完的 Stream 会抛出异常。Kotlin 的 Sequence 则采用了不同的设计理念,它的流来自于 Iterable,大部分情况下是可重用的。但是 Kotlin 在读文件流的时候,采用的依然是和 Stream 同样的思路,将 BufferedReader 封装为一个 Iterator,所以也是一次性的。


不同于以上二者,生成器的做法显然要更为灵活,流是否可重用,完全取决于被生成器包进去的数据源是否可重用。比如上面代码里不论是本地文件还是 ODPS 表,只要数据源的构建是在生成器里边完成的,那自然就是可重用的。你可以像使用一个普通 List 那样,多次使用同一个流。从这个角度上看,生成器本身就是一个 Immutable,它的元素生产,直接来自于代码块,不依赖于运行环境,不依赖于内存状态数据。对于任何消费者而言,都可以期待同一个生成器给出始终一致的流。


生成器的本质和人类一样,都是复读机


当然,复读机复读也是要看成本的,对于像 IO 这种高开销的流需要重复使用的场景,反复去做同样的 IO 操作肯定不合理,我们不妨设计出一个 cache 方法用于流的缓存。


最常用的缓存方式,是将数据读进一个 ArrayList。由于 ArrayList 本身并没有实现 Seq 的接口,所以不妨造一个 ArraySeq,它既是 ArrayList,又是 Seq——正如我前面多次提到的,List 天然就是 Seq。


public class ArraySeq<T> extends ArrayList<T> implements Seq<T> {    @Override    public void consume(Consumer<T> consumer) {        forEach(consumer);    }}
复制代码


有了 ArraySeq 之后,就可以立马实现流的缓存


default Seq<T> cache() {    ArraySeq<T> arraySeq = new ArraySeq<>();    consume(t -> arraySeq.add(t));    return arraySeq;}
复制代码


细心的朋友可能会注意到,这个 cache 方法我在前面构造并发流的时候已经用到了。除此以外,借助 ArraySeq,我们还能轻易的实现流的排序,感兴趣的朋友可以自行尝试。

二元流

既然可以用 consumer of callback 作为机制来构建流,那么有意思的问题来了,如果这个 callback 不是 Consumer 而是个 BiConsumer 呢?——答案就是,二元流!


public interface BiSeq<K, V> {    void consume(BiConsumer<K, V> consumer);}
复制代码


二元流是一个全新概念,此前任何基于迭代器的流,比如 Java Stream,Kotlin Sequence,还有 Python 的生成器,等等等等,都玩不了二元流。我倒也不是针对谁,毕竟在座诸位的 next 方法都必须吐出一个对象实例,意味着即便想构造同时有两个元素的流,也必须包进一个 Pair 之类的结构体里——故而其本质上依然是一个一元流。当流的元素数量很大时,它们的内存开销将十分显著。


哪怕是看起来最像二元流的 Python 的 zip:


for i, j in zip([1, 2, 3], [4, 5, 6]):    pass
复制代码


这里的 i 和 j,实际仍是对一个 tuple 进行解包之后的结果。


但是基于 callback 机制的二元流和它们完全不一样,它和一元流是同等轻量的!这就意味着节省内存同时还快。比如我在实现 CsvReader 时,重写了 String.split 方法使其输出为一个流,这个流与 DTO 字段 zip 为二元流,就能实现值与字段的一对一匹配。不需要借助下标,也不需要创建临时数组或 list 进行存储。每一个被分割出来的 substring,在整个生命周期里都是一次性的,随用随丢。


这里额外值得一提的是,同 Iterable 类似,Java 里的 Map 天生就是一个二元流。


Map<Integer, String> map = new HashMap<>();BiSeq<Integer, String> biSeq = map::forEach;
复制代码


有了基于 BiConsumer 的二元流,自然也可以有基于 TriConsumer 三元流,四元流,以及基于 IntConsumer、DoubleConsumer 等原生类型的流等等。这是一个真正的流的大家族,里边甚至还有很多不同于一元流的特殊操作,这里就不过多展开了,只提一个:


二元流和三元流乃至多元流,可以在 Java 里构造出货真价实的惰性元组 tuple。当你的函数需要返回多个返回值的时候,除了手写一个 Pair/Triple,你现在有了更好的选择,就是用生成器的方式直接返回一个 BiSeq/TriSeq,这比直接的元组还额外增加了的惰性计算的优势,可以在真正需要使用的时候再用回调函数去消费。你甚至连空指针检查都省了。

结束语

首先感谢你能读到这里,我要讲的故事大体已经讲完了,虽然还有许多称得上有趣的细节没放出来讨论,但已经不影响这个故事的完整性了。我想要再次强调的是,上面这所有的内容,代码也好,特性也好,案例也罢,包括我所实现的 CsvReader 系列——全部都衍生自这一个简单接口,它是一切的源头,是梦开始的地方,完全值得我在文末再写一遍


public interface Seq<T> {    void consume(Consumer<T> consumer);}
复制代码


对于这个神奇的接口,我愿称之为:


道生一——先有 Seq 定义


一生二——导出 Seq 一体两面的特性,既是流,又是生成器


二生三——由生成器实现出丰富的流式 API,而后导出可安全隔离的 IO 流,最终导出异步流、并发流以及通道特性


至于三生万物的部分,还会有后续文章,期待能早日对外开源吧。

附录

附录的原本内容包含 API 文档,引用地址,以及性能 benchmark。由于暂未开源,这里仅介绍下 Monad 相关。

Monad

Monad[24]是来自于范畴论里的一个概念,同时也是函数式编程语言代表者 Haskell 里极为重要的一种设计模式。但它无论是对流还是对生成器而言都不是必须的,所以放在附录讲。


我之所以要提 Monad,是因为 Seq 在实现了 unit, flatMap 之后,自然也就成为了一种 Monad。对于关注相关理论的同学来说,如果连提都不提,可能会有些难受。遗憾的是,虽然 Seq 在形式上是个 Monad,但它们在理念上是存在一些冲突的。比方说在 Monad 里至关重要的 flatMap,既是核心定义之一,还承担着组合与拆包两大重要功能。甚至连 map 对 Monad 来说都不是必须的,它完全可以由 flatMap 和 unit 推导出来(推导过程见下文),反之还不行。但是对于流式 API 而言,map 才是真正最为关键和高频的操作,flatMap 反而没那么重要,甚至压根都不太常用。


Monad 这种设计模式之所以被推崇备至,是因为它有几个重要特性,惰性求值、链式调用以及副作用隔离——在纯函数的世界里,后者甚至称得上是性命攸关的大事。但是对包括 Java 在内的大部分正常语言来说,实现惰性求值更直接的方式是面向接口而不是面向对象(实例)编程,接口由于没有成员变量,天生就是惰性的。链式操作则是流的天生特性,无须赘述。至于副作用隔离,这同样不是 Monad 的专利。生成器用闭包+callback 的方式也能做到,前文都有介绍。

推导 map 的实现

首先,map 可以由 unit 与 flatMap 直接组合得到,这里不妨称之为 map2:


default <E> Seq<E> map2(Function<T, E> function) {    return flatMap(t -> unit(function.apply(t)));}
复制代码


即把类型为 T 的元素,转变为类型为 E 的 Seq,再用 flatMap 合并。这个是最直观的,不需要流的先验概念,是 Monad 的固有属性。当然其在效率上肯定很差,我们可以对其化简。


已知 unit 与 flatMap 的实现


static <T> Seq<T> unit(T t) {    return c -> c.accept(t);}
default <E> Seq<E> flatMap(Function<T, Seq<E>> function) { return c -> supply(t -> function.apply(t).supply(c));}
复制代码


先展开 unit,代入上面 map2 的实现,有


default <E> Seq<E> map3(Function<T, E> function) {    return flatMap(t -> c -> c.accept(function.apply(t)));}
复制代码


把这个 flatMap 里边的函数提出来变成 flatFunction,再展开 flatMap,有


default <E> Seq<E> map4(Function<T, E> function) {    Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));    return consumer -> supply(t -> flatFunction.apply(t).supply(consumer));}
复制代码


容易注意到,这里的 flatFunction 连续有两个箭头,它其实就完全等价于一个双参数(t, c)函数的柯里化 currying。我们对其做逆柯里化操作,反推出这个双参数函数:


Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));// 等价于BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));
复制代码


可以看到,这个等价的双参数函数其实就是一个 BiConsumer ,再将其代入 map4,有


default <E> Seq<E> map5(Function<T, E> function) {    BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));    return c -> supply(t -> biConsumer.accept(t, c));}
复制代码


注意到,这里 biConsumer 的实参和形参是完全一致的,所以可以将它的方法体代入下边直接替换,于是有


default <E> Seq<E> map6(Function<T, E> function) {    return c -> supply(t -> c.accept(function.apply(t)));}
复制代码


到这一步,这个 map6,就和前文从流式概念出发直接写出来的 map 完全一致了。证毕!


参考链接:


[1]https://en.wikipedia.org/wiki/Generator_(computer_programming)


[2]https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPython/Generators_and_Comprehensions.html


[3]https://openjdk.org/projects/loom/


[4]https://en.wikipedia.org/wiki/Continuation


[5]https://hackernoon.com/the-magic-behind-python-generator-functions-bc8eeea54220


[6]https://en.wikipedia.org/wiki/Continuation-passing_style


[7]https://kotlinlang.org/spec/asynchronous-programming-with-coroutines.html


[8]https://zh.wikipedia.org/wiki/Map_(%E9%AB%98%E9%98%B6%E5%87%BD%E6%95%B0)


[9]https://crypto.stanford.edu/~blynn/haskell/io.html


[10]https://www.autohotkey.com/docs/v2/


[11]https://stackoverflow.com/questions/1052476/what-is-scalas-yield


[12]https://stackoverflow.com/questions/10441559/scala-equivalent-of-haskells-do-notation-yet-again


[13]https://opencsv.sourceforge.net/


[14]https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv


[15]https://github.com/uniVocity/univocity-parsers


[16]https://github.com/alibaba/easyexcel


[17]https://github.com/alibaba/easyexcel/issues/1566


[18]https://github.com/alibaba/easyexcel/pull/3052


[20]https://github.com/alibaba/easyexcel/pull/3052


[21]https://github.com/alibaba/fastjson2/blob/f30c9e995423603d5b80f3efeeea229b76dc3bb8/extension/src/main/java/com/alibaba/fastjson2/support/csv/CSVParser.java#L197


[22]https://www.bilibili.com/video/BV1ha41137oW/?is_story_h5=false&p=1&share_from=ugc&share_medium=android&share_plat=android&share_session_id=96a03926-820b-4c9f-a2fd-162944103bed&share_source=COPY&share_tag=s_i&timestamp=1663058544&unique_k=p94n8tD


[24]https://en.wikipedia.org/wiki/Monad_(functional_programming)


更多内容,请点击此处进入云原生技术社区查看

发布于: 刚刚阅读数: 3
用户头像

阿里云云原生 2019-05-21 加入

还未添加个人简介

评论

发布
暂无评论
一种新的流:为 Java 加入生成器(Generator)特性_Java_阿里巴巴云原生_InfoQ写作社区