7.Flink實時項目之獨立訪客開發

1.架構說明

在上6節當中,我們已經完成了從ods層到dwd層的轉換,包括日誌數據和業務數據,下面我們開始做dwm層的任務。

DWM 層主要服務 DWS,因為部分需求直接從 DWD 層到DWS 層中間會有一定的計算量,而且這部分計算的結果很有可能被多個 DWS 層主題復用,所以部分 DWD 會形成一層 DWM,我們這裡主要涉及業務:

  • 訪問UV計算

  • 跳出明細計算

  • 訂單寬表

  • 支付寬表

因為實時計算與離線不同,實時計算的開發和運維成本都是非常高的,要結合實際情況考慮是否有必要象離線數倉一樣,建一個大而全的中間層。如果沒有必要大而全,這時候就需要大體規劃一下要實時計算出的指標需求了。把這些指標以主題寬表的形式輸出就是我們的 DWS 層。

統計主題 需求指標 輸出方式 計算來源 來源層級
訪客 pv 可視化大屏 page_log直接可求 dwd
uv 可視化大屏 需要用page_log過濾去重 dwm
跳出率 可視化大屏 需要用page_log行為判斷 dwm
進入頁面數 可視化大屏 需要識別開始訪問標識 dwd
連續訪問時長 可視化大屏 page_log直接可求 dwd
商品 點擊 多維分析 page_log直接可求 dwd
收藏 多維分析 收藏表 dwd
加入購物車 多維分析 購物車表 dwd
下單 可視化大屏 訂單寬表 dwm
支付 多維分析 支付寬表 dwm
退款 多維分析 退款表 dwd
評論 多維分析 評論表 dwd
地區 pv 多維分析 page_log直接可求 dwd
uv 多維分析 需要page_log過濾去重 dwm
下單 可視化大屏 訂單寬表 dwm
關鍵詞 搜索關鍵詞 可視化大屏 page_log直接可求 dwd
點擊商品關鍵詞 可視化大屏 商品主題下單再次聚合 dws
下單商品關鍵詞 可視化大屏 商品主題下單再次聚合 dws

2. 訪客UV計算

UV,全稱是 Unique Visitor,即獨立訪客,對於實時計算中,也可以稱為 DAU(Daily Active User),即每日活躍用戶,因為實時計算中的uv通常是指當日的訪客數。那麼如何從用戶行為日誌中識別出當日的訪客,那麼有兩點:

  • 其一,是識別出該訪客打開的第一個頁面,表示這個訪客開始進入我們的應用

  • 其二,由於訪客可以在一天中多次進入應用,所以我們要在一天的範圍內進行去重

程式碼,新建任務UniqueVisitApp.java,我們要從kafka的ods層消費數據,主題為:dwd_page_log

package com.zhangbao.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
​
/**
 * @author: zhangbao
 * @date: 2021/9/12 19:51
 * @desc: uv 計算
 **/
public class UniqueVisitApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //設置並行度
        env.setParallelism(4);
        //設置檢查點
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
        //指定哪個用戶讀取hdfs文件
        System.setProperty("HADOOP_USER_NAME","zhangbao");
​
        //從kafka讀取數據源
        String sourceTopic = "dwd_page_log";
        String group = "unique_visit_app_group";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
​
        //數據轉換
        SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));
