flink系列(8)-streamGraph

  • 2019 年 10 月 4 日
  • 筆記

上一篇我們說完了transformation的產生,這裡來具體說一下如何產生streamGraph,下面先來看一下主要的產生邏輯

private Collection<Integer> transform(StreamTransformation<?> transform) {    		if (alreadyTransformed.containsKey(transform)) {  			return alreadyTransformed.get(transform);  		}    		LOG.debug("Transforming " + transform);    		if (transform.getMaxParallelism() <= 0) {    			// if the max parallelism hasn't been set, then first use the job wide max parallelism  			// from theExecutionConfig.  			int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();  			if (globalMaxParallelismFromConfig > 0) {  				transform.setMaxParallelism(globalMaxParallelismFromConfig);  			}  		}    		// call at least once to trigger exceptions about MissingTypeInfo  		transform.getOutputType();    		Collection<Integer> transformedIds;  		if (transform instanceof OneInputTransformation<?, ?>) {  			transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);  		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {  			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);  		} else if (transform instanceof SourceTransformation<?>) {  			transformedIds = transformSource((SourceTransformation<?>) transform);  		} else if (transform instanceof SinkTransformation<?>) {  			transformedIds = transformSink((SinkTransformation<?>) transform);  		} else if (transform instanceof UnionTransformation<?>) {  			transformedIds = transformUnion((UnionTransformation<?>) transform);  		} else if (transform instanceof SplitTransformation<?>) {  			transformedIds = transformSplit((SplitTransformation<?>) transform);  		} else if (transform instanceof SelectTransformation<?>) {  			transformedIds = transformSelect((SelectTransformation<?>) transform);  		} else if (transform instanceof FeedbackTransformation<?>) {  			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);  		} else if (transform instanceof CoFeedbackTransformation<?>) {  			transformedIds = 
transformCoFeedback((CoFeedbackTransformation<?>) transform);  		} else if (transform instanceof PartitionTransformation<?>) {  			transformedIds = transformPartition((PartitionTransformation<?>) transform);  		} else if (transform instanceof SideOutputTransformation<?>) {  			transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);  		} else {  			throw new IllegalStateException("Unknown transformation: " + transform);  		}    		// need this check because the iterate transformation adds itself before  		// transforming the feedback edges  		if (!alreadyTransformed.containsKey(transform)) {  			alreadyTransformed.put(transform, transformedIds);  		}    		if (transform.getBufferTimeout() >= 0) {  			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());  		}  		if (transform.getUid() != null) {  			streamGraph.setTransformationUID(transform.getId(), transform.getUid());  		}  		if (transform.getUserProvidedNodeHash() != null) {  			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());  		}    		if (transform.getMinResources() != null && transform.getPreferredResources() != null) {  			streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());  		}    		return transformedIds;  	}

這裡是對dataStream操作產生的transformation進行轉換,構造出streamGraph,其中主要是對11種transformation進行轉換。結合入口處的循環,這裡對於每種transformation都進行遞歸,獲取上游的transformation id,並設置並發度。下面我們以最常見的OneInputTransform為例,說一下具體的轉換

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {         //遞歸獲取上游的id  		Collection<Integer> inputIds = transform(transform.getInput());    		// the recursive call might have already transformed this  		if (alreadyTransformed.containsKey(transform)) {  			return alreadyTransformed.get(transform);  		}    		String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);          //添加operator  		streamGraph.addOperator(transform.getId(),  				slotSharingGroup,  				transform.getCoLocationGroupKey(),  				transform.getOperator(),  				transform.getInputType(),  				transform.getOutputType(),  				transform.getName());          //設置partition的分區key  		if (transform.getStateKeySelector() != null) {  			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());  			streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);  		}          //設置並發度和最大的並發度  		streamGraph.setParallelism(transform.getId(), transform.getParallelism());  		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());    		for (Integer inputId: inputIds) {  			streamGraph.addEdge(inputId, transform.getId(), 0);  		}    		return Collections.singleton(transform.getId());  	}

在addEdge中,對於union、select、split不會添加邊,只會創建虛擬節點或在上游節點添加selector;相應的transformation包括UnionTransformation、SplitTransformation、SelectTransformation和PartitionTransformation,它們只是添加虛擬的node