說說Flink DataStream的八種物理分區邏輯

  • 2019 年 12 月 26 日
  • 筆記

By 大數據技術與架構

場景描述:Spark的RDD有分區的概念,Flink的DataStream同樣也有,只不過沒有RDD那麼顯式而已。Flink通過流分區器StreamPartitioner來控制DataStream中的元素往下游的流向。

Spark的RDD有分區的概念,Flink的DataStream同樣也有,只不過沒有RDD那麼顯式而已。Flink通過流分區器StreamPartitioner來控制DataStream中的元素往下游的流向,以StreamPartitioner抽象類為中心的類圖如下所示。

在Flink的Web UI界面中,各算子之間的分區器類型會在箭頭上標註出來,如下所示。

StreamPartitioner繼承自ChannelSelector接口。這裡的Channel概念與Netty不同,只是Flink對於數據寫入目的地的簡單抽象,我們可以直接認為它就是下游算子的並發實例(即物理分區)。所有StreamPartitioner的子類都要實現selectChannel()方法,用來選擇分區號。下面分別來看看Flink提供的8種StreamPartitioner的源碼。

GlobalPartitioner
    // dataStream.global()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          return 0;      }

GlobalPartitioner只會將數據輸出到下游算子的第一個實例,簡單暴力。

ShufflePartitioner
    private Random random = new Random();      // dataStream.shuffle()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          return random.nextInt(numberOfChannels);      }

ShufflePartitioner會將數據隨機輸出到下游算子的並發實例。由於java.util.Random生成的隨機數符合均勻分佈,故能夠近似保證平均。

RebalancePartitioner
    private int nextChannelToSendTo;        @Override      public void setup(int numberOfChannels) {          super.setup(numberOfChannels);          nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);      }      // dataStream.rebalance()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;          return nextChannelToSendTo;      }

RebalancePartitioner會先隨機選擇一個下游算子的實例,然後用輪詢(round-robin)的方式從該實例開始循環輸出。該方式能保證完全的下游負載均衡,所以常用來處理有傾斜的原數據流。

KeyGroupStreamPartitioner
    private final KeySelector<T, K> keySelector;      private int maxParallelism;      // dataStream.keyBy()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          K key;          try {              key = keySelector.getKey(record.getInstance().getValue());          } catch (Exception e) {              throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);          }          return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);      }        public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {          return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));      }        public static int assignToKeyGroup(Object key, int maxParallelism) {          return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);      }        public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {          return MathUtils.murmurHash(keyHash) % maxParallelism;      }        public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {          return keyGroupId * parallelism / maxParallelism;      }

這就是keyBy()算子底層所採用的StreamPartitioner,可見是先在key值的基礎上經過了兩重哈希得到key對應的哈希值,第一重是Java自帶的hashCode(),第二重則是MurmurHash。然後將哈希值乘以算子並行度,併除以最大並行度,得到最終的分區ID。

BroadcastPartitioner
    // dataStream.broadcast()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");      }        @Override      public boolean isBroadcast() {          return true;      }

BroadcastPartitioner是廣播流專用的分區器。由於廣播流發揮作用必須靠DataStream.connect()方法與正常的數據流連接起來,所以實際上不需要BroadcastPartitioner來選擇分區(廣播數據總會投遞給下游算子的所有並發),selectChannel()方法也就不必實現了。細節請參見Flink中BroadcastStream相關的源碼,這裡就不再列舉了。

RescalePartitioner
    private int nextChannelToSendTo = -1;      // dataStream.rescale()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          if (++nextChannelToSendTo >= numberOfChannels) {              nextChannelToSendTo = 0;          }          return nextChannelToSendTo;      }

這個看起來也太簡單了,並且與RebalancePartitioner的邏輯是相同的?實際上並不是。我們看看StreamingJobGraphGenerator類,它負責把Flink執行計劃中的StreamGraph(邏輯執行計劃)轉換為JobGraph(優化的邏輯執行計劃)。其connect()方法中有如下代碼。

        if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {              jobEdge = downStreamVertex.connectNewDataSetAsInput(                  headVertex,                  DistributionPattern.POINTWISE,                  resultPartitionType);          } else {              jobEdge = downStreamVertex.connectNewDataSetAsInput(                      headVertex,                      DistributionPattern.ALL_TO_ALL,                      resultPartitionType);

粗略地講,如果分區邏輯是RescalePartitioner或ForwardPartitioner(下面會說),那麼採用POINTWISE模式來連接上下游的頂點,對於其他分區邏輯,都用ALL_TO_ALL模式來連接。看下面兩張圖會比較容易理解。

也就是說,POINTWISE模式的RescalePartitioner在中間結果傳送給下游節點時,會根據並行度的比值來輪詢分配給下游算子實例的子集,對TaskManager來說本地性會比較好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局輪詢分配,更加均衡,但是就會不可避免地在節點之間交換數據,如果數據量大的話,造成的網絡流量會很可觀。

ForwardPartitioner
   // dataStream.forward()     @Override     public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {         return 0;     }

與GlobalPartitioner的實現相同。但通過上面對POINTWISE和ALL_TO_ALL連接模式的講解,我們能夠知道,它會將數據輸出到本地運行的下游算子的第一個實例,而非全局。在上下游算子的並行度相同的情況下,默認就會採用ForwardPartitioner。反之,若上下游算子的並行度不同,默認會採用前述的RebalancePartitioner。

CustomPartitionerWrapper
    Partitioner<K> partitioner;      KeySelector<T, K> keySelector;      // dataStream.partitionCustom()      @Override      public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {          K key;          try {              key = keySelector.getKey(record.getInstance().getValue());          } catch (Exception e) {              throw new RuntimeException("Could not extract key from " + record.getInstance(), e);          }            return partitioner.partition(key, numberOfChannels);      }

這就是自定義的分區邏輯了,我們可以通過繼承Partitioner接口自己實現,並傳入partitionCustom()方法。舉個簡單的栗子,以key的長度做分區:

    sourceStream.partitionCustom(new Partitioner<String>() {        @Override        public int partition(String key, int numPartitions) {          return key.length() % numPartitions;        }      }, 0);