Spark的兩種核心Shuffle詳解


在 MapReduce 框架中, Shuffle 階段是連接 Map 與 Reduce 之間的橋樑, Map 階段通過 Shuffle 過程將數據輸出到 Reduce 階段中。由於 Shuffle 涉及磁盤的讀寫和網絡 I/O,因此 Shuffle 性能的高低直接影響整個程序的性能。 Spark 也有 Map 階段和 Reduce 階段,因此也會出現 Shuffle 。

Spark Shuffle

Spark Shuffle 分為兩種:一種是基於 Hash 的 Shuffle;另一種是基於 Sort 的 Shuffle。先介紹下它們的發展歷程,有助於我們更好的理解 Shuffle:

在 Spark 1.1 之前, Spark 中只實現了一種 Shuffle 方式,即基於 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基於 Sort 的 Shuffle 實現方式,並且 Spark 1.2 版本之後,默認的實現方式從基於 Hash 的 Shuffle 修改為基於 Sort 的 Shuffle 實現方式,即使用的 ShuffleManager 從默認的 hash 修改為 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己經不再使用

Spark 之所以一開始就提供基於 Hash 的 Shuffle 實現機制,其主要目的之一就是為了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是將 sort 作為固定步驟,有許多並不需要排序的任務,MapReduce 也會對其進行排序,造成了許多不必要的開銷。

在基於 Hash 的 Shuffle 實現方式中,每個 Mapper 階段的 Task 會為每個 Reduce 階段的 Task
生成一個文件,通常會產生大量的文件(即對應為 M*R 個中間文件,其中, M 表示 Mapper
階段的 Task 個數, R 表示 Reduce 階段的 Task 個數) 伴隨大量的隨機磁盤 I/O 操作與大量的內存開銷。

為了緩解上述問題,在 Spark 0.8.1 版本中為基於 Hash 的 Shuffle 實現引入了 Shuffle
Consolidate 機制(即文件合併機制)
,將 Mapper 端生成的中間文件進行合併的處理機制。通過配置屬性 spark.shuffie.consolidateFiles=true,減少中間生成的文件數量。通過文件合併,可以將中間文件的生成方式修改為每個執行單位為每個 Reduce
階段的 Task 生成一個文件。

執行單位對應為:每個 Mapper 端的 Cores 數/每個 Task
分配的 Cores 數(默認為 1) 。最終可以將文件個數從 M*R 修改為 E*C/T*R,其中,
E 表示 Executors 個數, C 表示可用 Cores 個數, T 表示 Task 分配的 Cores 數。

Spark1.1 版本引入了 Sort Shuffle

基於 Hash 的 Shuffle 的實現方式中,生成的中間結果文件的個數都會依賴於 Reduce 階段的 Task 個數,即 Reduce 端的並行度,因此文件數仍然不可控,無法真正解決問題。為了更好地解決問題,在 Spark1.1 版本引入了基於 Sort 的 Shuffle 實現方式,並且在 Spark 1.2 版本之後,默認的實現方式也從基於 Hash 的 Shuffle,修改為基於 Sort 的 Shuffle 實現方式,即使用的 ShuffleManager 從默認的 hash 修改為 sort。

在基於 Sort 的 Shuffle 中,每個 Mapper 階段的 Task 不會為每 Reduce 階段的 Task 生成一個單獨的文件,而是全部寫到一個數據(Data)文件中,同時生成一個索引(Index)文件, Reduce 階段的各個 Task 可以通過該索引文件獲取相關的數據。避免產生大量文件的直接收益就是降低隨機磁盤 I/0 與內存的開銷。最終生成的文件個數減少到 2*M ,其中 M 表示 Mapper 階段的 Task 個數,每個 Mapper 階段的 Task 分別生成兩個文件(1 個數據文件、 1 個索引文件),最終的文件個數為 M 個數據文件與 M 個索引文件。因此,最終文件個數是 2*M 個。

