Spark RDD教程

  • 2020 年 3 月 14 日
  • 筆記

這個教程將會幫助你理解和使用Apache Spark RDD。所有的在這個教程中使用的RDD例子將會提供在github上,供大家快速的瀏覽。

什麼是RDD(Rssilient Distributed Dataset)?

RDD是Spark的基礎數據結構,是Spark和Spark內核的主要數據抽象。RDD是容錯的、不可變的對象分佈式集合,這意味一旦創建了RDD,就不能更改它。RDD中的每個數據集都被劃分為邏輯分區,這些邏輯分區可以在集群的不同節點上計算。

換句話說,RDD是類似於Scala中的集合的對象集合,不同之處在於RDD是分散在多個物理服務器(也稱為集群中的節點)上的多個JVM上計算的,而Scala集合則位於單個JVM上。

另外,RDD提供對數據進行分區和分配的數據抽象,這些數據只在在多個節點上並行運行計算,而大多數時候,在RDD上進行轉換時,我們不必擔心默認情況下Spark提供的並行性。

本Apache Spark RDD教程使用Scala示例描述了RDD上可用的基本C座,例如map,filter和persist等。此外,本教程還介紹了pair RDD函數,該函數可在鍵值對的RDD上運行,例如groupByKey和join等。

RDD的優勢

  • In-Memory Processing
  • Immutability
  • Fault Tolerance
  • Lazy Evolution
  • Partitioning
  • Parallelize

限制

Spark RDDs不太適合對狀態存儲(如web應用程序的存儲系統)進行更新的應用程序。對於這些應用程序,使用執行傳統更新日誌記錄和數據檢查點(如數據庫)的系統更有效。RDD的目標是為批處理分析提供一個有效的編程模型,而不考慮這些異步應用程序。

RDD的創建

RDD主要以兩種不同的方式創建,首先是並行化現有集合,其次是引用外部存儲系統(HDFS,S3等)中的數據集。

在查看實例之前,首先讓我們使用SparkSession類中定義的builder模式方法初始化SparkSession。在初始化時,我們需要提供如下所示的主名稱和應用程序名稱。

val spark:SparkSession = SparkSession.builder()          .master("local[1]")          .appName("SparkByExamples.com")          .getOrCreate()

使用sparkContext.parallelize()

sparkContext.parallelize用於並行化驅動程序中的現有集合。這是創建RDD的基本方法,主要在POC或原型製作時使用,它要求在創建RDD之前將所有數據都存在於驅動程序中,因此它並不是最常用於生產應用程序的。

val dataSeq = Seq(("Java", 1000), ("Python", 2000), ("Scala", 3000))  val rdd = spark.sparkContext.parallelize(dataSeq)

對於生產應用程序,我們主要通過使用外部存儲系統(如HDFS、S3、HBase e.t.c)來創建RDD。

使用sparkContext.textFile()

使用testFile()方法,我們能把一個txt文件讀到RDD中。

val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

使用sparkContext.wholeTextFiles()

wholeTextFiles()方法返回一個PairRDD,鍵是文件路徑,值是內容

val rdd3 = spark.SparkContext.wholeTextFiles("/path/textFile.txt")

除了使用text文件,還可以使用csv文件,json和其他格式的文件。

使用sparkContext.emptyRDD

使用sparkContext的emptyRDD()方法,創建一個沒有數據的RDD。這個方法創建一個空的RDD,並且沒有分區。

val rdd = spark.sparkContext.emptyRDD  val rddString = spark.sparkContext.emptyRDD[String]

創建帶分區的空的RDD

有時我們可能需要按分區將空的RDD寫入文件,在這種情況下,您應該使用分區創建空的RDD。

val rdd2 = spark.sparkContext.parallelize(Seq.empty[String])

RDD並行和重新分區

當我們使用parallelize()或textFile()或SparkContext的wholeTextFile()方法來初始化RDD時,它會根據資源可用性自動將數據分割為分區。

getNumPartitions- 返回數據的分區數。在RDD上應用的任何轉換都是並行執行的。Spark將為集群的每個分區運行一個任務。

println("initial partition count:" + rdd.getNumPartitions)  // Outputs: initial partition count:2

手動設置並行度- 我們可以手動設置一個我們需要的分區數量,將分區數作為第二參數傳遞給這些函數sparkContext.parallelize(dataSeq, 10)

使用重新 分區和合併進行重新分配:有時候我們可能需要重新劃分RDD,Spark提供了兩種重新劃分的方法;首先使用repartition()方法從所有節點shuffle數據,也稱為完全混洗。第二種coalesce()方法,該方法shuffle最少節點的數據,舉個例子,如果你有數據分佈在4個分區,現在你使用coalesce(2),僅僅只從兩個節點移動數據。

這兩個函數都會重新分配分區。repartition()方法的代價非常的巨大,它將會混洗集群上所有節點的數據。

val reparRdd = rdd.repartiton(4)  println("re-partition count:" + reparRdd.getNumPartitions)  // Outputs: "re-partition count:4"

Note:repartition() or coalesce()方法都返回一個新的RDD

RDD操作

RDD轉換:轉換時惰性操作,這些操作不會更新RDD,而是返回另一個RDD

RDD操作:除法計算並返回RDD值得操作。

RDD轉換例子

Spark RDD上的轉換操作返回另一個RDD,並且轉換操作是惰性的,這意味着他們不會立即執行,直到你調用一個RDD action時才會執行。RDD上的一些轉換操作,如flatMap, map, reductByKey, filter, sortByKey,這些轉換操作都會返回一個新的RDD,而不是更新已有的RDD。

在這個Spark RDD轉換教程中,我將使用單詞計數示例老解釋轉換。下圖演示了我們將要使用的不同的RDD轉換。

image

首先,從一個text文件創建一個RDD。

val rdd:RDD[String] = spark.spark.Context.textFile("src/main/scala/test.txt")

flatMap:flatMap轉換將RDD展平,並返回新的RDD。在下面的示例中,首先它在RDD中空格分隔記錄,最後將其展平。結果RDD在每個記錄上都包含一個單詞。

val rdd2 = rdd.flatMap(f => f.split(" "))

map:映射轉換用於任何複雜的操作,比如添加一個列,更新一個列e.t.c。映射轉換的輸出總是與輸入有相同數量的記錄。

在我們的單詞計數示例中,我們將為每個單詞添加一個值為1的新列,RDD的結果為PairRDDFunctions,其中包含鍵值對,String類型的單詞為Key,Int類型的1位為value。為了更好的理解,我們為rdd3變量定義了類型。

val rdd3:RDD[(String:Int)] = rdd2.map(m => (m, 1))

filter:filter轉換操作是用來在RDD中過濾記錄的。在我們的例子中,過濾所有以’a’開頭的單詞。

val rdd4 = rdd3.filter(a => a._1.startsWith("a"))

reductByKey:reduceByKey用指定的函數來合併相同key對應的value值。在我們的示例中,它通過對值應用sum函數來減少單詞字符串。我們的RDD的結果包含唯一的單詞和他們的計數。

val rdd5 = rdd4.reductByKey(_ + _)

sortByKey:sortByKey轉換是對RDD的key列進行排序。在我們的示例中,首先我們使用映射轉換RDD[(String,Int)] to RDD[(Int,String)],並應用sortBykey,它在理想情況下對整數值進行排序。最後,使用println語句的foreach返回RDD中的所有單詞及其作為鍵-值對的計數。

val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey()  // Print rd6 result to console  rdd6.foreach(println)

RDD Actions with example

RDD Action操作從RDD返回原始值。換句話說,任何返回非RDD[T]的RDD函數都被視為一個動作。

count:返回RDD中的記錄數

//Action - count  println("Count : " + rdd6.count())

first:返回第一條記錄

// Action - first  val firstRec = rdd6.first()  println("First Record : " + firstRec._1 + "," + firstRec._2)

max:返回最大的記錄

val datMax = rdd6.max()  println("Max Record : " + datMax._1 + "," + datMax._2)

reduct:將記錄減少為單個,我們可以使用它來計數或求和

val totalWordCount = rdd6.reduce((a, b) => (a._1 + b._1, a._2))  println("dataReduce Record : " + totalWordCount._1)

take:返回指定數目的記錄

val data3 = rdd6.take(3)  data3.foreach(f => {      println("data3 Key:" + f._1 + ", Value:" + f._2)  })

collect:以數據形式返回RDD中的所有數據。當你在處理帶有成千上萬億數據的巨大的RDD時,請小心使用此操作,因為你可能會耗盡驅動程序上的內存。

val data = rdd6.collect()  data.foreach(f => {      println("Key:" + f._1 + ", Value:" + f._2)  })

saveAsTextFile:使用saveAsTextFile操作,可以把RDD寫入到text文件。

rdd6.saveAsTextFile("/tmp/wordCount")