Flink CheckPoint奇巧 | 原理和在生產中的應用

  • 2019 年 10 月 6 日
  • 筆記

來源:暴走大數據

作者:王知無

By 暴走大數據

場景描述:Flink本身為了保證其高可用的特性,以及保證作用的Exactly Once的快速恢復,進而提供了一套強大的Checkpoint機制。這個機制在原理是什麼?有哪些需要注意的呢?

關鍵詞:Flink CheckPoint

Flink本身為了保證其高可用的特性,以及保證作用的Exactly Once的快速恢復,進而提供了一套強大的Checkpoint機制。

Checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個運算元因為某些原因(如異常退出)出現故障時,能夠將整個應用流圖的狀態恢復到故障之前的某一狀態,保 證應用流圖狀態的一致性。Flink的Checkpoint機制原理來自「Chandy-Lamport algorithm」演算法 (分散式快照演算法)。

Checkpoint的執行流程

每個需要checkpoint的應用在啟動時,Flink的JobManager為其創建一個 CheckpointCoordinator,CheckpointCoordinator全權負責本應用的快照製作。

  • CheckpointCoordinator周期性的向該流應用的所有source運算元發送barrier;
  • 當某個source運算元收到一個barrier時,便暫停數據處理過程,然後將自己的當前狀 態製作成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告 自己快照製作情況,同時向自身所有下游運算元廣播該barrier,恢複數據處理;
  • 下游運算元收到barrier之後,會暫停自己的數據處理過程,然後將自身的相關狀態製作成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告自身 快照情況,同時向自身所有下游運算元廣播該barrier,恢複數據處理;
  • 每個運算元按照步驟3不斷製作快照並向下游廣播,直到最後barrier傳遞到sink運算元,快照製作完成。
  • 當CheckpointCoordinator收到所有運算元的報告之後,認為該周期的快照製作成功; 否則,如果在規定的時間內沒有收到所有運算元的報告,則認為本周期快照製作失敗

Checkpoint常用設置