從 Spark 1.4 版本開始,在 Shuffle 過程中也引入了基於 Tungsten-Sort 的 Shuffie 實現方式,通 Tungsten 項目所做的優化,可以極大提高 Spark 在數據處理上的性能。(Tungsten 翻譯為中文是鎢絲)

註:在一些特定的應用場景下,採用基於 Hash 實現 Shuffle 機制的性能會超過基於 Sort 的 Shuffle 實現機制。

一張圖了解下 Spark Shuffle 的迭代歷史:

Spark Shuffle 迭代歷史
Spark Shuffle 迭代歷史

為什麼 Spark 最終還是放棄了 HashShuffle ,使用了 Sorted-Based Shuffle?

我們可以從 Spark 最根本要優化和迫切要解決的問題中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 時產生大量的文件。當數據量越來越多時,產生的文件量是不可控的,這嚴重製約了 Spark 的性能及擴展能力,所以 Spark 必須要解決這個問題,減少 Mapper 端 ShuffleWriter 產生的文件數量,這樣便可以讓 Spark 從幾百台集群的規模瞬間變成可以支持幾千台,甚至幾萬台集群的規模。

但使用 Sorted-Based Shuffle 就完美了嗎,答案是否定的,Sorted-Based Shuffle 也有缺點,其缺點反而是它排序的特性,它強制要求數據在 Mapper 端必須先進行排序,所以導致它排序的速度有點慢。好在出現了 Tungsten-Sort Shuffle ,它對排序算法進行了改進,優化了排序的速度。Tungsten-Sort
Shuffle 已經併入了 Sorted-Based Shuffle,Spark 的引擎會自動識別程序需要的是 Sorted-Based
Shuffle,還是 Tungsten-Sort Shuffle。

下面詳細剖析每個 Shuffle 的底層執行原理:

一、Hash Shuffle 解析

以下的討論都假設每個 Executor 有 1 個 cpu core。

1. HashShuffleManager

shuffle write 階段,主要就是在一個 stage 結束計算之後,為了下一個 stage 可以執行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數據按 key 進行「劃分」。所謂「劃分」,就是對相同的 key 執行 hash 算法,從而將相同 key 都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於下游 stage 的一個 task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,才會溢寫到磁盤文件中去

下一個 stage 的 task 有多少個,當前 stage 的每個 task 就要創建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那麼當前 stage 的每個 task 都要創建 100 份磁盤文件。如果當前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,那麼每個 Executor 上總共就要創建 500 個磁盤文件,所有 Executor 上會創建 5000 個磁盤文件。由此可見,未經優化的 shuffle write 操作所產生的磁盤文件的數量是極其驚人的

shuffle read 階段,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每一個 task 就需要將上一個 stage 的計算結果中的所有相同 key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行 key 的聚合或連接等操作。由於 shuffle write 的過程中,map task 給下游 stage 的每個 reduce task 都創建了一個磁盤文件,因此 shuffle read 的過程中,每個 reduce task 只要從上游 stage 的所有 map task 所在節點上,拉取屬於自己的那一個磁盤文件即可。

shuffle read 的拉取過程是一邊拉取一邊進行聚合的。每個 shuffle read task 都會有一個自己的 buffer 緩衝,每次都只能拉取與 buffer 緩衝相同大小的數據,然後通過內存中的一個 Map 進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到 buffer 緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。

HashShuffleManager 工作原理如下圖所示:

未優化的HashShuffleManager工作原理
未優化的HashShuffleManager工作原理

2. 優化的 HashShuffleManager

為了優化 HashShuffleManager 我們可以設置一個參數:spark.shuffle.consolidateFiles,該參數默認值為 false,將其設置為 true 即可開啟優化機制,通常來說,如果我們使用 HashShuffleManager,那麼都建議開啟這個選項

