Flink中Periodic水印和Punctuated水印實現原理(源碼分析)

  • 2019 年 10 月 3 日
  • 筆記

在用戶程式碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載

我們傳入的對象分為兩種

AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印)

AssignerWithPeriodicWatermarks(周期性的生成水印)

來看一下源碼中是如何實現這兩種水印的

二話不說打開org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java

這個類的processElement方法

 

看到源碼這裡這段邏輯就 非常的清晰了

先通過用戶的程式碼獲取到事件時間,注入到element裡面就直接往下個opeartor發送了

然後通過用戶程式碼獲取水印,這裡會判斷水印是否為null

不為null的就直接往下游emit 了

現在看一下AssignerWithPeriodicWatermarks如何周期的發送生成的水印

直接打開TimestampsAndPeriodicWatermarksOperator.java這個類

這裡先不看processElement()方法,先看open方法

 

可以看到它將  當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 註冊作為了一個timer定時器

這樣就知道了,當他過了這個水印間隔時間以後肯定會觸發操作

來看一下這個間隔時間以後觸發了什麼操作

 

可以看到,他先是獲取了當前的水印時間,然後直接emit出去了????

Periodic模式明明是在接收數據的processElement()發送水印的

然後又再次註冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了

既然他在這裡發送了水印,來看下他的processElement方法

 

果然他周期性的發送水印以後,接收數據的processElement()方法裡面就沒有發送水印了

只有獲取事件時間的邏輯了