Stateful computation in Spark Streaming: the updateStateByKey source code
- March 4, 2020
- Notes
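When reposting, please credit the original source: https://www.cnblogs.com/dongxiao-yang/p/11358781.html
This article is based on Spark source version 2.4.3.
Stream processing often requires stateful computation, where the current result depends not only on the data just received but also on earlier results that must be merged with it. Because Spark Streaming works in mini-batches, the previous state has to be stored in an RDD and retrieved when the next batch is computed so the two can be merged. That is what the updateStateByKey method is for.
A simple example: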
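    import org.apache.spark.SparkConf
    import org.apache.spark.examples.streaming.StreamingExamples
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    def main(args: Array[String]): Unit = {
      val host = "localhost"
      val port = "8001"
      // Helper from the Spark examples module that lowers the log level
      StreamingExamples.setStreamingLogLevels()
      // Create the context with a 10 second batch size
      val sparkConf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(10))
      // Stateful operators need a checkpoint directory
      ssc.checkpoint("/Users/dyang/Desktop/checkpoittmp")
      val lines = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER)
      val words = lines.flatMap(_.split(" "))
      val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)) //.reduceByKey(_ + _)
      // Add the counts of the current batch to the running total of each word
      val totalCounts = wordCounts.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
        Some(values.sum + state.getOrElse(0))
      }
      totalCounts.print()
      ssc.start()
      ssc.awaitTermination()
    }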
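The example above is a simple stateful version of word count. With updateStateByKey, the application remembers the running total for each word and adds the counts from each new batch to it.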
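To make the batch-by-batch accumulation concrete, here is a minimal sketch in plain Scala collections with no Spark dependency. The object name and the `runBatch` helper are hypothetical and exist only for illustration. The update function has the same shape as the one in the example.

```scala
object StatefulWordCountSketch {
  // Same shape as the update function passed to updateStateByKey in the example.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Hypothetical helper: folds one batch of (word, 1) pairs into the previous state.
  def runBatch(prev: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Visit every key that has old state or new values.
    (prev.keySet ++ grouped.keySet).toSeq.flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), prev.get(k)).map(s => k -> s).toList
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(("a", 1), ("b", 1), ("a", 1))
    val batch2 = Seq(("b", 1), ("c", 1))
    val state1 = runBatch(Map.empty, batch1)
    val state2 = runBatch(state1, batch2)
    println(state1)
    println(state2)
  }
}
```

The word "a" keeps its total in the second batch even though no new "a" arrives, which is the behaviour the state RDD provides in the real operator.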
updateStateByKey has several overloads that wrap different parameter sets. One of the more complete definitions is:
    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of each key.
     * In every batch the updateFunc will be called for each state even if there are no new values.
     * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
     * @param updateFunc State update function. Note, that this function may generate a different
     *                   tuple with a different key than the input key. Therefore keys may be removed
     *                   or added in this way. It is up to the developer to decide whether to
     *                   remember the partitioner despite the key being changed.
     * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
     *                    DStream
     * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
        partitioner: Partitioner,
        rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
      val cleanedFunc = ssc.sc.clean(updateFunc)
      val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
        cleanedFunc(it)
      }
      new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
    }
Here the updateFunc parameter is a converted form of the function the user originally passed in, updateFunc: (Seq[V], Option[S]) => Option[S]:
    val cleanedUpdateF: (Seq[V], Option[S]) => Option[S] = sparkContext.clean(updateFunc)
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => {
        cleanedUpdateF(t._2, t._3).map(s => (t._1, s))
      })
    }
    updateStateByKey(newUpdateFunc, partitioner, true)
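The conversion can be tried in isolation. The sketch below assumes nothing beyond the Scala standard library: `lift` is a hypothetical name that mirrors the wrapping shown above without Spark's closure cleaning, and `dropEmpty` is an illustrative update function that returns None to remove a key.

```scala
object LiftUpdateFuncSketch {
  // Hypothetical helper: turns a per-key update function into the iterator form.
  def lift[K, V, S](updateFunc: (Seq[V], Option[S]) => Option[S])
      : Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iterator => iterator.flatMap { case (key, values, state) =>
      // A None result drops the key from the new state.
      updateFunc(values, state).map(s => (key, s))
    }

  // Illustrative update function: keep a running sum, drop keys whose total is zero.
  val dropEmpty: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    val total = values.sum + state.getOrElse(0)
    if (total > 0) Some(total) else None
  }

  def main(args: Array[String]): Unit = {
    val input: Iterator[(String, Seq[Int], Option[Int])] =
      Iterator(("a", Seq(1, 2), Some(3)), ("b", Seq.empty[Int], None))
    println(lift[String, Int, Int](dropEmpty)(input).toList)
  }
}
```

Because the lifted function uses flatMap over an Option, a key disappears from the state as soon as the user function returns None for it.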
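In the end, updateStateByKey turns the DStream wrapped by PairDStreamFunctions into a StateDStream. For every DStream, the compute(time) method is the concrete implementation that produces the RDD for each batch interval: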
    override def compute(validTime: Time): Option[RDD[(K, S)]] = {

      // Try to get the previous state RDD
      getOrCompute(validTime - slideDuration) match {

        case Some(prevStateRDD) =>    // If previous state RDD exists
          // Try to get the parent RDD
          parent.getOrCompute(validTime) match {
            case Some(parentRDD) =>    // If parent RDD exists, then compute as usual
              computeUsingPreviousRDD (validTime, parentRDD, prevStateRDD)
            case None =>    // If parent RDD does not exist
              // Re-apply the update function to the old state RDD
              val updateFuncLocal = updateFunc
              val finalFunc = (iterator: Iterator[(K, S)]) => {
                val i = iterator.map(t => (t._1, Seq.empty[V], Option(t._2)))
                updateFuncLocal(validTime, i)
              }
              val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
              Some(stateRDD)
          }

        case None =>    // If previous session RDD does not exist (first input data)
          // Try to get the parent RDD
          parent.getOrCompute(validTime) match {
            case Some(parentRDD) =>    // If parent RDD exists, then compute as usual
              initialRDD match {
                case None =>
                  // Define the function for the mapPartition operation on grouped RDD;
                  // first map the grouped tuple to tuples of required type,
                  // and then apply the update function
                  val updateFuncLocal = updateFunc
                  val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
                    updateFuncLocal (validTime,
                      iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
                  }

                  val groupedRDD = parentRDD.groupByKey(partitioner)
                  val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
                  // logDebug("Generating state RDD for time " + validTime + " (first)")
                  Some (sessionRDD)
                case Some (initialStateRDD) =>
                  computeUsingPreviousRDD(validTime, parentRDD, initialStateRDD)
              }
            case None => // If parent RDD does not exist, then nothing to do!
              // logDebug("Not generating state RDD (no previous state, no parent)")
              None
          }
      }
    }
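The nested matches in compute reduce to a small decision table. The sketch below only summarises that branching. The object name, the `branch` function, and the descriptive strings are hypothetical labels for illustration, not Spark identifiers.

```scala
object ComputeBranchSketch {
  // Hypothetical summary of which path compute takes, given what is available.
  def branch(hasPrevState: Boolean, hasParent: Boolean, hasInitial: Boolean): String =
    (hasPrevState, hasParent) match {
      case (true, true)  => "cogroup parent RDD with previous state RDD"
      case (true, false) => "re-apply update function to previous state with empty values"
      case (false, true) =>
        if (hasInitial) "cogroup parent RDD with initial state RDD"
        else "groupByKey on parent RDD and apply update function with no state"
      case (false, false) => "no state RDD for this batch"
    }

  def main(args: Array[String]): Unit = {
    for (prev <- Seq(true, false); parent <- Seq(true, false))
      println(s"prev=$prev parent=$parent -> ${branch(prev, parent, hasInitial = false)}")
  }
}
```

Only the first row and the initial-state variant of the third row go through computeUsingPreviousRDD. The remaining rows are the edge cases handled inline.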
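The meaning of parent needs some explanation here: parent is the upstream DStream that this DStream depends on. As the StateDStream instantiation at the end of updateStateByKey above shows, it passes in self, the DStream from which the PairDStreamFunctions was created, and this is how the DAG relationship between DStreams is built.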
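Each DStream manages the RDDs it has already generated in an internal HashMap[Time, RDD[T]](), generatedRDDs. The key is a Time aligned with the batchDuration specified by the user. If a batch is generated every 15s, for example, the keys are times such as 08h:00m:00s and 08h:00m:15s, so the key in effect identifies which batch it is. The value in generatedRDDs is the RDD instance. The call parent.getOrCompute(validTime) therefore fetches the RDD that the upstream DStream's transformations produce for that batch.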
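The per-batch caching can be sketched without Spark. In the sketch below `CachedStream`, `computeFn`, and `computeCalls` are hypothetical names, times are plain milliseconds, and the real getOrCompute does more (persistence and checkpointing, for instance) that is left out here.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stand-in for a DStream: caches one result per aligned batch time.
  class CachedStream[T](batchMs: Long, computeFn: Long => Option[T]) {
    private val generated = mutable.HashMap.empty[Long, T]
    var computeCalls = 0

    def getOrCompute(timeMs: Long): Option[T] =
      generated.get(timeMs).orElse {
        if (timeMs % batchMs != 0) None // time is not aligned to a batch boundary
        else {
          computeCalls += 1
          val result = computeFn(timeMs)
          result.foreach(r => generated.put(timeMs, r)) // remember it for later lookups
          result
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val stream = new CachedStream[String](15000L, t => Some(s"rdd@$t"))
    println(stream.getOrCompute(15000L))
    println(stream.getOrCompute(15000L))
    println(stream.computeCalls)
  }
}
```

The second lookup for the same batch time is served from the map, so a StateDStream asking for its own previous batch does not recompute it.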
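The source above is already thoroughly commented. Once the edge cases where parentRDD or prevStateRDD/initialRDD is missing are set aside, the method reaches computeUsingPreviousRDD, which merges the current data with the historical state: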
    private [this] def computeUsingPreviousRDD(
        batchTime: Time,
        parentRDD: RDD[(K, V)],
        prevStateRDD: RDD[(K, S)]) = {
      // Define the function for the mapPartition operation on cogrouped RDD;
      // first map the cogrouped tuple to tuples of required type,
      // and then apply the update function
      val updateFuncLocal = updateFunc
      val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
        val i = iterator.map { t =>
          val itr = t._2._2.iterator
          val headOption = if (itr.hasNext) Some(itr.next()) else None
          (t._1, t._2._1.toSeq, headOption)
        }
        updateFuncLocal(batchTime, i)
      }
      val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
      val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
      Some(stateRDD)
    }
This method first applies cogroup to the current data parentRDD and prevStateRDD. The result has type RDD[(K, (Iterable[V], Iterable[S]))], where K is the key type of the DStream and the value is a 2-tuple of the current data Iterable[V] and the historical state Iterable[S]. To match this type, Spark wraps the earlier updateFunc: (Iterator[(K, Seq[V], Option[S])]) once more:
    val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))])
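Reading it the other way around: RDD data that starts out as (K, (Iterable[V], Iterable[S])) is first wrapped into the form Iterator[(K, Seq[V], Option[S])], then a second layer of wrapping turns it into a call to the user-defined state function updateFunc: (Seq[V], Option[S]) => Option[S], and the result is an RDD of type RDD[(K, S)].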
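The two layers of wrapping can be followed on ordinary collections. The sketch below is an in-memory approximation under stated assumptions: `cogroup` and `updateState` are hypothetical helpers that imitate the shapes involved and are not the RDD API.

```scala
object CogroupStateSketch {
  // Hypothetical in-memory stand-in for cogrouping two keyed collections.
  def cogroup[K, V, S](
      current: Seq[(K, V)],
      prev: Seq[(K, S)]): Map[K, (Iterable[V], Iterable[S])] = {
    val keys = (current.map(_._1) ++ prev.map(_._1)).distinct
    keys.map { k =>
      val vs: Iterable[V] = current.filter(_._1 == k).map(_._2)
      val ss: Iterable[S] = prev.filter(_._1 == k).map(_._2)
      k -> ((vs, ss))
    }.toMap
  }

  // (K, (Iterable[V], Iterable[S])) -> (K, Seq[V], Option[S]) -> user function -> (K, S)
  def updateState[K, V, S](current: Seq[(K, V)], prev: Seq[(K, S)])(
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] =
    cogroup(current, prev).toSeq.flatMap { case (k, (vs, ss)) =>
      // The state side holds at most one element per key, so take the head if present.
      updateFunc(vs.toSeq, ss.headOption).map(s => k -> s).toList
    }.toMap

  def main(args: Array[String]): Unit = {
    val current = Seq(("a", 1), ("a", 1), ("c", 1))
    val prev = Seq(("a", 5), ("b", 2))
    println(updateState(current, prev)((vs, st) => Some(vs.sum + st.getOrElse(0))))
  }
}
```

Key "b" has no new values and key "c" has no previous state, yet both pass through the update function, matching the note in the Scaladoc that updateFunc is called for each state even when there are no new values.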
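Notes:
1 The Spark source relies heavily on implicit conversions. For example, updateStateByKey is not defined on DStream but in the PairDStreamFunctions class. It is reachable because the DStream companion object contains an implicit conversion: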
    implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
      PairDStreamFunctions[K, V] = {
      new PairDStreamFunctions[K, V](stream)
    }
Every DStream whose element type is a key-value pair, DStream[(K, V)], is adapted to a PairDStreamFunctions object through this implicit conversion.
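The same pattern can be reproduced with toy types. In the sketch below `ToyStream`, `PairStreamFunctions`, and `countByKey` are hypothetical and only mimic the relationship between DStream and PairDStreamFunctions. The ClassTag and Ordering parameters of the real conversion are omitted.

```scala
import scala.language.implicitConversions

object ImplicitEnrichmentSketch {
  // Hypothetical toy stream with no pair-specific methods of its own.
  class ToyStream[T](val items: Seq[T])

  // Pair-only operations live in a separate wrapper class.
  class PairStreamFunctions[K, V](self: ToyStream[(K, V)]) {
    def countByKey(): Map[K, Int] =
      self.items.groupBy(_._1).map { case (k, kvs) => k -> kvs.size }
  }

  object ToyStream {
    // Found through the companion object whenever a ToyStream of pairs needs a pair method.
    implicit def toPairStreamFunctions[K, V](stream: ToyStream[(K, V)]): PairStreamFunctions[K, V] =
      new PairStreamFunctions[K, V](stream)
  }

  // countByKey is not a member of ToyStream; the compiler inserts the conversion.
  val counts: Map[String, Int] =
    new ToyStream(Seq(("a", 1), ("b", 1), ("a", 1))).countByKey()

  def main(args: Array[String]): Unit = println(counts)
}
```

A ToyStream of non-pair elements would not compile with countByKey, which is the same reason updateStateByKey is only available on a DStream of key-value pairs.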
2 Checkpointing must be enabled when using stateful operators. Otherwise the application fails a precondition check at startup and reports:
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()
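The "requirement failed:" prefix is what Scala's require prepends to its message, which suggests the check is a require-style precondition. The sketch below reproduces only the shape of the error. `ToyContext` and `validate` are hypothetical and do not correspond to Spark's validation code.

```scala
object CheckpointCheckSketch {
  // Hypothetical stand-in for a streaming context with an optional checkpoint directory.
  class ToyContext {
    var checkpointDir: Option[String] = None
    def checkpoint(dir: String): Unit = checkpointDir = Some(dir)
  }

  // Precondition in the style of the error above; require throws IllegalArgumentException.
  def validate(ctx: ToyContext): Unit =
    require(ctx.checkpointDir.isDefined,
      "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()")

  def main(args: Array[String]): Unit = {
    val ctx = new ToyContext
    try validate(ctx)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
    ctx.checkpoint("/tmp/checkpoint") // illustrative path
    validate(ctx) // passes once a directory is set
  }
}
```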