写点什么

Java Stream 源码分析

作者:java易二三
  • 2023-07-29
    湖南
  • 本文字数:3697 字

    阅读完需:约 12 分钟

前言 Java 8 的 Stream 使得代码更加简洁易懂,本篇文章深入分析 Java Stream 的工作原理,并探讨 Steam 的性能问题。

Java 8 集合中的 Stream 相当于高级版的 Iterator,它可以通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。

Stream 的聚合操作与数据库 SQL 的聚合操作 sorted、filter、map 等类似。我们在应用层就可以高效地实现类似数据库 SQL 的聚合操作了,而在数据操作方面,Stream 不仅可以通过串行的方式实现数据操作,还可以通过并行的方式处理大批量数据,提高数据的处理效率。

操作分类

官方将 Stream 中的操作分为两大类:

  • 中间操作(Intermediate operations),只对操作进行了记录,即只会返回一个流,不会进行计算操作。

  • 终结操作(Terminal operations),实现了计算操作。

中间操作又可以分为:

  • 无状态(Stateless)操作,元素的处理不受之前元素的影响。

  • 有状态(Stateful)操作,指该操作只有拿到所有元素之后才能继续下去。

终结操作又可以分为:

  • 短路(Short-circuiting)操作,指遇到某些符合条件的元素就可以得到最终结果

  • 非短路(Unshort-circuiting)操作,指必须处理完所有元素才能得到最终结果。

操作分类详情如下图所示:

源码结构

Stream 相关类和接口的继承关系如下图所示:



BaseStream

最顶端的接口类,定义了流的基本接口方法,最主要的方法为 spliterator、isParallel。


Stream

最顶端的接口类。定义了流的常用方法,例如 map、filter、sorted、limit、skip、collect 等。



ReferencePipeline

ReferencePipeline 是一个结构类,定义内部类组装了各种操作流,定义了HeadStatelessOpStatefulOp三个内部类,实现了 BaseStream 与 Stream 的接口方法。


Sink

Sink 接口定义了 Stream 之间的操作行为,包含 begin()end()cancellationRequested()accpt()四个方法。ReferencePipeline 最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink 接口协议来定义实现的。



操作叠加

Stream 的基础用法就不再叙述了,这里从一段代码开始,分析 Stream 的工作原理。

@Testpublic void testStream() {    List<String> names = Arrays.asList("kotlin", "java", "go");    int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length)            .max(Comparator.naturalOrder()).orElse(-1);    System.out.println(maxLength);}

复制代码

当使用 Stream 时,主要有 3 部分组成,下面一一讲解。

加载数据源

调用 names.stream() 方法,会初次加载 ReferencePipeline 的 Head 对象,此时为加载数据源操作。

java.util.Collection#stream


java.util.Collection#stream


default Stream<E> stream() {return StreamSupport.stream(spliterator(), false);}


StreamSupport 类中的 stream 方法,初始化了一个 ReferencePipeline 的 Head 内部类对象。


java.util.stream.StreamSupport#stream(java.util.Spliterator<t style="margin: 0px; padding: 0px;">, boolean)</t>


public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);}


中间操作接着为 filter(name -> name.length() <= 4).mapToInt(String::length),是中间操作,分为无状态中间操作 StatelessOp 对象和有状态操作 StatefulOp 对象,此时的 Stage 并没有执行,而是通过 AbstractPipeline 生成了一个中间操作 Stage 链表。


java.util.stream.ReferencePipeline#filter


@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}


            @Override            public void accept(P_OUT u) {                if (predicate.test(u))                    downstream.accept(u);            }        };    }};
复制代码


}


java.util.stream.ReferencePipeline#map


@Override@SuppressWarnings("unchecked")public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));}};}};}


可以看到 filter 和 map 方法都返回了一个新的 StatelessOp 对象。new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将前后的 Stage 联系起来,生成一个 Stage 链表:


@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {@Overridepublic void begin(long size) {downstream.begin(-1);}


            @Override            public void accept(P_OUT u) {                if (predicate.test(u))                    downstream.accept(u);            }        };    }};
复制代码


}

this.previousStage = previousStage;this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);this.sourceStage = previousStage.sourceStage;if (opIsStateful())    sourceStage.sourceAnyStateful = true;this.depth = previousStage.depth + 1;
复制代码


}


终结操作最后为 max(Comparator.naturalOrder()),是终结操作,会生成一个最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后一个 Stage 开始,递归产生一个 Sink 链。


java.util.stream.ReferencePipeline#max

@Overridepublic final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {    return reduce(BinaryOperator.maxBy(comparator));}
复制代码


最终调用到 java.util.stream.AbstractPipeline#wrapSink,这个方法会调用 opWrapSink 生成一个 Sink 链表,对应到本文的例子,就是 filter 和 map 操作。


@Override@SuppressWarnings("unchecked")final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {    sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;}
复制代码


在上面 opWrapSink 上断点调试,发现最终会调用到本例中的 filter 和 map 操作


wrapAndCopyInto 生成 Sink 链表后,会通过 copyInfo 方法执行 Sink 链表的具体操作。


@Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);


if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {    wrappedSink.begin(spliterator.getExactSizeIfKnown());    spliterator.forEachRemaining(wrappedSink);    wrappedSink.end();}else {    copyIntoWithCancel(wrappedSink, spliterator); }}
复制代码


上面的核心代码是:

spliterator.forEachRemaining(wrappedSink);

java.util.Spliterators.ArraySpliterator#forEachRemaining


@Overridepublic void forEachRemaining(Consumer<? super T> action) {

Object[] a; int i, hi; // hoist accesses and checks from loop

if (action == null)

throw new NullPointerException();

if ((a = array).length >= (hi = fence) &&

(i = index) >= 0 && i < (index = hi)) {

do { action.accept((T)a[i]); } while (++i < hi);}}

断点调试,可以发现首先进入了 filter 的 Sink,其中 accept 方法的入参是 list 中的第一个元素“kotlin”(代码中的 3 个元素是:"kotlin", "java", "go")。filter 的传入是一个 Lambda 表达式:


filter(name -> name.length() <= 4)


显然这个第一个元素“kotlin”的 predicate 是不会进入的。


对于第二个元素“java”,predicate.test 会返回 true(字符串“java”的长度<=4),则会进入 map 的 accept 方法。



本次调用 accept 方法时,empty 为 false,会将 map 后的结果(int 类型的 4)赋值给 t。


用户头像

java易二三

关注

还未添加个人签名 2021-11-23 加入

还未添加个人简介

评论

发布
暂无评论
Java Stream 源码分析_Java_java易二三_InfoQ写作社区