Apache Flink 管理大型狀態之增量 Checkpoint 詳解
- 2019 年 10 月 4 日
- 筆記
來源 | zh.ververica.com
作者 | Stefan Ricther & Chris Ward 翻譯 | 邱從賢(山智)
Apache Flink 是一個有狀態的流計算框架,狀態是作業算子中已經處理過的內存狀態,供後續處理時使用。狀態在流計算很多複雜場景中非常重要,比如:
- 保存所有歷史記錄,用來尋找某種記錄模式
- 保存最近一分鐘的所有記錄,用於對每分鐘的記錄進行聚合統計
- 保存當前的模型參數,用於進行模型訓練
有狀態的流計算框架必須有很好的容錯性,才能在生產環境中發揮用處。這裡的容錯性是指,不管是發生硬件故障,還是程序異常,最終的結果不丟也不重。
Flink 的容錯性從一開始就是一個非常強大的特性,在遇到故障時,能夠保證不丟不重,且對正常邏輯處理的性能影響很小。
這裏面的核心就是 checkpoint 機制,Flink 使用 checkpoint 機制來進行狀態保證,在 Flink 中 checkpoint 是一個定時觸發的全局異步快照,並持久化到持久存儲系統上(通常是分佈式文件系統)。發生故障後,Flink 選擇從最近的一個快照進行恢復。有用戶的作業狀態達到 GB 甚至 TB 級別,對這麼大的作業狀態做一次 checkpoint 會非常耗時,耗資源,因此我們在 Flink 1.3 中引入了增量 checkpoint 機制。
在增量 checkpoint 之前,Flink 的每個 checkpoint 都包含作業的所有狀態。我們在觀察到狀態在 checkpoint 之間的變化並沒有那麼大之後,支持了增量 checkpoint。增量 checkpoint 僅包含上次 checkpoint 和本次 checkpoint 之間狀態的差異(也就是「增量」)。
對於狀態非常大的作業,增量 checkpoint 對性能的提升非常明顯。有生產用戶反饋對於 TB 級別的作業,使用增量 checkpoint 後能將 checkpoint 的整體時間從 3 分鐘降到 30 秒。這些時間節省主要歸功於不需要在每次 checkpoint 都將所有狀態寫到持久化存儲系統。
如何使用
當前,僅能夠在 RocksDB StateBackend 上使用增量 checkpoint 機制,Flink 依賴 RocksDB 內部的備份機制來生成 checkpoint 文件。Flink 會自動清理掉之前的 checkpoint 文件, 因此增量 checkpoint 的歷史記錄不會無限增長。
為了在作業中開啟增量 checkpoint,建議詳細閱讀 Apache Flink 的 checkpoint 文檔,簡單的說,你可以像之前一樣開啟 checkpoint,然後將構造函數的第二個參數設置為 true 來啟用增量 checkpoint。
Java 示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(filebackend, true));
Scala 示例
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setStateBackend(new RocksDBStateBackend(filebackend, true))
Flink 默認保留一個成功的 checkpoint,如果你需要保留多個的話,可以通過下面的配置進行設置:
state.checkpoints.num-retained
原理解析
Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 為基礎。RocksDB 是一個 LSM 結構的 KV 數據庫,把所有的修改保存在內存的可變緩存中(稱為 memtable),所有對 memtable 中 key 的修改,會覆蓋之前的 value,當前 memtable 滿了之後,RocksDB 會將所有數據以有序的寫到磁盤。當 RocksDB 將 memtable 寫到磁盤後,整個文件就不再可變,稱為有序字符串表(sstable)。
RocksDB 的後台壓縮線程會將 sstable 進行合併,就重複的鍵進行合併,合併後的 sstable 包含所有的鍵值對,RocksDB 會刪除合併前的 sstable。
在這個基礎上,Flink 會記錄上次 checkpoint 之後所有新生成和刪除的 sstable,另外因為 sstable 是不可變的,Flink 用 sstable 來記錄狀態的變化。為此,Flink 調用 RocksDB 的 flush,強制將 memtable 的數據全部寫到 sstable,並硬鏈到一個臨時目錄中。這個步驟是在同步階段完成,其他剩下的部分都在異步階段完成,不會阻塞正常的數據處理。
Flink 將所有新生成的 sstable 備份到持久化存儲(比如 HDFS,S3),並在新的 checkpoint 中引用。Flink 並不備份前一個 checkpoint 中已經存在的 sstable,而是引用他們。Flink 還能夠保證所有的 checkpoint 都不會引用已經刪除的文件,因為 RocksDB 中文件刪除是由壓縮完成的,壓縮後會將原來的內容合併寫成一個新的 sstable。因此,Flink 增量 checkpoint 能夠切斷 checkpoint 歷史。
為了追蹤 checkpoint 間的差距,備份合併後的 sstable 是一個相對冗餘的操作。但是 Flink 會增量的處理,增加的開銷通常很小,並且可以保持一個更短的 checkpoint 歷史,恢復時從更少的 checkpoint 進行讀取文件,因此我們認為這是值得的。
舉個栗子

添加描述
上圖以一個有狀態的算子為例,checkpoint 最多保留 2 個,上圖從左到右分別記錄每次 checkpoint 時本地的 RocksDB 狀態文件,引用的持久化存儲上的文件,以及當前 checkpoint 完成後文件的引用計數情況。
- Checkpoint 1 的時候,本地 RocksDB 包含兩個 sstable 文件,該 checkpoint 會把這兩個文件備份到持久化存儲,當 checkpoint 完成後,對這兩個文件的引用計數進行加 1,引用計數使用鍵值對的方式保存,其中鍵由算子的當前並發以及文件名所組成。我們同時會維護一個引用計數中鍵到對應文件的隱射關係。
- Checkpoint 2 的時候,RocksDB 生成兩個新的 sstable 文件,並且兩個舊的文件還存在。Flink 會把兩個新的文件進行備份,然後引用兩個舊的文件,當 checkpoint 完成時,Flink 對這 4 個文件都進行引用計數 +1 操作。
- Checkpoint 3 的時候,RocksDB 將 sstable-(1),sstable-(2) 以及 sstable-(3) 合併成 sstable-(1,2,3),並且刪除了三個舊文件,新生成的文件包含了三個刪除文件的所有鍵值對。sstable-(4) 還繼續存在,生成一個新的 sstable-(5) 文件。Flink 會將 sstable-(1,2,3) 和 sstable-(5) 備份到持久化存儲,然後增加 sstable-4 的引用計數。由於保存的 checkpoint 數達到上限(2 個),因此會刪除 checkpoint 1,然後對 checkpoint 1 中引用的所有文件(sstable-(1) 和 sstable-(2))的引用計數進行 -1 操作。
- Checkpoint 4 的時候,RocksDB 將 sstable-(4),sstable-(5) 以及新生成的 sstable-(6) 合併成一個新的 sstable-(4,5,6)。Flink 將 sstable-(4,5,6) 備份到持久化存儲,並對 sstabe-(1,2,3) 和 sstable-(4,5,6) 進行引用計數 +1 操作,然後刪除 checkpoint 2,並對 checkpoint 引用的文件進行引用計數 -1 操作。這個時候 sstable-(1),sstable-(2) 以及 sstable-(3) 的引用計數變為 0,Flink 會從持久化存儲刪除這三個文件。
競爭問題以及並發 checkpoint
Flink 支持並發 checkpoint,有時晚觸發的 checkpoint 會先完成,因此增量 checkpoint 需要選擇一個正確的基準。Flink 僅會引用成功的 checkpoint 文件,從而防止引用一些被刪除的文件。
從 checkpoint 恢復以及性能
開啟增量 checkpoint 之後,不需要再進行其他額外的配置。如果 Job 異常,Flink 的 JobMaster 會通知所有 task 從上一個成功的 checkpoint 進行恢復,不管是全量 checkpoint 還是增量 checkpoint。每個 TaskManager 會從持久化存儲下載他們需要的狀態文件。
儘管增量 checkpoint 能減少大狀態下的 checkpoint 時間,但是天下沒有免費的午餐,我們需要在其他方面進行捨棄。增量 checkpoint 可以減少 checkpoint 的總時間,但是也可能導致恢復的時候需要更長的時間。如果集群的故障頻繁,Flink 的 TaskManager 需要從多個 checkpoint 中下載需要的狀態文件(這些文件中包含一些已經被刪除的狀態),作業恢復的整體時間可能比不使用增量 checkpoint 更長。
另外在增量 checkpoint 情況下,我們不能刪除舊 checkpoint 生成的文件,因為新的 checkpoint 會繼續引用它們,這可能導致需要更多的存儲空間,並且恢復的時候可能消耗更多的帶寬。
公眾號《小晨說數據》,專註互聯網熱門技術的公眾號。
