Magnet: Push-based Shuffle Service for Large-scale Data Processing

本文是閱讀 LinkedIn 公司2020年發表的論文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 一點筆記。

什麼是Shuffle

image.png
以上圖為例,在一個DAG的執行圖中,節點與節點之間的數據交換就是Shuffle的過程。雖然Shuffle的過程很簡單,但是不同的引擎有不同的實現。
以shuffle數據傳輸的介質來看

  • 有基於磁盤的shuffle,例如Map/Reduce ,Spark,Flink Batch中,上下游之前的數據都是需要落盤後來進行傳輸,這類通常是離線處理框架,對延遲不敏感,基於磁盤更加可靠穩定。
  • 有基於內存的pipeline模式的shuffle方案,例如Presto/Flink Streaming中,主要是對時延比較敏感的場景,基於內存Shuffle,通過網絡rpc直接傳輸內存數據

而基於本地磁盤的Shuffle實現中又有很多種不同的實現

  • 有基於Hash的方案,每個map端的task為每個reduce task 產生一個 shuffle文件
  • 有基於Sort方案,每個map端的task按照 partitionId + hash(key) 排序,並最終merge成一個文件以及一個index文件,在reduce端讀取時根據每個task的index文件來讀取相應segment的數據

以部署方式來看

  • 有基於worker的本地shuffle的方案,直接通過worker來提供讀寫的功能
  • 有基於external shuffle的實現,通常託管於資源管理框架,在Yarn框架中就可以實現這種輔助服務,這樣就可以及時的釋放worker計算資源
  • 有基於Remote shuffle的實現,在雲計算時代逐漸成為主流,因為其存算分離的架構往往能帶來更好的可擴展性並且網絡帶寬的提高使得co-locate_也許_不再那麼重要。

Spark Shuffle實現

image.png
這裡再大致介紹下spark原生的external sort shuffle的詳細流程

  1. 每個spark executor啟動後和本地節點的external shuffle service註冊,同一個機器的多個executor會共享這個機器上的shuffle service服務。
  2. map stage處理完數據之後會產出兩個文件 shuffle data 和 index文件,map task會按照partition key 來進行排序,屬於同一個reduce 的數據作為一個Shuffle Block,而index文件中則會記錄不同的Shuffle Block 之間的邊界offset,輔助下游讀取
  3. 當下游reduce task開始運行,首先會查詢Spark driver 得到input shuffle blocks的位置信息,然後開始和spark ESS建立鏈接開始讀取數據,讀取數據時就會根據index文件來skip讀取自己task那個shuffle blocks

痛點

在LinkedIn公司主要採用了Spark自帶的基於Yarn的External sorted shuffle實現,主要遇到痛點:

All-To-All Connections

map 和 reduce task之間需要維護all-to-all 的鏈接,以M個Map端task,R和Reducer端task為例,理論上就會建立M * R 個connection。
在實際實現中,一個executor上的reducer可以共享一個和ess的tcp鏈接。因此實際上的鏈接數是和executor個數 E 和ess節點數 S相關。但是在生產集群中 E 和 S 可能都會達到上千,這時鏈接數就會非常的客觀,很容易帶來穩定性的問題,如果建立鏈接失敗可能會導致相關stage進行重跑,失敗代價很高。

Random IO

從上面的讀取流程我們可以看到因為多個reduce task數據在同一個文件中,很容易產生隨機讀取的問題,並且從linkedin公司觀察到的這些block通常都比較小,平均只有10KB。而LinkedIn shuffle集群主要使用的HDD磁盤,這個問題就會更大。並且隨機讀取以及大量的網絡小包會帶來性能的損失。

也許我們會想到說是否可以有辦法來通過調參來讓Shuffle Block 變大而減輕隨機小IO的問題呢?比如把reduce task端的並發調小,這樣每個task的數據量必然就變大了。
論文中也對此做了闡述,沒法通過簡單的調整reduce task的並發來增大shuffle block size的大小。

假設有一個M個mapper,R個reducer的任務,總的shuffle數據量為D。為了保持每個task處理的數據量恆定,當總數據量增長的時候,map和reduce的並發都要等比增長。
而shuffle block 大小就是 , 為什麼 是 呢,從上面的流程中可以看到每個map端可以近似看做是維護了R個reduce的block。所以總的block數是
那麼當數據量增長時,並且為了保證每個task處理的數據量恆定,即性能不下降,那麼shuffle block size必然會減小。最後也因為reduce端數據分散在所有的map端的task,導致不太能利用data locality的特性。

Magent 設計概要

image.png
總體架構

Push Merge Shuffle

Mapper 端的shuffle數據會push到遠程的 shuffle service,並按照reduce端合併成一個文件。這樣shuffle 文件的大小就可以提高到MB級別。
這裡Magnet主要考慮儘可能避免給shuffle service帶來過大的壓力(為了穩定性和可擴展性考慮),因此在Magent中,在mapper端,依然會將shuffle數據,首先保存到本地,然後再按照以下的算法,將shuffle blocks打包成一個個chunks發送到shuffle service。
image.png
計算blocks劃分到chunks算法
這個算法的含義如下:

  1. 按照 計算 第 i 個 reduce 數據所應該發送的shuffle service的下標,表示每台shuffle service機器所需要分配的Reduce task的數量,當其大於 k 時表示需要發送到下一個機器,則更新 k 的值為 k++
  2. 當chunk長度沒有超過限制L,將(長度為 )append到chunk中,並將chunk長度更新為
  3. 當chunk長度超過了限制L,那麼就把 append 到 下一個 chunk中,並將chunk 長度置為 , shuffe service 機器還是為 k。

算法最終輸出的是每個 shuffle service 機器和對應的所需要接收的chunk的集合。

這個算法保證,每個chunks只包含一個shuffle file中連續的不同shuffle partition 的 shuffle blocks。當達到一定大小後會另外創建一個chunk。但是不同mapper上的同一個shuffle parititon的數據最終會路由到同一個shuffle service節點上。

並且為了避免同時mapper端都按照同一順序往shuffle service 節點寫數據造成擠兌和merge時的文件並發鎖,所以在mapper端處理chunk的順序上做了隨機化。

在完成打包chunk和隨機化之後,就交由一個專門的線程池來將數據從按照chunk順序從本地磁盤load出來,所以這裡就是順序的讀取本地磁盤再push到遠程的shuffle service。Push操作是和Mapper端的task解耦的,push操作失敗不會影響map端的task。

Magnet Metadata

當magnet收到打包發送來的chunks,首先會根據block的元數據獲取他的分區信息,然後根據shuffle service本地維護的元數據做處理,shuffle service本地為每個Shuffle partition (reduce partition)維護了以下元信息

  • bitmap 存儲了以及merge的mapper的id
  • position offset 記錄了merge 文件中最近一次成功merge的 offset
  • currentMapId 記錄了當前正在merge的 mapper的 shuffle block id

image.png

這樣首先可以根據發送來的shuffle blocks的元數據判斷數據是否已經merge過了,避免重複存儲。通過currentMapId來避免多個mapper端數據同時往一個文件merge的問題,而position offset 則可以用作在merge 失敗的時候可以依舊保持文件能讀到最近一次成功的位置。下一次重寫的時候會依舊從position offset進行覆蓋寫入。通過這幾個元數據管理,就可以很優雅的處理在文件merge過程中的寫重複,寫衝突和寫失敗的問題。

Best effort

在Magent的設計中,push/merge的失敗,並不會影響整個任務的流程,可以fallback到讀取mapper端未merge的數據。

  1. 如果map task 在寫入本地shuffle數據完成之前失敗了,那麼map端task會進行重跑
  2. 如果map端push/merge失敗,那麼這部分數據就會直接從mapper端讀取
  3. 如果reduce fetch merge block失敗,那麼也會fallback到從mapper端讀取

我理解要實現這樣的目的,原始數據就需要被保留,所以可以看到在架構圖中Magent Shuffle Service實際上會和executor一起部署(還支持其他的部署形式)。在executor端作為external shuffle service的角色存在,mapper端的數據產出完之後就由本地的shuffle service 節點託管了。所以他可以在以上2、3兩種失敗場景下提供fallback的讀取能力。
同時數據是否Merge完的信息是在Spark Driver中通過MapStatusMergeStatus兩個結構來進行維護的,下游讀取數據時就是由driver來進行是否fallback的邏輯。
從整體上看Push/Merge 的操作可以理解為完全由Magent Shuffle Service節點託管的數據搬遷合併的動作(將各個mapper處的數據搬遷合併成redcuer端的數據),通過數據寫兩次的行為使得mapper端寫數據和合併解耦,並且在fault tolerance的設計中也利用了寫兩次這個行為所帶來的備份的好處。
同時我們需要關注到雖然通過這個操作,將mapper端的隨機讀取轉化成了順序讀取,但是在shuffle service時merge時,其實還是random write,這在數據重組的過程中是必然的。但是由於os cache 和 disk buffer的存在,會使得random write的吞吐比random read的吞吐大很多。

Flexible Deployment Strategy

Magnet支持兩種模式的部署

  • on-perm 表示和Spark計算集群一起部署,作為external shuffle service的方式存在。
  • cloud-based 表示以存算分離的模式部署,這樣就是以Remote shuffle service的方式部署。

在on-perm的集群中,Spark driver可以很好的利用data locality的特性,在push/merge節點結束後,可以將reduce task儘可能調度到數據所在的節點上,可以直接讀取本地數據,效率更高,減少了網絡的傳輸也不容易失敗。

Handling Stragglers and Data Skews

image.png
因為Spark計算引擎是BSP模型,所以在map端階段全部完成之前reduce端不會開始計算,因此在Push/Megre階段,為了防止部分Push/Merge較慢影響下游reduce task開始執行。Magnet支持了最大的超時機制,利用上面提到的fallback行為,在超時之後就標記該map端的分區為unmerged,這樣就跳過了這部分慢節點,直接開始reduce階段。
而針對數據傾斜場景,為了避免reduce端合併的文件過大,這時Magent的解法是和Spark的Adaptive execution 相結合,根據運行時採集到的每個block的大小,當block 大於某個閾值時,就在合併chunk的階段跳過這種block,還是通過fallback行為直接讀取原來mapper端較大的數據塊

Parallelizing Data Transfer and Task Execution

image.png
在Hadoop的Map-Reduce模型中,通過 “Slow start” 技術可以在Map task都完成之前,部分Reduce task可以先開始進行數據預拉,實現了比較有限的並行化
而在Spark中,通過數據拉取和數據處理的線程解耦,這兩者有點類似於一組生產者和消費者。
而在Magnet中也採用了類似的技術,在mapper端Push task 和 mapper task解耦,但是這裡不太理解這個mapper端解耦的收益,因為本身就是在mapper task結束之後才開始進行push task,也就不存在計算線程和io線程並行的說法。可以理解的是可以通過這個方式和mapper task的框架線程解耦。
然後在reduce端,為了最大化並行讀取的能力,不會將reduce端的數據只合併成一個文件,而是切成多個MB大小的slice,然後reduce task可以發起並行讀取的請求最大化的提高吞吐。

小結

從上面可以看出Magent的幾個設計宗旨

  • 儘可能的避免給shuffle service 增大負載
    • 所有的排序的動作只會發生在mapper端或者reducer端,所以排序佔用的資源是executor節點的
    • merge時不會有數據buffer的動作,數據buffer在executor端完成,在Shuffle Service側只要直接進行數據appen。
  • 儘力而為,數據備份讀取提供更好的容錯特性。並很好的利用了這兩份數據做了更多的設計
  • 儘管如今普遍都是存算分離的架構,但是在Magent的設計中data locality的特性還是佔據的很重要的位置

How to evaluate

很多系統設計最後對於系統的測試設計其實也很有看點。在論文里提到了Magent採用了模擬和生產集群兩個模式來最終衡量新的Shuffle Service的效果。

Magnet 開發了一個分佈式的壓測框架,主要可以模擬以下幾個維度

  • 模擬shuffle service集群所會創建的總的連接數
  • 每個block塊的大小
  • 總的shuffle的數據量

並且可以模擬fetch和push的請求

  • fetch請求會從一個Shuffle serice節點將block發送到多個客戶端
  • push請求會從多個客戶端將數據發送到一個shuffle service節點

那衡量的指標有哪些

  • 在不同的block大小下, Magnet完成Push Merge和Reduce fetch的時間已經Spark 原生Shuffle Service完成fetch的時間比較
  • Disk IO 衡量在fetch 和 push的場景下,不同的block大小對於磁盤吞吐能力的影響
  • Shuffle Service的資源開銷 主要是測試單機的shuffle service,這裡看到一個比較驚奇的數據,在測試的過程中的資源消耗為0.5c 300M,開銷的確很小。

其他的指標數據就不一一列舉了,可以查看原文相關章節獲取

最後上線後的優化效果
image.png
Figure 1: Shuffle locality ratio increase over past 6 months

image.png

參考

//mp.weixin.qq.com/s/8Fhn24vbZdt6zmCZRvhdOg Magent shuffle 解讀
//zhuanlan.zhihu.com/p/397391514 Magnet shuffle解讀
//zhuanlan.zhihu.com/p/67061627 spark shuffle 發展
//mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
//mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
//www.databricks.com/session_na21/magnet-shuffle-service-push-based-shuffle-at-linkedin
//issues.apache.org/jira/browse/SPARK-30602
//www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen