Flink 徹底理解 window(窗口)

  • 2019 年 11 月 14 日
  • 筆記

一、描述

Window 是處理無限流的核心。Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層的引擎是一個流式引擎,在上面實現了流處理和批處理。

而窗口(Window)就是從Streaming 到 batch 的一個橋樑。Flink 提供了非常完善的窗口機制,這是 Flink 最大的亮點之一(其他的亮點包括消息亂序處理和 Checkpoint 機制)

二、窗口的生命周期

窗口的生命周期,就是創建和銷毀。

窗口的開始時間和結束時間是基於自然時間創建的,比如指定一個5s的窗口,那麼1分鐘內就會創建12個窗口。

什麼時候窗口會被創建?當第一個元素進入到窗口開始時間的時候,這個窗口就被創建了。

什麼時候窗口會被銷毀?當時間(ProcessTime、EventTime或者 IngestionTime)越過了窗口的結束時間,再加上用戶自定義的窗口延遲時間(allowed lateness),窗口就會被銷毀。

舉個例子來說,假設我們定義了一個基於事件時間的窗口,長度是5分鐘,並且允許有1分鐘的延遲。

當第一個元素包含了一個12:00的事件時間進來時,Flink會創建一個12:00 到 12:05 的窗口;在水位到 12:06 的時候,會銷毀這個窗口。

每個窗口都會綁定一個觸發器和一個執行函數。觸發器定義了何時會觸發窗口的執行函數的計算

,比如在窗口元素數量大於等於4的時候,或者水位經過了窗口結束時間的時候。

另外,每個窗口可以指定 驅逐器(Evictor),它的作用是在觸發器觸發後,執行函數執行前,移除一些元素。

三、Keyed 和 Non-keyed Window

在定義窗口之前,首先要指定你的流是否應該被分區,使用 keyBy(…) 後,相同的 key 會被劃分到不同的流裡面,每個流可以被一個單獨的 task 處理。如果 不使用 keyBy ,所有數據會被劃分到一個窗口裡,只有一個task處理,並行度是1.

四、窗口的分類和選擇

在指定了數據流是否分區之後,下一步是要去指定窗口的類型。窗口分配器(window assigner)定義了元素如何劃分到不同的窗口中。

對於 keyed Streams,使用 window (…) 來定義,對於 非 keyed Streams,使用 windowAll(…)來定義。

Flink 預定義了很多種窗口類型,可以滿足大多數日常使用需求:tumbling windows(翻滾窗口), sliding windows(滑動窗口), session windows(會話窗口) and global windows(全局窗口)。

所有內置的窗口(除了全局窗口)都是基於時間(ProcessTime或 EventTime)的。

1、Tumbling Windows

翻滾窗口有一個固定的長度,並且不會重複。比如,下圖是指定了一個5分鐘的翻滾窗口的樣子:

(每個窗口都不重疊,每5分鐘一個窗口)

// 例子1:tumbling event-time windows  // 定義一個數據流  val input: DataStream[T] = ...  // 這裡的 key selector,如果是元組的化,可以使用_._1,如果是case class 可以使用欄位名來指定  input      .keyBy(<key selector>)  // 指定了一個TumblingEventTimeWindows,窗口大小為5分鐘      .window(TumblingEventTimeWindows.of(Time.seconds(5)))  // 窗口的操作      .<windowed transformation>(<window function>)    // 例子2:tumbling processing-time windows  input      .keyBy(<key selector>)      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))      .<windowed transformation>(<window function>)    // 例子3:daily tumbling event-time windows offset by -8 hours.  //  input      .keyBy(<key selector>)      .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))      .<windowed transformation>(<window function>)

在例子3中,TumblingEventTimeWindows.of 指定了第二個參數 offset,它的作用是改變窗口的時間。

如果我們指定了一個15分鐘的窗口,那麼每個小時內,每個窗口的開始時間和結束時間為:

[00:00,00:15)

[00:15,00:30)

[00:30,00:45)

[00:45,01:00)

如果我們指定了一個5分鐘的offset,那麼每個窗口的開始時間和結束時間為:

[00:05,00:20)

[00:20,00:35)

[00:35,00:50)

[00:50,01:05)

一個實際的應用場景是,我們可以使用 offset 使我們的時區以0時區為準。比如我們生活在中國,時區是

UTC+08:00,可以指定一個 Time.hour(-8),使時間以0時區為準。

2、Slidding Windows

滑動窗口指定了兩個參數,第一個參數是窗口大小,第二個參數控制了新的窗口開始的頻率。

如果 滑動距離小於窗口距離的話,那麼一個元素可能被分配到多個窗口中。

比如,窗口大小10分鐘,每5分鐘滑動一次,如下圖:

val input: DataStream[T] = ...    // 例子1:sliding event-time windows  input      .keyBy(<key selector>)      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))      .<windowed transformation>(<window function>)    // 例子2:sliding processing-time windows  input      .keyBy(<key selector>)      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))      .<windowed transformation>(<window function>)    // 例子3,sliding processing-time windows offset by -8 hours  input      .keyBy(<key selector>)      .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))      .<windowed transformation>(<window function>)

3、Session Windows

會話窗口根據會話的間隔來把數據分配到不同的窗口。

會話窗口不重疊,沒有固定的開始時間和結束時間。

比如音樂 app 聽歌的場景,我們想統計一個用戶在一個獨立的 session 中聽了多久的歌曲(如果超過15分鐘沒聽歌,那麼就是一個新的 session 了)

我們可以用 spark Streaming ,每一個小時進行一次批處理,計算用戶session的數據分布,但是 spark Streaming 沒有內置對 session 的支援,我們只能手工寫程式碼來維護每個 user 的 session 狀態,裡面仍然會有諸多的問題。

下一次會單獨寫一篇文章來討論,如何使用flink 的 session window 來實現這個問題

4、Global Windows

全局 window 把所有相同 key 的數據,放到一個 window 來,它沒有自然的窗口結束時間,所以我們需要自己指定觸發器

val input: DataStream[T] = ...    input      .keyBy(<key selector>)      .window(GlobalWindows.create())      .<windowed transformation>(<window function>)