// start a checkpoint every 1000 ms  env.enableCheckpointing(1000);    // advanced options:    // set mode to exactly-once (this is the default)  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);    // checkpoints have to complete within one minute, or are discarded  env.getCheckpointConfig().setCheckpointTimeout(60000);    // make sure 500 ms of progress happen between checkpoints  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);    // allow only one checkpoint to be in progress at the same time  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);    // enable externalized checkpoints which are retained after job cancellation  env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);    // This determines if a task will be failed if an error occurs in the execution of the task』s checkpoint procedure.  env.getCheckpointConfig().setFailOnCheckpointingErrors(true);  
  • 使用StreamExecutionEnvironment.enableCheckpointing方法來設置開啟checkpoint;具體可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用於指定checkpoint的觸發間隔(單位milliseconds),而CheckpointingMode默認是CheckpointingMode.EXACTLY_ONCE,也可以指定為CheckpointingMode.AT_LEAST_ONCE
  • 也可以通過StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode來設置CheckpointingMode,一般對於超低延遲的應用(大概幾毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分應用使用CheckpointingMode.EXACTLY_ONCE就可以
  • checkpointTimeout用於指定checkpoint執行的超時時間(單位milliseconds),超時沒完成就會被abort掉
  • minPauseBetweenCheckpoints用於指定checkpoint coordinator上一個checkpoint完成之後最小等多久可以出發另一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值為1
  • maxConcurrentCheckpoints用於指定運行中的checkpoint最多可以有多少個,用於包裝topology不會花太多的時間在checkpoints上面;如果有設置了minPauseBetweenCheckpoints,則maxConcurrentCheckpoints這個參數就不起作用了(大於1的值不起作用)
  • enableExternalizedCheckpoints用於開啟checkpoints的外部持久化,但是在job失敗的時候不會自動清理,需要自己手工清理state;ExternalizedCheckpointCleanup用於指定當job canceled的時候externalized checkpoint該如何清理,DELETE_ON_CANCELLATION的話,在job canceled的時候會自動刪除externalized state,但是如果是FAILED的狀態則會保留;RETAIN_ON_CANCELLATION則在job canceled的時候會保留externalized checkpoint state
  • failOnCheckpointingErrors用於指定在checkpoint發生異常的時候,是否應該fail該task,默認為true,如果設置為false,則task會拒絕checkpoint然後繼續運行

flink-conf.yaml相關配置

#==============================================================================  # Fault tolerance and checkpointing  #==============================================================================    # The backend that will be used to store operator state checkpoints if  # checkpointing is enabled.  #  # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the  # <class-name-of-factory>.  #  # state.backend: filesystem    # Directory for checkpoints filesystem, when using any of the default bundled  # state backends.  #  # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints    # Default target directory for savepoints, optional.  #  # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints    # Flag to enable/disable incremental checkpoints for backends that  # support incremental checkpoints (like the RocksDB state backend).  #  # state.backend.incremental: false  
  • state.backend用於指定checkpoint state存儲的backend,默認為none
  • state.backend.async用於指定backend是否使用非同步snapshot(默認為true),有些不支援async或者只支援async的state backend可能會忽略這個參數
  • state.backend.fs.memory-threshold,默認為1024,用於指定存儲於files的state大小閾值,如果小於該值則會存儲在root checkpoint metadata file
  • state.backend.incremental,默認為false,用於指定是否採用增量checkpoint,有些不支援增量checkpoint的backend會忽略該配置
  • state.backend.local-recovery,默認為false
  • state.checkpoints.dir,默認為none,用於指定checkpoint的data files和meta data存儲的目錄,該目錄必須對所有參與的TaskManagers及JobManagers可見
  • state.checkpoints.num-retained,默認為1,用於指定保留的已完成的checkpoints個數
  • state.savepoints.dir,默認為none,用於指定savepoints的默認目錄
  • taskmanager.state.local.root-dirs,默認為none

增量檢查點- Checkpoint設置的奇技淫巧

增量式檢查點

Flink的檢查點是一個全局的、非同步的程式快照,它周期性的生成並送到持久化存儲(一般使用分散式系統)。當發生故障時,Flink使用最新的檢查點進行重啟。一些Flink的用戶在程式「狀態」中保存了GB甚至TB的數據。這些用戶回饋在大量 的狀態下,創建檢查點通常很慢並且耗資源,這也是為什麼Flink在 1.3版本開始引入「增量式的檢查點」。

在引入「增量式的檢查點」之前,每一個Flink的檢查點都保存了程式完整的狀態。後來我們意識到在大部分情況下這是不必要的,因為上一次和這次的檢查點之前 ,狀態發生了很大的變化,所以我們創建了「增量式的檢查點」。增量式的檢查點僅保存過去和現在狀態的差異部分。

增量式的檢查點可以為擁有大量狀態的程式帶來很大的提升。在早期的測試中,一個擁有TB級別「狀態」程式將生成檢查點的耗時從3分鐘以上降低 到了30秒左右。因為增量式的檢查點不需要每次把完整的狀態發送到存儲中。

現在只能通過RocksDB state back-end來獲取增量式檢查點的功能,Flink使用RocksDB內置的備份機制來合併檢查點數據。這樣Flink增量式檢查點的數據不會無限制的增大,它會自動合併老的檢查點數據並清理掉。

要啟用這個機制,可以如下設置: RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true);

增量式檢查點如何工作

Flink 增量式的檢查點以「RocksDB」為基礎,RocksDB是一個基於 LSM樹的KV存儲,新的數據保存在記憶體中,稱為memtable。如果Key相同,後到的數據將覆蓋之前的數據,一旦memtable寫滿了,RocksDB將數據壓縮並寫入到磁碟。memtable的數據持久化到磁碟後,他們就變成了不可變的sstable。

RocksDB會在後台執行compaction,合併sstable並刪除其中重複的數據。之後RocksDB刪除原來的sstable,替換成新合成的ssttable,這個sstable包含了之前的sstable中的資訊。

在這個基礎之上,Flink跟蹤前一個checkpoint創建和刪除的RocksDB sstable文件,因為sstable是不可變的,Flink可以因此計算出 狀態有哪些改變。為了達到這個目標,Flink在RocksDB上觸發了一個刷新操作,強制將memtable刷新到磁碟上。這個操作在Flink中是同步的,其他的操作是非同步的,不會阻塞數據處理。

Flink 的checkpoint會將新的sstable發送到持久化存儲(例如HDFS,S3)中,同時保留引用。Flink不會發送所有的sstable, 一些數據在之前的checkpoint存在並且寫入到持久化存儲中了,這樣只需要增加引用次數就可以了。因為compaction的作用,一些sstable會合併成一個sstable並刪除這些sstable,這也是為什麼Flink可以減少checkpoint的歷史文件。

為了分析checkpoint的數據變更,而上傳整理過的sstable是多餘的(這裡的意思是之前已經上傳過的,不需要再次上傳)。Flink處理這種情況,僅帶來一點點開銷。這個過程很重要,因為在任務需要重啟的時候,Flink只需要保留較少的歷史文件。

假設有一個子任務,擁有一個keyed state的operator,checkpoint最多保留2個。上面的圖片描述了每個checkpoint對應的RocksDB 的狀態,它引用到的文件,以及在checkpoint完成後共享狀態中的count值。

checkpoint 『CP2』,本地的RocksDB目錄有兩個sstable文件,這些文件是新生成的,於是Flink將它們傳到了checkpoint 對應的存儲目錄。當checkpoint完成後,Flink在共享狀態中創建兩個實體,並將count設為1。在這個共享狀態中,這個key 由operator、subtask,原始的sstable名字組成,value為sstable實際存儲目錄。

checkpoint『CP2』,RocksDB有2個老的sstable文件,又創建了2個新的sstable文件。Flink將這兩個新的sstable傳到 持久化存儲中,然後引用他們。當checkpoint完成後,Flink將所有的引用的相應計數加1。

checkpoint『CP3』,RocksDB的compaction將sstable-(1), sstable-(2), sstable-(3) 合併成 sstable-(1,2,3),然後刪除 原始的sstable。這個合併後的文件包含了和之前源文件一樣的資訊,並且清理掉了重複的部分。sstable-(4)還保留著,然後有一個 新生成的sstable-(5)。Flink將新的 sstable-(1,2,3)以及 sstable-(5)傳到持久化存儲中, sstable-(4)仍被『CP2』引用,所以 將計數增加1。現在有了3個checkpoint,'CP1','CP2','CP3',超過了預設的保留數目2,所以CP1被刪除。作為刪除的一部分, CP1對應的文件(sstable-(1)、sstable-(2)) 的引用計數減1。

checkpoint『CP4』,RocksDB將sstable-(4), sstable-(5), 新的 sstable-(6) 合併成 sstable-(4,5,6)。Flink將新合併 的 sstable-(4,5,6)發送到持久化存儲中,sstable-(1,2,3)、sstable-(4,5,6) 的引用計數增加1。由於再次到達了checkpoint的 保留數目,『CP2』將被刪除,『CP2』對應的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用計數減1。由於『CP2』對應 的文件的引用計數達到0,這些文件將被刪除。

需要注意的地方

如果使用增量式的checkpoint,那麼在錯誤恢復的時候,不需要考慮很多的配置項。一旦發生了錯誤,Flink的JobManager會告訴 task需要從最新的checkpoint中恢復,它可以是全量的或者是增量的。之後TaskManager從分散式系統中下載checkpoint文件, 然後從中恢復狀態。

增量式的checkpoint能為擁有大量狀態的程式帶來較大的提升,但還有一些trade-off需要考慮。總的來說,增量式減少了checkpoint操作的時間,但是相對的,從checkpoint中恢復可能更耗時,具體情況需要根據應用程式包含的狀態大小而定。相對的,如果程式只是部分失敗,Flink TaskManager需要從多個checkpoint中讀取數據,這時候使用全量的checkpoint來恢複數據可能更加耗時。同時,由於新的checkpoint可能引用到老的checkpoint,這樣老的checkpoint就不能被刪除,這樣下去,歷史的版本數據會越來越大。需要考慮使用分散式來存儲checkpoint,另外還需要考慮讀取帶來的頻寬消耗。

歡迎點贊+收藏