Stream—一個早產的嬰兒

當你會關注這篇文章時,那麼意味著你對Stream或多或少有些了解,甚至你在許多業務中有所應用。正如你所知,業界對Streamlambda褒貶不一,有人認為它是銀彈,也有人認為其降低了程式碼的可讀性。事實上,很多東西我們應該辯證的去看待,一方面Stream相關的api的確提供了諸多的便利,如果你願意花時間去理解和使用的話;然而另一方面,它像一個早產的嬰兒,當你去閱讀它源碼時,你會覺得詫異,像是一個臨時拼湊而成的模組。
在前面的Java函數式編程的前生今世篇章中,我們已經了解了lambda表達式的原理,以及常見的四大函數式介面。
我們可以先看一個Stream的demo:

Stream.of(1, 2, 3)
                .filter(num -> num > 2)
                .forEach(System.out::println);

語義比較清晰,從一個list中獲取數值大於2的,最後給列印出來。

源頭

在調用StreamAPI之前,我們都需要先創建一個Steam流,Stream流的創建方式有很多種,比如上述demo中的Stream.of,其使用的是StreamSupport這個類提供的方法;還有在集合類中在1.8之後預留了stream的獲取方法等。

//StreamSupport
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
//Collection
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

這裡可以稍微留意一下,有一個parallel參數,為我們後文去執行作準備。
不知道看到這裡你是否也會有同樣的疑惑:為什麼Stream明明是一個介面,要在裡面做static的實現?
這與以往的JDK程式碼有較大的出入,一般靜態功能都會提供一個xxxs來處理,比如PathPathsFileFiles等。而且更令人詫異的是,在1.8之後,這種靜態方法在ListCollection中比比皆是。
坦率地講,這並非一種好的設計,嚴格來講,介面只是聲明,不應該承載具體實現,雖然從語法而言提供了這種能力,但這並不意味著我們只有這樣才能實現。而這也像是對過去設計的妥協。
我們回到Stream,前面兩種方法都提到了,會返回一個Stream流。

default Stream<E> stream() {
   return StreamSupport.stream(spliterator(), false);
}

最開始當我看到StreamSupport這個類時,我第一感覺是類似於LockSupport,用於「輔助」,而非「創建」。然而事與願違的是,它更多的做的是「創建」。其實熟悉JDK源碼的人應該比較清楚,這種「創建」的事情,一般是在xxs(比如Paths)這種類中處理。
當然,這個僅是個人主觀的臆斷,也許他們內部並沒有這種「約定俗成」的東西。

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

ReferencePipeline.Head是所有流處理的源頭,ReferencePipeline繼承自AbstractPipelineSpliterator用於對數據迭代並加工,其中有一個較為關鍵的方法forEachRemaining,我們後面也會提到。

//創建頭節點
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

頭節點,包括後面流水線的節點都繼承自這個AbstractPipeline,你會發現這裡的結構是一個雙向鏈表,通過previousStagenextStage來分別用於指向前一個和後一個節點。

2020-04-21T16:01:07.png

流水線

Stream體系中,操作被劃分成了兩種,一種流操作,他所做的事情是對數據的加工,而在流操作內部,又被劃分成了兩種,一種是有狀態的流(StatefulOp),一種是無狀態的流(StatelessOp),二者的區別在於,數據是否會隨著操作中的變化而變化,舉個例子,filter是無狀態的,你要過濾什麼就是什麼,而sort是有狀態的,在兩個執行緒中,如果你在數據層增加了數據或修改了數據,那麼二者最後得到的結果可能並不一致;

A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. On the other hand, a stateless lambda expression is one whose result does not depend on any state that might change during the execution of a pipeline.

另外一種是終止操作(TerminalOp),他意味著開始對流進行執行操作,如果程式碼中僅有流操作,那麼這個流是不會開始執行的,因為流返回的都是一個新的對象。

Stream中,流操作有很多種,比如常見的filtermapmapToInt等,都會在方法中返回一個新建的流操作對象,而這個對象也繼承了AbstractPipeline

//filter操作
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        //這裡的this就是前面提到的流的源頭
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

//StatelessOp類
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

//StatelessOp最終也繼承自AbstractPipeline
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

StatelessOp對象在創建時,會注入一個參數this,而這個this即我們前面提到的流的源頭。在AbstractPipeline的另外一個構造方法中,完成了雙向鏈表的指定以及深度的自增。

這裡有一個方法opIsStateful,用於判定前面提到的是否是有狀態的。

終止符

所有的流操作的執行,都取決於最終的終止操作(TerminalOp),如果流中沒有這個操作,那麼前面提到的操作流都無法執行。
而所有的終止操作都實現了TerminalOp這個介面,包括向我們常見的foreachreducefind等。我們還是以前面例子中提到的foreach來演示我們的原理。

//Stream
void forEach(Consumer<? super T> action);

//ReferencePipeline中的forEach實現
    @Override
public void forEach(Consumer<? super P_OUT> action) {
   evaluate(ForEachOps.makeRef(action, false));
}

StreamforEach方法中,有一個參數Consumer,是一個函數式介面,我們在前面的文章中有所涉及,有興趣的可以自行查閱其原理。

//ForEachOps
static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }

ForEachOps有一個ForEachOp類用於生成操作類,同時,ForEachOp還實現了TerminalSink,後面會提到。不過,還有另外一個OfRef來繼承自ForEachOp作為調用入口去使用,不過至今我還沒明白這裡為何單獨需要在ForEachOp下面再嵌套一層,有了解的可以告知我一下。

//AbstractPipeline
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

//用於判定是並行還是串列
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

這裡會根據最開始的源頭注入的parallel來判定,在前面也有所提及。這裡有一個方法sourceSpliterator用於協助我們去獲取數據源分割器,其實在前面有所提及,在創建流的時候,就已經有自動創建一個spliterator,如果是串列流,那麼會直接使用源頭流的分割器,如果是並行流,而且其中有有狀態的操作,那麼會使用這個狀態流實現的方法去返回。

//AbstractPipeline
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
       //最開始的源頭流的分割器
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

     //如果是並行流並且有有狀態的操作流
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

在我們拿到分割器之後,我們會調用terminalOp.evaluateSequential方法去處理。需要說明的是,並行流我暫時沒有深入研究,所以暫時不在此章的討論範疇,後續有機會我會補充上去。

//ForEachOps
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
//這裡的helper也就是前面在AbstractPipeline中注入的this
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

//AbstractPipeline 
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
//遍歷流鏈表,逐一執行前面的opWrapSink方法
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

在操作流中,一般會返回一個StatelessOp類,這裡前面有所提及,中間有一個opWrapSink就是現在我們在調用的方法,而在這個方法中,又會繼續返回一個類Sink.ChainedReference,這個類會在downstream記錄我們傳入的sink,也就是我們目前正在操作的ForEachOp

//前面的filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                //繼續返回一個類,記錄terminalOp
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

sink也是一個簡單的單項鏈表,他的順序與Stream相反,通過downStream一層層向前指定。在獲取到最前面一層包裝好的sink之後,我們繼續看copyInto方法。

//AbstractPipeline
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        //這裡的wrappedSink是最前面的流操作,也就是我們生成流之後的第一個操作,在此案例中也就是filter
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //調用分割器的遍歷方法
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

//Spliterators
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
               //將數據源遍歷,執行sink中的accept方法
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }

//filter accept方法被遍歷執行
                    @Override
                    public void accept(P_OUT u) {
//這裡的predicate也就是我們最開始通過lambda表達式創建的action
                        if (predicate.test(u))
//如果檢測通過,那麼執行downstream也就是ForEach.OfRef類的accept方法
                            downstream.accept(u);
                    }

//OfRef accept被調用
            @Override
            public void accept(T t) {
//這裡的consumer也就是我們stream.foreach調用時注入的System.out::println
                consumer.accept(t);
            }

Spliterators通過遍歷所有數據源,執行filteraccept方法,如果校驗通過,那麼會執行downstreamaccept方法,而這個downstream我們已經提及很多次,也就是我們這個例子中的foreachforeachaccept被調用時,此時又有一個consumer,這裡的consumer也就是我們最開始例子中的System.out::println
至此,整體流程就執行完畢了。

回到我們的標題,為什麼說stream是一個「早產的嬰兒」呢?在對stream整體源碼有所大體閱讀之後,你會發現很多類的命名、類的設計風格、以及結構的整理設計能力與之前的模組有較大的差異,有些命名明明可以更為規範,有些設計明明可以設計的更為優雅,甚至於,許多地方的設計還不夠簡練,這裡就不一一舉例了。當然,這一切都只是我個人的想法,也有可能是我的水平還沒到達另外一個層次,或許幾年之後再拜讀時又會有不一樣的感悟。

歡迎關注我的公眾號,每周至少一篇比較有深度的原創文章:
2020-04-23T15:32:19.png

Tags: