Flink計算PV,UV的案例及問題分析

  • 2019 年 12 月 25 日
  • 筆記

PV(訪問量):即Page View, 即頁面瀏覽量或點擊量,用戶每次刷新即被計算一次。

UV(獨立訪客):即Unique Visitor,訪問您網站的一台電腦客戶端為一個訪客。00:00-24:00內相同的客戶端只被計算一次。

一個UV可以用很多PV,一個PV也只能對應一個IP

沒有這些數據的支援,意味著你不知道產品的發展情況,用戶獲取成本,UV,PV,註冊轉化率;沒有這些數據做參考,你不會知道接下來提供什麼建議給領導採納,也推測不出領導為啥煩憂,那麼就么有任何錶現的機會。

舉兩個UV計算的場景:

1. 實時計算當天零點起,到當前時間的uv。

2. 實時計算當天每個小時的UV。0點…12點…24點

請問這個用spark streaming如何實現呢?是不是很難有好的思路呢?

今天主要是想給大家用flink來實現一下,在這方面flink確實比較優秀了。

主要技術點就在group by的使用。

下面就是完整的案例:

package org.table.uv;    import org.apache.flink.api.common.typeinfo.Types;  import org.apache.flink.api.java.tuple.Tuple2;  import org.apache.flink.streaming.api.TimeCharacteristic;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.streaming.api.functions.sink.SinkFunction;  import org.apache.flink.table.api.Table;  import org.apache.flink.table.api.TableEnvironment;  import org.apache.flink.table.api.java.StreamTableEnvironment;  import org.apache.flink.table.descriptors.Json;  import org.apache.flink.table.descriptors.Kafka;  import org.apache.flink.table.descriptors.Rowtime;  import org.apache.flink.table.descriptors.Schema;  import org.apache.flink.types.Row;    public class ComputeUVDay {      public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);          tEnv.registerFunction("DateUtil",new DateUtil());          tEnv.connect(                  new Kafka()                          .version("0.10")                          //   "0.8", "0.9", "0.10", "0.11", and "universal"                          .topic("jsontest")                          .property("bootstrap.servers", "localhost:9092")                          .property("group.id","test")                          .startFromLatest()          )                  .withFormat(                          new Json()                                  .failOnMissingField(false)                                  .deriveSchema()                  )                  .withSchema(                            new Schema()                                  .field("rowtime", Types.SQL_TIMESTAMP)                                  .rowtime(new Rowtime()                                          .timestampsFromField("eventtime")                                          .watermarksPeriodicBounded(2000)                                  )                                  .field("fruit", Types.STRING)                                  .field("number", Types.INT)                  )                  .inAppendMode()                  .registerTableSource("source");            // 計算天級別的uv  //        Table table = tEnv.sqlQuery("select  DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");            // 計算小時級別uv          Table table = tEnv.sqlQuery("select  DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");            tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction<Tuple2<Boolean, Row>>() {              @Override              public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {                  System.out.println(value.f1.toString());              }          });            System.out.println(env.getExecutionPlan());          env.execute("ComputeUVDay");      }  }  

其中DateUtil類如下:

package org.table.uv;    import org.apache.flink.table.functions.ScalarFunction;    import java.sql.Timestamp;  import java.text.DateFormat;  import java.text.SimpleDateFormat;    public class DateUtil extends ScalarFunction {      public static String eval(long timestamp){          String result = "null";          try {              DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");              result = sdf.format(new Timestamp(timestamp));          } catch (Exception e) {              e.printStackTrace();          }          return result;      }      public static String eval(long ts, String format) {            String result = "null";          try {              DateFormat sdf = new SimpleDateFormat(format);              result = sdf.format(ts);          } catch (Exception e) {              e.printStackTrace();          }          return result;      }      public static void main(String[] args) {          String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");          System.out.println(eval);      }  }  

程式碼裡面的案例,是可以用於生產中的嗎?

假如數據量小可以直接使用,每秒數據量大的話,就比較麻煩。因為你看group by後面的維度,只有當天date 這個維度,這樣就會導致計算狀態超級集中而使得記憶體佔用超大進而引發oom。

這種情況解決辦法就是將狀態打散,然後再次聚合即可,典型的分治思想。

具體做法作為福利分享給球友吧。

還有一個問題就是由於存在全局去重及分組操作,flink內部必然要維護一定的狀態資訊,那麼這些狀態資訊肯定不是要一直保存的,比如uv,我們只需要更新今天,最多昨天的狀態,這個點之前的狀態要刪除的,不能讓他白白占著記憶體,而導致任務記憶體消耗巨大,甚至因oom而掛掉。

StreamQueryConfig streamQueryConfig = tEnv.queryConfig();  streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));    tEnv.sqlUpdate(sql,streamQueryConfig);

再有就是能使用事件時間嗎?事件時間假如事件嚴重超時了,比如,我們狀態保留時間設置的是兩天,兩天之後狀態清除,那麼這時候來了事件時間剛剛好是兩天之前的,由於已經沒有狀態就會重新計算uv覆蓋已經生成的值,就導致值錯誤了,這個問題如何解決呢?

這算是一個疑問吧?