如何获取流式应用程序中checkpoint的最新offset

对于流式应用程序,保证应用7*24小时的稳定运行,是非常必要的。因此对于计算引擎,要求必须能够适应与应用程序逻辑本身无关的问题(比如driver应用失败重启、网络问题、服务器问题、JVM崩溃等),具有自动容错恢复的功能。

目前,Spark(Spark Streaming/Structured Streaming)和Flink的checkpoint机制,就是处理类似情况,实现容错机制的核心利器。

对于Flink:

为了保证其高可用、Exactly Once的特性,提供了一套强大的checkpoint机制,它能够根据配置周期性地基于流中各个operator的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦出现故障时,能够将整个应用流程序恢复到故障前的某一种态,从而修正因为故障带来的程序数据状态中断。

对于Spark:

在流式应用中,Spark Streaming/Structured Streaming会将关于应用足够多的信息checkpoint到高可用、高容错的分布式存储系统,如HDFS中,以便从故障中进行恢复。checkpoint有两种类型的数据:

1. 数据checkpoint

对于一些复杂程序,比如跨多个批次组合数据的有状态转换,生成的RDD依赖于先前批次的RDD,导致依赖链的长度随批次的增加而增加。因为故障恢复时间与依赖链成正比,从而导致恢复时间也跟着增长。因此就有必要周期性的将RDD checkpoint到可靠的分布式存储系统中,以此切断依赖链。

这在Spark中的状态算子,如mapWithState、updateStateByKey中尤为常见。

2. 元数据checkpoint

顾名思义,就是将定义流式应用程序中的信息保存到容错系统中,用于从运行流应用程序的driver节点发生故障时,进行容错恢复。元数据包括:

a. 配置:用于创建流应用程序DStream操作:

b. 定义流应用程序的DStream操作集

c. 未完成的批次:未完成的批次job

本文的重点不在于checkpoint具体含义,而是以Spark为例,阐述如何通过程序获取checkpoint中最新的offset,以此为思路,来解决生产中的实际问题。

通常我们会checkpoint到HDFS,首先来看一下checkpoint信息:


offsets目录记录了每个批次中的offset,此目录中的第N条记录表示当前正在处理,第N-1个及之前的记录指示哪些偏移已处理完成。

/bigdatalearnshare/checkpointLocation/binlog-2-kafka/commits
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/metadata
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/receivedData
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/sources

 

hdfs dfs -ls /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets

/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/0
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/1
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2

 

hdfs dfs -cat /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2

v1
{"batchWatermarkMs":0,"batchTimestampMs":1590632490083,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}}
2400000001667289

 

最终获取最新offset的程序示例:

/**
  * @Author bigdatalearnshare
  */
object ReadOffsets {

  def main(args: Array[String]): Unit = {
    val path = "/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2"

    val fs = FileSystem.get(new Configuration())

    val lastFile = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.endsWith(".tmp.crc"))
      .map { fileName =>
        (fileName.getPath.getName.split("/").last.toInt, fileName.getPath)
      }.maxBy(_._1)._2

    val offset = readFile(lastFile.toString).split("\n").last

    assert("2400000001667289".equals(offset))
  }

  def readFile(path: String): String = {
    val fs = FileSystem.get(new Configuration())
    var br: BufferedReader = null
    var line: String = null
    val result = ArrayBuffer.empty[String]
    try {
      br = new BufferedReader(new InputStreamReader(fs.open(new Path(path))))
      line = br.readLine()
      while (line != null) {
        result += line
        line = br.readLine()
      }
    } finally {
      if (br != null) br.close()
    }

    result.mkString("\n")
  }

}

 

这一点在生产环境中还是有一定应用场景的,比如,通过解析mysql binlog日志,将数据同步到kafka,然后再通过消费者程序消费kafka中的数据保存到存储系统中,如delta,通过offset信息对比来校验,binlog到kafka的延迟(如,通过获取binlog中的offset信息与流程序同步到kafka时进行checkpoint的offset做对比)、kafka到存储系统中的延迟。

此外,要注意commits目录下记录的是已完成的批次信息。在实际进行offset比对时,要以此为基准再去获取offsets目录下的offsets信息。


关注微信公众号:大数据学习与分享,获取更对技术干货