Langjian (浪尖) on the pros, cons, and principles of Spark's coalesce

Langjian's followers probably haven't seen a Spark source-code walkthrough from Langjian in a while. Today Langjian is sharing an article to help you understand more deeply how an RDD is actually computed in Spark, and to explain the principle behind using coalesce to reduce the number of partitions, along with the problems it can cause.

The trigger was a question in the Knowledge Planet (知識星球) group about how the coalesce method works and how to use it. The asker had also read some incorrect introductions to coalesce online and come away with a wrong understanding, so Langjian is taking a moment to explain it properly.

Langjian's advice here is to spend more time in the Spark source code. In my view it is one of the best-commented code bases around, and the overall code logic is quite clear. The heavy use of Scala higher-order functions does make the early reading painful, but there is no denying that Spark is a good reference for learning to write idiomatic Scala.

A complaint in passing: Flink's code is, frankly, not written as cleanly and its comments are sparse, so it does not feel as well suited for reading and learning.

1. The coalesce function

When using Spark operators, it pays to go back regularly to the comments in the source code and understand how each operator is actually implemented; very often the comment already explains the operator's use cases and principles. For example, the Scaladoc of coalesce, the subject of this article, reads as follows:

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */

The gist of the comment: suppose the parent RDD has 1000 partitions and you call coalesce(100). The 1000 parent partitions are split into 100 groups of 10, each called a PartitionGroup, and each PartitionGroup becomes one partition of the CoalescedRDD, which iterates over the group inside its compute method; this is how the shuffle is avoided. (If you request more partitions than the parent already has, the partition count simply stays where it is.)

coalesce takes three parameters: the target number of partitions, whether to shuffle (no shuffle by default), and an optional PartitionCoalescer (which decides which parent RDD partitions are grouped together into one PartitionGroup, i.e. it determines the partitioning of the CoalescedRDD).

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
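
To make the two code paths concrete, here is a minimal, self-contained usage sketch; the object name, local-mode master, and the specific numbers are made up for illustration and are not from the original post:

import org.apache.spark.sql.SparkSession

object CoalesceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("coalesce-demo").getOrCreate()
    val sc = spark.sparkContext

    val parent = sc.parallelize(1 to 100000, 1000)    // a parent RDD with 1000 partitions

    // Narrow path (shuffle = false): the 1000 parent partitions are packed into 100 groups.
    println(parent.coalesce(100).getNumPartitions)    // 100

    // Asking for more partitions without a shuffle keeps the current count.
    println(parent.coalesce(2000).getNumPartitions)   // 1000

    // Shuffle path: an extra shuffle step, so the partition count can also grow.
    println(parent.coalesce(2000, shuffle = true).getNumPartitions) // 2000

    spark.stop()
  }
}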

Before digging into the CoalescedRDD source, Langjian wants to stress the five key properties of an RDD. If you do not understand them, the rest of this article will be hard to follow.

A quick gripe:

Honestly, most people can recite the five RDD properties fluently, but when it comes to truly understanding them, Langjian feels many still fall a bit short.

 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)

That is:

Every RDD has a list of partitions, a list of parent RDDs it depends on, and a compute function for its partitions, plus an optional partitioner and an optional locality (preferred-location) policy.

Now, a question: how is the relationship between an RDD's partitions and its parent RDD's partitions determined?

Again, this comes down to the five properties:

Every RDD determines its partitions through its getPartitions method, determines its dependencies (narrow or wide) through its getDependencies() method, and computes the data of each partition through its compute method.
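
For reference, the five properties map onto concrete members of the RDD class. The trait below is only an illustrative restatement of that contract, not Spark's own code, although the names and signatures mirror org.apache.spark.rdd.RDD:

import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Illustrative sketch of the contract that org.apache.spark.rdd.RDD defines.
trait RddContract[T] {
  protected def getPartitions: Array[Partition]                             // 1. a list of partitions
  def compute(split: Partition, context: TaskContext): Iterator[T]          // 2. a function computing each split
  protected def getDependencies: Seq[Dependency[_]]                         // 3. dependencies on other RDDs
  def partitioner: Option[Partitioner] = None                               // 4. optional partitioner (key-value RDDs)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil  // 5. optional preferred locations
}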

We will leave the shuffle path of coalesce aside for now and only walk through the non-shuffle behaviour, that is:

new CoalescedRDD(this, numPartitions, partitionCoalescer)

2. getPartitions: grouping the parent partitions

By default the partitionCoalescer argument of coalesce is empty, so you are free to implement your own strategy for grouping the parent RDD's partitions. For CoalescedRDD, when no coalescer is specified, its getPartitions method falls back to the default DefaultPartitionCoalescer:

override def getPartitions: Array[Partition] = {
  val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
  pc.coalesce(maxPartitions, prev).zipWithIndex.map {
    case (pg, i) =>
      val ids = pg.partitions.map(_.index).toArray
      CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
  }
}

Look at DefaultPartitionCoalescer's coalesce method: it packs the parent RDD's partitions into the requested number of groups and returns an Array[PartitionGroup], where each PartitionGroup represents one group of parent partitions and, at the same time, one partition of the CoalescedRDD.

/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 *
 * @return array of partition groups
 */
def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
  val partitionLocs = new PartitionLocations(prev)
  // setup the groups (bins)
  setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs)
  // assign partitions (balls) to each group (bins)
  throwBalls(maxPartitions, prev, balanceSlack, partitionLocs)
  getPartitions
}

A PartitionGroup is really just an array of parent RDD partitions grouped together according to some rule; here is the class:

/**
 * ::DeveloperApi::
 * A group of `Partition`s
 * @param prefLoc preferred location for the partition group
 */
@DeveloperApi
class PartitionGroup(val prefLoc: Option[String] = None) {
  val partitions = mutable.ArrayBuffer[Partition]()
  def numPartitions: Int = partitions.size
}
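
Because PartitionCoalescer and PartitionGroup are public developer APIs, you can plug your own grouping strategy into coalesce, as mentioned at the start of this section. Below is a deliberately naive sketch (the class name SequentialCoalescer is made up): it chunks the parent partitions in index order and skips the locality balancing that DefaultPartitionCoalescer performs:

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Naive coalescer: group parent partitions by index order, with no locality awareness.
// The coalesce Scaladoc notes the coalescer passed in must be serializable.
class SequentialCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val groupSize = math.ceil(parent.partitions.length.toDouble / maxPartitions).toInt
    parent.partitions.grouped(groupSize).map { parts =>
      val pg = new PartitionGroup()   // no preferred location
      pg.partitions ++= parts
      pg
    }.toArray
  }
}

// Hypothetical usage:
// rdd.coalesce(100, shuffle = false, partitionCoalescer = Some(new SequentialCoalescer()))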

3. getDependencies: lineage

As noted above, CoalescedRDD's getPartitions() method establishes the mapping from the parent RDD's partitions to the current RDD's partitions. That mapping is then consumed through the getDependencies method, as follows:

override def getDependencies: Seq[Dependency[_]] = {
  Seq(new NarrowDependency(prev) {
    def getParents(id: Int): Seq[Int] =
      partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
  })
}

The partitions array is implemented in the RDD base class and in turn calls getPartitions:

/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
  }
}

Back to the NarrowDependency: its getParents method takes the id of a CoalescedRDD partition, looks up the corresponding CoalescedRDDPartition, and returns the array of parent partition indices behind it. That array is exactly the grouping of parent partitions produced in CoalescedRDD's getPartitions.
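
You can observe this lineage directly from a Spark shell. A quick sketch, assuming an existing SparkContext named sc (the partition counts are only illustrative):

import org.apache.spark.NarrowDependency

val parent = sc.parallelize(1 to 1000, 1000)
val coalesced = parent.coalesce(100)              // shuffle = false

// The CoalescedRDD's only dependency is a NarrowDependency, not a ShuffleDependency.
val dep = coalesced.dependencies.head
println(dep.isInstanceOf[NarrowDependency[_]])    // true

// getParents(0) returns the indices of the parent partitions that back
// partition 0 of the CoalescedRDD, i.e. the PartitionGroup built in getPartitions.
println(dep.asInstanceOf[NarrowDependency[_]].getParents(0))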

4. compute: computing a partition

compute is one of the five properties: the function that computes a given partition. For CoalescedRDD it is implemented as follows:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
    firstParent[T].iterator(parentPartition, context)
  }
}

Looking at this method, you can see the computation is driven by a CoalescedRDDPartition, which is really a computation over one PartitionGroup, that is, one group of parent RDD partitions, the very group built in getPartitions.

By now it should be obvious: although CoalescedRDD's compute method targets a single CoalescedRDD partition, it actually computes a whole group of parent RDD partitions inside one task, which lowers the parallelism of the parent RDD. So use it with care.

When a shuffle is what the job needs, do not hesitate to use one.
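
To make the warning concrete, here is a hedged sketch; the input and output paths and the expensiveParse function are placeholders, and sc is assumed to be an existing SparkContext:

def expensiveParse(line: String): String = line   // placeholder for real per-record work

val heavy = sc.textFile("hdfs:///logs/input", minPartitions = 1000).map(expensiveParse)

// Narrow coalesce(1): all 1000 parent partitions end up in one PartitionGroup,
// so the read + parse + write pipeline runs as a single task on a single core.
heavy.coalesce(1).saveAsTextFile("hdfs:///logs/out-narrow")

// coalesce(1, shuffle = true): the 1000 upstream tasks still run in parallel,
// and only the shuffle-read side is collapsed to one partition.
heavy.coalesce(1, shuffle = true).saveAsTextFile("hdfs:///logs/out-shuffled")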

5. Shuffle mode: a first look

For the shuffle-enabled version of coalesce, you can see that the result wraps a ShuffledRDD, and the CoalescedRDD is constructed with the same number of partitions as the parent ShuffledRDD it wraps, giving a one-to-one partition mapping; that is how the shuffle variant is implemented. We will analyse ShuffledRDD itself in the Knowledge Planet group.

if (shuffle) {
  /** Distributes elements evenly across output partitions, starting from a random partition. */
  val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      // Note that the hash code of the key will just be the key itself. The HashPartitioner
      // will mod it with the number of total partitions.
      position = position + 1
      (position, t)
    }
  } : Iterator[(Int, T)]

  // include a shuffle step so that our upstream tasks are still distributed
  new CoalescedRDD(
    new ShuffledRDD[Int, T, T](
      mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
      new HashPartitioner(numPartitions)),
    numPartitions,
    partitionCoalescer).values
}
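
One closing note before that ShuffledRDD deep dive: repartition is simply coalesce with shuffle = true (this is its one-line implementation in RDD.scala), which is why growing the partition count always goes through this shuffle path:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}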