MapReduce —— MapTask階段源碼分析(Input環節)

不得不說閱讀源碼的過程,極其痛苦 。Dream Car 鎮樓 ~ !



雖說整個MapReduce過程也就只有Map階段和Reduce階段,但是仔細想想,在Map階段要做哪些事情?這一階段具體應該包含數據輸入(input),數據計算(map),數據輸出(output),這三個步驟的劃分是非常符合思維習慣的。

從大數據開發的hello world案例入手,如下是一個word count 案例的map程式

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text k = new Text();
    private IntWritable v = new IntWritable(1);

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1. 獲取一行數據
        String str = value.toString();
        //2. 按照空格分開單詞
        String[] words = str.split(" ");
        //3. 遍歷集合,拼裝成(word,one)形式
        for (String word : words) {
            this.k.set(word);
            context.write(k, v);
        }
    }
}

自定義的WcMapper類繼承了Mapper類,重寫了map()方法,在這個方法里我們按照需求,編寫了相應的業務邏輯。進入Mapper類中查看。

這個類包含的方法並不多,並且比較符合見名知義的思維規律,可以根據方法輔助注釋大概了解其具體功能。在這個類的頭上還包括一段對類的描述性注釋,大致意思就是map階段到底幹了什麼,嘗試簡單翻譯一下核心內容

  • 將輸入鍵/值對映射到一組中間鍵/值對。
  • 映射是將輸入記錄轉換為中間記錄的單個任務。 轉換後的中間記錄不需要與輸入記錄的類型相同。 一個給定的輸入對可以映射到零個或多個輸出對。
  • Hadoop Map-Reduce 框架為InputFormat為作業生成的每個InputSplit生成一個映射任務。 Mapper實現可以通過JobContext.getConfiguration()訪問作業的Configuration 。
  • 框架首先調用setup(Mapper.Context) ,然後為InputSplit中的每個鍵/值對調用map(Object, Object, Mapper.Context) 。 最後調用cleanup(Mapper.Context) 。
  • 與給定輸出鍵關聯的所有中間值隨後由框架分組,並傳遞給Reducer以確定最終輸出。 用戶可以通過指定兩個關鍵的RawComparator類來控制排序和分組。
  • Mapper輸出按Reducer進行分區。 用戶可以通過實現自定義Partitioner來控制哪些鍵(以及記錄)去哪個Reducer 。
  • 用戶可以選擇通過Job.setCombinerClass(Class)指定combiner來執行中間輸出的本地聚合,這有助於減少從Mapper傳輸到Reducer的數據量。
  • 應用程式可以指定是否以及如何壓縮中間輸出,以及通過Configuration使用哪些CompressionCodec 。
    如果作業有零減少,則Mapper的輸出將直接寫入OutputFormat而不按鍵排序。
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}

  protected void setup(Context context ) throws IOException, InterruptedException {}

  protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException 
  {context.write((KEYOUT) key, (VALUEOUT) value);}

  protected void cleanup(Context context) throws IOException, InterruptedException {}
  
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

看到run(Context context) 這個方法就比較有框架的感覺了,這個方法裡面調用了一次setup(context)cleanup(context),而對map方法則為輸入拆分中的每個鍵/值對調用一次。

這個類看到這也就算結束了,其它的也看不出啥東西了。進入MapTask類,包含了大量的核心業務邏輯方法。這個類會被Yarn反射調用run方法,實例化MapTask。直接進run方法,刪除了部分非核心程式碼,清清爽爽。

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical){
    this.umbilical = umbilical;
    if (isMapTask()) {
      // reduce的個數為 0,所以整個任務只有map階段 
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
       // 如果有reduce階段,將進行進度分配
        mapPhase = getProgress().addPhase("map", 0.667f);
       // 排序環節讓後續的reduce環節變得更輕鬆完成,只需拉取一次文件,減少I/O
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
                 .........
    if (useNewApi) {  // 新舊API的選擇
        // 進這個方法
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

繼續進入runNewMapper(job, splitMetaInfo, umbilical, reporter) 方法。裡邊有點長啊,一下不好找到重點。小常識:重要的東西放在try-catch中!! 所以首先看try-catch塊。

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter)  {
               
             
          ............先刪了,略過不看............
    // 用人類的思維過一遍方法名  
    try { 
        // 1、初始化輸入流
      input.initialize(split, mapperContext);
        // 2、直覺調用這個run()方法,最終會調用到自定義的map方法
      mapper.run(mapperContext);
        // 3、完成map計算階段
      mapPhase.complete();
        // 4、排序階段走起
      setPhase(TaskStatus.Phase.SORT);
        // 5、狀態資訊更新或者傳遞(猜的)
      statusUpdate(umbilical);
        // 6、關閉輸入流
      input.close();
      input = null;
        // 7、進入到out階段,輸出map數據
      output.close(mapperContext);
      output = null;
    } finally { 
      // Quietly,默默的做一些事情 ...
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

這樣一來整個思路就就很絲滑順暢了,回過頭來看刪除掉的程式碼片段 ,原注釋資訊也蠻好懂的。

   // 1、make a task context so we can get the classes  封裝任務的上下文,job里有configuration
   // 常識:在框架中上下文對象是不可缺少的,有些資訊在業務線來回穿梭,封裝進上下文可以隨時獲取
   // 回憶:客戶端上傳任務到資源層,其中包括Jar包,配置文件,切片三個文件,container拿到可以實例化job
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);

    // 2、make a mapper:根據taskContext + job,實例化出來一個mapper對象 
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        // 就是自己寫的WCMapper對象,也就對應了下邊的 mapper.run(mapperContext)。絲滑~!
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

    // 3、make the input format:輸入格式化,為啥需要這個玩意?split是一片數據,那讀一條數據就要這玩意了
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        // 在寫job配置的時候,其實是可以指定InputFormat噠,默認是TextInputFormat
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

    // 4、rebuild the input split,每個map都要確定自己往哪個split移動
    org.apache.hadoop.mapreduce.InputSplit split = null;
    // 每個mapper都要搞搞清楚自己要讀取哪個split 【計算向數據移動】
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());

     // 5、input = split + inputFormat (父類是RecordReader)
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        // 所以input有能力在split讀取出來一條條的記錄
        (split, inputFormat, reporter, taskContext);
      // 小總結:3、4、5 三步要做的就是——想個辦法在Split中讀取一條數據

//--------------------NewTrackingRecordReader()  begin-------------------------------
  private final org.apache.hadoop.mapreduce.RecordReader<K,V> real; 
  NewTrackingRecordReader(...){
                               .....
       // 調用TextInputFormat的createRecordReader,返回一個LineRecordReader對象
       // 所以input就是一個LineRecordReader對象
      this.real = inputFormat.createRecordReader(split, taskContext);
                               .....
    }
//--------------------NewTrackingRecordReader()  end--------------------------------

                  ...........先略過輸出這一部分...........

    // 6、上面是任務上下文,這裡是map上下文,包含了input、output、split
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);
   // 7、又對map上下文包裝了一層mapperContext,包含了input、output、split
   // 這不就是Mapper類中的run(Context context)的入參嘛 ~!! 
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

//-------------Mapper::run(Context context)  begin ----------------------------------
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // 從mapper的上下文里判斷有無下一條數據
      while (context.nextKeyValue()) {
          // 取出切片中的下一條數據進行計算
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
// 從map的上下文資訊中是如何獲取到一條數據的?LineRecordReader返回的~~ 層層封裝真絕了
//-------------Mapper::run(Context context)  end ----------------------------------

現在可以回頭看try塊中的 input.initialize(split, mapperContext)的方法,進去看方法實現的細節。數據在HDFS層會被切割開,那麼它能被計算正確是如何實現的? 在這就有相應的實現程式碼不複雜,但是有小亮點。

只保留核心業務邏輯,還是該刪的刪,清清爽爽,開開心心閱讀源碼 ~

// 記住這是Recordreader的初始化方法 
public void initialize(InputSplit genericSplit,TaskAttemptContext context)  {

      // map任務計算是面向切片的,先拿到切片,再拿到切片的始端
    start = split.getStart();
      // 始端 + 切片大小,得到末端
    end = start + split.getLength();
      // 從切片中拿到文件路徑
    final Path file = split.getPath();

    // open the file and seek to the start of the split
      // 獲取到文件系統的一個對象
    final FileSystem fs = file.getFileSystem(job);
      //打開文件,會得到一個面向文件的輸入流
      // 各個map並行執行,所以不會都是從文件頭開始讀,所以它要搭配一個seek()方法 
    fileIn = fs.open(file);
      
        if (...) {
           ......
    } else {
       // 每個map 都會seek到自己切片偏移量的位置開始讀取數據
      fileIn.seek(start);
       // SplitLineReader:切片里的行記錄讀取器。這名字一看就很面向對象
      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    // 如果這不是第一次拆分,我們總是丟棄第一條記錄。
    // 因為我們總是(除了最後一次拆分)在 next() 方法中讀取額外的一行。
    // 這就防止了 hello 被拆成了 he llo 導致計算錯誤
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

in.readLine(new Text(), 0, maxBytesToConsume(start)) 這個方法把讀到的一行數據交給一個Text對象持有,返回值是一個int類型的數值,表示讀到了多少個位元組。

注意到方法傳參new Text()對象,當方法執行完是時候,這個對象會因為沒有引用被GC回收。那麼既然沒有引用,它在幹嘛?

回憶:切片是一個邏輯切分,默認的大小是一個block塊的大小。假如一個split小於block ,這個block就會被切成多個部分。如果就是尼瑪那麼寸, hello 兩個切片被拆成了 he llo 兩部分,就會導致計算錯誤。這時候向下多讀一行,哎,這個問題就解決啦。

再回頭說:計算向數據移動。被多讀的一行如果在其它的節點怎麼辦?答:把這一行數據傳過來,不必移動計算。

其實看到這裡也就可以明白了,在整個Map的input環節,真正干讀取數據活的是LineRecordReaderkey就是面向行的位元組偏移量。下邊這段程式碼已經出現多次了

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // 從mapper的上下文里判斷有無下一條數據
      while (context.nextKeyValue()) {
          // 取出切片中的下一條數據進行計算
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

通過閱讀上邊的源碼我們已經知道此處傳參Context實際上就是一個MapContextImpl對象,context.nextKeyValue()方法也就是在調用LineRecordReader::nextKeyValue()方法。這個方法內部:會對key-value進行賦值,返回boolean值,代表是否賦值成功。總體下來可以說是感覺非常的絲滑~

總結:(我自己能看懂就行了~)

MapTask:input -> map -> output

intput:(Split + format)來自於輸入格式化類返回記錄讀取器對象

TextInputFormat – > LineRecordReader:

Split三個維度:file , offset , length

init():in = fs.open(file).seek。除了第一個切片,都會往下多讀一行。

nextKeyValue():

1、讀取數據中的一條記錄對應的key,value 賦值;

2、返回布爾值;

getCurrentKey()

getCurrentValue()