開啟 consolidate 機制之後,在 shuffle write 過程中,task 就不是為下游 stage 的每個 task 創建一個磁盤文件了,此時會出現shuffleFileGroup的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是相同的。一個 Executor 上有多少個 cpu core,就可以並行執行多少個 task。而第一批並行執行的每個 task 都會創建一個 shuffleFileGroup,並將數據寫入對應的磁盤文件內

當 Executor 的 cpu core 執行完一批 task,接着執行下一批 task 時,下一批 task 就會復用之前已有的 shuffleFileGroup,包括其中的磁盤文件,也就是說,此時 task 會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate 機制允許不同的 task 復用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件進行一定程度上的合併,從而大幅度減少磁盤文件的數量,進而提升 shuffle write 的性能

假設第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor(Executor CPU 個數為 1),每個 Executor 執行 5 個 task。那麼原本使用未經優化的 HashShuffleManager 時,每個 Executor 會產生 500 個磁盤文件,所有 Executor 會產生 5000 個磁盤文件的。但是此時經過優化之後,每個 Executor 創建的磁盤文件的數量的計算公式為:cpu core的數量 * 下一個stage的task數量,也就是說,每個 Executor 此時只會創建 100 個磁盤文件,所有 Executor 只會創建 1000 個磁盤文件。

這個功能優點明顯,但為什麼 Spark 一直沒有在基於 Hash Shuffle 的實現中將功能設置為默認選項呢,官方給出的說法是這個功能還欠穩定。

優化後的 HashShuffleManager 工作原理如下圖所示:

優化後的HashShuffleManager工作原理
優化後的HashShuffleManager工作原理
基於 Hash 的 Shuffle 機制的優缺點

優點

  • 可以省略不必要的排序開銷。

  • 避免了排序所需的內存開銷。

缺點

  • 生產的文件過多,會對文件系統造成壓力。

  • 大量小文件的隨機讀寫帶來一定的磁盤開銷。

  • 數據塊寫入時所需的緩存空間也會隨之增加,對內存造成壓力。

二、SortShuffle 解析

SortShuffleManager 的運行機制主要分成三種:

  1. 普通運行機制

  2. bypass 運行機制,當 shuffle read task 的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為 200),就會啟用 bypass 機制;

  3. Tungsten Sort 運行機制,開啟此運行機制需設置配置項 spark.shuffle.manager=tungsten-sort。開啟此項配置也不能保證就一定採用此運行機制(後面會解釋)。

1. 普通運行機制

在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子,可能選用不同的數據結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那麼會選用 Map 數據結構,一邊通過 Map 進行聚合,一邊寫入內存如果是 join 這種普通的 shuffle 算子,那麼會選用 Array 數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構之後,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,然後清空內存數據結構。

在溢寫到磁盤文件之前,會先根據 key 對內存數據結構中已有的數據進行排序。排序過後,會分批將數據寫入磁盤文件。默認的 batch 數量是 10000 條,也就是說,排序好的數據,會以每批 1 萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過 Java 的 BufferedOutputStream 實現的。BufferedOutputStream 是 Java 的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢之後再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數,提升性能

一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最後會將之前所有的臨時磁盤文件都進行合併,這就是merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然後依次寫入最終的磁盤文件之中。此外,由於一個 task 就只對應一個磁盤文件,也就意味着該 task 為下游 stage 的 task 準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個 task 的數據在文件中的 start offset 與 end offset。

SortShuffleManager 由於有一個磁盤文件 merge 的過程,因此大大減少了文件數量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由於每個 task 最終只有一個磁盤文件,因此此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。

普通運行機制的 SortShuffleManager 工作原理如下圖所示:

普通運行機制的SortShuffleManager工作原理
普通運行機制的SortShuffleManager工作原理

2. bypass 運行機制

Reducer 端任務數比較少的情況下,基於 Hash Shuffle 實現機制明顯比基於 Sort Shuffle 實現機制要快,因此基於 Sort huffle 實現機制提供了一個回退方案,就是 bypass 運行機制。對於 Reducer 端任務數少於配置屬性spark.shuffle.sort.bypassMergeThreshold設置的個數時,使用帶 Hash 風格的回退計劃。

bypass 運行機制的觸發條件如下:

  • shuffle map task 數量小於spark.shuffle.sort.bypassMergeThreshold=200參數的值。
  • 不是聚合類的 shuffle 算子。

此時,每個 task 會為每個下游 task 都創建一個臨時磁盤文件,並將數據按 key 進行 hash 然後根據 key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿之後再溢寫到磁盤文件的。最後,同樣會將所有臨時磁盤文件都合併成一個磁盤文件,並創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最後會做一個磁盤文件的合併而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。

而該機制與普通 SortShuffleManager 運行機制的不同在於:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write 過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

bypass 運行機制的 SortShuffleManager 工作原理如下圖所示:

bypass運行機制的SortShuffleManager工作原理
bypass運行機制的SortShuffleManager工作原理

3. Tungsten Sort Shuffle 運行機制

基於 Tungsten Sort 的 Shuffle 實現機制主要是藉助 Tungsten 項目所做的優化來高效處理
Shuffle。

Spark 提供了配置屬性,用於選擇具體的 Shuffle 實現機制,但需要說明的是,雖然默認情況下 Spark 默認開啟的是基於 SortShuffle 實現機制,但實際上,參考 Shuffle 的框架內核部分可知基於 SortShuffle 的實現機制與基於 Tungsten Sort Shuffle 實現機制都是使用 SortShuffleManager,而內部使用的具體的實現機制,是通過提供的兩個方法進行判斷的:

對應非基於 Tungsten Sort 時,通過 SortShuffleWriter.shouldBypassMergeSort 方法判斷是否需要回退到 Hash 風格的 Shuffle 實現機制,當該方法返回的條件不滿足時,則通過 SortShuffleManager.canUseSerializedShuffle 方法判斷是否需要採用基於 Tungsten Sort Shuffle 實現機制,而當這兩個方法返回都為 false,即都不滿足對應的條件時,會自動採用普通運行機制。

因此,當設置了 spark.shuffle.manager=tungsten-sort 時,也不能保證就一定採用基於 Tungsten Sort 的 Shuffle 實現機制。

要實現 Tungsten Sort Shuffle 機制需要滿足以下條件

  1. Shuffle 依賴中不帶聚合操作或沒有對輸出進行排序的要求。

  2. Shuffle 的序列化器支持序列化值的重定位(當前僅支持 KryoSerializer Spark SQL 框架自定義的序列化器)。

  3. Shuffle 過程中的輸出分區個數少於 16777216 個。

實際上,使用過程中還有其他一些限制,如引入 Page 形式的內存管理模型後,內部單條記錄的長度不能超過 128 MB (具體內存模型可以參考 PackedRecordPointer 類)。另外,分區個數的限制也是該內存模型導致的。

所以,目前使用基於 Tungsten Sort Shuffle 實現機制條件還是比較苛刻的。

基於 Sort 的 Shuffle 機制的優缺點

優點

  • 小文件的數量大量減少,Mapper 端的內存佔用變少;

  • Spark 不僅可以處理小規模的數據,即使處理大規模的數據,也不會很容易達到性能瓶頸。

缺點

  • 如果 Mapper 中 Task 的數量過大,依舊會產生很多小文件,此時在 Shuffle 傳數據的過程中到 Reducer 端, Reducer 會需要同時大量地記錄進行反序列化,導致大量內存消耗和 GC 負擔巨大,造成系統緩慢,甚至崩潰;

  • 強制了在 Mapper 端必須要排序,即使數據本身並不需要排序;

  • 它要基於記錄本身進行排序,這就是 Sort-Based Shuffle 最致命的性能消耗。


參考資料:

  • 《Spark大數據商業實戰三部曲》
  • //spark.apache.org/docs/2.0.0/programming-guide.html#shuffle-operations
  • //mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw