[源碼解析] 當 Java Stream 遇見 Flink

[源碼解析] 當 Java Stream 遇見 Flink

0x00 摘要

在分析Alink源碼的時候,發現Alink使用了 Java Stream,又去Flink源碼搜索,發現Flink也有大量使用。一時興起,想看看 Java Stream 和 Flink 這種流處理框架的異同點。當然這種比較還是注重於理念和設計思路上的。因為就應用領域和複雜程度來說, Java Stream 和 Flink 屬於數量級別的差距。

因為Flink的分析文章我寫了一些,所以本文源碼的分析重點在Java Stream上。

0x01 領域

從幾個權威來源可以看看Flink本質:

  • 我們直接從官網找出Flink本質:Apache Flink® — Stateful Computations over Data Streams,即 數據流上的有狀態計算

  • 從github上看:Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.

  • 從百度百科上看:Flink 其核心是用Java和Scala編寫的分佈式流數據流引擎。Flink以數據並行和流水線方式執行任意流數據程序,Flink的流水線運行時系統可以執行批處理和流處理程序。

因此可以總結如下:Flink 是分佈式流數據計算,引擎,框架,系統,各種高大上 ……

1.2 Java Stream

直接看 java doc

Stream :A sequence of elements supporting sequential and parallel aggregate operations.

從其他網址看:

Java 8 API 添加了一個新的抽象稱為流Stream,可以讓你以一種聲明的方式處理數據。

  • 這種風格將要處理的元素集合看作一種流, 流在管道中傳輸, 並且可以在管道的節點上進行處理, 比如篩選, 排序,聚合等。

  • 元素流在管道中經過中間操作(intermediate operation)的處理,最後由最終操作(terminal operation)得到前面處理的結果。

因此可以看到,Java Stream 是流抽象API,可以使用並行操作。

1.3 探尋角度

因此我們可以看出,Flink 和 Java Stream 最值得比較的三個方面就是:數據流模型,流水線,數據並行

下面我們就從這三個角度來分析。

0x02 數據流模型

2.1 Java Stream

Stream編程風格將要處理的元素集合看作一種流, 流在管道中傳輸, 並且可以在管道的節點上進行處理, 比如篩選, 排序,聚合等。

元素流在管道中經過中間操作(intermediate operation)的處理,最後由最終操作(terminal operation)得到前面處理的結果。

+--------------------+       +------+   +------+   +---+   +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+       +------+   +------+   +---+   +-------+

以上的流程轉換為 Java 代碼為:

List<Integer> transactionsIds = widgets.stream()
             .filter(b -> b.getColor() == RED)
             .sorted((x,y) -> x.getWeight() - y.getWeight())
             .mapToInt(Widget::getWeight)
             .sum();

官方樣例如下

case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

2.3 分析

可以看出來,大家思路都很類似,就是用一種類似用 SQL 語句從數據庫查詢數據的直觀方式來提供一種對運算和表達的高階抽象。這種抽象其實在目前已經是很多框架和語言的必備了。用起來都很爽,調試起來都崩潰。

0x03 流水線

本部分以 Java Stream為主,如果和Flink比較則會重點指出

3.1 總體對比

Java Stream 的流水線是在JVM內部,各種用戶自定義函數都是在JVM中隨意訪問。

Flink的流水線節點可能分佈在不同機器的JVM上,用戶jar包需要提交給不同的JVM。

Flink 中的執行圖可以分成四層:StreamGraph —> JobGraph —> ExecutionGraph -> 物理執行圖

  • StreamGraph:是對用戶邏輯的映射,代表程序的拓撲結構,是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。
  • JobGraph:StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
  • ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後形成的「圖」,並不是一個具體的數據結構。

Java Stream 的流水線可以分為兩層:Stage —> Sink,即 “流水線構建階段” 和 “流水線執行階段”。

  • Stage: Stage是概念上的構建。此階段類似於Flink的StreamGraph,每一個Stage相當於StreamNode
  • Sink: Sink 接口是執行階段用到的。類似於Flink中的ExecutionGraph,每一個Sink相當於ExecutionVertex

3.2 示例代碼

這裡的示例代碼如下:

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;

public class Java8Stream {
    public static void main(String[] args) {
        List<String> list = Lists.newArrayList(
                "bcd", "cde", "def", "abc");
        List<String> result = list.stream()
                .filter(e -> e.length() >= 3)
                .map(e -> e.charAt(0))
                .map(e -> String.valueOf(e))
                .collect(Collectors.toList());
        System.out.println(result);
    }
}

3.3 Stream操作分類

Java Stream上的所有操作分為兩類:中間操作和結束操作。Flink算子其實也是這麼區分,只不過沒有像 Java Stream 這麼做而已

  • 中間操作只是一種標記,只有結束操作才會觸發實際計算。中間操作又可以分為無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操作是指元素的處理不受前面元素的影響,而有狀態的中間操作必須等到所有元素處理之後才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前並不能確定排序結果;
  • 結束操作又可以分為短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結果,比如找到第一個滿足條件的元素。之所以要進行如此精細的劃分,是因為底層對每一種情況的處理方式不同。

具體如下:

Stream操作分類
中間操作(Intermediate operations) 無狀態(Stateless) unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
有狀態(Stateful) distinct() sorted() sorted() limit() skip()
結束操作(Terminal operations) 非短路操作 forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
短路操作(short-circuiting) anyMatch() allMatch() noneMatch() findFirst() findAny()

3.4 Stage

3.4.1 分類

源碼中把 Stream 的一個操作稱為一個 stage,即由很多Stage構成了流水線Flink則要比它複雜很多

Stage包括三類Head,StatefulOp,StatelessOp,它們的繼承鏈是這樣的:

Head -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
StatefulOp -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
StatelessOp -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
ReferencePipeline 繼承了 AbstractPipeline 和 Stream   

PipelineHelper主要用於Stream執行過程中相關結構的構建。

AbstractPipeline是流水線的核心抽象類,用於構建和管理流水線。它的實現類就是流水線的節點。

AbstractPipeline的直接實現類為ReferencePipeline,而Head 、StatefulOp 、StatelessOp又繼承了ReferencePipeline類。因此Head / StatefulOp / StatelessOp 本身也是AbstractPipeline類型的。

3.4.2 AbstractPipeline

AbstractPipeline是流水線的核心,每一個stage就是一個AbstractPipeline的實例,這裡的每一個pipeline都是一個節點。AbstractPipeline中定義了三個AbstractPipeline類型的變量:

  • sourceStage(源階段),即保存的 Head 頭節點引用,用於獲取保存在頭節點關於整個 Stream 處理流程中的關鍵信息,如是否是並行模式;
  • previousStage(上游pipeline,前一階段),即當前中間操作節點的上一個節點,因為 Head 為整個雙向鏈表最上游,故其前一個節點為 null;
  • nextStage(下一階段);

能看到 prev,next 這就是指向前後兩個stage,用來構建一個雙向鏈表。

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    private final AbstractPipeline sourceStage;
    private final AbstractPipeline previousStage;
    private AbstractPipeline nextStage;
}

其他比較重要的屬性如下:

  1. sourceSpliterator 數據源的可分解迭代器,並行流中分解任務所需
  2. depth 當前節點的深度,Head 頭節點深度為 0,該值在並行流大任務fork()分解子任務時可用於維護任務層級
  3. parallel 是否是並行模式,決定了是否啟用 ForkJoinPool 用於並行執行任務

3.5 流水線構建階段

這部分只是Stage這裡這是概念上的構建。類似於Flink的StreamGraph。也為後續的運行做了準備。

示例代碼中,通過Collection.stream()方法得到Head也就是stage0,緊接着調用一系列的中間操作,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一起,構成整個流水線,由於每個Stage都記錄了前一個Stage和本次的操作以及回調函數,依靠這種結構就能建立起對數據源的所有操作。這就是Stream記錄操作的方式。

每一步Stream的方法調用都產生一個新的stage,這些stage會以雙向鏈表的方式鏈接,而每個stage都記錄了每一個階段的操作,這樣我們就可以依賴這種數據結構來保存對數據源的所有操作了。

3.5.1 Head

Head 用於表示第一個Stage,也就是source stage,調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage里不包含任何操作;Head就類似於Flink的Source

從程序開始看起。

list.stream()

會調用到

default Stream<E> stream() {
	return StreamSupport.stream(spliterator(), false);
}

然後會建立一個Stream,這個Stream就是一個 ReferencePipeline.Head。這裡Head也是一個ReferencePipeline。 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
	return new ReferencePipeline.Head<>(spliterator,
                                           StreamOpFlag.fromCharacteristics(spliterator),
                                           parallel);
}

3.5.2 中間操作

StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操作

這種屬於算子的邏輯概念,Flink對應的算子也具有類似的區別

示例代碼中,可以看到 filter 返回了一個無狀態stage,也是一個AbstractPipeline、stream,即是流水線的一個階段。同時還實現了AbstractPipeline定義的opWrapSink方法。其重寫的 opWrapSink() 規定了該操作的下游操作的Sink是如何組織數據處理邏輯的。

後續的filter,map都分別構建了一個StatelessOp。這裡需要注意的是:每個StatelessOp都在其內部有opWrapSink函數,如果調用opWrapSink時候,就會生成一個Sink,其作用我們當分析到程序運行時候會講解

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
    
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
}

這裡要注意的是構建了一個雙向鏈表。比如filter的構建最終調用到:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        previousStage.nextStage = this; // 構建雙向鏈表
      	this.previousStage = previousStage; // 構建雙向鏈表
        this.sourceStage = previousStage.sourceStage;
        ......
}

所以我們最終得到的,用stage標識的流水線就是如下,注意這裡都是雙向鏈表。

+------------+       
| Collection |
+------------+                  
      │ 
      │ stream()
      │ 
+------------+       
|    Head    |
+------------+     
      │ 
      │ filter()
      │
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ map()
      │ 
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ map()
      │  
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ collect()
      │
+------------+       
| TerminalOp |
+------------+     

運行時的流水線摘錄如下:

this = {ReferencePipeline$2$1@767} 

 this$1 = {ReferencePipeline$2@688} 
  predicate = {Java8Stream$lambda@681} 
  this$0 = {ReferencePipeline$Head@682} 
  sourceStage = {ReferencePipeline$Head@682} 
  previousStage = {ReferencePipeline$Head@682} 
  sourceOrOpFlags = 128
  nextStage = {ReferencePipeline$3@691} 
  depth = 1
  combinedFlags = 159
  sourceSpliterator = null
  sourceSupplier = null
  linkedOrConsumed = true
  sourceAnyStateful = false
  sourceCloseAction = null
  parallel = false
    
 downstream = {ReferencePipeline$3$1@768} 
  this$1 = {ReferencePipeline$3@691} 
  downstream = {ReferencePipeline$3$1@770} 
   this$1 = {ReferencePipeline$3@694} 
   downstream = {ReduceOps$3ReducingSink@771} 
    supplier = {Collectors$lambda@772} 
    accumulator = {Collectors$lambda@773} 
    combiner = {Collectors$lambda@774} 
    state = {ArrayList@775}  size = 0

Java Stream相對簡單,使用 Stage 一個數據結構就都搞定(比如雙向鏈表本身就是Stage雙向鏈表),而Flink則要複雜多了,比如:

  • StreamNode 是用來描述 operator 的邏輯節點,並具有所有相關的屬性,如並發度、入邊和出邊等。
  • StreamEdge 是用來描述兩個 StreamNode(operator) 邏輯的鏈接邊。

3.6 流水線執行階段

因為Stream 是一個惰性求值的系統,所以直到當執行如下時候,才會進行最後求值。這一步驟就相當於Flink程序需要加一個 print,env.execute 才能運行

.collect(Collectors.toList());

調用時候涉及到的部分調用棧如下:

makeSink:180, ReduceOps$3 (java.util.stream)
makeSink:177, ReduceOps$3 (java.util.stream)
evaluateSequential:708, ReduceOps$ReduceOp (java.util.stream)
evaluate:234, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

這就牽扯出來Java Stream的另外一部分操作:結束操作(Terminal operations)。

3.6.1 TerminalOp

TerminalOp是流水線上的一個算子,其完成了最後的計算操作。在FindOp、ForEachOp、MatchOp 和 ReduceOp 中會覆蓋其evaluateParallel函數。

注意:終結操作不會添加節點

3.6.2 ReduceOp

ReduceOp是TerminalOp的一個具體實現,其執行了一個reduce操作。可以看到 makeSink() 這裡做了一個Sink。

每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調用後一個Stage的accept()方法即可,並不需要知道其內部是如何處理的。

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
            implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        public abstract S makeSink();

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
}

3.6.3 Sink

Sink接口是執行階段用到的。類似於Flink中的ExecutionVertex

在上一步已經在stage中記錄了每一步操作,此時並沒有執行。但是stage只是保存了當前的操作,並不能確定下一個stage需要何種操作,何種數據。

JDK為此定義了Sink接口來處理具體操作

interface Sink<T> extends Consumer<T>

Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四個接口,其中間操作的子類中包含一個指向下游sink的指針。

方法名 作用
void begin(long size) 開始遍曆元素之前調用該方法,通知Sink做好準備。
void end() 所有元素遍歷完成之後調用,通知Sink沒有更多的元素了。
boolean cancellationRequested() 是否可以結束操作,可以讓短路操作儘早結束。
void accept(T t) 遍曆元素時調用,接受一個待處理元素,並對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法里,前一個Stage只需要調用當前Stage.accept(T t)方法就行了。

3.6.4 構建執行鏈

具體以ReduceOp為例,執行關鍵其實就是調用到了 AbstractPipeline#wrapAndCopyInto()

public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
	return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

wrapAndCopyInto其實現如下,正如其名字示意,主要包含了兩個步驟:

  1. wrapSink() 從操作鏈表的尾部開始,調用操作對象自身重寫的 opWrapSink()方法將每一個操作對象中的數據處理邏輯封裝成 Sink.ChainedReference,並將傳入的 Sink 作為新建 Sink 的 downStream,從而形成單向調用鏈。這部分屬於構建階段
  2. copyInto()從調用鏈頭部開始執行中間操作數據處理邏輯封裝成的 Sink 對象的方法,完成對數據源的處理。其實這部分就是執行階段
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
}
3.6.4.1 構建Sink鏈條

當終結操作觸發時,以終結操作本身的數據處理邏輯的封裝對象 Sink 為起點,從操作鏈表尾部 stage 逆向遍歷,將操作動作中封裝的數據處理邏輯封裝成 ChaineReference 對象,並將傳入的上一個 Sink 引用賦值給新建 Sink 的 downStream 變量,從而形成單向的調用鏈。

+------+   downStream    +------+  downStream  +------+  downStream  +------+ 
| Sink +---------------> | Sink +------------> | Sink +------------> | Sink +
+------+                 +------+              +------+              +------+

從操作鏈表的尾部開始,調用操作對象自身重寫的 opWrapSink()方法將每一個操作對象中的數據處理邏輯封裝成 Sink.ChainedReference,並將傳入的 Sink 作為新建 Sink 的 downStream,從而形成單向調用鏈。

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink); // 從後往前處理
        }
        return (Sink<P_IN>) sink; // 返回單向調用鏈
    }

以map算子為例,就是生成 return 了一個Sink.ChainedReference(其也是一個Sink),這些Sink最後會串聯在一起,形成Sink鏈。

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
	return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) { // 這裡返回Sink
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
}

ChainedReference包括:

  • begin:在遍曆元素前調用,做好遍歷準備;
  • accept:遍歷每個元素的時候調用,包含每個stage的操作和回掉函數;
  • end:遍歷結束後調用
  • cancellationRequested:是否能夠儘早結束遍歷,用於短路操作

每個stage都把操作實現在Sink里,上游stage調用下游stage的accept方法,達到按順序執行每個操作的目的。

可以看到調用完成之後

sink = {ReferencePipeline$2$1@741} 
     this$1 = {ReferencePipeline$2@687} 
     downstream = {ReferencePipeline$3$1@742} 
          this$1 = {ReferencePipeline$3@693} 
          downstream = {ReferencePipeline$3$1@739} 
               this$1 = {ReferencePipeline$3@714} 
               downstream = {ReduceOps$3ReducingSink@735} 
                    supplier = {Collectors$lambda@723} 
                    accumulator = {Collectors$lambda@722} 
                    combiner = {Collectors$lambda@721} 
                    state = null

從結束操作的sink開始,一層一層包裝sink,最後第一個中間操作的sink在最外層,在每個操作的opWrapSink方法里返回的sink都維護了一個downstream指向後一個操作,這樣,雙向鏈表的結構就完成了。這樣我們在copyInto方法里調用beginacceptend的時候就會通過downstream一層一層的調用下去,最終在結束操作執行實際計算。

Flink要複雜太多。

ExecutionGraph: JobManager根據JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。下面只列舉和 Java Stream大致能對應的模塊。

  • ExecutionJobVertex:和JobGraph中的JobVertex一一對應。每一個ExecutionJobVertex都有和並發度一樣多的ExecutionVertex。
  • ExecutionVertex:表示ExecutionJobVertex的其中一個並發子任務,輸入是ExecutionEdge,輸出是IntermediateResultPartition。一個JobVertex/ExecutionJobVertex代表的是一個operator,而具體的ExecutionVertex則代表了每一個Task。
  • IntermediateResult:和JobGraph中的IntermediateDataSet一一對應每一個IntermediateResult有與下游ExecutionJobVertex相同並發數的IntermediateResultPartition。
  • IntermediateResultPartition:表示ExecutionVertex的一個輸出分區,生產者是ExecutionVertex,消費者是若干個ExecutionEdge。
  • ExecutionEdge:表示ExecutionVertex的輸入,源是IntermediateResultPartition,目標是ExecutionVertex.source和目標都只能是一個。

3.7 執行調用Sink鏈

3.7.1 調用執行

有了Sink對操作的包裝,Stage之間的調用問題就解決了,執行時只需要從流水線的head開始對數據源依次調用每個Stage對應的Sink.{ begin(), accept(), cancellationRequested(), end() }方法就可以了。一種可能的Sink.accept()方法流程是這樣的:

void accept(U u){
    1. 使用當前Sink包裝的回調函數處理 u
    2. 將處理結果傳遞給流水線下游的Sink
}

Sink接口的其他幾個方法也是按照這種[處理->轉發]的模型實現。

這就是我們前面提到的 “copyInto() 從調用鏈頭部開始執行中間操作數據處理邏輯封裝成的 Sink 對象的方法,完成對數據源的處理“,具體調用如下:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);  // 調用到這裡
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
}

會調用到 ArrayListSpliterator 的 forEachRemaining。

public void forEachRemaining(Consumer<? super E> action) {
         for (; i < hi; ++i) {
              @SuppressWarnings("unchecked") E e = (E) a[i];
              action.accept(e);
         }
}

以map算子為例,就是調用到了之前生成的 Sink.ChainedReference(其也是一個Sink)中的 accept 函數,執行本算子的業務操作,然後傳遞給下游stream調用。

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
	return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u)); // 調用到下游算子
                    }
                };
            }
        };
}

具體調用棧如下:

lambda$main$0:14, Java8Stream (com.alibaba.alink)
test:-1, 13138721 (com.alibaba.alink.Java8Stream$$Lambda$1)
accept:174, ReferencePipeline$2$1 (java.util.stream)
forEachRemaining:1374, ArrayList$ArrayListSpliterator (java.util)
copyInto:481, AbstractPipeline (java.util.stream)
wrapAndCopyInto:471, AbstractPipeline (java.util.stream)
evaluateSequential:708, ReduceOps$ReduceOp (java.util.stream)
evaluate:234, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

這裡也只列舉大致可對應或者可參考的。

物理執行圖: JobManager根據ExecutionGraph對工作進行調度後,在各個TaskManager上部署任務後形成的「圖」,並不是一個具體的數據結構。

  • 任務:執行被調度後在分配的TaskManager中啟動對應的Task。Task包裹了具有用戶執行邏輯的運算符。
  • ResultPartition:代表由一個任務的生成的數據,和ExecutionGraph中的IntermediateResultPartition一一對應。
  • ResultSubpartition:是ResultPartition的一個子分區。每個ResultPartition包含多個ResultSubpartition,其數目要由下游消費任務數和DistributionPattern來決定。
  • InputGate:代表任務的輸入封裝,和JobGraph中JobEdge一一對應每個InputGate消費了一個或多個的ResultPartition。
  • InputChannel:每個InputGate會包含一個以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一對應,也和ResultSubpartition一對一地相連,即一個InputChannel接收一個ResultSubpartition的輸出。

Flink會根據ExecutionJobVertices的數量創建異步任務。並且給每個ExecutionJobVertices分配適當的slot,然後調用execution.deploy();方法。

從Execution Graph到真正物理執行圖轉換時候,會將IntermediateResultPartition轉化成ResultPartition,ExecutionEdge轉成InputChannelDeploymentDescriptor(最終會在執行時轉化成InputGate)。

最後通過RPC方法提交task,實際會調用到TaskExecutor.submitTask方法中。這個方法會創建真正的Task,然後調用task.startTaskThread();開始task的執行。

0x04 數據並行

4.1 對比

4.1.1 範疇

Java Stream 的並行指的是在JVM內部並行。

Flink 並行的範疇就大得多。首先Task Manager是JVM層級,在Task Manager內部又有多個slot任務槽可以並行。其次多個Task Manager即可在同一個機器上,也可以在不同機器上。

Flink中的執行資源是通過任務槽定義。每個TaskManager都有一個或多個任務槽,每個任務槽可以運行一個並行任務的流水線(pipeline)。流水線由多個連續的任務組成,例如 MapFunction 的第n個並行實例和 ReduceFunction 的第n個並行實例。

所以Flink 並行的範疇包括:

  • JVM內部Slot概念。
  • 同一個機器的JVM之間。
  • 不同機器的JVM之間。

4.1.2 並行度影響因素

Java Stream 並行流內部使用了默認的ForkJoinPool線程池,所以它默認的線程數量就是處理器的數量,通過Runtime.getRuntime().availableProcessors()可以得到這個值。如果需修改則需設置-Djava.util.concurrent.ForkJoinPool.common.parallelism=xxx。

Flink 並行度具體設置取決於部署模式。

  • 如果Standalone模式,則並行度是通過配置來調整。
  • 如果是Yarn來控制資源調度,則Flink on YARN時的容器數量——亦即TaskManager數量——將由程序的並行度自動推算。

4.2 Java Stream實現

parallelStream是一個並行執行的流,其使用 fork/join (ForkJoinPool)並行方式來拆分任務和加速處理過程。研究parallelStream之前,搞清楚ForkJoinPool是很有必要的。

ForkJoinPool的核心是採用分治法的思想,將一個大任務拆分為若干互不依賴的子任務,把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務。

同時,為了最大限度地提高並行處理能力,採用了工作竊取算法來運行任務,也就是說當某個線程處理完自己工作隊列中的任務後,嘗試當其他線程的工作隊列中竊取一個任務來執行,直到所有任務處理完畢。所以為了減少線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

我們修改下代碼,增加 .parallel() 調用,這樣就從串行進化成了並行。

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;

public class Java8Stream {
    public static void main(String[] args) {
        List<String> list = Lists.newArrayList(
                "bcd", "cde", "def", "abc");
        List<String> result = list.stream()
                .parallel()
                .filter(e -> e.length() >= 3)
                .map(e -> e.charAt(0))
                .map(e -> String.valueOf(e))
                .collect(Collectors.toList());
        System.out.println(result);
    }
}

AbstractPipeline 中能看到,就是標記個並行的標記,設置為true。sourceStage其實就是自身代表的算子。

private boolean parallel;
private final AbstractPipeline sourceStage;

public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
}

程序是在collect處開始執行的。

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        container = evaluate(ReduceOps.makeRef(collector));
}

執行時候如果設置了並行,就會並行調用。

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

evaluateParallel 此處並行調用主要是通過 ReduceOp —> ReduceTask 來完成的。

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>        implements TerminalOp<T, R> {
        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
}

這時候會發現,該方法中new了一個ReduceTask類,然後調用了它的invoke()方法,看看ReduceTask類相關信息,最後會發現它的繼承鏈是這樣的:

ReduceTask -----> AbstractTask -----> CountedCompleter -------> ForkJoinTask

可以看出所有的Task 都繼承自Jdk7 中引入的ForkJoin 並行框架的ForkJoinTask。所以我們可以看出Stream 的並行是依賴於ForkJoin 框架的。

abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>>        extends CountedCompleter<R> {
    @Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
    }    
}

調用棧是:

compute:297, AbstractTask (java.util.stream)
exec:731, CountedCompleter (java.util.concurrent)
doExec:289, ForkJoinTask (java.util.concurrent)
doInvoke:401, ForkJoinTask (java.util.concurrent)
invoke:734, ForkJoinTask (java.util.concurrent)
evaluateParallel:714, ReduceOps$ReduceOp (java.util.stream)
evaluate:233, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

這裏面的主要邏輯就是

  • 先調用當前splititerator 方法的estimateSize 方法,預估這個分片中的數據量。
  • 根據預估的數據量獲取最小處理單元的大小閾值,即當數據量已經小於這個閾值的時候進行計算,否則進行fork 將任務劃分成更小的數據塊,進行求解。這裡值得注意的是,getTargetSize 在第一次調用的時候會設置 預測數據量大小 / (默認並發度 * 4) 的結果作為最小執行單元的數量(配置的默認值是cpu 數 – 1,可以通過java.util.concurrent.ForkJoinPool.common.parallelism設置)。
  • 如果當前分片大小仍然大於處理數據單元的閾值,且分片繼續嘗試切分成功,那麼就繼續切分,分別將左右分片的任務創建為新的Task,並且將當前的任務關聯為兩個新任務的父級任務(邏輯在makeChild 裏面)。
  • 先後對左右子節點的任務進行fork,對另外的分區進行分解。同時設定pending 為1,這代表一個task 實際上只會有一個等待的子節點(被fork)。
  • 當任務已經分解到足夠小的時候退出循環,嘗試進行結束。調用子類實現的doLeaf方法,完成最小計算單元的計算任務,並設置到當前任務的localResult中。
  • 調用tryComplete 方法進行最終任務的掃尾工作,如果該任務pending 值不等於0,則原子的減1,如果已經等於0,說明任務都已經完成,則調用onCompletion 回調,如果該任務是葉子任務,則直接銷毀中間數據結束;如果是中間節點會將左右子節點的結果進行合併。
  • 檢查如果這個任務已經沒有父級任務了,則將該任務置為正常結束,如果還有則嘗試遞歸的去調用父級節點的onCompletion回調,逐級進行任務的合併。

0x05 Java Stream在Flink的應用

我從Flink的各個release版本找了下,發現Flink從1.5開始才引入 Java Stream,源碼中只有三處使用到

但是最新的代碼則有幾百處調用

這說明起初,Flink開發者中大概只有一個兄弟一時興起實驗了 Java Stream,結果發現很好用,就陸續推廣開來。

我們還要發現,Flink在 Java Stream 的用法上,並沒有使用其並行版本。

個人覺得,Flink框架中使用 Java Stream 的並行版本對於框架性能提高意義不大,反而會造成調試差錯的難度( 需要時刻考慮線程安全問題。否則可能造成程序死鎖,或數據的準確性。造成的後果完全取決於使用非線程安全類的效果 ),所以Flink沒有使用其並行版本。但是用戶在自己代碼中可以使用其並行版本。

0x06 總結

這裡我們再總結下。Flink 和 Java Stream 最值得比較的三個方面就是:數據流模型,流水線,數據並行

6.1 數據流模型

大家思路都很類似,就是用一種類似用 SQL 語句從數據庫查詢數據的直觀方式來提供一種對運算和表達的高階抽象。這種抽象其實在目前已經是很多框架和語言的必備了。

6.2 流水線

Flink 中的執行圖( Flink這裡形成了圖結構 )可以分成四層:StreamGraph —> JobGraph —> ExecutionGraph -> 物理執行圖

Java Stream 的流水線可以分為兩層:Stage —> Sink,即 “流水線構建階段” 和 “流水線執行階段”。

Java Stream Stage部分只是概念上的構建。類似於Flink的StreamGraphHead就類似於Flink的Source

Java Stream Sink 接口是執行階段用到的。類似於Flink中的ExecutionGraph,每一個Sink相當於ExecutionVertex。

Sink: Sink 接口是執行階段用到的。類似於Flink中的ExecutionGraph,每一個Sink相當於ExecutionVertex

Java Stream 有無狀態和有狀態的中間操作這種屬於算子的邏輯概念,Flink對應的算子也具有類似的區別

因為 Java Stream 是一個惰性求值的系統,所以直到當執行如下時候,才會進行最後求值。這一步驟就相當於Flink程序需要加一個 print,env.execute 才能運行

6.3 數據並行

Java Stream 的並行指的是在JVM內部並行。

Flink 並行的範疇就大得多。Flink的範疇包括:

  • JVM內部Slot概念
  • 同一個機器的JVM之間
  • 不同機器的JVM之間

Java Stream 並行流內部使用了默認的ForkJoinPool線程池,所以它默認的線程數量就是處理器的數量。

Flink 並行度具體設置取決於部署模式。

  • 如果Standalone模式,則並行度是通過配置來調整。
  • 如果是Yarn來控制資源調度,則Flink on YARN時的容器數量——亦即TaskManager數量——將由程序的並行度自動推算。

0xFF 參考

Java 8 Stream

[三]java8 函數式編程Stream 概念深入理解 Stream 運行原理 Stream設計思路

java8學習總結——Stream的理解

深入理解Java8中Stream的實現原理

淺析Java8 Stream原理

java8 Stream的實現原理 (從零開始實現一個stream流)

Java8 Stream 並行計算實現的原理

java8Stream原理深度解析

為什麼說Java8的Stream並行流底層使用了Fork/Join框架

記一次java8 parallelStream使用不當引發的血案

深入理解Java Stream流水線

java8 Stream Pipelines 淺析

Java 8 Stream(2)-原理解析