[源碼分析] 從FlatMap用法到Flink的內部實現

  • 2020 年 3 月 30 日
  • 筆記

[源碼分析] 從FlatMap用法到Flink的內部實現

0x00 摘要

本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。

0x01 Map vs FlatMap

首先我們先從概念入手。

自從響應式編程慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,後台也能見到;Android裡面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。

map

它把數組流中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的數組流。然後返回這個新數據流。

flatMap

flat是扁平的意思。所以這個操作是:先映射(map),再拍扁(join)。

flatMap輸入可能是多個子數組流。所以flatMap先針對 每個子數組流的每個元素進行映射操作。然後進行扁平化處理,最後彙集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。

flatMap與map另外一個不一樣的地方就是傳入的函數在處理完後返回值必須是List。

實例

比如拿到一個文本文件之後,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那麼應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。

如果還不清楚,可以看看下面這個例子:

梁山新進一批好馬,準備給每個馬軍頭領配置一批。於是得到函數以及頭領名單如下:    函數 = ( 頭領 => 頭領 + 好馬 )  五虎將 = List(關勝、林沖、秦明、呼延灼、董平 )  八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 )    // Map函數的例子  利用map函數,我們可以得到 五虎將馬軍    五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 )  結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 )    // flatMap函數的例子  但是為了得到統一的馬軍,則可以用flatMap:    馬軍頭領 = List(五虎將,八驃騎)  馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 )    結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )  

現在大家應該清楚了吧。接下來看看幾個FlatMap的實例。

Scala語言的實現

Scala本身對於List類型就有map和flatMap操作。舉例如下:

val names = List("Alice","James","Apple")  val strings = names.map(x => x.toUpperCase)  println(strings)  // 輸出 List(ALICE, JAMES, APPLE)    val chars = names.flatMap(x=> x.toUpperCase())  println(chars)  // 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)  

Flink的例子

以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。

網上常見的一個Flink應用的例子:

//載入數據源  val source = env.fromElements("china is the best country","beijing is the capital of china")    //轉化處理數據  val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)  

Flink源碼中的例子

case class WordWithCount(word: String, count: Long)    val text = env.socketTextStream(host, port, 'n')    val windowCounts = text.flatMap { w => w.split("\s") }    .map { w => WordWithCount(w, 1) }    .keyBy("word")    .timeWindow(Time.seconds(5))    .sum("count")    windowCounts.print()  

上面提到的都是簡單的使用,如果有複雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義運算元。

函數類FlatMapFunction

對於不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下:

@Public  @FunctionalInterface  public interface FlatMapFunction<T, O> extends Function, Serializable {  	void flatMap(T value, Collector<O> out) throws Exception;  }  

如何自定義運算元呢,這個可以直接看看Flink中的官方例子

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.  public class Tokenizer implements FlatMapFunction<String, String> {    @Override    public void flatMap(String value, Collector<String> out) {      for (String token : value.split("\W")) {        out.collect(token);      }    }  }    // [...]  DataSet<String> textLines = // [...]  DataSet<String> words = textLines.flatMap(new Tokenizer());  

Rich函數類RichFlatMapFunction

對於涉及到狀態的情況,用戶可以使用繼承 RichFlatMapFunction 類的方式來實現UDF。

RichFlatMapFunction屬於Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增加了Rich前綴,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函數類,Rich函數類增加了:

  • open()方法:Flink在運算元調用前會執行這個方法,可以用來進行一些初始化工作。
  • close()方法:Flink在運算元最後一次調用結束後執行這個方法,可以用來釋放一些資源。
  • getRuntimeContext方法:獲取運行時上下文。每個並行的運算元子任務都有一個運行時上下文,上下文記錄了這個運算元運行過程中的一些資訊,包括運算元當前的並行度、運算元子任務序號、廣播數據、累加器、監控數據。最重要的是,我們可以從上下文里獲取狀態數據

FlatMap對應的RichFlatMapFunction如下:

@Public  public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {  	@Override  	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;  }  

其基類 AbstractRichFunction 如下,可以看到主要是和運行時上下文建立了聯繫,並且有初始化和退出操作

@Public  public abstract class AbstractRichFunction implements RichFunction, Serializable {    	private transient RuntimeContext runtimeContext;    	@Override  	public void setRuntimeContext(RuntimeContext t) {  		this.runtimeContext = t;  	}    	@Override  	public RuntimeContext getRuntimeContext() {  			return this.runtimeContext;  	}    	@Override  	public IterationRuntimeContext getIterationRuntimeContext() {      if (this.runtimeContext instanceof IterationRuntimeContext) {  			return (IterationRuntimeContext) this.runtimeContext;  		}  	}    	@Override  	public void open(Configuration parameters) throws Exception {}    	@Override  	public void close() throws Exception {}  }  

如何最好的使用? 當然還是官方文檔和例子最靠譜。

因為涉及到狀態,所以如果使用,你必須創建一個 StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,並且它們必須具有唯一的名稱以便可以引用它們),狀態所持有值的類型,並且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

狀態通過 RuntimeContext 進行訪問,因此只能在 rich functions 中使用。 但是我們也會看到一個例子。RichFunctionRuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一個 FlatMapFunction 的例子,展示了如何將這些部分組合起來:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {      private var sum: ValueState[(Long, Long)] = _      override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {        // access the state value      val tmpCurrentSum = sum.value        // If it hasn't been used before, it will be null      val currentSum = if (tmpCurrentSum != null) {        tmpCurrentSum      } else {        (0L, 0L)      }        // update the count      val newSum = (currentSum._1 + 1, currentSum._2 + input._2)        // update the state      sum.update(newSum)        // if the count reaches 2, emit the average and clear the state      if (newSum._1 >= 2) {        out.collect((input._1, newSum._2 / newSum._1))        sum.clear()      }    }      override def open(parameters: Configuration): Unit = {      sum = getRuntimeContext.getState(        new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])      )    }  }    object ExampleCountWindowAverage extends App {    val env = StreamExecutionEnvironment.getExecutionEnvironment      env.fromCollection(List(      (1L, 3L),      (1L, 5L),      (1L, 7L),      (1L, 4L),      (1L, 2L)    )).keyBy(_._1)      .flatMap(new CountWindowAverage())      .print()    // the printed output will be (1,4) and (1,5)      env.execute("ExampleManagedState")  }  

這個例子實現了一個簡單的計數窗口。 我們把元組的第一個元素當作 key(在示例中都 key 都是 「1」)。 該函數將出現的次數以及總和存儲在 「ValueState」 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態重新開始。 請注意,我們會為每個不同的 key(元組中第一個元素)保存一個單獨的值。

0x03 從Flink源碼入手看FlatMap實現

FlatMap從Flink編程模型角度講屬於一個運算元,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎麼實現的呢? 或者說FlatMap是怎麼從用戶程式碼轉換到Flink運行時呢 ?

1. DataSet

首先說說 DataSet相關這套系統中FlatMap的實現。

請注意,DataSteam對應的那套系統中,operator名字都是帶著Stream的,比如StreamOperator。

DataSet

val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 這段程式碼調用的就是DataSet中的API。具體如下:

public abstract class DataSet<T> {    	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {    		String callLocation = Utils.getCallLocationName();    		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);  		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);  	}  }  

FlatMapOperator

可以看出,flatMap @ DataSet 主要就是生成了一個FlatMapOperator,這個可以理解為是邏輯運算元。其定義如下:

public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {    	protected final FlatMapFunction<IN, OUT> function;  	protected final String defaultName;    	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {  		super(input, resultType);  		this.function = function;  		this.defaultName = defaultName;  	}    	@Override  	protected FlatMapFunction<IN, OUT> getFunction() {  		return function;  	}      // 這個translateToDataFlow就是生成計劃(Plan)的關鍵程式碼  	@Override  	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {  		String name = getName() != null ? getName() : "FlatMap at " + defaultName;  		// create operator  		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,  			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);  		// set input  		po.setInput(input);  		// set parallelism  		if (this.getParallelism() > 0) {  			// use specified parallelism  			po.setParallelism(this.getParallelism());  		} else {  			// if no parallelism has been specified, use parallelism of input operator to enable chaining  			po.setParallelism(input.getParallelism());  		}  		return po;  	}  }  

FlatMapOperator的基類如下:

public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {    }    // Base class for operations that operates on a single input data set.  public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {    	private final DataSet<IN> input;  }  

生成計劃

DataSet API所編寫的批處理程式跟DataStream API所編寫的流處理程式在生成作業圖(JobGraph)之前的實現差別很大。流處理程式是生成流圖(StreamGraph),而批處理程式是生成計劃(Plan)並由優化器對其進行優化並生成優化後的計劃(OptimizedPlan)。

計劃(Plan)以數據流(dataflow)的形式來表示批處理程式,但它只是批處理程式最初的表示,在一個批處理程式生成作業圖之前,計劃還會被進行優化以產生更高效的方案。Plan不同於流圖(StreamGraph),它以sink為入口,因為一個批處理程式可能存在若干個sink,所以Plan採用集合來存儲它。另外Plan還封裝了批處理作業的一些基本屬性:jobId、jobName以及defaultParallelism等。

生成Plan的核心部件是運算元翻譯器(OperatorTranslation),createProgramPlan方法通過它來」翻譯「出計劃,核心程式碼如下

public class OperatorTranslation {       // 接收每個需遍歷的DataSink對象,然後將其轉換成GenericDataSinkBase對象     public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {         List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();         //遍歷sinks集合         for (DataSink<?> sink : sinks) {              //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每個sink進行」翻譯「              planSinks.add(translate(sink));          }         //以planSins集合構建Plan對象         Plan p = new Plan(planSinks);         p.setJobName(jobName);         return p;      }    	private <I, O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {      //會調用到 FlatMapOperator 的 translateToDataFlow  	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    }    }  

FlatMapOperatorBase就是生成的plan中的一員。

public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {  	@Override  	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {  		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();    		FunctionUtils.setFunctionRuntimeContext(function, ctx);  		FunctionUtils.openFunction(function, parameters);    		ArrayList<OUT> result = new ArrayList<OUT>(input.size());    		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);  		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);    		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);    		for (IN element : input) {  			IN inCopy = inSerializer.copy(element);  			function.flatMap(inCopy, resultCollector);  		}    		FunctionUtils.closeFunction(function);    		return result;  	}  }  

而最後優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。

public class GraphCreatingVisitor implements Visitor<Operator<?>> {  	public boolean preVisit(Operator<?> c) {      else if (c instanceof FlatMapOperatorBase) {  			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);  		}    }  }  

自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。

作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務程式碼到Flink運行系統的轉化。

在運行狀態下,如果上游有數據流入,則FlatMap這個運算元就會發揮作用。

2. DataStream

對於DataStream,則是另外一套體系結構。首先我們找一個使用DataStream的例子看看。

//獲取數據: 從socket中獲取  val textDataStream = env.socketTextStream("127.0.0.1", 8888, 'n')  val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))    //groupby: 按照指定的欄位聚合  val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))  windowDstram.sum("count").print()  

上面例子中,flatMap 調用的是DataStream中的API,具體如下:

public class DataStream<T> {    	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {      //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就可以進行序列化      //getType用來獲取類對象的輸出類型  		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),  				getType(), Utils.getCallLocationName(), true);  		return flatMap(flatMapper, outType);  	}      // 構建了一個StreamFlatMap的Operator  	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {  		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));  	}      // 依次調用下去  	@PublicEvolving  	public <R> SingleOutputStreamOperator<R> transform(  			String operatorName,  			TypeInformation<R> outTypeInfo,  			OneInputStreamOperator<T, R> operator) {  		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));  	}    	protected <R> SingleOutputStreamOperator<R> doTransform(  			String operatorName,  			TypeInformation<R> outTypeInfo,  			StreamOperatorFactory<R> operatorFactory) {  		// read the output type of the input Transform to coax out errors about MissingTypeInfo  		transformation.getOutputType();      // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。  		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(  				this.transformation,  				operatorName,  				operatorFactory,  				outTypeInfo,  				environment.getParallelism());  		@SuppressWarnings({"unchecked", "rawtypes"})  		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);      // 將這Transform對象放入env的transform對象列表。  		getExecutionEnvironment().addOperator(resultTransform);      // 返迴流  		return returnStream;  	}  }  

上面源碼中的幾個概念需要澄清。

Transformation:首先,FlatMap在FLink編程模型中是運算元API,在DataStream中會生成一個Transformation,即邏輯運算元。

邏輯運算元Transformation最後會對應到物理運算元Operator,這個概念對應的就是StreamOperator

StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。

processElement()方法也是UDF的邏輯被調用的地方,例如FlatMapFunction里的flatMap()方法。

public class StreamFlatMap<IN, OUT>  		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>  		implements OneInputStreamOperator<IN, OUT> {    	private transient TimestampedCollector<OUT> collector;    	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {  		super(flatMapper);  		chainingStrategy = ChainingStrategy.ALWAYS;  	}    	@Override  	public void open() throws Exception {  		super.open();  		collector = new TimestampedCollector<>(output);  	}    	@Override  	public void processElement(StreamRecord<IN> element) throws Exception {  		collector.setTimestamp(element);      // 調用用戶定義的FlatMap  		userFunction.flatMap(element.getValue(), collector);  	}  }  

我們可以看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。

public abstract class AbstractStreamOperator<OUT>  		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {  }  

StreamOperator是根介面。對於 Streaming 來說所有的運算元都繼承自 StreamOperator。繼承了StreamOperator的擴展介面則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。

從 API 到 邏輯運算元 Transformation,再到 物理運算元Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。

作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務程式碼到Flink運行系統的轉化。

0x04 參考

Flink中richfunction的一點小作用

【淺顯易懂】scala中map與flatMap的區別

Working with State

flink簡單應用: scala編寫wordcount

【Flink】Flink基礎之實現WordCount程式(Java與Scala版本)

Flink進階教程:以flatMap為例,如何進行運算元自定義

Flink運行時之批處理程式生成計劃