​
        jsonObjDs.print("jsonObjDs >>>");
        try {
            env.execute("task uniqueVisitApp");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

測試從kafka消費數據

  • 啟動服務:zk,kf,logger.sh ,hadoop

  • 運行任務:BaseLogTask.java,UniqueVisitApp.java

  • 執行日誌生成伺服器

  • 查看控制台輸出

目前任務執行流程

UniqueVisitApp程式接收到的數據

{
  "common": {
    "ar": "440000",
    "uid": "48",
    "os": "Android 11.0",
    "ch": "xiaomi",
    "is_new": "0",
    "md": "Sumsung Galaxy S20",
    "mid": "mid_9",
    "vc": "v2.1.134",
    "ba": "Sumsung"
  },
  "page": {
    "page_id": "login",
    "during_time": 4621,
    "last_page_id": "good_detail"
  },
  "ts": 1631460110000
}

3. 核心過濾流程

從kafka的ods層取出數據之後,就該做具體的uv處理了。

1.首先用 keyby 按照 mid 進行分組,每組表示當前設備的訪問情況

2.分組後使用 keystate 狀態,記錄用戶進入時間,實現 RichFilterFunction 完成過濾

3.重寫 open 方法用來初始化狀態

4.重寫 filter 方法進行過濾

  • 可以直接篩掉 last_page_id 不為空的欄位,因為只要有上一頁,說明這條不是這個用戶進入的首個頁面。

  • 狀態用來記錄用戶的進入時間,只要這個 lastVisitDate 是今天,就說明用戶今天已經訪問過了所以篩除掉。如果為空或者不是今天,說明今天還沒訪問過,則保留。

  • 因為狀態值主要用於篩選是否今天來過,所以這個記錄過了今天基本上沒有用了,這裡 enableTimeToLive 設定了 1 天的過期時間,避免狀態過大。

package com.zhangbao.gmall.realtime.app.dwm;
​
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
​
import java.text.SimpleDateFormat;
import java.util.Date;
​
/**
 * @author: zhangbao
 * @date: 2021/9/12 19:51
 * @desc: uv 計算
 **/
​
public class UniqueVisitApp {
    public static void main(String[] args) {
        //webui模式,需要添加pom依賴
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        //設置並行度
        env.setParallelism(4);
        //設置檢查點
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
        //指定哪個用戶讀取hdfs文件
        System.setProperty("HADOOP_USER_NAME","zhangbao");
​
        //從kafka讀取數據源
        String sourceTopic = "dwd_page_log";
        String group = "unique_visit_app_group";
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);
​
        //數據轉換
        SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));
​
        //按照設備id分組
        KeyedStream<JSONObject, String> keyByMid = jsonObjDs.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
​
        //過濾
        SingleOutputStreamOperator<JSONObject> filterDs = keyByMid.filter(new RichFilterFunction<JSONObject>() {
            ValueState<String> lastVisitDate = null;
            SimpleDateFormat sdf = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                //初始化時間
                sdf = new SimpleDateFormat("yyyyMMdd");
                //初始化狀態
                ValueStateDescriptor<String> lastVisitDateDesc = new ValueStateDescriptor<>("lastVisitDate", String.class);
                //統計日活dau,狀態數據保存一天,過一天即失效
                StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
                lastVisitDateDesc.enableTimeToLive(stateTtlConfig);
                this.lastVisitDate = getRuntimeContext().getState(lastVisitDateDesc);
​
            }
​
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                //上一個頁面如果有值,則不是首次訪問
                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                if(lastPageId != null && lastPageId.length()>0){
                    return false;
                }
                //獲取用戶訪問日期
                Long ts = jsonObject.getLong("ts");
                String mid = jsonObject.getJSONObject("common").getString("mid");
                String lastDate = sdf.format(new Date(ts));
                //獲取狀態日期
                String lastDateState = lastVisitDate.value();
                if(lastDateState != null && lastDateState.length()>0 && lastDateState.equals(lastDate)){
                    System.out.println(String.format("已訪問! mid:%s,lastDate:%s",mid,lastDate));
                    return false;
                }else {
                    lastVisitDate.update(lastDate);
                    System.out.println(String.format("未訪問! mid:%s,lastDate:%s",mid,lastDate));
                    return true;
                }
            }
        });
​
        filterDs.print("filterDs >>>");
​
        try {
            env.execute("task uniqueVisitApp");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

註:1.在測試時,發現uv沒有數據,所以把BaseLogTask任務的側輸出流改一下,如下圖所示:

2.webui模式添加pom依賴

<!-- flink webui -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

4. 測試

  • 啟動zk,kafka,logger.sh,hdfs,BaseLogTask,UniqueVisitApp

  • 執行流程

    • 模擬生成的日誌jar >> nginx >> 日誌採集服務 >> kafka(ods) >> baseLogApp(分流) >> kafka(dwd) >> UniqueVisitApp(獨立訪客) >> dwm_unique_visit

經測試,流程已通。