flink table窗口聚合的open函數未調用的bug分析
- 2019 年 12 月 25 日
- 筆記
今天分析一下,flink table聚合udf AggregateFunction的open函數未被調用的bug。
情景一:
當然,對於udf的聚合操作,在flink裏面有兩種用法,一種是不用窗口的分組聚合類似於
Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),WeightedAvg(number,number) from source group by DateUtil(rowtime,'yyyyMMddHH')");
情景二:
一種是使用窗口的分組聚合操作,例如:
tEnv.sqlUpdate("insert into sink select fruit,WeightedAvg(number,number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
表面上看是是同一個類型的udf,底層執行邏輯應該一樣。但是flink內部coden的時候,被完全解析成了不同的聚合函數。
假設我們定義一個AggregateFunction的udf叫做WeightedAvg,主要進行求平均值,其中有一個變量 flag,初始值為1 ,我們想我在open的時候更改為100.
package org.table.agg; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.FunctionContext; import java.util.Iterator; /** * Weighted Average user-defined aggregate function. */ public class WeightedAvg extends AggregateFunction<Integer, WeightedAvgAccum> { @Override public void open( FunctionContext context) throws Exception, Exception { this.flag =100; } private int flag =1; @Override public WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); } @Override public Integer getValue(WeightedAvgAccum acc) { System.out.println("value of flag is : "+flag); if (acc.count == 0) { return null; } else { int i = acc.sum / acc.count; return i; } } public void accumulate(WeightedAvgAccum acc, int iValue, int iWeight) { acc.sum += iValue * iWeight; acc.count += iWeight; } public void retract(WeightedAvgAccum acc, int iValue, int iWeight) { acc.sum -= iValue * iWeight; acc.count -= iWeight; } public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { WeightedAvgAccum a = iter.next(); acc.count += a.count; acc.sum += a.sum; } } public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0; } } package org.table.agg; /** * Accumulator for WeightedAvg. */ public class WeightedAvgAccum { public int sum = 0; public int count = 0; }
分別執行兩個sql之後,你會發現:
情景一:value of flag is : 100
情景二:value of flag is : 1
之所以會情景二沒有被更改為 100 主要原因是open函數沒有調用,顯然這種情況下,在AggregateFunction的open函數里初始化外部客戶端,比如mysql,redis等客戶端初始化,或者通過open的context參數傳遞一些參數到AggregateFunction,比如權重閾值等,都變的行不通了。
直接給出大致結論,主要原因是:
情景一對應DataStream的GroupAggProcessFunction。
情景二對應DataStream的AggregateFunction,而該函數並沒有open方法。僅僅說的是滾動窗口,還有其它窗口AggregateUtil。
解決辦法是有很多,比如使用構造函數在註冊的時候傳參並初始化,比如使用readobject()|writeObject()方法等。
如代碼,可以給WeightedAvg加入構造函數:
public WeightedAvg(int flag) { this.flag = flag; }
然後註冊udf的時候直接初始化:
tEnv.registerFunction("WeightedAvg",new WeightedAvg(100));
哎,只能說flink的坑太多,有待改進。但是這個也體現出了我們碼農的存在的必要性。
本文舉例僅僅是一種窗口操作,更多的窗口聚合是否會調用aggregateFunction的open方法,可以仔細閱讀AggregateUtil。