MapReduce原理深入理解(一)

1.MapReduce概念

1)MapReduce是一種分散式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題.

2)MapReduce是分散式運行的,由兩個階段組成:Map和Reduce,Map階段是一個獨立的程式,有很多個節點同時運行,每個節點處理一部分數據。Reduce階段是一個獨立的程式,有很多個節點同時運行,每個節點處理一部分數據【在這先把reduce理解為一個單獨的聚合程式即可】。

3)MapReduce框架都有默認實現,用戶只需要覆蓋map()和reduce()兩個函數,即可實現分散式計算,非常簡單。

4)兩個函數的形參和返回值都是<key、value>,使用的時候一定要注意構造<k,v>。

2.MapReduce核心思想

 

 

 

(1)分散式的運算程式往往需要分成至少2個階段。

 

(2)第一個階段的MapTask並發實例,完全並行運行,互不相干。

 

(3)第二個階段的ReduceTask並發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask並發實例的輸出。

 

(4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常複雜,那就只能多個MapReduce程式,串列運行。

 

總結:分析WordCount數據流走向深入理解MapReduce核心思想。

 

 

 

 

 

 3. MapReduce 中的shuffle

 

 4.Mapreduce程式碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    //分割任務
    // 第一對kv,是決定數據輸入的格式
    // 第二隊kv 是決定數據輸出的格式
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /*map階段數據是一行一行過來的
            每一行數據都需要執行程式碼*/
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            LongWritable longWritable = new LongWritable(1);
            String s = value.toString();
            context.write(new Text(s), longWritable);
        }
    }
   //接收Map端數據
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
       /* reduce 聚合程式 每一個k都會調用一次
         * 默認是一個節點
         * key:每一個單詞
         * values:map端 當前k所對應的所有的v
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            //設置統計的初始值為0
            long sum = 0l;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * 是當前mapreduce程式入口
     * 用來構建mapreduce程式
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //創建一個job任務
        Job job=Job.getInstance();
        //指定job名稱
        job.setJobName("第一個mr程式");
        //構建mr
        //指定當前main所在類名(識別具體的類)
        job.setJarByClass(WordCount.class);
        //指定map端類
        // 指定map輸出的kv類型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //指定reduce端類
        //指定reduce端輸出的kv類型
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定輸入路徑
        Path in = new Path("/word");
        FileInputFormat.addInputPath(job,in);
        //輸出路徑指定
        Path out = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        //如果文件存在
        if(fs.exists(out)){
            fs.delete(out,true);
        }
        //存在
        FileOutputFormat.setOutputPath(job,out);

        //啟動
        job.waitForCompletion(true);
        System.out.println("MapReduce正在執行");
    }
}