Spark RDD詳解 | RDD特性、lineage、快取、checkpoint、依賴關係

RDD(Resilient Distributed Datasets)彈性的分散式數據集,又稱Spark core,它代表一個只讀的、不可變、可分區,裡面的元素可分散式並行計算的數據集。

RDD是一個很抽象的概念,不易於理解,但是要想學好Spark,必須要掌握RDD,熟悉它的編程模型,這是學習Spark其他組件的基礎。筆者在這裡從名字和幾個重要的概念給大家一一解讀:

  • Resilient(彈性的)

     提到大數據必提分散式,而在大規模的分散式集群中,任何一台伺服器隨時都有可能出現故障,如果一個task任務所在的伺服器出現故障,必然導致這個task執行失敗。此時,RDD的”彈性的”特點可以使這個task在集群內進行遷移,從而保證整體任務對故障伺服器的平穩過渡。對於整個任務而言,只需重跑某些失敗的task即可,而無需完全重跑,大大提高性能

  • Distributed(分散式)

 首先了解一下分區,即數據根據一定的切分規則切分成一個個的子集。spark中分區劃分規則默認是根據key進行哈希取模,切分後的數據子集可以獨立運行在各個task中並且在各個集群伺服器中並行執行。當然使用者也可以自定義分區規則,這個還是很有應用場景的,比如自定義分區打散某個key特別多的數據集以避免數據傾斜(數據傾斜是大數據領域常見問題也是調優重點,後續會單獨講解)

  • Datasets(數據集)

    初學者很容易誤解,認為RDD是存儲數據的,畢竟從名字看來它是一個”彈性的分散式數據集”。但是,筆者強調,RDD並不存儲數據,它只記錄數據存儲的位置。內部處理邏輯是通過使用者調用不同的Spark運算元,一個RDD會轉換為另一個RDD(這也體現了RDD只讀不可變的特點,即一個RDD只能由另一個RDD轉換而來),以transformation運算元為例,RDD彼此之間會形成pipeline管道,無需等到上一個RDD所有數據處理邏輯執行完就可以立即交給下一個RDD進行處理,性能也得到了很大提升。但是RDD在進行transform時,不是每處理一條數據就交給下一個RDD,而是使用小批量的方式進行傳遞(這也是一個優化點)

  • lineage

    既然Spark將RDD之間以pipeline的管道連接起來,如何避免在伺服器出現故障後,重算這些數據呢?這些失敗的RDD由哪來呢?這就牽涉到,Spark中的一個很重要的概念:Lineage即血統關係。它會記錄RDD的元數據資訊和依賴關係,當該RDD的部分分區數據丟失時,可以根據這些資訊來重新運算和恢復丟失的分區數據。簡單而言就是它會記錄哪些RDD是怎麼產生的、怎麼「丟失」的等,然後Spark會根據lineage記錄的資訊,恢復丟失的數據子集,這也是保證Spark RDD彈性的關鍵點之一

  • Spark快取和checkpoint

    • 快取(cache/persist)
         cache和persist其實是RDD的兩個API,並且cache底層調用的就是persist,區別之一就在於cache不能顯示指定快取方式,只能快取在記憶體中,但是persist可以通過指定快取方式,比如顯示指定快取在記憶體中、記憶體和磁碟並且序列化等。通過RDD的快取,後續可以對此RDD或者是基於此RDD衍生出的其他的RDD處理中重用這些快取的數據集

    • 容錯(checkpoint)
          本質上是將RDD寫入磁碟做檢查點(通常是checkpoint到HDFS上,同時利用了hdfs的高可用、高可靠等特徵)。上面提到了Spark lineage,但在實際的生產環境中,一個業務需求可能非常非常複雜,那麼就可能會調用很多運算元,產生了很多RDD,那麼RDD之間的linage鏈條就會很長,一旦某個環節出現問題,容錯的成本會非常高。此時,checkpoint的作用就體現出來了。使用者可以將重要的RDD checkpoint下來,出錯後,只需從最近的checkpoint開始重新運算即可使用方式也很簡單,指定checkpoint的地址[SparkContext.setCheckpointDir(“checkpoint的地址”)],然後調用RDD的checkpoint的方法即可。

    • checkpoint與cache/persist對比

      • 都是lazy操作,只有action運算元觸發後才會真正進行快取或checkpoint操作(懶載入操作是Spark任務很重要的一個特性,不僅適用於Spark RDD還適用於Spark sql等組件)

      • cache只是快取數據,但不改變lineage。通常存於記憶體,丟失數據可能性更大

      • 改變原有lineage,生成新的CheckpointRDD。通常存於hdfs,高可用且更可靠

  • RDD的依賴關係
    Spark中使用DAG(有向無環圖)來描述RDD之間的依賴關係,根據依賴關係的不同,劃分為寬依賴和窄依賴

     

通過上圖,可以很容易得出所謂寬依賴:多個子RDD的partition會依賴同一個parentRDD的partition;窄依賴:每個parentRDD的partition最多被子RDD的一個partition使用。這兩個概念很重要,像寬依賴是劃分stage的關鍵,並且一般都會伴有shuffle,而窄依賴之間其實就形成前文所述的pipeline管道進行處理數據。(圖中的map、filter等是Spark提供的運算元,具體含義大家可以自行到Spark官網了解,順便感受一下scala函數式程式語言的強大)。

 

Spark任務以及stage等的具體劃分,牽涉到源碼,後續會單獨講解

 

最後筆者以RDD源碼中的注釋,闡述一下RDD的屬性:

1.分區列表(數據塊列表,只保存數據位置,不保存具體地址)

2. 計算每個分片的函數(根據父RDD計算出子RDD)

3. RDD的依賴列表

4. RDD默認是存儲於記憶體,但當記憶體不足時,會spill到disk(可通過設置StorageLevel來控制)

5. 默認hash分區,可自定義分區器

6. 每一個分片的優先計算位置(preferred locations)列表,比如HDFS的block的所在位置應該是優先計算的位置


 關注微信公眾號:大數據學習與分享,獲取更對技術乾貨