MapReduce 编程模型 & WordCount 示例
- 2019 年 10 月 3 日
- 笔记
???????????????? MapReduce?
??
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
???????
?? MapReduce ???????????????????????????????????????????????????????????????????????????????????????
???????????????????
?????????????10M?????????????????????????????????
??????? Java ????????????????????????????new ??map????????key ?????value ??????????
????????????????????????????????????????????????map ????????? value + 1,?????? value ? 1 ?
???????????????????????????
???????????? 10M????????????????????? 2T ???????????? T ??????????????????????????
????????????????8G?16G?32G …,??? 128G ?????????????????????128G ?????????????????????????
???????????????????????????????????????? N ?????????? 2T ???????????????????????????????????????????????????????????
MapReduce ??
MapReduce ???????????????????1TB????????? Google ??????????? “????” ???????????????????Map(??)?Reduce(??)???
??????????Map ????????????????????Reduce ??????????????
??????????????????????????
???????????
???????T ????????????block?????????????128M ???
??????????????map task ???????????????????
1.???????????????????????????????????
2.??? HashMap ????????? <?????>
3.????????????????????????
4.?????????HashMap ????????? 3 ?HashMap5.?3? HashMap ????? 3? Reduce task ??????????????????????????? Reduce task ????????????????
???? Reduce task ???????????????? hdfs ?????????
????????????
????????????????????????????????????????????????
•?? Map task ?????????
•?? Reduce task ?????????????
•Map task ? Reduce task ????????????????Reduce Task ??
•?? Map task ???????????
•Map task ?????????????????????????
•?????
????? MapReduce
??????????????????????????????????????????????????????????????????????????
? MapReduce ?????????????????
WordCount ??
??????????????3???????map task ???? reduce task ??????????????????????
First, the map task code:
package com.zhouq.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for word counting.
 *
 * Type parameters (KEYIN, VALUEIN, KEYOUT, VALUEOUT):
 * <p>
 * KEYIN (LongWritable): the byte offset of the current line within the input
 * split. Hadoop uses its own serializable wrapper types, so LongWritable is
 * used instead of Long.
 * <p>
 * VALUEIN (Text): the contents of the current input line (Text, not String).
 * <p>
 * KEYOUT (Text): the word emitted by this mapper.
 * <p>
 * VALUEOUT (IntWritable): the count emitted per word occurrence (always 1;
 * IntWritable is Hadoop's serializable counterpart of Integer).
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse output objects across map() calls instead of allocating two new
    // objects per word: context.write() serializes the pair immediately, so
    // reuse is safe. This mirrors the official Hadoop WordCount example.
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Called by the map task once per input record (one line per call).
     *
     * @param key     byte offset of the line within the split
     * @param value   the line contents
     * @param context used to emit {@code <word, 1>} pairs into the shuffle
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the Hadoop Text record to a plain Java String.
        String line = value.toString();
        // Split the line into words on single spaces.
        String[] words = line.split(" ");
        // Emit <word, 1> for each word. The framework's shuffle groups
        // identical keys and routes them to the same reduce task.
        for (String w : words) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}
Next, the reduce task code:
/**
 * Reducer for word counting.
 *
 * KEYIN / VALUEIN match the mapper's output types (Text / IntWritable).
 * <p>
 * KEYOUT (Text): the word emitted by this reducer.
 * <p>
 * VALUEOUT (IntWritable): the total number of occurrences of that word.
 */
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value to avoid one allocation per distinct key,
    // following the official Hadoop WordCount example.
    private final IntWritable result = new IntWritable();

    /**
     * Called once per distinct key with all of its grouped values. For input
     * pairs like {@code <zhouq,1>, <zhouq,1>, ...} this method receives the
     * key "zhouq" together with an Iterable over all of its counts.
     *
     * @param key     the word
     * @param values  every count emitted for this word by the map phase
     * @param context used to emit the final {@code <word, total>} pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all partial counts for this word.
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // Emit <word, total>.
        result.set(count);
        context.write(key, result);
    }
}
Finally, the driver class that configures and submits the job:
/**
* wc ???
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// mapreduce.framework.name ??? local ????????,????local
// ????????? yarn ,???????yarn ?. ????????????????.
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resoucemanager.hostname", "mini1");
//conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");
Job job = Job.getInstance(conf);
//??????jar ? ???????
job.setJarByClass(WordCountDriver.class);
//???????mepper ? reduce ???
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReduce.class);
//??mapper ??? key value ??
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
?
//?? ????? kv ??
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//??job???????????
FileInputFormat.setInputPaths(job,new Path(args[0]));
//??job ???????
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}
????????????????????????????????????????
??????????????????????hadoop ??????????????????????????
The results are written to output files named like part-r-00000.
Source code: https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java
??
?????????????? MapReduce ????????
??·?????
?????????????????????????????????????????????????
??????????????????? Java ????????????????????????????? 10?? 10??????????????????????? boss ??????????????????
Java ?????????????? Java ?????????????????????? Java ????????????????????????????????????????????
??????????????????“???”??????? Java ????/???????