spark shuffle寫操作三部曲之UnsafeShuffleWriter
- 2019 年 10 月 3 日
- 筆記
前言
在前兩篇文章 spark shuffle的寫操作之準備工作 中引出了spark shuffle的三種實現,spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於shuffle寫操作的具體細節,實現相對比較樸素,值得學習。本篇文章,主要剖析了 UnsafeShuffleWriter用作寫shuffle數據的具體細節,它在 BypassMergeSortShuffleWriter 的思路上更進一步,建議先看 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter,再來看本篇文章。下面先來看UnsafeShuffleWriter的主要依賴實現類 — ShuffleExternalSorter。
sort-based shuffle的外部sorter — ShuffleExternalSorter
在看本小節之前,建議先參照 spark 源碼分析之二十二– Task的記憶體管理 對任務的記憶體管理做一下詳細的了解,因為ShuffleExternalSorter使用了記憶體的排序。任務在做大數據量的記憶體操作時,記憶體是需要管理的。
在正式剖析之前,先剖析其依賴類。
依賴之記錄block元資訊– SpillInfo
它記錄了block的一些元數據資訊。
其類結構如下:
其中,blockId就是shuffle的臨時的blockId,file就是shuffle合併後的文件,partitionLengths表示每一個分區的大小。
依賴之分區排序器 — ShuffleInMemorySorter
可以在任何記憶體使用的數組–LongArray
它支援堆內記憶體和堆外記憶體,它有四個屬性:
數組裡的一個元素的地址等於:
if (baseObj == null) ? baseOffset(is real os address) + (length – 1) * WIDTH : address(baseObj) + baseOffset(is relative address 0) + (length – 1) * WIDTH
所有元素設為0:
設置元素
其底層使用unsafe類來設置值
獲取元素
其底層使用unsafe類來獲取值
記錄指針地址壓縮器 — PackedRecordPointer
全稱:org.apache.spark.shuffle.sort.PackedRecordPointer
成員常量:
壓縮記錄指針和分區:
獲取記錄的地址:
獲取記錄的分區:
自定義比較器–SortComparator
思路也很簡單,就是根據分區來排序,即相同分區的數據被排到了一起。
遍歷自定義數組的迭代器 — ShuffleSorterIterator
其定義如下:
其思路很簡單,hasNext跟JDK標準庫的實現一致,多了一個loadNext,每次都需要把數組中下一個位置的元素放到packetRecordPointer中,然後從packedRecordPointer中取出數據的地址和分區資訊。
獲取迭代器
獲取迭代器的源碼如下:
其中 useRadixSort表示是否使用基數排序,默認是使用基數排序的,由參數 spark.shuffle.sort.useRadixSort 配置。
如果不使用基數排序,則會使用Spark的Sorter排序,sorter底層實現是TimSort,TimSort是優化之後的MergeSort。
總之,ShuffleSorterIterator中的數據已經是有序的了,只需要迭代式取出即可。
插入數據到自定義的數組中
思路很簡單,插入的數據就是記錄的地址和分區數據,這兩種數據被PackedRecordPointer壓縮編碼之後被存入到數組中。
繼承關係
其繼承關係如下:
即它是MemoryConsumer的子類,其實現了spill方法。
成員變數
其成員變數如下:
DISK_WRITE_BUFFER_SIZE:寫到磁碟前的緩衝區大小為1M
numPartitions:reduce的分區數
taskMemoryManager:負責任務的記憶體管理。看 spark 源碼分析之二十二– Task的記憶體管理 做進一步了解。
blockManager:Spark存儲系統的核心類。看 spark 源碼分析之十八 — Spark存儲體系剖析 做進一步了解。
TaskContext:任務執行的上下文對象。
numElementsForSpillThreshold:ShuffleInMemorySorter 數據溢出前的元素閥值。
fileBufferSizeBytes:DiskBlockObjectWriter溢出前的buffer大小。
diskWriteBufferSize:溢出到磁碟前的buffer大小。
allocatedPages:記錄分配的記憶體頁。
spills:記錄溢出資訊
peakMemoryUsedBytes:記憶體使用峰值。
inMemSorter:記憶體排序器
currentPage:當前使用記憶體頁
pageCursor:記憶體頁游標,標誌在記憶體頁的位置。
構造方法
其構造方法如下:
fileBufferSizeBytes:通過參數 spark.shuffle.file.buffer 來配置,默認為 32k
numElementsForSpillThreshold:通過參數spark.shuffle.spill.numElementsForceSpillThreshold來配置,默認是整數的最大值。
diskWriteBufferSize:通過 spark.shuffle.spill.diskWriteBufferSize 來配置,默認為 1M
核心方法
主要方法如下:
我們主要分析其主要方法。
溢出操作
其源碼如下:
思路很簡單,調用writeSortedFile將數據寫入到文件中,釋放記憶體,重置inMemSorter。
freeMemory方法如下:
writeSortedFile 源碼如下:
圖中,我大致把步驟劃分為四部分。整體思路:遍歷sorter中的所有分區數據,最終同一分區的數據被寫入到同一個FileSegment中,這些FileSegment最終又構成了一個合併的文件,其中FileSegment的大小被存放在SpillInfo中,最後放到了spills集合中。重點說一下第三步的獲取地址資訊,如果是堆內地址,recordPage就是base對象,recordOffsetInPage就是記錄相對於base對象的偏移量,如果是堆外地址,recordPage為null,因為堆外地址沒有base對象,其baseOffset就是其在作業系統記憶體中的絕對地址,recordOffsetInPage = offsetInPage + baseOffset,具體可以在 spark 源碼分析之二十二– Task的記憶體管理 中看TaskMemoryManager的實現細節。
插入記錄
其源碼如下:
注意:如果是堆內記憶體,baseObject就是分配的數組,baseOffset就是數組的下標索引。如果是堆外記憶體,baseObject為null,baseOffset就是作業系統記憶體中的地址。
在地址編碼的時候,如果是堆內記憶體,頁內的偏移量就是baseObject,如果是堆外記憶體,頁內偏移量為: 真實偏移量 – baseOffset。
它在插入數據之前,offset做了位元組對齊,如果系統支援對齊,則向後錯4位,否則向後錯8位。這跟溢出操作里取數據是對應的,即可以跟上文中 writeSortedFile 方法對比看。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#growPointerArrayIfNecessary源碼如下:
解釋:首先hasSpaceForAnotherRecord會比較數組中下一個寫的索引位置跟數組的最大容量比較,如果索引位置大於最大容量,那麼就沒有空間來存放下一個記錄了,則需要把擴容,used是指的數組現在使用的大小,擴容倍數為源數組的一倍。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#acquireNewPageIfNecessary 源碼如下:
解釋:分配記憶體頁的條件是當前頁的游標 + 需要的頁大小 大於當前頁的最大容量,則需要重新分配一個記憶體頁。
關閉並且獲取spill資訊
其源碼如下:
思路:執行最後一次溢出,然後將數據溢出資訊返回。
清理資源
思路:釋放記憶體排序器的記憶體,刪除溢出的臨時文件。
獲取記憶體使用峰值
源碼如下:
思路:當前使用記憶體大於最大峰值則更新最大峰值,否則直接返回。
總結
這個sorter內部集成的記憶體sorter會把同一分區的數據排序到一起,數據溢出時,相同分區的數據會聚集到溢出文件的一個segment中。
使用UnsafeShuffleWriter寫數據
先上源碼,後解釋:
思路:流程很簡單,將所有的數據逐一遍歷放入sorter,然後將sorter關閉,獲取輸出文件,結束。
下面我們具體來看每一步是具體怎麼實現的:
初始化Sorter
在org.apache.spark.shuffle.sort.UnsafeShuffleWriter的構造方法源碼如下:
簡單做一下說明:
DEFAULT_INITIAL_SORT_BUFFER_SIZE為 4096
DEFAULT_INITIAL_SER_BUFFER_SIZE 大小為 1M
reduce 分區數量最大為 16777216
SHUFFLE_FILE_BUFFER_SIZE默認為32k,大小由參數 spark.shuffle.file.buffer 配置。
SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE 默認大小為32k,大小由參數 spark.shuffle.unsafe.file.output.buffer 配置。
其open方法如下:
這個方法里涉及了三個類:ShuffleExternalSorter,MyByteArrayOutputStream以及SerializationStream三個類。ShuffleExternalSorter在上文已經剖析過了,MyByteArrayOutputStream是一個ByteArrayOutputStream子類負責想堆內記憶體中寫數據,SerializationStream是一個序列化之後的流,數據最終會被寫入到serBuffer記憶體流中,調用其flush方法後,其內部的buf就是寫入的數據,如下:
數據寫入概述
核心方法write源碼如下:
其主要有兩步,一步是遍歷每一條記錄,將數據寫入到sorter中;第二步是關閉sorter,並將數據寫入到一個shuffle 文件中同時更新shuffle索引資訊;最後清除shuffle過程中sorter使用的資源。
先來看第一步:數據寫入到sorter中。
數據插入到Sorter
記錄中的鍵值被序列化到serBuffer的buf位元組數組中,然後被寫入到 sorter(ShuffleExternalSorter)中。在sorter中序列化數據被寫入到記憶體中(記憶體不足會溢出到磁碟中),其地址資訊被寫入到 ShuffleInMemorySorter 中,具體可以看上文介紹。
溢出文件歸併為一個文件
一步是遍歷每一條記錄,將數據寫入到sorter中後會調用sorter的closeAndGetSpills方法執行最後一次spill操作,然後獲取到整個shuffle過程中所有的SpillInfo資訊。然後使用ShuffleBlockResolver獲取到shuffle的blockId對應的shuffle文件,最終調用mergeSpills 方法合併所有的溢出文件到最終的shuffle文件,然後更新shuffle索引文件,設置Shuffle結果的MapStatus資訊,結束。
org.apache.spark.shuffle.sort.UnsafeShuffleWriter#closeAndWriteOutput 源碼如下:
其關鍵方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼如下:
如果溢出文件為0,直接返回全是0的分區數組。
如果溢出文件為1,文件重命名後返回只有一個元素的分區數組。
如果溢出文件多於1個則,多個溢出文件開始merge。
首先先看一下五個變數:
encryptionEnabled:是否啟用加密,默認為false,通過 spark.io.encryption.enabled 參數來設置。
transferToEnabled:是否可以使用nio的transferTo傳輸,默認為true,通過 spark.file.transferTo 參數來設置。
compressionEnabled:是否使用壓縮,默認為true,通過 spark.shuffle.compress 參數來設置。
compressionCodec:默認壓縮類,默認為LZ4CompressionCodec,通過 spark.io.compression.codec 參數來設置。
fastMergeEnabled:是否啟用fast merge,默認為true,通過 spark.shuffle.unsafe.fastMergeEnabled 參數來設置。
fastMergeIsSupported:是否支援 fast merge,如果不使用壓縮或者是壓縮演算法是 org.apache.spark.io.SnappyCompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.ZStdCompressionCodec這四種支援連接的壓縮演算法中的一種都是可以使用 fast merge的。
三種merge多個文件的方式:transfered-based fast merge、fileStream-based fast merge以及slow merge三種方式。
使用transfered-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且啟用了nio的transferTo傳輸且不啟用文件加密。
使用fileStream-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且未啟用nio的transferTo傳輸或啟用了文件加密。
使用slow merge條件:未使用 fast merge或壓縮演算法不支援fast merge。
下面我們來看三種合併溢出的方式。
transfered-based fast merge
其核心方法org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithTransferTo 源碼如下:
其依賴方法 org.apache.spark.util.Utils#copyFileStreamNIO 如下:
很簡單,底層依賴於Java的NIO的transferTo方法實現。
fileStream-based fast merge
其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼如下,這裡不傳入任何壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。
slow merge
其其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼跟 fileStream-based fast merge 里的一樣,不做過多解釋,只不過這裡多傳入了一個壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。
更新shuffle索引
這部分更詳細的可以看 org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit 源碼。在上篇文章 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 中使用BypassMergeSortShuffleWriter寫數據已經剖析過,不再剖析。
總結
ShuffleExternalSorter將數據不斷溢出到溢出小文件中,溢出文件內的數據是按分區規則排序的,分區內的數據是亂序的。
多個分區的數據同時溢出到一個溢出文件,最後使用三種歸併方式中的一種將多個溢出文件歸併到一個文件,分區內的數據是亂序的。最終數據的格式跟第一種shuffle寫操作的結果是一樣的,即有分區的shuffle數據文件和記錄分區大小的shuffle索引文件。