從0到1進行Spark history分析

一、總體思路

從0到1進行Spark history分析總體思路

以上是我在平時工作中分析spark程序報錯以及性能問題時的一般步驟。當然,首先說明一下,以上分析步驟是基於企業級大數據平台,該平台會抹平很多開發難度,比如會有調度日誌(spark-submit日誌)、運維平台等加持,減少了開發人員直接接觸生成服務器命令行的可能,從物理角度進行了硬控制,提高了安全性。

下面我將帶領大家從零到一,從取日誌,到在Spark WebUI進行可視化分析相關報錯、性能問題的方法。

二、步驟

(一)獲取applicationID

1.從調度日誌獲取

一般企業級大數據平台會相對重視日誌的採集,這不僅有助於事後對相關問題、現象的分析;同時也是相關審計環節的要求。

我們知道,spark的調度是通過spark-submit執行觸發的,每一次spark-submit都會有對應的applicationID生成,所以,一般我們可以在調度日誌中可以找到本次調度的applicationID。

2.從運維平台獲取

企業級大數據平台為了減少開發、運維人員直接通過ftp、putty、xshell等工具直連生產服務器,避免誤操作等風險發生,一般會提供一個運維平台,在頁面上便可直接查看到job粒度的作業運行情況,以及其唯一標誌applicationID,縮短了開發、運維人員獲取applicationID的「路徑」,減少了機械性的步驟。

(二)從HDFS下載Spark history

我們假設我們的大數據平台把spark history保存到了HDFS的/sparkJobHistory/目錄下,下面我們來看看具體如何獲取我們對應applicationID的spark history。

1.命令行下載HDFS文件

hadoop fs -get /sparkJobHistory/applicationID_xxxxx_1-meta localfile
hadoop fs -get /sparkJobHistory/applicationID_xxxxx_1-part1 localfile

2.HUE工具下載HDFS文件

我們使用租戶登錄HUE工具,進入到File Browser頁面,並通過頁面上的目錄訪問按鈕進入到sparkJobHistory目錄下,然後在搜索框中輸入applicationID ,就可以看到該applicationID對應的spark history(meta和part1兩個文件)顯示在了頁面中,勾選並下載,便可以將spark history下載到我們本地。

(三)上傳Spark history到開發環境

接下來我們要做的工作就是將從生產上獲取到的spark history放到我們開發環境上,以便進行後續分析。

1.命令行推送spark history到HDFS上

首先我們需要將從生產下載到本地的spark history上傳到我們測試環境的任意一個節點上,這裡我們將其上傳到測試環境01節點的${WORK_ROOT}/etluser/tmp/spark_history/applicationID_xxxxx/路徑下,接下來在測試環境命令行執行以下命令,將spark history上傳到HDFS:

hadoop fs -put ${WORK_ROOT}/etluser/tmp/spark_history/applicationID_xxxxx/* /sparkJobHistory/

2.HUE工具上傳spark history到HDFS上

這個步驟和我們在生產HUE類似,首先使用租戶登錄到測試環境HUE,然後進入File Browser的/sparkJobHistory/路徑下,點擊Upload按鈕,上傳spark history到該路徑下即可。

(四)Spark WebUI進行分析

1.搜索applicationID

進入Spark WebUI頁面,在搜索框輸入applicationID,即可篩選出該applicationID的spark history。

2.進行stage可視化分析

點擊該App ID進去即可看到該applicationID的每一個Job詳細運行情況;點擊進一個Job,即可看到該Job的DAG圖,以及對應的stage運行情況;點擊進一個stage,即可看到該stage下所有的task運行情況。通過每一個頁面上的運行耗時、GC時間、input/output數據量大小等,根據這些信息即可分析出異常的task、stage、job。

(五)查看相關代碼段

1.定位異常Job

通過applicationID進入到我們要分析的Job的運行情況頁面後,可以看到該App ID下每一個Job,一般通過觀察每一個Job的運行時長可以識別出哪一個Job是異常的,通常運行時間過長的Job就是異常的,可以進入該Job的詳細頁面進行進一步分析。

2.回溯Job對應代碼段

一般定位到異常的Job我們就可以知道對應的代碼段,通常這個Job運行情況頁會顯示其對應的代碼行,通過回到代碼中找到這個代碼行,其前後一小部分代碼段就是這個異常Job的執行代碼段。

(六)分析與定位問題

1.列出代碼段涉及的源表

如果是SparkSQL代碼,可以通過SQL直觀獲取到其所涉及的源表,將這些源表記錄下來,以便後續分析。

如果是SparkRDD代碼,可以通過代碼中所使用的數據集追溯到該數據集所對應的源表,同樣,我們把它們記錄下來,以便後續進行分析。

2.分析源表的使用方式:廣播、普通關聯、…

我們可以通過代碼中分析出這些源表的使用方式。最常見的應該是對小表進行廣播的方式。所謂廣播,就是把小表的數據完整地發送到集群的各個DataNode本地緩存起來,當大表與之進行關聯操作時,存在於各個DataNode之上的大表數據塊便可以根據就近計算原則,與小表數據進行關聯計算,從而減少了網絡傳輸,提高了運行速度。

在SparkSQL中,一般如果大表和小表進行關聯,會通過hint語法對小表進行廣播,具體來說是使用/*mapjoin(small_table_name)*/這樣的形式。
而在SparkRDD中,一般也會對小表進行廣播操作,通過broadcast()接口進行實現。在Java中,具體來說是使用Broadcast data_broadcast = JavaSparkContext.broadcast(table_data.collect());這樣的語法進行實現。

對於普通關聯,即沒有對錶進行特殊處理的關聯。這種寫法一般在大小表關聯的場景下容易出現性能問題,需要特別關注。經常可以作為問題分析的切入點。

3.查看源表數據量

一般通過第一步把問題代碼段中所涉及的源表羅列出來後,就需要到生產查看這些源表的數據量是多少,以方便分析是否是因為數據量過大而導致的性能問題,一般如果是數據量導致的問題,多半是因為資源不足,可以考慮通過調整資源數量來解決。

當然,除了常規的查看源表的記錄數外,還可以查看該表在HDFS上佔用的空間大小。

查看源表記錄數SQL:

select count(1) as cnt from db_name.table_name where pt_dt=』xxxx-xx-xx』;

查看源表佔用的空間大小,這裡使用GB為單位(102410241024)顯示:

hadoop fs -du /user/hive/warehouse/db_name.db/ table_name/pt_dt='xxxx-xx-xx' | awk 『${SUM+=$1} END {print SUM/(1024*1024*1024)}'

4.查看源表數據塊分佈情況

有時候源表的數據量並不算大,但是還是出現了性能瓶頸,這時候通過觀察tasks數的多少,大致可以猜測到是因為源表的數據塊大小分佈不均勻,或是數據塊過少導致的。可以通過以下命令查看源表的數據塊分佈情況:

hadoop fs -ls -h /user/hive/warehouse/db_name.db/ table_name/pt_dt='xxxx-xx-xx'

通過該命令的輸出我們可以看到源表的每一個數據塊大小,以此有多少個數據塊。舉個例子,如果數據塊只有2~3塊,第一個數據塊有80MB,第二、第三個數據塊分別只有1KB,那基本可以判定這幾個數據塊分佈不合理。

數據塊分佈不合理可以通過對源表數據進行重分佈(repartition)或設置spark處理的每個map數據塊大小上限來前置將過大的數據塊分散到各個task中處理,以減少關鍵task的處理耗時,提升程序性能。

三、總結

本文深入淺出,具體到步驟和實際操作,帶領大家從獲取作業applicationID,到下載Spark history,再到上傳Spark history至開發環境,再進行Spark WebUI分析異常stage,再而定位到問題代碼段,最後給出一般問題的分析方向以及分析方法。

對於Spark相關問題的分析,最直接有效的就是對Spark history的分析了,希望大家能通過練習和實操掌握這項技能。當然,平時進行Spark、Hadoop生態體系的理論知識積累也是必不可少的,所謂萬丈高樓平地起,根基要穩,才能讓樓起得更高。