MapReduce 编程模型 & WordCount 示例

  • 2019 年 10 月 3 日
  • 筆記

 

What is MapReduce?

 

??

??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????

 

???????

?? MapReduce ???????????????????????????????????????????????????????????????????????????????????????

???????????????????

?????????????10M?????????????????????????????????

??????? Java ????????????????????????????new ??map????????key ?????value ??????????

 ????????????????????????????????????????????????map ????????? value + 1,?????? value ? 1 ?

 ???????????????????????????

???????????? 10M????????????????????? 2T ???????????? T ??????????????????????????

????????????????8G?16G?32G …,??? 128G ?????????????????????128G ?????????????????????????

???????????????????????????????????????? N ?????????? 2T ???????????????????????????????????????????????????????????

 

MapReduce ??

MapReduce ???????????????????1TB????????? Google ??????????? “????” ???????????????????Map(??)?Reduce(??)???

??????????Map ????????????????????Reduce ??????????????

??????????????????????????

 

???????????

???????T ????????????block?????????????128M ???

??????????????map task ???????????????????

1.???????????????????????????????????

2.??? HashMap ????????? <?????>

3.????????????????????????

4.?????????HashMap ????????? 3 ?HashMap5.?3? HashMap ????? 3? Reduce task ??????????????????????????? Reduce task ????????????????

???? Reduce task ???????????????? hdfs ?????????

????????????

????????????????????????????????????????????????

•?? Map task ?????????

•?? Reduce task ?????????????

•Map task ? Reduce task ????????????????Reduce Task ??

•?? Map task ???????????

•Map task ?????????????????????????

•?????

 

????? MapReduce

??????????????????????????????????????????????????????????????????????????

? MapReduce ?????????????????

 

WordCount ??

??????????????3???????map task ???? reduce task ??????????????????????

First, the map task code:

package com.zhouq.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map stage of the WordCount job.
 *
 * <p>Generic parameters:
 * <ul>
 *   <li>KEYIN   ({@link LongWritable}) — byte offset of the current line in the input split
 *       (Hadoop's serializable counterpart of {@code long})</li>
 *   <li>VALUEIN ({@link Text}) — the text of the current line
 *       (Hadoop's serializable counterpart of {@code String})</li>
 *   <li>KEYOUT  ({@link Text}) — a single word</li>
 *   <li>VALUEOUT ({@link IntWritable}) — the count 1 emitted for each occurrence</li>
 * </ul>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse the output writables instead of allocating new objects for every
    // emitted pair: context.write() serializes the current contents, so reuse
    // is safe and avoids needless garbage on large inputs.
    private final Text outKey = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Called once per input record (one line of text). Splits the line on
     * single spaces and emits {@code <word, 1>} for every token; the shuffle
     * phase then groups identical words onto the same reduce task.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the Hadoop Text record to an ordinary Java String.
        String line = value.toString();

        // NOTE(review): splitting on a single space only — tabs or runs of
        // spaces yield empty tokens. This matches the original behavior.
        String[] words = line.split(" ");

        // Emit <word, 1> for each token.
        for (String word : words) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }
}

Next, the reduce task code:

/** * KEYIN VALUEIN ??map ?????KEYOUT VALUEOUT * <p> * KEYOUT :???? reduce ???????key * VALUEOUT : ????reduce ??????? value */public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {    /**     * <zhouq,1>,<zhouq,1>,<zhouq,2> ......     * ??key ??????kv? ? key     */    @Override    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {        //????????? ??//        String word = key.toString();        //        int count = 0;        for (IntWritable value : values) {            count += value.get();        }        //???key  ????        context.write(key, new IntWritable(count));    }}

 

Finally, the driver (job submission) class:

 

/** * wc ??? */public class WordCountDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        // mapreduce.framework.name ??? local ????????,????local        // ????????? yarn ,???????yarn ?. ????????????????.//        conf.set("mapreduce.framework.name", "yarn");//        conf.set("yarn.resoucemanager.hostname", "mini1");        //conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");        Job job = Job.getInstance(conf);        //??????jar ? ???????        job.setJarByClass(WordCountDriver.class);        //???????mepper ? reduce ???        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordcountReduce.class);        //??mapper ??? key  value ??        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);?        //?? ????? kv  ??        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        //??job???????????        FileInputFormat.setInputPaths(job,new Path(args[0]));        //??job ???????        FileOutputFormat.setOutputPath(job,new Path(args[1]));        boolean waitForCompletion = job.waitForCompletion(true);        System.exit(waitForCompletion ? 0 : 1);    }}

 

????????????????????????????????????????

 

??????????????????????hadoop ?????????????????????????? 

 

The job's results are written to the part-r-00000 output file:

Full source code: https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java

 

??

?????????????? MapReduce ????????

 

??·?????

????????????

 


 

?????????????????????????????????????????????????

??????????????????? Java ????????????????????????????? 10?? 10??????????????????????? boss ??????????????????

 


 

Java ?????????????? Java ?????????????????????? Java ????????????????????????????????????????????

??????????????????“???”??????? Java ????/???????