如何獲取流式應用程式中checkpoint的最新offset
對於流式應用程式,保證應用7*24小時的穩定運行,是非常必要的。因此對於計算引擎,要求必須能夠適應與應用程式邏輯本身無關的問題(比如driver應用失敗重啟、網路問題、伺服器問題、JVM崩潰等),具有自動容錯恢復的功能。
目前,Spark(Spark Streaming/Structured Streaming)和Flink的checkpoint機制,就是處理類似情況,實現容錯機制的核心利器。
對於Flink:
為了保證其高可用、Exactly Once的特性,提供了一套強大的checkpoint機制,它能夠根據配置周期性地基於流中各個operator的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程式一旦出現故障時,能夠將整個應用流程式恢復到故障前的某一種態,從而修正因為故障帶來的程式數據狀態中斷。
對於Spark:
在流式應用中,Spark Streaming/Structured Streaming會將關於應用足夠多的資訊checkpoint到高可用、高容錯的分散式存儲系統,如HDFS中,以便從故障中進行恢復。checkpoint有兩種類型的數據:
1. 數據checkpoint
對於一些複雜程式,比如跨多個批次組合數據的有狀態轉換,生成的RDD依賴於先前批次的RDD,導致依賴鏈的長度隨批次的增加而增加。因為故障恢復時間與依賴鏈成正比,從而導致恢復時間也跟著增長。因此就有必要周期性的將RDD checkpoint到可靠的分散式存儲系統中,以此切斷依賴鏈。
這在Spark中的狀態運算元,如mapWithState、updateStateByKey中尤為常見。
2. 元數據checkpoint
顧名思義,就是將定義流式應用程式中的資訊保存到容錯系統中,用於從運行流應用程式的driver節點發生故障時,進行容錯恢復。元數據包括:
a. 配置:用於創建流應用程式DStream操作:
b. 定義流應用程式的DStream操作集
c. 未完成的批次:未完成的批次job
本文的重點不在於checkpoint具體含義,而是以Spark為例,闡述如何通過程式獲取checkpoint中最新的offset,以此為思路,來解決生產中的實際問題。
通常我們會checkpoint到HDFS,首先來看一下checkpoint資訊:
offsets目錄記錄了每個批次中的offset,此目錄中的第N條記錄表示當前正在處理,第N-1個及之前的記錄指示哪些偏移已處理完成。
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/commits
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/metadata
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/receivedData
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/sources
hdfs dfs -ls /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/0
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/1
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
hdfs dfs -cat /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
v1
{"batchWatermarkMs":0,"batchTimestampMs":1590632490083,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}}
2400000001667289
最終獲取最新offset的程式示例:
/**
* @Author bigdatalearnshare
*/
object ReadOffsets {
def main(args: Array[String]): Unit = {
val path = "/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2"
val fs = FileSystem.get(new Configuration())
val lastFile = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.endsWith(".tmp.crc"))
.map { fileName =>
(fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
}.maxBy(_._1)._2
val offset = readFile(lastFile.toString).split("\n").last
assert("2400000001667289".equals(offset))
}
def readFile(path: String): String = {
val fs = FileSystem.get(new Configuration())
var br: BufferedReader = null
var line: String = null
val result = ArrayBuffer.empty[String]
try {
br = new BufferedReader(new InputStreamReader(fs.open(new Path(path))))
line = br.readLine()
while (line != null) {
result += line
line = br.readLine()
}
} finally {
if (br != null) br.close()
}
result.mkString("\n")
}
}
這一點在生產環境中還是有一定應用場景的,比如,通過解析mysql binlog日誌,將數據同步到kafka,然後再通過消費者程式消費kafka中的數據保存到存儲系統中,如delta,通過offset資訊對比來校驗,binlog到kafka的延遲(如,通過獲取binlog中的offset資訊與流程式同步到kafka時進行checkpoint的offset做對比)、kafka到存儲系統中的延遲。
此外,要注意commits目錄下記錄的是已完成的批次資訊。在實際進行offset比對時,要以此為基準再去獲取offsets目錄下的offsets資訊。
關注微信公眾號:大數據學習與分享,獲取更對技術乾貨