Flink Series (7) - StreamGraph

  • October 4, 2019
  • Notes

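StreamGraph is the first of the four layers of execution graphs in Flink. Its code lives in the org.apache.flink.streaming.api.graph package. The main job of this first-layer graph is to add all transformations to a DAG, set their parallelism, and assign their slots.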
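As a rough illustration of "add every transformation to a DAG and carry over its parallelism and slot settings", here is a minimal sketch in plain Java. It is a toy model, not Flink's actual generator: SketchTransformation, SketchNode, SketchStreamGraph and the slotSharingGroup field are hypothetical names invented for this example, and the input graph is assumed to be acyclic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transformation: a named step with its inputs,
// a parallelism, and a slot sharing group. Not Flink's real class.
final class SketchTransformation {
    final int id;
    final String name;
    final int parallelism;
    final String slotSharingGroup;
    final List<SketchTransformation> inputs;

    SketchTransformation(int id, String name, int parallelism,
                         String slotSharingGroup, List<SketchTransformation> inputs) {
        this.id = id;
        this.name = name;
        this.parallelism = parallelism;
        this.slotSharingGroup = slotSharingGroup;
        this.inputs = inputs;
    }
}

// One vertex of the toy graph, carrying the settings copied from its transformation.
final class SketchNode {
    final int id;
    final String name;
    final int parallelism;
    final String slotSharingGroup;
    final List<Integer> inputIds = new ArrayList<>();

    SketchNode(SketchTransformation t) {
        this.id = t.id;
        this.name = t.name;
        this.parallelism = t.parallelism;
        this.slotSharingGroup = t.slotSharingGroup;
    }
}

final class SketchStreamGraph {
    // Insertion-ordered, so inputs always appear before the nodes that consume them.
    final Map<Integer, SketchNode> nodes = new LinkedHashMap<>();

    // Adds the inputs first (recursively), then the transformation itself.
    SketchNode addTransformation(SketchTransformation t) {
        SketchNode existing = nodes.get(t.id);
        if (existing != null) {
            return existing; // already added, e.g. shared by two consumers
        }
        SketchNode node = new SketchNode(t);
        for (SketchTransformation input : t.inputs) {
            node.inputIds.add(addTransformation(input).id);
        }
        nodes.put(t.id, node);
        return node;
    }

    // Builds the graph from the list of registered transformations.
    static SketchStreamGraph generate(List<SketchTransformation> transformations) {
        SketchStreamGraph graph = new SketchStreamGraph();
        for (SketchTransformation t : transformations) {
            graph.addTransformation(t);
        }
        return graph;
    }
}
```

Passing only the last transformation of a pipeline to generate is enough in this sketch, because every upstream step is reached through the inputs list.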

Roughly 11 concrete transformation types are involved. Their inheritance diagram is as follows.

First, let's look at how a StreamTransformation is obtained.

    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        if (typeInfo == null) {
            if (function instanceof ResultTypeQueryable) {
                typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
            } else {
                try {
                    typeInfo = TypeExtractor.createTypeInfo(
                            SourceFunction.class,
                            function.getClass(), 0, null, null);
                } catch (final InvalidTypesException e) {
                    typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                }
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);
        StreamSource<OUT, ?> sourceOperator;
        if (function instanceof StoppableFunction) {
            sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
        } else {
            sourceOperator = new StreamSource<>(function);
        }

        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }

What is finally returned is a DataStreamSource, which internally wraps a SourceTransformation. Next, let's look at the class diagram of DataStream.

In the class diagram, DataStreamSource is a subclass of DataStream.

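DataStreamSource is an abstraction at the DataStream level, while StreamSource is an abstraction at the StreamOperator level. In Flink, a DataStream wraps one data stream transformation, and a StreamOperator wraps a user function interface, such as the functions behind map and reduce (keyBy, by contrast, only repartitions the stream and does not create an operator of its own). Next, let's look at the class diagram of StreamOperator.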
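To make "an operator wraps a function interface" concrete, here is a minimal sketch in plain Java. It only loosely mirrors the idea behind operators such as StreamMap and StreamFlatMap: ToyOneInputOperator, ToyStreamMap, ToyStreamFlatMap and ToyOperatorRunner are hypothetical names for this example, and a plain List stands in for the real output collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal operator interface: one callback per incoming element.
interface ToyOneInputOperator<IN, OUT> {
    void processElement(IN element, List<OUT> output);
}

// Toy map operator: wraps the user function and emits exactly one result per element.
final class ToyStreamMap<IN, OUT> implements ToyOneInputOperator<IN, OUT> {
    private final Function<IN, OUT> userFunction;

    ToyStreamMap(Function<IN, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public void processElement(IN element, List<OUT> output) {
        output.add(userFunction.apply(element));
    }
}

// Toy flatMap operator: the wrapped function may emit zero or more results per element.
final class ToyStreamFlatMap<IN, OUT> implements ToyOneInputOperator<IN, OUT> {
    private final Function<IN, List<OUT>> userFunction;

    ToyStreamFlatMap(Function<IN, List<OUT>> userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public void processElement(IN element, List<OUT> output) {
        output.addAll(userFunction.apply(element));
    }
}

final class ToyOperatorRunner {
    // Feeds every input element through the operator and collects what it emits.
    static <IN, OUT> List<OUT> runAll(ToyOneInputOperator<IN, OUT> operator, List<IN> input) {
        List<OUT> output = new ArrayList<>();
        for (IN element : input) {
            operator.processElement(element, output);
        }
        return output;
    }
}
```

The caller of runAll never sees the user function directly. It only talks to the operator, which is the point of the wrapper: the runtime can drive any operator through the same per-element callback.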

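In the class diagram, StreamMap and StreamFlatMap are both subclasses of the operator. Next, let's look at a concrete piece of code that creates an operator and a transformation.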

    /**
     * Applies a Map transformation on a {@link DataStream}. The transformation
     * calls a {@link MapFunction} for each element of the DataStream. Each
     * MapFunction call returns exactly one element. The user can also extend
     * {@link RichMapFunction} to gain access to other features provided by the
     * {@link org.apache.flink.api.common.functions.RichFunction} interface.
     *
     * @param mapper
     *            The MapFunction that is called for each element of the
     *            DataStream.
     * @param <R>
     *            output type
     * @return The transformed {@link DataStream}.
     */
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                Utils.getCallLocationName(), true);

        return transform("Map", outType, new StreamMap<>(clean(mapper)));
    }

    /**
     * Method for passing user defined operators along with the type
     * information that will transform the DataStream.
     *
     * @param operatorName
     *            name of the operator, for logging purposes
     * @param outTypeInfo
     *            the output type of the operator
     * @param operator
     *            the object containing the transformation logic
     * @param <R>
     *            type of the return stream
     * @return the data stream constructed
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
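The flow in map and transform above (wrap the operator in a transformation that points at its input, register it with the environment, return a new stream) can be sketched without any Flink dependency. The following is a toy model under stated assumptions: MiniTransformation, MiniEnvironment and MiniStream are hypothetical names, type information is left out, and the source is simply given parallelism 1 and is not registered in the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical transformation node; it remembers its input so the chain can be walked backwards.
class MiniTransformation {
    final String name;
    final Object operator;          // the wrapped user logic; untyped to keep the sketch short
    final MiniTransformation input; // null for a source
    final int parallelism;

    MiniTransformation(String name, Object operator, MiniTransformation input, int parallelism) {
        this.name = name;
        this.operator = operator;
        this.input = input;
        this.parallelism = parallelism;
    }
}

// Hypothetical environment: holds the default parallelism and the registered transformations.
class MiniEnvironment {
    final int parallelism;
    final List<MiniTransformation> transformations = new ArrayList<>();

    MiniEnvironment(int parallelism) {
        this.parallelism = parallelism;
    }

    void addOperator(MiniTransformation transformation) {
        transformations.add(transformation);
    }

    // Assumption of this sketch: a source has no input, runs with parallelism 1,
    // and is reachable only through the input links of later transformations.
    <T> MiniStream<T> addSource(String name) {
        return new MiniStream<>(this, new MiniTransformation(name, null, null, 1));
    }
}

// Hypothetical stream: a thin handle around one transformation.
class MiniStream<T> {
    final MiniEnvironment environment;
    final MiniTransformation transformation;

    MiniStream(MiniEnvironment environment, MiniTransformation transformation) {
        this.environment = environment;
        this.transformation = transformation;
    }

    <R> MiniStream<R> map(Function<T, R> mapper) {
        return transform("Map", mapper);
    }

    // Creates a transformation on top of this stream's transformation,
    // registers it with the environment, and returns the resulting stream.
    <R> MiniStream<R> transform(String operatorName, Object operator) {
        MiniTransformation result = new MiniTransformation(
                operatorName, operator, this.transformation, environment.parallelism);
        environment.addOperator(result);
        return new MiniStream<>(environment, result);
    }
}
```

Calling map twice on a stream from addSource leaves two entries in transformations, and following the input field from the last one leads back to the source. That backward-linked chain is what a graph builder would later traverse.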

That largely covers DataStream and StreamOperator, including how transformations are produced and how DataStream operations work. The next post will look at how transformations are translated.