Flink Window&Time 原理

Flink 中可以使用一套 API 完成對有界數據集以及無界數據的統一處理,而無界數據集的處理一般會伴隨著對某些固定時間間隔的數據聚合處理。比如:每五分鐘統計一次系統活躍用戶、每十秒更新熱搜榜單等等

這些需求在 Flink 中都由 Window 提供支援,Window 本質上就是藉助狀態後端快取著一定時間段內的數據,然後在達到某些條件時觸發對這些快取數據的聚合計算,輸出外部系統。

實際上,有的時候對於一些實時性要求不高的、下游系統無法負載實時輸出的場景,也會通過窗口做一個聚合,然後再輸出下游系統。

Time

時間類型

Flink 是基於事件流的實時處理引擎,那麼流入系統的每一件事件都應該有一個時間,Flink 提供以下四種時間類型來定義你的事件時間:

  • Event Time:這是我們最常用的時間類型,它表示事件真實發生時的時間(比如你點擊一個按鈕,就是點擊的一瞬間的那個時間)
  • Storage Time:不常用,表示事件以消息的形式進入隊列時的時間
  • Ingestion Time:不常用,表示事件進入 Flink Source 的時間
  • Processing Time:相對常用一些,表示事件實際進入到 window 運算元被處理的時間

以上四種實際上用的最多的還是 EventTime,ProcessingTime 偶爾會用一用。

因為 EventTime 是描述事件真實發生的時間,我們知道事件發生是有順序的,但經過網路傳輸後不一定能保證接收順序。比如:你先買了 A 商品,再買了 B 商品,那麼其實有很大可能 Flink 先收到 B 商品的購買事件,再收到 A 的。通過 EventTime 就可以保證即便 A 事件後到來我也知道它是先發生的。

而 ProcessingTime 描述的是事件被處理時的時間,準確來說並不是事件真實發生的時間,所以它往往在一些不關注事件到達順序的情境中使用。

Watermark 水位線

Watermark 在很多系統中都有應用,可能各個系統的叫法不同,但這種思想還是比較常見的。比如:Kafka 中副本同步機制中的高水位、MySQL 事務隔離機制中可見事務的高低水位等等。

在 Flink 中 Watermark 描述的也是一種水位線的概念,他表示水位線之下的所有數據都已經被 Flink 接收並處理了。

窗口的觸發一般就會基於 Watermark 來實現,水位線動態更新,當達到某某條件就觸發哪些窗口的計算。

關於 Watermark 如何更新,Flink 是開放給你實現的,當然它也提供了一些默認實現。

Timestamp 的抽取

如果你指定 Flink 需要使用 EventTime,那麼你就需要在 WatermarkStrategy 策略中通過 withTimestampAssigner 指定如何從你的事件中抽取出 Timestamp 作為 EventTime。比如:

Watermark 的生成

Watermark 的生成方式本質上是有兩種:周期性生成和標記生成。

/**
 * {@code WatermarkGenerator} 可以基於事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 將以前互相獨立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了進來。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每來一條事件數據調用一次,可以檢查或者記錄事件的時間戳,或者也可以基於事件數據本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的調用,也許會生成新的 watermark,也許不會。
     *
     * <p>調用此方法生成 watermark 的間隔時間由 {@link ExecutionConfig#getAutoWatermarkInterval()} 決定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

周期性生成器通常通過 onEvent() 觀察傳入的事件數據,然後在框架調用 onPeriodicEmit() 時更新 Watermark。

標記生成器將查看 onEvent() 中的事件數據,然後根據你自定義的邏輯是否需要更新 Watermark。

比如這是一個官網給出的例子:

/**
 * 該 watermark 生成器可以覆蓋的場景是:數據源在一定程度上亂序。
 * 即某個最新到達的時間戳為 t 的元素將在最早到達的時間戳為 t 的元素之後最多 n 毫秒到達。
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3000; // 3 秒

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 發出的 watermark = 當前最大時間戳 - 最大亂序時間
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

它實現的邏輯就是:每個事件到來會根據自身攜帶的 EventTime 和當前已到達的最大時間戳進行對比,保留兩者較大的時間戳用以描述當前已到達的最大事件。

然後 onPeriodicEmit 周期性的更新 WaterMark:最多接收 3s 的延遲數據,也就是 “2022 07-24 10:10:20” 的事件到達就會生成一個 “2022 07-24 10:10:17” 的 WaterMark 表示在此水位線之前的數據全部收到並且不再接收此水位線之前的事件。(這部分不再被接收的數據實際上會被叫做遲到數據)

Flink 中內置的一個用的比較多的生成器就是:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));

這個其實就是我們上面的示例封裝,它的內部實現就是這樣:

Watermark 的傳播

在多並行度下,Watermark 具有木桶效應,取最小的。比如下圖中
map1 和 map2 會 keyby 把部分數據流到 window1,map1 產生的 w(29) 和 map2 產生的 w(14),最終 window1 會以 w(14) 往下游運算元傳播。

傳小不傳大應該是比較好理解的,如果傳大的就會讓進度慢的 map2 後續的數據全部被認為遲到數據而被丟棄。

這裡其實會存在一個問題,如果 map2 突然沒數據了,也就是不再更新 Watermark 往下游傳播了,那麼是不是就整個數據流再也不會推進 Watermark 了?

實際上,這種情況是存在的,Flink 中提供如下配置可以將某個源標記為空閑,即將它刨除 Watermark 的計算列表中。比如一分鐘沒有數據流出即標記為空閑數據源。

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

Window

Window 本質上就是藉助狀態後端快取著一定時間段內的數據,然後在達到某些條件時觸發對這些快取數據的聚合計算,輸出外部系統。

Flink 中會根據當前數據流是否經過 keyby 運算元分為「Keyed 和 Non-Keyed Windows」

KeyedWindow 實際上就是每個 key 都對應一個窗口,而 Non-KeyedWindow 實際上是全局並行度為1的窗口(即便你手動指定多並行度也是無效的)

一個完整的 WindowStream 的處理流程大概是這樣的,數據經過 assigner 的挑選進入對應的窗口,經過 trigger 的邏輯觸發窗口,再經過 evictor 的剔除邏輯,然後由 WindowFuction 完成處理邏輯,最終輸出結果。

Window Assigners

Window assigner 定義了 stream 中的元素如何被分發到各個窗口。換句話說,每一個事件數據到來,Flink 通過 assigner 的邏輯來確定當前事件數據應該發往哪個或者哪幾個窗口。

內置的 WindowAssigners

Flink 中預定義好了四款 WindowAssigner,幾乎可以滿足日常百分之八九十的場景需求。

滾動窗口(Tumbling Windows)

滾動窗口的 assigner 分發元素到指定大小的窗口。滾動窗口的大小是固定的,且各自範圍之間不重疊。

通過使用 TumblingEventTimeWindows 或者 TumblingProcessingTimeWindows 來指定使用滾動窗口。

除此之外,滾動窗口還實現好了一個默認的 Trigger 觸發器 EventTimeTrigger,也就是說使用滾動窗口默認不需要再指定觸發器了,至於觸發器是什麼待會兒會介紹,這裡只是需要知道它是有默認觸發器實現的。

滑動窗口(Sliding Windows)

滑動窗口和滾動窗口的區別在於,多了一個滑動維度,也就是說窗口仍然是固定長度,但是窗口會以一個固定步長進行滑動。

比如窗口是 10m,滑動步長是 5m,那麼 window1 後 5m 的數據其實也是 window2 前 5m 的數據,這種窗口的特點就是存在數據重複。

這種窗口的數據場景還是比較多的,比如:每隔 5 分鐘輸出最近一小時內點擊量最多的前 N 個商品。(windowsize=1h,slide=5m,每間隔 5m 會有一個窗口產生,而每個窗口包含 1h 的數據)

通過 SlidingEventTimeWindows 和 SlidingProcessingTimeWindows 來指定使用滑動窗口。區別的是,滑動窗口對於一個事件可能返回多個窗口,以表示該數據同時存在於多個窗口之中。

滑動窗口和滾動窗口使用的是同一個觸發器 EventTimeTrigger。

會話窗口(Session Windows)

會話窗口的 assigner 會把數據按活躍的會話分組。會話窗口沒有固定的開始和結束時間,我們唯一需要指定的 sessionGap,表示如果兩條數據之間差距查過這個時間間隔即切分兩個窗口。

其實從 Flink 源碼的角度看會話窗口的實現就是:每條數據過來都會創建一個窗口(timestamp, timestamp+sessionGap),然後會對重合的窗口集進行不斷的 merge 輸出成一個窗口。這樣,窗口的截止就是最後一個活躍事件加上 sessionGap。非常巧妙的實現了 gap 這個語義。

默認的窗口觸發器依然是 EventTimeTrigger。

全局窗口(Global Windows)
全局窗口就是會將所有的數據 Shuffle 到一個實例上,單並行度收集所有數據。

通過使用 GlobalWindows 來指定使用全局窗口,需要注意的是:全局窗口沒有默認的觸發器,也就是數據默認永遠不會觸發。

所以,如果需要用到全局窗口,一定記得指定窗口觸發器。實際上 countWindow 本質上就是一個全局窗口,全局計數的窗口。

自定義 WindowAssigners

上面說的四種 WindowAssigners 是 Flink 內置的默認的實現,應該可以滿足大家平常百分之八九十的需求。除此之外的是,Flink 也允許你自定義實現 WindowAssigner,以下是它的一些核心方法:

  • assignWindows
  • getDefaultTrigger
  • isEventTime
  • getWindowSerializer

其中 assignWindows 方法它將返回一個 window 用以表示當前事件處於哪個窗口中。

getDefaultTrigger 方法返回一個默認實現的觸發器,這個觸發器默認和當前 WindowAssigner 綁定,當然你也可以外部再顯式指定替換。

isEventTime 用於標記當前 WindowAssigner 是否是基於 EventTime 實現的,getWindowSerializer 方法將告訴 Flink 應該如何序列化當前窗口。

總之,重點重寫 assignWindows 的邏輯即可,你也可以去打開 Flink 內置的四種 WindowAssigner 的源碼實現進行參考。

窗口函數(Window Functions)

WindowFunction 就是定義了窗口在觸發後應該如何計算的邏輯。

窗口函數有三種:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

  • ReduceFunction 指定兩條輸入數據如何合併起來產生一條輸出數據,輸入和輸出數據的類型必須相同。
  • AggregateFunction 會在每條數據到達窗口後 進行增量聚合,相對性能較好。
  • ProcessWindowFunction 只會在觸發器生效時將窗口中所有的數據全部發到 ProcessWindowFunction 進行計算,更靈活但性能更差。

Triggers

顧名思義,觸發器用於決定窗口是否觸發,Flink 中內置了一些觸發器,如圖:

其中,EventTimeTrigger 已經在上文中多次出現,它的邏輯也比較簡單,就是當每個事件過來時判斷當前 Watermark 是否越過窗口邊界,如果是則觸發窗口,Flink 也將調用你的 ProcessFunction 傳入窗口中所有數據進行計算。

Trigger 介面中有如下一下核心方法需要關註:

  • onElement
  • onEventTime
  • onProcessingTime

其中,onElement 會在每個事件到來被調用,onEventTime 和 onProcessingTime 都將在 Flink timer 的定時器中被調用。

其餘的一些 Triggers 相對不是特別常用,不過也沒有特別複雜,你可以直接查看它的源碼實現。

比如:ContinuousEventTimeTrigger,它就是在 EventTimeTrigger 的基礎上增加了固定時間間隔觸發,每個事件過來如果沒有達到觸發條件,會通過 ReducingState 記錄下 “time+interval” 也就是下一次觸發的時間並註冊一個 timer,最終會在 timer 的調度下執行 onEventTime 完成窗口觸發。

Evictors

Flink 的窗口模型允許在 WindowAssigner 和 Trigger 之外指定可選的 Evictor,在 trigger 觸發後、調用窗口函數之前或之後從窗口中刪除元素,我們也稱它為剔除器。

用法也比較簡單,就是在 windowStream 後調用 evictor()方法,並提供 Evictor 實現類,Evictor 類中有兩個方法需要實現,evictBefore() 包含在調用窗口函數前的邏輯,而 evictAfter() 包含在窗口函數調用之後的邏輯。

Flink 中也提供了內置的一些剔除器:

  • CountEvictor:僅記錄用戶指定數量的元素,一旦窗口中的元素超過這個數量,多餘的元素會從窗口快取的開頭移除
  • DeltaEvictor:接收 DeltaFunction 和 threshold 參數,計算最後一個元素與窗口快取中所有元素的差值, 並移除差值大於或等於 threshold 的元素
  • TimeEvictor:接收 interval 參數,以毫秒表示。 它會找到窗口中元素的最大 timestamp max_ts 並移除比 max_ts – interval 小的所有元素

最後說一下關於遲到數據,沒有被窗口包含的數據在 Flink 中可以不被丟棄,Flink 中有 Allowed Lateness 策略,即通過 allowedLateness 方法指定一個最大可接受的延遲時間,那麼這部分遲到的數據將可以通過旁路輸出(sideOutputLateData)獲取到。

Tags: