說說Flink DataStream的八種物理分區邏輯
- 2019 年 12 月 26 日
- 筆記
By 大數據技術與架構
場景描述:Spark的RDD有分區的概念,Flink的DataStream同樣也有,只不過沒有RDD那麼顯式而已。Flink通過流分區器StreamPartitioner來控制DataStream中的元素往下游的流向。
Spark的RDD有分區的概念,Flink的DataStream同樣也有,只不過沒有RDD那麼顯式而已。Flink通過流分區器StreamPartitioner來控制DataStream中的元素往下游的流向,以StreamPartitioner抽象類為中心的類圖如下所示。

在Flink的Web UI界面中,各算子之間的分區器類型會在箭頭上標註出來,如下所示。

StreamPartitioner繼承自ChannelSelector接口。這裡的Channel概念與Netty不同,只是Flink對於數據寫入目的地的簡單抽象,我們可以直接認為它就是下游算子的並發實例(即物理分區)。所有StreamPartitioner的子類都要實現selectChannel()方法,用來選擇分區號。下面分別來看看Flink提供的8種StreamPartitioner的源碼。
GlobalPartitioner
// dataStream.global() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; }
GlobalPartitioner只會將數據輸出到下游算子的第一個實例,簡單暴力。
ShufflePartitioner
private Random random = new Random(); // dataStream.shuffle() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return random.nextInt(numberOfChannels); }
ShufflePartitioner會將數據隨機輸出到下游算子的並發實例。由於java.util.Random生成的隨機數符合均勻分佈,故能夠近似保證平均。
RebalancePartitioner
private int nextChannelToSendTo; @Override public void setup(int numberOfChannels) { super.setup(numberOfChannels); nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); } // dataStream.rebalance() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; }
RebalancePartitioner會先隨機選擇一個下游算子的實例,然後用輪詢(round-robin)的方式從該實例開始循環輸出。該方式能保證完全的下游負載均衡,所以常用來處理有傾斜的原數據流。
KeyGroupStreamPartitioner
private final KeySelector<T, K> keySelector; private int maxParallelism; // dataStream.keyBy() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); } public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; } public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { return keyGroupId * parallelism / maxParallelism; }
這就是keyBy()算子底層所採用的StreamPartitioner,可見是先在key值的基礎上經過了兩重哈希得到key對應的哈希值,第一重是Java自帶的hashCode(),第二重則是MurmurHash。然後將哈希值乘以算子並行度,併除以最大並行度,得到最終的分區ID。
BroadcastPartitioner
// dataStream.broadcast() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); } @Override public boolean isBroadcast() { return true; }
BroadcastPartitioner是廣播流專用的分區器。由於廣播流發揮作用必須靠DataStream.connect()方法與正常的數據流連接起來,所以實際上不需要BroadcastPartitioner來選擇分區(廣播數據總會投遞給下游算子的所有並發),selectChannel()方法也就不必實現了。細節請參見Flink中BroadcastStream相關的源碼,這裡就不再列舉了。
RescalePartitioner
private int nextChannelToSendTo = -1; // dataStream.rescale() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; }
這個看起來也太簡單了,並且與RebalancePartitioner的邏輯是相同的?實際上並不是。我們看看StreamingJobGraphGenerator類,它負責把Flink執行計劃中的StreamGraph(邏輯執行計劃)轉換為JobGraph(優化的邏輯執行計劃)。其connect()方法中有如下代碼。
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, resultPartitionType); } else { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
粗略地講,如果分區邏輯是RescalePartitioner或ForwardPartitioner(下面會說),那麼採用POINTWISE模式來連接上下游的頂點,對於其他分區邏輯,都用ALL_TO_ALL模式來連接。看下面兩張圖會比較容易理解。


也就是說,POINTWISE模式的RescalePartitioner在中間結果傳送給下游節點時,會根據並行度的比值來輪詢分配給下游算子實例的子集,對TaskManager來說本地性會比較好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局輪詢分配,更加均衡,但是就會不可避免地在節點之間交換數據,如果數據量大的話,造成的網絡流量會很可觀。
ForwardPartitioner
// dataStream.forward() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; }
與GlobalPartitioner的實現相同。但通過上面對POINTWISE和ALL_TO_ALL連接模式的講解,我們能夠知道,它會將數據輸出到本地運行的下游算子的第一個實例,而非全局。在上下游算子的並行度相同的情況下,默認就會採用ForwardPartitioner。反之,若上下游算子的並行度不同,默認會採用前述的RebalancePartitioner。
CustomPartitionerWrapper
Partitioner<K> partitioner; KeySelector<T, K> keySelector; // dataStream.partitionCustom() @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance(), e); } return partitioner.partition(key, numberOfChannels); }
這就是自定義的分區邏輯了,我們可以通過繼承Partitioner接口自己實現,並傳入partitionCustom()方法。舉個簡單的栗子,以key的長度做分區:
sourceStream.partitionCustom(new Partitioner<String>() { @Override public int partition(String key, int numPartitions) { return key.length() % numPartitions; } }, 0);