別再人云亦云了!!!你真的搞懂了RDD、DF、DS的區別嗎?

幾年前,包括最近,我看了各種書籍、教程、官網。但是真正能夠把RDD、DataFrame、DataSet解釋得清楚一點的、論據多一點少之又少,甚至有的人號稱Spark專家,但在這一塊根本說不清楚。還有國內的一些書籍,小猴真的想問一聲:Are you OK?書名別再叫精通xxx技術了,請改名為 xxx技術從入門到放棄。這樣可以有效避免耽誤別人學習,不好嗎?

大家都在告訴我們結論,但其實,小猴作為一名長期混跡於開源社區、並仍在一線大數據開發的技術人,深諳技術文化之一:

To experience | 去經歷

這是我要提倡的技術文化之一。之前有人把Experience譯為體驗,但在小猴的技術世界裏,Experience更多的是自己去經歷,而不能跟團去旅遊一樣,那樣你只能是一個外包而已,想要做到卓越,就得去經歷。技術,只有去經歷才會有成長。

目錄

RDD、DataFrame、DataSet介紹

我們每天都在基於框架開發,對於我們來說,一套易於使用的API太重要了。對於Spark來說,有三套API。

image-20210201000858671

分別是:

  • RDD
  • DataFrame
  • DataSet

三套的API,開發人員就要學三套。不過,從Spark 2.2開始,DataFrame和DataSet的API已經統一了。而編寫Spark程序的時候,RDD已經慢慢退出我們的視野了。

但Spark既然提供三套API,我們到底什麼時候用RDD、什麼時候用DataFrame、或者DataSet呢?我們先來了解下這幾套API。

RDD

RDD的概念

  • RDD是Spark 1.1版本開始引入的。
  • RDD是Spark的基本數據結構。
  • RDD是Spark的彈性分佈式數據集,它是不可變的(Immutable)。
  • RDD所描述的數據分佈在集群的各個節點中,基於RDD提供了很多的轉換的並行處理操作。
  • RDD具備容錯性,在任何節點上出現了故障,RDD是能夠進行容錯恢復的。
  • RDD專註的是How!就是如何處理數據,都由我們自己來去各種算子來實現。

什麼時候使用RDD?

  • 應該避免使用RDD!

RDD的短板

  • 集群間通信都需要將JVM中的對象進行序列化和反序列化,RDD開銷較大
  • 頻繁創建和銷毀對象會增加GC,GC的性能開銷較大

image-20210201231436680

Spark 2.0開始,RDD不再是一等公民

從Apache Spark 2.0開始,RDD已經被降級為二等公民,RDD已經被棄用了。而且,我們一會就會發現,DataFrame/DataSet是可以和RDD相互轉換的,DataFrame和DataSet也是建立在RDD上。

DataFrame

DataFrame概念

  • DataFrame是從Spark 1.3版本開始引入的。
  • 通過DataFrame可以簡化Spark程序的開發,讓Spark處理結構化數據變得更簡單。DataFrame可以使用SQL的方式來處理數據。例如:業務分析人員可以基於編寫Spark SQL來進行數據開發,而不僅僅是Spark開發人員。
  • DataFrame和RDD有一些共同點,也是不可變的分佈式數據集。但與RDD不一樣的是,DataFrame是有schema的,有點類似於關係型數據庫中的,每一行的數據都是一樣的,因為。有了schema,這也表明了DataFrame是比RDD提供更高層次的抽象。
  • DataFrame支持各種數據格式的讀取和寫入,例如:CSV、JSON、AVRO、HDFS、Hive表。
  • DataFrame使用Catalyst進行優化。
  • DataFrame專註的是What!,而不是How!

DataFrame的優點

  • 因為DataFrame是有統一的schema的,所以序列化和反序列無需存儲schema。這樣節省了一定的空間。
  • DataFrame存儲在off-heap(堆外內存)中,由操作系統直接管理(RDD是JVM管理),可以將數據直接序列化為二進制存入off-heap中。操作數據也是直接操作off-heap。

DataFrane的短板

  • DataFrame不是類型安全的
  • API也不是面向對象的

Apache Spark 2.0 統一API

從Spark 2.0開始,DataFrame和DataSet的API合併在一起,實現了跨庫統一成為一套API。這樣,開發人員的學習成本就降低了。只需要學習一個High Level的、類型安全的DataSet API就可以了。——這對於Spark開發人員來說,是一件好事。

Spark2.0統一API

上圖我們可以看到,從Spark 2.0開始,Dataset提供了兩組不同特性的API:

  • 非類型安全
  • 類型安全

其中非類型安全就是DataSet[Row],我們可以對Row中的字段取別名。這不就是DataFrame嗎?而類型安全就是JVM對象的集合,類型就是scala的樣例類,或者是Java的實體類。

有Spark 2.0源碼為證:

package object sql {
  // ...
  type DataFrame = Dataset[Row]
}

//github.com/IloveZiHan/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/package.scala

也就是說,每當我們用導DataFrame其實就是在使用Dataset。

針對Python或者R,不提供類型安全的DataSet,只能基於DataFrame API開發。

什麼時候使用DataFrame

DataSet

  • DataSet是從Spark 1.6版本開始引入的。
  • DataSet具有RDD和DataFrame的優點,既提供了更有效率的處理、以及類型安全的API。
  • DataSet API都是基於Lambda函數、以及JVM對象來進行開發,所以在編譯期間就可以快速檢測到錯誤,節省開發時間和成本。
  • DataSet使用起來很像,但它的執行效率、空間資源效率都要比RDD高很多。可以很方便地使用DataSet處理結構化、和非結構數據。

DataSet API的優點

image-20210201231136055

  • DataSet結合了RDD和DataFrame的優點。
  • 當序列化數據時,Encoder生成的位元組碼可以直接與堆交互,實現對數據按需訪問,而無需反序列化整個對象。
類型安全

寫過Java或者C#的同學都會知道,一旦在代碼中類型使用不當,編譯都編譯不過去。日常開發中,我們更多地是使用泛型。因為一旦我們使用非類型安全的類型,軟件的維護周期一長,如果集合中放入了一些不合適的類型,就會出現嚴重的故障。這也是為什麼Java、C#還有C++都要去支持泛型的原因。

在Spark中也會有類型安全的問題。而且,一旦在運行時出現類型安全問題,會影響整個大規模計算作業。這種作業的錯誤排除難度,要比單機故障排查起來更複雜。如果在運行時期間就能發現問題,這很美好啊。

DataFrame中編寫SQL進行數據處理分析,在編譯時是不做檢查的,只有在Spark程序運行起來,才會檢測到問題。

SQL DataFrame Dataset
語法錯誤 運行時 編譯時 編譯時
解析錯誤 運行時 運行時 編譯時
對結構化和半結構化數據的High Level抽象

例如:我們有一個較大的網站流量日誌JSON數據集,可以很容易的使用DataSet[WebLog]來處理,強類型操作可以讓處理起來更加簡單。

以RDD更易用的API

DataSet引入了更豐富的、更容易使用的API操作。這些操作是基於High Level抽象的,而且基於實體類的操作,例如:進行groupBy、agg、select、sum、avg、filter等操作會容易很多。

性能優化

使用DataFrame和DataSet API在性能和空間使用率上都有大幅地提升。

  1. DataFrame和DataSet API是基於Spark SQL引擎之上構建的,會使用Catalyst生成優化後的邏輯和物理執行計劃。尤其是無類型的DataSet[Row](DataFrame),它的速度更快,很適合交互式查詢。

  2. 由於Spark能夠理解DataSet中的JVM對象類型,所以Spark會將將JVM對象映射為Tungsten的內部內存方式存儲。而Tungsten編碼器可以讓JVM對象更有效地進行序列化和反序列化,生成更緊湊、更有效率的位元組碼。

    RDD存儲效率 VS DataSet存儲效率通過上圖可以看到,DataSet的空間存儲效率是RDD的4倍。RDD要使用60GB的空間,而DataSet只需要使用不到15GB就可以了。

Youtube視頻分析案例

數據集

去Kaggle下載youtube地址:

//www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv

每個字段的含義都有說明。

Maven開發環境準備

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12</scala.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <repositories>
        <repository>
            <id>central</id>
            <url>//maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>central</id>
            <url>//maven.aliyun.com/nexus/content/groups/public/</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.3</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
    </build>

RDD開發

/**
 * Spark RDD處理示例
 */
object RddAnalysis {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDD Process").setMaster("local[*]")
        val sc = new SparkContext(conf)

        // 讀取本地文件創建RDD
        val youtubeVideosRDD = {
            sc.textFile("""E:\05.git_project\dataset\youtube""")
        }

        // 統計不同分類Youtube視頻的喜歡人數、不喜歡人數
        // 1. 添加行號
        // 創建計數器
        val rownumAcc = sc.longAccumulator("rownum")
        // 帶上行號
        youtubeVideosRDD.map(line => {
                rownumAcc.add(1)
                rownumAcc.value -> line
            })
            // 過濾掉第一行
            .filter(_._1 != 1)
            // 去除行號
            .map(_._2)
            // 過濾掉非法的數據
            .filter(line => {
                val fields = line.split("\001")
                val try1 = scala.util.Try(fields(8).toLong)
                val try2 = scala.util.Try(fields(9).toLong)

                if(try1.isFailure || try2.isFailure)
                    false
                else
                    true
            })
            // 讀取三個字段(視頻分類、喜歡的人數、不喜歡的人數
            .map(line => {
                // 按照\001解析CSV
                val fields = line.split("\001")
                // 取第4個(分類)、第8個(喜歡人數)、第9個(不喜歡人數)
                // (分類id, 喜歡人數, 不喜歡人數)
                (fields(4), fields(8).toLong, fields(9).toLong)
            })
            // 按照分類id分組
            .groupBy(_._1)
            .map(t => {
                val result = t._2.reduce((r1, r2) => {
                    (r1._1, r1._2 + r2._2, r1._3 + r2._3)
                })
                result
            })
            .foreach(println)
    }
}

運行結果如下:

("BBC Three",8980120,149525)
("Ryan Canty",11715543,80544)
("Al Jazeera English",34427,411)
("FBE",9003314,191819)
("Sugar Pine 7",1399232,81062)
("Rob Scallon",11652652,704748)
("CamilaCabelloVEVO",19077166,1271494)
("Grist",3133,37)

代碼中做了一些數據的過濾,然後進行了分組排序。如果Spark都要這麼來寫的話,業務人員幾乎是沒法寫了。着代碼完全解釋了How,而不是What。每一個處理的細節,都要我們自己親力親為。實現起來臃腫。

查看下基於RDD的DAG

打開瀏覽器,輸入:localhost:4040,來看下DAG。

image-20210203225714246

DAG非常的直觀,按照shuffle分成了兩個Stage來執行。Stage中依次執行了每個Operator。程序沒有經過任何優化。我把每一個操作都和DAG上的節點對應了起來。

DataFrame開發

object DataFrameAnalysis {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .appName("Youtube Analysis")
            .master("local[*]")
            .config("spark.sql.shuffle.partitions",1)
            .getOrCreate()

        import spark.sqlContext.implicits._

        // 讀取CSV
        val youtubeVideoDF = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""")

        import org.apache.spark.sql.functions._

        // 按照category_id分組聚合
        youtubeVideoDF.select($"category_id", $"likes".cast(LongType), $"dislikes".cast(LongType))
            .where($"likes".isNotNull)
            .where( $"dislikes".isNotNull)
            .groupBy($"category_id")
            .agg(sum("likes"), sum("dislikes"))
            .show()
    }
}

大家可以看到,現在實現方式非常的簡單,而且清晰。

查看下基於DataFrame的執行計劃與DAG

image-20210203230500292

但我們運行上面的Spark程序時,其實運行了兩個JOB。

image-20210203230807773

下面這個是第一個Job的DAG。我們看到只有一個Stage。這個DAG我們看得不是特別清楚做了什麼,因為Spark SQL是做過優化的,我們需要查看Query的詳細信息,才能看到具體執行的工作。

image-20210203233802280

第一個Job的詳細執行信息如下:

image-20210203231757894

哦,原來這個JOB掃描了所有的行,然後執行了一個Filter過濾操作。再查看下查詢計劃:

== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Filter (length(trim(value#6, None)) > 0)
      +- Project [value#0 AS value#6]
         +- Project [value#0]
            +- Relation[value#0] text

== Analyzed Logical Plan ==
value: string
GlobalLimit 1
+- LocalLimit 1
   +- Filter (length(trim(value#6, None)) > 0)
      +- Project [value#0 AS value#6]
         +- Project [value#0]
            +- Relation[value#0] text

== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Filter (length(trim(value#0, None)) > 0)
      +- Relation[value#0] text

== Physical Plan ==
CollectLimit 1
+- *(1) Filter (length(trim(value#0, None)) > 0)
   +- FileScan text [value#0] Batched: false, DataFilters: [(length(trim(value#0, None)) > 0)], Format: Text, Location: InMemoryFileIndex[file:/E:/05.git_project/dataset/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

可以非常清晰地看到,我們說看到的DAG是經過優化後的。

第二個JOB的DAG如下,同樣,我們也只能看到個大概。例如:Scan csv讀取csv文件,然後執行Spark SQL自動生成、優化後的Codegen階段,再執行了一次Shuffle(Exchange),然後再執行Spark SQL的codegen,最後執行mapPartition操作。

image-20210203233725024

為了一探究竟,我們依然得去查看Query Detail。這個Query Detail圖稍微長一點。我們很兩個部分來講解。

第一部分:

image-20210203234238998

  1. 掃描csv文件,一共讀取了一個文件,大小是59.8MB,一共有41035行。鼠標移上去,可以看到讀取的文件路徑、讀取的schema是什麼。

    image-20210203234552243

  2. 執行過濾操作(Filter)過濾出來的結果是40949行。把鼠標放在該操作,可以看到具體過濾的內容。

    image-20210203234453182

  3. 執行Project投影查詢。其實就是執行select語句。

    image-20210203235430045

  4. 然後開始執行Hash聚合。按照category_id進行分組,並執行了partial_sum。

    image-20210203235623836

第二部分:

image-20210203235724388

  1. Exchange表示進行數據交換(其實就是shuffle),shuffle一共讀取了122行。
  2. 接着進行Hash聚合,按照category分組,並進行sum求和,計算得到最終結果。
  3. 最後輸出21行,多出來的一行顯示的第頭部。

image-20210204000050064

雖然DataFrame我們使用的是DSL方式,但我們可以感受這個過程處理起來比較簡單。根據列進行分組聚合的時候,在編譯時期是對類型不敏感的、非安全的。我們要保證列名、類型都是正確的。同時,我們可以清晰的看到Spark SQL對程序執行過程的優化。

DataSet開發

要使用DataSet開發,我們先來看一下csv讀取數據成為DataFrame的spark源碼。

def csv(path: String): DataFrame = {
    // This method ensures that calls that explicit need single argument works, see SPARK-16009
    csv(Seq(path): _*)
}

我們可以看到csv返回的是一個DataFrame類型。而進一步查看DataFrame的源碼,我們發現:

type DataFrame = Dataset[Row]

而Row是非類型安全的,就有點像JDBC裏面的ResultSet那樣。我們為了操作起來更順手一些,定義一個實體類來開發。

上代碼:

case class YoutubeVideo(video_id: String
                        , trending_date: String
                        , title: String
                        , channel_title: String
                        , category_id: String
                        , publish_time: String
                        , tags: String
                        , views: Long
                        , likes: Long
                        , dislikes: Long
                        , comment_count: String
                        , thumbnail_link: String
                        , comments_disabled: Boolean
                        , ratings_disabled: Boolean
                        , video_error_or_removed: String
                        , description: String)

case class CategoryResult(categoryId:String
                          , totalLikes:Long
                          , totalDislikes:Long)

object DataSetAnalysis {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .appName("Youtube Analysis")
            .master("local[*]")
            .config("spark.sql.shuffle.partitions",1)
            .getOrCreate()

        import spark.sqlContext.implicits._

        // 讀取CSV
        val youtubeVideoDF:DataFrame = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""")
        // 轉換為DataSet
        youtubeVideoDF.printSchema()

        // 轉換為Dataset[YoutubeVideo]
        val youtubeVideoDS = youtubeVideoDF.filter(row => {
            if(row.getString(7) != null && !row.getString(7).isBlank
                && row.getString(8) != null && !row.getString(8).isBlank
                && row.getString(9) != null && !row.getString(9).isBlank) {
                if(util.Try(row.getString(7).toLong).isSuccess
                    && util.Try(row.getString(8).toLong).isSuccess
                    && util.Try(row.getString(9).toLong).isSuccess) {
                    true
                }
                else {
                    false
                }
            }
            else {
                false
            }
        })
        .map(row => YoutubeVideo(row.getString(0)
            , row.getString(1)
            , row.getString(2)
            , row.getString(3)
            , row.getString(4)
            , row.getString(5)
            , row.getString(6)
            , row.getString(7).toLong
            , row.getString(8).toLong
            , row.getString(9).toLong
            , row.getString(10)
            , row.getString(11)
            , row.getString(12).toLowerCase().toBoolean
            , row.getString(13).toLowerCase().toBoolean
            , row.getString(14)
            , row.getString(15)
        ))


        youtubeVideoDS.groupByKey(_.category_id)
            .mapValues(y => CategoryResult(y.category_id, y.likes, y.dislikes))
            .reduceGroups{(cr1, cr2) => {
                CategoryResult(cr1.categoryId, cr1.totalLikes + cr2.totalLikes, cr1.totalDislikes + cr2.totalDislikes)
            }}
            // 只獲取Value部分,key部分過濾掉
            .map(t => t._2)
            .toDF()
            .show()

        TimeUnit.HOURS.sleep(1)
    }
}

可以看到,我們對DataFrame進行了類型的安全轉換。來看一下Spark SQL執行的JOB。

同樣,基於DataSet的代碼,也執行了兩個JOB。

image-20210204004356287

第一個JOB是一樣的,因為我們一樣要處理CSV的header。

而第二部分,命名我們了用了很多的groupByKey、mapValues、reduceGroups、map等操作。但其底層,執行的還是與DataFrame一樣高效的DAG。

image-20210204004430820

很明顯,這個部門是我們編寫的DSL得到的DAG代碼。查看詳細的執行過程:

image-20210204004602444

Spark依然給我們做了不少的一些優化動作。

image-20210204004631145

看一下執行計劃。

image-20210204004703810

基於DataSet依然是有執行計劃的。依然會基於Catalyst進行優化。但可以看到,這個實現明顯比基於DataFrame的邏輯更加複雜,雖然做的事情差不太多。

對比RDD和DataSet的API

  • RDD的操作都是最底層的,Spark不會做任何的優化。是low level的API,無法執行schema的高階聲明式操作
  • DataSet支持很多類似於RDD的功能函數,而且支持DataFrame的所有操作。其實我們前面看到了DataFrame就是一種特殊的、能力稍微弱一點的DataSet。DataSet是一種High Level的API,在效率上比RDD有很大的提升。

對比RDD、DataFrame、DataSet

RDD DataFrame DataSet
schema
需要自己建立shcema

支持自動識別schema
有schema
支持自動識別schema
聚合操作 最快
自動性能優化
開發人員自己優化
類型安全 安全 非安全 安全
序列化 Java序列化,存儲/讀取整個Java對象 Tungsten,堆外內存,可以按需存儲訪問屬性 Tungsten,堆外內存,可以按需存儲訪問屬性
內存使用率
GC 創建和銷毀每一個對象都有GC開銷 無需GC,使用堆外存儲 無需GC,使用堆外存儲
懶執行 支持 支持 支持

參考文獻

[1]//spark.apache.org/docs/latest/rdd-programming-guide.html

[2]//spark.apache.org/docs/latest/sql-programming-guide.html

[3]//databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html