Flink中的window、watermark和ProcessFunction
一、Flink中的window
1,window簡述
window 是一種切割無限數據為有限塊進行處理的手段。Window 是無限數據流處理的核心,Window 將一個無限的 stream 拆分成有限大小的」buckets」桶,我們可以在這些桶上做計算操作。
2,window類型
window可分為CountWindow和TimeWindow兩類:CountWindow:按照指定的數據條數生成一個 Window,與時間無關;TimeWindow:按照時間生成 Window。
a)滾動窗口
將數據依據固定的窗口長度對數據進行切片。特點:時間對齊,窗口長度固定,沒有重疊。
適用場景:適合做 BI 統計等(做每個時間段的聚合計算)。
b)滑動窗口
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。特點:時間對齊,窗口長度固定,可以有重疊。
適用場景:對最近一個時間段內的統計(求某介面最近 5min 的失敗率來決定是否要報警)。
c)會話窗口
由一系列事件組合一個指定時間長度的 timeout 間隙組成,類似於 web 應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。特點:時間無對齊。
session 窗口分配器通過 session 活動來對元素進行分組,session 窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個 session 窗口通過一個 session 間隔來配置,這個 session 間隔定義了非活躍周期的長度,當這個非活躍周期產生,那麼當前的 session 將關閉並且後續的元素將被分配到新的 session 窗口中去。
3,window API
a)timeWindow
TimeWindow 是將指定時間範圍內的所有數據組成一個 window,一次對一個 window 裡面的所有數據進行計算。
①滾動窗口
Flink 默認的時間窗口根據 Processing Time 進行窗口的劃分,將 Flink 獲取到的數據根據進入 Flink 的時間劃分到不同的窗口中。
val result:DataStream[Item] = mapDStream.keyBy(0) //默認的時間窗口根據 Processing Time .timeWindow(Time.seconds(5)) .min(2) //獲取processing time滾動5s內的最小值
val input: DataStream[T] = ... / / 滾動事件時間窗口( tumbling event-time windows ) input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // 滾動處理時間窗口(tumbling processing-time windows) input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // 每日偏移8小時的滾動事件時間窗口(daily tumbling event-time windows offset by -8 hours. ) input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
②滑動窗口
滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是 window_size,一個是 sliding_size。
val result:DataStream[Item] = mapDStream.keyBy(0) //默認的時間窗口根據 Processing Time .timeWindow(Time.seconds(15),Time.seconds(5)) .min(2) //根據processing time 每5s統計一次15s內的最小值
val input: DataStream[T] = ... // 滑動事件時間窗口 (sliding event-time windows) input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // 滑動處理時間窗口 (sliding processing-time windows) input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // 偏移8小時的滑動處理時間窗口(sliding processing-time windows offset by -8 hours) input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>)
③會話窗口
因為session看窗口沒有一個固定的開始和結束,他們的評估與滑動窗口和滾動窗口不同。在內部,session操作為每一個到達的元素創建一個新的窗口,併合並間隔時間小於指定非活動間隔的窗口。為了進行合併,session窗口的操作需要指定一個合併觸發器(Trigger)和一個合併窗口函數(Window Function),如:ReduceFunction或者WindowFunction(FoldFunction不能合併)。
val input: DataStream[T] = ... // 事件時間會話窗口(event-time session windows with static gap) input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>) // 具有動態間隙的事件時間會話窗口 (event-time session windows with dynamic gap) input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { override def extract(element: String): Long = { // determine and return session gap } })) .<windowed transformation>(<window function>) // 具有靜態間隙的處理時間會話窗口(processing-time session windows with static gap) input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>) // 具有動態間隙的處理時間會話窗口(processing-time session windows with dynamic gap) input .keyBy(<key selector>) .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] { override def extract(element: String): Long = { // determine and return session gap } })) .<windowed transformation>(<window function>)
b)countWindow
CountWindow 根據窗口中相同 key 元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的 key 對應的結果。注意:CountWindow 的 window_size 指的是相同 Key 的元素的個數,不是輸入的所有元素的總數。
①滾動窗口
默認的 CountWindow 是一個滾動窗口,只需要指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。
val result:DataStream[Item] = mapDStream.keyBy(0) .countWindow(5) .min(2) //統計5個元素中的item的第三個值最小的元素
②滑動窗口
滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是 window_size,一個是 sliding_size。
val result:DataStream[Item] = mapDStream.keyBy(0) .countWindow(5,2) .min(2) //每2個元素統計一次 每次統計5個元素中的item的第三個值最小的元素
c)window Function
window function 定義了要對窗口中收集的數據做的計算操作,主要可以分為兩類:
增量聚合函數(incremental aggregation functions )每條數據到來就進行計算,保持一個簡單的狀態。典型的增量聚合函數有 ReduceFunction, AggregateFunction。
全窗口函數(full window functions)先把窗口所有數據收集起來,等到計算的時候會遍歷所有數據。ProcessWindowFunction 就是一個全窗口函數。
d)其他API
trigger() — — 觸發器,定義window什麼時候關閉,觸發計算並輸出
evitor() — — 移除器,定義移出某些數據的邏輯
allowedLateness() — — 允許處理遲到的數據
sideOutputLateData() — — 將遲到的數據放入側輸出
getSideOutput() — — 獲取側輸出流
二、時間語義
1,時間語義
在flink的流式處理中,會涉及到時間的不同概念:
Event Time:是事件創建的時間。 它通常由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄自己的生成時間,Flink 通過時間戳分配器訪問事件時間戳。
Ingestion Time:是數據進入 Flink 的時間。
Processing Time:是每一個執行基於時間操作的運算元的本地系統時間,與機器相關,默認的時間屬性就是 Processing Time。
2,EventTime的引入
在 Flink 的流式處理中,絕大部分的業務都會使用 eventTime,一般只在eventTime 無法使用時,才會被迫使用 ProcessingTime 或者 IngestionTime。
val env = StreamExecutionEnvironment.getExecutionEnvironment //從調用時刻開始給 env 創建的每一個 stream 追加時間特徵 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
三、Watermark
一旦出現亂序,如果只根據 eventTime 決定 window 的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark 是一種衡量 Event Time 進展的機制。
Watermark 是用於處理亂序事件的 ,而正確的處理亂序事件,通常用 Watermark 機制結合 window 來實現。
數據流中的 Watermark 用於表示 timestamp 小於 Watermark 的數據,都已經到達了,因此,window 的執行也是由 Watermark 觸發的。
Watermark可以理解成一個延遲觸發機制,我們可以設置 Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的 maxEventTime,然後認定 eventTime小於 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間等於 maxEventTime – t,那麼這個窗口被觸發執行。
亂序流的 Watermarker 如下圖所示:(Watermark 設置為 2)
上圖中,我們設置的允許最大延遲到達時間為 2s,所以時間戳為 7s 的事件對應的 Watermark 是 5s,時間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1是 1s~5s,窗口 2 是 6s~10s,那麼時間戳為 7s 的事件到達時的 Watermarker 恰好觸發窗口 1,時間戳為 12s 的事件到達時的 Watermark 恰好觸發窗口 2。
當 Flink 接收到數據時,會按照一定的規則去生成 Watermark,這條 Watermark就等於當前所有到達數據中的 maxEventTime – 延遲時長,也就是說,Watermark 是由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要晚,那麼就會觸發相應窗口的執行。由於 Watermark 是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發。
1,watermark的引入
BoundedOutOfOrdernessTimestampExtractor的使用
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設置時間為事件時間,默認採用process //採用socketTextStream輸入 val socketDStream = env.socketTextStream("localhost",8888) val mapDStream = socketDStream.map { item => val arr = item.split(",") Item(arr(0), arr(1).toLong, arr(2).toDouble) } val waterDStream = mapDStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Item](Time.seconds(1)) { //1s為結束延時時間 override def extractTimestamp(element: Item): Long = element.timestap * 1000L //抽取timestap作為時間標準 }) val resultDStream = waterDStream.keyBy(_.id) .timeWindow(Time.seconds(6), Time.seconds(3)) .min(2) //當currentEverntTime - 1s(late延時) > window_end_time觸發 resultDStream.print("bound min") env.execute("executor")
此時輸入:
item_1,1596419969,33.1 item_1,1596419970,32.1 item_1,1596419971,31.1 //觸發上一個窗口的的window_end操作,1596419971-1 = 1596419970(window_end_time) [1596419964,1596419970) 此時輸出的結果為33.1 item_1,1596419972,30.1 item_1,1596419973,31.1 item_1,1596419974,32.1 //觸發上一個窗口的的window_end操作,1596419974-1 = 1596419973(window_end_time) [1596419967,1596419973) 此時輸出的結果為30.1 item_1,1596419977,29.1 //觸發上一個窗口的的window_end操作,1596419977-1 = 1596419976(window_end_time) [1596419970,1596419976) 此時輸出的結果為30.1
源碼解讀:
@Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); //滑動步長為3s,當時間戳為1596419969時lastStart=1596419967 for (long start = lastStart; start > timestamp - size; start -= slide) { //此時可以得到窗口64~70,67~73 windows.add(new TimeWindow(start, start + size)); } return windows; } public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { //時間戳 偏移量(時區)默認為0 滑動步長 return timestamp - (timestamp - offset + windowSize) % windowSize; }
2,AssignerWithPeriodicWatermarks
object PeriodicInsertWatermarks { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val stream = env .socketTextStream("hadoop102", 7777) .map { item => val arr: Array[String] = item.split(",") Item(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble) }.assignTimestampsAndWatermarks(new MyAssigner) .keyBy(_.id) .timeWindow(Time.seconds(5)) //滾動窗口 .min(2) stream.print() env.execute() } // `BoundedOutOfOrdernessTimestampExtractor`的底層實現 class MyAssigner extends AssignerWithPeriodicWatermarks[Item] { val bound = 1000L // 最大延遲時間 var maxTs = Long.MinValue + bound // 觀察到的最大時間戳 // 每來一條元素就要調用一次 override def extractTimestamp(t: Item, l: Long): Long = { maxTs = maxTs.max(t.timestamp * 1000) t.timestamp * 1000 } // 產生水位線的函數,默認200ms調用一次 override def getCurrentWatermark: Watermark = { val water: Long = maxTs - bound println("當前水位值為:\t" + water) // 水位線 = 觀察到的最大時間戳 - 最大延遲時間 new Watermark(water) } } } case class Item(id:String,timestamp: Long,rate:Double)
3,AssignerWithPunctuatedWatermarks
間斷式地生成watermark。和周期性生成的方式不同,這種方式不是固定時間的,而是可以根據需要對每條數據進行篩選和處理。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Item] { val bound: Long = 1 * 1000 override def checkAndGetNextWatermark(r: Item, extractedTS: Long): Watermark = { //觸發watermark的條件為id if (r.id == "item_1") { new Watermark(extractedTS - bound) } else { null } } override def extractTimestamp(r: Item, previousTS: Long): Long = { r.timestamp * 1000 } }
四、ProcessFunction(底層API)
我們之前了解到的轉換運算元是無法訪問事件的時間戳資訊和水位線資訊的。而這在一些應用場景下,極為重要。 例如 MapFunction 這樣的 map 轉換運算元就無法訪問時間戳或者當前事件的事件時間。
Process Function 用來構建事件驅動的應用以及實現自定義的業務邏輯 (使用之前的 window 函數和轉換運算元無法實現)。 例如,Flink SQL 就是使用 Process Function 實現的。
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
1,KeyedProcessFunction
KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction 會處理流的每一個元素,輸出為0個、1個或者多個元素。所有的Process Function都繼承自RichFunction介面,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法 :
- processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一個元素都會調用這個方法,調用結果將會放在Collector數據類型中輸出。Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一個回調函數。當之前註冊的定時器觸發時調用。參數timestamp為定時器所設定的觸發的時間戳。Collector 為輸出結果的集合。OnTimerContext 和 processElement 的 Context 參數一樣,提供了上下文的一些資訊,例如定時器觸發的時間資訊(事件時間或者處理時間)。
2,TimerService和定時器(Timers)
Context 和 OnTimerContext 所持有的 TimerService 對象擁有以下方法:
currentProcessingTime(): Long 返回當前處理時間 currentWatermark(): Long 返回當前 watermark 的時間戳 registerProcessingTimeTimer(timestamp: Long): Unit 會註冊當前 key 的 processing time 的定時器。 當 processing time 到達定時時間時,觸發 timer。 registerEventTimeTimer(timestamp: Long): Unit 會註冊當前 key 的 event time定時器。當水位線大於等於定時器註冊的時間時,觸發定時器執行回調函數。 deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前註冊處理時間定時器。如果沒有這個時間戳的定時器,則不執行。 deleteEventTimeTimer(timestamp: Long): Unit 刪除之前註冊的事件時間定時器,如果沒有此時間戳的定時器,則不執行。 當定時器 timer 觸發時,會執行回調函數 onTimer()。注意定時器 timer 只能在 keyed streams 上面使用。
需求:監控item的評分,如果評分值在3s之內(processing time)連續上升,則報喜。
object KeyProcessFunctionDemo01 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env .socketTextStream("localhost", 7777) .map { item => val arr: Array[String] = item.split(",") Item(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble) } .keyBy(_.id) .process(new MyKeyedProcess) stream.print() env.execute() } } //需求:監控item的評分,如果評分值在3s之內(processing time)連續上升,則報喜。 class MyKeyedProcess extends KeyedProcessFunction[String,Item,String]{ //保存定時器的時間戳 lazy val lastTime = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",Types.of[Long])) //保存上次的評分 lazy val lastRate = getRuntimeContext.getState(new ValueStateDescriptor[Double]("rate",Types.of[Double])) override def processElement(i: Item, context: KeyedProcessFunction[String, Item, String]#Context, collector: Collector[String]): Unit = { val time: Long = lastTime.value() val rate: Double = lastRate.value() val currentRate: Double = i.rate val currentTime: Long = context.timerService().currentProcessingTime() //當前時間 val nextTimer: Long = currentTime+3000 // println("定時器時間為:\t"+time) println("當前時間為:\t\t"+currentTime) if (rate != 0.0 && rate < currentRate && currentTime<time ) { //如果上一次評分不為0(不是第一次進入) 並且 本次評分大於上次評分 並且當前時間在定時器時間之內 context.timerService().registerProcessingTimeTimer(nextTimer) } lastTime.update(nextTimer) lastRate.update(currentRate) } //定時回調 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Item, String]#OnTimerContext, out: Collector[String]): Unit = { out.collect("當前key:\t"+ctx.getCurrentKey+"\t報喜") //輸出報喜資訊 } } case class Item(id:String, timestap:Long, rate:Double)
3,測輸出流(SideOutput)
process function 的side outputs功能可以產生多條流,並且這些流的數據類型可以不一樣。一個side output 可以定義為OutputTag[X]對象,X 是輸出流的數據類型。process function可以通過Context對象發射一個事件到一個或者多個side outputs。
val outPutDStream:DataStream[Item] = sourceDStream.process(new MySideOutputProcess) val lowDStream:DataStream[Item] = outPutDStream.getSideOutput(new OutputTag[Item]("low")) val highDStream:DataStream[Item] = outPutDStream.getSideOutput(new OutputTag[Item]("high")) lowDStream.print("low") highDStream.print("high")
//按照item的評分30為分界線,大於30輸出high的item,小於0輸出low的item class MySideOutputProcess extends ProcessFunction[Item,Item]{ //為數據打標 lazy val lowOutput = new OutputTag[Item]("low") lazy val highOutput = new OutputTag[Item]("high") //處理每條輸入數據 override def processElement(value: Item, ctx: ProcessFunction[Item, Item]#Context, out: Collector[Item]): Unit = { if (value.rate < 30.0) { ctx.output(lowOutput,value) }else{ ctx.output(highOutput,value) } } }