[源码解析] 当 Java Stream 遇见 Flink

[源码解析] 当 Java Stream 遇见 Flink

0x00 摘要

在分析Alink源码的时候,发现Alink使用了 Java Stream,又去Flink源码搜索,发现Flink也有大量使用。一时兴起,想看看 Java Stream 和 Flink 这种流处理框架的异同点。当然这种比较还是注重于理念和设计思路上的。因为就应用领域和复杂程度来说, Java Stream 和 Flink 属于数量级别的差距。

因为Flink的分析文章我写了一些,所以本文源码的分析重点在Java Stream上。

0x01 领域

从几个权威来源可以看看Flink本质:

  • 我们直接从官网找出Flink本质:Apache Flink® — Stateful Computations over Data Streams,即 数据流上的有状态计算

  • 从github上看:Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.

  • 从百度百科上看:Flink 其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。

因此可以总结如下:Flink 是分布式流数据计算,引擎,框架,系统,各种高大上 ……

1.2 Java Stream

直接看 java doc

Stream :A sequence of elements supporting sequential and parallel aggregate operations.

从其他网址看:

Java 8 API 添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。

  • 这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。

  • 元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

因此可以看到,Java Stream 是流抽象API,可以使用并行操作。

1.3 探寻角度

因此我们可以看出,Flink 和 Java Stream 最值得比较的三个方面就是:数据流模型,流水线,数据并行

下面我们就从这三个角度来分析。

0x02 数据流模型

2.1 Java Stream

Stream编程风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。

元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

+--------------------+       +------+   +------+   +---+   +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+       +------+   +------+   +---+   +-------+

以上的流程转换为 Java 代码为:

List<Integer> transactionsIds = widgets.stream()
             .filter(b -> b.getColor() == RED)
             .sorted((x,y) -> x.getWeight() - y.getWeight())
             .mapToInt(Widget::getWeight)
             .sum();

官方样例如下

case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

2.3 分析

可以看出来,大家思路都很类似,就是用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对运算和表达的高阶抽象。这种抽象其实在目前已经是很多框架和语言的必备了。用起来都很爽,调试起来都崩溃。

0x03 流水线

本部分以 Java Stream为主,如果和Flink比较则会重点指出

3.1 总体对比

Java Stream 的流水线是在JVM内部,各种用户自定义函数都是在JVM中随意访问。

Flink的流水线节点可能分布在不同机器的JVM上,用户jar包需要提交给不同的JVM。

Flink 中的执行图可以分成四层:StreamGraph —> JobGraph —> ExecutionGraph -> 物理执行图

  • StreamGraph:是对用户逻辑的映射,代表程序的拓扑结构,是根据用户通过 Stream API 编写的代码生成的最初的图。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

Java Stream 的流水线可以分为两层:Stage —> Sink,即 “流水线构建阶段” 和 “流水线执行阶段”。

  • Stage: Stage是概念上的构建。此阶段类似于Flink的StreamGraph,每一个Stage相当于StreamNode
  • Sink: Sink 接口是执行阶段用到的。类似于Flink中的ExecutionGraph,每一个Sink相当于ExecutionVertex

3.2 示例代码

这里的示例代码如下:

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;

public class Java8Stream {
    public static void main(String[] args) {
        List<String> list = Lists.newArrayList(
                "bcd", "cde", "def", "abc");
        List<String> result = list.stream()
                .filter(e -> e.length() >= 3)
                .map(e -> e.charAt(0))
                .map(e -> String.valueOf(e))
                .collect(Collectors.toList());
        System.out.println(result);
    }
}

3.3 Stream操作分类

Java Stream上的所有操作分为两类:中间操作和结束操作。Flink算子其实也是这么区分,只不过没有像 Java Stream 这么做而已

  • 中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;
  • 结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。

具体如下:

Stream操作分类
中间操作(Intermediate operations) 无状态(Stateless) unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
有状态(Stateful) distinct() sorted() sorted() limit() skip()
结束操作(Terminal operations) 非短路操作 forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
短路操作(short-circuiting) anyMatch() allMatch() noneMatch() findFirst() findAny()

3.4 Stage

3.4.1 分类

源码中把 Stream 的一个操作称为一个 stage,即由很多Stage构成了流水线Flink则要比它复杂很多

Stage包括三类Head,StatefulOp,StatelessOp,它们的继承链是这样的:

Head -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
StatefulOp -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
StatelessOp -----> ReferencePipeline -----> AbstractPipeline -------> PipelineHelper
    
ReferencePipeline 继承了 AbstractPipeline 和 Stream   

PipelineHelper主要用于Stream执行过程中相关结构的构建。

AbstractPipeline是流水线的核心抽象类,用于构建和管理流水线。它的实现类就是流水线的节点。

AbstractPipeline的直接实现类为ReferencePipeline,而Head 、StatefulOp 、StatelessOp又继承了ReferencePipeline类。因此Head / StatefulOp / StatelessOp 本身也是AbstractPipeline类型的。

3.4.2 AbstractPipeline

AbstractPipeline是流水线的核心,每一个stage就是一个AbstractPipeline的实例,这里的每一个pipeline都是一个节点。AbstractPipeline中定义了三个AbstractPipeline类型的变量:

  • sourceStage(源阶段),即保存的 Head 头节点引用,用于获取保存在头节点关于整个 Stream 处理流程中的关键信息,如是否是并行模式;
  • previousStage(上游pipeline,前一阶段),即当前中间操作节点的上一个节点,因为 Head 为整个双向链表最上游,故其前一个节点为 null;
  • nextStage(下一阶段);

能看到 prev,next 这就是指向前后两个stage,用来构建一个双向链表。

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    private final AbstractPipeline sourceStage;
    private final AbstractPipeline previousStage;
    private AbstractPipeline nextStage;
}

其他比较重要的属性如下:

  1. sourceSpliterator 数据源的可分解迭代器,并行流中分解任务所需
  2. depth 当前节点的深度,Head 头节点深度为 0,该值在并行流大任务fork()分解子任务时可用于维护任务层级
  3. parallel 是否是并行模式,决定了是否启用 ForkJoinPool 用于并行执行任务

3.5 流水线构建阶段

这部分只是Stage这里这是概念上的构建。类似于Flink的StreamGraph。也为后续的运行做了准备。

示例代码中,通过Collection.stream()方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。

每一步Stream的方法调用都产生一个新的stage,这些stage会以双向链表的方式链接,而每个stage都记录了每一个阶段的操作,这样我们就可以依赖这种数据结构来保存对数据源的所有操作了。

3.5.1 Head

Head 用于表示第一个Stage,也就是source stage,调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;Head就类似于Flink的Source

从程序开始看起。

list.stream()

会调用到

default Stream<E> stream() {
	return StreamSupport.stream(spliterator(), false);
}

然后会建立一个Stream,这个Stream就是一个 ReferencePipeline.Head。这里Head也是一个ReferencePipeline。 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
	return new ReferencePipeline.Head<>(spliterator,
                                           StreamOpFlag.fromCharacteristics(spliterator),
                                           parallel);
}

3.5.2 中间操作

StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作

这种属于算子的逻辑概念,Flink对应的算子也具有类似的区别

示例代码中,可以看到 filter 返回了一个无状态stage,也是一个AbstractPipeline、stream,即是流水线的一个阶段。同时还实现了AbstractPipeline定义的opWrapSink方法。其重写的 opWrapSink() 规定了该操作的下游操作的Sink是如何组织数据处理逻辑的。

后续的filter,map都分别构建了一个StatelessOp。这里需要注意的是:每个StatelessOp都在其内部有opWrapSink函数,如果调用opWrapSink时候,就会生成一个Sink,其作用我们当分析到程序运行时候会讲解

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
    
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
}

这里要注意的是构建了一个双向链表。比如filter的构建最终调用到:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        previousStage.nextStage = this; // 构建双向链表
      	this.previousStage = previousStage; // 构建双向链表
        this.sourceStage = previousStage.sourceStage;
        ......
}

所以我们最终得到的,用stage标识的流水线就是如下,注意这里都是双向链表。

+------------+       
| Collection |
+------------+                  
      │ 
      │ stream()
      │ 
+------------+       
|    Head    |
+------------+     
      │ 
      │ filter()
      │
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ map()
      │ 
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ map()
      │  
+-------------+       
| StatelessOp |
+-------------+     
      │ 
      │ collect()
      │
+------------+       
| TerminalOp |
+------------+     

运行时的流水线摘录如下:

this = {ReferencePipeline$2$1@767} 

 this$1 = {ReferencePipeline$2@688} 
  predicate = {Java8Stream$lambda@681} 
  this$0 = {ReferencePipeline$Head@682} 
  sourceStage = {ReferencePipeline$Head@682} 
  previousStage = {ReferencePipeline$Head@682} 
  sourceOrOpFlags = 128
  nextStage = {ReferencePipeline$3@691} 
  depth = 1
  combinedFlags = 159
  sourceSpliterator = null
  sourceSupplier = null
  linkedOrConsumed = true
  sourceAnyStateful = false
  sourceCloseAction = null
  parallel = false
    
 downstream = {ReferencePipeline$3$1@768} 
  this$1 = {ReferencePipeline$3@691} 
  downstream = {ReferencePipeline$3$1@770} 
   this$1 = {ReferencePipeline$3@694} 
   downstream = {ReduceOps$3ReducingSink@771} 
    supplier = {Collectors$lambda@772} 
    accumulator = {Collectors$lambda@773} 
    combiner = {Collectors$lambda@774} 
    state = {ArrayList@775}  size = 0

Java Stream相对简单,使用 Stage 一个数据结构就都搞定(比如双向链表本身就是Stage双向链表),而Flink则要复杂多了,比如:

  • StreamNode 是用来描述 operator 的逻辑节点,并具有所有相关的属性,如并发度、入边和出边等。
  • StreamEdge 是用来描述两个 StreamNode(operator) 逻辑的链接边。

3.6 流水线执行阶段

因为Stream 是一个惰性求值的系统,所以直到当执行如下时候,才会进行最后求值。这一步骤就相当于Flink程序需要加一个 print,env.execute 才能运行

.collect(Collectors.toList());

调用时候涉及到的部分调用栈如下:

makeSink:180, ReduceOps$3 (java.util.stream)
makeSink:177, ReduceOps$3 (java.util.stream)
evaluateSequential:708, ReduceOps$ReduceOp (java.util.stream)
evaluate:234, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

这就牵扯出来Java Stream的另外一部分操作:结束操作(Terminal operations)。

3.6.1 TerminalOp

TerminalOp是流水线上的一个算子,其完成了最后的计算操作。在FindOp、ForEachOp、MatchOp 和 ReduceOp 中会覆盖其evaluateParallel函数。

注意:终结操作不会添加节点

3.6.2 ReduceOp

ReduceOp是TerminalOp的一个具体实现,其执行了一个reduce操作。可以看到 makeSink() 这里做了一个Sink。

每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
            implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        public abstract S makeSink();

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
}

3.6.3 Sink

Sink接口是执行阶段用到的。类似于Flink中的ExecutionVertex

在上一步已经在stage中记录了每一步操作,此时并没有执行。但是stage只是保存了当前的操作,并不能确定下一个stage需要何种操作,何种数据。

JDK为此定义了Sink接口来处理具体操作

interface Sink<T> extends Consumer<T>

Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四个接口,其中间操作的子类中包含一个指向下游sink的指针。

方法名 作用
void begin(long size) 开始遍历元素之前调用该方法,通知Sink做好准备。
void end() 所有元素遍历完成之后调用,通知Sink没有更多的元素了。
boolean cancellationRequested() 是否可以结束操作,可以让短路操作尽早结束。
void accept(T t) 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

3.6.4 构建执行链

具体以ReduceOp为例,执行关键其实就是调用到了 AbstractPipeline#wrapAndCopyInto()

public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
	return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

wrapAndCopyInto其实现如下,正如其名字示意,主要包含了两个步骤:

  1. wrapSink() 从操作链表的尾部开始,调用操作对象自身重写的 opWrapSink()方法将每一个操作对象中的数据处理逻辑封装成 Sink.ChainedReference,并将传入的 Sink 作为新建 Sink 的 downStream,从而形成单向调用链。这部分属于构建阶段
  2. copyInto()从调用链头部开始执行中间操作数据处理逻辑封装成的 Sink 对象的方法,完成对数据源的处理。其实这部分就是执行阶段
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
}
3.6.4.1 构建Sink链条

当终结操作触发时,以终结操作本身的数据处理逻辑的封装对象 Sink 为起点,从操作链表尾部 stage 逆向遍历,将操作动作中封装的数据处理逻辑封装成 ChaineReference 对象,并将传入的上一个 Sink 引用赋值给新建 Sink 的 downStream 变量,从而形成单向的调用链。

+------+   downStream    +------+  downStream  +------+  downStream  +------+ 
| Sink +---------------> | Sink +------------> | Sink +------------> | Sink +
+------+                 +------+              +------+              +------+

从操作链表的尾部开始,调用操作对象自身重写的 opWrapSink()方法将每一个操作对象中的数据处理逻辑封装成 Sink.ChainedReference,并将传入的 Sink 作为新建 Sink 的 downStream,从而形成单向调用链。

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink); // 从后往前处理
        }
        return (Sink<P_IN>) sink; // 返回单向调用链
    }

以map算子为例,就是生成 return 了一个Sink.ChainedReference(其也是一个Sink),这些Sink最后会串联在一起,形成Sink链。

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
	return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) { // 这里返回Sink
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
}

ChainedReference包括:

  • begin:在遍历元素前调用,做好遍历准备;
  • accept:遍历每个元素的时候调用,包含每个stage的操作和回掉函数;
  • end:遍历结束后调用
  • cancellationRequested:是否能够尽早结束遍历,用于短路操作

每个stage都把操作实现在Sink里,上游stage调用下游stage的accept方法,达到按顺序执行每个操作的目的。

可以看到调用完成之后

sink = {ReferencePipeline$2$1@741} 
     this$1 = {ReferencePipeline$2@687} 
     downstream = {ReferencePipeline$3$1@742} 
          this$1 = {ReferencePipeline$3@693} 
          downstream = {ReferencePipeline$3$1@739} 
               this$1 = {ReferencePipeline$3@714} 
               downstream = {ReduceOps$3ReducingSink@735} 
                    supplier = {Collectors$lambda@723} 
                    accumulator = {Collectors$lambda@722} 
                    combiner = {Collectors$lambda@721} 
                    state = null

从结束操作的sink开始,一层一层包装sink,最后第一个中间操作的sink在最外层,在每个操作的opWrapSink方法里返回的sink都维护了一个downstream指向后一个操作,这样,双向链表的结构就完成了。这样我们在copyInto方法里调用beginacceptend的时候就会通过downstream一层一层的调用下去,最终在结束操作执行实际计算。

