Spark內核解析

Spark內核概述

Spark內核泛指Spark的核心運行機制,包括Spark核心組件的運行機制、Spark任務調度機制、Spark記憶體管理機制、Spark核心功能的運行原理等,熟練掌握Spark內核原理。

一、Spark核心組件回顧

Driver

Spark驅動器節點,用於執行Spark任務中的main方法,負責實際程式碼的執行工作。Driver在Spark作業執行時主要負責:

1、將用戶程式轉化為任務(Job);

2、在Executor之間調度任務(task);

3、跟蹤Executor的執行情況;

4、通過UI展示查詢運行情況。

Executor

Spark Executor節點是一個JVM進程,負責在Spark作業中運行具體任務,任務彼此之間相互獨立。Spark應用啟動時,Executor節點被同時啟動,並且始終伴隨著整個Spark應用的生命周期而存在。如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會將出錯節點上的任務調度到其他Executor節點上繼續運行。

Executor有兩個核心功能:

1、負責運行組成Spark應用的任務,並將結果返回給Driver進程;

2、它們通過自身的塊管理器(Block Manager)為用戶程式中要求快取的RDD提供記憶體式存儲。RDD是直接快取在Executor進程內的,因此任務可以在運行時充分利用快取數據加速運算。

Spark通用運行流程概述

img

上圖為Spark通用運行流程,不論Spark以何種模式進行部署,任務提交後,都會先啟動Driver進程,隨後Driver進程向集群管理器註冊應用程式,之後集群管理器根據此任務的配置文件分配Executor並啟動,當Driver所需的資源全部滿足後,Driver開始執行main函數,Spark查詢為懶執行,當執行到action運算元時開始反向推算,根據寬依賴進行stage的劃分,隨後每一個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通訊,報告任務運行情況。

二、Spark部署模式

Spark支援三種集群管理器(Cluster Manager),分別為:

1、Standalone:獨立模式,Spark原生的簡單集群管理器,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統,使用Standalone可以很方便地搭建一個集群;

2、Apache Mesos:一個強大的分散式資源管理框架,它允許多種不同的框架部署在其上,包括yarn;

3、Hadoop YARN:統一的資源管理機制,在上面可以運行多套計算框架,如map reduce、storm等,根據driver在集群中的位置不同,分為yarn client和yarn cluster。

實際上,除了上述這些通用的集群管理器外,Spark內部也提供了一些方便用戶測試和學習的簡單集群部署模式。由於在實際工廠環境下使用的絕大多數的集群管理器是Hadoop YARN,因此我們關注的重點是Hadoop YARN模式下的Spark集群部署。

Spark的運行模式取決於傳遞給SparkContext的MASTER環境變數的值,個別模式還需要輔助的程式介面來配合使用,目前支援的Master字元串及URL包括:

img

用戶在提交任務給Spark處理時,以下兩個參數共同決定了Spark的運行方式。

– master MASTER_URL:決定了Spark任務提交給哪種集群處理。

– deploy-mode DEPLOY_MODE:決定了Driver的運行方式,可選值為Client或者Cluster。

Standalone模式運行機制

Standalone集群有四個重要組成部分,分別是:

(1)Driver:是一個進程,我們編寫的Spark應用程式就運行在Driver上,由Driver進程執行;

(2)Master:是一個進程,主要負責資源調度和分配,並進行集群的監控等職責;

(3)Worker:是一個進程,一個Worker運行在集群中的一台伺服器上,主要負責兩個職責,一個是用自己的記憶體存儲RDD的某個或某些partition;另一個是啟動其他進程和執行緒(Executor),對RDD上的partition進行並行的處理和計算。

(4)Executor:是一個進程,一個Worker上可以運行多個Executor,Executor通過啟動多個執行緒(task)來執行對RDD的partition進行並行計算,也就是執行我們對RDD定義的例如map、flatMap、reduce等運算元操作。

Standalone Client模式

img

1、在Standalone Client模式下,Driver在任務提交的本地機器上運行;

2、Driver啟動後向Master註冊應用程式,Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有Worker,然後在這些Worker之間分配Executor;

3、Worker上的Executor啟動後會向Driver反向註冊;

4、當所有的Executor註冊完成後,Driver開始執行main函數;

5、之後執行到Action運算元時,開始劃分stage;

6、每個stage生成對應的taskSet,之後將task 分發到各個Executor上執行。

Standalone Cluster模式

img

1、在Standalone Cluster模式下,任務提交後,Master會找到一個Worker啟動Driver進程;

2、Driver啟動後向 Master註冊應用程式;

3、Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有 Worker,然後在這些Worker之間分配Executor;

4、Worker上的Executor啟動後會向Driver反向註冊;

5、所有的 Executor註冊完成後,Driver開始執行main函數;

6、之後執行到Action運算元時,開始劃分stage,每個stage生成對應的taskSet;

7、之後將task分發到各個Executor上執行。

注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver註冊Spark應用程式的請求後,會獲取其所管理的剩餘資源能夠啟動一個 Executor的所有Worker,然後在這些Worker之間分發Executor,此時的分發只考慮Worker上的資源是否足夠使用,直到當前應用程式所需的所有Executor都分配完畢,Executor反向註冊完畢後,Driver開始執行main程式。

YARN模式運行機制

YARN Client模式

img

1、在YARN Client模式下,Driver在任務提交的本地機器上運行;

2、Driver啟動後會和ResourceManager通訊申請啟動ApplicationMaster;

3、隨後ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster的功能相當於一個ExecutorLaucher(執行者發射器),只負責向ResourceManager申請Executor記憶體;

4、ResourceManager接到ApplicationMaster的資源申請後會分配container,然後ApplicationMaster在資源分配指定的NodeManager上啟動Executor進程;

5、Executor進程啟動後會向Driver反向註冊;

6、Executor全部註冊完成後Driver開始執行main函數;

7、之後執行到Action運算元時,觸發一個job,並根據寬依賴開始劃分stage;

8、每個stage生成對應的taskSet,之後將task分發到各個Executor上執行。

YARN Cluster模式

img

1、在YARN Cluster模式下,任務提交後會和ResourceManager通訊申請啟動ApplicationMaster;

2、隨後ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster;(此時的ApplicationMaster就是Driver)

3、Driver啟動後向ResourceManager申請Executor記憶體,ResourceManager接到ApplicationMaster的資源申請後會分配container,然後在合適的NodeManager上啟動Executor進程;

4、Executor進程啟動後會向Driver反向註冊;

5、Executor全部註冊完成後Driver開始執行main函數;

6、之後執行到Action運算元時,觸發一個job,並根據寬依賴開始劃分stage;

7、每個stage生成對應的taskSet,之後將task分發到各個Executor上執行。

三、Spark通訊架構

Spark通訊架構概述

Spark2.x版本使用Netty通訊架構作為內部通訊組件。Spark基於Netty新的rpc框架借鑒了Akka中的設計,它是基於Actor模型,如下圖所示:

img

Spark通訊框架中各個組件(Client/Master/Worker)可以認為是一個個獨立的實體,各個實體之間通過消息來進行通訊。具體各個組件之間的關係如下:

img

Endpoint(Client/Master/Worker)有一個InBox和N個OutBox(N>=1,N取決於當前Endpoint與多少其他的Endpoint進行通訊,一個與其通訊的其他Endpoint對應一個OutBox),Endpoint接收到的消息被寫入InBox,發送出去的消息寫入OutBox並被發送到其他Endpoint的InBox中。

Spark通訊架構解析

Spark通訊架構如下圖所示:

img

  1. RpcEndpoint:RPC端點,Spark針對每個節點(Client/Master/Worker)都稱之為一個Rpc 端點,且都實現RpcEndpoint介面,內部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送(詢問)則調用 Dispatcher;

  2. RpcEnv:RPC上下文環境,每個RPC端點運行時依賴的上下文環境稱為 RpcEnv;

  3. Dispatcher:消息分發器,針對於RPC端點需要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己則存入收件箱,如果指令接收方不是自己,則放入發件箱;

  4. Inbox:指令消息收件箱,一個本地RpcEndpoint對應一個收件箱,Dispatcher在每次向Inbox存入消息時,都將對應EndpointData加入內部ReceiverQueue中,另外Dispatcher創建時會啟動一個單獨執行緒進行輪詢ReceiverQueue,進行收件箱消息消費;

  5. RpcEndpointRef:RpcEndpointRef是對遠程RpcEndpoint的一個引用。當我 們需要向一個具體的RpcEndpoint發送消息時,一般我們需要獲取到該RpcEndpoint的引用,然後通過該應用發送消息。

  6. OutBox:指令消息發件箱,對於當前RpcEndpoint來說,一個目標RpcEndpoint對應一個發件箱,如果向多個目標RpcEndpoint發送資訊,則有多個OutBox。當消息放入Outbox後,緊接著通過TransportClient將消息發送出去。消息放入發件箱以及發送過程是在同一個執行緒中進行;

  7. RpcAddress:表示遠程的RpcEndpointRef的地址,Host + Port。

  8. TransportClient:Netty通訊客戶端,一個OutBox對應一個TransportClient,TransportClient不斷輪詢OutBox,根據OutBox消息的receiver資訊,請求對應的遠程TransportServer;

  9. TransportServer:Netty通訊服務端,一個RpcEndpoint對應一個TransportServer,接受遠程消息後調用 Dispatcher分發消息至對應收發件箱;

根據上面的分析,Spark通訊架構的高層視圖如下圖所示:

img

四、SparkContext解析

在Spark中由SparkContext負責與集群進行通訊、資源的申請以及任務的分配和監控等。當 Worker節點中的Executor運行完畢Task後,Driver同時負責將SparkContext關閉。

通常也可以使用SparkContext來代表驅動程式(Driver)。

img

SparkContext是用戶通往Spark集群的唯一入口,可以用來在Spark集群中創建RDD、累加器和廣播變數。

SparkContext也是整個Spark應用程式中至關重要的一個對象,可以說是整個Application運行調度的核心(不包括資源調度)。

SparkContext的核心作用是初始化Spark應用程式運行所需的核心組件,包括高層調度器(DAGScheduler)、底層調度器(TaskScheduler)和調度器的通訊終端(SchedulerBackend),同時還會負責Spark程式向Cluster Manager的註冊等。

img

在實際的編碼過程中,我們會先創建SparkConf實例,並對SparkConf的屬性進行自定義設置,隨後,將SparkConf作為SparkContext類的唯一構造參數傳入來完成SparkContext實例對象的創建。SparkContext在實例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,當RDD的action運算元觸發了作業(Job)後,SparkContext會調用DAGScheduler根據寬窄依賴將Job劃分成幾個小的階段(Stage),TaskScheduler會調度每個Stage的任務(Task),另外,SchedulerBackend負責申請和管理集群為當前Application分配的計算資源(即Executor)。

如果我們將Spark Application比作汽車,那麼SparkContext就是汽車的引擎,而SparkConf就是引擎的配置參數。

下圖描述了Spark-On-Yarn模式下在任務調度期間,ApplicationMaster、Driver以及Executor內部模組的交互過程:

img

Driver初始化SparkContext過程中,會分別初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,並啟動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通過ApplicationMaster申請資源,並不斷從TaskScheduler中拿到合適的Task分發到Executor執行。HeartbeatReceiver負責接收Executor的心跳資訊,監控Executor的存活狀況,並通知到TaskScheduler。

五、Spark任務調度機制

在工廠環境下,Spark集群的部署方式一般為YARN-Cluster模式,之後的內核分析內容中我們默認集群的部署方式為YARN-Cluster模式。

Spark任務提交流程

img

​ Spark YARN-Cluster模式下的任務提交流程

下面的時序圖清晰地說明了一個Spark應用程式從提交到運行的完整流程:

img

1、提交一個Spark應用程式,首先通過Client向ResourceManager請求啟動一個Application,同時檢查是否有足夠的資源滿足Application的需求,如果資源條件滿足,則準備ApplicationMaster的啟動上下文,交給ResourceManager,並循環監控Application狀態。

2、當提交的資源隊列中有資源時,ResourceManager會在某個 NodeManager上啟動ApplicationMaster進程,ApplicationMaster會單獨啟動Driver後台執行緒,當Driver啟動後,ApplicationMaster會通過本地的RPC連接Driver,並開始向ResourceManager申請Container資源運行Executor進程(一個Executor對應與一個Container),當ResourceManager返回Container資源,ApplicationMaster則在對應的Container上啟動Executor。

3、Driver執行緒主要是初始化SparkContext對象,準備運行所需的上下文,然後一方面保持與ApplicationMaster的RPC連接,通過ApplicationMaster申請資源,另一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閑Executor上。

4、當ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啟動Executor進程,Executor進程起來後,會向Driver反向註冊,註冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,將任務狀態上報給 Driver。

從上述時序圖可知,Client只負責提交Application並監控Application 的狀態。對於Spark的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是通過ApplicationMaster、Driver以及Executor之間來完成。

Spark任務調度概述

當Driver起來後,Driver則會根據用戶程式邏輯準備任務,並根據Executor資源情況逐步分發任務。在詳細闡述任務調度前,首先說明下Spark里的幾個概念。一個Spark應用程式包括Job、Stage以及Task三個概念:

Job是以Action方法為界,遇到一個Action方法則觸發一個Job;

Stage是Job的子集,以RDD寬依賴(即 Shuffle)為界,遇到Shuffle做一次劃分;

Task是Stage的子集,以並行度(分區數)來衡量,分區數是多少,則有多少個task。

Spark的任務調度總體來說分兩路進行,一路是Stage級的調度,一路是Task級的調度,總體調度流程如下圖所示:

img

Spark RDD通過其Transactions操作,形成了RDD血緣關係圖,即DAG,最後通過Action的調用,觸發Job並調度執行。DAGScheduler負責Stage級的調度,主要是將job切分成若干個Stage,並將每個Stage打包成TaskSet交給TaskScheduler調度。TaskScheduler負責Task級的調度,將DAGScheduler給過來的TaskSet按照指定的調度策略分發到Executor上執行,調度過程中SchedulerBackend負責提供可用資源,其中SchedulerBackend有多種實現,分別對接不同的資源管理系統。

Spark Stage級調度

Spark的任務調度是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操作後就會觸發一個Job的計算,並交給DAGScheduler來提交,下圖是涉及到Job提交的相關方法調用流程圖。

img

Job由最終的RDD和Action方法封裝而成,SparkContext 將Job交給DAGScheduler提交,它會根據RDD的血緣關係構成的DAG進行切分,將一個Job劃分為若干Stages,具體劃分策略是,由最終的RDD不斷通過依賴回溯判斷父依賴 是否是寬依賴,即以Shuffle為界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,可以進行pipeline式的計算,如上圖紫色流程部分。劃分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,另一類叫做ShuffleMapStage,為下游Stage準備數據,下面看一個簡單的例子WordCount。

img

Job由saveAsTextFile觸發,該Job由RDD-3和saveAsTextFile方法組成,根據RDD之間的依賴關係從RDD-3開始回溯搜索,直到沒有依賴的RDD-0,在回溯搜索過程中,RDD-3依賴RDD-2,並且是寬依賴,所以在RDD-2和RDD-3之間劃分Stage,RDD-3被划到最後一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,所以將RDD-0、RDD-1和RDD-2劃分到同一個 Stage,即 ShuffleMapStage中,實際執行的時候,數據記錄會一氣呵成地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜索演算法。一個Stage是否被提交,需要判斷它的父Stage是否執行,只有在父Stage執行完畢才能提交當前Stage,如果一個Stage沒有父Stage,那麼從該Stage開始提交。Stage提交時會將Task資訊(分區資訊以及方法等)序列化並被打包成TaskSet 交給TaskScheduler,一個Partition對應一個Task,另一方面TaskScheduler會監控Stage的運行狀態,只有Executor丟失或者Task由於Fetch失敗才需要重新提交失敗的Stage以調度運行失敗的任務,其他類型的Task失敗會在TaskScheduler的調度過程中重試。相對來說DAGScheduler做的事情較為簡單,僅僅是在Stage層面上劃分DAG,提交Stage並監控相關狀態資訊。TaskScheduler則相對較為複雜,下面詳細闡述其細節。

Spark Task級調度

Spark Task的調度是由TaskScheduler來完成,DAGScheduler將Stage打包到TaskSet交給TaskScheduler,TaskScheduler會將TaskSet封裝為TaskSetManager加入到調度隊列中,TaskSetManager結構如下圖所示。

img

TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來調度任務。

TaskScheduler初始化後會啟動SchedulerBackend,它負責跟外界打交道,接收Executor的註冊資訊,並維護Executor的狀態,所以說SchedulerBackend是管「糧食」的,同時它在啟動後會定期地去「詢問」TaskScheduler有沒有任務要運行,也就是說,它會定期地「問」TaskScheduler「我有這麼餘量,你要不要啊」,TaskScheduler在SchedulerBackend「問 」它的時候,會從調度隊列中按照指定的調度策略選擇TaskSetManager去調度運行,大致方法調用流程如下圖所示:

img

將TaskSetManager加入rootPool調度池中之後,調用SchedulerBackend的riviveOffers方法給driverEndpoint發送ReviveOffer消息;driverEndpoint收到ReviveOffer消息後調用makeOffers方法,過濾出活躍狀態的Executor(這些Executor都是任務啟動時反向註冊到Driver的Executor),然後將Executor封裝成WorkerOffer對象;準備好計算資源(WorkerOffer)後,taskScheduler基於這些資源調用resourceOffer在Executor上分配task。

六、調度策略

前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務隊列里,然後再從任務隊列里按照一定的規則把它們取出來在SchedulerBackend給過來的Executor上運行。這個調度過程實際上還是比較粗粒度的,是面向TaskSetManager的。調度隊列的層次結構如下圖所示:

img

TaskScheduler是以樹的方式來管理任務隊列,樹中的節點類型為Schdulable,葉子節點為TaskSetManager,非葉子節點為Pool,下圖是它們之間的繼承關係。

img

TaskScheduler支援兩種調度策略,一種是FIFO,也是默認的調度策略,另一種是FAIR。在TaskScheduler初始化過程中會實例化rootPool,表示樹的根節點,是Pool類型。

1、FIFO調度策略

FIFO調度策略執行步驟如下:

1)對s1和s2兩個Schedulable的優先順序(Schedulable類的一個屬性,記為priority,值越小,優先順序越高);

2)如果兩個Schedulable的優先順序相同,則對s1,s2所屬的Stage的身份進行標識進行比較(Schedulable類的一個屬性,記為priority,值越小,優先順序越高);

3)如果比較的結果小於0,則優先調度s1,否則優先調度s2。

img

2、FAIR 調度策略

FAIR 調度策略的樹結構如下圖所示:

img

FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲著所有待分配的TaskSetMagager。

可以通過在Properties中指定spark.scheduler.pool屬性,指定調度池中的某個調度池作為TaskSetManager的父調度池,如果根調度池不存在此屬性值對應的調度池,會創建以此屬性值為名稱的調度池作為TaskSetManager的父調度池,並將此調度池作為根調度池的子調度池。

在FAIR模式中,需要先對子Pool進行排序,再對子Pool裡面的TaskSetMagager進行排序,因為Pool和TaskSetMagager都繼承了Schedulable特質,因此使用相同的排序演算法。

排序過程的比較是基於Fair-share來比較的,每個要排序的對象包含三個屬性:runningTasks值(正在運行的Task數)、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平調度配置文件fairscheduler.xml中被指定,調度池在構建階段會讀取此文件的相關配置。

1)如果A對象的runningTasks大於它的minShare,B對象的runningTasks小於它的minShare,那麼B排在A前面;(runningTasks比minShare小的先執行)

2)如果A、B對象的runningTasks都小於它們的minShare,那麼就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執行)

3)如果A、B對象的runningTasks都大於它們的minShare,那麼就比較runningTasks與weight的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行)

4)如果上述比較均相等,則比較名字。

整體上來說就是通過minShare和weight這兩個參數控制比較過程,可以做到讓minShare使用率和權重使用率少(實際運行task比例較少)的先運行。

FAIR模式排序完成後,所有的TaskSetManager被放入一個ArrayBuffer里,之後依次被取出並發送給Executor執行。

從調度隊列中拿到TaskSetManager後,由於TaskSetManager封裝了一個Stage的所有Task,並負責管理調度這些Task,那麼接下來的工作就是TaskSetManager按照一定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。

本地化調度

DAGScheduler切割Job,劃分Stage,通過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks,submitMissingTasks確定每個需要計算的task的preferredLocations,通過調用getPreferrdeLocations()得到partition的優先位置,由於一個partition對應一個task,此partition的優先位置就是task的優先位置,對於要提交到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一致。從調度隊列中拿到TaskSetManager後,那麼接下來的工作就是TaskSetManager按照一定的規則一個個取出task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的所有task,並負責管理調度這些task。根據每個task的優先位置,確定task的Locality級別,Locality一共有五種,優先順序由高到低順序:

img

在調度執行時,Spark調度總是會盡量讓每個task以最高的本地性級別來啟動,當一個task以X本地性級別啟動,但是該本地性級別對應的所有節點都沒有空閑資源而啟動失敗,此時並不會馬上降低本地性級別啟動而是在某個時間長度內再次以X本地性級別來啟動該task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推。可以通過調大每個類別的最大容忍延遲時間,在等待階段對應的Executor可能就會有相應的資源去執行此task,這就在在一定程度上提到了運行性能。

失敗重試與黑名單機制

除了選擇合適的Task調度運行外,還需要監控Task的執行狀態,前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啟動執行後,Executor會將執行狀態上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,並通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數,如果失敗次數還沒有超過最大重試次數,那麼就把它放回待調度的Task池子中,否則整個Application失敗。在記錄Task失敗次數過程中,會記錄它上一次失敗所在的ExecutorId和Host,這樣下次再調度這個Task時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的ExecutorId和Host,以及其對應的「拉黑」時間,「拉黑」時間是指這段時間內不要再往這個節點上調度這個Task了。

七、Spark Shuffle解析

ShuffleMapStage與FinalStage

img

在劃分stage時,最後一個stage成為FinalStage,它本質上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。

ShuffleMapStage的結束伴隨著shuffle文件的寫磁碟。

ResultStage基本上對應程式碼中的action運算元,即將一個函數應用在RDD的各個partition的數據集上,意味著一個job的運行結束。

Shuffle中的任務個數

map端task個數的確定

Shuffle過程中的task個數由RDD分區數決定,而RDD的分區個數與參數spark.default.parallelism有密切關係。

在Yarn Cluster模式下,如果沒有手動設置spark.default.parallelism,則有:

Others: total number of cores on all executor nodes or 2, whichever is larger. spark.default.parallelism = max(所有executor使用的core總數,2)

如果進行了手動配置,則:

spark.default.parallelism = 配置值

還有一個重要的配置:

The maximum number of bytes to pack into a single partition when reading files. spark.files.maxPartitionBytes = 128 M (默認)

代表著rdd的一個分區能存放數據的最大位元組數,如果一個400MB的文件,只分了兩個區,則在action時會發生錯誤。

當一個spark應用程式執行時,生成sparkContext,同時會生成兩個參數,由上面得到的spark.default.parallelism推導出這兩個參數的值:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當以上參數確定後,就可以推算RDD分區數目了。

(1)通過scala集合方式parallelize生成的RDD

val rdd = sc.parallelize(1 to 10)

這種方式下,如果在parallelize操作時沒有指定分區數,則有:

rdd的分區數 = sc.defaultParallelism

(2)在本地文件系統通過textFile方式生成的RDD

val rdd = sc.textFile(“path/file”)

rdd的分區數 = max(本地file的分片數,sc.defaultMinPartitions)

(3)在HDFS文件系統生成的RDD

rdd的分區數 = max(HDFS文件的Block數目,sc.defaultMinPartitions)

(4)從HBase數據表獲取數據並轉換為RDD

rdd的分區數 = Table的region個數

(5)通過獲取json(或者parquet等等)文件轉換成的DataFrame

rdd的分區數 = 該文件在文件系統中存放的Block數目

(6)Spark Streaming獲取Kafka消息對應的分區數

基於Receiver:

在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加執行緒來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度。

基於DirectDStream:

Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據,所以在Kafka partition和RDD partition之間,有一個一對一的映射關係。

reduce端task個數的確定

Reduce端進行數據的聚合,一部分聚合運算元可以手動指定reduce task的並行度,如果沒有指定,則以map端的最後一個RDD的分區數作為其分區數,那麼分區數就決定了reduce端的task的個數。

reduce端數據的讀取

根據stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位於ShuffleMapStage,reduce task位於ResultStage,map task會先執行,那麼後執行的reduce task如何知道從哪裡去拉去map task落盤後的數據呢?

reduce端的數據拉取過程如下:

1、map task執行完畢後會將計算狀態以及磁碟小文件位置等資訊封裝到mapStatue對象中,然後由本進程中的MapOutPutTrackerWorker對象將mapstatus對象發送給Driver進程的MapOutPutTrackerMaster對象;

2、在reduce task開始執行之前會先讓本進程中的MapOutPutTrackerWorker向Driver進程中的MapOutPutTrackerMaster發動請求,請求磁碟小文件位置資訊;

3、當所有的Map task執行完畢後,Driver進程中的MapOutPutTrackerMaster就掌握了所有的磁碟小文件的位置資訊。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁碟小文件的位置資訊;

4、完成之前的操作之後,由BlockerTransforService去Executor所在的節點拉數據,默認會啟動五個子執行緒。每次拉取的數據量不能超過48M(reduce task每次最多拉取48M數據,將拉來的數據存儲到Executor記憶體的20%記憶體中)。

HashShuffle解析

以下的討論都假設每個Executor有一個CPU core。

1、未經優化的HashShuffleManager

shuffle write階段,主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的運算元(比如reduceByKey),而將每個task處理的數據按key進行「劃分」。所謂「劃分」,就是對相同的key執行hash演算法,從而將相同key都寫入同一個磁碟文件中,而每一個磁碟文件都只屬於下游stage的一個task。在將數據寫入磁碟之前,會先將數據寫入記憶體緩衝中,當記憶體緩衝填滿之後,才會溢寫到磁碟文件中去。

下一個stage的task有多少個,當前stage的每個task就要創建多少份磁碟文件。比如下一個stage總共有100個task,那麼當前stage的每個task都要創建100份磁碟文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個task,那麼每個Executor上總共要創建500個磁碟文件,所有Executor上會創建5000個磁碟文件。由此可見,未經優化的shuffle write操作所產生的磁碟文件的數量是極其驚人的。

shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網路都拉取到自己所在的節點上,然後進行key的集合或鏈接等操作。由於shuffle write的過程中,map task個下游stage的每個reduce task都創建了一個磁碟文件,因此shuffle read的過程中,每個reduce task只要從上游stage的所有map task所在的節點上,拉取屬於自己的那一個磁碟文件即可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,然後通過你村中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操作。以此類推,知道最後將所有數據到拉取完,並得到最終的結果。

未經優化的HashShuffleManager工作原理如下圖所示:

img

2、優化後的HashShuffleManager

為了優化HashShuffleManager我們可以設置一個參數,spark.shuffle.consolidateFiles,該參數默認值為false,將其設置為true即可開啟優化機制,通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。

開啟consolidate機制之後,在shuffle write過程中,task就不是為了下游stage的每個task創建一個磁碟文件了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁碟文件,磁碟文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會闖將一個shuffleFileGroup,並將數據寫入對應的磁碟文件內。

當Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁碟文件,也就是說,此時task會將數據寫入已有的磁碟文件中,而不會寫入新的磁碟文件中。因此,consolidate機制允許不同的task復用同一批磁碟文件,這樣就可以有效將多個task的磁碟文件進行一定程度上的合併,從而大幅度減少磁碟文件的數量,進而提升shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數為1),每個Executor執行5個task。那麼原本使用未經優化的HashSHuffleManager時,每個Executor會產生500個磁碟文件,所有Executor會產生5000個磁碟文件的。但是此時經過優化之後,每個Executor創建的磁碟文件的數量的計算公式為:CPU core的數量 * 下一個stage的task數量,也就是說,每個Executor此時只會創建100個磁碟文件,所有Executor只會創建1000個磁碟文件。

優化後的HashShuffleManager工作原理如下圖所示:

img

SortShuffle解析

SortShuffleManager的運行機制主要分為兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。

1、普通運行機制

在該模式下,數據會先寫入一個記憶體數據結構中此時根據不同的shuffle運算元,可能選用不同的數據結構,如果是reduceByKey這種聚合類的shuffle運算元,那麼會選用Map數據結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle運算元,那麼會選用Array數據結構,直接寫入記憶體。接著,每寫一條數據進如記憶體數據結構之後,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那麼就會嘗試將記憶體數據結構中的數據溢寫到磁碟,然後清空記憶體數據結構。

在溢寫到磁碟文件之前,會先根據key對記憶體數據結構中已有的數據進行排序。排序過後,會分批將數據寫入磁碟文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁碟文件。寫入磁碟文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將數據緩衝在記憶體中,當記憶體緩衝滿溢之後再一次寫入磁碟文件中,這樣可以減少磁碟IO次數,提升性能。

一個task將所有數據寫入記憶體數據結構的過程中,會發生多次磁碟溢寫操作,也就會產生多個臨時文件。最後會將之前所有的臨時磁碟文件都進行合併,這就是merge過程,此時會將之前所有臨時磁碟文件中的數據讀取出來,然後依次寫入最終的磁碟文件之中。此外,由於一個task就只對應一個磁碟文件,也就意味著該task為下游stage的task準備的數據都在這一個文件中,一次你還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager由於有一個磁碟文件merge的過程,因此大大減少了文件數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由於每個task最終只有一個磁碟文件,因此此時每個Executor上只有5個磁碟文件,所有Executor只有50個磁碟文件。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

2、bypass運行機制

bypass運行機制的觸發條件如下:

(1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。

(2)不是聚合類的shuffle運算元。

此時,每個task會為每個下游task都創建一個臨時磁碟文件,並將數據按key進行hash然後根據key的hash值,將key寫入對應的磁碟文件之中。當然,寫入磁碟文件時也是先寫入記憶體緩衝,緩衝寫滿之後再溢寫到磁碟文件的。最後,同樣會將所有臨時磁碟文件都合併成一個磁碟文件,並創建一個單獨的索引文件。

該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁碟文件,只是在最後會做一個磁碟文件的合併而已。因此少量的最終磁碟文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffleread的性能會更好。

而該機制與普通SortShuffleManager運行機制的不同在於:第一,磁碟寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

八、Spark記憶體管理

在執行Spark應用程式時,Spark集群會啟動Driver和Executor兩種JVM進程,前者為主控進程,負責創建Spark上下文,提交Spark作業(Job),並將作業轉化為計算任務(Task),在各個Executor進程間協調任務的調度,後者負責在工作節點上執行具體的計算任務,並將結果返回給Driver,同時為需要持久化的RDD提供存儲功能。

堆內和堆外記憶體規劃

作為一個JVM進程,Executor的記憶體管理建立在JVM的記憶體管理之上,Spark對JVM的堆內(On-heap)空間進行了更為詳細的分配,以充分利用記憶體。同時,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間,進一步優化了記憶體的使用。

堆內記憶體受到JVM統一管理,堆外記憶體是直接向作業系統進行記憶體的申請和釋放。

img

1、堆內記憶體

堆內記憶體的大小,由Spark應用程式啟動時的- executor-memory或spark.executor.memory參數配置。Executor內運行的並發任務共享JVM堆內記憶體,這些任務在快取RDD數據和廣播(Broadcast)數據時佔用的記憶體被規劃為存儲(Storage)記憶體,而這些任務在執行Shuffle時佔用的記憶體被規劃為執行(Execution)記憶體,剩餘的部分不做特殊規劃,那些Spark內部的對象實例,或者用戶定義的Spark應用程式中的對象實例,均佔用剩餘的空間。不同的管理模式下,這三部分佔用的空間大小各不相同。

Spark對堆內記憶體的管理是一種邏輯上的俄「規劃式」的管理,因為對象實例佔用記憶體的申請和釋放都由JVM完成,Spark只能在申請後和釋放前記錄這些記憶體。其具體流程如下:

1、Spark在程式碼中new一個對象實例;

2、JVM從堆內記憶體分配空間,創建對象並返回對象引用;

3、Spark保存該對象的引用,記錄該對象佔用的記憶體。

釋放記憶體流程如下:

1、Spark記錄該對象釋放的記憶體,刪除該對象的引用;

2、等待JVM的垃圾回收機制釋放該對象佔用的堆內記憶體。

我們知道,JVM的對象可以以序列化的方式存儲,序列化的過程是將對象轉換為二進位位元組流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲,在訪問時則需要進行序列化的逆過程–反序列化,將位元組流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。

對於Spark中序列化的對象,由於是位元組流的形式,其佔用的記憶體大小可直接計算,而對於非序列化的對象,其佔用的記憶體是通過周期性地取樣近似估算而得,即並不是每次新增的數據項都會計算一次佔用的記憶體大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際記憶體可能遠遠超出預期。此外,在被Spark標記為釋放的對象實例,很有可能在實際上並沒有被JVM回收,導致實際可用的記憶體小於Spark記錄的可用記憶體。所以Spark並不能準確記錄實際可用的堆內記憶體,從而也就無法完全避免記憶體溢出(OOM,Out of Memory)的異常。

雖然不能精確控制堆內記憶體的申請和釋放,但Spark通過對存儲記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在存儲記憶體里緩衝新的RDD,以及是否為新的任務分配執行記憶體,在一定程度上可以提升記憶體的利用率,減少異常的出現。

2、堆外記憶體

為了進一步優化記憶體的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間,存儲經過序列化的二進位數據。

堆外記憶體意味著把記憶體對象分配在Java虛擬機的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。

利用JDK Unsafe API(從spark2.0開始,在管理堆外的存儲記憶體時不再基於Tachyon,而是與堆外的執行記憶體一樣,基於JDK Unsafe API實現),Spark可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的GC掃描和回收,提升了處理性能。堆外記憶體可以被精確地申請和釋放(堆外記憶體之所以能夠被精確的申請和釋放,是由於記憶體的申請和釋放不再通過JVM機制,而是直接向作業系統申請,JVM對於記憶體的清理是無法準確指定時間點的,因此無法實現精確的釋放),而且序列化的數據佔用的空間可以被精確計算,所以相比堆內記憶體來說降低了管理的難度,也降低了誤差。

在默認情況下堆外記憶體並不啟用,可以通過配置spark.memory.offHeap.enabled參數啟用,並由spark.memory.offHeap.size參數設定堆外空間的大小。除了沒有other空間,堆外記憶體與堆內記憶體的劃分方式相同,所有運行中的並發任務共享存儲記憶體和執行記憶體。

(該部分記憶體主要用於程式的共享庫,Perm Space、執行緒Stack和一些Memory mapping等,或者類C方式allocate object)

記憶體空間分配

1、靜態記憶體管理

在Spark最初採用的靜態記憶體管理機制下,存儲記憶體、執行記憶體和其他記憶體的大小在Spark應用程式運行期間均為固定的,但用戶可以應用程式啟動前進行配置,堆內記憶體的分配如下圖所示:

img

可以看到,可用的堆內記憶體的大小需要按照程式碼清單的方式計算:

可用的存儲記憶體 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction

可用的執行記憶體 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

其中systemMaxMemory取決於當前JVM堆內記憶體的大小,最後可用的執行記憶體或者存儲記憶體要在此基礎上與各自的memoryFraction參數和safetyFraction參數相乘得出。上述計算公式中的兩個safetyFraction參數,其意義在於在邏輯預留出1-safetyFraction這麼一塊保險區域,降低因實際記憶體超出當前預設範圍而導致OOM的風險(上文提到,對於非序列化對象的記憶體取樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,再具體使用時Spark並沒有區別對待,和「其他記憶體」一樣交給了JVM去管理。

Storage記憶體和Executor記憶體都有預留空間,目的是防止OOM,因為Spark堆內記憶體大小的記錄是不準確的,需要留出保險區域。

堆外的空間分配較為簡單,只有存儲記憶體和執行記憶體。可用的執行記憶體和存儲記憶體佔用的空間大小直接由參數spark.memory.storageFraction決定,由於堆外記憶體佔用的空間可以被精確計算,所以無需再設定保險區域。

img

靜態記憶體管理機制實現起來較為簡單,但如果用戶不熟悉Spark的鵆機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成「一般海水,一般火焰」的局面,即存儲記憶體和執行記憶體中的一方剩餘大量的空間,而另一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。由於新的記憶體管理機制的出現,這種方式目前已經很少有開發者使用,出於兼容舊版本的應用程式的目的,Spark依然保留了它的實現。

2、統一記憶體管理

Spark1.6之後引入的統一記憶體管理機制,與靜態記憶體管理的區別在於存儲記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閑區域,統一記憶體管理的堆內記憶體結構如下圖所示:

img

統一記憶體管理的堆外記憶體結構如下圖所示:

img

其中最重要的優化在於動態佔用機制,其規則如下:

1、設定基本的存儲記憶體和執行記憶體區域(spark.storage.storageFraction參數),該設定確定了雙方各自擁有的空間的範圍;

2、雙方的空間都不足時,則存儲到磁碟;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的Block)

3、執行記憶體的空間被對方佔用後,可讓對方將佔用的部分轉存到磁碟,然後「歸還」借用的空間;

4、存儲記憶體的空間被對方佔用後,無法讓對方「歸還」,因為需要考慮Shuffle過程中的很多因素,實現起來較為複雜。

統一記憶體管理的動態佔用機制如下圖所示:

img

憑藉統一記憶體管理機制,spark在一定程度上提高了堆內和堆外記憶體資源的利用率,降低了開發者維護spark記憶體的難度。如果存儲記憶體的空間太大或者說快取的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為快取的RDD數據通常都是長期主流記憶體的。所以要想充分發揮Spark的性能,需要開發者進一步了解存儲記憶體和執行記憶體各自管理方式和實現原理。

存儲記憶體管理

1、RDD持久化機制

彈性分散式數據集(RDD)作為Spark最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的RDD上執行轉換(Transformation)操作產生一個新的RDD。轉換後的RDD與原始的RDD之間產生了依賴關係,構成了血統(Lineage)。憑藉血統,Spark保證了每一個RDD都可以被重新恢復。但是RDD的所有轉換都是有惰性的,即只有當一個返回結果給Driver的行動(Action)發生時,Spark才會創建任務讀取RDD,然後真正觸發轉換的執行。

Task在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。所以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在記憶體或磁碟中持久化或快取這個RDD,從而在後面的行動中提升計算速度。

事實上,cache方法是使用默認的MEMORY_ONLY的存儲級別將RDD持久化到記憶體,故快取是一種特殊的持久化。堆內和堆外存儲記憶體的設計,便可以對快取RDD時使用的記憶體做統一的規劃和管理。

RDD的持久化由Spark的Storage模組負責,實現了RDD與物理存儲的解耦合。Storage模組負責管理Spark在計算過程中產生的數據,將那些在記憶體或磁碟、在本地或遠程存取數據的功能封裝了起來。在具體實現時Driver端和Executor端的Storage模組構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。

Storage模組在邏輯上以Block為基本存儲單位,RDD的每個Partition經過處理後位移對應一個Block(BlockId的格式為rdd_RDD-ID_PARTITION-ID)。Driver端的Master負責整個Spark應用程式的Block的元數據資訊的管理和維護,而Executor端的Slave需要將Block的更新等狀態上報到Master,同時接受Master的命令,例如新增或刪除一個RDD。

img

在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7中不同的存儲級別,而存儲級別是以下5個變數的組合:

class StorageLevel private(

private var _useDisk: Boolean, //磁碟

private var _useMemory: Boolean, //這裡其實是指堆內記憶體

private var _useOffHeap: Boolean, //堆外記憶體

private var _deserialized: Boolean, //是否為非序列化

private var _replication: Int = 1 //副本個數

)

Spark中7中存儲級別如下:

img

通過對數據結構的分析,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:

(1)存儲位置:磁碟/堆內記憶體/堆外記憶體。如MEMORY_AND_DISK是同時在磁碟和堆內記憶體上存儲,實現了冗餘備份。OFF_HEAP則是只在堆外記憶體存儲,目前選擇堆外記憶體時不能同時存儲到其他位置。

(2)存儲形式:Block快取到存儲記憶體後,是否為非序列化的形式。如MEMORY_ONLY是非序列化方式存儲,OFF_HEAP是序列化方式存儲。

(3)副本數量:大於1時需要遠程冗餘備份到其他節點。如DISK_ONLY_2需要遠程備份1個副本。

2、RDD的快取過程

RDD在快取到存儲記憶體之前,Partition中的數據一般以迭代器(Iterator)的數據結構來訪問,這是Scala語言中一種遍曆數據集合的方法。通過Iterator可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些Record的對象實例在邏輯上佔用了JVM堆內記憶體的other部分的空間,同一Partition的不同Record的存儲空間並不連續。

RDD在快取到存儲記憶體之後,Partition被轉換成Block,Record在堆內或堆外存儲記憶體中佔用一塊連續的空間。將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為「展開」(Unroll)。

Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數據結構定義,用一個數組存儲所有的對象實例,序列化的Block則以SerializedMemoryEntry的數據結構定義,用位元組緩衝區(ByteBuffer)來存儲二進位數據。每個Executor的Storage模組用一個鏈式Map結構(LinkedHashMap)來管理堆內和堆外存儲記憶體中所有的Block對象的實例,對這個LinkedHashMap新增和刪除間接記錄了記憶體的申請和釋放。

因為不能保證存儲空間可以一次容納Iterator中的所有數據,當前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時佔位,空間不足則Unroll失敗,空間足夠時可以繼續進行。

對於序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。

對於非序列化的Partition則要在便利Record的過程中一次申請,即每讀取一條Record,取樣估算其所需的Unroll空間並進行申請,空間不足時可以中斷,釋放已佔用的Unroll空間。

如果最終Unroll成功,當前Partition所佔用的Unroll空間被轉換為正常的快取RDD的存儲空間,如下圖所示。

img

在靜態記憶體管理時,Spark在存儲記憶體中專門劃分了一塊Unroll空間,其大小是固定的,統一記憶體管理時則沒有對Unroll空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。

3、淘汰與落盤

由於同一個Executor的所有的計算任務共享有限的存儲記憶體空間,當有新的Block需要快取單數剩餘空間不足且無法動態佔用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁碟的要求,則要對其進行落盤(Drop),否則直接刪除該Block。

存儲記憶體的淘汰規則為:

被淘汰的舊Block要與新的Block的MemoryMode相同,即同屬於堆外或堆內記憶體;

新舊Block不能屬於同一個RDD,避免循環淘汰;

舊Block所屬RDD不能處於被讀狀態,避免引發一致性問題;

遍歷LinkedHashMap中Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新Block所需的空間。其中LRU是LinkedHashMap的特性。

落盤的流程則比較簡單,如果其存儲級別符合_useDisk為true的條件,再根據其_deserialized判斷是否是非序列化的形式,若是則對其進行序列化,最後將數據存儲到磁碟,在Storage模組中更新其資訊。

執行記憶體管理

執行記憶體主要用來存儲任務再在執行Shuffle時佔用的記憶體,Shuffle是按照一定規則對RDD數據重新分區的過程,Shuffle的Write和Read兩階段對執行記憶體的使用:

Shuffle Write

在map端會採用ExternalSorter進行外排,在記憶體中存儲數據時主要佔用堆內執行空間。

Shuffle Read

(1)在對reduce端的數據進行聚合時,要將數據交給Aggregator處理,在記憶體中存儲數據時佔用堆內執行空間。

(2)如果需要進行最終結果排序,則要將再次將數據交給ExternalSorter處理,佔用堆內執行空間。

在ExternalSorter和Aggregator中,Spark會使用一種叫做AppendOnlyMap的哈希表在堆內執行記憶體中存儲數據,但是Shuffle過程中所有數據並不能都保存到該哈希表中,當這個哈希表佔用的記憶體會進行周期性地取樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行記憶體時,Spark就會將其全部內容存儲到磁碟文件中,這個過程被稱為溢存(Spill),溢存到磁碟的文件最後會被歸併(Merge)。

Spark的存儲記憶體和執行記憶體有著截然不同的管理方式:對於存儲記憶體來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要快取的RDD的Partition轉化而成;而對於執行記憶體,Spark用AppendOnlyMap來存儲Shuffle過程中的數據,在Tungsten排序中甚至抽象稱為頁式記憶體管理,開闢了全新的JVM記憶體管理機制。

九、Spark核心組件解析

BlockManager數據存儲與管理機制

BlockManager是整個Spark底層負責數據存儲與管理的一個組件,Driver和Executor的所有數據都由對應的BlockManager進行管理。

Driver上有BlockManagerMaster,負責對各個節點上的BlockManager內部管理的數據的元數據進行維護,比如block的增刪改等操作,都會在這裡維護好元數據的變更。

每個節點都有一個BlockManager,每個BlockManager創建之後,第一件事即使去向BlockManagerMaster進行註冊,此時BlockManagerMaster會為其創建對應的BlockManagerInfo。

img

BlockManagerMaster與BlockManager的關係非常像NameNode與DataNode的關係,BlockManagerMaster中保存BlockManager內部管理數據的元數據,進行維護,當BlockManager進行Block增刪改等操作時,都會在BlockManagerMaster中進行元數據的變更,這與NameNode維護DataNode的元數據資訊,DataNode中數據發生變化時NameNode中的元數據也會相應變化是一致的。

每個節點上都有一個BlockManager,BlockManager中有三個非常重要的組件:

DisStore:負責對磁碟數據進行讀寫;

MemoryStore:負責對記憶體數據進行讀寫;

BlockTransferService:負責建立BlockManager到遠程其他節點的BlockManager的連接,負責對遠程其他節點的BlockManager的數據進行讀寫;

每個BlockManager創建之後,做的第一件事就是向BlockManagerMaster進行註冊,此時BlockManagerMaster會為其創建對應的BlockManagerInfo。

使用BlockManager進行寫操作時,比如說,RDD運行過程中的一些中間數據,或者我們手動指定了persist(),會優先將數據寫入記憶體中,如果記憶體大小不夠,會使用自己的演算法,將記憶體中的部分數據寫入磁碟;此外,如果persist()指定了要replica,那麼會使用BlockTransferService將數據replicate一份到其他節點的BlockManager上去。

使用BlockManager進行讀操作時,比如說,shuffleRead操作,如果能從本地讀取,就利用DisStore或者MemoryStore從本地讀取數據,但是本地沒有數據的話,那麼會用BlockTransferService與有數據的BlockManager建立連接,然後用BlockTransferService從遠程BlockManager讀取數據;例如,shuffle Read操作中,很有可能要拉取的數據本地沒有,那麼此時就會從遠程有數據的節點上,找那個節點的BlockManager來拉取需要的數據。

只要使用BlockManager執行了數據增刪改的操作,那麼必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內部的BlockStatus進行增刪改操作,從而達到元數據的維護功能。

Spark共享變數底層實現

Spark一個非常重要的特性就是共享變數。

默認情況下,如果在一個運算元的函數中使用到了某個外部的變數,那麼這個變數的值會被拷貝到每個task中,此時每個task只能操作自己的那份變數副本。如果多個task想要共享某個變數,那麼這種方式是做不到的。

Spark為此提供了兩種共享變數,一種是Broadcast Variable(廣播變數),另一種是Accumulator(累加變數)。Broadcast Variable會將用到的變數,僅僅為每個節點拷貝一份,即每個Executor拷貝一份,更大的用途是優化性能,見上網路傳輸以及記憶體損耗。Accumulator則可以讓多個task共同操作一份變數,主要可以進行累加操作。Broadcast Variable是共享讀變數,task不能去修改它,而Accumulator可以讓多個task操作一個變數。

廣播變數

廣播變數允許編程者在每個Executor上暴力外部數據的只讀變數,而不是給每個任務發送一個副本。

每個task都會保存一份它所使用的外部變數的副本,當一個Executor上的多個task都使用一個外部變數時,對於Executor記憶體的消耗是非常大的,因此,我們可以將大型外部變數封裝為廣播變數,此時一個Executor保存一個變數副本,此Executor上的所有task共用此變數,不再是一個task單獨保存一個副本,這在一定程度上降低了Spark任務的記憶體佔用。

img

使用外部變數

img

使用廣播變數

Spark還嘗試使用高效的廣播演算法分發廣播變數,以降低通訊成本。

Spark提供的Broadcast Variable是只讀的,並且在每個Executor上只會有一個副本,而不會為每個task都拷貝一份副本,因此,它的最大作用,就是減少變數到各個節點的網路傳輸消耗,以及在各個節點上的記憶體消耗。此外,Spark內部也是用了高效的廣播演算法來減少網路消耗。

可以通過調用SparkContext的broadcast()方法來針對每個變數創建廣播變數。然後再運算元的函數內,使用到廣播變數時,每個Executor只會拷貝一份副本了,每個task可以使用廣播變數的value()方法獲取值。

在任務運行時,Executor並不獲取廣播變數,當task執行到使用廣播變數的程式碼時,會向Executor的記憶體中請求廣播變數,如下圖所示:

img

之後Executor會通過BlockManager向Driver拉取廣播變數,然後提供給task進行使用,如下圖所示:

img

廣播大變數是Spark中常用的基礎優化方法,通過減少記憶體佔用實現任務執行性能的提升。

累加器

累加器(accumulator):Accumulator是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它們可用於實現計數器(如MapReduce)或總和計數。

Accumulator是存在於Driver端的,集群上運行的task進行Accumulator的累加,隨後把值發送到Driver端,在Driver端匯總(Spark UI在SparkContext創建時被創建,即在Driver端被創建,因此它可以讀取Accumulator的數值),由於Accumulator存在於Driver端,從節點讀取不到Accumulator的數值。

Spark提供的Accumulator主要用於多個節點對一個變數進行共享性的操作。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對於同一個變數並行操作的功能,但是task只能對Accumulator進行累加操作,不能讀取它的值,只有Driver程式可以讀取Accumulator的值。

Accumulator的底層原理如下圖所示:

img

關注我們的微信公眾號

關注公眾號,獲得精美書籍、資料,獲取第一手最新文章。

Tags: