Flink計算PV,UV的案例及問題分析
- 2019 年 12 月 25 日
- 筆記
PV(訪問量):即Page View, 即頁面瀏覽量或點擊量,用戶每次刷新即被計算一次。
UV(獨立訪客):即Unique Visitor,訪問您網站的一台電腦客戶端為一個訪客。00:00-24:00內相同的客戶端只被計算一次。
一個UV可以用很多PV,一個PV也只能對應一個IP
沒有這些數據的支援,意味著你不知道產品的發展情況,用戶獲取成本,UV,PV,註冊轉化率;沒有這些數據做參考,你不會知道接下來提供什麼建議給領導採納,也推測不出領導為啥煩憂,那麼就么有任何錶現的機會。
舉兩個UV計算的場景:
1. 實時計算當天零點起,到當前時間的uv。
2. 實時計算當天每個小時的UV。0點…12點…24點
請問這個用spark streaming如何實現呢?是不是很難有好的思路呢?
今天主要是想給大家用flink來實現一下,在這方面flink確實比較優秀了。
主要技術點就在group by的使用。
下面就是完整的案例:
package org.table.uv; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Rowtime; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class ComputeUVDay { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.registerFunction("DateUtil",new DateUtil()); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9092") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema( new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) ) .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source"); // 計算天級別的uv // Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)"); // 計算小時級別uv Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')"); tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() { @Override public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception { System.out.println(value.f1.toString()); } }); System.out.println(env.getExecutionPlan()); env.execute("ComputeUVDay"); } }
其中DateUtil類如下:
package org.table.uv; import org.apache.flink.table.functions.ScalarFunction; import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; public class DateUtil extends ScalarFunction { public static String eval(long timestamp){ String result = "null"; try { DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); result = sdf.format(new Timestamp(timestamp)); } catch (Exception e) { e.printStackTrace(); } return result; } public static String eval(long ts, String format) { String result = "null"; try { DateFormat sdf = new SimpleDateFormat(format); result = sdf.format(ts); } catch (Exception e) { e.printStackTrace(); } return result; } public static void main(String[] args) { String eval = eval(System.currentTimeMillis(),"yyyyMMddHH"); System.out.println(eval); } }
程式碼裡面的案例,是可以用於生產中的嗎?
假如數據量小可以直接使用,每秒數據量大的話,就比較麻煩。因為你看group by後面的維度,只有當天date 這個維度,這樣就會導致計算狀態超級集中而使得記憶體佔用超大進而引發oom。
這種情況解決辦法就是將狀態打散,然後再次聚合即可,典型的分治思想。
具體做法作為福利分享給球友吧。
還有一個問題就是由於存在全局去重及分組操作,flink內部必然要維護一定的狀態資訊,那麼這些狀態資訊肯定不是要一直保存的,比如uv,我們只需要更新今天,最多昨天的狀態,這個點之前的狀態要刪除的,不能讓他白白占著記憶體,而導致任務記憶體消耗巨大,甚至因oom而掛掉。
StreamQueryConfig streamQueryConfig = tEnv.queryConfig(); streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15)); tEnv.sqlUpdate(sql,streamQueryConfig);
再有就是能使用事件時間嗎?事件時間假如事件嚴重超時了,比如,我們狀態保留時間設置的是兩天,兩天之後狀態清除,那麼這時候來了事件時間剛剛好是兩天之前的,由於已經沒有狀態就會重新計算uv覆蓋已經生成的值,就導致值錯誤了,這個問題如何解決呢?
這算是一個疑問吧?