mr原理簡單分析
背景
又是一個周末一天一天的過的好快,今天的任務幹啥呢,索引總結一些mr吧,因為前兩天有面試問過我?我當時也是簡單說了一下,畢竟現在寫mr程式的應該很少很少了,廢話不說了,結合官網和自己理解寫起。
簡單分析
一個mr作業通常數據會被切割成多個數據塊通過map任務來並行處理,就是說我們在處理文件的時候,首次我們寫入文件會被分割成多個塊,hdfs文件設計支援的語義 write-once-read-more,block塊是128m默認 //hadoop.apache.org/docs/r3.3.0/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html Data Blocks 位置。然後在我們讀取的時候也就是提交mr 作業後namenode會根據元數據管理然後從不同的datanode得知數據位置,從而進行讀取。mr框架包含一個單獨的master ResouceManager,每個集群結點一個工作NodeManager,每個應用一個MRAppMaster。
Input 和output,接受《key,value》最後落地《key,value》形式,需要實現Writetable介面,還要實現WritebaleCompare介面因為要對比排序。
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, “word count”);
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Mapper
我們會接受<key,value>對,hdfs生產map任務對每一個Inputsplit通過Inputformat在我們的作業中。
mapper設置通過Job.setMapperClass(Class) 方法,我們實現mapper介面對應就可以在map中寫內容了。如果向清理內容可以實現cleanUp方法。這個方法 備註 Called once at the end of the task.也就是說在task結束後會觸發一次。
輸出對並不一樣要跟輸入對類型保持一致,一個輸入對可能會產生一個或者多個輸出對。輸出通過context的write方法收集。
應用可以通過Counter 來上報統計,上下文中可以找到Counter counter = context.getCounter(MRJobConfig.JOB_NAME, MRJobConfig.JOB_NAME);
所有的結果通過key來進行發放傳遞給reduce然後落地結果。用戶也可以通過比較器來進行指定分組。
Job.setGroupingComparatorClass(Class).
mapper的輸出會分區到reduce中,然後數量與reduce的任務數量一致,也可以通過實現自定義Partitioner來進行分配。
combiner也是中間優化的一部分,通過ob.setCombinerClass(Class) 來設置combiner類,在發送到reduce之前在減少數量從而提高性能。數據結果如何存儲什麼格式,可以通過設置壓縮形式來存儲通過配置。
多少個Mapper
mapper的個數通常是通過輸入文件大小block數量來決定的。正常的並行度水平對於每個結點來看是10-100之間,離線跑hql腳本一般設置過大會導致佔用資源過多,其他任務排隊情況導致任務從而延遲,或者浪費資源。例子:如果你的輸入文件大小是10T,那麼hdfs默認塊是128M,那麼你就會有82,000個map數量,這個就需要通過配置文件來設置map 的大小了。
Reducer
通過mapper處理完以後發送到reduce端的對,reducer會對這些對再進行處理使得這個以key為分組的對集合更小。
Job.setNumReduceTasks(int) 設置recude個數Job.setReducerClass(Class) 設置reduce的執行類
reduce有三個階段:shuffle, sort and reduce.翻譯出來感覺不太好。
Shuffle
這個階段就是通過mapper的輸出結果數據進行一次分組partition
Sort
怎麼排序呢,就是mapper階段的task,輸出會是以key分組的,然後相同的key再進行merge 合併排序,在這個階段。
Secondary Sort
二次排序,可能用戶對於之前的key排序不滿了,希望再次修改進行重新排序分組那麼通過設置Job.setSortComparatorClass(Class).Job.setGroupingComparatorClass(Class)以達到目的。
Reduce
最後一個階段reduce,減少?這麼翻譯總感覺不夠精準那就直接叫reduce吧。以key分組到達一個reduce,拿到數據Context.write(WritableComparable, Writable)寫到文件系統搞定完畢。reduce的輸出是沒有排序的。
多少個reduce
一個合理的數量應該是0.95或者是1.75 乘以 結點數量 * 每個結點最大的容量數。
0.95啟動更快,1.75負載更好。reduce數量多了對集群是個開銷,但是對於提升成功率更好。因子小於整數的目的也是為了能夠留有餘地。
Reducer NONE
reducer設置為空的或者0都可以的。有些任務不要進行對map結果進行排序等操作,就可以直接寫入到文件系統。FileOutputFormat.setOutputPath(Job, Path)
Partitioner
分區以key為主鍵,對map結果partiton,按照哈希的函數方式,數量與reduce任務一樣。
//hadoop.apache.org/docs/r3.3.0/api/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.html HashPartitioner是默認的哈希分區方式。
Counter
一個統計工具,上文也說過了,可以通過上下文拿到。
Task Execution & Environment
任務的執行和環境,MRAppMaster執行 mr任務的時候 map 和reduce都是作為一個進程在一個分開的jvm中執行。
然後我們可以通過配置設置一些jvm的參數,堆棧大小,gc日誌,這些我們可以觀察到任務的運行情況等。
Job Submission and Monitoring
任務提交和監控說一下,步驟:
1、輸入輸出檢查
2、計算輸入文件大小
3、設置分散式快取資訊如果有必要
4、然後就是上傳jar 和配置 到mr 的執行目錄下
5、提交到ResourceManager上然後監控它的狀態
Job.submit() : 作業提交到集群立即返回.
Job.waitForCompletion(boolean) : 作業提交到集群然後等待完成
Memory Management Map Parameters Shuffle/Reduce Parameters Configured Parameters不介紹上了,主要是mr的運行原理說一下,剩下的可以通過開頭的mr連接了解到,官網這一篇還是有很多東西的,後面沒有說到的輸入輸出文件類,split類,分散式快取,提交debug腳本查看日誌等等,大家都可以去看一下,最後是個wordcount案例應用了這些特點。
看到的小夥伴有什麼工作機會可以跟我聯繫目前在考慮新機會,多謝!