2,MapReduce原理及源码解读

MapReduce原理及源码解读

一、分片

灵魂拷问:为什么要分片?

  • 分而治之:MapReduce(MR)的核心思想就是分而治之;何时分,如何分就要从原理和源码来入手。做为码农大家都知道,不管一个程序多么复杂,在写代码和学习代码之前最重要的就是搞懂输入和输出,而MR的输入其实就是一个目录。而所谓的分而治之其实也是在把大文件分成小文件,然后一个机器处理一个小文件,最后再合并。所以MR的第一步就是对输入的文件进行分片。

1.1 对谁分片

  • 对每个文件分片:分片是对输入目录中的每一个文件进行分片。后面的分片都是针对单个文件分片。

  • 源码解读(对谁分片):

// 分片的源码位置  package org.apache.hadoop.mapreduce.lib.input;  abstract class FileInputFormat.java;    // 下面代码所在方法  method getSplits();    // InputStatus表示一个切片类  List<InputSplit> splits = new ArrayList<InputSplit>();  // 得到所有输入文件  List<FileStatus> files = listStatus(job);  // 遍历每个文件。 根据每个文件来切片,而不是整个文件夹  for (FileStatus file : files) {        // 分片1  }  

1.2 长度是否为0

  • 文件长度:当文件长度不为0时才会进行下面的分片操作;如果文件长度为0,则会向分片列表中添加一个空的hosts文件数组和空长度的文件。也就是说,空文件也会创建一个空的分片。
  • 源码解读(长度是否为0):
for (FileStatus file : files) {         Path path = file.getPath();         // 获取文件大小         long length = file.getLen();         if (length != 0) {                // 分片2          } else {// 如果文大小为空,默认就创建一个空的hosts文件数组和空长度的文件             //Create empty hosts array for zero length files                splits.add(makeSplit(path, 0, length, new String[0]));          }  }  

1.3 是否可以分片

  • 压缩格式:并不是所有的文件都可以分片,有一些压缩格式的文件是不可以分片的。因此只会对可以分片的文件进行分片,而不可以分片的文件即使再大也会作为一个整体来处理,相当于一个片。
  • 源码解读(是否可以分片):
// 如果可以分片  if (isSplitable(job, path)) {      // 分片3  } else { // not splitable          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),          blkLocations[0].getCachedHosts()));  }    // 判断一个文件是否可以切片  // FileInputFormat抽象类中默认返回true,子类TextInputFormat中实现如下  @Override  protected boolean isSplitable(JobContext context, Path file) {       final CompressionCodec codec =             new CompressionCodecFactory(context.getConfiguration()).getCodec(file);       if (null == codec) {// 如果一个文件的压缩编码为null,那么表示可以切片             return true;       }// 如果一个文件的压缩编码是SplittableCompressionCodec的子类,那么表示当前文件也可以切片       return codec instanceof SplittableCompressionCodec;  }  

1.4 分片的大小

  • 分片大小:分片太大就失去了分片的意义;如果分片很小,则管理和构建map任务的时间就会增多,效率变低。并且如果分片跨越两个数据块,那么分片的部分数据需要通过网络传输到map任务运行的节点上,效率会更低。所以分片的最佳大小应该和HDFS的分块大小一致。Hadoop2默认128M。
  • 源码解读(分片大小):
// FormatMinSplitSize是 1, MinSplitSize如果没配置默认是 1  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // 如果没配置,则默认是 Long类型的最大值  long maxSize = getMaxSplitSize(job);  // 块大小,Hadoop2是128M,本地模式为32M  long blockSize = file.getBlockSize();  // 分片大小计算公式。默认就是blockSize的大小  long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));    
  • 自定义分片大小:由上面的公式可知,默认的分片大小就是blockSize的大小。如果要自定义大于blockSize,比如改为200M,就把minSize改为200;小于blockSize,比如20M,就把maxSize改为20
  • 1.1倍:最常见的问题就是:一个大小为130M的文件,在分片大小为128M的集群上会分成几片?答案是1片;因为 128*1.1>130,准确来说应该是130 / 128 < 1.1 (源码的公式)。也就是说,如果剩下的文件大小在分片大小的1.1倍以内,就不会再分片了。要这个1.1倍,是为了优化性能;试想如果不这样,当还剩下130M大小的时候,就会分成一块128M,一块2M,后面还要为这个2M的块单独开一个map任务,不划算。至于为什么是1.1,这个1.1是专家们通过反复试验得出来的结果。
  • 源码解读(1.1倍):
// 当剩余文件的大小,大于分片大小的1.1倍时,才会分片  private static final double SPLIT_SLOP = 1.1;   // 10% slop  // bytesRemaining为文件剩余大小,splitSize为上面计算出的分片大小  while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {          // 分片4  }  

1.5 开始分片

  • 终于分片了:经过上面的层层条件,下面就是// 分片4中的分片代码。与HDFS的物理分块不同的是,MapReduce的分片只是逻辑上的分片,即按照偏移量分片。
 // 封装一个分片信息(包含文件的路径,分片的起始偏移量,要处理的大小,分片包含的块的信息,分片中包含的块存在哪儿些机器上)  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);  // makeSplit进行切片操作,返回值是一个切片,并且加入到切片列表中  splits.add(makeSplit(path, length - bytesRemaining, splitSize,                 blkLocations[blkIndex].getHosts(),                 blkLocations[blkIndex].getCachedHosts()));  // 剩余文件大小  bytesRemaining -= splitSize;  

1.6 分片后读取会不会断行

  • 不会:由于分片时是按照长度进行分片的,那就有很大可能会把一行数据分在两个片里面,所以分片的时候确实会断行。如果读取并处理断行的数据,就会导致结果不正确,那是肯定不行的。所以LineRecordReader类就充当了读取记录的角色,保证读取不断行;其中nextKeyValue()方法里是真正给Mapper中的key赋值的地方,并且调用了父类LineReader类中的readLine()方法来给value赋值。
  • 源码解读(读取时不断行):
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {      @Override      public RecordReader<LongWritable, Text>      createRecordReader(InputSplit split,                         TaskAttemptContext context) {          String delimiter = context.getConfiguration().get(                  "textinputformat.record.delimiter");          // 行分隔符          byte[] recordDelimiterBytes = null;          if (null != delimiter)              recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);           // 返回LineRecordReader对象          return new LineRecordReader(recordDelimiterBytes);      }  }    // 行记录读取类,提供读取片中数据的功能,并且保证不断行  public class LineRecordReader extends RecordReader<LongWritable, Text> {      // ......其他代码        public void initialize(InputSplit genericSplit,                             TaskAttemptContext context) throws IOException {          // ......            // 如果不是第一个分片,则开始位置退到下一行记录的开始位置          // 因为为了保证读取时不断行,每个块都会向后多读一行(最后一个除外)          if (start != 0) {              start += in.readLine(new Text(), 0, maxBytesToConsume(start));          }      }        public boolean nextKeyValue() throws IOException {          // 给Mapper中输入的key赋值          key.set(pos);          // 实例化Mapper中输入的value          if (value == null) {              value = new Text();          }          // 注意是<=end,在等于end时还会执行一次,多读了一行,所以不会断行          while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {              if (pos == 0) {                  newSize = skipUtfByteOrderMark();              } else {                  // 给Mapper中输入的value赋值。                  // readLine方法会根据是否自定义行分隔符来调用不同的方法。                  newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));                  pos += newSize;              }          }      }  }  

二、Map阶段

2.1 实例化Mapper

  • 各种实例化:上面费了很大的劲来编写分片TextInputFormat,和读取类LineRecordReader;而这一切都是为了把输入数据很好的传给map()方法来运算,所以首先就要实例化我们自定义的Mapper类。

  • 源码解读(各种实例化):

package org.apache.hadoop.mapred;  class MapTask.java;    method runNewMapper();    // 通过反射来获取Mapper。在Job中设置的Mapper,也就是自己定义的继承自Mapper的类  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);  // 通过反射来得到 InputFormat。默认是TextInputFormat  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  // 获得当前MapTask要处理的split  org.apache.hadoop.mapreduce.InputSplit split = null;  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),      splitIndex.getStartOffset());  LOG.info("Processing split: " + split);  // 根据InputFormat对象创建RecordReader对象。默认是LineRecordReader  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =    new NewTrackingRecordReader<INKEY,INVALUE>      (split, inputFormat, reporter, taskContext);    // 初始化。用来打开文件,并且调整文件的头指针  input.initialize(split, mapperContext);  // MapTask中调用Mapper的run()方法  mapper.run(mapperContext);  

2.2 调用map()方法

  • 每行数据调用一次:从上面的代码中我们知道,MapTask中会调用Mapper类的run()方法;而run()方法会在while循环中调用map()方法,由退出条件可知,是每一行数据调用一次map()方法。
  • 源码解读(怎么调用map()方法):
public void run(Context context) throws IOException, InterruptedException {      // 在所有map执行之前初始化,也可以根据业务需要来重写此方法      setup(context);      try {          // context.nextKeyValue()其实就是LineRecordReader中的nextKeyValue()方法;          // 在run方法中遍历所有的key,每行数据都执行一次自定义map方法;          while (context.nextKeyValue()) {              map(context.getCurrentKey(), context.getCurrentValue(), context);          }      } finally {          // 父类Mapper中的setup()和cleanup()方法中什么都没做;          // 只执行一次,可以根据业务需要来重写此方法;          cleanup(context);      }  }  

三、Shuffle阶段

灵魂拷问:哪来的Shuffle?

  • 理论与实现:看过源码的都知道,其实源码中根本就没有什么shuffle;shuffle只是一个过程,确切的来说是连贯Map阶段和reduce阶段的一个理论过程,而它的实现主要在MapTask和ReduceTask类中。shuffle阶段可以说是MapReduce中最核心的一个阶段。

3.1 shuffle的概念

  • 作用:shuffle这个单词的本意是洗牌、打乱的意思,而在这里则是:将map端的无规则输出按照指定的规则“打乱”成具有一定规则的数据,以便reduce端接收和处理。
  • 流程:shuffle的范围是map输出后到reduce输入前。它的流程主要包括Map端shuffle和reduce端shuffle。
  • MapReduce大致流程:

3.2 Map端Shuffle

  • 作用:Map端的shuffle过程是对Map的结果进行分区、排序、溢写、合并分区,最后写入磁盘;最终会得到一个分区有序的文件,即先按分区排序,再按key排序。
  • Map端shuffle大致流程:

3.2.1 分区(partition)

  • 概念:对于map的每一个输出的键值对,都会根据key来生成partition再一起写入环形缓冲区。每一个reduceTask会处理一个partition(第0个reduceTask处理partition为0的分区,以此类推)。
  • 如何分区:默认情况下,分区号是key的hash值对numReduceTask数量取模的结果。也可以自定义分区。
  • 源码解读(如何分区):
// 当设置的reduceTask数大于实际分区数时,可以正常执行,多出的分区为空文件;  // 当设置的reduceTask数小于实际分区数时,会报错。  job.setNumReduceTasks(4);  // 如果设置的 numReduceTasks大于 1,而又没有设置自定义的 PartitionerClass  // 则会调用系统默认的 HashPartitioner实现类来计算分区。  job.setPartitionerClass(WordCountPartitioner.class);  
// 自定义分区  public class WordCountPartitioner extends Partitioner<Text, IntWritable> {      private static HashMap<String, Integer> map = new HashMap<>();      static {          map.put("0734", 0);          map.put("0561", 1);          map.put("0428", 2);      }        // 当 Mapper的输出要写入环形缓冲区时,会调用此方法来计算当前<K,V>的分区号      @Override      public int getPartition(Text text, IntWritable intWritable, int numPartitions) {          String strText = text.toString();          return map.getOrDefault(strText.substring(0, 4), 3);      }  }  
// MapTask.java$NewOutputCollector  public void write(K key, V value) throws IOException, InterruptedException {        // 把 K,V以及分区号写入环形缓冲区        collector.collect(key, value,                          partitioner.getPartition(key, value, partitions));  }  

3.2.2 写入环形缓冲区

  • 概念:环形缓冲区是在内存中的一个字节数组kvbuffer。kvbuffer不仅存放map输出的<k, v>,还在另一端存放了<k, v>的索引(元数据) kvmeta,每个kvmeta包括value的起始位置、key的起始位置、partition值、value的长度,占用4个int长度。上图中的bufindex和kvindex分别表示kvbuffer和kvmeta的指针。环形缓冲区的默认大小是100M,当写入数据大小超过80%(80M)就会触发Spill,溢写到磁盘。
  • 源码解读(Spill):
// SpillThread线程在MapTask$MapOutputBuffer类中初始化,在init()方法中启动。  // 它会一直监视环形缓冲区,当大小超过80%的时候,就会调用sortAndSpill()方法。  protected class SpillThread extends Thread {      @Override        public void run() {              // ....              // run方法中调用排序并溢写方法            while (true) {                // ....                sortAndSpill();            }              //....        }  }  

3.2.3 排序并溢写(sortAndSpill):

  • 排序:触发溢写后,会先排序,再溢写。排序是根据partition和key的升序排序,移动的只是索引数据,排序的结果是将kvmeta中到的数据按照partition聚合在一起,同一个partition内再根据key排序。

  • 溢写:Spill线程根据排序后的kvmeta文件,将一个个partition输出到文件,在这次溢写过程中,会将环形缓冲区中已计算的数据(80M)写入到一个文件spill.out,所以引入了索引文件spill.index,它记录了partition在spill.out中的位置。

3.2.4 合并(merge):

  • 概念:如果Map的数据很大,那么就会触发多次Spill,spill.out和spill.index文件也会很多。所以最后就要把这些文件合并,方便Reduce读取。
  • 合并过程:合并过程中,首先会根据spill.index文件,将spill.out文件中的partition使用归并排序分别写入到相应的segment中,然后再把所有的segment写入到一个file.out文件中,并用file.out.index来记录partition的索引。由于合并时可能有相同的key,所以如果设置了combine,那么在写入文件之前还会调用自定义的combine方法。

3.3 Reduce端Shuffle

3.3.1 拉取(Copy)

  • 前期工作:Reduce任务会通过HTTP向各个Map任务拉取它所需的partition数据。当Map任务成功完成之后会通知 TaskTracker状态已跟新,TaskTracker进而通知JobTracker(都是通过心跳机制实现),所以JobTracker中记录了Map输出和TaskTracker的映射关系。
  • 何时拉取:Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会立即从此输出对应的TaskTracker上复制相应的partition数据到本地,而不是等到所有Map任务结束。

3.3.2 排序合并(Merge Sort)

  • 合并:copy过来的数据会先放入内存缓冲区中(大小是 JVM的heap size的70%),如果缓冲区放得下就直接把数据写入内存,即内存到内存merge。如果缓冲区中的Map数据达到一定大小(缓冲区的66%)的时候,就会开启内存merge,并将merge后的数据写入磁盘,即内存到磁盘merge。当属于该Reduce任务的map输出全部拉取完成,则会在reduce任务的磁盘上生成多个文件(如果所有map输出的大小没有超过缓冲区大小,则数据只存在于内存中),这时开始最后的合并操作,即磁盘到磁盘merge。如果设置了combine,合并时也会执行。
  • 排序:由于map输出的数据已经是有序的,所以reduce在合并时的排序是归并排序,并且reduce端的copy和sort是同时进行的,最终会得到一个整体有序的数据。

3.3.3 归并分组(reduce)

  • 归并分组(reduce):当reduce任务执行完拉取和排序合并后,就会对相同的key进行分组。默认情况下是根据key对象中重写的compareTo()方法来分组,如果设置了GroupingComparator,则会调用它的compare()方法来分组。reduce会把compareTo(或compare)方法计算返回为 0 的key分为一组,最终会得到一个组<key, Iterable<value,>>,其中组的key是这一组的第一个数据的key,Iterable<value,>则是相同key的value迭代器。最后再对每一个组调用Reducer的reduce()方法。

  • 源码解读(分组):

// org.apache.hadoop.mapreduce.Reducer中的run()方法  while (context.nextKey()) {       // 调用自定义 reduce方法       reduce(context.getCurrentKey(), context.getValues(), context);       // .....  }    // org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法  public boolean nextKey() throws IOException,InterruptedException {      // 如果当前key与下一个key相同,则继续往下走;      // 这一步就是把相同的key放到一组, 他们的value放到一个迭代器中;当下一个key不同时再调用reduce方法      while (hasMore && nextKeyIsSame) {        nextKeyValue();      }      if (hasMore) {        if (inputKeyCounter != null) {          // 计数器          inputKeyCounter.increment(1);        }        // 当nextKeyIsSame为false时,会再调用一次nextKeyValue(),而它的返回值必为true;        return nextKeyValue();      } else {        return false;      }  }    @Override  public boolean nextKeyValue() throws IOException, InterruptedException {      if (hasMore) {        nextKey = input.getKey();        // 在执行reduce方法之前调用ReduceContext中定义的GroupComparator        // 如果key的compareTo方法返回0则 nextKeyIsSame为true,也就会分到一组        nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,                                       currentRawKey.getLength(),                                       nextKey.getData(),                                       nextKey.getPosition(),                                       nextKey.getLength() - nextKey.getPosition()                                           ) == 0;      } else {        nextKeyIsSame = false;      }      inputValueCounter.increment(1);      return true;  }  

四、Reduce阶段

4.1 执行reduce()方法

  • 归并:上面的Shuffle阶段已经将数据分组成了<key, Iteralble<value,>>格式的数据,所以对于相同的key只会调用一次reduce()方法。
  • 注意事项:在reduce()方法中,一定要重新创建key对象,不要直接使用参数中的key。

4.2 输出最终结果

  • 完结:整个MapReduce的输出和输入有点类似。输出是实例化TextOutputFormat和LineRecordWrite对象。并由LineRecordWrite判断是不是NullWriteable,最后输出到文件