写点什么

Java Stream 源码深入解析

用户头像
Zexho
关注
发布于: 6 小时前

类图

概念解释

Pipline 和 Stage

Pipline 是流水线,表示一整个流程。Stage 表示流水线的其中一个阶段。是一个比较抽象层面的描述,因为 stage 主要表示一种逻辑上的顺序关系,而具体每一个阶段要干嘛、怎么干,使用 Sink 来进行描述。


new stream          //stage 0    .filter()       //stage 1    .sort()         //stage 2
复制代码


Sink

直译为水槽,生活中水槽的作用无非


  • 打开水龙头,知道有水要来

  • 水在水槽里, 可以进行一些操作

  • 打开水闸,放水 Java 中的 Sink 核心功能为:

  • begin(): 告诉该水槽水流要来了,可以进行一些初始化操作

  • accept():接受水流,然后进行操作

  • end():水流全部处理完了。看一个 sort()的示例,sort 这个 stage 的目的就是对所有水流进行排序,然后再流到下游。


private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {        private T[] array;  //要进行排序,需要一个数组进行缓存        private int offset; 
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); }
@Override @SuppressWarnings("unchecked") public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); //上游调用begin(),通知sort进行初始化操作,生产一个数组 array = (T[]) new Object[(int) size]; }
//上游调用end()方法,告诉sort水已经全部流过来了。sort开始执行操作 @Override public void end() { //操作 Arrays.sort(array, 0, offset, comparator); //告诉sort的下游准备接受水流 downstream.begin(offset); //一个个元素的传递给下游 if (!cancellationWasRequested) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } else { for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) downstream.accept(array[i]); } //告诉下游水流传递结束 downstream.end(); //缓存清空 array = null; }
//上游调用accept()方法,将水流存储到到sort的缓存数组中 @Override public void accept(T t) { array[offset++] = t; } }
复制代码

创建 Head

疑问

  • 官方说 Stream 不存储数据,那么数据保存在那里呢?解答在后面。

使用方式

可以使用 Stream.of()创建一个流,例如


//创建方式 of()Stream<Integer> stream = Stream.of(1, 2, 3);
复制代码

源码分析

of()方法调用


StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);
复制代码


stream()方法逻辑:


public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {    Objects.requireNonNull(spliterator);    return new ReferencePipeline.Head<>(spliterator,                                        StreamOpFlag.fromCharacteristics(spliterator),                                        parallel);}
复制代码


调用了ReferencePipeline.Head<>,返回一个 Head 对象。Head 是 ReferencePipeline 的子类。可以理解为 Head 是流水线的第一个 stage。构造方法的主要了逻辑要一直 super()到AbstractPipeline


    /**     * The source spliterator. Only valid for the head pipeline.     * Before the pipeline is consumed if non-null then {@code sourceSupplier}     * must be null. After the pipeline is consumed if non-null then is set to     * null.     */    private Spliterator<?> sourceSpliterator;
/** * Constructor for the head of a stream pipeline. * * @param source {@code Spliterator} describing the stream source * @param sourceFlags the source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel {@code true} if the pipeline is parallel */ AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; //使用一个字段指向数据集合的Spliterator,后续终结操作的时候,引用的方式操作数据 this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }
复制代码


疑问解答

  1. 官方说 Stream 不存储数据,那么数据保存在那里呢?

  2. Head 中保存数据源的 Spliterator 对象,后续操作 Spliterator 的方式操作数据

中间操作

几个疑问

  1. 各个中间操作是如何进行关联的?

  2. 如何执行完一个中间操作,然后执行下一个?

  3. 有状态的中间操作是怎么保存状态的?

  4. 懒加载如何实现的

使用方式

Stream<Integer> st = headStream.filter(e-> e=1).distinct().sort();//等同于Stream<Integer> afterFilter = headStream.filter(e -> e != 2);Stream<Integer> afterDistinct = afterFilter.distinct();Stream<Integer> afterSort = afterDistinct.sort();
复制代码

Filter

执行 filter(op)会发生什么?


Stream<Integer> afterFilter = head.filter(e -> e = 1);
复制代码


filter()方法定义在Stream类,实现在ReferencePipeline类。


//ReferencePipeline.class
@Overridepublic final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate);
// 返回一个StatelessOp类 // 构造函数参数为(this,) return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); }
@Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } };}
复制代码


返回一个StatelessOp类(因为 filter 是一个无状态操作),看下StatelessOp类,是一个静态抽象内部类,继承了ReferencePipeline类。


//ReferencePipeline.class
/** * Base class for a stateless intermediate stage of a Stream. * * @param <E_IN> type of elements in the upstream source * @param <E_OUT> type of elements in produced by this stage * @since 1.8 */ abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { /** * Construct a new Stream by appending a stateless intermediate * operation to an existing stream. * * @param upstream The upstream pipeline stage * @param inputShape The stream shape for the upstream pipeline stage * @param opFlags Operation flags for the new stage */ StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; }
@Override final boolean opIsStateful() { return false; } }
复制代码


中间 super()会执行AbstractPipeline类的构造方法, 连接 stage 之间的关系


//AbstractPipeline.class
/** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this;
this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
复制代码


Distinct

示例


Stream<Integer> afterDistinct = afterFilter.distinct();
复制代码


distinct 的方法实现在ReferencePipeline类下


@Overridepublic final Stream<P_OUT> distinct() {    return DistinctOps.makeRef(this);}
复制代码


调用 DistinctOps 类的 makeRef()方法,返回一个 StatefulOp 类,并重写了 4 个方法,实现逻辑在 opWrapSink()中:


    /**     * Appends a "distinct" operation to the provided stream, and returns the     * new stream.     *     * @param <T> the type of both input and output elements     * @param upstream a reference stream with element type T     * @return the new stream     */    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {        // 返回一个StatefulOp类        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
// 重写了以下几个方法,内容省略... <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
@Override <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) {...}
@Override <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
@Override Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink);
if (StreamOpFlag.DISTINCT.isKnown(flags)) { ... } else if (StreamOpFlag.SORTED.isKnown(flags)) { ... } else { // 返回一个SinkChainedReference类 return new Sink.ChainedReference<T, T>(sink) { //使用一个Set缓存数据,进行去重 Set<T> seen;
//当上游通知begin的时候,初始化Set @Override public void begin(long size) { seen = new HashSet<>(); downstream.begin(-1); }
//略 @Override public void end() { seen = null; downstream.end(); }
//如果已经存在,之间抛弃 @Override public void accept(T t) { if (!seen.contains(t)) { seen.add(t); downstream.accept(t); } } }; } } }; }
复制代码


StatefulOp 类与 StatelessOp 类相似,都是继承了ReferencePipeline类,然后中间 super()页会执行AbstractPipeline类的构造方法, 连接 stage 之间的关系


    /**     * Base class for a stateful intermediate stage of a Stream.     *     * @param <E_IN> type of elements in the upstream source     * @param <E_OUT> type of elements in produced by this stage     * @since 1.8     */    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {     //省略    }
复制代码



至于其他的中间操作,套路都是类似的,操作逻辑封装在opWrapSink()方法里, 可以慢慢的看。

疑问解答

  1. 各个中间操作是如何进行关联的?

  2. 一个个的操作封装成了一个个的statelessOpstateFulOp对象,以双向链表的方法串起来。

  3. 如何执行完一个中间操作,然后执行下一个?

  4. Sink 类负责流水线操作的承接上下游和执行操作的任务,核心方法为 begain()、accept()、end()。

  5. 有状态的中间操作是怎么保存状态的?

  6. 有状态的中间操作封装成stateFulOp对象,各自都有独立的逻辑,具体的参考sort()的实现逻辑。

  7. 懒加载如何实现的

  8. 每个中间操作调用后,只是 append 在流程的尾部,保存了关联关系而已。

  9. 流水线操作的启动,要交给 wrapAndCopyInto()方法调用 Head 的 Sink()操作,而 wrapAndCopyInto()方法都需要由终结操作进行触发。

终结操作

几个疑问

  1. 终结方法是如何进行操作的?

  2. 如何实现由终结操作触发流的运作的?

  3. 如何保证一个流一次只能执行一个终结方法?

使用方式

列举四种终结操作,在 Stream 提供的 API 中,也是四类:



// 计算流中元素数量,FindOPlong count = afterLimit.count();
// 遍历所有元素,ForEachOpafterLimit.forEach(System.out::printl);
// 获取第一个元素,MatchOpOptional<Integer> any = afterLimit.findFirst();
// 判断是否,ReduceOpboolean flag = afterLimit.anyMatch(i -> i == 1);
复制代码

count()

ReferencePipeline类中实现


@Overridepublic final long count() {   // 调用mapToLong将所有元素变成1,然后计算sum    return mapToLong(e -> 1L).sum();}
复制代码

maoToLong()

mapToLong()方法,是一个中间操作


@Override    public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {        Objects.requireNonNull(mapper);        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {            @Override            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {                return new Sink.ChainedReference<P_OUT, Long>(sink) {                    @Override                    public void accept(P_OUT u) {                        //                        downstream.accept(mapper.applyAsLong(u));                    }                };            }        };    }
复制代码


ToLongFunction 是一个函数式接口类, accept()里的逻辑便是e -> 1L.


@FunctionalInterfacepublic interface ToLongFunction<T> {
/** * Applies this function to the given argument. * * @param value the function argument * @return the function result */ long applyAsLong(T value);}
复制代码


看下 Sum()方法,在 LongPipeline 类中,传入参数是一个 Long::sum, sum 的作用是相加两个值。


@Overridepublic final long sum() {    // use better algorithm to compensate for intermediate overflow?    return reduce(0, Long::sum);}
//public static long sum(long a, long b) {// return a + b;//}
复制代码

reduce()

reduce 方法,将操作函数 op 封装成一个 Sink,makeLong()的作用就是会生产一个 Sink


@Overridepublic final long reduce(long identity, LongBinaryOperator op) {    return evaluate(ReduceOps.makeLong(identity, op));}
复制代码


    /**     * Constructs a {@code TerminalOp} that implements a functional reduce on     * {@code long} values.     *     * @param identity the identity for the combining function     * @param operator the combining function     * @return a {@code TerminalOp} implementing the reduction     */    public static TerminalOp<Long, Long>    makeLong(long identity, LongBinaryOperator operator) {        Objects.requireNonNull(operator);        class ReducingSink                implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {                            //state是一个用作记录的值            private long state;
@Override public void begin(long size) { state = identity; }
//参数传进来的就是sun(),所以这里的accept()的作用就是对state不断进行累加 @Override public void accept(long t) { state = operator.applyAsLong(state, t); }
@Override public Long get() { return state; }
@Override public void combine(ReducingSink other) { accept(other.state); } } return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } }; }
复制代码

evaluate()

看回 evaluate()方法,这个方法用来执行终结操作的


final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {    assert getOutputShape() == terminalOp.inputShape();    //判断流是否已被使用    if (linkedOrConsumed)        throw new IllegalStateException(MSG_STREAM_LINKED);    //设置使用标记为true    linkedOrConsumed = true;
//根据流类型,执行相应的推断操作 return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
复制代码


关注时序流的推断方法,可以看到这个方法的实现分为四种,对应上面提到的四类类型操作,count 属于 ReduceOp,进去看下。




@Overridepublic <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { //调用wrapAndCopyInto()方法 return helper.wrapAndCopyInto(makeSink(), spliterator).get();}
复制代码

wrapAndCopyInto()

保证所有 stage -> sink 链表,然后执行 copyInto()方法


  @Override    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);        return sink;    }
复制代码

warpSink()

就是在这里,从后向前,包装所有的 stage 阶段,形成一条 sink 链表。这样将之前一个个 stage 的链表结构包装成一个个 Sink。


  @Override    @SuppressWarnings("unchecked")    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {        Objects.requireNonNull(sink);            //从后向前遍历        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {            //执行每个opWrapSink()方法,这个方法在每个操作类中都进行了重写            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);        }        //返回头sink        return (Sink<P_IN>) sink;    }
复制代码

copyInto()

这个方法是整个流水线的启动开关,流程如下:


  1. 调用第一个 sink 的 begin()方法

  2. 执行数据源的 spliterator.forEachRemaining(wrappendSink)方法遍历调用 accept()方法

  3. end() 通知结束


  @Override    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {        Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { //通知第一个sink,做好准备接受流 wrappedSink.begin(spliterator.getExactSizeIfKnown()); //执行 spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
复制代码

forEachRemaining()

在各个容器中都有实现 forEachRemaining()这个方法,在 ArrayList 中:


public boolean tryAdvance(Consumer<? super E> action) {    if (action == null)        throw new NullPointerException();    int hi = getFence(), i = index;    if (i < hi) {        index = i + 1;        @SuppressWarnings("unchecked") E e = (E)list.elementData[i];        //执行accept()方法        action.accept(e);        if (list.modCount != expectedModCount)            throw new ConcurrentModificationException();        return true;    }    return false;}
复制代码

其他终结操作

forEach()

ReferencePipeline类中,实现了 forEach()方法,


// from ReferencePipeline.class
@Overridepublic void forEach(Consumer<? super P_OUT> action) { // ForEachOps.. evaluate(ForEachOps.makeRef(action, false));}
复制代码


evaluate 后面的逻辑与 count 后面的一样了,略。

findFirst() anyMatch()

findFirst()和 anyMatch()的逻辑也不再多说了,一个套路,看下实现


@Overridepublic final Optional<P_OUT> findFirst() {    return evaluate(FindOps.makeRef(true));}
@Overridepublic final boolean anyMatch(Predicate<? super P_OUT> predicate) { return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));}
复制代码

疑问解答

  1. 终结方法是如何进行操作的?

  2. 终结操作的实现里面都有调用 evaluate()方法,这个方法最后会 warp 所有操作变成一串 sink,然后从头开始执行 begin(),accept(),end()方法

  3. 如何实现由终结操作触发流的运作的?

  4. 触发的开关是 wrapAndCopyInto(),这个方法只有在终结操作中才有被调用。

  5. 如何保证一个流一次只能执行一个终结方法?

  6. evaluate()方法中执行一次后linkedOrConsumed设为 true,后续再出发 evaluate()方法就会异常。


参考引用:


发布于: 6 小时前阅读数: 9
用户头像

Zexho

关注

芝兰生于幽谷,不因无人而不芳 2019.02.15 加入

https://github.com/zouzhihao-994

评论

发布
暂无评论
Java Stream 源码深入解析