[Flink] Flink的waterMark的通俗理解

  • 2020 年 3 月 31 日
  • 筆記

導讀

Flink 為實時計算提供了三種時間,即事件時間(event time)、攝入時間(ingestion time)和處理時間(processing time)。

遇到的問題:

假設在一個5秒的Tumble窗口,有一個EventTime是 11秒的數據,在第16秒時候到來了。圖示第11秒的數據,在16秒到來了,如下圖:該如何處理遲到數據

undefined

什麼是Watermark

Watermark的關鍵點:

  • 目的:處理EventTime 窗口計算
  • 本質:時間戳
  • 生成方式:Punctuated和Periodic(常用)
  • 特性:單調遞增

Watermark的產生方式

  • Punctuated

    數據流中每一個遞增的EventTime都會產生一個Watermark。

  • Periodic(推薦)

    周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。

Watermark解決的問題

上面的問題在於如何將遲來的EventTime 位11的元素正確處理?

當Watermark的時間戳等於Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結果如下:

undefined

如果想正確處理遲來的數據可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:

undefined

WaterMark的例子

設置WaterMark步驟:

1.設置StreamTime Characteristic為Event Time,即設置流式時間窗口(也可以稱為流式時間特性)

2.創建的DataStreamSource調用assignTimestampsAndWatermarks方法,並設置WaterMark種類:AssignerWithPeriodicWatermarks / AssignerWithPunctuatedWatermarks

或者 實現AssignerWithPeriodicWatermarks介面 / 實現AssignerWithPunctuatedWatermarks介面

3.重寫getCurrentWatermark與extractTimestamp方法

getCurrentWatermark方法:獲取當前的水位線

extractTimestamp方法:提取數據流中的時間戳(必須顯式的指定數據中的Event Time)

實例

通過一段程式,實踐一下WaterMark的設定以及WaterMark的工作方式

數據示例

key + 時間戳

hello,1553503210000  

程式說明

1.使用Socket模擬接收數據

2.設置WaterMark

設置的邏輯:在第一條數據進來時,設置WaterMark為0,指定第一條數據的時間戳後,獲取該時間戳與當前 WaterMark的最大值,並將最大值設置為下一條數據的WaterMark,以此類推

3.進行map基礎轉換,將String轉換為Tuple2<String,String>

4.根據Key分組

5.使用滾動Event Time窗口,將5秒內的同組數據,進行Fold拼接輸出

程式碼如下:

package waterMark;    import org.apache.flink.api.common.functions.FoldFunction;  import org.apache.flink.api.common.functions.MapFunction;  import org.apache.flink.api.java.tuple.Tuple2;  import org.apache.flink.streaming.api.TimeCharacteristic;  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;  import org.apache.flink.streaming.api.watermark.Watermark;  import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;  import org.apache.flink.streaming.api.windowing.time.Time;      import javax.annotation.Nullable;    /**   * waterMark實例   *   * @author lixiyan   * @date 2019/10/22 4:45 PM   */  public class MainWaterMark001 {      public static void main(String[] args) throws Exception {            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          SingleOutputStreamOperator<String> dataStream = env.socketTextStream("localhost", 12345)                  .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {                      // 當前時間戳                      long currentTimeStamp = 0L;                      // 允許的遲到數據                      long maxDelayAllowed = 0L;                      // 當前水位線                      long currentWaterMark;                        @Nullable                      @Override                      public Watermark getCurrentWatermark() {                          currentWaterMark = currentTimeStamp - maxDelayAllowed;                          System.out.println("當前水位線:" + currentWaterMark);                          return new Watermark(currentWaterMark);                      }                        @Override                      public long extractTimestamp(String s, long l) {                          String[] arr = s.split(",");                          long timeStamp = Long.parseLong(arr[1]);                          currentTimeStamp = Math.max(timeStamp, currentTimeStamp);                          System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",水位線:" + currentWaterMark);                          return timeStamp;                      }                  });            dataStream.map(new MapFunction<String, Tuple2<String, String>>() {              @Override              public Tuple2<String, String> map(String s) throws Exception {                  return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);              }          }).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))                  .fold("Start:", new FoldFunction<Tuple2<String, String>, String>() {                      @Override                      public String fold(String s, Tuple2<String, String> o) throws Exception {                          return s + " - " + o.f1;                      }                  }).print();            env.execute("MainWaterMark001");        }  }    

開啟9999埠,並輸入第一條數據:

hello,1553503185000  

那麼,我先假設後續的數據Event Time間隔為1秒,推斷一下WaterMark的設定,如下圖所示

1.第一條數據的Event Time為1553503185000,那麼當前窗口時間為:1553503185000 -> 1553503189000,即下圖中紅色框線

2.第一條數據進來時,這條數據之前的WaterMark為0,當第一條數據已經進入後,指定Event Time位置,並與現在的WaterMark比較,將兩者中大的那個值設置為新的WaterMark,那麼當前數據的WaterMark為1553503185000

3.第二條數據進來時,前一條數據的WaterMark為1553503185000,第二條數據的Event Time比之前的WaterMark大,於是更新WaterMark,將當前的WaterMark更新為1553503186000,但還沒到窗口觸發時間,不進行計算

4.後面幾個以此類推,直到Event Time為:1553503190000的數據進來的時候,前一條數據的WaterMark為1553503189000,於是更新當前的WaterMark為155350390000,Flink認為1553503190000之前的數據都已經到達,且達到了窗口的觸發條件,開始進行計算

undefined

根據上面的推斷,啟動程式驗證一下

先啟動監聽9999埠,再啟動Flink程式,並向埠監聽終端輸入以下內容:

hello,1553503185000  hello,1553503186000  hello,1553503187000  hello,1553503188000  hello,1553503189000  hello,1553503190000  

Flink輸出結果:

Key:hello,EventTime:1553503185000,水位線:0  Key:hello,EventTime:1553503186000,水位線:1553503185000  Key:hello,EventTime:1553503187000,水位線:1553503186000  Key:hello,EventTime:1553503188000,水位線:1553503187000  Key:hello,EventTime:1553503189000,水位線:1553503188000  Key:hello,EventTime:1553503190000,水位線:1553503189000  2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000  

通過結果可以發現,Flink在指定WaterMark時,先調用extractTimestamp方法,再調用getCurrentWatermark方法, 所以列印資訊中的WaterMark為上一條數據的WaterMark,並非當前的WaterMark

為了驗證這個結論,修改一下程式碼:

@Nullable  @Override  public Watermark getCurrentWatermark() {      currentWaterMark = currentTimeStamp - maxDelayAllowed;      System.out.println("當前水位線:" + currentWaterMark);      return new Watermark(currentWaterMark);  }    @Override  public long extractTimestamp(String s, long l) {      String[] arr = s.split(",");      long timeStamp = Long.parseLong(arr[1]);      currentTimeStamp = Math.max(timeStamp, currentTimeStamp);      System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一條數據的水位線:" + currentWaterMark);      return timeStamp;  }  

在監聽終端輸入同一批數據:

hello,1553503185000  hello,1553503186000  hello,1553503187000  hello,1553503188000  hello,1553503189000  hello,1553503190000  

Flink輸出結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:0  當前水位線:1553503185000    Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000  當前水位線:1553503186000    Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000  當前水位線:1553503187000    Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000  當前水位線:1553503188000    Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000  當前水位線:1553503189000    Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000  當前水位線:1553503190000  2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000  

通過上面的結果,驗證了之前的結論,在設置WaterMark方法中,先調用extractTimestamp方法,再調用getCurrentWatermark方法

數據亂序

上面的實例,Event Time是有序,現在來做一下數據亂序的場景模擬

啟動程式,在監聽終端中輸入如下數據:

其中,在觸發了了第一個窗口計算後,又來了兩條遲到數據hello,1553503187000,hello,1553503186000

hello,1553503185000  hello,1553503186000  hello,1553503187000  hello,1553503188000  hello,1553503189000  hello,1553503190000  hello,1553503187000  hello,1553503186000  hello,1553503191000  hello,1553503192000  hello,1553503193000  hello,1553503194000  hello,1553503195000  

Flink結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:0  當前水位線:1553503185000    Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000  當前水位線:1553503186000    Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000  當前水位線:1553503187000    Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000  當前水位線:1553503188000    Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000  當前水位線:1553503189000    Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000  當前水位線:1553503190000  2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000  當前水位線:1553503190000    Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503190000  當前水位線:1553503190000    Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503190000  當前水位線:1553503190000    Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503190000  當前水位線:1553503191000    Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503191000  當前水位線:1553503192000    Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503192000  當前水位線:1553503193000    Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503193000  當前水位線:1553503194000    Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503194000  當前水位線:1553503195000  2> Start: - 1553503190000 - 1553503191000 - 1553503192000 - 1553503193000 - 1553503194000  

從結果中可以看到,在第二個窗口中,那兩條遲到數據並沒有進行處理,這個就是遲到丟棄。

亂序時間的設置:

為了解決上面的問題,我們允許Flink處理延遲以5秒內的遲到數據

修改最大亂序時間

long maxDelayAllowed = 5000l;  

在監聽終端中,輸入數據

hello,1553503185000  hello,1553503186000  hello,1553503187000  hello,1553503188000  hello,1553503189000  hello,1553503190000  hello,1553503187000  hello,1553503186000  hello,1553503191000  hello,1553503192000  hello,1553503193000  hello,1553503194000  hello,1553503195000  

Flink輸出結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:-5000  當前水位線:1553503180000    Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503180000  當前水位線:1553503181000    Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503181000  當前水位線:1553503182000    Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503182000  當前水位線:1553503183000    Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503183000  當前水位線:1553503184000    Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503184000  當前水位線:1553503185000    Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503185000  當前水位線:1553503185000    Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000  當前水位線:1553503185000    Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503185000  當前水位線:1553503186000    Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503186000  當前水位線:1553503187000    Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503187000  當前水位線:1553503188000    Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503188000  當前水位線:1553503189000    Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503189000  當前水位線:1553503190000  2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000 - 1553503187000 - 1553503186000  

可以看到,設置了最大允許亂序時間後,WaterMark要比原來低5秒,可以對延遲5秒內的數據進行處理,窗口的觸發條件也同樣會往後延遲

關於延遲時間,請結合業務場景進行設置

至此,WaterMark實例就寫完了

總結

一開始,你先不要把Windowing、WaterMark、Trigger三者混在一起去考慮最終輸出的結果是什麼,建議獨立考慮清楚這三者都做了什麼,以及三者之間的依賴關係是什麼:

1、Windowing:就是負責該如何生成Window,比如Fixed Window、Slide Window,當你配置好生成Window的策略時,Window就會根據時間動態生成,最終得到一個一個的Window,包含一個時間範圍:[起始時間, 結束時間),它們是一個一個受限於該時間範圍的事件記錄的容器,每個Window會收集一堆記錄,滿足指定條件會觸發Window內事件記錄集合的計算處理。

2、WaterMark:它其實不太好理解,可以將它定義為一個函數E=f(P),當前處理系統的處理時間P,根據一定的策略f會映射到一個事件時間E,可見E在坐標系中的表現形式是一條曲線,根據f的不同曲線形狀也不同。假設,處理時間12:00:00,我希望映射到事件時間11:59:30,這時對於延遲30秒以內(事件時範圍11:59:30~12:00:00)的事件記錄到達處理系統,都指派到時間範圍包含處理時間12:00:00這個Window中。事件時間超過12:00:00的就會由Trigger去做補償了。

3、Trigger:為了滿足實際不同的業務需求,對上述事件記錄指派給Window未能達到實際效果,而做出的一種補償,比如事件記錄在WaterMark時間戳之後到達事件處理系統,因為已經在對應的Window時間範圍之後,我有很多選擇:選擇丟棄,選擇是滿足延遲3秒後還是指派給該Window,選擇只接受對應的Window時間範圍之後的5個事件記錄,等等,這都是滿足業務需要而制定的觸發Window重新計算的策略,所以非常靈活。

本文由部落格群發一文多發等運營工具平台 OpenWrite 發布