[Flink] Flink的waterMark的通俗理解
- 2020 年 3 月 31 日
- 筆記
導讀
Flink 為實時計算提供了三種時間,即事件時間(event time)、攝入時間(ingestion time)和處理時間(processing time)。
遇到的問題:
假設在一個5秒的Tumble窗口,有一個EventTime是 11秒的數據,在第16秒時候到來了。圖示第11秒的數據,在16秒到來了,如下圖:該如何處理遲到數據
什麼是Watermark
Watermark的關鍵點:
- 目的:處理EventTime 窗口計算
- 本質:時間戳
- 生成方式:Punctuated和Periodic(常用)
- 特性:單調遞增
Watermark的產生方式
-
Punctuated
數據流中每一個遞增的EventTime都會產生一個Watermark。
-
Periodic(推薦)
周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。
Watermark解決的問題
上面的問題在於如何將遲來的EventTime 位11的元素正確處理?
當Watermark的時間戳等於Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結果如下:
如果想正確處理遲來的數據可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:
WaterMark的例子
設置WaterMark步驟:
1.設置StreamTime Characteristic為Event Time,即設置流式時間窗口(也可以稱為流式時間特性)
2.創建的DataStreamSource調用assignTimestampsAndWatermarks方法,並設置WaterMark種類:AssignerWithPeriodicWatermarks / AssignerWithPunctuatedWatermarks
或者 實現AssignerWithPeriodicWatermarks介面 / 實現AssignerWithPunctuatedWatermarks介面
3.重寫getCurrentWatermark與extractTimestamp方法
getCurrentWatermark方法:獲取當前的水位線
extractTimestamp方法:提取數據流中的時間戳(必須顯式的指定數據中的Event Time)
實例
通過一段程式,實踐一下WaterMark的設定以及WaterMark的工作方式
數據示例:
key + 時間戳
hello,1553503210000
程式說明:
1.使用Socket模擬接收數據
2.設置WaterMark
設置的邏輯:在第一條數據進來時,設置WaterMark為0,指定第一條數據的時間戳後,獲取該時間戳與當前 WaterMark的最大值,並將最大值設置為下一條數據的WaterMark,以此類推
3.進行map基礎轉換,將String轉換為Tuple2<String,String>
4.根據Key分組
5.使用滾動Event Time窗口,將5秒內的同組數據,進行Fold拼接輸出
程式碼如下:
package waterMark; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import javax.annotation.Nullable; /** * waterMark實例 * * @author lixiyan * @date 2019/10/22 4:45 PM */ public class MainWaterMark001 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); SingleOutputStreamOperator<String> dataStream = env.socketTextStream("localhost", 12345) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() { // 當前時間戳 long currentTimeStamp = 0L; // 允許的遲到數據 long maxDelayAllowed = 0L; // 當前水位線 long currentWaterMark; @Nullable @Override public Watermark getCurrentWatermark() { currentWaterMark = currentTimeStamp - maxDelayAllowed; System.out.println("當前水位線:" + currentWaterMark); return new Watermark(currentWaterMark); } @Override public long extractTimestamp(String s, long l) { String[] arr = s.split(","); long timeStamp = Long.parseLong(arr[1]); currentTimeStamp = Math.max(timeStamp, currentTimeStamp); System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",水位線:" + currentWaterMark); return timeStamp; } }); dataStream.map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]); } }).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) .fold("Start:", new FoldFunction<Tuple2<String, String>, String>() { @Override public String fold(String s, Tuple2<String, String> o) throws Exception { return s + " - " + o.f1; } }).print(); env.execute("MainWaterMark001"); } }
開啟9999埠,並輸入第一條數據:
hello,1553503185000
那麼,我先假設後續的數據Event Time間隔為1秒,推斷一下WaterMark的設定,如下圖所示
1.第一條數據的Event Time為1553503185000,那麼當前窗口時間為:1553503185000 -> 1553503189000,即下圖中紅色框線
2.第一條數據進來時,這條數據之前的WaterMark為0,當第一條數據已經進入後,指定Event Time位置,並與現在的WaterMark比較,將兩者中大的那個值設置為新的WaterMark,那麼當前數據的WaterMark為1553503185000
3.第二條數據進來時,前一條數據的WaterMark為1553503185000,第二條數據的Event Time比之前的WaterMark大,於是更新WaterMark,將當前的WaterMark更新為1553503186000,但還沒到窗口觸發時間,不進行計算
4.後面幾個以此類推,直到Event Time為:1553503190000的數據進來的時候,前一條數據的WaterMark為1553503189000,於是更新當前的WaterMark為155350390000,Flink認為1553503190000之前的數據都已經到達,且達到了窗口的觸發條件,開始進行計算
根據上面的推斷,啟動程式驗證一下
先啟動監聽9999埠,再啟動Flink程式,並向埠監聽終端輸入以下內容:
hello,1553503185000 hello,1553503186000 hello,1553503187000 hello,1553503188000 hello,1553503189000 hello,1553503190000
Flink輸出結果:
Key:hello,EventTime:1553503185000,水位線:0 Key:hello,EventTime:1553503186000,水位線:1553503185000 Key:hello,EventTime:1553503187000,水位線:1553503186000 Key:hello,EventTime:1553503188000,水位線:1553503187000 Key:hello,EventTime:1553503189000,水位線:1553503188000 Key:hello,EventTime:1553503190000,水位線:1553503189000 2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000
通過結果可以發現,Flink在指定WaterMark時,先調用extractTimestamp方法,再調用getCurrentWatermark方法, 所以列印資訊中的WaterMark為上一條數據的WaterMark,並非當前的WaterMark
為了驗證這個結論,修改一下程式碼:
@Nullable @Override public Watermark getCurrentWatermark() { currentWaterMark = currentTimeStamp - maxDelayAllowed; System.out.println("當前水位線:" + currentWaterMark); return new Watermark(currentWaterMark); } @Override public long extractTimestamp(String s, long l) { String[] arr = s.split(","); long timeStamp = Long.parseLong(arr[1]); currentTimeStamp = Math.max(timeStamp, currentTimeStamp); System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一條數據的水位線:" + currentWaterMark); return timeStamp; }
在監聽終端輸入同一批數據:
hello,1553503185000 hello,1553503186000 hello,1553503187000 hello,1553503188000 hello,1553503189000 hello,1553503190000
Flink輸出結果:
Key:hello,EventTime:1553503185000,前一條數據的水位線:0 當前水位線:1553503185000 Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000 當前水位線:1553503186000 Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000 當前水位線:1553503187000 Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000 當前水位線:1553503188000 Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000 當前水位線:1553503189000 Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000 當前水位線:1553503190000 2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000
通過上面的結果,驗證了之前的結論,在設置WaterMark方法中,先調用extractTimestamp方法,再調用getCurrentWatermark方法
數據亂序
上面的實例,Event Time是有序,現在來做一下數據亂序的場景模擬
啟動程式,在監聽終端中輸入如下數據:
其中,在觸發了了第一個窗口計算後,又來了兩條遲到數據hello,1553503187000,hello,1553503186000
hello,1553503185000 hello,1553503186000 hello,1553503187000 hello,1553503188000 hello,1553503189000 hello,1553503190000 hello,1553503187000 hello,1553503186000 hello,1553503191000 hello,1553503192000 hello,1553503193000 hello,1553503194000 hello,1553503195000
Flink結果:
Key:hello,EventTime:1553503185000,前一條數據的水位線:0 當前水位線:1553503185000 Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000 當前水位線:1553503186000 Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000 當前水位線:1553503187000 Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000 當前水位線:1553503188000 Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000 當前水位線:1553503189000 Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000 當前水位線:1553503190000 2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000 當前水位線:1553503190000 Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503190000 當前水位線:1553503190000 Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503190000 當前水位線:1553503190000 Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503190000 當前水位線:1553503191000 Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503191000 當前水位線:1553503192000 Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503192000 當前水位線:1553503193000 Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503193000 當前水位線:1553503194000 Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503194000 當前水位線:1553503195000 2> Start: - 1553503190000 - 1553503191000 - 1553503192000 - 1553503193000 - 1553503194000
從結果中可以看到,在第二個窗口中,那兩條遲到數據並沒有進行處理,這個就是遲到丟棄。
亂序時間的設置:
為了解決上面的問題,我們允許Flink處理延遲以5秒內的遲到數據
修改最大亂序時間
long maxDelayAllowed = 5000l;
在監聽終端中,輸入數據
hello,1553503185000 hello,1553503186000 hello,1553503187000 hello,1553503188000 hello,1553503189000 hello,1553503190000 hello,1553503187000 hello,1553503186000 hello,1553503191000 hello,1553503192000 hello,1553503193000 hello,1553503194000 hello,1553503195000
Flink輸出結果:
Key:hello,EventTime:1553503185000,前一條數據的水位線:-5000 當前水位線:1553503180000 Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503180000 當前水位線:1553503181000 Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503181000 當前水位線:1553503182000 Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503182000 當前水位線:1553503183000 Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503183000 當前水位線:1553503184000 Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503184000 當前水位線:1553503185000 Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503185000 當前水位線:1553503185000 Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000 當前水位線:1553503185000 Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503185000 當前水位線:1553503186000 Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503186000 當前水位線:1553503187000 Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503187000 當前水位線:1553503188000 Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503188000 當前水位線:1553503189000 Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503189000 當前水位線:1553503190000 2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000 - 1553503187000 - 1553503186000
可以看到,設置了最大允許亂序時間後,WaterMark要比原來低5秒,可以對延遲5秒內的數據進行處理,窗口的觸發條件也同樣會往後延遲
關於延遲時間,請結合業務場景進行設置
至此,WaterMark實例就寫完了
總結
一開始,你先不要把Windowing、WaterMark、Trigger三者混在一起去考慮最終輸出的結果是什麼,建議獨立考慮清楚這三者都做了什麼,以及三者之間的依賴關係是什麼:
1、Windowing:就是負責該如何生成Window,比如Fixed Window、Slide Window,當你配置好生成Window的策略時,Window就會根據時間動態生成,最終得到一個一個的Window,包含一個時間範圍:[起始時間, 結束時間),它們是一個一個受限於該時間範圍的事件記錄的容器,每個Window會收集一堆記錄,滿足指定條件會觸發Window內事件記錄集合的計算處理。
2、WaterMark:它其實不太好理解,可以將它定義為一個函數E=f(P),當前處理系統的處理時間P,根據一定的策略f會映射到一個事件時間E,可見E在坐標系中的表現形式是一條曲線,根據f的不同曲線形狀也不同。假設,處理時間12:00:00,我希望映射到事件時間11:59:30,這時對於延遲30秒以內(事件時範圍11:59:30~12:00:00)的事件記錄到達處理系統,都指派到時間範圍包含處理時間12:00:00這個Window中。事件時間超過12:00:00的就會由Trigger去做補償了。
3、Trigger:為了滿足實際不同的業務需求,對上述事件記錄指派給Window未能達到實際效果,而做出的一種補償,比如事件記錄在WaterMark時間戳之後到達事件處理系統,因為已經在對應的Window時間範圍之後,我有很多選擇:選擇丟棄,選擇是滿足延遲3秒後還是指派給該Window,選擇只接受對應的Window時間範圍之後的5個事件記錄,等等,這都是滿足業務需要而制定的觸發Window重新計算的策略,所以非常靈活。
本文由部落格群發一文多發等運營工具平台 OpenWrite 發布