Flink 徹底理解 window(窗口)
- 2019 年 11 月 14 日
- 筆記
一、描述
Window 是處理無限流的核心。Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層的引擎是一個流式引擎,在上面實現了流處理和批處理。
而窗口(Window)就是從Streaming 到 batch 的一個橋樑。Flink 提供了非常完善的窗口機制,這是 Flink 最大的亮點之一(其他的亮點包括消息亂序處理和 Checkpoint 機制)
二、窗口的生命周期
窗口的生命周期,就是創建和銷毀。
窗口的開始時間和結束時間是基於自然時間創建的,比如指定一個5s的窗口,那麼1分鐘內就會創建12個窗口。
什麼時候窗口會被創建?當第一個元素進入到窗口開始時間的時候,這個窗口就被創建了。
什麼時候窗口會被銷毀?當時間(ProcessTime、EventTime或者 IngestionTime)越過了窗口的結束時間,再加上用戶自定義的窗口延遲時間(allowed lateness),窗口就會被銷毀。
舉個例子來說,假設我們定義了一個基於事件時間的窗口,長度是5分鐘,並且允許有1分鐘的延遲。
當第一個元素包含了一個12:00的事件時間進來時,Flink會創建一個12:00 到 12:05 的窗口;在水位到 12:06 的時候,會銷毀這個窗口。
每個窗口都會綁定一個觸發器和一個執行函數。觸發器定義了何時會觸發窗口的執行函數的計算
,比如在窗口元素數量大於等於4的時候,或者水位經過了窗口結束時間的時候。
另外,每個窗口可以指定 驅逐器(Evictor),它的作用是在觸發器觸發後,執行函數執行前,移除一些元素。
三、Keyed 和 Non-keyed Window
在定義窗口之前,首先要指定你的流是否應該被分區,使用 keyBy(…) 後,相同的 key 會被劃分到不同的流裡面,每個流可以被一個單獨的 task 處理。如果 不使用 keyBy ,所有數據會被劃分到一個窗口裡,只有一個task處理,並行度是1.
四、窗口的分類和選擇
在指定了數據流是否分區之後,下一步是要去指定窗口的類型。窗口分配器(window assigner)定義了元素如何劃分到不同的窗口中。
對於 keyed Streams,使用 window (…) 來定義,對於 非 keyed Streams,使用 windowAll(…)來定義。
Flink 預定義了很多種窗口類型,可以滿足大多數日常使用需求:tumbling windows(翻滾窗口), sliding windows(滑動窗口), session windows(會話窗口) and global windows(全局窗口)。
所有內置的窗口(除了全局窗口)都是基於時間(ProcessTime或 EventTime)的。
1、Tumbling Windows
翻滾窗口有一個固定的長度,並且不會重複。比如,下圖是指定了一個5分鐘的翻滾窗口的樣子:

(每個窗口都不重疊,每5分鐘一個窗口)
// 例子1:tumbling event-time windows // 定義一個數據流 val input: DataStream[T] = ... // 這裡的 key selector,如果是元組的化,可以使用_._1,如果是case class 可以使用欄位名來指定 input .keyBy(<key selector>) // 指定了一個TumblingEventTimeWindows,窗口大小為5分鐘 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 窗口的操作 .<windowed transformation>(<window function>) // 例子2:tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // 例子3:daily tumbling event-time windows offset by -8 hours. // input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
在例子3中,TumblingEventTimeWindows.of 指定了第二個參數 offset,它的作用是改變窗口的時間。
如果我們指定了一個15分鐘的窗口,那麼每個小時內,每個窗口的開始時間和結束時間為:
[00:00,00:15)
[00:15,00:30)
[00:30,00:45)
[00:45,01:00)
如果我們指定了一個5分鐘的offset,那麼每個窗口的開始時間和結束時間為:
[00:05,00:20)
[00:20,00:35)
[00:35,00:50)
[00:50,01:05)
一個實際的應用場景是,我們可以使用 offset 使我們的時區以0時區為準。比如我們生活在中國,時區是
UTC+08:00,可以指定一個 Time.hour(-8),使時間以0時區為準。
2、Slidding Windows
滑動窗口指定了兩個參數,第一個參數是窗口大小,第二個參數控制了新的窗口開始的頻率。
如果 滑動距離小於窗口距離的話,那麼一個元素可能被分配到多個窗口中。
比如,窗口大小10分鐘,每5分鐘滑動一次,如下圖:
val input: DataStream[T] = ... // 例子1:sliding event-time windows input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // 例子2:sliding processing-time windows input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // 例子3,sliding processing-time windows offset by -8 hours input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>)
3、Session Windows
會話窗口根據會話的間隔來把數據分配到不同的窗口。
會話窗口不重疊,沒有固定的開始時間和結束時間。
比如音樂 app 聽歌的場景,我們想統計一個用戶在一個獨立的 session 中聽了多久的歌曲(如果超過15分鐘沒聽歌,那麼就是一個新的 session 了)
我們可以用 spark Streaming ,每一個小時進行一次批處理,計算用戶session的數據分布,但是 spark Streaming 沒有內置對 session 的支援,我們只能手工寫程式碼來維護每個 user 的 session 狀態,裡面仍然會有諸多的問題。
下一次會單獨寫一篇文章來討論,如何使用flink 的 session window 來實現這個問題
4、Global Windows
全局 window 把所有相同 key 的數據,放到一個 window 來,它沒有自然的窗口結束時間,所以我們需要自己指定觸發器
val input: DataStream[T] = ... input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>)