Spark閉包 | driver & executor程式程式碼執行

Spark中的閉包

閉包的作用可以理解為:函數可以訪問函數外部定義的變數,但是函數內部對該變數進行的修改,在函數外是不可見的,即對函數外源變數不會產生影響。

其實,在學習Spark時,一個比較難理解的點就是,在集群模式下,定義的變數和方法作用域的範圍和生命周期。這在你操作RDD時,比如調用一些函數map、foreach時,訪問其外部變數進行操作時,很容易產生疑惑。為什麼我本地程式運行良好且結果正確,放到集群上卻得不到想要的結果呢?

首先通過下邊對RDD中的元素進行求和的示例,來看相同的程式碼本地模式和集群模式運行結果的區別:

Spark為了執行任務,會將RDD的操作分解為多個task,並且這些task是由executor執行的。在執行之前,Spark會計算task的閉包即定義的一些變數和方法,比如例子中的counter變數和foreach方法,並且閉包必須對executor而言是可見的,這些閉包會被序列化發送到每個executor。

在集群模式下,driver和executor運行在不同的JVM進程中,發送給每個executor的閉包中的變數是driver端變數的副本。因此,當foreach函數內引用counter時,其實處理的只是driver端變數的副本,與driver端本身的counter無關。driver節點的記憶體中仍有一個計數器,但該變數對executor是不可見的!executor只能看到序列化閉包的副本。因此,上述例子輸出的counter最終值仍然為零,因為counter上的所有操作都只是引用了序列化閉包內的值。

在本地模式下,往往driver和executor運行在同一JVM進程中。那麼這些閉包將會被共享,executor操作的counter和driver持有的counter是同一個,那麼counter在處理後最終值為6。

但是在生產中,我們的任務都是在集群模式下運行,如何能滿足這種業務場景呢?

這就必須引出一個後續要重點講解的概念:Accumulator即累加器。Spark中的累加器專門用於提供一種機制,用於在集群中的各個worker節點之間執行時安全地更新變數。

一般來說,closures – constructs比如循環或本地定義的方法,就不應該被用來改變一些全局狀態,Spark並沒有定義或保證對從閉包外引用的對象進行更新的行為。如果你這樣操作只會導致一些程式碼在本地模式下能夠達到預期的效果,但是在分散式環境下卻事與願違。如果需要某些全局聚合,請改用累加器。對於其他的業務場景,我們適時考慮引入外部存儲系統、廣播變數等。

 

閉包函數從產生到在executor執行經歷了什麼?

首先,對RDD相關的操作需要傳入閉包函數,如果這個函數需要訪問外部定義的變數,就需要滿足一定條件(比如必須可被序列化),否則會拋出運行時異常。閉包函數在最終傳入到executor執行,需要經歷以下步驟:

1. driver通過反射,運行時找到閉包訪問的變數,並封裝成一個對象,然後序列化該對象

2. 將序列化後的對象通過網路傳輸到worker節點

3. worker節點反序列化閉包對象

4. worker節點的executor執行閉包函數

簡而言之,就是要通過網路傳遞函數、然後執行,期間會經歷序列化和反序列化,所以要求被傳遞的變數必須可以被序列化和反序列化,否則會拋類似Error:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects這樣的異常。即使是本地執行時,也會按照上述的步驟執行,這也是為什麼不允許在RDD內部直接操作RDD的原因(SparkContext不支援序列化)。同時,在這些運算元閉包內修改外部定義的變數不會被回饋到driver端。

 

driver & executor

driver是運行用戶編寫Application 的main()函數的地方,具體負責DAG的構建、任務的劃分、task的生成與調度等。job,stage,task生成都離不開rdd自身,rdd的相關的操作不能缺少driver端的sparksession/sparkcontext。

executor是真正執行task地方,而task執行離不開具體的數據,這些task運行的結果可以是shuffle中間結果,也可以持久化到外部存儲系統。一般都是將結果、狀態等彙集到driver。但是,目前executor之間不能互相通訊,只能藉助第三方來實現數據的共享或者通訊。

 

編寫的Spark程式程式碼,運行在driver端還是executor端呢?

先看個簡單例子:通常我們在本地測試程式的時候,要列印RDD中的數據。

在本地模式下,直接使用rdd.foreach(println)或rdd.map(println)在單台機器上,能夠按照預期列印並輸出所有RDD的元素。

但是,在集群模式下,由executor執行輸出寫入的是executor的stdout,而不是driver上的stdout,所以driver的stdout不會顯示這些!

要想在driver端列印所有元素,可以使用collect()方法先將RDD數據帶到driver節點,然後在調用foreach(println)(但需要注意一點,由於會把RDD中所有元素都載入到driver端,可能引起driver端記憶體不足導致OOM。如果你只是想獲取RDD中的部分元素,可以考慮使用take或者top方法)

總之,在這裡RDD中的元素即為具體的數據,對這些數據的操作都是由負責task執行的executor處理的,所以想在driver端輸出這些數據就必須先將數據載入到driver端進行處理。

最後做個總結:所有對RDD具體數據的操作都是在executor上執行的,所有對rdd自身的操作都是在driver上執行的。比如foreach、foreachPartition都是針對rdd內部數據進行處理的,所以我們傳遞給這些運算元的函數都是執行於executor端的。但是像foreachRDD、transform則是對RDD本身進行一列操作,所以它的參數函數是執行在driver端的,那麼它內部是可以使用外部變數,比如在SparkStreaming程式中操作offset、動態更新廣播變數等。


關注微信公眾號:大數據學習與分享,獲取更對技術乾貨