spark 集群優化
- 2020 年 4 月 24 日
- 筆記
- spark, spark streaming
只有滿懷自信的人,能在任何地方都懷有自信,沉浸在生活中,並認識自己的意志。
前言
最近公司有一個生產的小集群,專門用於運行spark作業。但是偶爾會因為nn或dn壓力過大而導致作業checkpoint操作失敗進而導致spark 流任務失敗。本篇記錄從應用層面對spark作業進行優化,進而達到優化集群的作用。
集群使用情況
有數據的目錄以及使用情況如下:
目錄
|
說明
|
大小
|
文件數量
|
數據數量佔比
|
數據大小佔比
|
---|---|---|---|---|---|
/user/root/.sparkStaging/applicationIdxxx | spark任務配置以及所需jar包 | 5G | 約1k | 約20% | 約100% |
/tmp/checkpoint/xxx/{commits|metadata|offsets|sources} | checkpoint文件,其中commits和offsets頻繁變動 | 2M | 約4k | 約40% | 約0% |
對於.sparkStaging目錄,不經常變動,只需要優化其大小即可。
對於 checkpoint目錄,頻繁性增刪,從生成周期和保留策略兩方面去考慮。
.sparkStaging目錄優化
對於/user/hadoop/.sparkStaging下文件,是spark任務依賴文件,可以將jar包上傳到指定目錄下,避免或減少了jar包的重複上傳,進而減少任務的等待時間。
可以在spark的配置文件spark-defaults.conf配置如下內容:
spark.yarn.archive=hdfs://hdfscluster/user/hadoop/jars spark.yarn.preserve.staging.files=false
參數說明
Property Name
|
Default
|
Meaning
|
---|---|---|
spark.yarn.archive | (none) | An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars and the archive is used in all the application’s containers. The archive should contain jar files in its root directory. Like with the previous option, the archive can also be hosted on HDFS to speed up file distribution. |
spark.yarn.preserve.staging.files | false | Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. |
checkpoint優化
首先了解一下 checkpoint文件代表的含義。
checkpoint文件說明
-
offsets 目錄 – 預先記錄日誌,記錄每個批次中存在的偏移量。為了確保給定的批次將始終包含相同的數據,我們在進行任何處理之前將其寫入此日誌。因此,該日誌中的第N個記錄指示當前正在處理的數據,第N-1個條目指示哪些偏移已持久地提交給sink。
-
commits 目錄 – 記錄已完成的批次ID的日誌。這用於檢查批處理是否已完全處理,並且其輸出已提交給接收器,因此無需再次處理。(例如)在重新啟動過程中使用,以幫助識別接下來要運行的批處理。
-
metadata 文件 – 與整個查詢關聯的元數據,只有一個 StreamingQuery 唯一ID
-
sources目錄 – 保存起始offset資訊
下面從兩個方面來優化checkpoint。
第一,從觸發checkpoint機制方面考慮
trigger的機制
Trigger是用於指示 StreamingQuery 多久生成一次結果的策略。
Trigger有三個實現類,分別為:
-
OneTimeTrigger – A Trigger that processes only one batch of data in a streaming query then terminates the query.
-
ProcessingTime – A trigger that runs a query periodically based on the processing time. If
interval
is 0, the query will run as fast as possible.by default,trigger isProcessingTime
, andinterval=0
-
ContinuousTrigger – A Trigger that continuously processes streaming data, asynchronously checkpointing at the specified interval.
可以為 ProcessingTime 指定一個時間 或者使用 指定時間的ContinuousTrigger ,固定生成checkpoint的周期,避免checkpoint生成過於頻繁,減輕多任務下小集群的nn的壓力
第二,從checkpoint保留機制考慮。
保留機制
spark.sql.streaming.minBatchesToRetain – 必須保留並使其可恢復的最小批次數,默認為 100
可以調小保留的batch的次數,比如調小到 20,這樣 checkpoint 小文件數量整體可以減少到原來的 20%
checkpoint 參數驗證
主要驗證trigger機制和保留機制
驗證trigger機制
未設置trigger效果
未設置trigger前,spark structured streaming 的查詢batch提交的周期截圖如下:
每一個batch的query任務的提交是毫無周期規律可尋。
設置trigger程式碼
trigger效果
設置trigger程式碼後效果截圖如下:
每一個batch的query任務的提交是有規律可尋的,即每隔5s提交一次程式碼,即trigger設置生效!
注意,如果消息不能馬上被消費,消息會有積壓,structured streaming 目前並無與spark streaming效果等同的背壓機制,為防止單批次query查詢的數據源數據量過大,避免程式出現數據傾斜或者無法挽回的OutOfMemory錯誤,可以通過 maxOffsetsPerTrigger 參數來設置單個批次允許抓取的最大消息條數。
使用案例如下:
spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "xxx:9092") .option("subscribe", "test-name") .option("startingOffsets", "earliest") .option("maxOffsetsPerTrigger", 1) .option("group.id", "2") .option("auto.offset.reset", "earliest") .load()
驗證保留機制
默認保留機制效果
spark任務提交參數
#!/bin/bash spark-submit \ --class zd.Example \ --master yarn \ --deploy-mode client \ --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \ --repositories http://maven.aliyun.com/nexus/content/groups/public/ \ /root/spark-test-1.0-SNAPSHOT.jar
如下圖,offsets和commits最終最少各保留100個文件。
修改保留策略
通過修改任務提交參數來進一步修改checkpoint的保留策略。
添加 –conf spark.sql.streaming.minBatchesToRetain=2 ,完整腳本如下:
#!/bin/bash spark-submit \ --class zd.Example \ --master yarn \ --deploy-mode client \ --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \ --repositories http://maven.aliyun.com/nexus/content/groups/public/ \ --conf spark.sql.streaming.minBatchesToRetain=2 \ /root/spark-test-1.0-SNAPSHOT.jar
修改後保留策略效果
修改後保留策略截圖如下:
即 checkpoint的保留策略參數設置生效!
總結
綜上,可以通過設置 trigger 來控制每一個batch的query提交的時間間隔,可以通過設置checkpoint文件最少保留batch的大小來減少checkpoint小文件的保留個數。
參照
- //github.com/apache/spark/blob/master/docs/running-on-yarn.md
- //blog.csdn.net/lm709409753/article/details/85250859
- //github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
- //github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
- //github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
- //github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
- //github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala