2,MapReduce原理及源碼解讀

MapReduce原理及源碼解讀

一、分片

靈魂拷問:為什麼要分片?

  • 分而治之:MapReduce(MR)的核心思想就是分而治之;何時分,如何分就要從原理和源碼來入手。做為碼農大家都知道,不管一個程序多麼複雜,在寫代碼和學習代碼之前最重要的就是搞懂輸入和輸出,而MR的輸入其實就是一個目錄。而所謂的分而治之其實也是在把大文件分成小文件,然後一個機器處理一個小文件,最後再合併。所以MR的第一步就是對輸入的文件進行分片。

1.1 對誰分片

  • 對每個文件分片:分片是對輸入目錄中的每一個文件進行分片。後面的分片都是針對單個文件分片。

  • 源碼解讀(對誰分片):

// 分片的源碼位置  package org.apache.hadoop.mapreduce.lib.input;  abstract class FileInputFormat.java;    // 下面代碼所在方法  method getSplits();    // InputStatus表示一個切片類  List<InputSplit> splits = new ArrayList<InputSplit>();  // 得到所有輸入文件  List<FileStatus> files = listStatus(job);  // 遍歷每個文件。 根據每個文件來切片,而不是整個文件夾  for (FileStatus file : files) {        // 分片1  }  

1.2 長度是否為0

  • 文件長度:當文件長度不為0時才會進行下面的分片操作;如果文件長度為0,則會向分片列表中添加一個空的hosts文件數組和空長度的文件。也就是說,空文件也會創建一個空的分片。
  • 源碼解讀(長度是否為0):
for (FileStatus file : files) {         Path path = file.getPath();         // 獲取文件大小         long length = file.getLen();         if (length != 0) {                // 分片2          } else {// 如果文大小為空,默認就創建一個空的hosts文件數組和空長度的文件             //Create empty hosts array for zero length files                splits.add(makeSplit(path, 0, length, new String[0]));          }  }  

1.3 是否可以分片

  • 壓縮格式:並不是所有的文件都可以分片,有一些壓縮格式的文件是不可以分片的。因此只會對可以分片的文件進行分片,而不可以分片的文件即使再大也會作為一個整體來處理,相當於一個片。
  • 源碼解讀(是否可以分片):
// 如果可以分片  if (isSplitable(job, path)) {      // 分片3  } else { // not splitable          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),          blkLocations[0].getCachedHosts()));  }    // 判斷一個文件是否可以切片  // FileInputFormat抽象類中默認返回true,子類TextInputFormat中實現如下  @Override  protected boolean isSplitable(JobContext context, Path file) {       final CompressionCodec codec =             new CompressionCodecFactory(context.getConfiguration()).getCodec(file);       if (null == codec) {// 如果一個文件的壓縮編碼為null,那麼表示可以切片             return true;       }// 如果一個文件的壓縮編碼是SplittableCompressionCodec的子類,那麼表示當前文件也可以切片       return codec instanceof SplittableCompressionCodec;  }  

1.4 分片的大小

  • 分片大小:分片太大就失去了分片的意義;如果分片很小,則管理和構建map任務的時間就會增多,效率變低。並且如果分片跨越兩個數據塊,那麼分片的部分數據需要通過網絡傳輸到map任務運行的節點上,效率會更低。所以分片的最佳大小應該和HDFS的分塊大小一致。Hadoop2默認128M。
  • 源碼解讀(分片大小):
// FormatMinSplitSize是 1, MinSplitSize如果沒配置默認是 1  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // 如果沒配置,則默認是 Long類型的最大值  long maxSize = getMaxSplitSize(job);  // 塊大小,Hadoop2是128M,本地模式為32M  long blockSize = file.getBlockSize();  // 分片大小計算公式。默認就是blockSize的大小  long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));    
  • 自定義分片大小:由上面的公式可知,默認的分片大小就是blockSize的大小。如果要自定義大於blockSize,比如改為200M,就把minSize改為200;小於blockSize,比如20M,就把maxSize改為20
  • 1.1倍:最常見的問題就是:一個大小為130M的文件,在分片大小為128M的集群上會分成幾片?答案是1片;因為 128*1.1>130,準確來說應該是130 / 128 < 1.1 (源碼的公式)。也就是說,如果剩下的文件大小在分片大小的1.1倍以內,就不會再分片了。要這個1.1倍,是為了優化性能;試想如果不這樣,當還剩下130M大小的時候,就會分成一塊128M,一塊2M,後面還要為這個2M的塊單獨開一個map任務,不划算。至於為什麼是1.1,這個1.1是專家們通過反覆試驗得出來的結果。
  • 源碼解讀(1.1倍):
// 當剩餘文件的大小,大於分片大小的1.1倍時,才會分片  private static final double SPLIT_SLOP = 1.1;   // 10% slop  // bytesRemaining為文件剩餘大小,splitSize為上面計算出的分片大小  while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {          // 分片4  }  

1.5 開始分片

  • 終於分片了:經過上面的層層條件,下面就是// 分片4中的分片代碼。與HDFS的物理分塊不同的是,MapReduce的分片只是邏輯上的分片,即按照偏移量分片。
 // 封裝一個分片信息(包含文件的路徑,分片的起始偏移量,要處理的大小,分片包含的塊的信息,分片中包含的塊存在哪兒些機器上)  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);  // makeSplit進行切片操作,返回值是一個切片,並且加入到切片列表中  splits.add(makeSplit(path, length - bytesRemaining, splitSize,                 blkLocations[blkIndex].getHosts(),                 blkLocations[blkIndex].getCachedHosts()));  // 剩餘文件大小  bytesRemaining -= splitSize;  

1.6 分片後讀取會不會斷行

  • 不會:由於分片時是按照長度進行分片的,那就有很大可能會把一行數據分在兩個片裏面,所以分片的時候確實會斷行。如果讀取並處理斷行的數據,就會導致結果不正確,那是肯定不行的。所以LineRecordReader類就充當了讀取記錄的角色,保證讀取不斷行;其中nextKeyValue()方法里是真正給Mapper中的key賦值的地方,並且調用了父類LineReader類中的readLine()方法來給value賦值。
  • 源碼解讀(讀取時不斷行):
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {      @Override      public RecordReader<LongWritable, Text>      createRecordReader(InputSplit split,                         TaskAttemptContext context) {          String delimiter = context.getConfiguration().get(                  "textinputformat.record.delimiter");          // 行分隔符          byte[] recordDelimiterBytes = null;          if (null != delimiter)              recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);           // 返回LineRecordReader對象          return new LineRecordReader(recordDelimiterBytes);      }  }    // 行記錄讀取類,提供讀取片中數據的功能,並且保證不斷行  public class LineRecordReader extends RecordReader<LongWritable, Text> {      // ......其他代碼        public void initialize(InputSplit genericSplit,                             TaskAttemptContext context) throws IOException {          // ......            // 如果不是第一個分片,則開始位置退到下一行記錄的開始位置          // 因為為了保證讀取時不斷行,每個塊都會向後多讀一行(最後一個除外)          if (start != 0) {              start += in.readLine(new Text(), 0, maxBytesToConsume(start));          }      }        public boolean nextKeyValue() throws IOException {          // 給Mapper中輸入的key賦值          key.set(pos);          // 實例化Mapper中輸入的value          if (value == null) {              value = new Text();          }          // 注意是<=end,在等於end時還會執行一次,多讀了一行,所以不會斷行          while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {              if (pos == 0) {                  newSize = skipUtfByteOrderMark();              } else {                  // 給Mapper中輸入的value賦值。                  // readLine方法會根據是否自定義行分隔符來調用不同的方法。                  newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));                  pos += newSize;              }          }      }  }  

二、Map階段

2.1 實例化Mapper

  • 各種實例化:上面費了很大的勁來編寫分片TextInputFormat,和讀取類LineRecordReader;而這一切都是為了把輸入數據很好的傳給map()方法來運算,所以首先就要實例化我們自定義的Mapper類。

  • 源碼解讀(各種實例化):

package org.apache.hadoop.mapred;  class MapTask.java;    method runNewMapper();    // 通過反射來獲取Mapper。在Job中設置的Mapper,也就是自己定義的繼承自Mapper的類  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);  // 通過反射來得到 InputFormat。默認是TextInputFormat  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  // 獲得當前MapTask要處理的split  org.apache.hadoop.mapreduce.InputSplit split = null;  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),      splitIndex.getStartOffset());  LOG.info("Processing split: " + split);  // 根據InputFormat對象創建RecordReader對象。默認是LineRecordReader  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =    new NewTrackingRecordReader<INKEY,INVALUE>      (split, inputFormat, reporter, taskContext);    // 初始化。用來打開文件,並且調整文件的頭指針  input.initialize(split, mapperContext);  // MapTask中調用Mapper的run()方法  mapper.run(mapperContext);  

2.2 調用map()方法

  • 每行數據調用一次:從上面的代碼中我們知道,MapTask中會調用Mapper類的run()方法;而run()方法會在while循環中調用map()方法,由退出條件可知,是每一行數據調用一次map()方法。
  • 源碼解讀(怎麼調用map()方法):
public void run(Context context) throws IOException, InterruptedException {      // 在所有map執行之前初始化,也可以根據業務需要來重寫此方法      setup(context);      try {          // context.nextKeyValue()其實就是LineRecordReader中的nextKeyValue()方法;          // 在run方法中遍歷所有的key,每行數據都執行一次自定義map方法;          while (context.nextKeyValue()) {              map(context.getCurrentKey(), context.getCurrentValue(), context);          }      } finally {          // 父類Mapper中的setup()和cleanup()方法中什麼都沒做;          // 只執行一次,可以根據業務需要來重寫此方法;          cleanup(context);      }  }  

三、Shuffle階段

靈魂拷問:哪來的Shuffle?

  • 理論與實現:看過源碼的都知道,其實源碼中根本就沒有什麼shuffle;shuffle只是一個過程,確切的來說是連貫Map階段和reduce階段的一個理論過程,而它的實現主要在MapTask和ReduceTask類中。shuffle階段可以說是MapReduce中最核心的一個階段。

3.1 shuffle的概念

  • 作用:shuffle這個單詞的本意是洗牌、打亂的意思,而在這裡則是:將map端的無規則輸出按照指定的規則「打亂」成具有一定規則的數據,以便reduce端接收和處理。
  • 流程:shuffle的範圍是map輸出後到reduce輸入前。它的流程主要包括Map端shuffle和reduce端shuffle。
  • MapReduce大致流程:

3.2 Map端Shuffle

  • 作用:Map端的shuffle過程是對Map的結果進行分區、排序、溢寫、合併分區,最後寫入磁盤;最終會得到一個分區有序的文件,即先按分區排序,再按key排序。
  • Map端shuffle大致流程:

3.2.1 分區(partition)

  • 概念:對於map的每一個輸出的鍵值對,都會根據key來生成partition再一起寫入環形緩衝區。每一個reduceTask會處理一個partition(第0個reduceTask處理partition為0的分區,以此類推)。
  • 如何分區:默認情況下,分區號是key的hash值對numReduceTask數量取模的結果。也可以自定義分區。
  • 源碼解讀(如何分區):
// 當設置的reduceTask數大於實際分區數時,可以正常執行,多出的分區為空文件;  // 當設置的reduceTask數小於實際分區數時,會報錯。  job.setNumReduceTasks(4);  // 如果設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass  // 則會調用系統默認的 HashPartitioner實現類來計算分區。  job.setPartitionerClass(WordCountPartitioner.class);  
// 自定義分區  public class WordCountPartitioner extends Partitioner<Text, IntWritable> {      private static HashMap<String, Integer> map = new HashMap<>();      static {          map.put("0734", 0);          map.put("0561", 1);          map.put("0428", 2);      }        // 當 Mapper的輸出要寫入環形緩衝區時,會調用此方法來計算當前<K,V>的分區號      @Override      public int getPartition(Text text, IntWritable intWritable, int numPartitions) {          String strText = text.toString();          return map.getOrDefault(strText.substring(0, 4), 3);      }  }  
// MapTask.java$NewOutputCollector  public void write(K key, V value) throws IOException, InterruptedException {        // 把 K,V以及分區號寫入環形緩衝區        collector.collect(key, value,                          partitioner.getPartition(key, value, partitions));  }  

3.2.2 寫入環形緩衝區

  • 概念:環形緩衝區是在內存中的一個位元組數組kvbuffer。kvbuffer不僅存放map輸出的<k, v>,還在另一端存放了<k, v>的索引(元數據) kvmeta,每個kvmeta包括value的起始位置、key的起始位置、partition值、value的長度,佔用4個int長度。上圖中的bufindex和kvindex分別表示kvbuffer和kvmeta的指針。環形緩衝區的默認大小是100M,當寫入數據大小超過80%(80M)就會觸發Spill,溢寫到磁盤。
  • 源碼解讀(Spill):
// SpillThread線程在MapTask$MapOutputBuffer類中初始化,在init()方法中啟動。  // 它會一直監視環形緩衝區,當大小超過80%的時候,就會調用sortAndSpill()方法。  protected class SpillThread extends Thread {      @Override        public void run() {              // ....              // run方法中調用排序並溢寫方法            while (true) {                // ....                sortAndSpill();            }              //....        }  }  

3.2.3 排序並溢寫(sortAndSpill):

  • 排序:觸發溢寫後,會先排序,再溢寫。排序是根據partition和key的升序排序,移動的只是索引數據,排序的結果是將kvmeta中到的數據按照partition聚合在一起,同一個partition內再根據key排序。

  • 溢寫:Spill線程根據排序後的kvmeta文件,將一個個partition輸出到文件,在這次溢寫過程中,會將環形緩衝區中已計算的數據(80M)寫入到一個文件spill.out,所以引入了索引文件spill.index,它記錄了partition在spill.out中的位置。

3.2.4 合併(merge):

  • 概念:如果Map的數據很大,那麼就會觸發多次Spill,spill.out和spill.index文件也會很多。所以最後就要把這些文件合併,方便Reduce讀取。
  • 合併過程:合併過程中,首先會根據spill.index文件,將spill.out文件中的partition使用歸併排序分別寫入到相應的segment中,然後再把所有的segment寫入到一個file.out文件中,並用file.out.index來記錄partition的索引。由於合併時可能有相同的key,所以如果設置了combine,那麼在寫入文件之前還會調用自定義的combine方法。

3.3 Reduce端Shuffle

3.3.1 拉取(Copy)

  • 前期工作:Reduce任務會通過HTTP向各個Map任務拉取它所需的partition數據。當Map任務成功完成之後會通知 TaskTracker狀態已跟新,TaskTracker進而通知JobTracker(都是通過心跳機制實現),所以JobTracker中記錄了Map輸出和TaskTracker的映射關係。
  • 何時拉取:Reduce會定期向JobTracker獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會立即從此輸出對應的TaskTracker上複製相應的partition數據到本地,而不是等到所有Map任務結束。

3.3.2 排序合併(Merge Sort)

  • 合併:copy過來的數據會先放入內存緩衝區中(大小是 JVM的heap size的70%),如果緩衝區放得下就直接把數據寫入內存,即內存到內存merge。如果緩衝區中的Map數據達到一定大小(緩衝區的66%)的時候,就會開啟內存merge,並將merge後的數據寫入磁盤,即內存到磁盤merge。當屬於該Reduce任務的map輸出全部拉取完成,則會在reduce任務的磁盤上生成多個文件(如果所有map輸出的大小沒有超過緩衝區大小,則數據只存在於內存中),這時開始最後的合併操作,即磁盤到磁盤merge。如果設置了combine,合併時也會執行。
  • 排序:由於map輸出的數據已經是有序的,所以reduce在合併時的排序是歸併排序,並且reduce端的copy和sort是同時進行的,最終會得到一個整體有序的數據。

3.3.3 歸併分組(reduce)

  • 歸併分組(reduce):當reduce任務執行完拉取和排序合併後,就會對相同的key進行分組。默認情況下是根據key對象中重寫的compareTo()方法來分組,如果設置了GroupingComparator,則會調用它的compare()方法來分組。reduce會把compareTo(或compare)方法計算返回為 0 的key分為一組,最終會得到一個組<key, Iterable<value,>>,其中組的key是這一組的第一個數據的key,Iterable<value,>則是相同key的value迭代器。最後再對每一個組調用Reducer的reduce()方法。

  • 源碼解讀(分組):

// org.apache.hadoop.mapreduce.Reducer中的run()方法  while (context.nextKey()) {       // 調用自定義 reduce方法       reduce(context.getCurrentKey(), context.getValues(), context);       // .....  }    // org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法  public boolean nextKey() throws IOException,InterruptedException {      // 如果當前key與下一個key相同,則繼續往下走;      // 這一步就是把相同的key放到一組, 他們的value放到一個迭代器中;當下一個key不同時再調用reduce方法      while (hasMore && nextKeyIsSame) {        nextKeyValue();      }      if (hasMore) {        if (inputKeyCounter != null) {          // 計數器          inputKeyCounter.increment(1);        }        // 當nextKeyIsSame為false時,會再調用一次nextKeyValue(),而它的返回值必為true;        return nextKeyValue();      } else {        return false;      }  }    @Override  public boolean nextKeyValue() throws IOException, InterruptedException {      if (hasMore) {        nextKey = input.getKey();        // 在執行reduce方法之前調用ReduceContext中定義的GroupComparator        // 如果key的compareTo方法返回0則 nextKeyIsSame為true,也就會分到一組        nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,                                       currentRawKey.getLength(),                                       nextKey.getData(),                                       nextKey.getPosition(),                                       nextKey.getLength() - nextKey.getPosition()                                           ) == 0;      } else {        nextKeyIsSame = false;      }      inputValueCounter.increment(1);      return true;  }  

四、Reduce階段

4.1 執行reduce()方法

  • 歸併:上面的Shuffle階段已經將數據分組成了<key, Iteralble<value,>>格式的數據,所以對於相同的key只會調用一次reduce()方法。
  • 注意事項:在reduce()方法中,一定要重新創建key對象,不要直接使用參數中的key。

4.2 輸出最終結果

  • 完結:整個MapReduce的輸出和輸入有點類似。輸出是實例化TextOutputFormat和LineRecordWrite對象。並由LineRecordWrite判斷是不是NullWriteable,最後輸出到文件