MapReduce (MIT 6.824: Lec 1: Preparation)

  • 2020 年 5 月 2 日
  • AI

從本篇文章開始,專欄將以《MIT 6.824:Distributes Systems》的課程邏輯出發,逐步更新課程內的全部內容,敬請關注,謝謝。

如果想要跟方便的查看課程的更新內容,也歡迎關注微信公眾號:《油麻酥愛學習愛健身》,微訊號:youmasu

除了MIT分散式課程的學習以外,公眾號還會不定期分享自己的健身經驗,包括,家庭自重健身,健身房增肌減脂,日常飲食營養等健身內容,再次謝謝大家。

一,摘要

MapReduce是一種編程模型,也是一種處理和生成大規模數據集的具體實現。

它使用map函數處理一個key-value源數據,生成若干個key-value的中間數據集合。再使用reduce函數將所有中間數據中擁有相同key的value合併起來生成目標結果。

使用MapReduce方式實現的程式將自動獲得並發的能力,並能在大規模集群上運行。Google使用的MapReduce實現擁有高度的擴展性,它每天在數千台機器上處理TB級的數據。

二,導論

Google每天都要處理海量數據,但大多數情況下,程式設計師都需要設計和編寫極其複雜的程式來處理簡單的數據運算,將大量的時間都浪費在對輸入數據的拆分,程式在集群上的調度,機器失敗處理,集群間的通訊等通用功能上。

因此,受到Lisp中map,reduce原語的啟發,我們設計了一個只需要關注數據核心處理邏輯,隱藏複雜系統設計的處理框架MapReduce。該框架用戶只需自定義map和reduce操作,就能執行分散式並發處理,並在處理失敗時默認重新執行作為容錯的基本原則。

這項篇文章會提供一個可以用於在大規模集群上自動並行的,簡單而又強大的編程介面及其高性能實現。

三,編程模型

任務的輸入和輸出都是kv數據集,使用Map和Reduce函數來表示具體運算。

Map函數,由用戶編寫,輸入一個kv數據,生成一系列中間的kv數據集。MapReduce會將中間數據集的values按相同的key(比如I)分組,作為Reduce函數的輸入。

Reduce函數,由用戶編寫,接收一個中間數據的key(比如I)和它擁有的values集合,並將它們合併成一個更小規模的values集合。通常,MapReduce通過迭代器向用戶定義的Reduce函數分批提供輸入數據,以避免全部數據不能載入記憶體的情況,最後Reduce函數在每次被調用後,計算得到0個和1個結果作為輸出。

當然,針對不同的任務,甚至是相同的任務,不同的用戶對kv的定義,都各有不同。比如,詞頻統計中,k1是文章id,v1是文章text,k2是單詞word,v2是詞頻1。

但是,針對一次map-reduce中原始輸入,中間結果和最終輸出的k-v類型,卻有一個默認的限定。即,map輸入的k-v類型和輸出的k-v類型不同,reduce輸入的k-v類型和輸出的k-v相同

四,MapReduce實現

對同一個map-reduce的基本思想,不同的人在不同的系統環境下會有不同的實現。一個建議的依賴準則是系統環境,比如,針對共享記憶體的多進程map-reduce,針對NUMA多CPU的map-reduce或者針對網路通訊的集群map-reduce等等。

GoogleMapReduce的具體實現依賴於普通商用機集群,主要的配置如下:(1)Linux系統,雙核x86機器,2-4GB記憶體;(2)理論網路速度是100M/s或1GB/s,但實際場景下要小得多;(3)成百上千台普通機器組成的集群,機器錯誤時有發生;(4)基於GFS的分散式文件系統,系統內基於replication來實現軟體層面的高可用和高可靠;(5)用戶向調度系統提交Job(每個Job包含一組tasks),調度器負責在集群上尋找可用的機器執行Job。

1,實現概述

全部的輸入數據被自動並行拆分成M份後,多台機器分別讀取M份數據後並行調用Map過程,輸出中間數據。Reduce函數將M份中間數據的key值再拆分成R份進行處理。其中,拆分方法和R值由用戶定義,通常是按hash取模的實現。具體流程如下:

(1),用戶程式端的MapReduce框架首先將輸入文件拆分成16M-64M左右大小的M份文件後,fork出多個MapReduce Worker進程,執行指定的MapReduce操作。

(2),被Fork出的進程中,有一個是Master進程,所有的Map或Reduce任務,將由它指派給其他空閑的Worker進程。其中,Map任務有M個,Reduce任務有R個。

(3),如果一個Worker進程被分派為Map任務。它將首先讀取一份被拆分的數據,並從數據中解析出原始的key-value值,並將其輸入到用戶定義的Map函數中。Map函數輸出的key-value中間數據被快取在記憶體里。

(4),記憶體中快取的中間數據會被周期性調用拆分函數,拆分成R份刷入磁碟。寫入磁碟的位置會通知給master進程,master會將這些位置資訊轉交給Reduce Worker。

(5),當Reduce Worker被通知到後,它會使用RPC調用從硬碟內讀取Map Worker輸出的中間數據結果,並對所有中間數據按key值排序分組。因為要將所有的中間數據分配給若干個Reduce Worker並發執行,排序是必要的。當記憶體無法滿足其數據規模時,可以使用藉助文件系統的外部排序。

(6),Reduce Worker將排序後的,按key值分組後的中間數據轉交給用戶定義的Reduce函數迭代執行。Reduce函數的輸出結果會被依次附加到對應這R份中間數據中的特定輸出文件中。

(7),當所有Map和Reduce Worker完成後,Master進程喚醒用戶程式。此時,MapReduce框架會將控制器交還給用戶進程。

(8),該執行過程會成功輸出R個輸出文件(每個 Reduce任務生成一個,文件名由用戶定義)。通常情況下,用戶不需要將這R份文件合併成一個,因為它們會被作為下一個MapReduce任務或另一個分散式應用的輸入數據,它們會很好的處理這個問題。

2,Master節點數據結構

Master節點有一些特定的數據結構來管理每一個map任務和reduce任務。首先,它會存儲3種狀態(空閑,就緒,完成),並且會記錄每一個非空閑Worker進程的ID。

Master節點就好像map任務和reduce任務中間的導管一樣,傳遞中間數據文件的區域位置。所以,對每一個已完成map任務生成的R個中間數據文件,Master節點都會存儲它們的位置和大小,當map任務有更新時也會通知更新Master內的對應數據,Master也會同步增量通知給就緒狀態的reduce任務。

3,容錯

MapReduce設計上就是在大規模集群上處理TB級數據的框架,容錯處理顯得尤其重要。

  1. ,Worker錯誤

Master節點會定期ping Worker節點。如果ping不通,表示worker節點錯誤。當前節點上的所有map已完成的任務都要被回滾到空閑狀態,並對所有其他worker可見。同理,錯誤Worker上的所有就緒狀態的map和reduce任務,也要被回滾到空閑狀態,供Master重新調度。

當一個map任務首先被Worker A執行,出錯後被Worker B執行。此時所有正在執行對應reduce任務的Worker都會被通知需要重新執行。所有還沒有從Worker A讀到數據的reduce任務會從Worker B中去讀。

MapReduece在設計上就可應對大規模的Worker錯誤。比如,因集群維護導致的大規模網路不可達就是最常見的錯誤之一,這時,MapReduce只需要簡單地重新調度執行所有不可達節點上已完成的任務,然後繼續推進,直到全部完成 。

  1. ,Master錯誤

Master節點會周期性的checkpoints它所包含的數據。當Master節點失敗後,新節點可以非常簡單的從最新的一個checkpoints恢復集群。但是,Google當前實現的Master是單點的,如果它發生錯誤,整個集群的MapReduce計算就會失敗。用戶可以定義失敗後的處理,比如依賴特定條件的重試操作等。

(3),保持確定性的輸出結果

當用戶定義的map/reduce任務可以對輸入數據產生確定性輸出時,MapReduce能產生一條確定的執行序列輸出一樣的最終結果。當某些map/reduce任務的輸出不確定時(錯誤發生),MapReduce可能會生成若干條不同的輸出序列,產生不同的結果,但對特定的一條路徑,只會輸出特定的結果。

為了實現這一點,map和reduce任務的原子提交就最為重要。每一個就緒任務都會將它的輸出寫入到私有的臨時文件,比如,一個reduce任務生成一個文件,一個map任務生成R個文件(每個對應一個reduce任務)。當map任務完成的時候,它會通知master節點,並將R份臨時文件的位置也同時送到。如果這是一條全新的消息,Master會記下這些文件位置,否則,直接丟棄。

當reduce任務完成後,它會原子性的重命名它輸出的臨時文件至最終的文件名。如果在其他機器上也執行完全相同的reduce任務,多個相同的重命名操作會被並發執行,因此,MapReduce所依賴的分散式文件系統GFS必須要能保證原子性的重命名操作,確保只有一個重名操作能被執行。

(4),本地化

網路頻寬是稀有資源,所以MapReduce盡量使用集群的本地硬碟來完成數據輸入。本地數據存儲在GFS上,它將數據按64M分塊並在不同的機器上存儲多份(通常是3份)。MapReduce的Master節點會優先調度那些本地硬碟上有對應輸入數據備份的map任務,當map任務失敗,它會嘗試調度和對應輸入數據處於一個交換機子網內的Map Worker,來節約頻寬。當MapReduce集群非常大時,輸入數據基本都是從本地讀取,基本不消耗網路頻寬。

(5),任務粒度

通常Map階段會並發M份,Reduce階段並發R份,並且M和R也遠大於集群中的Worker數量。當一個Worker可以同時處理多個map或reduce任務時,可有效提高系統的整體負載並加速Worker失敗後的恢復效率:因為失敗Worker上已完成的任務,可以快速」蔓延「到其他Worker機器上,重新執行。

那麼,如果估算M和R的上限呢?對Master節點而言,它必須做出O(M+R)次調度決策並在記憶體中保存O(M*R)個任務狀態,不過O(M*R)的常數項非常小,一對map/reduce任務,大約也就1byte的數據而已。

通常情況下,因為reduce任務都已一個單獨的輸出文件結束,R通常由用戶來指定。在實踐中,我們傾向於去選擇M的大小,以至於使每個map任務的輸入文件都在16M到64M的範圍內,來提高」本地化「的效率。而R的選擇,通常只是Worker機器數的一個小倍數而已 。比如,Google在使用2000台Worker機器的前提下,通常設置M=200000和R=5000。

(6),執行備份任務

MapReduce框架執行時間超長的原因,通常是因為某台機器在執行最後一些map或者reduce任務時,花費了異常超長的計算時間。這可能有很多因素產生,比如,一台機器上磁碟有損壞,一直在做錯誤校正,導致它的讀取性能從30M/s下降至1M/s;或者,混合部署的集群在執行其他分散式系統的任務,導致它執行更消耗CPU,記憶體,磁碟和網路頻寬的MapReduce任務更慢;再或者,一台機器系統初始化的bug,導致CPU快取被非預期的禁用,導致計算效率下降了100倍。

MapReduce框架有一個通用策略來緩解這個問題。當一個MapReduce操作要完成之前,Master的調度器會將剩餘的就緒任務起一個備份同時執行,所以,一個MapReduce的操作完成只需要主任務或備份任務有一個完成即可。Google將這一機制優化到只需要額外增加幾個百分點資源的消耗,卻能將任務的執行時長平均縮減幾十個百分點。

4,細節

雖然MapReduce提供的Map和Reduce函數已經足夠業務使用了。但還有一些小的函數擴展可以考慮使用。

(1),拆分函數(Partitioning Function)

在map結束後,記憶體中的數據要被拆分成R份寫入磁碟,供R個reduce任務使用,並輸出R個文件。「拆分成R份」的工作默認都使用 hash(Key) mod R來實現,雖然它簡單有效,但有時候需要更訂製化的需求,比如,Key是URL的時候,或許希望將同一個站點的URL都放到一個reduce任務中執行,此時,MapReduce框架提供hash(Hostname(urlkey))來實現目標。

(2),排序函數(Ordering Guarantees)

在給定拆分函數後,拆分後的中間數據會按照key/value升序依次處理,排序函數實現有序的輸出文件也非常簡單,便於後期查找和檢索。

(3),合併函數(Combiner Function)

在某些場景下,map任務中生成中間數據的部分可能和reduce任務很像,比如,詞頻統計中,map任務不斷的生成<word,1>,並通過大量的網路傳輸給到reduce任務,由reduce匯總得到word的匯總結果。其實大可不必這樣,用戶可以自定義合併函數,在map任務中就先將<word, 1>匯總一次,再給reduce任務使用,減少網路傳輸,提高效率。

合併函數可以在每台執行map任務的機器上運行,程式碼上通常和reduce任務相似,兩者的區別是MapReduce框架對兩者輸出結果的不同處理。合併函數輸出到中間文件作為reduce的輸入,reduce函數則是直接輸出最後的結果文件。

(4),輸入輸出類型

MapReduce框架支援不同格式的文件輸入,比如,txt文件輸入,在map任務中通常以文件偏移為key,偏移對應那一行的內容為value。即,用戶可以自定義對不同輸入類型數據的處理,只需要定義對key-value的拆分,以便輸入給map任務即可。並且,如果用戶實現reader介面,他甚至可以自定義實現從任何地方讀任何格式數據作為輸入。

MapReduce框架對輸出格式的支援也是一樣。

(5),副作用(Side-effects)

用戶可以使用MapReduce在輸出最終文件的時候,順便輸出一些說明文件。但MapReduce僅支援對寫單個輸出文件和重命名的原子操作。如果一個map/reduce任務想要生成多個輸出文件的話,就需要自己保證文件一致性操作,因為MapReduce框架不支援對多個輸出文件的原子化「兩階段提交」。

(6),忽略「壞」數據

通常用戶程式的bug可能會導致MapReduce框架崩潰,雖然最常見的解決方法是修復bug,但可能bug來源於三方庫,那怎麼辦呢?MapReduce提供一種機制,可以忽略「壞」數據,這在一些大數據統計場景下非常有用。

當Worker進程內置的訊號處理器捕獲到用戶程式因為段錯誤或者其他bug產生訊號時,會發送一個包含一個序列號的「last gasp」UDP包到Master節點。當Master節點發現某一條數據存在多個失敗時,它會在下一次重新調度Map/Reduce任務的時候會通知跳過這條數據。

(7),本地擴展

因為所有計算都發生在分散式系統上,任務又是靠Master節點動態調度完成,當機器數量很多時,map任務和reduce任務的debug非常麻煩。MapReduce框架提供一種可選的實現,可以在單機順序執行所有map/reduce的工作,幫助用戶在本地debug,profile和small-scale testing。用戶甚至可以只執行特定的map任務並使用gdb調試它。

(8),狀態資訊

Master節點內部還開啟了一個HTTP服務,提供一系列的狀態資訊供用戶閱讀。比如,當前計算的進度,就緒任務數,輸出文件鏈接,輸入輸出文件大小,標準錯誤等等。用戶可以評估當前計算任務的狀態是否符合預期。同時,在頂層的狀態頁,Master還會展示哪一個 Worker在處理哪一個具體的map/reduce任務的時候,發生了錯誤,供用戶調試。

(9),計數器

MapReduce框架提供一個計數器包來統計各類事件的出現次數。用戶可以在程式碼內創建一個計數器對象,並在指定的map/reduce任務中遞增即可。每一個Worker中統計的結果會定期發給Master,由Master匯總後再返回給用戶程式碼。Master保證計數正確,比如,忽略重複執行的map/reduce任務等等。重複執行通常發生在「執行備份任務」或「Worker失敗重試」中。框架會維護某些特定的計數器,比如,輸入key-value數和輸出key-value數等等。用戶可以通過自定義的計數器在程式碼內assert邏輯。

5,性能

MapReduce將演示2個任務的執行效率:一個是從TB級數據中匹配特定模式(Grep);另一個是對TB級數據排序(Sort)。這2個任務基本包含了:在電腦內載入數據,並篩選特定數據的所有場景。

(1),集群配置

1800台機器。實驗在周末的下午完成,電腦資源比較空閑。

每台機器的配置: 2個Intel 2GHz CPU,開啟超執行緒;4G記憶體,大概有1-1.15G是供集群中的其他業務使用;2個160GB的IDE硬碟和千兆網卡。

所有機器位於一個2層樹狀的交換機網路,其中匯聚節點擁有100-200Gbps的網路頻寬,並且放置在同一個機房,機房內機器間的消息往返時間小於1ms。

(2),Grep

在TB級數據內匹配某個稀有模式(3個字元,大約92337條匹配)。輸入數據被分成15000份,每份64M(M=15000),最終輸出到一個文件中(R=1)。

圖片中縱軸表示輸入數據的載入速度。當數據被分配到更多機器執行MapReduce計算時,載入速度逐漸提升至最高30GB/s,此時有1764個Worker在工作。當map任務完成後,速度開始下降,直到第80s的時候回到0。整個計算過程花費150秒,包含了一分鐘左右的預熱時間,包括MapReduce程式在不同Worker中傳遞和GFS打開超過1000個輸入文件讀取資訊以期進行「本地化」優化的延遲等。

(3),Sort

對TB級數據排序,程式碼不到50行。其中3行是Map函數,負責從每行文本中提取10個位元組的key用於排序,並以key-value的形式保存為中間數據。Reduce函數使用內置的Identity功能,直接輸出中間輸出作為最終的結果,在GFS上保存2份。

同樣,MapReduce任務中,M=15000,R設置為4000,即將排序結果輸出到4000個文件中。拆分函數以key值的第一個位元組作為依據,並按不同的分布取樣排序結果的分割點。

圖中第一行表示輸入數據的讀取速度。不同於Grep任務,Sort任務讀取輸入數據的峰值是13GB/s,200秒左右排序完成。讀取峰值較低的原因是排序任務要花將近一半的時間在中間數據讀取的IO上,而Grep則沒有這個問題。

圖中第二行表示Map任務完成後,中間數據通過網路傳遞給Reduce任務的速度。一旦第一個Map任務完成,就開始有數據傳輸。第一個峰值發生在1700台機器上的Reduce任務開始執行,大概300秒後,第一批Reduce任務完成,開始執行剩下的Reduce任務,最終花費600秒時間。

圖中第三行表示排序後將結果輸出到文件的速度。輸出發生在Shuffle第一個駝峰後的幾秒,因為機器正忙於對中間數據進行排序,數據的寫入速率保持在2-4GB/s,最終在第850秒的時候全部完成。包含啟動的預熱時間在內,總計891秒的排序時間。與當前最高效的排序框架TeraSort的1057秒的速度接近。

注意,輸入數據的速度最快,是因為我們有「本地化」的優化,大多數數據都是從本地磁碟讀取。shuffle速度第二,是因為輸出的結果文件要在GFS上寫2份,來保證數據的可靠性和可用性。如果文件系統使用擦除編碼(erasure coding)的話,相比replication,對網路頻寬的要求會進一步降低。

(4),備份任務的優化

在不開啟備份任務的執行下,圖形與普通類似,最終完成的時間顯著增長。直到第960秒,還有5個Reduce任務尚未完成。直到300秒後,最後一個Reduce任務才徹底結束。整個過程花費1283秒,比開啟備份任務的情況下,延長44%。

(5),任務出錯的優化

在1746個Worker機器執行一段時間後,特意kill掉200台機器上的處理任務,來模擬在排序過程中機器出錯的情況。Master調度器會立刻在這些機器上重啟任務開始執行。

之所以存在負的輸入速度,是因為之前已經完成的Map任務被殺掉了,需要重新執行。最終的執行時間是933秒,比正常情況只延長5%。

6,經驗

MapReduce最大的好處使用簡單的程式,就可以在一個大的分散式集群中執行計算任務。在Google中,MapReduce重構了生產環境中的索引系統,為搜索提供可靠服務。相比重構前的系統,MapReduce的優勢如下:

(1),索引程式碼更簡單,小巧和易於理解。因為容錯,分散式和並行處理都丟給MapReduce,業務程式碼從3800行C++減少為700行。

(2),性能更好,邏輯更解耦,使得擴展和優化更簡單。老系統上可能需要幾個月的優化工作在MapReduce上只需幾天就能搞定。

(3),運維更簡單。大量的機器出錯,機器太慢和網路衝突都被MapReduce接管後自動解決,同時,只需擴展機器就能提升整個索引系統的性能。

7,類似的框架

(1),很多系統提供一套受限的編程框架來解決並行問題,比如,Bulk同步編程和某些MPI原語對並行編程提供高層抽象。但MapReduce簡化和提煉了它們。更重要的是,MapReduce還在大規模集群上原生提供一套容錯機制,這是擁有並行能力的框架所不具備的。

(2),MapReduce「本地化」的靈感來自於active disks技術,它們會將計算推送至距離本地硬碟更近的計算單元,減少對網路頻寬的消耗。MapReduce則更進一步,直接在擁有數據的機器上執行計算。實現不同,思想類似。

(3),MapReduce「備份任務」的靈感與在Charlotte系統中的eager調度機制類似,但eager調度機器可能會重複執行出錯的任務,致使整個任務失敗。MapReduce則默認會跳過一些錯誤數據。

(4),MapReduce所依賴的集群管理系統的實現與Condor的集群管理思想類似。

(5),MapReduce排序功能的實現參考了NOW-Sort。即,Map任務將數據拆分成能排序的分組,並發送給其中一個Reduce任務在記憶體中執行排序。但NOW-Sort不能自定義Map和Reduce函數,使用場景更有限。

(6),River提供的編程模型是通過消息隊列來完成任務的調度,為了在異構的硬體環境和系統擾動的前提下,提供更短的平均計算時間,River會小心的在磁碟和網路傳輸中平衡完成時間。MapReduce則通過受限的編程模型提供另一種解決方案,它將一個大問題拆分成更多的小問題,並在所有可利用的機器上動態調度小問題的任務,使得更快的機器可以執行更多的任務,甚至當錯誤發生時,也可以在更接近任務所需數據的地方重新調度,減少完成時間。

(7),BAD-FS則使用一種完全不同的編程模型,它的任務執行發生在廣域網網路中。不過,MapReduce和它有2個相似點:使用重複調度來恢復機器失敗的問題;使用「本地化」減少在擁塞網路環境下的網路傳輸。

(8),最後,使用重複調度(re-execution)來容錯的方案也被TACC所採用。

8,結論

MapReduce在Google內部獲得成功有3個原因:

(1),受限制的編程模型,使得在大規模集群運算中易於使用,因為它隱藏了幾乎所有分散式細節(並行,容錯,本地化和負載均衡)。

(2),有限的網路頻寬,使得MapReduce著重優化網路傳輸,比如,使用本地磁碟讀取數據,完成Map任務後在本地磁碟寫入中間數據等。

(3),重複調度(re-execution,redundant execution)減少慢機器的影響(備份任務)和解決機器錯誤或數據丟失。

9,參考文章

(1),pdos.csail.mit.edu/6.82