MapReduce: Simplified Data Processing on Large Clusters 翻譯和理解

MapReduce: Simplified Data Processing on Large Clusters

概述

MapReduce 是一種編程模型,用於處理和生成大型數據集的相應實現。用戶定義一個map函數以處理 key-value 鍵值對,生成中間態的 key-value 鍵值對。還要定義一個reduce函數來合併所有有相同中間態 key 的所有中間態 value。許多現實世界的工作都可以用這個模型實現。

以此風格編寫的程序可以自動並行化地在大型商用機器集群上運行,運行時系統負責以下任務:

  • 對輸入數據進行分區
  • 調度程序在一組機器上的運行
  • 處理機器故障
  • 管理所需的機器間通信

這令沒有任何經驗的程序員也可以設計出大型的分佈式系統。

1. MapReduce介紹

為了應對並行計算的複雜性,我們設計了一種新的抽象,它允許我們表達我們試圖執行的簡單計算,並且在庫函數中隱藏了並行化、容錯、數據分佈和負載均衡的繁瑣細節。

我們的大多數計算都涉及下面兩個操作

  • map 操作:將輸入中的每個邏輯記錄映射為一個 key-value 對。
  • reduce 操作:對所有的中間 key-value 對進行操作,生成派生數據。

MapReduce 是一個簡單強大的接口,在大型商用集群上我們需要對這個接口做高性能的實現。

2. 編程模型

MapReduce 編程模型的原理是:利用一個輸入的 key-value 鍵值對集合產生一個輸出的 key-value 鍵值對集合。

用戶自定義的 map 函數接受一個 key-value 鍵值對的輸入值,然後產生一個中間 key-value 鍵值對的集合。MapReduce 庫把所有具有相同中間態 key 的中間態 key-vlaue 對進行合併,傳遞給一個 reduce 函數。

用戶自定義的 reduce 函數接受一個中間態的 key,以及與之關聯的 value 集合。通常來說每次 reduce 只產生 0 個或 1 個輸出 value 值。通常我們通過一個迭代器把中間 value 值提供給 reduce 函數,這樣就可以處理無法放入內存的大量的 value 值的集合。

2.1 一個示例

思考一個問題,在大文檔集合中統計每個單詞出現的次數,我們依照 MapReduce 模型可能會寫出下面的偽代碼:

// key: document name | value: document contents 
map(String key, String value):
for each word w in value:
  EmitIntermediate(w, 「1″); 

// key: a word | values: a list of counts 
reduce(String key, Iterator values): 
int result = 0; 
for each v in values: 
  result += ParseInt(v); 
Emit(AsString(result));

map 函數輸出文檔中的每個詞、以及這個詞的出現次數(在這個簡單的例子里就是 1)。reduce 函數把 map 函數產生的每一個特定的詞的計數累加起來。

用戶需要編寫應用代碼,使用輸入和輸出文件的名字、可選的調節參數來完成一個符合 MapReduce 模型規範的對象。然後調用 MapReduce 函數,並把這個規範對象傳遞給它。用戶的代碼和 MapReduce 庫會自動鏈接在一起。

2.2 輸入輸出類型信息

在前面的偽代碼示例中,使用字符串處理輸入輸出。但實際上用戶定義的 map 和 reduce 函數都有相關聯的類型。

map     (k1,v1)       ->list(k2,v2) 
reduce  (k2,list(v2)) ->list(v2)

例如,在 map 函數中,輸入的 key 和 value 與輸出的 key 和 value 可能由不同的類型推導出來。而在 reduce 函數中,中間 key-value 鍵值對與輸出 key-value 鍵值對由相同的類型推導出來。

本文中的 cpp 實現將字符串作為用戶定義函數的輸入輸出,類型轉換留在用戶代碼中進行處理。

2.3 適用場景

下面是一些用 MapReduce 模型表示的簡單例子:

  • 分佈式 Grep。map 函數輸出匹配某個模式的一行,reduce 函數是一個恆等函數,即將中間數據複製到輸出。
  • 計算 URL 訪問頻率。map 函數處理日誌中 web 頁面請求的記錄,然後輸出(URL,1)。reduce 函數把相同 URL 的 value 值累加起來,產生(URL,記錄總數)作為返回結果。
  • 倒轉網絡鏈接圖。map 函數在源頁面 source 中搜索所有的鏈接目標 target,並輸出為 (target,source)。reduce 函數把給定鏈接目標 (target) 的鏈接組合成一個列表,輸出 (target,list(source))。
  • 分佈式排序。map 函數從每個記錄提取 key,輸出 (key, record),reduce 函數不改變任何值。這個運算依賴分區機制(4.1 節中描述)和排序屬性(4.2 節中描述)。

3. 實現

MapReduce 可以有多種不同的實現方式,如何選擇取決於實際環境。

  • 一些實現方式適用於小型的共享內存方式的機器。
  • 一些實現方式適用於大型 NUMA 架構的多處理器主機。
  • 一些實現方式更適合大型的網絡連接集群。

本章節描述一個 google 內部廣泛使用的運算環境下的實現:用以太網交換機連接、由普通 PC 機組成的大型集群,它有以下基本屬性:

  • x86 架構、運行 Linux 操作系統、雙處理器、2-4GB 內存的機器。
  • 普通的網絡硬件設備,每個機器的帶寬為百兆或者千兆,但是遠小於網絡的平均帶寬的一半。
  • 集群中包含成百上千的機器,因此,機器故障是常態。
  • 存儲為廉價的內置 IDE 硬盤。一個內部分佈式文件系統用來管理存儲在這些磁盤上的數據。文件系統通過數據複製來在不可靠的硬件上保證數據的可靠性和有效性。
  • 用戶提交工作(job)給調度系統。每個工作(job)都包含一系列的任務(task),調度系統將這些任務調度到集群中多台可用的機器上。

3.1 執行概況

通過將 map 調用的輸入數據自動分割為 M 個數據片段的集合,map 調用被分佈到多台機器上執行。輸入的數據片段能夠在不同的機器上並行處理。

使用分區函數將 map 調用產生的中間 key-value 鍵值對分成 R 個不同的分區。(例如hash(key) mod R),reduce 調用也被分佈到多台機器上執行。分區數量 R 和分區函數由用戶指定。

image-20220321192616647

Figure 1 展示了 MapReduce 視線中操作的全部流程。當用戶調用 MapReduce 時,將發生下面一系列操作:(序號和 Figure1 中的序號一一對應)

  1. 用戶程序首先調用的 MapReduce 庫將輸入文件分成 M 個數據片段,每個數據片段的大小一般從 16MB 到 64MB(可以通過可選的參數來控制每個數據片段的大小)。然後用戶程序在集群中創建(fork)大量的程序副本。
  2. 這些程序副本中有一個特殊的程序 Master,其他的程序都是 Worker。整個系統由 Master 分配任務。有 M 個 map 任務和 R 個 reduce 任務待分配。Master 將一個 map 任務或一個 reduce 任務分配給一個空閑的 Worker。
  3. 被分配了 map 任務的 Worker 讀取相關的輸入數據片段,從輸入的數據片段中讀取 key-value 鍵值對。然後把 key-value 鍵值對傳遞給用戶自定義的 map 函數,由 map 函數生成並輸出中間 key-value 鍵值對,並緩存在內存中。
  4. 緩存中的 key-value 鍵值對通過分區函數分為 R 個區域,周期性地寫入到磁盤上。緩存的 key-value 鍵值對在本地磁盤上的存儲位置將被回傳給 Master,由 Master 負責將這些存儲位置告訴 reduce Worker。
  5. 當 reduce Worker 接收到 Master 發來的數據存儲位置信息後,使用 RPC 從 map Worker 所在的主機的磁盤上讀取這些緩存數據。當 reduce Worker 讀取了所有的中間數據後,通過對 key 進行排序,使有相同 key 值的數據聚合在一起。由於許多不同的 key 會映射到相同的 reduce 上,因此必須進行排序。如果中間數據太大無法在內存中完成排序,那麼就要進行外排序。
  6. reduce Worker 程序遍歷排序後的中間 key-value 鍵值對,對於每一個唯一的 key,Worker 將這個 key 和與它相關的中間 value 值的集合傳遞給用戶自定義的 reduce 函數。reduce 函數的輸出被追加到所屬分區的輸出文件。
  7. 當所有的 map 和 reduce 任務完成後,Master 喚醒用戶程序,這個時候在用戶程序對 MapReduce 的調用才返回。

任務完成後,MapReduce 的輸出存放在 R 個輸出文件中,即每個 reduce 任務完成一個輸出文件。一般情況下用戶不需要將這些文件合併成一個文件,他們通常將這些文件作為另一個 MapReduce 的輸入,或者在另一個可以處理多個分割文件的分佈式應用中使用。

3.2 Master數據結構

Master 持有一些數據結構,存儲每一個 map 和 reduce 任務的狀態(空閑、工作中、完成),以及 Worker 機器的標識。

Master 就像一個數據管道,中間文件存儲區域的位置信息通過這個管道從 map 傳遞到 reduce。因此,對於每個已完成的 map 任務,Master 存儲了 map 任務產生的 R 個中間文件存儲區域的大小和位置。當 map 任務完成時,Master 接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工作的 reduce 任務。

3.3 容錯機制

MapReduce 需要有處理集群中機器故障的能力。

3.3.1 Worker 錯誤

Master 會周期性地 ping 每個 Worker。如果在約定的時間範圍內沒有收到 Worker 返回的信息,那麼 Master 會將這個 Worker 標記為失效。

  • 所有由這個失效的 Worker 完成的完成的 map 任務會被重設為初始地空閑狀態,這些任務就會被重新安排給其他的 Worker。
  • Worker 失效時正在執行的 map 或 reduce 任務也會被重新置為空閑狀態,等待重新調度。

當 Worker 故障時,由於已經完成的 map 任務的輸出存儲在這台機器上,map 任務的輸出已經不可訪問,所以必須重新執行。而已經完成的 reduce 任務的輸出文件存儲在全局文件系統上,不需要再次執行。

當一個 map 任務首先被 Worker A 執行,之後由於 Worker A 失效了又被調度到 worker B 執行,這個重新執行的任務會被通知給所有執行 reduce 任務的 Worker,任何還沒有從 Worker A 讀取數據的 reduce 任務會從 Worker B 讀取數據。

MapReduce 可以處理大規模 Worker 失效的情況。比如 MapReduce 操作執行期間,有大面積的機器因網絡維護等原因不可用,MapReduce Master 只需要簡單地再次執行那些不可訪問的 Worker 完成的工作,之後繼續執行未完成的任務即可。最終就可以完成這個 MapReduce 操作。

3.3.2 Master 錯誤

Master 錯誤的一個簡單的解決辦法是讓 Master 周期性地將 3.2 節中描述的數據結構寫入磁盤,並保存一個檢查點 checkpoint。如果這個 Master 任務失效,可以從最後一個檢查點開始啟動另一個 Master 進程。

但是考慮到只有一個 Master 服務器,Master 失效後再恢復是比較麻煩的,因此我們在這裡的實現是如果 Master 失效,就中止 MapReduce 運算。客戶端可以檢查到這個狀態,也可以根據需要重新執行 MapReduce 操作。

3.3.3 出現故障時的重做語義

當用戶提供的 map 和 reduce 操作是確定函數(即輸入相同時一定產生相同輸出)時,我們的分佈式實現在任何情況下的輸出都是一致的,無論是順序還是所謂的亂序。

我們依賴 map 和 reduce 任務的輸出是原子的提交來完成這個特性。每個工作中的 Worker 都會把它的輸出寫入到私有的臨時文件中。每個 reduce 任務會生成這樣一個文件,而每個 map 任務則會生成 R 個這樣的文件(一個 reduce 任務對應一個文件)。

  • 當一個 map 任務完成時, Worker 發送一個包含 R 個臨時文件名的完成消息給 Master,如果 Master 從一個已經完成的 map 任務再次接受到一個完成消息,Master 將忽略這個消息;否則 Master 會把這 R 個文件的名字記錄在數據結構里。
  • 當一個 reduce 任務完成時,reduce Worker 進程以原子的方式把臨時文件重命名為最終的輸出文件。如果一個 reduce 任務在多台機器上執行,針對同一個最終的輸出文件將有多個重命名操作執行。我們依賴底層文件系統提供的重命名操作的原子性來保證最終的文件系統僅僅包含一個 reduce 任務產生的數據。

實際上,我們絕大多數的 map 操作和 reduce 操作都是確定性的,並且存在這樣一個事實:我們的失效處理機制的結果等價於一個順序的執行操作。這使得程序員可以很輕易地推斷出他們的 MapReduce 程序最後呈現的結果。

當 map 或 reduce 操作不是確定性的函數時,我們提供雖然較弱但依然合理的處理機制。當使用非確定操作的時候,一個 reduce 任務 R1 的輸出等價於一個 R1 連續執行時產生的輸出。但另一個 reduce 任務 R2 的輸出也許等價於一個 R2 以另一個順序連續執行時的輸出。

考慮 map 任務 M 和 reduce 任務 R1、R2。我們設定 $e(R_i)$ 是 $R_i$ 已經提交的執行過程。當 $e(R_1)$ 讀取了由 M 一次執行產生的輸出,而 $e(R_2)$讀取了由 M 的另一次執行產生的輸出,較弱的失效處理語義就可能發生。

3.4 數據存儲位置(數據本地優化策略)

在我們部署 MapReduce 集群的運行環境中,網絡帶寬是相當匱乏的資源。我們通過盡量把輸入數據(由 GFS 管理)存儲在集群中機器的本地磁盤上來節省網絡帶寬。

GFS 把每個文件按 64MB 一個 Block 來分隔,每個 Block 保存在多台機器上,環境中就存放了多份拷貝(一般是三個)。MapReduce 的 Master 在調度 map 任務時就會考慮輸入文件的位置信息,盡量將一個 map 任務調度在包含相關輸入數據拷貝的機器上執行。

如果上述努力失敗了,Master 將嘗試在保存有輸入數據拷貝的機器附近的機器上執行 map 任務,比如一個 switch 內的兩個 Worker。當在一個足夠大的 cluster 集群上執行大型 MapReduce 操作時,大部分的輸入數據都能從本地機器讀取,因此消耗很小的網絡帶寬。

3.5 任務粒度和M、R的取值

如前所述,我們把 map 拆分為了 M 個片段,把 reduce 拆分為 R 個片段執行。理想狀態下 M 和 R 應當比集群中 Worker 的機器數量要多得多。在每台 Worker 都執行大量不同任務能夠提高集群的動態負載均衡能力,並且加快故障恢復的速度(失效機器上的大量 map 任務都可以分不到所有其他的 Worker 上執行)。

但是事實上,我們的具體實現中對 M 和 R 的取值都有一定的客觀限制,因為 Master 必須執行 O(M+R) 次調度,並且在內存中保存 O(M*R) 個狀態。

更進一步,R 值通常由用戶指定,因為每個 reduce 任務最終都會生成一個獨立的輸出文件。實際使用時我們也傾向於選擇合適的 M 值,以使得每個獨立任務都是處理大約 16M 到 64M 的輸入數據,這樣上面的數據本地存儲優化策略才最有效。

另外,我們把 R 值設置為我們想使用的 Worker 機器的數量的小的倍數。

在 Google 的實踐中,往往有這樣的組合:M = 2000,R = 5000,並有 2000 台 Worker。

3.6 任務備份進程

在生產環境中,影響一個 MapReduce 總執行時間的重要因素通常是落伍者,即那些最後花了很長時間才完成的 map 和 reduce 任務,他們往往是導致 MapReduce 操作總執行時間超過預期的決定性因素。

出現落伍者的原因非常多,比如:

  • 如果一個機器硬盤出問題,讀取數據速度大大降低。
  • 如果 cluster 的調度系統在一台機器上調度了其他任務。
  • 由於 CPU、硬盤、網絡等各種客觀因素。
  • 代碼上有 bug,導致緩存失效等問題。

我們有一個通用的機制來減少落伍者,即任務備份進程。當一個 MapReduce 接近完成時,Master 調度備用的任務備份進程來執行剩下的,處於處理中狀態的任務。無論時最初的執行進程,還是任務備份進程完成了任務,都把這個任務標記為已完成。

任務備份進程往往只會佔用比正常操作多幾個百分點的計算資源,但對減少超大 MapReduce 的總處理時間效果顯著。

4. 優化

4.1 分區函數

MapReduce 的使用者會指定 reduce 任務和 reduce 任務輸出文件的數量 R。我們在中間鍵值對的 key 上可以使用分區函數來對數據進行分區,之後再輸入到後續任務執行進程。

一個默認的分區函數是使用hash方法,如hash(key) mod R進行分區。然而有時其他的一些分區函數對 key 進行的分區將非常有用。比如當我們的 key 是 URLs,我們希望把每個主機的所有條目保持在同一個輸出文件中,那麼 MapReduce 的用戶可能要考慮提供專用的分區函數。比如hash(Hostname(urlkey)) mod R,使用這個分區函數就可以把所有來自同一個主機的 URLs 保存在同一個輸出文件中。

4.2 分區內的順序擔保

MapReduce 保證在給定的分區中,中間鍵值對的處理順序是按照 key 值增量順序處理的。這樣的順序保證在每個分區產生一個有序的輸出文件。當輸出文件需要按 key 進行高效的隨機訪問查找,或者輸出的數據集需要進行排序時,這非常有意義。

4.3 合併函數

某些情況下,map 函數會產生大量重複的中間 key。並且用戶自定義的 reduce 函數滿足交換律和結合律。MapReduce 允許用戶指定一個 combiner 函數,首先在 map Worker 本地上對記錄進行一次合併,再將合併後的結果通過網絡發送出去。

combiner 函數在每台 map Worker 上都會執行一次。一般清涼下 combiner 和 reduce 函數是一樣的,唯一的區別是 MapReduce 如何處理他們的輸出。reduce 函數的輸出會寫在最終的輸出文件中,combiner 函數的輸出會寫在一個中間文件中並傳輸給 reduce Worker。

部分合併在合適的場景中顯着地提升了 MapReduce 的效率。

4.4 輸入輸出類型

MapReduce 庫支持幾種不同格式的輸入。

  • 例如文本模式的輸入數據的每一行被視為一個 key-vlaue 對。key 是文件的偏移量,value 是那一行的內容。
  • 另一種常見的格式是以 key 進行排序來存儲的 key-value 對的序列。

每種輸入類型的實現都必須能把輸入數據分割成數據片段,該片段能夠由單獨的 map 任務來進行後續處理。雖然大多數 MapReduce 的使用者只需要使用庫中預定義的輸入類型能滿足要求,但是在需要的時候使用者依然可以通過提供一個簡單的 Reader 接口來支持一個新的輸入類型。

Reader 不一定要從文件中讀取數據,也可以從內存中、網絡中讀取數據。

4.5 輸出副作用的處理

MapReduce 的使用者有時期望在 map 和 reduce 輸出時額外輸出一些帶有輔助信息的文件。我們依靠輸出程序 Writer 把這種輸出的副作用變成原子和冪等的

通常應用程序先把輸出結果寫到一個臨時文件中,在輸出全部數據後使用系統級的原子操作重命名這個臨時文件。

如果一個任務產生多個輸出文件,我們沒有提供類似兩階段提交的原子操作支持這種情況。因此對於會產生多個輸出文件,並且對跨文件有一致性要求的任務,都必須是確定性的任務。

4.6 跳過「壞記錄」

有時用戶程序中的 bug 會導致 map 或 reduce 函數在處理某些記錄時會 crash 掉。慣常的做法是修復 bug 後重做 MapReduce。但有時找出 bug 並修復並不是一件容易的事。我們需要一種機制,讓 MapReduce 找出哪些記錄會導致確定性的 crash,在下次執行時跳過這些記錄不處理。

每個 Worker 進程都設置了信號處理函數捕獲內存段異常和總線錯誤。在執行 map 和 reduce 之前,MapReduce 會通過全局變量保存記錄序號,如果用戶程序觸發了一個一場系統信號,消息處理函數將儘力通過 UDP 包向 Master 發送處理的最後一條記錄的序號。當 Master 看到在處理某條特定記錄不止失敗一次時,就會認為這是一條「壞記錄」,就會在下次執行 map 或 reduce 任務時跳過這條記錄。

4.7 本地執行

在 map 和 reduce 函數中調試 bug 是非常困難的,因為執行操作是在分佈式系統中的大量主機上執行的,具體的執行位置由 Master 進行動態調度。

為了簡化測試,我們開發了一台 MapReduce 庫的本地實現,用戶可以通過這個庫令 MapReduce 操作在本地計算機上順序執行,那麼就可以很容易地進行調試。

4.8 狀態信息

Master 使用嵌入式的 HTTP 服務器如 Jetty 來顯示一組狀態信息頁面,用戶可以監控各種執行狀態。狀態信息頁面包括計算執行進度、任務執行情況、輸入輸出位元組數、處理百分比等信息。頁面還包含了指向每個任務的stderrstdout文件的鏈接。用戶根據這些數據預測計算需要執行多長時間,預測是否需要增加額外的計算資源等等。

狀態信息頁面上也可以看到哪些 Worker 失效了,以及他們失效時正在執行的 map 和 reduce 任務。

4.9 計數器

MapReduce 使用計數器統計事件的發生次數。

用戶可以在程序中創建一個命名的計數器對象,在 map 和 reduce 函數中相應地增加計數器的值:

Counter* uppercase; 
uppercase = GetCounter(「uppercase」); 
map(String name, String contents): 
  for each word w in contents: 
     if (IsCapitalized(w)):
        uppercase->Increment(); 
     EmitIntermediate(w, 「1″);

這些計數器的值周期性地從各個單獨的 Worker 機器上傳遞給 Master(附加在 Ping 的應答包中)。Master 把執行成功的 map 和 reduce 任務的計數器值進行累計。當 MapReduce 操作完成後將計數器的值返回給用戶代碼。計數器的當前值也會展現在狀態信息頁面中。

當累加計數器的值時,Master 要檢查重複運行的 map 或 reduce 任務,避免重複累加。

有些計數器的值由 MapReduce 庫自動維持,比如已經處理的輸入的 key-value 對的數量、輸出的 key-value 對的數量等等。

計數器機制對 MapReduce 操作的完整性檢查非常有用。比如在某些 MapReduce 操作中,用戶需要確保輸出的 key-value 對數量精準地等於輸入的 key-value 對數量。或者處理的某種文檔數量要求在某個範圍內。

5. 性能表現

本節中我們用在一個大型集群上運行得兩個計算來衡量 MapReduce 性能。一個計算在大約 1TB 的數據中進行特定的模式匹配,另一個計算對大約 1TB 的數據進行排序。

這兩個程序在 MapReduce 中是非常典型的應用,比如一個是對數據的表現形式進行轉換,另一個是從海量數據中抽取少部分用戶感興趣的數據。

5.1 集群配置

這些程序運行在由 1800 台機器構成的集群上,每台機器配置兩個 2G 主頻,支持超線程的 Intel Xeon 服務器,4GB 物理內存,兩個 160GB 的 IDE 硬盤和一個千兆以太網卡。這些機器部署在一個兩層的樹形交換網絡中,在 root 節點大概有 100~200GBPS 的傳輸帶寬。所有這些機器都採用對等部署,因此任意兩點之間的網絡來回時間小於 1ms。

5.2 Grep程序

這個分佈式的 grep 程序將掃描 $10^{10}$ 個 100byte 長的記錄,並查詢出現概率較小的三字符模式(它出現在 92337 個記錄中)。輸入數據被拆分為 64M 的 Block,整個輸出數據存放在一個文件中。

我們設定 M = 15000,R = 1。

下圖展示了這個運算隨時間的處理過程,其中 Y 軸標識輸入數據的處理速度。處理速度隨着參與 MapReduce 計算的機器數量的增加而增加。當 1746 台 Worker 參與計算時,處理速度達到了 30GB/s。當 map 任務結束,即在計算開始後 80 秒,輸入的處理速度降為 0。

image-20220323161108785

整個計算消耗大約 150s,但有約一分鐘用於了集群的啟動。啟動階段主要用於將程序傳送到 Worker,等待 GFS 系統打開文件,獲取相關的文件本地位置優化信息的時間。

5.3 Sort程序

排序程序處理 $10^{10}$ 個 100byte 長的記錄,共大約 1TB 的數據。

排序程序由不到 50 行代碼組成,只有三行的 map 函數從文本行中解析出 10 個位元組的 key 值作為排序的 key,並且把這個 key 和原始文本行作為中間的 key-value 鍵值對輸出。我們使用了一個內置的恆等函數作為 reduce 操作函數。這個函數把中間的 key-value 鍵值對不做任何改變輸出。最終排序結果輸出到兩路複製的 GFS 文件系統(輸出 2TB 數據)。

如前所述,輸入數據被分為 64MB 的 Block,並將輸出結果分區後存儲到 4000 個文件中。

我們設定 M = 15000,R = 4000。

我們的分區函數用 key 的原始位元組來把數據分區到 R 個片段中。

在這個測試中,我們使用的分區函數直到 key 的分區情況。通常對排序程序,我們會增加一個預處理的 MapReduce 程序用於採樣 key 的分佈情況,通過採樣的數據來計算對最終排序處理的分區點。

image-20220323162614546

上圖 a 顯示了整個排序程序的正常執行過程。左上角的圖例顯示了讀取輸入的速率,峰值約為 13GB/s。注意此處的輸入速率小於 grep 程序中的輸入速率,因為排序映射任務花了大約一半的時間和 IO 帶寬將中間輸出寫入到本地磁盤。

左側中間的圖顯示了中間數據從 map 任務發送到 reduce 任務的網絡速度。

左下角的圖顯示了 reduce 任務把排序後的數據寫到最終的輸出文件的速度。在第一個排序階段結束和數據開始寫入磁盤之間有一個小的延時,這是因為 Worker 正在忙於排序中間數據。

  • 輸入數據的讀取速度比中間數據排序速度和 reduce 輸出速度要快不少,這是因為我們的輸入數據本地優化策略起了作用。大部分數據都是從本地磁盤讀取的,從而節省了網絡帶寬。

  • 排序速度比輸出數據寫入到磁盤塊,這是因為輸出數據寫了兩份,用於保證數據可靠性和可用性。

5.4 任務備份進程的測試

Figure3 b 展示了關閉任務備份進程後的程序執行情況。執行過程和左圖相似,但輸出數據寫磁盤的動作在時間上拖了一個很長的尾巴,而且在這段時間裏幾乎沒有什麼寫入動作。

總耗時增加了將近百分之五十。

5.5 機器錯誤的測試

在 Figure3 c 中,我們在程序開始後幾分鐘 kill 了 1746 個 Worker 中的 200 個。集群底層的調度立刻在這些機器上重新開始新的 Worker 處理進程。

圖上顯示了一個 「負」 的輸入數據讀取速度,這是因為一些已經完成的 Map 任務丟失了,需要重新執行。整個運算只慢了大約百分之五。

6. 經驗

MapReduce 在各個領域都得到了廣泛應用:

  • 大規模機器學習問題。
  • Google News 產品的集群問題。
  • 從公眾查詢產品的報告中抽取數據。
  • 從大量的新應用和新產品的網頁中提取有用信息。
  • 大規模的圖形計算。

圖四顯示了 MapReduce 程序數量的顯著增長,使用 MapReduce 庫可以讓沒有分佈式開發經驗的程序員充分利用大量資源。

image-20220323165554820

在每次任務結束時,MapReduce library 記錄下了計算資源的使用情況,下面是 2004 年 8 月 MapReduce 運行任務所佔用的相關資源。

image-20220323165601621

6.1 大規模index

目前為止,MapReduce 最成功的應用是重寫了 Google 網絡搜索服務使用的 index 系統。索引系統的輸入數據是網絡爬蟲抓取回來的海量文檔,這些文檔數據保存在 GFS 文件系統中。

索引程序通過 5~10 次 MapReduce 操作來建立索引,這帶來了這些好處:

  • 實現索引部分的代碼簡單、小巧、易於理解。
  • MapReduce 的性能相當理想,所以我們可以把在概念上不相關的計算步驟分開處理,而不是混在一起來減少數據傳遞的額外消耗。這樣在未來可以容易地改變索引的處理方式。
  • 極大減少了索引系統的管理成本,因為機器失效、緩慢、網絡阻塞等問題都已經由 MapReduce 庫解決。我們可以通過在集群中增加機器的簡單方法來提高整體性能。

7. 相關工作

許多系統都提供了有限制的編程模式,通過這些對編程的嚴格限制來實現並行計算。例如,一個函數可以通過並行計算的算法,在 O(logN) 時間內計算 N 個元素數組的前綴。(這一段沒有看懂原文)

MapReduce 可以看作是我們結合在真實環境下處理海量數據的經驗,對這些經典模型進行簡化和萃取的成果。我們還是先了上千台處理集群中的容錯處理。

  • Bulk Synchronous Programming 和一些 MPI 原語提供了更高級別的並行處理抽象,可以更容易寫出並行處理的程序。MapReduce 的不同之處在於,他利用限制性編程模式實現了用戶程序的自動並發處理,並且提供了透明的容錯處理。

  • 我們數據本地優化策略的靈感來自於 active disks[12,15] 等技術,在 active disks,計算任務是盡量推送到數據存儲的節點處理,這樣就減少了網絡和 IO 子系統的吞吐量。我們在掛載幾個磁盤的普通機器上執行我們的計算,而不是在磁盤處理器上執行工作,但可以達到一樣的目的。

  • MapReduce 的任務備份進程機制和 Charlotte System[3] 提出的 eager 調度機制比較類似。Eager 調度機制的一個缺點是如果一個任務反覆失效,那麼整個計算就不能完成。我們通過忽略引起故障的記錄的方式在某種程度上解決了這個問題。

  • MapReduce的實現依賴於一個內部的集群管理系統,這個集群管理系統負責在一個超大的、共享機器的集群上分佈和運行用戶任務。雖然這個不是本論文的重點,但是有必要提一下,這個集群管理系統在理念上和其它系統,如 Condor[16] 是一樣。

  • MapReduce 庫的排序機制和 NOW-Sort[1] 的操作上很類似。讀取輸入源的機器(map workers)把待排序的數據進行分區後,發送到 R 個 Reduce worker 中的一個進行處理。每個 Reduce worker 在本地對數據進行排序(儘可能在內存中排序)。當然,NOW-Sort 沒有給用戶自定義的 map 和 reduce 函數的機會,因此不具備 MapReduce 庫廣泛的實用性。

  • River[2] 提供了一個編程模型:處理進程通過分佈式隊列傳送數據的方式進行互相通訊。和 MapReduce 類似,River 系統嘗試在不對等的硬件環境下,或者在系統顛簸的情況下也能提供近似平均的性能。River 是通過精心調度硬盤和網絡的通訊來平衡任務的完成時間。MapReduce 庫採用了其它的方法。通過對編程模型進行限制,MapReduce 框架把問題分解成為大量的「小」任務。這些任務在可用的 Worker 集群上動態的調度,這樣快速的 Worker 就可以執行更多的任務。通過對編程模型進行限制,我們可用在工作接近完成的時候調度備用任務,縮短在硬件配置不均衡的情況下縮小整個操作完成的時間(比如有的機器性能差、或者機器被某些操作阻塞了)。

  • BAD-FS[5] 採用了和 MapReduce 完全不同的編程模式,它是面向廣域網的。不過,這兩個系統有兩個基礎功能很類似:

    • 兩個系統採用重新執行的方式來防止由於失效導致的數據丟失。
    • 兩個都使用數據本地化調度策略,減少網絡通訊的數據量。
  • TACC[7] 是一個用於簡化構造高可用性網絡服務的系統。和 MapReduce 一樣,它也依靠重新執行機制來實現的容錯處理。

8. 總結

MapReduce 在 Google 內部成功應用於多個領域,我們把這種成功歸結為幾個方面:

  • 由於 MapReduce 封裝了並行處理、容錯處理、數據本地化優化、負載均衡等等技術難點的細節,使得 MapReduce 易於使用。
  • 大量不同類型的問題都可以通過 MapReduce 簡單解決,比如用於 Google 的網絡搜索服務,用於排序,用於數據挖掘,用於機器學習。
  • 我們實現了在超大型集群上能夠靈活部署運行的 MapReduce。

MapReduce 的開發過程也給人們以下啟發:

  • 約束編程模式使並行和分佈式計算非常容易,也易於構造容錯的計算環境。
  • 網絡帶寬是稀有資源,大量的系統優化是針對減少網絡傳輸量為目的。
  • 任務備份進程機制執行相同任務可以減少慢機器帶來的負面影響(硬件配置不平衡),也解決了由於機器失效導致的數據丟失問題。