Flink 狀態管理與 Checkpoint 機制

  • 2020 年 2 月 18 日
  • 筆記

一、狀態分類

相對於其他流計算框架,Flink 一個比較重要的特性就是其支援有狀態計算。即你可以將中間的計算結果進行保存,並提供給後續的計算使用:

具體而言,Flink 又將狀態 (State) 分為 Keyed State 與 Operator State。

1.1 運算元狀態

運算元狀態 (Operator State):顧名思義,狀態是和運算元進行綁定的,一個運算元的狀態不能被其他運算元所訪問到。官方文檔上對 Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個運算元狀態是與一個並發的運算元實例所綁定的,即假設運算元的並行度是 2,那麼其應有兩個對應的運算元狀態:

1.2 鍵控狀態

鍵控狀態 (Keyed State) :是一種特殊的運算元狀態,即狀態是根據 key 值進行區分的,Flink 會為每類鍵值維護一個狀態實例。如下圖所示,每個顏色代表不同 key 值,對應四個不同的狀態實例。需要注意的是鍵控狀態只能在 KeyedStream 上進行使用,我們可以通過 stream.keyBy(…) 來得到 KeyedStream 。

二、狀態編程

2.1 鍵控狀態

Flink 提供了以下數據格式來管理和存儲鍵控狀態 (Keyed State):

•ValueState:存儲單值類型的狀態。可以使用 update(T) 進行更新,並通過 T value() 進行檢索。•ListState:存儲列表類型的狀態。可以使用 add(T) 或 addAll(List) 添加元素;並通過 get() 獲得整個列表。•ReducingState:用於存儲經過 ReduceFunction 計算後的結果,使用 add(T) 增加元素。•AggregatingState:用於存儲經過 AggregatingState 計算後的結果,使用 add(IN) 添加元素。•FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。•MapState:維護 Map 類型的狀態。

以上所有增刪改查方法不必硬記,在使用時通過語法提示來調用即可。這裡給出一個具體的使用示例:假設我們正在開發一個監控系統,當監控數據超過閾值一定次數後,需要發出報警資訊。這裡之所以要達到一定次數,是因為由於偶發原因,偶爾一次超過閾值並不能代表什麼,故需要達到一定次數後才觸發報警,這就需要使用到 Flink 的狀態編程。相關程式碼如下:

publicclassThresholdWarningextends  RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {    // 通過ListState來存儲非正常數據的狀態  privatetransientListState<Long> abnormalData;  // 需要監控的閾值  privateLong threshold;  // 觸發報警的次數  privateInteger numberOfTimes;    ThresholdWarning(Long threshold, Integer numberOfTimes) {  this.threshold = threshold;  this.numberOfTimes = numberOfTimes;  }    @Override  publicvoid open(Configuration parameters) {  // 通過狀態名稱(句柄)獲取狀態實例,如果不存在則會自動創建          abnormalData = getRuntimeContext().getListState(  newListStateDescriptor<>("abnormalData", Long.class));  }    @Override  publicvoid flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)  throwsException{  Long inputValue = value.f1;  // 如果輸入值超過閾值,則記錄該次不正常的數據資訊  if(inputValue >= threshold) {              abnormalData.add(inputValue);  }  ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());  // 如果不正常的數據出現達到一定次數,則輸出報警資訊  if(list.size() >= numberOfTimes) {  out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));  // 報警資訊輸出後,清空狀態              abnormalData.clear();  }  }  }  

調用自定義的狀態監控,這裡我們使用 a,b 來代表不同類型的監控數據,分別對其數據進行監控:

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(  Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),  Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),  Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),  Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));  tuple2DataStreamSource  .keyBy(0)  .flatMap(newThresholdWarning(100L, 3))  // 超過100的閾值3次後就進行報警  .printToErr();  env.execute("Managed Keyed State");  

輸出如下結果如下:

2.2 狀態有效期

以上任何類型的 keyed state 都支援配置有效期 (TTL) ,示例如下:

StateTtlConfig ttlConfig = StateTtlConfig  // 設置有效期為 10 秒  .newBuilder(Time.seconds(10))  // 設置有效期更新規則,這裡設置為當創建和寫入時,都重置其有效期到規定的10秒  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  /*設置只要值過期就不可見,另外一個可選值是ReturnExpiredIfNotCleanedUp,  代表即使值過期了,但如果還沒有被物理刪除,就是可見的*/  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  .build();  ListStateDescriptor<Long> descriptor = newListStateDescriptor<>("abnormalData", Long.class);  descriptor.enableTimeToLive(ttlConfig);  

2.3 運算元狀態

相比於鍵控狀態,運算元狀態目前支援的存儲類型只有以下三種:

•ListState:存儲列表類型的狀態。•UnionListState:存儲列表類型的狀態,與 ListState 的區別在於:如果並行度發生變化,ListState 會將該運算元的所有並發的狀態實例進行匯總,然後均分給新的 Task;而 UnionListState 只是將所有並發的狀態實例匯總起來,具體的劃分行為則由用戶進行定義。•BroadcastState:用於廣播的運算元狀態。

這裡我們繼續沿用上面的例子,假設此時我們不需要區分監控數據的類型,只要有監控數據超過閾值並達到指定的次數後,就進行報警,程式碼如下:

  publicclassThresholdWarningextendsRichFlatMapFunction<Tuple2<String, Long>,  Tuple2<String, List<Tuple2<String, Long>>>> implementsCheckpointedFunction{    // 非正常數據  privateList<Tuple2<String, Long>> bufferedData;  // checkPointedState  privatetransientListState<Tuple2<String, Long>> checkPointedState;  // 需要監控的閾值  privateLong threshold;  // 次數  privateInteger numberOfTimes;    ThresholdWarning(Long threshold, Integer numberOfTimes) {  this.threshold = threshold;  this.numberOfTimes = numberOfTimes;  this.bufferedData = newArrayList<>();  }    @Override  publicvoid initializeState(FunctionInitializationContext context) throwsException{  // 注意這裡獲取的是OperatorStateStore          checkPointedState = context.getOperatorStateStore().              getListState(newListStateDescriptor<>("abnormalData",  TypeInformation.of(newTypeHint<Tuple2<String, Long>>() {  })));  // 如果發生重啟,則需要從快照中將狀態進行恢復  if(context.isRestored()) {  for(Tuple2<String, Long> element : checkPointedState.get()) {                  bufferedData.add(element);  }  }  }    @Override  publicvoid flatMap(Tuple2<String, Long> value,  Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {  Long inputValue = value.f1;  // 超過閾值則進行記錄  if(inputValue >= threshold) {              bufferedData.add(value);  }  // 超過指定次數則輸出報警資訊  if(bufferedData.size() >= numberOfTimes) {  // 順便輸出狀態實例的hashcode  out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值警報!", bufferedData));              bufferedData.clear();  }  }    @Override  publicvoid snapshotState(FunctionSnapshotContext context) throwsException{  // 在進行快照時,將數據存儲到checkPointedState          checkPointedState.clear();  for(Tuple2<String, Long> element : bufferedData) {              checkPointedState.add(element);  }  }  }  

調用自定義運算元狀態,這裡需要將並行度設置為 1:

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 開啟檢查點機制  env.enableCheckpointing(1000);  // 設置並行度為1  DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(  Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),  Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),  Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),  Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));  tuple2DataStreamSource  .flatMap(newThresholdWarning(100L, 3))  .printToErr();  env.execute("Managed Keyed State");  }  

此時輸出如下:

在上面的調用程式碼中,我們將程式的並行度設置為 1,可以看到三次輸出中狀態實例的 hashcode 全是一致的,證明它們都同一個狀態實例。假設將並行度設置為 2,此時輸出如下:

可以看到此時兩次輸出中狀態實例的 hashcode 是不一致的,代表它們不是同一個狀態實例,這也就是上文提到的,一個運算元狀態是與一個並發的運算元實例所綁定的。同時這裡只輸出兩次,是因為在並發處理的情況下,執行緒 1 可能拿到 5 個非正常值,執行緒 2 可能拿到 4 個非正常值,因為要大於 3 次才能輸出,所以在這種情況下就會出現只輸出兩條記錄的情況,所以需要將程式的並行度設置為 1。

三、檢查點機制

3.1 CheckPoints

為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在數據流上生成 checkpoint barrier ,當某個運算元收到 barrier 時,即會基於當前狀態生成一份快照,然後再將該 barrier 傳遞到下游運算元,下游運算元接收到該 barrier 後,也基於當前狀態生成一份快照,依次傳遞直至到最後的 Sink 運算元上。當出現異常後,Flink 就可以根據最近的一次的快照數據將所有運算元恢復到先前的狀態。

3.2 開啟檢查點

默認情況下,檢查點機制是關閉的,需要在程式中進行開啟:

// 開啟檢查點機制,並指定狀態檢查點之間的時間間隔  env.enableCheckpointing(1000);    // 其他可選配置如下:  // 設置語義  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // 設置兩個檢查點之間的最小時間間隔  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);  // 設置執行Checkpoint操作時的超時時間  env.getCheckpointConfig().setCheckpointTimeout(60000);  // 設置最大並發執行的檢查點的數量  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  // 將檢查點持久化到外部存儲  env.getCheckpointConfig().enableExternalizedCheckpoints(  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // 如果有更近的保存點時,是否將作業回退到該檢查點  env.getCheckpointConfig().setPreferCheckpointForRecovery(true);  

3.3 保存點機制

保存點機制 (Savepoints) 是檢查點機制的一種特殊的實現,它允許你通過手工的方式來觸發 Checkpoint,並將結果持久化存儲到指定路徑中,主要用於避免 Flink 集群在重啟或升級時導致狀態丟失。示例如下:

觸髮指定id的作業的Savepoint,並將結果存儲到指定目錄下

bin/flink savepoint :jobId [:targetDirectory] 複製程式碼更多命令和配置可以參考官方文檔:savepoints

四、狀態後端

4.1 狀態管理器分類

默認情況下,所有的狀態都存儲在 JVM 的堆記憶體中,在狀態數據過多的情況下,這種方式很有可能導致記憶體溢出,因此 Flink 該提供了其它方式來存儲狀態數據,這些存儲方式統一稱為狀態後端 (或狀態管理器):

主要有以下三種:

•MemoryStateBackend

默認的方式,即基於 JVM 的堆記憶體進行存儲,主要適用於本地開發和調試。

•FsStateBackend

基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分散式文件系統。需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的記憶體中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。

•RocksDBStateBackend

RocksDBStateBackend 是 Flink 內置的第三方狀態管理器,採用嵌入式的 key-value 型資料庫 RocksDB 來存儲正在進行的數據。等到 checkpoint 時,再將其中的數據持久化到指定的文件系統中,所以採用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統。之所以這樣做是因為 RocksDB 作為嵌入式資料庫安全性比較低,但比起全文件系統的方式,其讀取速率更快;比起全記憶體的方式,其存儲空間更大,因此它是一種比較均衡的方案。

4.2 配置方式

Flink 支援使用兩種方式來配置後端管理器:第一種方式:基於程式碼方式進行配置,只對當前作業生效:

// 配置 FsStateBackend  env.setStateBackend(newFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));  // 配置 RocksDBStateBackend  env.setStateBackend(newRocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));  

配置 RocksDBStateBackend 時,需要額外導入下面的依賴:

  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>  <version>1.9.0</version>  </dependency>  

第二種方式:基於 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業都生效:

state.backend: filesystem  state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

參考資料

•Working with State•Checkpointing•Savepoints•State Backends•Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30

本文轉自:https://juejin.im/post/5dd2661cf265da0bf175d5bb

如果覺得文章對你有幫助,請轉發朋友圈、點在看,讓更多人獲益,感謝您的支援!

END  關注我  公眾號(zhisheng)里回復 面經、ES、Flink、 Spring、Java、Kafka、監控 等關鍵字可以查看更多關鍵字對應的文章。    Flink 源碼解析        知識星球裡面可以看到下面文章        Flink 系列文章