hadoop之mapreduce詳解(基礎篇)
- 2019 年 10 月 3 日
- 筆記
本篇文章主要從mapreduce運行作業的過程,shuffle,以及mapreduce作業失敗的容錯幾個方面進行詳解。
一、mapreduce作業運行過程
1.1、mapreduce介紹
MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念”Map(映射)”和”Reduce(歸約)”,是它們的主要思想,都是從函數式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分佈式並行編程的情況下,將自己的程序運行在分佈式系統上。 當前的軟件實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的Reduce(歸約)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。 —來源於百度百科
MapReduce是一個基於集群的高性能並行計算平台(Cluster Infrastructure)
MapReduce是一個並行計算與運行軟件框架(Software Framework)
MapReduce是一個並行程序設計模型與方法(Programming Model & Methodology)
mapreduce是hadoop中一個批量計算的框架,在整個mapreduce作業的過程中,包括從數據的輸入,數據的處理,數據的數據輸入這些部分,而其中數據的處理部分就要map,reduce,combiner等操作組成。在一個mapreduce的作業中必定會涉及到如下一些組件:
1、客戶端,提交mapreduce作業
2、yarn資源管理器,負責集群上計算資源的協調
3、yarn節點管理器,負責啟動和監控集群中機器上的計算容器(container)
4、mapreduce的application master,負責協調運行mapreduce的作業
5、hdfs,分佈式文件系統,負責與其他實體共享作業文件
1.2、作業運行過程
作業的運行過程主要包括如下幾個步驟:
1、作業的提交 2、作業的初始化 3、作業任務的分配 4、作業任務的執行 5、作業執行狀態更新 6、作業完成
具體作業執行過程的流程圖如下圖所示:
1.2.1、作業的提交
作業提交源碼分析詳情見:hadoop2.7之作業提交詳解(上) hadoop2.7之作業提交詳解(下)
在MR的代碼中調用waitForCompletion()方法,裏面封裝了Job.submit()方法,而Job.submit()方法裏面會創建一個JobSubmmiter對象。當我們在waitForCompletion(true)時,則waitForCompletion方法會每秒輪詢作業的執行進度,如果發現與上次查詢到的狀態有差別,則將詳情打印到控制台。如果作業執行成功,就顯示作業計數器,否則將導致作業失敗的記錄輸出到控制台。
其中JobSubmmiter實現的大概過程如下:
1、向資源管理器resourcemanager提交申請,用於一個mapreduce作業ID,如圖步驟2所示
2、檢查作業的輸出配置,判斷目錄是否已經存在等信息
3、計算作業的輸入分片的大小
4、將運行作業的jar,配置文件,輸入分片的計算資源複製到一個以作業ID命名的hdfs臨時目錄下,作業jar的複本比較多,默認為10個(通過參數mapreduce.client.submit.file.replication控制),
5、通過資源管理器的submitApplication方法提交作業
1.2.2、作業的初始化
1、當資源管理器通過方法submitApplication方法被調用後,便將請求傳給了yarn的調度器,然後調度器在一個節點管理器上分配一個容器(container0)用來啟動application master(主類是MRAppMaster)進程。該進程一旦啟動就會向resourcemanager註冊並報告自己的信息,application master並且可以監控map和reduce的運行狀態。因此application master對作業的初始化是通過創建多個薄記對象以保持對作業進度的跟蹤。
2、application master接收作業提交時的hdfs臨時共享目錄中的資源文件,jar,分片信息,配置信息等。並對每一個分片創建一個map對象,以及通過mapreduce.job.reduces參數(作業通過setNumReduceTasks()方法設定)確定reduce的數量。
3、application master會判斷是否使用uber(作業與application master在同一個jvm運行,也就是maptask和reducetask運行在同一個節點上)模式運行作業,uber模式運行條件:map數量小於10個,1個reduce,且輸入數據小於一個hdfs塊
可以通過參數:
mapreduce.job.ubertask.enable #是否啟用uber模式 mapreduce.job.ubertask.maxmaps #ubertask的最大map數 mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數 mapreduce.job.ubertask.maxbytes #ubertask最大作業大小
4、application master調用setupJob方法設置OutputCommiter,FileOutputCommiter為默認值,表示建立做的最終輸出目錄和任務輸出的臨時工作空間
1.2.3、作業任務的分配
1、在application master判斷作業不符合uber模式的情況下,那麼application master則會向資源管理器為map和reduce任務申請資源容器。
2、首先就是為map任務發出資源申請請求,直到有5%的map任務完成時,才會為reduce任務所需資源申請發出請求。
3、在任務的分配過程中,reduce任務可以在任何的datanode節點運行,但是map任務執行的時候需要考慮到數據本地化的機制,在給任務指定資源的時候每個map和reduce默認為1G內存,可以通過如下參數配置:
mapreduce.map.memory.mb mapreduce.map.cpu.vcores mapreduce.reduce.memory.mb mapreduce.reduce.cpu.vcores
1.2.4、作業任務的執行
application master提交申請後,資源管理器為其按需分配資源,這時,application master就與節點管理器通信來啟動容器。該任務由主類YarnChild的一個java應用程序執行。在運行任務之前,首先將所需的資源進行本地化,包括作業的配置,jar文件等。接下來就是運行map和reduce任務。YarnChild在單獨的JVM中運行。
1.2.5、作業任務的狀態更新
每個作業和它的每個任務都有一個狀態:作業或者任務的狀態(運行中,成功,失敗等),map和reduce的進度,作業計數器的值,狀態消息或描述當作業處於正在運行中的時候,客戶端可以直接與application master通信,每秒(可以通過參數mapreduce.client.progressmonitor.pollinterval設置)輪詢作業的執行狀態,進度等信息。
1.2.6、作業的完成
當application master收到最後一個任務已完成的通知,便把作業的狀態設置為成功。
在job輪詢作業狀態時,知道任務已經完成,然後打印消息告知用戶,並從waitForCompletion()方法返回。
當作業完成時,application master和container會清理中間數據結果等臨時問題。OutputCommiter的commitJob()方法被調用,作業信息由作業歷史服務存檔,以便用戶日後查詢。
二、shuffle
mapreduce確保每個reduce的輸入都是按照鍵值排序的,系統執行排序,將map的輸入作為reduce的輸入過程稱之為shuffle過程。shuffle也是我們優化的重點部分。shuffle流程圖如下圖所示:
2.1、map端
在生成map之前,會計算文件分片的大小:計算源碼詳見:hadoop2.7作業提交詳解之文件分片
然後會根據分片的大小計算map的個數,對每一個分片都會產生一個map作業,或者是一個文件(小於分片大小*1.1)生成一個map作業,然後通過自定的map方法進行自定義的邏輯計算,計算完畢後會寫到本地磁盤。
在這裡不是直接寫入磁盤,為了保證IO效率,採用了先寫入內存的環形緩衝區,並做一次預排序(快速排序)。緩衝區的大小默認為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入內存緩衝區的大小到達一定比例時,默認為80%(可通過mapreduce.map.sort.spill.percent配置項修改),將啟動一個溢寫線程將內存緩衝區的內容溢寫到磁盤(spill to disk),這個溢寫線程是獨立的,不影響map向緩衝區寫結果的線程,在溢寫到磁盤的過程中,map繼續輸入到緩衝中,如果期間緩衝區被填滿,則map寫會被阻塞到溢寫磁盤過程完成。溢寫是通過輪詢的方式將緩衝區中的內存寫入到本地mapreduce.cluster.local.dir目錄下。在溢寫到磁盤之前,我們會知道reduce的數量,然後會根據reduce的數量劃分分區,默認根據hashpartition對溢寫的數據寫入到相對應的分區。在每個分區中,後台線程會根據key進行排序,所以溢寫到磁盤的文件是分區且排序的。如果有combiner函數,它在排序後的輸出運行,使得map輸出更緊湊。減少寫到磁盤的數據和傳輸給reduce的數據。
每次環形換沖區的內存達到閾值時,就會溢寫到一個新的文件,因此當一個map溢寫完之後,本地會存在多個分區切排序的文件。在map完成之前會把這些文件合併成一個分區且排序(歸併排序)的文件,可以通過參數mapreduce.task.io.sort.factor控制每次可以合併多少個文件。
在map溢寫磁盤的過程中,對數據進行壓縮可以提交速度的傳輸,減少磁盤io,減少存儲。默認情況下不壓縮,使用參數mapreduce.map.output.compress控制,壓縮算法使用mapreduce.map.output.compress.codec參數控制。
2.2、reduce端
map任務完成後,監控作業狀態的application master便知道map的執行情況,並啟動reduce任務,application master並且知道map輸出和主機之間的對應映射關係,reduce輪詢application master便知道主機所要複製的數據。
一個Map任務的輸出,可能被多個Reduce任務抓取。每個Reduce任務可能需要多個Map任務的輸出作為其特殊的輸入文件,而每個Map任務的完成時間可能不同,當有一個Map任務完成時,Reduce任務就開始運行。Reduce任務根據分區號在多個Map輸出中抓取(fetch)對應分區的數據,這個過程也就是Shuffle的copy過程。。reduce有少量的複製線程,因此能夠並行的複製map的輸出,默認為5個線程。可以通過參數mapreduce.reduce.shuffle.parallelcopies控制。
這個複製過程和map寫入磁盤過程類似,也有閥值和內存大小,閥值一樣可以在配置文件里配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操作和合併文件操作。
如果map輸出很小,則會被複制到Reducer所在節點的內存緩衝區,緩衝區的大小可以通過mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在節點的內存緩衝區達到閥值,或者緩衝區中的文件數達到閥值,則合併溢寫到磁盤。
如果map輸出較大,則直接被複制到Reducer所在節點的磁盤中。隨着Reducer所在節點的磁盤中溢寫文件增多,後台線程會將它們合併為更大且有序的文件。當完成複製map輸出,進入sort階段。這個階段通過歸併排序逐步將多個map輸出小文件合併成大文件。最後幾個通過歸併合併成的大文件作為reduce的輸出
2.3、總結
當Reducer的輸入文件確定後,整個Shuffle操作才最終結束。之後就是Reducer的執行了,最後Reducer會把結果存到HDFS上。
在Hadoop集群環境中,大部分map 任務與reduce任務的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map任務結果。如果集群正在運行的job有很多,那麼task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:
1、完整地從map task端拉取數據到reduce 端。
2、在跨節點拉取數據時,儘可能地減少對帶寬的不必要消耗。
3、減少磁盤IO對task執行的影響。
在MapReduce計算框架中,主要用到兩種排序算法:快速排序和歸併排序。在Map任務發生了2次排序,Reduce任務發生一次排序:
1、第1次排序發生在Map輸出的內存環形緩衝區,使用快速排序。當緩衝區達到閥值時,在溢寫到磁盤之前,後台線程會將緩衝區的數據劃分成相應分區,在每個分區中按照鍵值進行內排序。
2、第2次排序是在Map任務輸出的磁盤空間上將多個溢寫文件歸併成一個已分區且有序的輸出文件。由於溢寫文件已經經過一次排序,所以合併溢寫文件時只需一次歸併排序即可使輸出文件整體有序。
3、第3次排序發生在Shuffle階段,將多個複製過來的Map輸出文件進行歸併,同樣經過一次歸併排序即可得到有序文件。
三、作業失敗和容錯
既然有作業的運行,肯定會有作業的失敗,作業的失敗(不考慮硬件,平台原因引起的失敗)可能會存在不同的問題,如下:
3.1、任務運行失敗
用戶代碼拋出異常(代碼沒寫好):這種情況任務JVM會在退出之前向application master發送錯誤報告,並記錄進用戶日誌,application master對該作業標記為failed,並釋放掉佔有的資源容器。
另一種就是JVM突然退出,這種情況節點管理器會注意到進程已經退出,並通知application master將此任務標記為失敗,如果是因為推測執行而導致任務被終止,則不會被被標記為失敗。而任務掛起又不同,一旦application master注意到有一段時間沒有收到進度更新,便會把任務標記為失敗,默認為10分鐘,參數mapreduce.task.timeout控制application master被告知一個任務失敗,將會重新調度該任務執行(會在與之前失敗的不同節點上運行),默認重試4次,如果四次都失敗,則作業判定為失敗,參數控制為:
mapreduce.map.maxattempts mapreduce.reduce.maxattempts
3.2、application master運行失敗
AM也可能由於各種原因(如網絡問題或者硬件故障)失效,Yarn同樣會嘗試重啟AM
可以為每個作業單獨配置AM的嘗試重啟次數:mapreduce.am.max-attempts,默認值為2
Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts,默認為2,單個應用程序不可以超過這個限制,除非同時修改這兩個參數。
恢復過程:application master向資源管理器發送周期性的心跳。當application master失敗時,資源管理器會檢測到該失敗,並在一個新的容器中啟動application master,並使用作業歷史來恢復失敗的應用程序中的運行任務狀態,使其不必重新運行,默認情況下恢復功能是開啟的,yarn.app.mapreduce.am.job.recovery.enable控制客戶端向application master輪詢作業狀態時,如果application master運行失敗了,則客戶端會向資源管理器resourcemanager詢問和緩存application master地址。
3.3、節點管理器運行失敗
如果節點管理器崩潰或者運行非常緩慢,則就會停止向資源管理器發送心跳信息,如果10分鐘(可以通過參數yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms設置)資源管理器沒有收到一條心跳信息,則資源管理器將會通知停止發送心跳的節點管理器,並將其從自己的資源池中移除該節點管理器,在該節點上的application master和任務的失敗,都通過如上兩種恢復機制進行恢復。
3.4、資源管理器運行失敗
資源管理器失敗時一個很嚴重的問題,所有的任務將不能被分配資源,作業和容器都無法啟動,那麼整個通過yarn控制資源的集群都處於癱瘓狀態。
容錯機制:resourcemanager HA 詳情見:hadoop高可用安裝和原理詳解
更多hadoop生態文章見: hadoop生態系列
參考:
《Hadoop權威指南 大數據的存儲與分析 第四版》