別再人云亦云了!!!你真的搞懂了RDD、DF、DS的區別嗎?
幾年前,包括最近,我看了各種書籍、教程、官網。但是真正能夠把RDD、DataFrame、DataSet解釋得清楚一點的、論據多一點少之又少,甚至有的人號稱Spark專家,但在這一塊根本說不清楚。還有國內的一些書籍,小猴真的想問一聲:Are you OK?書名別再叫精通xxx技術了,請改名為 xxx技術從入門到放棄。這樣可以有效避免耽誤別人學習,不好嗎?
大家都在告訴我們結論,但其實,小猴作為一名長期混跡於開源社區、並仍在一線大數據開發的技術人,深諳技術文化之一:
To experience | 去經歷
這是我要提倡的技術文化之一。之前有人把Experience譯為體驗,但在小猴的技術世界裏,Experience更多的是自己去經歷,而不能跟團去旅遊一樣,那樣你只能是一個外包而已,想要做到卓越,就得去經歷。技術,只有去經歷才會有成長。
目錄
RDD、DataFrame、DataSet介紹
我們每天都在基於框架開發,對於我們來說,一套易於使用的API太重要了。對於Spark來說,有三套API。
分別是:
- RDD
- DataFrame
- DataSet
三套的API,開發人員就要學三套。不過,從Spark 2.2開始,DataFrame和DataSet的API已經統一了。而編寫Spark程序的時候,RDD已經慢慢退出我們的視野了。
但Spark既然提供三套API,我們到底什麼時候用RDD、什麼時候用DataFrame、或者DataSet呢?我們先來了解下這幾套API。
RDD
RDD的概念
- RDD是Spark 1.1版本開始引入的。
- RDD是Spark的基本數據結構。
- RDD是Spark的彈性分佈式數據集,它是不可變的(Immutable)。
- RDD所描述的數據分佈在集群的各個節點中,基於RDD提供了很多的轉換的並行處理操作。
- RDD具備容錯性,在任何節點上出現了故障,RDD是能夠進行容錯恢復的。
- RDD專註的是How!就是如何處理數據,都由我們自己來去各種算子來實現。
什麼時候使用RDD?
- 應該避免使用RDD!
RDD的短板
- 集群間通信都需要將JVM中的對象進行序列化和反序列化,RDD開銷較大
- 頻繁創建和銷毀對象會增加GC,GC的性能開銷較大
Spark 2.0開始,RDD不再是一等公民
從Apache Spark 2.0開始,RDD已經被降級為二等公民,RDD已經被棄用了。而且,我們一會就會發現,DataFrame/DataSet是可以和RDD相互轉換的,DataFrame和DataSet也是建立在RDD上。
DataFrame
DataFrame概念
- DataFrame是從Spark 1.3版本開始引入的。
- 通過DataFrame可以簡化Spark程序的開發,讓Spark處理結構化數據變得更簡單。DataFrame可以使用SQL的方式來處理數據。例如:業務分析人員可以基於編寫Spark SQL來進行數據開發,而不僅僅是Spark開發人員。
- DataFrame和RDD有一些共同點,也是不可變的分佈式數據集。但與RDD不一樣的是,DataFrame是有schema的,有點類似於關係型數據庫中的表,每一行的數據都是一樣的,因為。有了schema,這也表明了DataFrame是比RDD提供更高層次的抽象。
- DataFrame支持各種數據格式的讀取和寫入,例如:CSV、JSON、AVRO、HDFS、Hive表。
- DataFrame使用Catalyst進行優化。
- DataFrame專註的是What!,而不是How!
DataFrame的優點
- 因為DataFrame是有統一的schema的,所以序列化和反序列無需存儲schema。這樣節省了一定的空間。
- DataFrame存儲在off-heap(堆外內存)中,由操作系統直接管理(RDD是JVM管理),可以將數據直接序列化為二進制存入off-heap中。操作數據也是直接操作off-heap。
DataFrane的短板
- DataFrame不是類型安全的
- API也不是面向對象的
Apache Spark 2.0 統一API
從Spark 2.0開始,DataFrame和DataSet的API合併在一起,實現了跨庫統一成為一套API。這樣,開發人員的學習成本就降低了。只需要學習一個High Level的、類型安全的DataSet API就可以了。——這對於Spark開發人員來說,是一件好事。
上圖我們可以看到,從Spark 2.0開始,Dataset提供了兩組不同特性的API:
- 非類型安全
- 類型安全
其中非類型安全就是DataSet[Row],我們可以對Row中的字段取別名。這不就是DataFrame嗎?而類型安全就是JVM對象的集合,類型就是scala的樣例類,或者是Java的實體類。
有Spark 2.0源碼為證:
package object sql {
// ...
type DataFrame = Dataset[Row]
}
也就是說,每當我們用導DataFrame其實就是在使用Dataset。
針對Python或者R,不提供類型安全的DataSet,只能基於DataFrame API開發。
什麼時候使用DataFrame
DataSet
- DataSet是從Spark 1.6版本開始引入的。
- DataSet具有RDD和DataFrame的優點,既提供了更有效率的處理、以及類型安全的API。
- DataSet API都是基於Lambda函數、以及JVM對象來進行開發,所以在編譯期間就可以快速檢測到錯誤,節省開發時間和成本。
- DataSet使用起來很像,但它的執行效率、空間資源效率都要比RDD高很多。可以很方便地使用DataSet處理結構化、和非結構數據。
DataSet API的優點
- DataSet結合了RDD和DataFrame的優點。
- 當序列化數據時,Encoder生成的位元組碼可以直接與堆交互,實現對數據按需訪問,而無需反序列化整個對象。
類型安全
寫過Java或者C#的同學都會知道,一旦在代碼中類型使用不當,編譯都編譯不過去。日常開發中,我們更多地是使用泛型。因為一旦我們使用非類型安全的類型,軟件的維護周期一長,如果集合中放入了一些不合適的類型,就會出現嚴重的故障。這也是為什麼Java、C#還有C++都要去支持泛型的原因。
在Spark中也會有類型安全的問題。而且,一旦在運行時出現類型安全問題,會影響整個大規模計算作業。這種作業的錯誤排除難度,要比單機故障排查起來更複雜。如果在運行時期間就能發現問題,這很美好啊。
DataFrame中編寫SQL進行數據處理分析,在編譯時是不做檢查的,只有在Spark程序運行起來,才會檢測到問題。
SQL | DataFrame | Dataset | |
---|---|---|---|
語法錯誤 | 運行時 | 編譯時 | 編譯時 |
解析錯誤 | 運行時 | 運行時 | 編譯時 |
對結構化和半結構化數據的High Level抽象
例如:我們有一個較大的網站流量日誌JSON數據集,可以很容易的使用DataSet[WebLog]來處理,強類型操作可以讓處理起來更加簡單。
以RDD更易用的API
DataSet引入了更豐富的、更容易使用的API操作。這些操作是基於High Level抽象的,而且基於實體類的操作,例如:進行groupBy、agg、select、sum、avg、filter等操作會容易很多。
性能優化
使用DataFrame和DataSet API在性能和空間使用率上都有大幅地提升。
-
DataFrame和DataSet API是基於Spark SQL引擎之上構建的,會使用Catalyst生成優化後的邏輯和物理執行計劃。尤其是無類型的DataSet[Row](DataFrame),它的速度更快,很適合交互式查詢。
-
由於Spark能夠理解DataSet中的JVM對象類型,所以Spark會將將JVM對象映射為Tungsten的內部內存方式存儲。而Tungsten編碼器可以讓JVM對象更有效地進行序列化和反序列化,生成更緊湊、更有效率的位元組碼。
通過上圖可以看到,DataSet的空間存儲效率是RDD的4倍。RDD要使用60GB的空間,而DataSet只需要使用不到15GB就可以了。
Youtube視頻分析案例
數據集
去Kaggle下載youtube地址:
//www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv
每個字段的含義都有說明。
Maven開發環境準備
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12</scala.version>
<spark.version>3.0.1</spark.version>
</properties>
<repositories>
<repository>
<id>central</id>
<url>//maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>//maven.aliyun.com/nexus/content/groups/public/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.3</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
</build>
RDD開發
/**
* Spark RDD處理示例
*/
object RddAnalysis {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD Process").setMaster("local[*]")
val sc = new SparkContext(conf)
// 讀取本地文件創建RDD
val youtubeVideosRDD = {
sc.textFile("""E:\05.git_project\dataset\youtube""")
}
// 統計不同分類Youtube視頻的喜歡人數、不喜歡人數
// 1. 添加行號
// 創建計數器
val rownumAcc = sc.longAccumulator("rownum")
// 帶上行號
youtubeVideosRDD.map(line => {
rownumAcc.add(1)
rownumAcc.value -> line
})
// 過濾掉第一行
.filter(_._1 != 1)
// 去除行號
.map(_._2)
// 過濾掉非法的數據
.filter(line => {
val fields = line.split("\001")
val try1 = scala.util.Try(fields(8).toLong)
val try2 = scala.util.Try(fields(9).toLong)
if(try1.isFailure || try2.isFailure)
false
else
true
})
// 讀取三個字段(視頻分類、喜歡的人數、不喜歡的人數
.map(line => {
// 按照\001解析CSV
val fields = line.split("\001")
// 取第4個(分類)、第8個(喜歡人數)、第9個(不喜歡人數)
// (分類id, 喜歡人數, 不喜歡人數)
(fields(4), fields(8).toLong, fields(9).toLong)
})
// 按照分類id分組
.groupBy(_._1)
.map(t => {
val result = t._2.reduce((r1, r2) => {
(r1._1, r1._2 + r2._2, r1._3 + r2._3)
})
result
})
.foreach(println)
}
}
運行結果如下:
("BBC Three",8980120,149525)
("Ryan Canty",11715543,80544)
("Al Jazeera English",34427,411)
("FBE",9003314,191819)
("Sugar Pine 7",1399232,81062)
("Rob Scallon",11652652,704748)
("CamilaCabelloVEVO",19077166,1271494)
("Grist",3133,37)
代碼中做了一些數據的過濾,然後進行了分組排序。如果Spark都要這麼來寫的話,業務人員幾乎是沒法寫了。着代碼完全解釋了How,而不是What。每一個處理的細節,都要我們自己親力親為。實現起來臃腫。
查看下基於RDD的DAG
打開瀏覽器,輸入:localhost:4040,來看下DAG。
DAG非常的直觀,按照shuffle分成了兩個Stage來執行。Stage中依次執行了每個Operator。程序沒有經過任何優化。我把每一個操作都和DAG上的節點對應了起來。
DataFrame開發
object DataFrameAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Youtube Analysis")
.master("local[*]")
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
import spark.sqlContext.implicits._
// 讀取CSV
val youtubeVideoDF = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""")
import org.apache.spark.sql.functions._
// 按照category_id分組聚合
youtubeVideoDF.select($"category_id", $"likes".cast(LongType), $"dislikes".cast(LongType))
.where($"likes".isNotNull)
.where( $"dislikes".isNotNull)
.groupBy($"category_id")
.agg(sum("likes"), sum("dislikes"))
.show()
}
}
大家可以看到,現在實現方式非常的簡單,而且清晰。
查看下基於DataFrame的執行計劃與DAG
但我們運行上面的Spark程序時,其實運行了兩個JOB。
下面這個是第一個Job的DAG。我們看到只有一個Stage。這個DAG我們看得不是特別清楚做了什麼,因為Spark SQL是做過優化的,我們需要查看Query的詳細信息,才能看到具體執行的工作。
第一個Job的詳細執行信息如下:
哦,原來這個JOB掃描了所有的行,然後執行了一個Filter過濾操作。再查看下查詢計劃:
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#6, None)) > 0)
+- Project [value#0 AS value#6]
+- Project [value#0]
+- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#6, None)) > 0)
+- Project [value#0 AS value#6]
+- Project [value#0]
+- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#0, None)) > 0)
+- Relation[value#0] text
== Physical Plan ==
CollectLimit 1
+- *(1) Filter (length(trim(value#0, None)) > 0)
+- FileScan text [value#0] Batched: false, DataFilters: [(length(trim(value#0, None)) > 0)], Format: Text, Location: InMemoryFileIndex[file:/E:/05.git_project/dataset/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
可以非常清晰地看到,我們說看到的DAG是經過優化後的。
第二個JOB的DAG如下,同樣,我們也只能看到個大概。例如:Scan csv讀取csv文件,然後執行Spark SQL自動生成、優化後的Codegen階段,再執行了一次Shuffle(Exchange),然後再執行Spark SQL的codegen,最後執行mapPartition操作。
為了一探究竟,我們依然得去查看Query Detail。這個Query Detail圖稍微長一點。我們很兩個部分來講解。
第一部分:
-
掃描csv文件,一共讀取了一個文件,大小是59.8MB,一共有41035行。鼠標移上去,可以看到讀取的文件路徑、讀取的schema是什麼。
-
執行過濾操作(Filter)過濾出來的結果是40949行。把鼠標放在該操作,可以看到具體過濾的內容。
-
執行Project投影查詢。其實就是執行select語句。
-
然後開始執行Hash聚合。按照category_id進行分組,並執行了partial_sum。
第二部分:
- Exchange表示進行數據交換(其實就是shuffle),shuffle一共讀取了122行。
- 接着進行Hash聚合,按照category分組,並進行sum求和,計算得到最終結果。
- 最後輸出21行,多出來的一行顯示的第頭部。
雖然DataFrame我們使用的是DSL方式,但我們可以感受這個過程處理起來比較簡單。根據列進行分組聚合的時候,在編譯時期是對類型不敏感的、非安全的。我們要保證列名、類型都是正確的。同時,我們可以清晰的看到Spark SQL對程序執行過程的優化。
DataSet開發
要使用DataSet開發,我們先來看一下csv讀取數據成為DataFrame的spark源碼。
def csv(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
csv(Seq(path): _*)
}
我們可以看到csv返回的是一個DataFrame類型。而進一步查看DataFrame的源碼,我們發現:
type DataFrame = Dataset[Row]
而Row是非類型安全的,就有點像JDBC裏面的ResultSet那樣。我們為了操作起來更順手一些,定義一個實體類來開發。
上代碼:
case class YoutubeVideo(video_id: String
, trending_date: String
, title: String
, channel_title: String
, category_id: String
, publish_time: String
, tags: String
, views: Long
, likes: Long
, dislikes: Long
, comment_count: String
, thumbnail_link: String
, comments_disabled: Boolean
, ratings_disabled: Boolean
, video_error_or_removed: String
, description: String)
case class CategoryResult(categoryId:String
, totalLikes:Long
, totalDislikes:Long)
object DataSetAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Youtube Analysis")
.master("local[*]")
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
import spark.sqlContext.implicits._
// 讀取CSV
val youtubeVideoDF:DataFrame = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""")
// 轉換為DataSet
youtubeVideoDF.printSchema()
// 轉換為Dataset[YoutubeVideo]
val youtubeVideoDS = youtubeVideoDF.filter(row => {
if(row.getString(7) != null && !row.getString(7).isBlank
&& row.getString(8) != null && !row.getString(8).isBlank
&& row.getString(9) != null && !row.getString(9).isBlank) {
if(util.Try(row.getString(7).toLong).isSuccess
&& util.Try(row.getString(8).toLong).isSuccess
&& util.Try(row.getString(9).toLong).isSuccess) {
true
}
else {
false
}
}
else {
false
}
})
.map(row => YoutubeVideo(row.getString(0)
, row.getString(1)
, row.getString(2)
, row.getString(3)
, row.getString(4)
, row.getString(5)
, row.getString(6)
, row.getString(7).toLong
, row.getString(8).toLong
, row.getString(9).toLong
, row.getString(10)
, row.getString(11)
, row.getString(12).toLowerCase().toBoolean
, row.getString(13).toLowerCase().toBoolean
, row.getString(14)
, row.getString(15)
))
youtubeVideoDS.groupByKey(_.category_id)
.mapValues(y => CategoryResult(y.category_id, y.likes, y.dislikes))
.reduceGroups{(cr1, cr2) => {
CategoryResult(cr1.categoryId, cr1.totalLikes + cr2.totalLikes, cr1.totalDislikes + cr2.totalDislikes)
}}
// 只獲取Value部分,key部分過濾掉
.map(t => t._2)
.toDF()
.show()
TimeUnit.HOURS.sleep(1)
}
}
可以看到,我們對DataFrame進行了類型的安全轉換。來看一下Spark SQL執行的JOB。
同樣,基於DataSet的代碼,也執行了兩個JOB。
第一個JOB是一樣的,因為我們一樣要處理CSV的header。
而第二部分,命名我們了用了很多的groupByKey、mapValues、reduceGroups、map等操作。但其底層,執行的還是與DataFrame一樣高效的DAG。
很明顯,這個部門是我們編寫的DSL得到的DAG代碼。查看詳細的執行過程:
Spark依然給我們做了不少的一些優化動作。
看一下執行計劃。
基於DataSet依然是有執行計劃的。依然會基於Catalyst進行優化。但可以看到,這個實現明顯比基於DataFrame的邏輯更加複雜,雖然做的事情差不太多。
對比RDD和DataSet的API
- RDD的操作都是最底層的,Spark不會做任何的優化。是low level的API,無法執行schema的高階聲明式操作
- DataSet支持很多類似於RDD的功能函數,而且支持DataFrame的所有操作。其實我們前面看到了DataFrame就是一種特殊的、能力稍微弱一點的DataSet。DataSet是一種High Level的API,在效率上比RDD有很大的提升。
對比RDD、DataFrame、DataSet
RDD | DataFrame | DataSet | |
---|---|---|---|
schema | 無 需要自己建立shcema |
有 支持自動識別schema |
有schema 支持自動識別schema |
聚合操作 | 慢 | 最快 | 快 |
自動性能優化 | 無 開發人員自己優化 |
有 | 有 |
類型安全 | 安全 | 非安全 | 安全 |
序列化 | Java序列化,存儲/讀取整個Java對象 | Tungsten,堆外內存,可以按需存儲訪問屬性 | Tungsten,堆外內存,可以按需存儲訪問屬性 |
內存使用率 | 低 | 高 | 高 |
GC | 創建和銷毀每一個對象都有GC開銷 | 無需GC,使用堆外存儲 | 無需GC,使用堆外存儲 |
懶執行 | 支持 | 支持 | 支持 |
參考文獻
[1]//spark.apache.org/docs/latest/rdd-programming-guide.html
[2]//spark.apache.org/docs/latest/sql-programming-guide.html
[3]//databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html