Spark RDD教程
- 2020 年 3 月 14 日
- 筆記
這個教程將會幫助你理解和使用Apache Spark RDD。所有的在這個教程中使用的RDD例子將會提供在github上,供大家快速的瀏覽。
什麼是RDD(Rssilient Distributed Dataset)?
RDD是Spark的基礎數據結構,是Spark和Spark內核的主要數據抽象。RDD是容錯的、不可變的對象分散式集合,這意味一旦創建了RDD,就不能更改它。RDD中的每個數據集都被劃分為邏輯分區,這些邏輯分區可以在集群的不同節點上計算。
換句話說,RDD是類似於Scala中的集合的對象集合,不同之處在於RDD是分散在多個物理伺服器(也稱為集群中的節點)上的多個JVM上計算的,而Scala集合則位於單個JVM上。
另外,RDD提供對數據進行分區和分配的數據抽象,這些數據只在在多個節點上並行運行計算,而大多數時候,在RDD上進行轉換時,我們不必擔心默認情況下Spark提供的並行性。
本Apache Spark RDD教程使用Scala示例描述了RDD上可用的基本C座,例如map,filter和persist等。此外,本教程還介紹了pair RDD函數,該函數可在鍵值對的RDD上運行,例如groupByKey和join等。
RDD的優勢
- In-Memory Processing
- Immutability
- Fault Tolerance
- Lazy Evolution
- Partitioning
- Parallelize
限制
Spark RDDs不太適合對狀態存儲(如web應用程式的存儲系統)進行更新的應用程式。對於這些應用程式,使用執行傳統更新日誌記錄和數據檢查點(如資料庫)的系統更有效。RDD的目標是為批處理分析提供一個有效的編程模型,而不考慮這些非同步應用程式。
RDD的創建
RDD主要以兩種不同的方式創建,首先是並行化現有集合,其次是引用外部存儲系統(HDFS,S3等)中的數據集。
在查看實例之前,首先讓我們使用SparkSession類中定義的builder模式方法初始化SparkSession。在初始化時,我們需要提供如下所示的主名稱和應用程式名稱。
val spark:SparkSession = SparkSession.builder() .master("local[1]") .appName("SparkByExamples.com") .getOrCreate()
使用sparkContext.parallelize()
sparkContext.parallelize用於並行化驅動程式中的現有集合。這是創建RDD的基本方法,主要在POC或原型製作時使用,它要求在創建RDD之前將所有數據都存在於驅動程式中,因此它並不是最常用於生產應用程式的。
val dataSeq = Seq(("Java", 1000), ("Python", 2000), ("Scala", 3000)) val rdd = spark.sparkContext.parallelize(dataSeq)
對於生產應用程式,我們主要通過使用外部存儲系統(如HDFS、S3、HBase e.t.c)來創建RDD。
使用sparkContext.textFile()
使用testFile()
方法,我們能把一個txt文件讀到RDD中。
val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
使用sparkContext.wholeTextFiles()
wholeTextFiles()
方法返回一個PairRDD,鍵是文件路徑,值是內容
val rdd3 = spark.SparkContext.wholeTextFiles("/path/textFile.txt")
除了使用text文件,還可以使用csv文件,json和其他格式的文件。
使用sparkContext.emptyRDD
使用sparkContext的emptyRDD()方法,創建一個沒有數據的RDD。這個方法創建一個空的RDD,並且沒有分區。
val rdd = spark.sparkContext.emptyRDD val rddString = spark.sparkContext.emptyRDD[String]
創建帶分區的空的RDD
有時我們可能需要按分區將空的RDD寫入文件,在這種情況下,您應該使用分區創建空的RDD。
val rdd2 = spark.sparkContext.parallelize(Seq.empty[String])
RDD並行和重新分區
當我們使用parallelize()或textFile()或SparkContext的wholeTextFile()方法來初始化RDD時,它會根據資源可用性自動將數據分割為分區。
getNumPartitions- 返回數據的分區數。在RDD上應用的任何轉換都是並行執行的。Spark將為集群的每個分區運行一個任務。
println("initial partition count:" + rdd.getNumPartitions) // Outputs: initial partition count:2
手動設置並行度- 我們可以手動設置一個我們需要的分區數量,將分區數作為第二參數傳遞給這些函數sparkContext.parallelize(dataSeq, 10)
使用重新 分區和合併進行重新分配:有時候我們可能需要重新劃分RDD,Spark提供了兩種重新劃分的方法;首先使用repartition()方法從所有節點shuffle數據,也稱為完全混洗。第二種coalesce()方法,該方法shuffle最少節點的數據,舉個例子,如果你有數據分布在4個分區,現在你使用coalesce(2),僅僅只從兩個節點移動數據。
這兩個函數都會重新分配分區。repartition()
方法的代價非常的巨大,它將會混洗集群上所有節點的數據。
val reparRdd = rdd.repartiton(4) println("re-partition count:" + reparRdd.getNumPartitions) // Outputs: "re-partition count:4"
Note:repartition() or coalesce()
方法都返回一個新的RDD
RDD操作
RDD轉換:轉換時惰性操作,這些操作不會更新RDD,而是返回另一個RDD
RDD操作:除法計算並返回RDD值得操作。
RDD轉換例子
Spark RDD上的轉換操作返回另一個RDD,並且轉換操作是惰性的,這意味著他們不會立即執行,直到你調用一個RDD action時才會執行。RDD上的一些轉換操作,如flatMap, map, reductByKey, filter, sortByKey
,這些轉換操作都會返回一個新的RDD,而不是更新已有的RDD。
在這個Spark RDD轉換教程中,我將使用單詞計數示例老解釋轉換。下圖演示了我們將要使用的不同的RDD轉換。
首先,從一個text文件創建一個RDD。
val rdd:RDD[String] = spark.spark.Context.textFile("src/main/scala/test.txt")
flatMap:flatMap
轉換將RDD展平,並返回新的RDD。在下面的示例中,首先它在RDD中空格分隔記錄,最後將其展平。結果RDD在每個記錄上都包含一個單詞。
val rdd2 = rdd.flatMap(f => f.split(" "))
map:映射轉換用於任何複雜的操作,比如添加一個列,更新一個列e.t.c。映射轉換的輸出總是與輸入有相同數量的記錄。
在我們的單詞計數示例中,我們將為每個單詞添加一個值為1的新列,RDD的結果為PairRDDFunctions,其中包含鍵值對,String類型的單詞為Key,Int類型的1位為value。為了更好的理解,我們為rdd3變數定義了類型。
val rdd3:RDD[(String:Int)] = rdd2.map(m => (m, 1))
filter:filter轉換操作是用來在RDD中過濾記錄的。在我們的例子中,過濾所有以’a’開頭的單詞。
val rdd4 = rdd3.filter(a => a._1.startsWith("a"))
reductByKey:reduceByKey用指定的函數來合併相同key對應的value值。在我們的示例中,它通過對值應用sum函數來減少單詞字元串。我們的RDD的結果包含唯一的單詞和他們的計數。
val rdd5 = rdd4.reductByKey(_ + _)
sortByKey:sortByKey轉換是對RDD的key列進行排序。在我們的示例中,首先我們使用映射轉換RDD[(String,Int)] to RDD[(Int,String)],並應用sortBykey,它在理想情況下對整數值進行排序。最後,使用println語句的foreach返回RDD中的所有單詞及其作為鍵-值對的計數。
val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey() // Print rd6 result to console rdd6.foreach(println)
RDD Actions with example
RDD Action操作從RDD返回原始值。換句話說,任何返回非RDD[T]的RDD函數都被視為一個動作。
count:返回RDD中的記錄數
//Action - count println("Count : " + rdd6.count())
first:返回第一條記錄
// Action - first val firstRec = rdd6.first() println("First Record : " + firstRec._1 + "," + firstRec._2)
max:返回最大的記錄
val datMax = rdd6.max() println("Max Record : " + datMax._1 + "," + datMax._2)
reduct:將記錄減少為單個,我們可以使用它來計數或求和
val totalWordCount = rdd6.reduce((a, b) => (a._1 + b._1, a._2)) println("dataReduce Record : " + totalWordCount._1)
take:返回指定數目的記錄
val data3 = rdd6.take(3) data3.foreach(f => { println("data3 Key:" + f._1 + ", Value:" + f._2) })
collect:以數據形式返回RDD中的所有數據。當你在處理帶有成千上萬億數據的巨大的RDD時,請小心使用此操作,因為你可能會耗盡驅動程式上的記憶體。
val data = rdd6.collect() data.foreach(f => { println("Key:" + f._1 + ", Value:" + f._2) })
saveAsTextFile:使用saveAsTextFile操作,可以把RDD寫入到text文件。
rdd6.saveAsTextFile("/tmp/wordCount")