Spark中累加器的陷阱
- 2019 年 11 月 27 日
- 筆記
Spark中在使用累加器時出的一些問題的記錄
累加器(Accumulator)簡介
累加器(Accumulator)是Spark提供的累加器,顧名思義,該變量只能夠增加。由Driver端進行初始變量,Task再對聲明的變量進行累加操作。
可以為Accumulator命名,這樣就會在Spark web ui中看到每個節點的計數,以及累加後的值,可以幫助你了解程序運行的情況。
累加器使用的陷阱
在前段時間寫項目時用累加器稽核數據量,結果發現稽核的數據輸入量和輸出量明顯不同,此時要麼是程序存在問題,要麼是累加器使用有問題,從最終生成的結果文件中可以看出,是累加器的使用問題
下面來看一個Demo
val conf = new SparkConf() .setAppName("Accumulator Demo") .setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val example = sc.longAccumulator("Example") val byKey = sc .parallelize(1 to 10) .map(x=>{ if(x%2==1){ example.add(-1) ("奇數",1) }else{ ("偶數",1) } }) byKey.foreach(println(_)) println("累加後的值:"+example.value) println(byKey.count()) println("累加後的值:"+example.value)
結果:

可以看出,如果一個算子在最終計算兩次,則累加器也會同樣增加兩次
那我們如果將涉及到累加的算子緩存會怎麼樣呢,修改部分代碼
val byKey = sc .parallelize(1 to 10) .map(x=>{ if(x%2==1){ example.add(1) ("奇數",1) }else{ ("偶數",1) } }).persist() //將計算結果進行緩存
結果:

原因分析&解決方案
官方對這個問題的解釋如下描述:
For accumulator updates performed inside actions only, Spark guarantees that each task』s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task』s update may be applied more than once if tasks or job stages are re-executed.
我們都知道,spark中的一系列transform操作會構成一串長的任務鏈,此時需要通過一個action操作來觸發,accumulator也是一樣。因此在一個action操作之前,你調用value方法查看其數值,肯定是沒有任何變化的。
所以在第一次foreach(action操作)之後,我們發現累加器的數值變成了5,是我們要的答案。
之後又對新產生的的byKey進行了一次count(action操作),其實這個時候又執行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結果變成了10。
既然已經知道了造成的原因,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的準確性。當然也可以通過切斷依賴關係,例如觸發一次Shuffle,Spark 會自動緩存Shuffle後生成的RDD(使用的Spark2.1,其他版本暫時不清楚),當然也可以通過Cache()、Persist()進行切斷