flink系列(8)-streamGraph

  • 2019 年 10 月 4 日
  • 筆記

上一篇我们讲完了 transformation 的产生，这里具体说一下如何产生 StreamGraph。下面先来看一下主要的产生逻辑：

private Collection<Integer> transform(StreamTransformation<?> transform) {    		if (alreadyTransformed.containsKey(transform)) {  			return alreadyTransformed.get(transform);  		}    		LOG.debug("Transforming " + transform);    		if (transform.getMaxParallelism() <= 0) {    			// if the max parallelism hasn't been set, then first use the job wide max parallelism  			// from theExecutionConfig.  			int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();  			if (globalMaxParallelismFromConfig > 0) {  				transform.setMaxParallelism(globalMaxParallelismFromConfig);  			}  		}    		// call at least once to trigger exceptions about MissingTypeInfo  		transform.getOutputType();    		Collection<Integer> transformedIds;  		if (transform instanceof OneInputTransformation<?, ?>) {  			transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);  		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {  			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);  		} else if (transform instanceof SourceTransformation<?>) {  			transformedIds = transformSource((SourceTransformation<?>) transform);  		} else if (transform instanceof SinkTransformation<?>) {  			transformedIds = transformSink((SinkTransformation<?>) transform);  		} else if (transform instanceof UnionTransformation<?>) {  			transformedIds = transformUnion((UnionTransformation<?>) transform);  		} else if (transform instanceof SplitTransformation<?>) {  			transformedIds = transformSplit((SplitTransformation<?>) transform);  		} else if (transform instanceof SelectTransformation<?>) {  			transformedIds = transformSelect((SelectTransformation<?>) transform);  		} else if (transform instanceof FeedbackTransformation<?>) {  			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);  		} else if (transform instanceof CoFeedbackTransformation<?>) {  			transformedIds = 
transformCoFeedback((CoFeedbackTransformation<?>) transform);  		} else if (transform instanceof PartitionTransformation<?>) {  			transformedIds = transformPartition((PartitionTransformation<?>) transform);  		} else if (transform instanceof SideOutputTransformation<?>) {  			transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);  		} else {  			throw new IllegalStateException("Unknown transformation: " + transform);  		}    		// need this check because the iterate transformation adds itself before  		// transforming the feedback edges  		if (!alreadyTransformed.containsKey(transform)) {  			alreadyTransformed.put(transform, transformedIds);  		}    		if (transform.getBufferTimeout() >= 0) {  			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());  		}  		if (transform.getUid() != null) {  			streamGraph.setTransformationUID(transform.getId(), transform.getUid());  		}  		if (transform.getUserProvidedNodeHash() != null) {  			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());  		}    		if (transform.getMinResources() != null && transform.getPreferredResources() != null) {  			streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());  		}    		return transformedIds;  	}

这里对 DataStream 操作产生的 transformation 进行转换，构造出 StreamGraph。其中主要是对 11 种 transformation 分别进行转换：结合入口处的循环，每种 transformation 都会先递归转换其上游，获取上游 transformation 的 id，再设置并发度等属性。下面我们以最常见的 OneInputTransformation 为例，说一下具体的转换过程。

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {         //递归获取上游的id  		Collection<Integer> inputIds = transform(transform.getInput());    		// the recursive call might have already transformed this  		if (alreadyTransformed.containsKey(transform)) {  			return alreadyTransformed.get(transform);  		}    		String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);          //添加operator  		streamGraph.addOperator(transform.getId(),  				slotSharingGroup,  				transform.getCoLocationGroupKey(),  				transform.getOperator(),  				transform.getInputType(),  				transform.getOutputType(),  				transform.getName());          //设置partition的分区key  		if (transform.getStateKeySelector() != null) {  			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());  			streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);  		}          //设置并发度和最大的并发度  		streamGraph.setParallelism(transform.getId(), transform.getParallelism());  		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());    		for (Integer inputId: inputIds) {  			streamGraph.addEdge(inputId, transform.getId(), 0);  		}    		return Collections.singleton(transform.getId());  	}

对于 union、split、select、partition 这几类 transformation，转换时不会在 StreamGraph 中添加真正的算子节点：union 直接返回所有上游的 id；split 只是在上游节点上添加 selector（OutputSelector）；select 和 partition 则各自创建一个虚拟节点（side output 也类似）。之后下游在 addEdge 时，会把这些虚拟节点解析回真实的上游节点，再在两者之间添加边，并把 selector、partitioner 等信息记录到边上。