深度理解 Flink 的 parallelism 和 slot

  • 2020 年 3 月 31 日
  • 筆記

一、什麼是 parallelism(並行度)

parallelism 在 Flink 中表示每個運算元的並行度。

舉兩個例子

(1)比如 kafka 某個 topic 數據量太大,設置了10個分區,但 source 端的運算元並行度卻為1,只有一個 subTask 去同時消費10個分區,明顯很慢。此時需要適當的調大並行度。

(2)比如 某個運算元執行了比較複雜的操作,導致該運算元執行特別慢,那麼可以考慮給該運算元增加並行度。

二、如何調節並行度

1. 配置文件

Flink 安裝根目錄下,conf 里的 flink-conf.yml 里有一個配置,默認並行度為1

/usr/local/flink-1.9.2/conf/flink-conf.yml

意味著如果程式中不設置任何並行度,那所有運算元的並行度都是1

2. 通過 env 變數設置

val env = StreamExecutionEnvironment.getExecutionEnvironment()  env.setParallelism(10);

這樣設置的並行度是程式中每個運算元的並行度,如果運算元沒有單獨覆蓋的話,那就是默認是這個全局的並行度了

3. 為每個運算元單獨設置並行度

env.addSource(...)  .map(...).setParallelism(5)  .keyBy(...)  .addSink(...).setParallelism(1)

綜上優先順序是 運算元設置的並行度 > env 設置的並行度 > 配置文件默認的並行度

三、什麼是 slot

slot 是 TaskManager 資源的最小單元。比如 TaskManager 有 5 個 slot,那麼每個 slot 分配 25% 的記憶體,所有 slot 共享 TaskManager 的 cpu。

在一個 slot 中可以運行一個或者多個執行緒。

問題來了,是不是每個 slot 里只能跑一個運算元的一個子任務呢?

當然不是,這樣的話,資源共享的效率也就太低了。實際上,一個 slot 可以跑同一個job裡面,不同運算元的不同子任務。

我們拿 Flink 官網的幾張圖來解釋一下

如上這張圖,2 個 TaskManager,6 個 slot。

Source 和 map 運算元組成了任務鏈,並行度是2,跑在了 2 個 slot 中。

keyBy()/window()/apply 運算元組成了任務鏈,並行度也是2,也跑在了 2 個 slot 中。

sink 的並行度 是 1,跑在 1 個 slot 中。

這其中有個疑問是,為啥 source/map 要和 keyBy 運算元分開,他們不能是一個任務鏈嗎?

答案是否定的,因為 keyBy 相當於是分區,得把數據分到不同的運算元上,當然不能在一個任務鏈裡面了。

那上面這樣分配的問題是,可能 source/map 運算元的任務很輕,分分鐘就跑完了,然後 cpu 在那閑著。但是 keyBy/window/apply 運算元一直在忙著計算,資源很緊張。

這樣資源也是很不合理的。事實上,任務可以向下面的圖這樣分配

source/map 運算元 和 keyBy/window/apply 和 sink 運算元共享了一個 slot 資源。他們的並行度都是6。

這樣資源就很合理了。

所以, flink 任務,最大並行度的那個運算元,決定了需要多少個 slot 。把消耗並行度最大的那個運算元解決了,其他運算元也都沒問題。

為了加深大家的理解,這裡再對照著幾幅圖加深一下認識

slot 是指 TaskManager 的最大並發能力

如上圖,3 個 TaskManager,每個 TaskManager 3 個 slot,此時一共有 9 個 slot。

如上圖,所有的運算元並行度為1,只需要 1 個 slot 就能解決問題,有 8 個處於空閑。

如上圖的上半部分,並行度為2,使用了 2 個 slot。

下半部分,設置並行度為9,所有的 slot 都用到了。

四、如何合理的設置並行度

設置並行度,需要考慮到集群可用的 slot 數量,如果 並行度設置的過大,集群的 slot 又不足,那麼任務可能會一直等待,直到超時拋出異常退出。

在集群資源夠用時,可以充分的利用集群資源,比如 kafka 的分區有10個,那麼可以設置 source 的並行度為 10. 每個 subTask 消費一個 1個分區。