Flink之對時間的處理
window+trigger+watermark處理全局亂序數據,指定窗口上的allowedLateness可以處理特定窗口操作的局部事件時間亂序數據
1、流處理系統中的微批
Flink內部也使用了某種形式的微批處理技術,在shuffle階段將含有多個事件的緩衝容器通過網絡發送,而不是發送單個事件
流處理系統中的批處理必須滿足以下兩點要求:
- 批處理只作為提高系統性能的機制。批量越大,系統的吞吐量就越大。
- 為了提高性能而使用的批處理必須完全獨立於定義窗口時所用的緩衝,或者為了保證容錯性而提交的代碼,也不能作為 API 的一部分。否則,系統將受到限制,並且變得脆弱且難以使用。
2、時間概念
- 事件時間,即事件實際發生的時間(由水印觸發器實現),基於事件時間處理可實現時間回溯並正確地重新處理數據
- 處理時間,即事件被處理的時間,是處理事件的機器所測量的時間
- 攝取時間,即事件進入流處理框架的時間,缺乏事件時間的數據會被處理器附上攝取時間(由source函數完成)
3、窗口
所有內置窗口都由同一種機制實現,開窗機制與檢查點機制完全分離;可直接用基本的開窗機制定義更複雜的窗口(如某種時間窗口,可基於元素計數生成中間結果)
窗口時間區間是按自然時間分配的,比如3秒的時間間隔,[0,3) [3,6)
(1)時間窗口(每隔B時長對A時長內數據聚合)
- 設置事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 設置處理時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
- 設置攝取時間 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
- 滾動窗口A stream.timeWindow(Time.minutes(1)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
- 滑動窗口B stream.timeWindow(Time.minutes(1), Time.seconds(30)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)), SlidingEventTimeWindows.of(Time.seconds(30)))
(2)計數窗口(每隔B個元素對A個元素進行聚合)
為避免永遠達不到計數窗口而浪費內存,可用時間窗口觸發超時
- 滾動窗口A stream.countWindow(4)
- 滑動窗口B stream.countWindow(4, 2)
(3)會話窗口(會話即活動階段,其前後都是非活躍階段,常用於無固定持續時間或無固定交互次數的場景)
由超時時間設定,即希望非活躍狀態持續多久才結束窗口。window區間:當b比上一條記錄a延遲超過超時時間t時,出現會話窗口[上一個window_end, b-t)
- 事件時間會話窗口 stream.window(EventTimeSessionWindows.withGap(Time.minutes(5))
- 處理時間會話窗口 stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5))
處理延遲數據
- allowedLateness(Time.minutes(60))
縮短反饋時間(若用戶會話遲遲不結束,反饋時間過長)
- trigger(ContinuousEventTrigger.of(Time.minutes(10)) #每10分鐘輸出一個結果並覆蓋之前的
(4)全局窗口(對全部數據進行統計,使用流方法實現批處理)
內置觸發器是NeverTrigger,永遠不會觸發,需要自定義觸發器才有意義
stream.window(GlobalWindows.create()).trigger(…)
4、觸發器
繼承Trigger類,Trigger抽象類的結構:
boolean canMerge()
void clear(W window, TriggerContext ctx)
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) 每個元素到來時執行
TriggerResult onEventTime(long time, W window, TriggerContext ctx) Timer到期後執行
void onMerge(W window, OnMergeContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
5、水印
窗口 + 水印,用於解決亂序問題(並不是解決,而是假定所有正常的事件都只是一定程度內亂序,可以解決此程度內的亂序)
當Watermark在紅色區域時,窗口內的元素會計算
(1)基於事件時間處理時,水印是判斷所有事件到達的標誌,開始計算和輸出結果,晚於處理時間但早於此水印時間的事件也可被正確處理。
水印定義最長遲到數據(比當前watermark還早的數據會被丟棄,水印閾值越大,允許的遲到數據越久)
watermark的值不是全局的,但與key無關,有幾個並行,就有幾個watermark,window的觸發條件與最小的watermark有關
水印時間 = 收到的最大事件時間 – 水印閾值
一個操作算子收到多個並行流的輸入時,取最小的watermark作為當前算子的watermark
(2)異常情況:如果水印遲到得太久(可能是maxOrderness設置太大,也可能是後序事件過晚到達),收到結果的速度會變慢,解決方法是在水印到達之前輸出近似結果,其實就是後面設置Lateness的方案;如果水印到達得太早(可能是maxOrderness設置太小,也可能是後序事件過早到達),則可能丟失一些前序事件,收到錯誤結果,解決方法是採用Flink作業監控事件流,學習事件的遲到規律,以此構建水印模型
(3)分配Timestamp和Watermark
timestamp和watermark都是通過從1970年1月1日0時0分0秒到現在的毫秒數來指定的
先後順序:分配timstamp是按設置的時間間隔定時執行的,即使無數據進來也會執行,這就造成了getCurrentWatermark調用後看上去第一個watermark永遠是以0為基準計算顯示的 ,但實際並不是按那個算的。第2條的watermark如果是23的話,是不大於window_end 24的,也就不應該觸發,而如果是下一條的24則可以觸發。AssignerWithPeriodicWatermarks子類是每隔一段時間執行的,這個具體由ExecutionConfig.setAutoWatermarkInterval設置,如果沒有設置會幾乎沒有間隔地調用getCurrentWatermark方法。之所以會出現-10000時因為你沒有數據進入窗口,當然一直都是-10000,但是getCurrentWatermark方法不是在執行extractTimestamp後才執行的
直接在數據源生成(推薦,數據生成時即分配timestamp和watermark)
實現SourceFuntion接口的run方法,並調用如下方法:
- 分配timestamp:SourceContext.collectWithTimestamp(…)
- 分配watermark:SourceContext.emitWatermark(new WaterMark(…))
獲取流後使用生成器生成新流(使用此種方式,會覆蓋源提供的timestamp和watermark,注意一定要在時間窗口之前生成)
stram.assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks 實現類對象)
定義分配器
AssignerWithPeriodicWatermarks(周期性水印,分配時間戳並定期生成水印)
watermark產生的事件間隔(每n毫秒)是通過ExecutionConfig.setAutoWatermarkInterval(…)來定義的,當getCurrentWatermark()被調用時,若返回的watermark非空且大於上一個watermark,則發射一個新的watermark
- 預定義實現類(使用時重寫extractTimestamp):
- AscendingTimestampExtractor 適用於時間戳遞增的情況
- BoundedOutOfOrdernessTimestampExtractor 適用於亂序但最大延遲已知的情況
- 自定義實現類(使用時重寫getCurrentWatermark、extractTimestamp)
AssignerWithPunctuatedWatermarks(帶斷點水印)
事件驅動生成水印,每個單獨的event都可以產生一個watermark,會有額外計算,過多可能導致性能降低。任何一個event都觸發extractTimestamp(…)來為元素分配一個timestamp,然後立即調用該元素上的checkAndGetNextWatermark(…)方法,一旦checkAndGetNextWatermark(…)返回一個非空的watermark並且watermark比前一個watermark大的話,這個新的watermark將會被發送
(4)設定水印後觸發window的條件:
- watermark >= window_end(開啟多並發後,每個算子接收到的watermark都會進行對齊,取最小的watermark作為最終的watermark並往下一個算子發送)
- 在[window_begin, window_end)中有數據存在
(5)不足之處
無法應對遲到數據,如果一個窗口已經被觸發了,即使滿足上述條件也不會第二次觸發窗口。水印被發射到下一個算子前已默認比水印更早的數據已經全部處理了
6、allowedLateness
主要用於解決遲到問題,給遲到數據第二次或多次觸發window的機會,可對無法觸發window的遲到數據單獨處理
默認情況下,watermark超過end-of-window後,將忽略之後到達的符合window的數據
在Watermark < 窗口結束時間 + Lateness時,仍會繼續等待窗口內的元素參與窗口計算,計算時要注意狀態值的重複,直到Watermark >= 窗口結束時間 + Lateness 時清空緩存
要注意再次觸發窗口時,UDF中的狀態值的處理,要考慮state在計算時的去重問題
(1)
- 對於trigger是默認的EventTimeTrigger的情況,allowedLateness會再次觸發窗口的計算,而之前觸發的數據,會buffer起來,直到watermark超過end-of-window + allowedLateness的時間,窗口的數據及元數據信息才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。
- 對於sessionWindow情況,當late element在allowedLateness範圍之內到達時,可能會引起窗口的merge,這樣,之前窗口的數據會在新窗口中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。
(2)觸發條件
- watermark < window_end + allowedLateness
- 在[window_begin, window_end)中有late數據存在
7、定時器Timer
Flink Streaming API提供的用於感知並利用處理時間/事件時間變化的機制
(1)在KeyedProcessFunction實現類里定義定時器為例:
重寫processElement(),對每個輸入元素註冊定時器
重寫onTimer(),定時器觸發時執行的邏輯
根據時間特徵的不同,具體如下:
處理時間——調用Context.timerService().registerProcessingTimeTimer()註冊;onTimer()在系統時間戳達到Timer設定的時間戳時觸發。
事件時間——調用Context.timerService().registerEventTimeTimer()註冊;onTimer()在Flink內部水印達到或超過Timer設定的時間戳時觸發。
(2)EventTimeTrigger使用Timer實現觸發時間窗口
@Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } }