Spark Shuffle机制详细源码解析

Shuffle过程主要分为Shuffle write和Shuffle read两个阶段,2.0版本之后hash shuffle被删除,只保留sort shuffle,下面结合代码分析:

1.ShuffleManager

Spark在初始化SparkEnv的时候,会在create()方法里面初始化ShuffleManager

// Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

这里可以看到包含sort和tungsten-sort两种shuffle,通过反射创建了ShuffleManager,ShuffleManager是一个特质,核心方法有下面几个:

private[spark] trait ShuffleManager {

  /**
   * 注册一个shuffle返回句柄
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** 获取一个Writer根据给定的分区,在executors执行map任务时被调用 */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * 获取一个Reader根据reduce分区的范围,在executors执行reduce任务时被调用
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
	...
}

2.SortShuffleManager

SortShuffleManager是ShuffleManager的唯一实现类,对于以上三个方法的实现如下:

2.1 registerShuffle

/**
   * Obtains a [[ShuffleHandle]] to pass to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    // 1.首先检查是否符合BypassMergeSort
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      // 2.否则检查是否能够序列化
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

1.首先检查是否符合BypassMergeSort,这里需要满足两个条件,首先是当前shuffle依赖中没有map端的聚合操作,其次是分区数要小于spark.shuffle.sort.bypassMergeThreshold的值,默认为200,如果满足这两个条件,会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // 默认值为200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}

2.如果不满足上面条件,检查是否满足canUseSerializedShuffle()方法,如果满足该方法中的3个条件,则会返回SerializedShuffleHandle,启用tungsten-sort shuffle机制

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // 序列化器需要支持Relocation
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
    // 不能有map端聚合操作
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
    // 分区数不能大于16777215+1
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}

3.如果以上两个条件都不满足的话,会返回BaseShuffleHandle,采用基本sort shuffle机制

2.2 getReader

/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}

这里返回BlockStoreShuffleReader

2.3 getWriter

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // 根据handle获取不同ShuffleWrite
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}

这里会根据handle获取不同ShuffleWrite,如果是SerializedShuffleHandle,使用UnsafeShuffleWriter,如果是BypassMergeSortShuffleHandle,采用BypassMergeSortShuffleWriter,否则使用SortShuffleWriter

3.三种Writer的实现

如上文所说,当开启bypass机制后,会使用BypassMergeSortShuffleWriter,如果serializer支持relocation并且map端没有聚合同时分区数目不大于16777215+1三个条件都满足,使用UnsafeShuffleWriter,否则使用SortShuffleWriter

3.1 BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter继承ShuffleWriter,用java实现,会将map端的多个输出文件合并为一个文件,同时生成一个索引文件,索引记录到每个分区的初始地址,write()方法如下:

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // 新建一个ShuffleMapOutputWriter
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
  .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // 如果没有数据的话
    if (!records.hasNext()) {
      // 返回所有分区的写入长度
      partitionLengths = mapOutputWriter.commitAllPartitions();
      // 更新mapStatus
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // 创建和分区数相等的DiskBlockObjectWriter FileSegment
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    // 对于每个分区
    for (int i = 0; i < numPartitions; i++) {
      // 创建一个临时的block
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
      // 获取temp block的file和id
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // 对于每个分区,创建一个DiskBlockObjectWriter
      partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    // 创建文件和写入文件都需要大量时间,也需要包含在shuffle写入时间里面
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    // 如果有数据的话
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // 对于每条数据按key写入相应分区对应的文件
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // 提交
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // 将所有分区文件合并成一个文件
    partitionLengths = writePartitionedData(mapOutputWriter);
    // 更新mapStatus
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}

合并文件的方法writePartitionedData()如下,默认采用零拷贝的方式来合并文件:

private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    // 开始时间
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // 获取每个文件
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          // 采取零拷贝方式
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            // 在这里会调用Utils.copyFileStreamNIO方法,最终调用FileChannel.transferTo方法拷贝文件
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // 否则采取流的方式拷贝
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  return mapOutputWriter.commitAllPartitions();
}

3.2 UnsafeShuffleWriter

UnsafeShuffleWriter也是继承ShuffleWriter,用java实现,write方法如下:

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  // 跟踪异常
  boolean success = false;
  try {
    while (records.hasNext()) {
      // 将数据插入ShuffleExternalSorter进行外部排序
      insertRecordIntoSorter(records.next());
    }
    // 合并并输出文件
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}

这里主要有两个方法:

3.2.1 insertRecordIntoSorter()

@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  // 获取key和分区
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // 重置缓冲区
  serBuffer.reset();
  // 将key和value写入缓冲区
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  // 获取序列化数据大小
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // 将序列化后的数据插入ShuffleExternalSorter处理
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

该方法会将数据进行序列化,并且将序列化后的数据通过insertRecord()方法插入外部排序器中,insertRecord()方法如下:

public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
  throws IOException {

  // for tests
  assert(inMemSorter != null);
  // 如果数据条数超过溢写阈值,直接溢写磁盘
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  // Checks whether there is enough space to insert an additional record in to the sort pointer
  // array and grows the array if additional space is required. If the required space cannot be
  // obtained, then the in-memory data will be spilled to disk.
  // 检查是否有足够的空间插入额外的记录到排序指针数组中,如果需要额外的空间对数组进行扩容,如果空间不够,内存中的数据将会被溢写到磁盘上
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 bytes to store the record length.
  // 需要额外的4或8个字节存储数据长度
  final int required = length + uaoSize;
  // 如果需要更多的内存,会想TaskMemoryManager申请新的page
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  //Given a memory page and offset within that page, encode this address into a 64-bit long.
  //This address will remain valid as long as the corresponding page has not been freed.
  // 通过给定的内存页和偏移量,将当前数据的逻辑地址编码成一个long型
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // 写长度值
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // 移动指针
  pageCursor += uaoSize;
  // 写数据
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // 移动指针
  pageCursor += length;
  // 将编码的逻辑地址和分区id传给ShuffleInMemorySorter进行排序
  inMemSorter.insertRecord(recordAddress, partitionId);
}

在这里对于数据的缓存和溢写不借助于其他高级数据结构,而是直接操作内存空间

growPointerArrayIfNecessary()方法如下:

/**
 * Checks whether there is enough space to insert an additional record in to the sort pointer
 * array and grows the array if additional space is required. If the required space cannot be
 * obtained, then the in-memory data will be spilled to disk.
 */
private void growPointerArrayIfNecessary() throws IOException {
  assert(inMemSorter != null);
  // 如果没有空间容纳新的数据
  if (!inMemSorter.hasSpaceForAnotherRecord()) {
    // 获取当前内存使用量
    long used = inMemSorter.getMemoryUsage();
    LongArray array;
    try {
      // could trigger spilling
      // 分配给缓存原来两倍的容量
      array = allocateArray(used / 8 * 2);
    } catch (TooLargePageException e) {
      // The pointer array is too big to fix in a single page, spill.
      // 如果超出了一页的大小,直接溢写,溢写方法见后面
      // 一页的大小为128M,在PackedRecordPointer类中
      // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
      spill();
      return;
    } catch (SparkOutOfMemoryError e) {
      // should have trigger spilling
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        logger.error("Unable to grow the pointer array");
        throw e;
      }
      return;
    }
    // check if spilling is triggered or not
    if (inMemSorter.hasSpaceForAnotherRecord()) {
      // 如果有了剩余空间,则表明没必要扩容,释放分配的空间
      freeArray(array);
    } else {
      // 否则把原来的数组复制到新的数组
      inMemSorter.expandPointerArray(array);
    }
  }
}

spill()方法如下:

@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }

  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");

  // Sorts the in-memory records and writes the sorted records to an on-disk file.
  // This method does not free the sort data structures.
  // 对内存中的数据进行排序并且将有序记录写到一个磁盘文件中,这个方法不会释放排序的数据结构
  writeSortedFile(false);
  final long spillSize = freeMemory();
  // 重置ShuffleInMemorySorter
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}

writeSortedFile()方法:

private void writeSortedFile(boolean isLastFile) {

  // This call performs the actual sort.
  // 返回一个排序好的迭代器
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();

  // If there are no sorted records, so we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  // 如果为true,则为输出文件,否则为溢写文件
  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record;
  // 创建一个字节缓冲数组,大小为1m
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  // 创建一个临时的shuffle block
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  // 获取文件和id
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  // 不做任何转换的序列化器,因为需要一个实例来构造DiskBlockObjectWriter
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // 遍历
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        // 如果切换到了新的分区,提交当前分区,并且记录当前分区大小
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // 然后切换到下一个分区
        currentPartition = partition;
      }

      // 获取指针,通过指针获取页号和偏移量
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // 获取剩余数据
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      // 跳过数据前面存储的长度
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // 将数据拷贝到缓冲数组中
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        // 从缓冲数组中转入DiskBlockObjectWriter
        writer.write(writeBuffer, 0, toTransfer);
        // 更新位置
        recordReadPosition += toTransfer;
        // 更新剩余数据
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    // 提交
    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  // 记录溢写文件的列表
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  // 如果是溢写文件,更新溢写的指标
  if (!isLastFile) {  
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}

encodePageNumberAndOffset()方法如下:

public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  // 如果开启了堆外内存,偏移量为绝对地址,可能需要64位进行编码,由于页大小限制,将其减去当前页的基地址,变为相对地址
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
    // encode. Due to our page size limitation, though, we can convert this into an offset that's
    // relative to the page's base offset; this relative offset will fit in 51 bits.
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
  // 高13位为页号,低51位为偏移量
  // 页号左移51位,再拼偏移量和上一个低51位都为1的掩码0x7FFFFFFFFFFFFL
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}

ShuffleInMemorySorter的insertRecord()方法如下:

public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}

PackedRecordPointer.packPointer()方法:

public static long packPointer(long recordPointer, int partitionId) {
  assert (partitionId <= MAXIMUM_PARTITION_ID);
  // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
  // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
  // 将页号右移24位,和低27位拼在一起,这样逻辑地址被压缩成40位
  final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
  final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
  // 将分区号放在高24位上
  return (((long) partitionId) << 40) | compressedAddress;
}

getSortedIterator()方法:

public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // 使用基数排序对内存分区ID进行排序。基数排序要快得多,但是在添加指针时需要额外的内存作为保留内存
  if (useRadixSort) {
    offset = RadixSort.sort(
      array, pos,
      PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
      PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    // 否则采用timSort排序
  } else {
    MemoryBlock unused = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
      new Sorter<>(new ShuffleSortDataFormat(buffer));

    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}

3.2.2 closeAndWriteOutput()

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // 获取溢写文件
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  try {
    // 合并溢写文件
    partitionLengths = mergeSpills(spills);
  } finally {
    // 删除溢写文件
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // 更新mapstatus
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}

mergeSpills()方法:

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
  long[] partitionLengths;
  // 如果没有溢写文件,创建空的
  if (spills.length == 0) {
    final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
    return mapWriter.commitAllPartitions();
    // 如果只有一个溢写文件,将它合并输出
  } else if (spills.length == 1) {
    Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
    if (maybeSingleFileWriter.isPresent()) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
      partitionLengths = spills[0].partitionLengths;
      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
    // 如果有多个,合并输出,合并的时候有NIO和BIO两种方式
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }
  return partitionLengths;
}

3.3 SortShuffleWriter

SortShuffleWriter会使用PartitionedAppendOnlyMap或PartitionedPariBuffer在内存中进行排序,如果超过内存限制,会溢写到文件中,在全局输出有序文件的时候,对之前的所有输出文件和当前内存中的数据进行全局归并排序,对key相同的元素会使用定义的function进行聚合,入口为write()方法:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  // 创建一个外部排序器,如果map端有预聚合,就传入aggregator和keyOrdering,否则不需要传入
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // 将数据放入ExternalSorter进行排序
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // 创建一个输出Wrtier
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // 将外部排序的数据写入Writer
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  // 更新mapstatus
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

insertAll()方法:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  // 是否需要map端聚合
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    // 使用AppendOnlyMap在内存中聚合values
    // 获取mergeValue()函数,将新值合并到当前聚合结果中
    val mergeValue = aggregator.get.mergeValue
    // 获取createCombiner()函数,创建聚合初始值
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // 如果一个key当前有聚合值,则合并,如果没有创建初始值
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // 遍历
    while (records.hasNext) {
      // 增加读取记录数
      addElementsRead()
      kv = records.next()
      // map为PartitionedAppendOnlyMap,将分区和key作为key,聚合值作为value
      map.changeValue((getPartition(kv._1), kv._1), update)
      // 是否需要溢写到磁盘
      maybeSpillCollection(usingMap = true)
    }
    // 如果不需要map端聚合
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // buffer为PartitionedPairBuffer,将分区和key加进去
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // 是否需要溢写到磁盘
      maybeSpillCollection(usingMap = false)
    }
  }
}

该方法主要是判断在插入数据时,是否需要在map端进行预聚合,分别采用两种数据结构来保存

maybeSpillCollection()方法里面会调用maybeSpill()方法检查是否需要溢写,如果发生溢写,重新构造一个map或者buffer结构从头开始缓存,如下:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // 判断是否需要溢写
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    // 判断是否需要溢写
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}


  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    // 如果读取的记录数是32的倍数,并且预估map或者buffer内存占用大于默认的5m阈值
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      // 尝试申请2*currentMemory-5m的内存
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      // 更新阈值
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      // 判断,如果还是不够,确定溢写
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    // 如果shouldSpill为false,但是读取的记录数大于Integer.MAX_VALUE,也是需要溢写
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      // 溢写次数+1
      _spillCount += 1
      logSpillage(currentMemory)
      // 溢写缓存的集合
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      // 释放内存
      releaseMemory()
    }
    shouldSpill
  }

maybeSpill()方法里面会调用spill()进行溢写,如下:

  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    // 根据给定的比较器进行排序,返回排序结果的迭代器
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // 将迭代器中的数据溢写到磁盘文件中
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    // ArrayBuffer记录所有溢写的文件
    spills += spillFile
  }

spillMemoryIteratorToDisk()方法如下:

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // 创建一个临时块
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // 创建溢写文件的DiskBlockObjectWriter
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  // 记录写入批次大小
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  // 记录每个分区条数
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  // 将内存中的数据按批次刷写到磁盘中
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // 遍历map或者buffer中的记录
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      // 写入并更新计数值
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      // 写入条数达到10000条时,将这批刷写到磁盘
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    // 遍历完以后,将剩余的刷写到磁盘
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // 返回溢写文件
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}

接下来就是排序合并操作,调用ExternalSorter.writePartitionedMapOutput()方法:

def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // 如果没有发生溢写
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // 根据指定的比较器进行排序
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext()) {
      val partitionId = it.nextPartition()
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
        val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        // 将分区内的数据依次取出
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(partitionPairsWriter)
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = partitionId + 1
    }
    // 如果发生溢写,将溢写文件和缓存数据进行归并排序,排序完成后按照分区依次写入ShufflePartitionPairsWriter
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // 这里会进行归并排序
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}

partitionedIterator()方法:

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // 如果没有溢写,并且没有排序,只按照分区id排序
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      // 如果没有溢写但是排序,先按照分区id排序,再按key排序
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    // 如果有溢写,就将溢写文件和内存中的数据归并排序
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

归并方法如下:

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // 读取溢写文件
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  // 遍历分区
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // 合并溢写文件和内存中的数据
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    // 如果有聚合逻辑,按分区聚合,对key按照keyComparator排序
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      // 如果没有聚合,但是有排序逻辑,按照ordering做归并
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
      // 什么都没有直接归并
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}

在write()方法中调用commitAllPartitions()方法输出数据,其中调用writeIndexFileAndCommit()方法写出数据和索引文件,如下:

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Long,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // 创建索引文件和临时索引文件
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    // 获取shuffle data file
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    // 对于每个executor只有一个IndexShuffleBlockResolver,确保原子性
    synchronized {
      // 检查索引是否和数据文件已经有了对应关系
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        // 如果存在对应关系,说明shuffle write已经完成,删除临时索引文件
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // 如果不存在,创建一个BufferedOutputStream
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          // 获取每个分区的大小,累加偏移量,写入临时索引文件
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        // 删除可能存在的其他索引文件
        if (indexFile.exists()) {
          indexFile.delete()
        }
        // 删除可能存在的其他数据文件
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // 将临时文件重命名成正式文件
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}

4.小结

  • Spark在初始化SparkEnv的时候,会在create()方法里面初始化ShuffleManager,包含sort和tungsten-sort两种shuffle

  • ShuffleManager是一个特质,核心方法有registerShuffle()、getReader()、getWriter(),

  • SortShuffleManager是ShuffleManager的唯一实现类,在registerShuffle()方法里面选择采用哪种shuffle机制,getReader()方法只会返回一种BlockStoreShuffleReader,getWriter()方法根据不同的handle选择不同的Writer,共有三种

  • BypassMergeSortShuffleWriter:如果当前shuffle依赖中没有map端的聚合操作,并且分区数小于spark.shuffle.sort.bypassMergeThreshold的值,默认为200,启用bypass机制,核心方法有:write()、writePartitionedData()(合并所有分区文件,默认采用零拷贝方式)

  • UnsafeShuffleWriter:如果serializer支持relocation并且map端没有聚合同时分区数目不大于16777215+1三个条件都满足,采用该Writer,核心方法有:write()、insertRecordIntoSorter()(将数据插入外部选择器排序)、closeAndWriteOutput()(合并并输出文件),前一个方法里核心方法有:insertRecord()(将序列化数据插入外部排序器)、growPointerArrayIfNecessary()(如果需要额外空间需要对数组扩容或溢写到磁盘)、spill()(溢写到磁盘)、writeSortedFile()(将内存中的数据进行排序并写出到磁盘文件中)、encodePageNumberAndOffset()(对当前数据的逻辑地址进行编码,转成long型),后面的方法里核心方法有:mergeSpills()(合并溢写文件),合并文件的时候有BIO和NIO两种方式

  • SortShuffleWriter:如果上面两者都不满足,采用该Writer,该Writer会使用PartitionedAppendOnlyMap或PartitionedPariBuffer在内存中进行排序,如果超过内存限制,会溢写到文件中,在全局输出有序文件的时候,对之前的所有输出文件和当前内存中的数据进行全局归并排序,对key相同的元素会使用定义的function进行聚合核心方法有:write()、insertAll()(将数据放入ExternalSorter进行排序)、maybeSpillCollection()(是否需要溢写到磁盘)、maybeSpill()、spill()、spillMemoryIteratorToDisk()(将内存中数据溢写到磁盘)、writePartitionedMapOutput()、commitAllPartitions()里面调用writeIndexFileAndCommit()方法写出数据和索引文件

Tags: