【推薦系統演算法實戰】 Spark :大數據處理框架

  • 2019 年 12 月 24 日
  • 筆記

Spark 簡介

http://spark.apache.org/ https://github.com/to-be-architect/spark

HadoopStorm等其他大數據和MapReduce技術相比,Spark有如下優勢:

  • Spark提供了一個全面、統一的框架用於管理各種有著不同性質(文本數據、圖表數據等)的數據集和數據源(批量數據或實時的流數據)的大數據處理的需求.
  • 官方資料介紹Spark可以將Hadoop集群中的應用在記憶體中的運行速度提升100倍,甚至能夠將應用在磁碟上的運行速度提升10倍

架構及生態

通常當需要處理的數據量超過了單機尺度(比如我們的電腦有4GB的記憶體,而我們需要處理100GB以上的數據)這時我們可以選擇spark集群進行計算,有時我們可能需要處理的數據量並不大,但是計算很複雜,需要大量的時間,這時我們也可以選擇利用spark集群強大的計算資源,並行化地計算,其架構示意圖如下:

Spark組成(BDAS):全稱伯克利數據分析棧,通過大規模集成演算法、機器、人之間展現大數據應用的一個平台。也是處理大數據、雲計算、通訊的技術解決方案。

它的主要組件有:

  • SparkCore:將分散式數據抽象為彈性分散式數據集(RDD),實現了應用任務調度、RPC、序列化和壓縮,並為運行在其上的上層組件提供API。
  • SparkSQL:Spark Sql 是Spark來操作結構化數據的程式包,可以讓我使用SQL語句的方式來查詢數據,Spark支援 多種數據源,包含Hive表,parquest以及JSON等內容。
  • SparkStreaming: 是Spark提供的實時數據進行流式計算的組件。
  • MLlib:提供常用機器學習演算法的實現庫。 https://spark.apache.org/docs/latest/mllib-guide.html
  • GraphX:提供一個分散式圖計算框架,能高效進行圖計算。
  • BlinkDB:用於在海量數據上進行互動式SQL的近似查詢引擎。
  • Tachyon:以記憶體為中心高容錯的的分散式文件系統。

Spark結構設計

Spark運行架構包括集群資源管理器(Cluster Manager)、運行作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行進程(Executor)。其中,集群資源管理器可以是Spark自帶的資源管理器,也可以是YARN或Mesos等資源管理框架。

Spark運行基本流程

Spark的基本運行流程如下:

當一個Spark應用被提交時,首先需要為這個應用構建起基本的運行環境,即由任務控制節點(Driver)創建一個SparkContext,由SparkContext負責和資源管理器(Cluster Manager)的通訊以及進行資源的申請、任務的分配和監控等。SparkContext會向資源管理器註冊並申請運行Executor的資源;

資源管理器為Executor分配資源,並啟動Executor進程,Executor運行情況將隨著「心跳」發送到資源管理器上;

SparkContext根據RDD的依賴關係構建DAG圖,DAG圖提交給DAG調度器(DAGScheduler)進行解析,將DAG圖分解成多個「階段」(每個階段都是一個任務集),並且計算出各個階段之間的依賴關係,然後把一個個「任務集」提交給底層的任務調度器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務調度器將任務分發給Executor運行,同時,SparkContext將應用程式程式碼發放給Executor;

任務在Executor上運行,把執行結果回饋給任務調度器,然後回饋給DAG調度器,運行完畢後寫入數據並釋放所有資源。

Spark三種部署方式

Spark應用程式在集群上部署運行時,可以由不同的組件為其提供資源管理調度服務(資源包括CPU、記憶體等)。比如,可以使用自帶的獨立集群管理器(standalone),或者使用YARN,也可以使用Mesos。因此,Spark包括三種不同類型的集群部署方式,包括standalone、Spark on Mesos和Spark on YARN。

1.standalone模式

與MapReduce1.0框架類似,Spark框架本身也自帶了完整的資源調度管理服務,可以獨立部署到一個集群中,而不需要依賴其他系統來為其提供資源管理調度服務。在架構的設計上,Spark與MapReduce1.0完全一致,都是由一個Master和若干個Slave構成,並且以槽(slot)作為資源分配單位。不同的是,Spark中的槽不再像MapReduce1.0那樣分為Map 槽和Reduce槽,而是只設計了統一的一種槽提供給各種任務來使用。

2.Spark on Mesos模式

Mesos是一種資源調度管理框架,可以為運行在它上面的Spark提供服務。Spark on Mesos模式中,Spark程式所需要的各種資源,都由Mesos負責調度。由於Mesos和Spark存在一定的血緣關係,因此,Spark這個框架在進行設計開發的時候,就充分考慮到了對Mesos的充分支援,因此,相對而言,Spark運行在Mesos上,要比運行在YARN上更加靈活、自然。目前,Spark官方推薦採用這種模式,所以,許多公司在實際應用中也採用該模式。

3. Spark on YARN模式

Spark可運行於YARN之上,與Hadoop進行統一部署,即「Spark on YARN」,其架構如圖9-13所示,資源管理和調度依賴YARN,分散式存儲則依賴HDFS。

「Spark on YARN」

Hadoop和Spark統一部署

一方面,由於Hadoop生態系統中的一些組件所實現的功能,目前還是無法由Spark取代的,比如,Storm可以實現毫秒級響應的流計算,但是,Spark則無法做到毫秒級響應。另一方面,企業中已經有許多現有的應用,都是基於現有的Hadoop組件開發的,完全轉移到Spark上需要一定的成本。因此,在許多企業實際應用中,Hadoop和Spark的統一部署是一種比較現實合理的選擇。 由於Hadoop MapReduce、HBase、Storm和Spark等,都可以運行在資源管理框架YARN之上,因此,可以在YARN之上進行統一部署(如圖9-16所示)。這些不同的計算框架統一運行在YARN中,可以帶來如下好處:

 計算資源按需伸縮;  不用負載應用混搭,集群利用率高;  共享底層存儲,避免數據跨集群遷移。

MapReduce & Spark

image

七個MapReduce作業意味著需要七次讀取和寫入HDFS,而它們的輸入輸出數據存在關聯,七個作業輸入輸出數據關係如下圖。

image

基於MapReduce實現此演算法存在以下問題:

  • 為了實現一個業務邏輯需要使用七個MapReduce作業,七個作業間的數據交換通過HDFS完成,增加了網路和磁碟的開銷。
  • 七個作業都需要分別調度到集群中運行,增加了Gaia集群的資源調度開銷。
  • MR2和MR3重複讀取相同的數據,造成冗餘的HDFS讀寫開銷。

這些問題導致作業運行時間大大增長,作業成本增加。相比與MapReduce編程模型,Spark提供了更加靈活的DAG(Directed Acyclic Graph) 編程模型, 不僅包含傳統的map、reduce介面, 還增加了filter、flatMap、union等操作介面,使得編寫Spark程式更加靈活方便。使用Spark編程介面實現上述的業務邏輯如下圖所示。

image

相對於MapReduce,Spark在以下方面優化了作業的執行時間和資源使用。

  • DAG編程模型。 通過Spark的DAG編程模型可以把七個MapReduce簡化為一個Spark作業。Spark會把該作業自動切分為八個Stage,每個Stage包含多個可並行執行的Tasks。Stage之間的數據通過Shuffle傳遞。最終只需要讀取和寫入HDFS一次。減少了六次HDFS的讀寫,讀寫HDFS減少了70%。
  • Spark作業啟動後會申請所需的Executor資源,所有Stage的Tasks以執行緒的方式運行,共用Executors,相對於MapReduce方式,Spark申請資源的次數減少了近90%。
  • Spark引入了RDD(Resilient Distributed Dataset)模型,中間數據都以RDD的形式存儲,而RDD分布存儲於slave節點的記憶體中,這就減少了計算過程中讀寫磁碟的次數。RDD還提供了Cache機制,例如對上圖的rdd3進行Cache後,rdd4和rdd7都可以訪問rdd3的數據。相對於MapReduce減少MR2和MR3重複讀取相同數據的問題。

環境安裝

http://spark.apache.org/downloads.html

Spark的部署模式有Local、Local-Cluster、Standalone、Yarn、Mesos,我們選擇最具代表性的Standalone集群部署模式。

進入到Spark安裝目錄

cd /home/bigdata/hadoop/spark-2.1.1-bin-hadoop2.7/conf

將slaves.template複製為slaves 將spark-env.sh.template 複製為 spark-env.sh 修改 slave 文件,將 work 的 hostname 輸入:

修改spark-env.sh文件,添加如下配置:

將配置好的Spark文件拷貝到其他節點上

Spark集群配置完畢,目前是1個Master,2個Work,linux01上啟動Spark集群 /opt/modules/spark-2.1.1-bin-hadoop2.7/sbin/start-all.sh

啟動後執行jps命令,主節點上有Master進程,其他子節點上有Work進行,登錄Spark管理介面查看集群狀態(主節點):http://linux01:8080/

jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$ ./start-all.sh  starting org.apache.spark.deploy.master.Master, logging to /Users/jack/soft/spark-3.0.0-preview-bin-hadoop2.7/logs/spark-jack-org.apache.spark.deploy.master.Master-1-Jack-MacBook-Pro.local.out  localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.  Password:  localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/jack/soft/spark-3.0.0-preview-bin-hadoop2.7/logs/spark-jack-org.apache.spark.deploy.worker.Worker-1-Jack-MacBook-Pro.local.out  localhost: bash: shell_session_update: command not found  jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$  jack@Jack-MacBook-Pro:~/soft/spark-3.0.0-preview-bin-hadoop2.7/sbin$ jps  4160 Launcher  82225 Worker  23393 Preloader  71349 Launcher  43252 Launcher  82244 Jps  82068 Master  54599 ApplicationKt  59352 Main  59291 org.eclipse.equinox.launcher_1.5.600.v20191014-2022.jar  53807 

到此為止,Spark集群安裝完畢.

注意:如果遇到 「JAVA_HOME not set」 異常,可以在sbin目錄下的spark-config.sh 文件中加入如下配置: export JAVA_HOME=XXXX

快速開始

http://spark.apache.org/examples.html

spark任務部署:

./bin/spark-submit --class org.apache.spark.examples.SparkPi         #作業類名      --master yarn                        #spark模式      --deploy-mode cluster                 #spark on yarn 模式      --driver-memory 4g                     #每一個driver的記憶體      --executor-memory 2g                     #每一個executor的記憶體      --executor-cores 1                         #每一個executor佔用的core數量      --queue thequeue                             #作業執行的隊列      examples/jars/spark-examples*.jar             #jar包      10                                               #傳入類中所需要的參數

spark 任務劃分:

一個jar包就是一個Application

一個行動操作就是一個Job, 對應於Hadoop中的一個MapReduce任務

一個Job有很多Stage組成,劃分Stage是從後往前劃分,遇到寬依賴則將前面的所有轉換換分為一個Stage

一個Stage有很多Task組成,一個分區被一個Task所處理,所有分區數也叫並行度。

參考資料:

https://blog.csdn.net/qq_17677907/article/details/88685705 https://blog.csdn.net/c391183914/article/details/78672555 https://www.jianshu.com/p/11c4cfa094aa


Kotlin 開發者社區

中國第一Kotlin 開發者社區公眾號,主要分享、交流 Kotlin 程式語言、Spring Boot、Android、React.js/Node.js、函數式編程、編程思想等相關主題。

越是喧囂的世界,越需要寧靜的思考。