[源碼分析] 從FlatMap用法到Flink的內部實現
- 2020 年 3 月 30 日
- 筆記
[源碼分析] 從FlatMap用法到Flink的內部實現
0x00 摘要
本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。
0x01 Map vs FlatMap
首先我們先從概念入手。
自從響應式編程慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,後台也能見到;Android裡面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。
map
它把數組流
中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的數組流
。然後返回這個新數據流。
flatMap
flat是扁平的意思。所以這個操作是:先映射(map),再拍扁(join)。
flatMap輸入可能是多個子數組流
。所以flatMap先針對 每個子數組流
的每個元素進行映射操作。然後進行扁平化處理,最後彙集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。
flatMap與map另外一個不一樣的地方就是傳入的函數在處理完後返回值必須是List。
實例
比如拿到一個文本文件之後,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那麼應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。
如果還不清楚,可以看看下面這個例子:
梁山新進一批好馬,準備給每個馬軍頭領配置一批。於是得到函數以及頭領名單如下: 函數 = ( 頭領 => 頭領 + 好馬 ) 五虎將 = List(關勝、林沖、秦明、呼延灼、董平 ) 八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 ) // Map函數的例子 利用map函數,我們可以得到 五虎將馬軍 五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 ) 結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 ) // flatMap函數的例子 但是為了得到統一的馬軍,則可以用flatMap: 馬軍頭領 = List(五虎將,八驃騎) 馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )
現在大家應該清楚了吧。接下來看看幾個FlatMap的實例。
Scala語言的實現
Scala本身對於List類型就有map和flatMap操作。舉例如下:
val names = List("Alice","James","Apple") val strings = names.map(x => x.toUpperCase) println(strings) // 輸出 List(ALICE, JAMES, APPLE) val chars = names.flatMap(x=> x.toUpperCase()) println(chars) // 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
Flink的例子
以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。
網上常見的一個Flink應用的例子:
//載入數據源 val source = env.fromElements("china is the best country","beijing is the capital of china") //轉化處理數據 val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
Flink源碼中的例子
case class WordWithCount(word: String, count: Long) val text = env.socketTextStream(host, port, 'n') val windowCounts = text.flatMap { w => w.split("\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") windowCounts.print()
0x02 自定義運算元(in Flink)
上面提到的都是簡單的使用,如果有複雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義運算元。
函數類FlatMapFunction
對於不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下:
@Public @FunctionalInterface public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; }
如何自定義運算元呢,這個可以直接看看Flink中的官方例子
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens. public class Tokenizer implements FlatMapFunction<String, String> { @Override public void flatMap(String value, Collector<String> out) { for (String token : value.split("\W")) { out.collect(token); } } } // [...] DataSet<String> textLines = // [...] DataSet<String> words = textLines.flatMap(new Tokenizer());
Rich函數類RichFlatMapFunction
對於涉及到狀態的情況,用戶可以使用繼承 RichFlatMapFunction 類的方式來實現UDF。
RichFlatMapFunction屬於Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增加了Rich前綴,比如RichMapFunction
、RichFlatMapFunction
或RichReduceFunction
等等。比起普通的函數類,Rich函數類增加了:
open()
方法:Flink在運算元調用前會執行這個方法,可以用來進行一些初始化工作。close()
方法:Flink在運算元最後一次調用結束後執行這個方法,可以用來釋放一些資源。getRuntimeContext
方法:獲取運行時上下文。每個並行的運算元子任務都有一個運行時上下文,上下文記錄了這個運算元運行過程中的一些資訊,包括運算元當前的並行度、運算元子任務序號、廣播數據、累加器、監控數據。最重要的是,我們可以從上下文里獲取狀態數據。
FlatMap對應的RichFlatMapFunction如下:
@Public public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> { @Override public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; }
其基類 AbstractRichFunction 如下,可以看到主要是和運行時上下文建立了聯繫,並且有初始化和退出操作:
@Public public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext; @Override public void setRuntimeContext(RuntimeContext t) { this.runtimeContext = t; } @Override public RuntimeContext getRuntimeContext() { return this.runtimeContext; } @Override public IterationRuntimeContext getIterationRuntimeContext() { if (this.runtimeContext instanceof IterationRuntimeContext) { return (IterationRuntimeContext) this.runtimeContext; } } @Override public void open(Configuration parameters) throws Exception {} @Override public void close() throws Exception {} }
如何最好的使用? 當然還是官方文檔和例子最靠譜。
因為涉及到狀態,所以如果使用,你必須創建一個 StateDescriptor
,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,並且它們必須具有唯一的名稱以便可以引用它們),狀態所持有值的類型,並且可能包含用戶指定的函數,例如ReduceFunction
。 根據不同的狀態類型,可以創建ValueStateDescriptor
,ListStateDescriptor
, ReducingStateDescriptor
,FoldingStateDescriptor
或 MapStateDescriptor
。
狀態通過 RuntimeContext
進行訪問,因此只能在 rich functions 中使用。 但是我們也會看到一個例子。RichFunction
中 RuntimeContext
提供如下方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
FoldingState getFoldingState(FoldingStateDescriptor)
MapState getMapState(MapStateDescriptor)
下面是一個 FlatMapFunction
的例子,展示了如何將這些部分組合起來:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { private var sum: ValueState[(Long, Long)] = _ override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { // access the state value val tmpCurrentSum = sum.value // If it hasn't been used before, it will be null val currentSum = if (tmpCurrentSum != null) { tmpCurrentSum } else { (0L, 0L) } // update the count val newSum = (currentSum._1 + 1, currentSum._2 + input._2) // update the state sum.update(newSum) // if the count reaches 2, emit the average and clear the state if (newSum._1 >= 2) { out.collect((input._1, newSum._2 / newSum._1)) sum.clear() } } override def open(parameters: Configuration): Unit = { sum = getRuntimeContext.getState( new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) ) } } object ExampleCountWindowAverage extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L) )).keyBy(_._1) .flatMap(new CountWindowAverage()) .print() // the printed output will be (1,4) and (1,5) env.execute("ExampleManagedState") }
這個例子實現了一個簡單的計數窗口。 我們把元組的第一個元素當作 key(在示例中都 key 都是 「1」)。 該函數將出現的次數以及總和存儲在 「ValueState」 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態重新開始。 請注意,我們會為每個不同的 key(元組中第一個元素)保存一個單獨的值。
0x03 從Flink源碼入手看FlatMap實現
FlatMap從Flink編程模型角度講屬於一個運算元,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎麼實現的呢? 或者說FlatMap是怎麼從用戶程式碼轉換到Flink運行時呢 ?
1. DataSet
首先說說 DataSet相關這套系統中FlatMap的實現。
請注意,DataSteam對應的那套系統中,operator名字都是帶著Stream的,比如StreamOperator。
DataSet
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
這段程式碼調用的就是DataSet中的API。具體如下:
public abstract class DataSet<T> { public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) { String callLocation = Utils.getCallLocationName(); TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation); } }
FlatMapOperator
可以看出,flatMap @ DataSet
主要就是生成了一個FlatMapOperator,這個可以理解為是邏輯運算元。其定義如下:
public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> { protected final FlatMapFunction<IN, OUT> function; protected final String defaultName; public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) { super(input, resultType); this.function = function; this.defaultName = defaultName; } @Override protected FlatMapFunction<IN, OUT> getFunction() { return function; } // 這個translateToDataFlow就是生成計劃(Plan)的關鍵程式碼 @Override protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : "FlatMap at " + defaultName; // create operator FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set parallelism if (this.getParallelism() > 0) { // use specified parallelism po.setParallelism(this.getParallelism()); } else { // if no parallelism has been specified, use parallelism of input operator to enable chaining po.setParallelism(input.getParallelism()); } return po; } }
FlatMapOperator的基類如下:
public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> { } // Base class for operations that operates on a single input data set. public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> { private final DataSet<IN> input; }
生成計劃
DataSet API所編寫的批處理程式跟DataStream API所編寫的流處理程式在生成作業圖(JobGraph)之前的實現差別很大。流處理程式是生成流圖(StreamGraph),而批處理程式是生成計劃(Plan)並由優化器對其進行優化並生成優化後的計劃(OptimizedPlan)。
計劃(Plan)以數據流(dataflow)的形式來表示批處理程式,但它只是批處理程式最初的表示,在一個批處理程式生成作業圖之前,計劃還會被進行優化以產生更高效的方案。Plan不同於流圖(StreamGraph),它以sink為入口,因為一個批處理程式可能存在若干個sink,所以Plan採用集合來存儲它。另外Plan還封裝了批處理作業的一些基本屬性:jobId、jobName以及defaultParallelism等。
生成Plan的核心部件是運算元翻譯器(OperatorTranslation),createProgramPlan方法通過它來」翻譯「出計劃,核心程式碼如下
public class OperatorTranslation { // 接收每個需遍歷的DataSink對象,然後將其轉換成GenericDataSinkBase對象 public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) { List<GenericDataSinkBase<?>> planSinks = new ArrayList<>(); //遍歷sinks集合 for (DataSink<?> sink : sinks) { //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每個sink進行」翻譯「 planSinks.add(translate(sink)); } //以planSins集合構建Plan對象 Plan p = new Plan(planSinks); p.setJobName(jobName); return p; } private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) { //會調用到 FlatMapOperator 的 translateToDataFlow org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input); } }
FlatMapOperatorBase就是生成的plan中的一員。
public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { @Override protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList<OUT> result = new ArrayList<OUT>(input.size()); TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig); TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig); CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer); for (IN element : input) { IN inCopy = inSerializer.copy(element); function.flatMap(inCopy, resultCollector); } FunctionUtils.closeFunction(function); return result; } }
而最後優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。
public class GraphCreatingVisitor implements Visitor<Operator<?>> { public boolean preVisit(Operator<?> c) { else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); } } }
自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。
作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務程式碼到Flink運行系統的轉化。
在運行狀態下,如果上游有數據流入,則FlatMap這個運算元就會發揮作用。
2. DataStream
對於DataStream,則是另外一套體系結構。首先我們找一個使用DataStream的例子看看。
//獲取數據: 從socket中獲取 val textDataStream = env.socketTextStream("127.0.0.1", 8888, 'n') val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1)) //groupby: 按照指定的欄位聚合 val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1)) windowDstram.sum("count").print()
上面例子中,flatMap 調用的是DataStream中的API,具體如下:
public class DataStream<T> { public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就可以進行序列化 //getType用來獲取類對象的輸出類型 TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); return flatMap(flatMapper, outType); } // 構建了一個StreamFlatMap的Operator public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) { return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper))); } // 依次調用下去 @PublicEvolving public <R> SingleOutputStreamOperator<R> transform( String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)); } protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。 OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); // 將這Transform對象放入env的transform對象列表。 getExecutionEnvironment().addOperator(resultTransform); // 返迴流 return returnStream; } }
上面源碼中的幾個概念需要澄清。
Transformation:首先,FlatMap在FLink編程模型中是運算元API,在DataStream中會生成一個Transformation,即邏輯運算元。
邏輯運算元Transformation最後會對應到物理運算元Operator,這個概念對應的就是StreamOperator。
StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。
processElement()
方法也是UDF的邏輯被調用的地方,例如FlatMapFunction
里的flatMap()
方法。
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private transient TimestampedCollector<OUT> collector; public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); // 調用用戶定義的FlatMap userFunction.flatMap(element.getValue(), collector); } }
我們可以看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable { }
StreamOperator是根介面。對於 Streaming 來說所有的運算元都繼承自 StreamOperator。繼承了StreamOperator的擴展介面則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。
從 API 到 邏輯運算元 Transformation,再到 物理運算元Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。
作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務程式碼到Flink運行系統的轉化。
0x04 參考
【Flink】Flink基礎之實現WordCount程式(Java與Scala版本)
Flink進階教程:以flatMap為例,如何進行運算元自定義