浪尖 on the pros and cons of Spark's coalesce, and how it works
- April 7, 2020
- Notes
浪尖's followers probably haven't seen a Spark source-code walkthrough from 浪尖 in a while. Today 浪尖 is sharing one to help you better understand how an RDD gets computed in Spark, and to explain how coalesce reduces the number of partitions and the issues around using it.
The trigger was a question in the Knowledge Planet group about how the coalesce method is used and how it works. The person had also read an incorrect introduction to coalesce online and come away with a wrong understanding, so 浪尖 is squeezing in some time to clear things up.
浪尖's advice is to spend more time reading the Spark source code. In my view it is about the best-commented codebase out there, and the overall logic is quite clear. The heavy use of Scala higher-order functions does make early reading a headache, but Spark is undeniably a good reference for anyone learning well-structured Scala.
A complaint has to be made here, though: Flink's code is written rather poorly and is thinly commented, which makes it feel much less suitable for reading and learning.
1. The coalesce function
When using Spark operators, it pays to go back to the comments in the source and understand how the operator is implemented; the comments very often already explain the operator's use cases and principle clearly. For example, the comment on the coalesce function covered in this article reads:
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
The gist of the comment: suppose the parent RDD has 1000 partitions and you call coalesce(100). The 1000 parent partitions are divided into 100 groups of 10, each called a PartitionGroup. Each PartitionGroup becomes one partition of the CoalescedRDD and is iterated over in the compute method, which is how the shuffle is avoided.
coalesce takes three parameters: the number of partitions, whether to shuffle (no shuffle by default), and a partition coalescer, which decides which parent-RDD partitions are grouped together into a PartitionGroup and therefore determines the partitioning of the CoalescedRDD.
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
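To make the two code paths above concrete, here is a minimal usage sketch; the SparkContext `sc` and the variable names are assumptions for illustration, not from the original article:

// Assumed setup: an existing SparkContext `sc`.
val parent = sc.parallelize(1 to 1000000, 1000)            // 1000 partitions

// Narrow path: 1000 parent partitions are packed into 100 PartitionGroups,
// no shuffle, each output partition reading roughly 10 parent partitions.
val narrowed = parent.coalesce(100)
println(narrowed.getNumPartitions)                          // 100

// Without shuffle, asking for MORE partitions changes nothing.
println(parent.coalesce(2000).getNumPartitions)             // stays at 1000

// Shuffle path: records are redistributed through a HashPartitioner,
// so a larger partition count (or a drastic shrink) works.
println(parent.coalesce(2000, shuffle = true).getNumPartitions)   // 2000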
Before digging into the CoalescedRDD source, 浪尖 thinks it is worth stressing the five key properties of an RDD; without understanding them, the rest of this article will be hard to follow.
One more gripe:
Most people can recite the five RDD properties fluently, but 浪尖 feels that quite a few are still short of truly understanding them.
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
In other words:
Every RDD has a list of partitions and a list of parent RDDs, along with a compute function that operates on one of the RDD's partitions, an optional partitioner, and an optional set of preferred locations.
So here is a question: how is the relationship between an RDD's partitions and its parent RDD's partitions determined?
Again, the five properties hold the answer:
Every RDD determines its partitions through the getPartitions function and its dependencies, narrow or wide, through the getDependencies() function, and every RDD computes the data of a partition through its compute method. A small sketch follows to show where these hooks live.
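Here is a minimal, hypothetical custom RDD; it is a sketch for illustration only (the class and names are made up, not Spark code):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type for the sketch below.
case class RangePartition(index: Int) extends Partition

// A toy RDD exposing the numbers 0 until n, split into `slices` pieces.
// It has no parent, so the dependency list passed to the RDD constructor is Nil.
class ToyRangeRDD(sc: SparkContext, n: Int, slices: Int)
  extends RDD[Int](sc, Nil) {

  // Property: a list of partitions.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](slices)(i => RangePartition(i))

  // Property: a function for computing each split.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    (split.index until n by slices).iterator

  // The remaining properties (dependencies, partitioner, preferred locations)
  // keep the defaults inherited from RDD: no parents, no partitioner, no locality.
}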
We will leave the shuffle branch of coalesce aside for now and only look at the non-shuffle behavior, i.e.:
new CoalescedRDD(this, numPartitions, partitionCoalescer)
2. getPartitions: grouping the parent partitions
By default, coalesce's partitionCoalescer is empty, so you are free to implement your own strategy for grouping the parent RDD's partitions (a sketch of a custom coalescer follows after the PartitionGroup class below). For CoalescedRDD, when no coalescer is supplied, its getPartitions function falls back to the default DefaultPartitionCoalescer:
override def getPartitions: Array[Partition] = {
  val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

  pc.coalesce(maxPartitions, prev).zipWithIndex.map {
    case (pg, i) =>
      val ids = pg.partitions.map(_.index).toArray
      CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
  }
}
Take a look at the coalesce method of DefaultPartitionCoalescer: it shrinks the parent RDD's partitions down into the requested number of groups and returns an Array[PartitionGroup], where each PartitionGroup represents one group of parent-RDD partitions and, equivalently, one partition of the CoalescedRDD.
/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 *
 * @return array of partition groups
 */
def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
  val partitionLocs = new PartitionLocations(prev)
  // setup the groups (bins)
  setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs)
  // assign partitions (balls) to each group (bins)
  throwBalls(maxPartitions, prev, balanceSlack, partitionLocs)
  getPartitions
}
A PartitionGroup is really just an array of parent-RDD partitions assembled according to some rule; here is the class:
/**
 * ::DeveloperApi::
 * A group of `Partition`s
 * @param prefLoc preferred location for the partition group
 */
@DeveloperApi
class PartitionGroup(val prefLoc: Option[String] = None) {
  val partitions = mutable.ArrayBuffer[Partition]()
  def numPartitions: Int = partitions.size
}
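As noted at the start of this section, you can plug in your own coalescer. Below is a minimal sketch of one; the class name and the plain chunking rule are made up for illustration, it ignores locality and load balancing unlike DefaultPartitionCoalescer, and per the scaladoc at the top it must be serializable:

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// A naive coalescer: chunk the parent partitions into contiguous groups
// of roughly equal size, with no preferred locations.
class ChunkingCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val parents   = parent.partitions
    val groupSize = math.max(1, math.ceil(parents.length.toDouble / maxPartitions).toInt)
    parents.grouped(groupSize).map { chunk =>
      val pg = new PartitionGroup()       // no preferred location in this sketch
      pg.partitions ++= chunk
      pg
    }.toArray
  }
}

// Hypothetical usage:
//   rdd.coalesce(100, partitionCoalescer = Some(new ChunkingCoalescer()))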
3. getDependencies: lineage
As described above, CoalescedRDD's getPartitions() establishes the mapping from the parent RDD's partitions to the current RDD's partitions. That mapping is then consumed through the getDependencies method, as follows:
override def getDependencies: Seq[Dependency[_]] = {
  Seq(new NarrowDependency(prev) {
    def getParents(id: Int): Seq[Int] =
      partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
  })
}
The partitions array is implemented in the RDD base class and, under the hood, calls getPartitions:
/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
  }
}
Back to the narrow dependency, NarrowDependency: its getParents method takes the id of a current partition, fetches the corresponding CoalescedRDDPartition, and returns the array of parent-RDD partition indices in that group, i.e. exactly the grouping of parent partitions produced in CoalescedRDD's getPartitions.
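You can observe this mapping directly from the driver. A small sketch, again assuming an existing SparkContext `sc` (the exact indices printed depend on how the default coalescer packs the groups):

import org.apache.spark.NarrowDependency

val parent    = sc.parallelize(1 to 1000, 1000)   // 1000 parent partitions
val coalesced = parent.coalesce(100)              // narrow, no shuffle

// coalesce without shuffle produces a single NarrowDependency on the parent.
val dep = coalesced.dependencies.head.asInstanceOf[NarrowDependency[_]]

// For output partition 0, list the parent partition indices it will read;
// with 1000 -> 100 this is a group of roughly 10 indices.
println(dep.getParents(0))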
4. compute: computing a partition
compute, one of the five properties, is the per-partition computation function. For CoalescedRDD it is implemented as follows:
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
    firstParent[T].iterator(parentPartition, context)
  }
}
Looking at the method above, the computation is over a CoalescedRDDPartition, which is really a computation over one PartitionGroup, i.e. one group of parent-RDD partitions, the very group produced in getPartitions.
By now it should be obvious: although CoalescedRDD's compute handles a single CoalescedRDD partition, it is actually computing a whole group of parent-RDD partitions. That lowers the parent RDD's parallelism, so use it with care.
When a shuffle is called for, don't hesitate to use one.
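To make the pitfall from the scaladoc at the top concrete (a drastic coalesce pulls the whole upstream pipeline into very few tasks), here is a sketch; the input path, output paths, and the stand-in parse function are hypothetical:

val raw    = sc.textFile("hdfs:///logs/2020-04-07")   // hypothetical path, assume ~1000 splits
val parse  = (line: String) => line.toUpperCase       // stand-in for an expensive parse
val parsed = raw.map(parse)

// Narrow coalesce to 1: the map above runs for ALL the input inside a single task.
parsed.coalesce(1).saveAsTextFile("/out/one-task")

// With shuffle = true, the upstream map keeps the original parallelism;
// only the stage after the shuffle has a single partition.
parsed.coalesce(1, shuffle = true).saveAsTextFile("/out/parallel-upstream")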
5. The shuffle mode: an opening look
For the shuffle-enabled coalesce, you can see that it wraps a ShuffledRDD, and the partition count passed to CoalescedRDD is the same as that of the ShuffledRDD parent it builds, so the partitions map one to one; that is how the shuffle behavior is implemented. ShuffledRDD itself is analyzed and shared in the Knowledge Planet group.
if (shuffle) {
  /** Distributes elements evenly across output partitions, starting from a random partition. */
  val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      // Note that the hash code of the key will just be the key itself. The HashPartitioner
      // will mod it with the number of total partitions.
      position = position + 1
      (position, t)
    }
  } : Iterator[(Int, T)]

  // include a shuffle step so that our upstream tasks are still distributed
  new CoalescedRDD(
    new ShuffledRDD[Int, T, T](
      mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
      new HashPartitioner(numPartitions)),
    numPartitions,
    partitionCoalescer).values
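A closing note grounded in the code above: distributePartition keys each record with a counter that starts at a per-partition random offset, so the HashPartitioner spreads records round-robin across the numPartitions outputs. This is also the machinery behind repartition, which in current Spark versions simply calls coalesce with shuffle = true, so the two calls below are equivalent (rdd being any RDD you already have):

// Equivalent ways to redistribute an RDD into 10 partitions with a shuffle.
val a = rdd.repartition(10)
val b = rdd.coalesce(10, shuffle = true)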