Flink要复杂太多。

ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph.ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。下面只列举和 Java Stream大致能对应的模块。

  • ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的ExecutionVertex。
  • ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。一个JobVertex/ExecutionJobVertex代表的是一个operator,而具体的ExecutionVertex则代表了每一个Task。
  • IntermediateResult:和JobGraph中的IntermediateDataSet一一对应每一个IntermediateResult有与下游ExecutionJobVertex相同并发数的IntermediateResultPartition。
  • IntermediateResultPartition:表示ExecutionVertex的一个输出分区,生产者是ExecutionVertex,消费者是若干个ExecutionEdge。
  • ExecutionEdge:表示ExecutionVertex的输入,源是IntermediateResultPartition,目标是ExecutionVertex.source和目标都只能是一个。

3.7 执行调用Sink链

3.7.1 调用执行

有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{ begin(), accept(), cancellationRequested(), end() }方法就可以了。一种可能的Sink.accept()方法流程是这样的:

void accept(U u){
    1. 使用当前Sink包装的回调函数处理 u
    2. 将处理结果传递给流水线下游的Sink
}

Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。

这就是我们前面提到的 “copyInto() 从调用链头部开始执行中间操作数据处理逻辑封装成的 Sink 对象的方法,完成对数据源的处理“,具体调用如下:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);  // 调用到这里
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
}

会调用到 ArrayListSpliterator 的 forEachRemaining。

public void forEachRemaining(Consumer<? super E> action) {
         for (; i < hi; ++i) {
              @SuppressWarnings("unchecked") E e = (E) a[i];
              action.accept(e);
         }
}

以map算子为例,就是调用到了之前生成的 Sink.ChainedReference(其也是一个Sink)中的 accept 函数,执行本算子的业务操作,然后传递给下游stream调用。

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
	return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u)); // 调用到下游算子
                    }
                };
            }
        };
}

具体调用栈如下:

lambda$main$0:14, Java8Stream (com.alibaba.alink)
test:-1, 13138721 (com.alibaba.alink.Java8Stream$$Lambda$1)
accept:174, ReferencePipeline$2$1 (java.util.stream)
forEachRemaining:1374, ArrayList$ArrayListSpliterator (java.util)
copyInto:481, AbstractPipeline (java.util.stream)
wrapAndCopyInto:471, AbstractPipeline (java.util.stream)
evaluateSequential:708, ReduceOps$ReduceOp (java.util.stream)
evaluate:234, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

这里也只列举大致可对应或者可参考的。

物理执行图: JobManager根据ExecutionGraph对工作进行调度后,在各个TaskManager上部署任务后形成的“图”,并不是一个具体的数据结构。

  • 任务:执行被调度后在分配的TaskManager中启动对应的Task。Task包裹了具有用户执行逻辑的运算符。
  • ResultPartition:代表由一个任务的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
  • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费任务数和DistributionPattern来决定。
  • InputGate:代表任务的输入封装,和JobGraph中JobEdge一一对应每个InputGate消费了一个或多个的ResultPartition。
  • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。

Flink会根据ExecutionJobVertices的数量创建异步任务。并且给每个ExecutionJobVertices分配适当的slot,然后调用execution.deploy();方法。

从Execution Graph到真正物理执行图转换时候,会将IntermediateResultPartition转化成ResultPartition,ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。

最后通过RPC方法提交task,实际会调用到TaskExecutor.submitTask方法中。这个方法会创建真正的Task,然后调用task.startTaskThread();开始task的执行。

0x04 数据并行

4.1 对比

4.1.1 范畴

Java Stream 的并行指的是在JVM内部并行。

Flink 并行的范畴就大得多。首先Task Manager是JVM层级,在Task Manager内部又有多个slot任务槽可以并行。其次多个Task Manager即可在同一个机器上,也可以在不同机器上。

Flink中的执行资源是通过任务槽定义。每个TaskManager都有一个或多个任务槽,每个任务槽可以运行一个并行任务的流水线(pipeline)。流水线由多个连续的任务组成,例如 MapFunction 的第n个并行实例和 ReduceFunction 的第n个并行实例。

所以Flink 并行的范畴包括:

  • JVM内部Slot概念。
  • 同一个机器的JVM之间。
  • 不同机器的JVM之间。

4.1.2 并行度影响因素

Java Stream 并行流内部使用了默认的ForkJoinPool线程池,所以它默认的线程数量就是处理器的数量,通过Runtime.getRuntime().availableProcessors()可以得到这个值。如果需修改则需设置-Djava.util.concurrent.ForkJoinPool.common.parallelism=xxx。

Flink 并行度具体设置取决于部署模式。

  • 如果Standalone模式,则并行度是通过配置来调整。
  • 如果是Yarn来控制资源调度,则Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算。

4.2 Java Stream实现

parallelStream是一个并行执行的流,其使用 fork/join (ForkJoinPool)并行方式来拆分任务和加速处理过程。研究parallelStream之前,搞清楚ForkJoinPool是很有必要的。

ForkJoinPool的核心是采用分治法的思想,将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。

同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

我们修改下代码,增加 .parallel() 调用,这样就从串行进化成了并行。

import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;

public class Java8Stream {
    public static void main(String[] args) {
        List<String> list = Lists.newArrayList(
                "bcd", "cde", "def", "abc");
        List<String> result = list.stream()
                .parallel()
                .filter(e -> e.length() >= 3)
                .map(e -> e.charAt(0))
                .map(e -> String.valueOf(e))
                .collect(Collectors.toList());
        System.out.println(result);
    }
}

AbstractPipeline 中能看到,就是标记个并行的标记,设置为true。sourceStage其实就是自身代表的算子。

private boolean parallel;
private final AbstractPipeline sourceStage;

public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
}

程序是在collect处开始执行的。

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        container = evaluate(ReduceOps.makeRef(collector));
}

执行时候如果设置了并行,就会并行调用。

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

evaluateParallel 此处并行调用主要是通过 ReduceOp —> ReduceTask 来完成的。

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>        implements TerminalOp<T, R> {
        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
}

这时候会发现,该方法中new了一个ReduceTask类,然后调用了它的invoke()方法,看看ReduceTask类相关信息,最后会发现它的继承链是这样的:

ReduceTask -----> AbstractTask -----> CountedCompleter -------> ForkJoinTask

可以看出所有的Task 都继承自Jdk7 中引入的ForkJoin 并行框架的ForkJoinTask。所以我们可以看出Stream 的并行是依赖于ForkJoin 框架的。

abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>>        extends CountedCompleter<R> {
    @Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
    }    
}

调用栈是:

compute:297, AbstractTask (java.util.stream)
exec:731, CountedCompleter (java.util.concurrent)
doExec:289, ForkJoinTask (java.util.concurrent)
doInvoke:401, ForkJoinTask (java.util.concurrent)
invoke:734, ForkJoinTask (java.util.concurrent)
evaluateParallel:714, ReduceOps$ReduceOp (java.util.stream)
evaluate:233, AbstractPipeline (java.util.stream)
collect:499, ReferencePipeline (java.util.stream)
main:20, Java8Stream (com.alibaba.alink)

这里面的主要逻辑就是

  • 先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量。
  • 根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork 将任务划分成更小的数据块,进行求解。这里值得注意的是,getTargetSize 在第一次调用的时候会设置 预测数据量大小 / (默认并发度 * 4) 的结果作为最小执行单元的数量(配置的默认值是cpu 数 – 1,可以通过java.util.concurrent.ForkJoinPool.common.parallelism设置)。
  • 如果当前分片大小仍然大于处理数据单元的阈值,且分片继续尝试切分成功,那么就继续切分,分别将左右分片的任务创建为新的Task,并且将当前的任务关联为两个新任务的父级任务(逻辑在makeChild 里面)。
  • 先后对左右子节点的任务进行fork,对另外的分区进行分解。同时设定pending 为1,这代表一个task 实际上只会有一个等待的子节点(被fork)。
  • 当任务已经分解到足够小的时候退出循环,尝试进行结束。调用子类实现的doLeaf方法,完成最小计算单元的计算任务,并设置到当前任务的localResult中。
  • 调用tryComplete 方法进行最终任务的扫尾工作,如果该任务pending 值不等于0,则原子的减1,如果已经等于0,说明任务都已经完成,则调用onCompletion 回调,如果该任务是叶子任务,则直接销毁中间数据结束;如果是中间节点会将左右子节点的结果进行合并。
  • 检查如果这个任务已经没有父级任务了,则将该任务置为正常结束,如果还有则尝试递归的去调用父级节点的onCompletion回调,逐级进行任务的合并。

0x05 Java Stream在Flink的应用

我从Flink的各个release版本找了下,发现Flink从1.5开始才引入 Java Stream,源码中只有三处使用到

但是最新的代码则有几百处调用

这说明起初,Flink开发者中大概只有一个兄弟一时兴起实验了 Java Stream,结果发现很好用,就陆续推广开来。

我们还要发现,Flink在 Java Stream 的用法上,并没有使用其并行版本。

个人觉得,Flink框架中使用 Java Stream 的并行版本对于框架性能提高意义不大,反而会造成调试差错的难度( 需要时刻考虑线程安全问题。否则可能造成程序死锁,或数据的准确性。造成的后果完全取决于使用非线程安全类的效果 ),所以Flink没有使用其并行版本。但是用户在自己代码中可以使用其并行版本。

0x06 总结

这里我们再总结下。Flink 和 Java Stream 最值得比较的三个方面就是:数据流模型,流水线,数据并行

6.1 数据流模型

大家思路都很类似,就是用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对运算和表达的高阶抽象。这种抽象其实在目前已经是很多框架和语言的必备了。

6.2 流水线

Flink 中的执行图( Flink这里形成了图结构 )可以分成四层:StreamGraph —> JobGraph —> ExecutionGraph -> 物理执行图

Java Stream 的流水线可以分为两层:Stage —> Sink,即 “流水线构建阶段” 和 “流水线执行阶段”。

Java Stream Stage部分只是概念上的构建。类似于Flink的StreamGraphHead就类似于Flink的Source

Java Stream Sink 接口是执行阶段用到的。类似于Flink中的ExecutionGraph,每一个Sink相当于ExecutionVertex。

Sink: Sink 接口是执行阶段用到的。类似于Flink中的ExecutionGraph,每一个Sink相当于ExecutionVertex

Java Stream 有无状态和有状态的中间操作这种属于算子的逻辑概念,Flink对应的算子也具有类似的区别

因为 Java Stream 是一个惰性求值的系统,所以直到当执行如下时候,才会进行最后求值。这一步骤就相当于Flink程序需要加一个 print,env.execute 才能运行

6.3 数据并行

Java Stream 的并行指的是在JVM内部并行。

Flink 并行的范畴就大得多。Flink的范畴包括:

  • JVM内部Slot概念
  • 同一个机器的JVM之间
  • 不同机器的JVM之间

Java Stream 并行流内部使用了默认的ForkJoinPool线程池,所以它默认的线程数量就是处理器的数量。

Flink 并行度具体设置取决于部署模式。

  • 如果Standalone模式,则并行度是通过配置来调整。
  • 如果是Yarn来控制资源调度,则Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算。

0xFF 参考

Java 8 Stream

[三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路

java8学习总结——Stream的理解

深入理解Java8中Stream的实现原理

浅析Java8 Stream原理

java8 Stream的实现原理 (从零开始实现一个stream流)

Java8 Stream 并行计算实现的原理

java8Stream原理深度解析

为什么说Java8的Stream并行流底层使用了Fork/Join框架

记一次java8 parallelStream使用不当引发的血案

深入理解Java Stream流水线

java8 Stream Pipelines 浅析

Java 8 Stream(2)-原理解析