Ring Buffer: A Workhorse of the Hadoop Shuffle Phase

  • February 10, 2020
  • Notes

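This post grew out of a question a reader was asked in an interview: during the shuffle phase, Hadoop relies on a data structure called the ring buffer (circular buffer).

A ring queue is an extremely useful data structure in practice. It is a FIFO structure whose head and tail are joined, it is backed by the linear storage of an array, and its layout is simple. Checking whether the queue is full or empty is cheap, and so is reading and writing data. Because it is both simple and efficient, ring queues are even implemented in hardware.

Ring queues are widely used for sending and receiving network data and for exchanging data between programs, for example when the kernel and an application exchange large volumes of data, or when large volumes of data are received from hardware.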
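To make the general idea concrete before turning to Hadoop, here is a minimal sketch of a fixed-capacity FIFO ring queue over an array. The class name IntRingQueue and its methods are made up for illustration and have nothing to do with Hadoop's code; the point is only that full/empty checks and reads/writes are constant-time index arithmetic.

```java
// Minimal fixed-capacity FIFO ring queue over an int array (illustrative only).
final class IntRingQueue {
    private final int[] slots;
    private int head; // index of the oldest element
    private int size; // number of stored elements

    IntRingQueue(int capacity) {
        slots = new int[capacity];
    }

    boolean isEmpty() { return size == 0; }

    boolean isFull() { return size == slots.length; }

    // Appends at the tail; returns false instead of overwriting when full.
    boolean offer(int value) {
        if (isFull()) {
            return false;
        }
        slots[(head + size) % slots.length] = value; // tail wraps around
        size++;
        return true;
    }

    // Removes and returns the oldest element.
    int poll() {
        if (isEmpty()) {
            throw new IllegalStateException("queue is empty");
        }
        int value = slots[head];
        head = (head + 1) % slots.length; // head wraps around
        size--;
        return value;
    }

    public static void main(String[] args) {
        IntRingQueue q = new IntRingQueue(3);
        q.offer(1);
        q.offer(2);
        q.offer(3);
        System.out.println("offer on full queue accepted: " + q.offer(4));
        System.out.println("oldest element: " + q.poll());
        System.out.println("offer after poll accepted: " + q.offer(4));
    }
}
```

Tracking head and size, rather than head and tail, avoids the classic ambiguity where head == tail could mean either full or empty.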

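Ring buffer data structure

In the map phase, the ring buffer is the region of memory that receives records after the map function has processed them: map output is written to memory first, and that region is the ring buffer.

The ring buffer is defined in MapTask.MapOutputBuffer. The relevant fields are: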

// k/v accounting
// IntBuffer holding the metadata; every entry is an int (4 bytes)
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart;            // marks origin of spill metadata
int kvend;              // marks end of spill metadata
int kvindex;            // marks end of fully serialized records

// marker separating the metadata from the key/value content;
// both live in the same ring buffer, so they have to be kept apart
int equator;            // marks origin of meta/serialization
int bufstart;           // marks beginning of spill
int bufend;             // marks beginning of collectable
int bufmark;            // marks end of record
int bufindex;           // marks end of collected
int bufvoid;            // marks the point where we should stop
                        // reading at the end of the buffer

// byte array holding the keys and values; indexed in bytes, unlike kvmeta
byte[] kvbuffer;        // main output buffer
private final byte[] b0 = new byte[0];

// offsets, relative to kvindex, at which the key/value positions in kvbuffer are stored
private static final int VALSTART = 0;         // val offset in acct
private static final int KEYSTART = 1;         // key offset in acct
// offset, relative to kvindex, at which the partition is stored in kvmeta
private static final int PARTITION = 2;        // partition offset in acct
private static final int VALLEN = 3;           // length of value
// number of kvmeta ints used by the metadata of one key/value pair
private static final int NMETA = 4;            // num meta ints
// number of bytes used by the metadata of one key/value pair
private static final int METASIZE = NMETA * 4; // size in bytes

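The ring buffer is really just an array. It stores both the serialized key/value data and the metadata for each key/value pair. The metadata is stored as ints: every key/value pair has one metadata entry made up of four ints. The first int holds the start position of the value, the second the start position of the key, the third the partition, and the last the length of the value.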
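The following sketch illustrates how one byte array can carry both raw bytes and int-typed metadata, using the same ByteBuffer.wrap(...).asIntBuffer() overlay and the same four slot offsets as the fields above. The class MetaLayoutSketch and its putMeta helper are hypothetical names introduced here; only the constants mirror the article's code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Illustrative only: one byte[] viewed both as bytes and as ints.
final class MetaLayoutSketch {
    static final int VALSTART = 0;
    static final int KEYSTART = 1;
    static final int PARTITION = 2;
    static final int VALLEN = 3;
    static final int NMETA = 4;            // ints per metadata entry
    static final int METASIZE = NMETA * 4; // bytes per metadata entry

    final byte[] kvbuffer;
    final IntBuffer kvmeta; // int view over the same memory as kvbuffer

    MetaLayoutSketch(int bytes) {
        kvbuffer = new byte[bytes - bytes % METASIZE];
        kvmeta = ByteBuffer.wrap(kvbuffer)
            .order(ByteOrder.nativeOrder())
            .asIntBuffer();
    }

    // Writes one four-int metadata entry starting at int index kvindex.
    void putMeta(int kvindex, int partition, int keystart, int valstart, int vallen) {
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, vallen);
    }

    public static void main(String[] args) {
        MetaLayoutSketch m = new MetaLayoutSketch(64);
        m.putMeta(12, 2, 5, 9, 7); // last entry of a 16-int view
        System.out.println("ints available: " + m.kvmeta.capacity());
        System.out.println("partition read back: " + m.kvmeta.get(12 + PARTITION));
    }
}
```

Because kvmeta is only a view, an int index i in kvmeta corresponds to byte offset 4 * i in kvbuffer, which is why the article's code keeps multiplying kvindex and kvend by 4.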

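Inside the ring buffer, the serialized key/value data and the metadata are separated by the equator. Key/value data is stored in the direction of increasing index, while metadata is stored in the direction of decreasing index. If you picture the array as a ring, then starting from the equator the key/value data grows clockwise and the metadata grows counterclockwise.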
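As a rough sketch of the two growth directions, the snippet below advances a data cursor forward and a metadata cursor backward from the same equator. It deliberately ignores spilling, space checks and key wrapping, and the class and method names are invented for this illustration; only the two index updates are taken from the article's code.

```java
// Illustrative only: data grows up from the equator, metadata grows down.
final class EquatorDirectionsSketch {
    static final int NMETA = 4;
    static final int METASIZE = 16;

    // Returns {bufindex, kvindex} after collecting `records` records of
    // `recLen` serialized bytes each. No overflow or spill checks are made.
    static int[] collect(int bufLen, int equator, int records, int recLen) {
        int metaCapacity = bufLen / 4; // number of ints in the meta view
        int bufindex = equator;        // byte index, moves clockwise
        int aligned = equator - equator % METASIZE;
        // int index of the first metadata entry, one METASIZE below the equator
        int kvindex = (int) (((long) aligned - METASIZE + bufLen) % bufLen) / 4;
        for (int i = 0; i < records; i++) {
            bufindex = (bufindex + recLen) % bufLen;                   // data forward
            kvindex = (kvindex - NMETA + metaCapacity) % metaCapacity; // meta backward
        }
        return new int[] {bufindex, kvindex};
    }

    public static void main(String[] args) {
        int[] r = collect(160, 0, 2, 10);
        System.out.println("bufindex=" + r[0] + " kvindex=" + r[1]);
    }
}
```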

Initialization

The ring buffer is set up in MapOutputBuffer.init.

public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  ...
  // MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
  // fraction of the map-side buffer at which a spill starts
  //sanity checks
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // IO_SORT_MB = "mapreduce.task.io.sort.mb"
  // size of the map-side buffer
  // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent should preferably be a multiple of 16
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  // threshold on the memory taken up by all in-memory spill indexes
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  ...
  // sort implementation; you can plug in your own. The default is a modified quicksort
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  // IO_SORT_MB is in MB; shifting left by 20 bits converts it to bytes
  int maxMemUsage = sortmb << 20;
  // METASIZE is the size of one metadata entry: four ints
  // (VALSTART, KEYSTART, PARTITION, VALLEN) of 4 bytes each, so 16 bytes.
  // Trim the buffer size down to a multiple of METASIZE
  maxMemUsage -= maxMemUsage % METASIZE;
  // backing array shared by data and metadata, indexed in bytes
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // view kvbuffer as the int-typed kvmeta, indexed in ints (4 bytes each)
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  // set the boundary between the data region and kvmeta
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  // maximum number of metadata entries kvmeta can hold
  maxRec = kvmeta.capacity() / NMETA;
  // spill threshold: strictly kvbuffer.length * spillper,
  // not simply sortmb * spillper
  softLimit = (int)(kvbuffer.length * spillper);
  // important: the running measure that decides when to spill
  bufferRemaining = softLimit;
  ...
  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  // keys are serialized into bb
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  // values are serialized into bb
  valSerializer.open(bb);
  ...
  // combiner
  ...
  spillInProgress = false;
  // in the final merge, a configured combiner runs only once the spill count reaches this threshold
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}

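init builds and initializes the ring buffer. The size of the map-side ring buffer, sortmb, is taken from mapreduce.task.io.sort.mb and defaults to 100 MB.

The same buffer also holds the metadata, and one metadata entry occupies METASIZE (16 bytes). The usable size is therefore maxMemUsage = (sortmb << 20) - (sortmb << 20) % METASIZE, which is why the size in bytes should be a multiple of 16. maxMemUsage is then used to allocate the byte array kvbuffer and the int view kvmeta over it, and finally the position markers are set. setEquator(0) sets the boundary between the key/value region and the metadata region. At initialization the boundary is 0, and kvindex becomes ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4, which on the ring is one METASIZE step counterclockwise from the equator. kvstart = kvend = kvindex is then set from kvindex, bufstart = bufend = bufindex = equator is set from equator, and bufvoid = kvbuffer.length is set as well; bufvoid marks the upper bound of the region that holds data.

For efficiency, a spill is triggered once buffer usage reaches a threshold. The threshold is tracked through bufferRemaining, which is initialized by softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;. Note that softLimit is not sortmb * spillper but kvbuffer.length * spillper; the two coincide only when sortmb << 20 is a multiple of 16.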
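The sizing arithmetic can be replayed in isolation. The sketch below copies the three relevant expressions from init into a hypothetical helper class (BufferSizingSketch is not a Hadoop class). One observation that follows from the arithmetic itself: since 1 MB is 2^20 bytes and 2^20 is a multiple of 16, sortmb << 20 is already a multiple of METASIZE for any whole number of megabytes, so the trimming step appears to be a safeguard rather than something a user normally has to tune around.

```java
// Illustrative only: the buffer sizing arithmetic from init, in isolation.
final class BufferSizingSketch {
    static final int METASIZE = 16;

    // Size of kvbuffer in bytes for a given mapreduce.task.io.sort.mb value.
    static int bufferBytes(int sortmb) {
        int maxMemUsage = sortmb << 20;        // MB -> bytes
        maxMemUsage -= maxMemUsage % METASIZE; // trim to a multiple of 16
        return maxMemUsage;
    }

    // Spill threshold in bytes; same float arithmetic as in init.
    static int softLimit(int bufferBytes, float spillper) {
        return (int) (bufferBytes * spillper);
    }

    public static void main(String[] args) {
        int bytes = bufferBytes(100);
        System.out.println("kvbuffer.length = " + bytes);
        System.out.println("softLimit       = " + softLimit(bytes, 0.8f));
    }
}
```

With the defaults (100 MB and 0.8), the buffer is 104857600 bytes and the soft limit lands at roughly 80% of that; the exact value depends on float rounding, which is why it is not spelled out here.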

Here is the code of setEquator:

// code executed by setEquator(0)
private void setEquator(int pos) {
  equator = pos;
  // set index prior to first entry, aligned at meta boundary
  // pos rounded down to a METASIZE boundary: the dividing line
  // between metadata and key/value data, in bytes
  final int aligned = pos - (pos % METASIZE);
  // Cast one of the operands to long to avoid integer overflow
  // int index at which the first metadata entry will be stored
  kvindex = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
      "(" + (kvindex * 4) + ")");
}
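To see what the alignment does for positions other than 0, the kvindex formula can be evaluated on a toy buffer. The helper below (kvindexFor, a name chosen for this sketch) is the same expression as in setEquator with the buffer length passed in as a parameter.

```java
// Illustrative only: the kvindex computation from setEquator as a pure function.
final class SetEquatorSketch {
    static final int METASIZE = 16;

    // Int index of the first metadata slot for an equator at byte position pos.
    static int kvindexFor(int pos, int bufLen) {
        final int aligned = pos - (pos % METASIZE); // round down to 16-byte boundary
        // one METASIZE step back from the aligned position, wrapping around the ring
        return (int) (((long) aligned - METASIZE + bufLen) % bufLen) / 4;
    }

    public static void main(String[] args) {
        int bufLen = 160; // toy buffer: 10 metadata-sized slots
        for (int pos : new int[] {0, 16, 37}) {
            int kvi = kvindexFor(pos, bufLen);
            System.out.println("equator " + pos + " -> kvindex " + kvi
                + " (byte " + (kvi * 4) + ")");
        }
    }
}
```

For a 160-byte buffer, an equator at 0 wraps the first metadata slot to bytes 144..159 (int index 36), while an unaligned equator at 37 is first rounded down to 32 and then stepped back to byte 16 (int index 4).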

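After initialization, the buffer can be pictured as in the figure below:

[Figure: ring buffer data structure]

Writing to the buffer

The map task calls collector.collect from NewOutputCollector.write to write data into the buffer. Before the write, NewOutputCollector.write has already computed the partition of each record. Let's look at collect: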

// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // on each collect, first deduct the metadata size from the remaining space, then check
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // spillInProgress is false for the first spill
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  // write the key, the value and the metadata into the buffer
  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    // serialize the key into kvbuffer, advancing bufindex
    keySerializer.serialize(key);
    // if the key straddles bufvoid, move it so that it occupies
    // contiguous space, which keeps key comparison during the sort simple
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);

    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();

    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid));

    // write accounting info
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}

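On every write, bufferRemaining -= METASIZE is executed first and bufferRemaining is then checked.

If it is still greater than 0, the serialized key/value pair and its metadata are written straight into the buffer. Keys and values are serialized on the way in through a chain of calls, for example Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len). It is MapOutputBuffer.Buffer.write(b, off, len) that finally copies the data into kvbuffer. The write method looks like this: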

public void write(byte b[], int off, int len)
    throws IOException {
  // must always verify the invariant that at least METASIZE bytes are
  // available beyond kvindex, even when len == 0
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // writing these bytes could exhaust available buffer space or fill
    // the buffer to soft limit. check if spill or blocking are necessary
    boolean blockwrite = false;
    spillLock.lock();
    try {
      do {
        checkSpillException();

        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        // ser distance to key index
        final int distkvi = distanceTo(bufindex, kvbidx);
        // ser distance to spill end index
        final int distkve = distanceTo(bufindex, kvbend);

        // if kvindex is closer than kvend, then a spill is neither in
        // progress nor complete and reset since the lock was held. The
        // write should block only if there is insufficient space to
        // complete the current write, write the metadata for this record,
        // and write the metadata for the next record. If kvend is closer,
        // then the write should block if there is too little space for
        // either the metadata or the current write. Note that collect
        // ensures its metadata requirement with a zero-length write
        blockwrite = distkvi <= distkve
          ? distkvi <= len + 2 * METASIZE
          : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

        if (!spillInProgress) {
          if (blockwrite) {
            if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
              // spill finished, reclaim space
              // need to use meta exclusively; zero-len rec & 100% spill
              // pcnt would fail
              resetSpill(); // resetSpill doesn't move bufindex, kvindex
              bufferRemaining = Math.min(
                  distkvi - 2 * METASIZE,
                  softLimit - distanceTo(kvbidx, bufindex)) - len;
              continue;
            }
            // we have records we can spill; only spill if blocked
            if (kvindex != kvend) {
              startSpill();
              // Blocked on this write, waiting for the spill just
              // initiated to finish. Instead of repositioning the marker
              // and copying the partial record, we set the record start
              // to be the new equator
              setEquator(bufmark);
            } else {
              // We have no buffered records, and this record is too large
              // to write into kvbuffer. We must spill it directly from
              // collect
              final int size = distanceTo(bufstart, bufindex) + len;
              setEquator(0);
              bufstart = bufend = bufindex = equator;
              kvstart = kvend = kvindex;
              bufvoid = kvbuffer.length;
              throw new MapBufferTooSmallException(size + " bytes");
            }
          }
        }

        if (blockwrite) {
          // wait for spill
          try {
            while (spillInProgress) {
              reporter.progress();
              spillDone.await();
            }
          } catch (InterruptedException e) {
              throw new IOException(
                  "Buffer interrupted while waiting for the writer", e);
          }
        }
      } while (blockwrite);
    } finally {
      spillLock.unlock();
    }
  }
  // here, we know that we have sufficient space to write
  if (bufindex + len > bufvoid) {
    final int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}

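write copies the key/value bytes into kvbuffer. If bufindex + len goes past bufvoid, the data is stored in two pieces: the first piece fills the space between bufindex and bufvoid, bufindex is reset to 0, and the remainder is written from there. write does not distinguish keys from values. After a key has been written, collect checks bufindex < keystart; if bufindex is the smaller one, the key has been split, and bb.shiftBufferedKey() is called. A value is written as is, with no check for splitting. Keys must not be split because they are compared during sorting.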
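The wrap-around copy at the end of write is easy to try out on a tiny array. The sketch below keeps only those last lines of the method; all space accounting, locking and blocking is left out, and the class WrappedWriteSketch is a stand-in invented for this example.

```java
// Illustrative only: the wrap-around copy at the tail of Buffer.write.
final class WrappedWriteSketch {
    final byte[] kvbuffer;
    int bufindex; // next byte to write
    int bufvoid;  // end of the writable region

    WrappedWriteSketch(int size, int start) {
        kvbuffer = new byte[size];
        bufvoid = size;
        bufindex = start;
    }

    // Copies len bytes from b into the ring; assumes enough free space exists.
    void write(byte[] b, int off, int len) {
        if (bufindex + len > bufvoid) {
            final int gaplen = bufvoid - bufindex; // bytes that fit before the end
            System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
            len -= gaplen;
            off += gaplen;
            bufindex = 0; // continue at the start of the array
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
    }

    public static void main(String[] args) {
        WrappedWriteSketch w = new WrappedWriteSketch(8, 6);
        w.write(new byte[] {1, 2, 3, 4, 5}, 0, 5);
        System.out.println(java.util.Arrays.toString(w.kvbuffer)
            + " bufindex=" + w.bufindex);
    }
}
```

Starting at index 6 of an 8-byte array, a 5-byte write puts two bytes at the end and three at the front, and leaves bufindex (3) smaller than the position where the write began (6). That is exactly the condition bufindex < keystart that collect tests for.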

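Note that if the data to be written is too long and kvindex == kvend, a MapBufferTooSmallException is thrown. collect catches it and spills the record straight to disk with spillSingleRecord. In other words, a single record that is too large bypasses the buffer and is written directly to disk.

Now the code of bb.shiftBufferedKey():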

// BlockingBuffer.shiftBufferedKey
protected void shiftBufferedKey() throws IOException {
  // spillLock unnecessary; both kvend and kvindex are current
  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;
  final int kvbidx = 4 * kvindex;
  final int kvbend = 4 * kvend;
  final int avail =
    Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  if (bufindex + headbytelen < avail) {
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
    bufferRemaining -= kvbuffer.length - bufvoid;
  } else {
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}

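shiftBufferedKey checks whether there is enough room at the start of the array for the whole key. If there is not, the part of the key at the start of the array is copied into keytmp, and the key is then written again in two pieces through Buffer.write. If there is enough room, two copies are made: the part of the key at the start of the array is moved to offset headbytelen, the part at the end of the array is copied to the start, bufindex is advanced, and bufferRemaining is adjusted.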
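The "enough room" branch boils down to two System.arraycopy calls. The following sketch isolates them in a static helper (ShiftKeySketch.shift, a name made up here) that takes the markers as parameters and returns the new bufindex; the bookkeeping for bufvoid and bufferRemaining is omitted.

```java
// Illustrative only: making a wrapped key contiguous, as in the first
// branch of shiftBufferedKey. Assumes the front of the array has room.
final class ShiftKeySketch {
    // The key occupies [bufmark, bufvoid) followed by [0, bufindex).
    // Afterwards it occupies [0, returned bufindex).
    static int shift(byte[] kvbuffer, int bufmark, int bufvoid, int bufindex) {
        int headbytelen = bufvoid - bufmark; // bytes of the key at the array's end
        // slide the second half of the key right to make room at the front
        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
        // move the first half of the key from the end of the array to the front
        System.arraycopy(kvbuffer, bufmark, kvbuffer, 0, headbytelen);
        return bufindex + headbytelen;
    }

    public static void main(String[] args) {
        // key bytes 1..5 written from index 6, wrapped after index 7
        byte[] buf = {3, 4, 5, 0, 0, 0, 1, 2};
        int bufindex = shift(buf, 6, 8, 3);
        System.out.println(java.util.Arrays.toString(buf) + " bufindex=" + bufindex);
    }
}
```

System.arraycopy is specified to behave as if the source were first copied to a temporary array, so the overlapping first copy is safe.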

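Once the key/value has been written, the metadata is written and kvindex is updated.

Spill

When a write finishes and enough data has accumulated that bufferRemaining is less than or equal to 0, a spill is prepared. For the first spill, spillInProgress is false, so bUsed = distanceTo(kvbidx, bufindex) is examined. If bUsed >= softLimit and (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE), a spill is started by calling startSpill: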

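private void startSpill() {
  // set the metadata boundary of the spill
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  // set the key/value boundary of the spill
  bufend = bufmark;
  // mark the spill as running
  spillInProgress = true;
  ...
  // wake the spill thread through the condition on the reentrant lock
  spillReady.signal();
}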

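After startSpill wakes the spill thread, the spill runs, but the map task's writes to the buffer are not blocked. The equator and bufferRemaining therefore have to be reset. First, here is how the new equator and bufferRemaining are chosen: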

// average record length, derived from the key/value data written so far
final int avgRec = (int) (mapOutputByteCounter.getCounter() /
  mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
// size of the free space
final int distkvi = distanceTo(bufindex, kvbidx);
// position of the new equator
final int newPos = (bufindex +
  Math.max(2 * METASIZE - 1,
          Math.min(distkvi / 2,
                   distkvi / (METASIZE + avgRec) * METASIZE)))
  % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
    // metadata max
    distanceTo(bufend, newPos),
    Math.min(
      // serialization max
      distanceTo(newPos, serBound),
      // soft limit
      softLimit)) - 2 * METASIZE;

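Because the equator is the boundary between the key/value region and the metadata region, and as much room as possible should go to key/value data, at most half of distkvi is set aside for metadata. avgRec is used to estimate how many record-plus-metadata pairs fit into distkvi, and from that count the space the metadata will need. The smaller of distkvi / 2 and that estimate is taken. distkvi must also hold at least one metadata entry, which takes METASIZE, and computing kvindex requires aligning the position, which can cost up to METASIZE - 1 more bytes. Putting these together, the new equator is (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length. Once the equator is chosen, bufmark = bufindex = newPos and kvindex are set, but bufstart, bufend, kvstart and kvend are not, because they still mark the bounds of the data being spilled.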
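A small worked example may help here. The sketch below evaluates the newPos expression for a toy 1024-byte buffer. The article never shows distanceTo, so the definition used here (clockwise distance from i to j on a ring of size mod) is an assumption based on how the function is used; the class name NewEquatorSketch is likewise invented.

```java
// Illustrative only: choosing the new equator when a spill starts.
final class NewEquatorSketch {
    static final int METASIZE = 16;

    // Assumed definition: clockwise distance from i to j on a ring of size mod.
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    static int newEquator(int bufindex, int kvbidx, int avgRec, int bufLen) {
        final int distkvi = distanceTo(bufindex, kvbidx, bufLen); // free bytes
        return (bufindex
            + Math.max(2 * METASIZE - 1,           // room for one aligned meta entry
                Math.min(distkvi / 2,              // at most half goes to metadata
                    distkvi / (METASIZE + avgRec) * METASIZE))) // estimated meta bytes
            % bufLen;
    }

    public static void main(String[] args) {
        // 384 free bytes between bufindex=800 and kvbidx=160 on a 1024-byte ring
        System.out.println(newEquator(800, 160, 48, 1024));
        System.out.println(newEquator(800, 160, 0, 1024));
    }
}
```

With 384 free bytes and an average record of 48 bytes, six record-plus-metadata pairs fit (384 / 64), so 96 bytes are reserved for metadata and the equator moves to 896. With an average record of 0 bytes the estimate would claim all 384 bytes, and the distkvi / 2 cap limits it to 192, giving 992.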

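Once a spill has started, less space is available, so bufferRemaining, which controls spilling, has to be reset as well. It is set to the minimum of three values, minus 2 * METASIZE. The three values are the space available for metadata, distanceTo(bufend, newPos); the space available for key/value data, distanceTo(newPos, serBound); and softLimit. Why subtract 2 * METASIZE? One METASIZE is for the record that is about to be written: its metadata has to be deducted from bufferRemaining. The other is for the METASIZE-sized slot between kvindex and kvend (as of before the spill), which was not accounted for when the equator was computed.

Next, the SpillThread itself. Its run method: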

public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      // while no spill is requested, park the SpillThread until it is woken
      while (!spillInProgress) {
        spillReady.await();
      }
      try {
        spillLock.unlock();
        // once woken, sort and spill to disk
        sortAndSpill();
      } catch (Throwable t) {
        sortSpillException = t;
      } finally {
        spillLock.lock();
        if (bufend < bufstart) {
          bufvoid = kvbuffer.length;
        }
        kvstart = kvend;
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}
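The handshake between the collecting thread and the spill thread uses one lock and two conditions. The sketch below reproduces just that pattern with java.util.concurrent.locks; the spill work itself is replaced by a counter, and the producer side is simplified to "request a spill and wait for it", which is stricter than what collect does (it normally keeps writing while the spill runs). The class SpillHandshakeSketch and the method spillAndWait are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: the spillReady / spillDone handshake, without any real spill.
final class SpillHandshakeSketch {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition();
    private final Condition spillDone = spillLock.newCondition();
    private boolean spillInProgress;
    private int spills;

    SpillHandshakeSketch() {
        Thread t = new Thread(this::run, "SpillThread");
        t.setDaemon(true); // do not keep the JVM alive
        t.start();
    }

    private void run() {
        spillLock.lock();
        try {
            while (true) {
                spillDone.signalAll();       // tell waiters the previous spill is over
                while (!spillInProgress) {
                    spillReady.await();      // releases the lock while parked
                }
                spillLock.unlock();          // do the slow work without the lock
                try {
                    // placeholder for sortAndSpill()
                } finally {
                    spillLock.lock();
                    spills++;
                    spillInProgress = false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            spillLock.unlock();
        }
    }

    // Requests one spill and blocks until it has completed; returns the spill count.
    int spillAndWait() {
        spillLock.lock();
        try {
            spillInProgress = true;
            spillReady.signal();
            while (spillInProgress) {
                spillDone.await();
            }
            return spills;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for spill", e);
        } finally {
            spillLock.unlock();
        }
    }

    public static void main(String[] args) {
        SpillHandshakeSketch s = new SpillHandshakeSketch();
        System.out.println("spills completed: " + s.spillAndWait());
    }
}
```

Both sides re-check their condition in a while loop after waking up, so a signal that arrives before the other side is waiting is not lost: the flag spillInProgress, not the signal, carries the state.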

The core of run is sortAndSpill:

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    // holds the index information of this spill
    final SpillRecord spillRec = new SpillRecord(partitions);
    // create the spill file that will be written to disk
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    // open the output stream
    out = rfs.create(filename);
    // kvend / NMETA: position of kvend counted in metadata entries
    final int mstart = kvend / NMETA;
    // position of kvstart counted in metadata entries;
    // the metadata lies between mstart and mend, so (mend - mstart) is the number of entries
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    // sort: only the metadata is sorted, i.e. only its order in kvmeta changes.
    // The ordering is defined by MapOutputBuffer.compare:
    // by partition first, then by key
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    // rec describes where a partition sits in the data file
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      // the temporary file is in IFile format
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        // before writing to disk, check whether a combiner is configured
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          // write the records that belong to the same partition
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        // store the information about partition i in the index record rec
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        // spillRec holds the per-partition information of this spill, so that the
        // later heap-based merge can fetch and sort the data of each partition
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }
    // check whether the in-memory indexes exceed the threshold; if so, write the index to disk.
    // Once the threshold is exceeded, only this index and later ones go to disk
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}

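In sortAndSpill, mstart and mend determine how many records have to be spilled to disk. sorter.sort is then called to sort the metadata, first by partition and then by key. Sorting only reorders the metadata entries.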
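The idea of sorting only the metadata can be sketched with plain arrays: the records stay where they are, and only an array of record indices is reordered by (partition, key). This is not how Hadoop's IndexedSorter works internally (it swaps metadata entries in kvmeta and compares raw key bytes); the version below uses String keys and Arrays.sort purely as a stand-in, and MetaSortSketch is an invented name.

```java
import java.util.Arrays;
import java.util.Comparator;

// Illustrative only: order records by (partition, key) without moving them.
final class MetaSortSketch {
    // Returns record indices sorted by partition first, then by key.
    static Integer[] sortedOrder(int[] partitions, String[] keys) {
        Integer[] order = new Integer[keys.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(order, Comparator
            .comparingInt((Integer i) -> partitions[i])
            .thenComparing(i -> keys[i]));
        return order;
    }

    public static void main(String[] args) {
        int[] partitions = {1, 0, 1, 0};
        String[] keys = {"b", "z", "a", "c"};
        System.out.println(Arrays.toString(sortedOrder(partitions, keys)));
    }
}
```

After the sort, walking the index array visits all records of partition 0 in key order, then all records of partition 1, which is the order in which sortAndSpill writes them to the spill file.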

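After sorting, the code checks for a combiner. Without one, records are written straight to disk, with one IndexRecord per partition. With one, the records of the partition are exposed through kvIter and combinerRunner.combine runs the combiner over them.

After the data is on disk, the spillRec that describes spillX.out is kept in memory via indexCacheList.add(spillRec). Once the memory used, totalIndexCacheMemory, reaches indexCacheMemoryLimit, an index file is created instead, and the spillRec of this and every later spill is written to an index file on disk.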
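Why "this and every later spill"? Because totalIndexCacheMemory only ever grows, the threshold test stays true once it has been crossed. The sketch below mimics that accounting; IndexCacheSketch and finishSpill are hypothetical names, and the 24-byte record length is an assumption about the value of MAP_OUTPUT_INDEX_RECORD_LENGTH (three longs per partition).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the index cache accounting at the end of sortAndSpill.
final class IndexCacheSketch {
    // Assumption: bytes per partition entry in a spill index (three longs).
    static final int INDEX_RECORD_LENGTH = 24;

    final long limit;
    long totalIndexCacheMemory;
    final List<Integer> cachedSpills = new ArrayList<>();
    final List<Integer> onDiskSpills = new ArrayList<>();

    IndexCacheSketch(long limit) {
        this.limit = limit;
    }

    // Decides where the index of spill number spillNo ends up.
    void finishSpill(int spillNo, int partitions) {
        if (totalIndexCacheMemory >= limit) {
            onDiskSpills.add(spillNo); // stands in for spillRec.writeToFile(...)
        } else {
            cachedSpills.add(spillNo); // stands in for indexCacheList.add(spillRec)
            totalIndexCacheMemory += (long) partitions * INDEX_RECORD_LENGTH;
        }
    }

    public static void main(String[] args) {
        IndexCacheSketch cache = new IndexCacheSketch(100);
        for (int spill = 0; spill < 5; spill++) {
            cache.finishSpill(spill, 2);
        }
        System.out.println("cached: " + cache.cachedSpills
            + ", on disk: " + cache.onDiskSpills);
    }
}
```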

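Finally the spill count is incremented. When sortAndSpill returns, control goes back to run, whose finally block assigns kvstart = kvend and bufstart = bufend and sets spillInProgress to false.

While the spill is running, the map task keeps writing to the buffer by calling collect, so let's return to the collect method: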

// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // on each collect, first deduct the metadata size from the remaining space, then check
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // spillInProgress is false for the first spill
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            ...
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  ...
}

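When a new record is to be written, bufferRemaining -= METASIZE is evaluated. At this point bufferRemaining is the value that was reset when the spill started, so it should be smaller than the initial softLimit. When bufferRemaining drops to 0 or below and the previous spill has finished, resetSpill is called and bufferRemaining is recomputed as Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE. The final METASIZE is the one that was subtracted from bufferRemaining when the current record entered collect. resetSpill looks like this: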

private void resetSpill() {
  final int e = equator;
  bufstart = bufend = e;
  final int aligned = e - (e % METASIZE);
  // set start/end to point to first meta record
  // Cast one of the operands to long to avoid integer overflow
  kvstart = kvend = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
    (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}

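When bufferRemaining next drops to 0 or below, another spill starts, and from there the same cycle repeats. That concludes this walkthrough of the ring buffer.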