如何獲取流式應用程式中checkpoint的最新offset

對於流式應用程式,保證應用7*24小時的穩定運行,是非常必要的。因此對於計算引擎,要求必須能夠適應與應用程式邏輯本身無關的問題(比如driver應用失敗重啟、網路問題、伺服器問題、JVM崩潰等),具有自動容錯恢復的功能。

目前,Spark(Spark Streaming/Structured Streaming)和Flink的checkpoint機制,就是處理類似情況,實現容錯機制的核心利器。

對於Flink:

為了保證其高可用、Exactly Once的特性,提供了一套強大的checkpoint機制,它能夠根據配置周期性地基於流中各個operator的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程式一旦出現故障時,能夠將整個應用流程式恢復到故障前的某一種態,從而修正因為故障帶來的程式數據狀態中斷。

對於Spark:

在流式應用中,Spark Streaming/Structured Streaming會將關於應用足夠多的資訊checkpoint到高可用、高容錯的分散式存儲系統,如HDFS中,以便從故障中進行恢復。checkpoint有兩種類型的數據:

1. 數據checkpoint

對於一些複雜程式,比如跨多個批次組合數據的有狀態轉換,生成的RDD依賴於先前批次的RDD,導致依賴鏈的長度隨批次的增加而增加。因為故障恢復時間與依賴鏈成正比,從而導致恢復時間也跟著增長。因此就有必要周期性的將RDD checkpoint到可靠的分散式存儲系統中,以此切斷依賴鏈。

這在Spark中的狀態運算元,如mapWithState、updateStateByKey中尤為常見。

2. 元數據checkpoint

顧名思義,就是將定義流式應用程式中的資訊保存到容錯系統中,用於從運行流應用程式的driver節點發生故障時,進行容錯恢復。元數據包括:

a. 配置:用於創建流應用程式DStream操作:

b. 定義流應用程式的DStream操作集

c. 未完成的批次:未完成的批次job

本文的重點不在於checkpoint具體含義,而是以Spark為例,闡述如何通過程式獲取checkpoint中最新的offset,以此為思路,來解決生產中的實際問題。

通常我們會checkpoint到HDFS,首先來看一下checkpoint資訊:


offsets目錄記錄了每個批次中的offset,此目錄中的第N條記錄表示當前正在處理,第N-1個及之前的記錄指示哪些偏移已處理完成。

/bigdatalearnshare/checkpointLocation/binlog-2-kafka/commits
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/metadata
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/receivedData
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/sources

 

hdfs dfs -ls /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets

/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/0
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/1
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2

 

hdfs dfs -cat /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2

v1
{"batchWatermarkMs":0,"batchTimestampMs":1590632490083,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}}
2400000001667289

 

最終獲取最新offset的程式示例:

/**
  * @Author bigdatalearnshare
  */
object ReadOffsets {

  def main(args: Array[String]): Unit = {
    val path = "/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2"

    val fs = FileSystem.get(new Configuration())

    val lastFile = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.endsWith(".tmp.crc"))
      .map { fileName =>
        (fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
      }.maxBy(_._1)._2

    val offset = readFile(lastFile.toString).split("\n").last

    assert("2400000001667289".equals(offset))
  }

  def readFile(path: String): String = {
    val fs = FileSystem.get(new Configuration())
    var br: BufferedReader = null
    var line: String = null
    val result = ArrayBuffer.empty[String]
    try {
      br = new BufferedReader(new InputStreamReader(fs.open(new Path(path))))
      line = br.readLine()
      while (line != null) {
        result += line
        line = br.readLine()
      }
    } finally {
      if (br != null) br.close()
    }

    result.mkString("\n")
  }

}

 

這一點在生產環境中還是有一定應用場景的,比如,通過解析mysql binlog日誌,將數據同步到kafka,然後再通過消費者程式消費kafka中的數據保存到存儲系統中,如delta,通過offset資訊對比來校驗,binlog到kafka的延遲(如,通過獲取binlog中的offset資訊與流程式同步到kafka時進行checkpoint的offset做對比)、kafka到存儲系統中的延遲。

此外,要注意commits目錄下記錄的是已完成的批次資訊。在實際進行offset比對時,要以此為基準再去獲取offsets目錄下的offsets資訊。


關注微信公眾號:大數據學習與分享,獲取更對技術乾貨