【譯文】MapReduce:大型集群上的簡化數據處理

  • 2019 年 11 月 13 日
  • 筆記

版權聲明:本文為部落客原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。

本文鏈接:https://blog.csdn.net/jxq0816/article/details/102954127

作者:Jeffrey Dean 和 Sanjay Ghemawat

摘要:

MapReduce是一個編程模型,以及處理和生成大型數據集的一個相關實現,它適合各種各樣的現實任務。用戶指定計算的map和reduce函數。底層運行系統自動地將大規模集群機器間的計算並行化,處理機器故障,以及調度機器間通訊以充分利用網路和磁碟。程式設計師會發現這個系統很好使用:在過去的去年中,超過一萬個不同的MapReduce程式已經在Google內部實現,平均每天有十萬個MapReuce作業在Google集群上被執行,每天總共處理20PB以上的數據。

1、簡介

在MapReduce開發之前,作者和其他許多的Google員工實現了數以百計的處理大量原始數據(如抓取到的文檔、Web請求日誌等等)的專用計算方法,以計算各種導出的數據,如倒排索引、Web文檔圖結構的各種表示、每個host抓取到的頁面數的總結、某一天最頻繁的一組查詢。大多數這樣的計算在概念上是非常簡單的,然而它們的輸入數據量通常非常大。為了在合理的時間內完成這些計算,它們必須分布到成百上千的機器上。如何並行化計算,分發數據,以及處理故障,這些問題結合起來,往往會讓程式設計師使用大量複雜程式碼來處理,而掩蓋了原本簡單的計算。

為了應對這一複雜性,我們設計了一個新的抽象,它允許我們表達試圖執行的簡單計算,但將並行化、容錯、數據分布和負載均衡等凌亂的細節隱藏到了庫中。這個抽象的靈感來源於出現在Lisp和許多其他函數式語言中的map和reduce原語。我們實現了大部分的計算,包括為輸入的每一個邏輯記錄應用一個map操作以計算一組中間鍵值對,然後對所有共享同一個鍵的值應用一個reduce操作以恰當地結合導出的數據。此函數式模型支援用戶自定義map和reduce操作,使我們能非常容易地並行處理大型計算,和使用再執行(reexecution)作為主要的容錯機制。

這項工作的主要貢獻就是一個簡單而強大的介面,它完成自動並行化、大規模分布計算,結合該介面的一個實現在大型商用PC集群上獲得了很高的性能表現。該編程模型還可以用於同一台機器上多個核心間的並行計算。

第2部分描述了基本的編程模型並給出幾個例子。第3部分描述了MapReduce介面專門針對基於集群的計算環境的一個實現。第4部分描述了我們發現的這個編程模型的幾個很有用的改進(refinements)。第5部分描述了對各種不同任務的實現的性能度量。第6部分探索了MapReduce在Google中的應用,包括使用它作為重寫我們的生產索引系統的基礎的一些經驗。第7部分討論了相關和未來的工作。

2、編程模型

這個計算需要一組輸入鍵/值對,並生成一組輸出鍵/值對。MapReduce庫的使用者將計算表達為兩個函數:map和reduce。

map,由用戶編寫,需要一對輸入並生成一組中間鍵/值對。MapReduce庫將所有與相同鍵值 I 相關聯的值組合到一起,並將它們傳遞給reduce函數。

Reduce函數,同樣由用戶編寫,接受中間鍵 I 和這個鍵的一組值。它將這些值合併以形成一組可能更小的值。通常每次reduce調用只生成0個或1個輸出值。中間值靠一個迭代器提供給用戶的reduce函數。這使我們能夠處理大量太大以至於不能裝入記憶體的值列表。

2.1 例子

考慮一下在一個巨大的文檔集合中統計每個單詞出現次數的問題。使用者會編寫與下面偽程式碼類似的程式碼:

  1. map(String key, String value);
  2. // key: document name
  3. // value: document contents
  4. for each word w in value:
  5. EmitIntermediate(w, "1");
  6. reduce(String key, String values);
  7. // key: a word
  8. // values: a list of counts
  9. int result = 0;
  10. for each v in values:
  11. result += ParseInt(v);
  12. Emit(AsString(result));

map函數發出每個單詞加一個相關的出現次數(count)(在這個簡單例子中僅為1)。reduce函數對發給一個單詞的所有數(count)求和。

此外,用戶編寫程式碼將輸入和輸出文件名以及可選的調優參數填入mapreduce規範對象中。然後調用MapReduce函數,將它傳遞給規範對象。用戶的程式碼與MapReduce庫(C++實現)相連接。我們最初的MapReduce資料中有這個例子的完整程式【8】。

2.2 類型

儘管前面的偽程式碼是按照輸入輸出字元串形式編寫的,概念上由用戶提供的map和reduce函數是有相關類型的。

  1. map (k1, v1) --> list(k2, v2)
  2. reduce (k2, list(v2)) --> list(v2)

也就是說,輸入鍵和值與輸出鍵和值來自不同的域。此外,中間鍵和值與輸出鍵和值來自同一個域。

3、實現

MapRedue介面的許多不同實現都是可能的。正確的選擇取決於環境。例如,一種實現可能適合一個小型的共享記憶體的機器,另外一種可能適合一個大型的NUMA多處理器,而另外一種可能適合一個更大的聯網電腦集合。在我們最初的文章發表以後,已經發展出了很多MapReduce的開源實現【1, 2】,MapReduce在各種問題領域的適用性也得到了研究【7, 16】。

這一部分描述了我們的一種MapReduce實現,其目標是目前廣泛應用在Google中的計算環境:由交換千兆乙太網連接在一起的大型PC集群【4】。在該環境中,機器通常運行Linux系統,有雙核 x86 處理器以及4-8GB記憶體。個別機器擁有1GB/s的網路頻寬,但每台機器等分的頻寬遠遠低於1GB/s。一個計算集群包含了成千上萬台機器,因此機器故障是很常見的。存儲由直接附在單獨機器上的廉價IDE磁碟提供。GFS,Google內部開發的一個分散式文件系統【10】,用來管理存儲在這些磁碟上的數據。文件系統使用複製來提供不可靠的硬體之上的可用性與可靠性。

使用者提交 jobs 給調度系統。每個 job 包含一組任務,且由調度程式映射(mapped)到集群間的一組可用的機器上。

3.1 執行概述

通過自動將輸入數據分割為一個有M個分裂(splits)的組,map調用分布在多台機器間。輸入分裂可以由不同的機器並行處理。reduce調用通過利用分割函數(比如,hash(key) mod R)將中間鍵空間劃分為R片進行分布。分割數R和分割函數都是由使用者指定的。

圖1展示了在我們的實現中MapReduce操作的整體流程。當用戶程式調用MapReduce函數,以下順序行為將會發生(圖1中標記的數字對應下面列中的數字)。

  1. 用戶程式中的MapReduce庫首先將輸入文件劃分為M片,通常每片16~64MB(由用戶通過可選參數控制)。然後啟動集群中程式的多個副本。
  2. 這些程式副本中有一個特殊的master副本。其他副本則是由master分配了work的workers。集群中需要分配 M 個 map tasks 和 R 個 reduce tasks。master挑選閑置的workers且為每個worker分配一個 map task 或 reduce task。
  3. 分配了 map task 的一個worker讀取相應輸入劃分的內容。它從輸入數據中解析出鍵/值對並將每一對傳遞給用戶定義的map函數。由map函數產生的中間鍵/值對緩衝在記憶體中。
  4. 緩衝區的鍵/值對定期地寫入本地磁碟,由partition函數劃分到 R 個區域中。這些本地磁碟上的緩衝對的位置被傳遞會master,它將負責轉發這些位置給 reduce workers。
  5. 當一個 reduce worker 被 master 通知了這些位置後,它使用遠程進程調用來讀取來自map workers的本地磁碟中的緩衝數據。當reduce worker讀取到了所有分區中的中間數據後,它按照中間鍵將其排序,從而使所有相同鍵的出現次數組合在了一起。排序是必要的,因為通常很多不同的鍵被map到了同一個reduce task。如果中間數據太大以至於不能放在記憶體中,還需要使用一個外部的排序。
  6. reduce worker對排序好的中間數據執行迭代,對每個唯一的中間鍵,它將這個鍵以及相應的一組中間值傳遞個用戶的 reduce 函數。reduce 函數的輸出被附加到這個reduce分區的最終輸出文件中。
  7. 當所有的 map tasks 和 reduce tasks 都完成後,master喚醒用戶程式。在這一點上,用戶程式的MapReduce調用返回到用戶程式碼處。

成功完成後,mapreduce執行的輸出可以在R個輸出文件中獲得(每個reduce task一個,由用戶指定文件名)。通常,用戶無需將這R個輸出文件合併到一個文件中;他們通常將這些文件作為另一個MapReduce調用的輸入,或者在來自另外一個可以處理劃分到了多個文件中的輸入的分散式應用程式中使用它們。

3.2 master數據結構

master中有多種數據結構。對每一個map task和reduce task,它存儲了其狀態資訊(限制,進行,或完成)和worker機器的身份(對於非閑置tasks)。

master是map tasks傳播中間文件區域位置到reduce tasks的導管。因此,對於每個完成了的map task,master存儲由這個map task生成的R個中間文件區域的位置和大小。master在map tasks稱後接收到這些位置和大小資訊的更新。這些資訊將逐步推送到正在進行reduce tasks的workers中。

3.3 容錯

由於MapReduce庫旨在幫助利用成百上千的機器來處理大量數據,它必須優雅地容忍機器故障。

處理worker故障

master會定期地ping每一個worker。如果在一定時間內沒有收到來自某台worker的響應,master將這個worker標記為故障。任何由worker完成的map tasks都被重置為初始閑置狀態,因而可以在其他的workers中調度。同樣,在故障worker上的任何正在進行的map task和reduce task也被重置為閑置狀態以便進行重新調度。

故障worker上已完成的map task需要重新執行,因為它們的輸出存儲在了故障機器的本地磁碟中導致無法訪問。已完成的reduce tasks無需重新執行,因為它們的輸出存儲在了全局文件系統中。

當一個map task首先由worker A執行然後又由worker B執行(因為A發生了故障),所有執行reduce task的workers將被通知重新執行。任何還未從worker A讀取數據的reduce task將從worker B讀取數據。

MapReduce適應於大規模的worker故障。例如,在一個MapReduce操作中,在運行中的集群上的網路維護導致了一組80台機器在幾分鐘內無法到達。MapReduce master簡單地重新執行無法到達的worker機器的工作且繼續前進,最終完成MapReduce操作。

語義失敗

當用戶提供的map和reduce操作是它們他們的輸入值的特定函數時,我們的分散式實現生成的輸出將與整個程式的無錯順序執行生成的輸出相同。

我們依靠map和reduce任務輸出的原子的提交來實現這一性質。每個正在進行的task將其輸出寫入私有臨時文件中。一個reduce task生成一個這樣的文件,map task生成R個這樣的文件(每個reduce task一個)。當一個map task完成後,worker發送一條消息給master,這條消息中包含了R個臨時文件的名字。如果master接收到了來自一個已完成的map task的完成消息,它將忽略這條消息。否則,它將這R個文件名記錄到master數據結構中。

當一個reduce task完成後,reduce worker自動重命名其臨時輸出文件為最終輸出文件。如果同一個reduce task在多台機器上執行,同一個最終輸出文件的多個重命名調用將被執行。我們依靠由底層文件系統提供的原子的重命名操作來保證最終文件系統狀態僅包含來自一個reduce任務執行生成的數據。

絕大多數的map和reduce操作是確定的,事實上,我們的語義等價於這種情況下的一次順序執行,這使得程式設計師能夠非常容易地推斷程式的行為。當 map 和/或 reduce 操作不確定時,我們提供了較弱但仍然合理的語義。在不確定操作存在時,一個特定reduce task R1的輸出等價於由非確定性程式的一次順序執行R1生成的輸出。然而,另一個不同的reduce task R2的輸出可能對應該非確定性程式的另一個不同順序執行R2的輸出。

考慮map task M和reduce task R1和R2。令 e(Ri) 作為作為R1的執行(這確實是一個這樣的執行)。較弱的語義出現因為 e(R1) 可能讀取了M的一次執行生成的輸出,e(R2)可能讀取了M的另一次執行生成的輸出。

3.4 局部性

在我們的計算環境中,網路頻寬是一個相對稀缺的資源。我們靠充分利用輸入數據(由GFS管理【10】)存儲在組成集群的機器的本地磁碟中這一事實來節省網路頻寬。GFS將每個文件分成64MB的塊且在不同機器上存儲了每個塊的多個副本(通常3個)。MapReeuce master考慮每個輸入文件的位置資訊且試圖調度一台含有相應輸入數據的機器上的一個map task。如果失敗,它將試圖調度與該任務的輸入的複製品相鄰的一個map task(例如,同一網路交換機中包含相同數據的兩台機器)。當在一個集群的 workers 重要部分運行大型MapReduce操作時,大多數輸入數據都是本地讀取的,並不消耗網路頻寬。

3.5 Task粒度

我們將map階段細分為M個片段,reduce階段細分為R個片段,如前所述。理想情況下,M和R應該遠高於worker機器的數量。每個worker執行多個不同tasks改善了負載均衡,且當一個worker故障後加快了恢復速度:它完成的多個map tasks可以分布到所有其他worker機器上重新執行。

由於master必須做O(M+R)此調度決策和在記憶體中保持O(M*R)個狀態,如前所述,在我們的實現中M和R的數量大小是有實際界限的。(然而,記憶體的使用量很小。O(M*R)個狀態中大約包含每個map/reduce task對一位元組的數據。)

此外,R通常受到用戶限制,因為每個reduce task的輸出最終保存在一個單獨的輸出文件中。在實踐中,我們傾向於選擇M因而每個獨立task大約有16MB到64MB的輸入數據(因而之前所述的局部優化達到最搞笑),且我們讓R是我們希望使用的機器數量的一個小的倍數。我們通常以M=200000, R=5000,使用2000台worker機器執行MapReduce。

3.6 備份Tasks

延長MapReduce操作總時間的一個普遍原因是一個掉隊者(straggler),也就是說,在這個計算中有一台機器花了異常長的時間來完成最後幾個map或reduce tasks。掉隊者會以一大堆的理由出現。比如說,一台擁有壞磁碟的機器可能經歷頻繁的矯正錯誤從而使讀取性能從30MB/s降低到了1MB/s。集群調度系統可能在這個機器上調度了其他任務,導致它更慢地執行MapReduce程式碼,由於競爭CPU、記憶體、本地磁碟或網路頻寬等資源。我們經歷的一個最近的問題是機器初始化程式碼中的一個bug導致處理器快取失效:受影響的機器計算速度放慢了100倍。

我們有一個通用機制來減輕掉隊者問題。當一個MapReduce操作接近完成時,master將調度還在進行的任務的備份執行。無論是原始或者備份執行完成,這個任務都被標記為完成。我們調整了這個機制,因而它增加了該計算的計算資源的使用,但不超過幾個百分點。我們發現它大大降低了完成大型MapReduce操作的時間。作為一個例子,當沒有備份task機制時,在5.3部分描述的排序程式多花了44%的時間完成。

4、改進

雖然由簡單編寫的map和reduce函數提供的基本功能已足以滿足大多數需求,我們發現了一些有用的擴展。這包括:

  • 用戶指定的分區(partition)函數來決定如何將中間鍵值對映射到R個reduce碎片;
  • 排序保證:我們的實現保證這R個reduce分區中的每個,中間鍵值對都按鍵的升序處理;
  • 用戶指定的結合(combiner)函數的作用是,在同一個map task內,對按照同一個鍵生成的中間值進行局部結合,以減少必須在網路間傳輸的中間數據數量;
  • 自定義輸入輸出類型,為了讀新的輸入格式和生成新的輸出格式;
  • 在單機上執行簡單debug和小規模測試的一種方式。

在【8】中有對這幾項的詳細討論。

5、性能表現

在此部分,我們利用大型集群上的兩個計算來測量MapReduce的性能表現。一個計算通過搜索大約1TB的數據來找到一個特定的模式。另一個計算對大約1TB的數據進行排序。這兩個程式代表由MapReduce用戶編寫的真正程式的一個大的子集—–程式的一個類用來從一個表示(representation)向另一個表示shuffle數據,另一個類從大數據集中提取小部分關注的數據。

5.1 集群配置

所有程式都在一個擁有大約1800台機器的集群上執行。每台機器擁有兩個支援超執行緒的2GHz的Intel Xeon處理器,4GB記憶體,兩個160GB的IDE磁碟,和千兆乙太網接入。這些機器被安排在一個二級樹形的交換網路中,該網路根部大約有100~200Gbps的聚合頻寬。所有機器都在同一個託管設施中,因此任何一對機器間的往返通訊時間不超過1毫秒。

雖然有4GB記憶體,但是大約1~1.5GB保留給了運行在集群上的其他任務。這些程式在一個周末的下午執行,此時CPUs,磁碟和網路頻寬基本都空閑。

5.2 Grep

grep程式掃描了10^10個100位元組的記錄,搜索一個相對稀有的三字元模式串(該模式串大約出現在92337個記錄中)。輸入被劃分為了大約64MB大小的片(M=15000),整個輸出都放在了一個文件中(R=1)。

圖2展示了計算隨時間推移的進展。Y軸顯示了輸入數據的掃描速率。隨著安排到MapReduce計算的機器越來越多,速率也在逐步提升,當安排了1764個workers時速度達到峰值30GB/s以上。map任務結束後,速率來時下降且在大約80秒時到達0。整個計算從開始到結束大約花費了150秒。這包括1分鐘的啟動消耗。這個消耗來自向所有workers機器傳播程式、延遲與GFS的交互以開啟一組1000個輸入文件,和獲取局部優化所需的資訊。