Flink系列之狀態及檢查點

  • 2020 年 3 月 11 日
  • 筆記

  Flink不同於其他實時計算的框架之處是它可以提供針對不同的狀態進行編程和計算。本篇文章的主要思路如下,大家可以選擇性閱讀。

  1. Flink的狀態分類及不同點。

  2. Flink針對不同的狀態進行編程。

  3. 檢查點機制和配置。

  4. 狀態的存儲。

  

  •  Flilnk的狀態分類及不同點

    Flink有兩種不同的狀態分類,一種是Keyed State(鍵狀態),一種是Operator State(運算元狀態)。

    • Keyed State

      主要是針對KeyedStream中使用,當使用keyBy方法的時候進行計算。 我們都知道在計算的過程中就是將Flink按照<並行operator, key> 進行計算,每個key又歸屬於單個Operator,所以我們可以簡單的理解為<operator, key>。也就是說首先按Operator分配到不同的實例,然後在不同的實例中,相同的Key分配到相同的組中,然後這些狀態就可以在相同的組中進行獲取和計算。

    • Operator State

      主要針對不同的運算元的狀態計算。按照不同的運算元如Map, FlatMap,Reduce等運算元去分配不同的實例群。像Kafka Connector的例子就很好的應用了這個功能,根據不同的topic去讀取不同的狀態,比如計算獲取到topic的paritition分區和 offset偏移量。 每個運算元實例會維護著這個topic的partition及offset的Map狀態,這個例子就是很好的使用了Opertator的state。如果Operator並行度發生改變了的話,那麼狀態也會相應的分配好對應的狀態。

 

  • 可管理的及原生狀態

  這兩種狀態又分為 Managed State (可管理狀態)和 Raw State (原生狀態)

    • Managed State : 可管理狀態就是自己去定義和編寫狀態處理的邏輯,全部由自己和Flink進行控制。
    • Raw State : 原生狀態也就是在Operator運算元觸發 checkPoint 檢查點的時候,Flink會在其數據結構中寫入一部分位元組碼Byte,Flink只能看到其中有一些碼,但是無法去進行控制。

  所有的流數據功能都可以使用Managed State,這個也是Flink編程所推薦的。因為要使用Raw State的話比較底層也比較複雜,要實現運算元方法時才使用。

 

  • Flink針對不同的狀態進行編程

  我們只針對可管理的狀態進行操作,不同的管理 Keyed State 和Operator State 狀態原始方法定義可參考官網介紹

    • Keyed State

     我們針對Keyed managed state進行編程。來個場景,假如Flink計算某個功能的時間,如果某個功能Key時間超過某個閾值了則進行計數,如果數據超過了設置的次數,那麼直接輸出到控制台。直接參考如下程式碼。

  程式碼大致的思路是:

  繼承RichFlatMapFunction, 定義一個ListState<Long>用於記錄當前的狀態。

  定義閾值和錯誤次數值,觸發後直接輸出控制台下。

  open方法實例化ListState。在裡邊設置了一下狀態的TTL,即狀態的生命周期。

  flatMap方法按key分配後的value進行判斷和記錄。

  最後main方法進行數據準備和輸出。

package myflink.state;    import org.apache.commons.compress.utils.Lists;  import org.apache.flink.api.common.functions.RichFlatMapFunction;  import org.apache.flink.api.common.state.ListState;  import org.apache.flink.api.common.state.ListStateDescriptor;  import org.apache.flink.api.common.state.StateTtlConfig;  import org.apache.flink.api.common.time.Time;  import org.apache.flink.api.common.typeinfo.TypeHint;  import org.apache.flink.api.common.typeinfo.TypeInformation;  import org.apache.flink.api.java.tuple.Tuple2;  import org.apache.flink.configuration.Configuration;  import org.apache.flink.streaming.api.datastream.DataStreamSource;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.util.Collector;    import java.util.ArrayList;  import java.util.List;    public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {        //通過ListState來存儲非正常數據的狀態      private transient ListState<Long> abnormalData;      //需要監控的閾值      private Long threshold;      //觸發報警的次數      private Integer numberOfTimes;        public ThresholdWarning(Long threshold, Integer numberOfTimes) {          this.threshold = threshold;          this.numberOfTimes = numberOfTimes;      }        @Override      public void open(Configuration parameters) throws Exception {            ListStateDescriptor listStateDescriptor = new ListStateDescriptor<Long>("abnormal-state",                  TypeInformation.of(Long.class));            //狀態存活生命周期設置TTL Time To Live          StateTtlConfig  ttlConfig = StateTtlConfig                  //設置有效期為10秒                  .newBuilder(Time.seconds(10L))                  //設置有效的更新規則,當創建和寫入的時候需要重新更新為10S                  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)                  //設置狀態的可見性,設置狀態如果沒有刪除,那麼就是可見的,另外一個值:ReturnExpiredIfNotCleanedUp ,                  // 如果沒有清理的話,狀態會一直可見的                  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)                  .build();            //設置TTL配置          listStateDescriptor.enableTimeToLive(ttlConfig);            //通過狀態名稱(句柄)獲取狀態實例,如果不存在則會自動建          abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<Long>("abnormal-state",                  TypeInformation.of(Long.class)));      }            @Override      public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {          Long inputValue = value.f1;          //如果輸入的值超過閾值,則記錄該次不正常的數據資訊          if(inputValue >= threshold) {              abnormalData.add(inputValue);          }            ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());            //如果不正常的數據超過了指定的數量,則輸出報警資訊          if(list.size() >= numberOfTimes) {              out.collect(Tuple2.of(value.f0 + " 超過指定閾值數量", list));              //報警資訊輸出後,清空狀態              abnormalData.clear();          }      }        public static void main(String[] args) throws Exception {          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();            //設置並行度為1,用於觀察輸出  //        env.setParallelism(1);            DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(                  Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),                  Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),                  Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),                  Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));            tuple2DataStreamSource                  .keyBy(0)                  //超過100的閾值3次後輸出報警資訊                  .flatMap(new ThresholdWarning(100L, 3))                  .printToErr();            env.execute("Managed Keyed State");      }  }

  輸出的結果如下,大於等於100的出現3次即進行輸出。和我們想像的都一樣。

1> (b 超過指定閾值數量,[100, 200, 200])  1> (b 超過指定閾值數量,[500, 600, 700])  3> (a 超過指定閾值數量,[400, 100, 200])

    •  operator state

  我們還在原來基礎的例子上調整一下,不按key,按Operator類型,只要超過時間的次數達到了就要輸出。在其中,把Operator的hashCode進行輸出一下,用於區分是否為相同的Operator。首先我們將並行度設置為1,然後一會兒再把並行度調整成2。

  程式碼的大致思路為:

  繼承RichFlatMapFunction,實現CheckpointedFunction介面,即在觸發檢查點的時候進行操作。

  initializeState方法的時候將opertor的狀態和檢查點狀態進行初始化。

  snapshotState方法即存儲狀態時將當時的鏡像進行存儲。可以存儲到外部設備。

  flatMap的時候進行閾值判斷和數據收集。

  main方法進行檢查點設置,數據準備,執行和輸出。

package myflink.state;    import org.apache.flink.api.common.functions.RichFlatMapFunction;  import org.apache.flink.api.common.state.ListState;  import org.apache.flink.api.common.state.ListStateDescriptor;  import org.apache.flink.api.common.typeinfo.TypeHint;  import org.apache.flink.api.common.typeinfo.TypeInformation;  import org.apache.flink.api.java.tuple.Tuple2;  import org.apache.flink.runtime.state.FunctionInitializationContext;  import org.apache.flink.runtime.state.FunctionSnapshotContext;  import org.apache.flink.streaming.api.CheckpointingMode;  import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;  import org.apache.flink.streaming.api.datastream.DataStreamSource;  import org.apache.flink.streaming.api.environment.CheckpointConfig;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.util.Collector;    import java.util.ArrayList;  import java.util.List;    public class ThresholdOperatorWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String,          List<Tuple2<String, Long>>>> implements CheckpointedFunction {          //非正確數據狀態      private List<Tuple2<String, Long>> bufferedData;        //檢查點狀態      private transient ListState<Tuple2<String, Long>> checkPointedState;        //需要監控的閾值      private Long threshold;      //次數      private Integer numberOfTimes;        ThresholdOperatorWarning(Long threshold, Integer numberOfTimes) {          this.threshold = threshold;          this.numberOfTimes = numberOfTimes;          this.bufferedData = new ArrayList<>();      }        @Override      public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out)              throws Exception {          Long inputValue = value.f1;            //超過閾值則進行記錄          if(inputValue >= threshold) {              bufferedData.add(value);          }            //超過指定次數則進行匯總和匯總輸出          if(bufferedData.size() >= numberOfTimes) {              //輸出狀態實例的hashCode              out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值報警! " , bufferedData ));              //清理快取              bufferedData.clear();          }        }        @Override      public void snapshotState(FunctionSnapshotContext context) throws Exception {          //當數據進行快照時,將數據存儲到checkPointedState          checkPointedState.clear();          for (Tuple2<String, Long> element : bufferedData) {              checkPointedState.add(element);          }      }        @Override      public void initializeState(FunctionInitializationContext context) throws Exception {          //這裡獲取的是operatorStateStore          checkPointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Tuple2<String, Long>>(                  "abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})          ));            //如果發生重啟,則需要從快照中將狀態進行恢復          if(context.isRestored()) {              for (Tuple2<String, Long> element : checkPointedState.get()) {                  bufferedData.add(element);              }          }      }          public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                //開啟檢查點          env.enableCheckpointing(1000L);          // 其他可選配置如下:            // 設置語義,默認是EXACTLY_ONCE          env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);          // 設置檢查點之間最小停頓時間          env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);          // 設置執行Checkpoint操作時的超時時間          env.getCheckpointConfig().setCheckpointTimeout(60000);          // 設置最大並發執行的檢查點的數量          env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);          // 將檢查點持久化到外部存儲          env.getCheckpointConfig().enableExternalizedCheckpoints(                  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);                //設置並行度為1          env.setParallelism(1);          //數據源            DataStreamSource<Tuple2<String, Long>>  tuple2DataStreamSource = env.fromElements(                  Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),                  Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),                  Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),                  Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L)          );            tuple2DataStreamSource.flatMap(new ThresholdOperatorWarning(100L, 3))                  .printToErr();            env.execute("managed Operator State");        }    }

  當前並行度為1,結果如下,數據沒有按key統計,而是按照裡邊的值進行統計,符合我們的要求。因為是同一個Operator,所以hashcode是一樣的。

(1629838640閾值報警! ,[(a,400), (a,100), (a,200)])  (1629838640閾值報警! ,[(a,200), (b,100), (b,200)])  (1629838640閾值報警! ,[(b,200), (b,500), (b,600)])

  接下來將並行度設置為2,結果如下。我們看一下main裡邊的數據,符合大於等於100的數據一共有10個,那麼兩個不同的operator分配的時候這10數據的時候,一個operator分5個,那麼滿足超過3個的時候才收集並輸出。因為5個裡邊只有一組3個滿足,2個不滿足所以不會輸出,所以符合我們的預期。

1> (475161679閾值報警! ,[(a,100), (a,200), (b,200)])  2> (1633355453閾值報警! ,[(a,400), (a,200), (b,100)])

  • 檢查點的機制和配置
    • 檢查點的機制

      上邊我們程式裡邊設置了檢查點,檢查點是當數據進行處理的時候將數據的狀態進行記錄,當程式出現問題的時候方便恢復。

     可以像這樣的情況:  數據源——>  123456789|12345678| 12341234|——>sink。|即檢查點,是一個checkpoint barrier,當運算元運行計算的時候會把當前的狀態進行記錄,比如讀取Kafka的數據,假如讀取到offset=6868,然後將這個值進行了記錄, 當這時有機器出現了問題,程式需要進行恢復並執行,那麼需要重新讀取這條數據再計算。引用一張圖片可以有更清楚的認識。

  

 

 

    •  檢查點的配置

      默認情況下,檢查點是關閉著的,我們需要明確開啟。其他的一些配置可參考如下內容:

     //開啟檢查點          env.enableCheckpointing(1000L);          // 其他可選配置如下:            // 設置語義,默認是EXACTLY_ONCE          env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);          // 設置檢查點之間最小停頓時間          env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);          // 設置執行Checkpoint操作時的超時時間          env.getCheckpointConfig().setCheckpointTimeout(60000);          // 設置最大並發執行的檢查點的數量          env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);          // 將檢查點持久化到外部存儲          env.getCheckpointConfig().enableExternalizedCheckpoints(                  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    • 檢查點手工保存

       flink支援手工將檢查點的狀態存儲到外部,也可以指定存儲到HDFS文件。存儲到外邊是為了程式出現了問題時進行恢復,比如OOM問題。程式升級和重啟時也需要重新從檢查點進行恢復。

# 觸髮指定id的作業的Savepoint,並將結果存儲到指定目錄下  bin/flink savepoint :jobId [:targetDirectory]

  •  狀態的存儲

    Keyed State和Operator State會存儲在記憶體中,因為數據是持續不斷的輸入的,當數據量非常大的時候,記憶體會出現不足的情況,那麼我們也是需要將當前的狀態進行保存的。官方稱為狀態後端。

    • Flink的狀態保存支援3種方式

      MemoryStateBackend,這種方式是將數據存儲在JVM中,這種方式是用於開發。

      FsStateBackend, 即以文件的形式存儲到磁碟中,可以是HDFS或本地文件。當JobManger把任務發送給Taskmanger進行計算,此時數據會在JVM中,當觸發了checkpoint後才會將數據存儲到文件中。

      RocksDBStateBackend,這種形式是介於前邊兩種的情況,這個是將狀態數據到KV資料庫中,當觸髮狀態的時候會將數據再持久化到文件中。這樣即提高了速度,空間也變得更大了。

 

    • 狀態存儲配置

      默認情況是MemoryStateBackend,即記憶體中。

      剩下兩種的配置如下,這種方式只對當前Job有效。RocksDB配置的話需要額外引用一下包。

// 配置 FsStateBackend  env.setStateBackend(new FsStateBackend("hdfs://namenode:60060/flink/checkpoints"));  // 配置 RocksDBStateBackend  env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:60060/flink/checkpoints"))

      通過修改flink-yaml.conf可以對該集群所有作業生效。

state.backend: filesystem  state.checkpoints.dir: hdfs://namenode:60060/flink/checkpoints

 

     

MemoryStateBackend