7,MapReduce基礎

  • 2020 年 3 月 14 日
  • 筆記

MapReduce基礎

一、關於MapReduce

1.1 為什麼要MapReduce

  • 單機資源有限:由於單台計算機的資源有限,計算能力不足以處理海量數據;所以需要多台計算機組成分佈式集群來處理海量數據。
  • 分佈式計算較複雜:在分佈式計算中,計算任務的分發,各個主機之間的協作;程序的啟動以及運行過程中的監控、容錯、重試等都會變得很複雜。所以引入了MapReduce框架,框架解決了分佈式開發中的複雜性,開發人員只需要將大部分工作集中在業務邏輯的開發上,從而極大的提高了工作效率。

1.2 MapReduce的定義

  • MapReduce是一個分佈式運算程序的編程框架,用於大規模數據集(大於1TB)的並行計算;Map(映射)和reduce(歸約)是它的主要思想;它極大地方便了編程人員在不會分佈式並行編程的情況下,將自己的程序運行在分佈式系統上。

二、MapReduce的優缺點

2.1 優點:

  • 易於編程:只需要實現一些接口,就可以完成一個分佈式程序的編寫;跟編寫一個串行程序一樣;
  • 良好的擴展性:當計算資源不足時,只需要簡單的增加機器來擴展它的計算能力;
  • 高容錯性:當一個機器掛了之後,會自動把上面的計算任務轉移到另一個節點上運行,無需人工干預;
  • 海量:適合PB級海量數據的離線處理。

2.2 缺點:

  • 不適合實時計算:MapReduce由於過程較為複雜,IO次數較多,所以無法做到毫秒或秒級響應;
  • 不適合流式計算:流式計算的輸入是動態的,可以不斷添加,而MapReduce的輸入是靜態的;
  • 不適合DAG(有向圖)計算:對於多個程序之間有依賴關係,即後一個程序的輸入是前一個程序的輸出;雖然MapReduce也可以完成,但都是通過磁盤來傳遞中間數據,造成大量的磁盤IO,性能極低。

三、MapReduce的執行階段

3.1 執行的兩個階段

  • Map階段:若干個maptask並發實例,完全並行運行,互不相干。

  • Reduce階段:若干個reducetask並發實例,完全並行運行,但是他們的數據依賴於Map階段的輸出。

  • 注意:MapReduce模型只能包含一個map階段和一個reduce階段;如果業務邏輯非常複雜,就只能使用多個MapReduce程序,串行運行。

四、編寫MapReduce程序

  • 用戶需要編寫的三個部分:Mapper、Reducer、Driver(提交MR程序)。

4.1 以WordCount為例:

1. 編寫Mapper

// 注意:hadoop1.0版本中是mapred下包,hadoop2.0是mapreduce下的包  import org.apache.hadoop.mapreduce.Mapper;  // 繼承Mapper父類,泛型為輸入和輸出的<K, V>;並重寫父類的map方法  public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {      /**       * 每行文本都會執行一次map方法.       *       * @param key     文本偏移量.       * @param value   一行文本.       * @param context 上下文對象.       * @throws IOException          .       * @throws InterruptedException 當阻塞方法收到中斷請求時拋出.       */      @Override      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {          String line = value.toString();          String[] words = line.split("\s+");   // 拆分一行中的單詞          for (String word : words) {              context.write(new Text(word), new IntWritable(1));   // 輸出一個<K, V>          }      }  }

2. 編寫Reducer

// 繼承Reducer類,輸入的<K, V>類型為map端輸出<K, V>類型  public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {      /**       * 相同的key只會執行一次reduce方法       *       * @param key     map端輸出的key       * @param values  相同key的value集合       * @param context 上下文對象       * @throws IOException          .       * @throws InterruptedException .       */      @Override      protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {          // 當前的 key出現了多少次          int count = 0;          // values中的數據是反序列化過來的,最好不要直接使用values中的bean          for (IntWritable value : values) {              count += value.get();          }          context.write(key, new IntWritable(count));  // 輸出      }  }

3. 編寫Driver

// Driver的作用是將這個Mapper和Reducer程序打包成一個Job,並提交該Job  public class WordCountDriver {      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {            // 不需要為 conf設置HDFS等參數,因為conf會調用系統默認的配置文件,          // 所以這個mr程序在哪裡運行就會調用哪裡的配置文件,在集群上運行就會使用集群的設置文件。          Configuration conf = new Configuration();          // 刪除輸出文件,或者手動刪除          // FileHelper.deleteDir(args[1], conf);            // 根據配置文件實例化一個 Job,並取個名字          Job job = Job.getInstance(conf, "MyWordCount");            // 設置 Jar的位置          job.setJarByClass(WordCountDriver.class);            // 設置 Mapper運行類,以及輸出的key和value的類型          job.setMapperClass(WordCountMapper.class);          job.setMapOutputKeyClass(Text.class);          job.setMapOutputValueClass(IntWritable.class);            // 設置 Reducer的運行類,以及輸出的key和value的類型          job.setReducerClass(WordCountReducer.class);          job.setOutputKeyClass(Text.class);          job.setOutputValueClass(IntWritable.class);            // 設置分區(可以不用設置)          // 當設置的分區數大於實際分區數時,可以正常執行,多出的分區為空文件;          // 當設置的分區數小於實際分區數時,會報錯。          job.setNumReduceTasks(4);          // 如果設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass          // 則會調用系統默認的 HashPartitioner實現類來計算分區。          job.setPartitionerClass(WordCountPartitioner.class);          // 設置combine          job.setCombinerClass(WordCountCombiner.class);            // 設置輸入和輸出文件的位置          FileInputFormat.setInputPaths(job, new Path(args[0]));          FileOutputFormat.setOutputPath(job, new Path(args[1]));            // 提交任務,等待執行結果,參數為 true表示打印信息          boolean result = job.waitForCompletion(true);          // 根據 job的返回值自定義退出          System.exit(result?0:1);      }  }

4. 運行

  • 如果在Hadoop集群上運行還需要將這個project打包成jar包,所以一般是先在windows上運行調試。
  • 由於要從命令行輸入input和output參數,所以這裡配置一下輸入和輸出的位置。

五、MapReduce的主要執行流程

  1. job.waitForCompletion(true):將這個MapReduce任務(Job)提交,默認是提交到本地運行;部署到集群時,是提交給YARN運行。
  2. map():在父類Mapper的run()方法中會調用子類重寫的map()方法。輸入文件的每一行都會調用一次map()方法,map()方法的參數中:key為當前輸入行的偏移量,LongWritable類型;value為當前輸入行的數據,Text類型;context為上下文對象。父類Mapper是一個泛型類,泛型的類型表示map()方法輸入和輸出的<K, V>類型,子類在繼承時要傳入實際輸入輸出的<K, V>類型。map()使用context.write(k, v)來輸出數據到shuffle階段的環形緩衝區。
  3. shuffle階段簡述:shuffle階段起到承上啟下的作用;從接收map()方法的輸出,到執行reduce()方法之前都屬於shuffle階段。shuffle接收map()輸出<K,V>並通過K計算出分區號,然後與元數據一起寫入環形緩存區;環形緩衝區溢寫時會將數據排序並寫入小文件,然後歸併成一個大的分區文件。一個ReducerTask主機會到所有MapTask主機上拉取對應的分區文件,歸併所有分區文件後會對相同的key進行合併,再執行reduce方法。
  4. reduce():在父類Reducer的run()方法中會調用子類重寫的reduce()方法。相同的key只會調用一次reduce()方法,reduce()方法的參數中:key為相同key合併後的第一個key,與map()的輸出key類型相同;values為相同key的value列表,類型是Iterable<map()的輸出value類型>。與Mapper類類似,子類在繼承Reducer時輸入的<K, V>類型是Mapper輸出的<K, V>類型、Reducer輸出的<K, V>類型是context.write(K, V)中<K, V>的類型。reduce中的context.write(K, V)最終會寫入到輸出文件中,就是這次MapReduce的結果。