Big Data Series: A Data Warehouse Case Study

Offline Data Warehouse

A data warehouse (Data Warehouse) is the strategic collection of data that supports system-wide data needs for all of an enterprise's decision-making processes.

Analyzing the data in the warehouse helps an enterprise improve business processes, control costs, raise product quality, and more.

A data warehouse is not the data's final destination; rather, it prepares the data for that destination: cleaning, transformation, classification, restructuring, merging, splitting, aggregation, and so on.

1 Project Overview

1.1 Project Requirements

  1. Build the user-behavior data collection platform
  2. Build the business data collection platform
  3. Dimensional modeling of the data warehouse
  4. Analysis by subject area: users, traffic, members, products, sales, regions, campaigns, etc.
  5. Ad-hoc query tools for on-demand metric analysis
  6. Cluster performance monitoring with alerts on anomalies
  7. Metadata management
  8. Data quality monitoring

1.2 Technology Selection

  • Data collection framework selection
Framework   Main function
Sqoop       Import/export between big-data platforms and relational databases
DataX       Import/export between big-data platforms and relational databases
Flume       Strong at collecting and parsing log data
Logstash    Strong at collecting and parsing log data
Maxwell     Commonly used to parse MySQL binlogs in real time
Canal       Commonly used to parse MySQL binlogs in real time
Waterdrop   Data import/export tool
  • Message middleware selection
Open-source MQ  Overview
RabbitMQ    By LShift, written in Erlang; multi-protocol, broker architecture, heavyweight
ZeroMQ      By iMatix, the original designers of AMQP; lightweight messaging kernel, brokerless design, written in C++
Kafka       By LinkedIn, written in Scala; supports parallel data loading into Hadoop
ActiveMQ    Apache's JMS implementation; supports broker and P2P deployment, multi-protocol, written in Java
Redis       Key-value NoSQL database that can act as an MQ
MemcacheQ   Message queue built on the memcache queue protocol, written in C/C++
  • Persistent data storage selection
Framework   Main use
HDFS        Distributed file storage system
HBase       Key-value NoSQL database
Kudu        Cloudera's open-source store, similar to HBase
Hive        MapReduce-based data warehouse tool
  • Offline compute framework selection (Hive execution engine)
Framework   Description
MapReduce   The earliest distributed computing framework
Spark       Spark-based; a one-stop solution for batch and stream processing
Flink       Flink-based; a one-stop solution for batch and stream processing
  • Analytics database selection
Criterion                 Druid  Kylin  Presto  Impala  ES
Sub-second response       × × ×
Tens-of-billions-row datasets
SQL support               √ (plugin required)
Offline
Real-time                 × × ×
Exact distinct count      × ×
Multi-table join          × ×
JDBC for BI               × ×
  • Other choices
    • Task scheduling: DolphinScheduler
    • Cluster monitoring: CM + CDH
    • Metadata management: Atlas
    • BI tools: Zeppelin, Superset

1.3 Architecture

1.4 Cluster Resource Planning

  • How to size the cluster (assume each server has 8 TB of disk and 128 GB of RAM)
    1. 1,000,000 daily active users, about 100 log entries per user per day: 1,000,000 × 100 = 100 million entries
    2. Each entry is about 1 KB: 100,000,000 / 1024 / 1024 ≈ 100 GB per day
    3. Assuming no scale-out for half a year: 100 GB × 180 days ≈ 18 TB
    4. Keep 3 replicas: 18 TB × 3 = 54 TB
    5. Reserve a 20%–30% buffer: 54 TB / 0.7 ≈ 77 TB
    6. Conclusion: about 10 servers (sanity-checked in the sketch below)
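The arithmetic above can be scripted as a quick sanity check; a minimal sketch in shell (the DAU, event-size, retention, and per-server disk figures are the assumptions listed above):

#!/bin/bash
# capacity estimate: 1,000,000 DAU x 100 events/day x ~1 KB/event
daily_gb=$(( 1000000 * 100 / 1024 / 1024 ))       # ~95 GB of raw logs per day
half_year_tb=$(( daily_gb * 180 / 1024 ))         # ~17 TB over 180 days
replicated_tb=$(( half_year_tb * 3 ))             # 3 HDFS replicas
total_tb=$(( replicated_tb * 10 / 7 ))            # ~30% headroom (divide by 0.7)
echo "servers needed: $(( (total_tb + 7) / 8 ))"  # 8 TB of disk per server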

Since resources are limited, this walkthrough uses 3 servers.

Service            Sub-service              Server cdh01.cm   Server cdh02.cm   Server cdh03.cm
HDFS               NameNode
                   DataNode
                   SecondaryNameNode
Yarn               NodeManager
                   ResourceManager
Zookeeper          Zookeeper Server
Flume              Flume
                   Flume (Kafka consumer)
Kafka              Kafka
Hive               Hive
MySQL              MySQL
Sqoop              Sqoop
Presto             Coordinator
                   Worker
DolphinScheduler   DolphinScheduler
Druid              Druid
Kylin              Kylin
HBase              HMaster
                   HRegionServer
Superset           Superset
Atlas              Atlas
Solr               Solr

2 Data Generation Module

This module covers the collection of user-behavior data. Why collect it at all?

For a business, users are money: user habits and related signals need to be collected so they can be analyzed in derived big-data products such as a user-profile tagging system. User information is normally analyzed offline; later, the results can be stored in an inverted-index ecosystem such as ES, matched against user habits in real time for personalized recommendation, and pushed further with deep learning to recommend across similar users.

2.1 Basic Tracking-Data Format

  • Common fields: fields that essentially every Android phone reports

  • Business fields: fields reported by the tracking code, specific to a business event type

{
"ap":"xxxxx",// project data source: app or pc
"cm": { // common fields
	"mid": "", // (String) unique device id
	"uid": "", // (String) user id
	"vc": "1", // (String) versionCode, app version number
	"vn": "1.0", // (String) versionName, app version name
	"l": "zh", // (String) language, system language
	"sr": "", // (String) channel number the app was installed from
	"os": "7.1.1", // (String) Android OS version
	"ar": "CN", // (String) area/region
	"md": "BBB100-1", // (String) model, phone model
	"ba": "blackberry", // (String) brand, phone brand
	"sv": "V2.2.1", // (String) sdkVersion
	"g": "", // (String) gmail account
	"hw": "1620x1080", // (String) heightXwidth, screen size
	"t": "1506047606608", // (String) time the log was produced on the client
	"nw": "WIFI", // (String) network mode
	"ln": 0, // (double) lng, longitude
	"la": 0 // (double) lat, latitude
},
"et": [ // events
	{
	"ett": "1506047605364", // client-side event timestamp
	"en": "display", // event name
	"kv": { // event payload, free-form key-value pairs
		"goodsid": "236",
		"action": "1",
		"extend1": "1",
		"place": "2",
		"category": "75"
		}
	}
]
}
  • Sample log (server timestamp | log); the timestamp makes it possible to measure how long the network call took:
1540934156385| {
	"ap": "gmall",	// warehouse name
	"cm": {
	"uid": "1234",
	"vc": "2",
	"vn": "1.0",
	"l": "EN",
	"sr": "",
	"os": "7.1.1",
	"ar": "CN",
	"md": "BBB100-1",
	"ba": "blackberry",
	"sv": "V2.2.1",
	"g": "[email protected]",
	"hw": "1620x1080",
	"t": "1506047606608",
	"nw": "WIFI",
	"ln": 0,
        "la": 0
},
"et": [
	{
	"ett": "1506047605364", // client-side event timestamp
	"en": "display", // event name
	"kv": { // event payload, free-form key-value pairs
		"goodsid": "236",
		"action": "1",
		"extend1": "1",
		"place": "2",
		"category": "75"
		}
	},{
	"ett": "1552352626835",
	"en": "active_background",
	"kv": {
		"active_source": "1"
		}
	}
	]
}

2.2 Tracked Event Log Data

2.2.1 Product List Page

  • Event name: loading
Tag  Meaning
action  Action: start load = 1, load succeeded = 2, load failed = 3
loading_time  Load duration: time from the start of the pull-down until the API returns data (report 0 at load start; report a duration only on success or failure)
loading_way  Load mode: 1 = read from cache, 2 = pull fresh data from the API (reported only on success)
extend1  Extension field Extend1
extend2  Extension field Extend2
type  Load type: automatic = 1, user pull-down = 2, bottom load = 3 (triggered by tapping the bottom hint bar / tapping back-to-top load)
type1  Failure code: the load-failure status code (empty means the load succeeded)

2.2.2 Product Click

  • Event tag: display
Tag  Meaning
action  Action: product impression = 1, product click = 2
goodsid  Product ID (issued by the server)
place  Position (index of the product in the list: the first is 0, the second is 1, and so on)
extend1  Impression type: 1 = first impression, 2 = repeat impression
category  Category ID (defined by the server)

2.2.3 Product Detail Page

  • Event tag: newsdetail
Tag  Meaning
entry  Page entry source: app home = 1, push = 2, detail-page recommendation = 3
action  Action: start load = 1, load succeeded = 2 (pv), load failed = 3, leave page = 4
goodsid  Product ID (issued by the server)
show_style  Product style: 0 no image, 1 one large image, 2 two images, 3 three small images, 4 one small image, 5 one large plus two small images
news_staytime  Page dwell time: measured from when the product starts loading until the user closes the page. If the user jumps to another page midway, timing pauses and resumes on return; if the user is away for more than 10 minutes, the measurement is discarded and not reported. If the user exits before the load succeeds, report empty.
loading_time  Load duration: from page load start until the API returns data (report 0 at load start; report a duration only on success or failure)
type1  Failure code: the load-failure status code (empty means the load succeeded)
category  Category ID (defined by the server)

2.2.4 Ads

  • Event name: ad
Tag  Meaning
entry  Entry: product list page = 1, app home = 2, product detail page = 3
action  Action: ad shown = 1, ad clicked = 2
contentType  Type: 1 product, 2 marketing campaign
displayMills  Display duration in milliseconds
itemId  Product id
activityId  Campaign id

2.2.5 Notifications

  • Event tag: notification
Tag  Meaning
action  Action: notification created = 1, popped up = 2, clicked = 3, persistent notification shown (reported at most once per day) = 4
type  Notification id: alert = 1, weather forecast (morning = 2, evening = 3), persistent = 4
ap_time  Client-side popup time
content  Reserved field

2.2.6 User Background Activity

  • Event tag: active_background
Tag  Meaning
active_source  1 = upgrade, 2 = download, 3 = plugin_upgrade

2.2.7 Comments

  • Description: comment table (comment)
No.  Field name     Description                                                   Type    Length  Nullable  Default
1    comment_id     Comment id                                                    int     10,0
2    userid         User id                                                       int     10,0              0
3    p_comment_id   Parent comment id (0 = top-level comment, non-zero = reply)   int     10,0
4    content        Comment body                                                  string  1000
5    addtime        Creation time                                                 string
6    other_id       Id of the commented object                                    int     10,0
7    praise_count   Number of likes                                               int     10,0              0
8    reply_count    Number of replies                                             int     10,0              0

2.2.8 Favorites

  • Description: favorites table (favorites)
No.  Field name  Description    Type    Length  Nullable  Default
1    id          Primary key    int     10,0
2    course_id   Product id     int     10,0              0
3    userid      User ID        int     10,0              0
4    add_time    Creation time  string

2.2.9 Likes

  • Description: likes table (praise)
No.  Field name  Description                                                            Type    Length  Nullable  Default
1    id          Primary key id                                                         int     10,0
2    userid      User id                                                                int     10,0
3    target_id   Id of the liked object                                                 int     10,0
4    type        Like type: 1 Q&A like, 2 Q&A comment like, 3 article like, 4 comment like  int  10,0
5    add_time    Time added                                                             string

2.2.10 Error Log

errorBrief  Error summary
errorDetail  Error detail

2.3 Tracked Startup Log Data

{
	"action":"1",
	"ar":"MX",
	"ba":"HTC",
	"detail":"",
	"en":"start",
	"entry":"2",
	"extend1":"",
	"g":"[email protected]",
	"hw":"640*960",
	"l":"en",
	"la":"20.4",
	"ln":"-99.3",
	"loading_time":"2",
	"md":"HTC-2",
	"mid":"995",
	"nw":"4G",
	"open_ad_type":"2",
	"os":"8.1.2",
	"sr":"B",
	"sv":"V2.0.6",
	"t":"1561472502444",
	"uid":"995",
	"vc":"10",
	"vn":"1.3.4"
}
  • Event tag: start
Tag  Meaning
entry  Entry: push = 1, widget = 2, icon = 3, notification = 4, lockscreen_widget = 5
open_ad_type  Splash ad type: native splash ad = 1, interstitial splash ad = 2
action  Status: success = 1, failure = 2
loading_time  Load duration: time from the start of the pull-down until the API returns data (report 0 at load start; report a duration only on success or failure)
detail  Failure code (empty if none)
extend1  Failure message (empty if none)
en  Log type: start

2.4 Data Generation Script

The example below skips the log-generation pipeline shown in the red box of the figure and instead builds the logFile directly with a Java program.

2.4.1 Generated Data Formats

  • Startup log

{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"4","extend1":"","g":"[email protected]","hw":"1080*1920","l":"pt","la":"-11.0","ln":"-70.0","loading_time":"9","md":"sumsung-5","mid":"244","nw":"3G","open_ad_type":"1","os":"8.2.3","sr":"D","sv":"V2.1.3","t":"1589612165914","uid":"244","vc":"16","vn":"1.2.1"}

  • Event log (because of a conversion issue the figure omits the leading "timestamp|"):

1589695383284|{"cm":{"ln":"-79.4","sv":"V2.5.3","os":"8.0.6","g":"[email protected]","mid":"245","nw":"WIFI","l":"pt","vc":"6","hw":"1080*1920","ar":"MX","uid":"245","t":"1589627025851","la":"-39.6","md":"HTC-7","vn":"1.3.5","ba":"HTC","sr":"N"},"ap":"app","et":[{"ett":"1589650631883","en":"display","kv":{"goodsid":"53","action":"2","extend1":"2","place":"3","category":"50"}},{"ett":"1589690866312","en":"newsdetail","kv":{"entry":"3","goodsid":"54","news_staytime":"1","loading_time":"6","action":"4","showtype":"0","category":"78","type1":""}},{"ett":"1589641734037","en":"loading","kv":{"extend2":"","loading_time":"0","action":"1","extend1":"","type":"2","type1":"201","loading_way":"2"}},{"ett":"1589687684878","en":"ad","kv":{"activityId":"1","displayMills":"92030","entry":"3","action":"5","contentType":"0"}},{"ett":"1589632980772","en":"active_background","kv":{"active_source":"1"}},{"ett":"1589682030324","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1589675065650","en":"comment","kv":{"p_comment_id":2,"addtime":"1589624299628","praise_count":509,"other_id":6,"comment_id":7,"reply_count":35,"userid":3,"content":"關色蘆候佰間綸珊斑禁尹贊滌仇彭企呵姜毅"}},{"ett":"1589631359459","en":"favorites","kv":{"course_id":7,"id":0,"add_time":"1589681240066","userid":7}},{"ett":"1589616574187","en":"praise","kv":{"target_id":1,"id":7,"type":3,"add_time":"1589642497314","userid":8}}]}

2.4.2 Create the Maven Projects




  • data-producer:pom.xml
    <!-- unified dependency versions -->
    <properties>
        <slf4j.version>1.7.20</slf4j.version>
        <logback.version>1.0.7</logback.version>
    </properties>
    <dependencies> <!-- Alibaba's open-source JSON parser -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency> <!-- logging framework -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- main class name -->
                            <mainClass>com.heaton.bigdata.datawarehouse.app.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • data-producer:logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false"> <!-- defines where log files are stored; avoid relative paths in Logback configuration -->
    <property name="LOG_HOME" value="/root/logs/"/> <!-- console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder
                class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- formatted output: %d = date, %thread = thread name, %-5level = level padded to 5 characters, %msg = log message, %n = newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender> <!-- roll log files daily; stores the event logs -->
    <appender name="FILE"
              class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- <File>${LOG_HOME}/app.log</File> would set the path used while the file stays under ${log.max.size}; note that a web project would write under Tomcat's bin directory -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- output file name pattern -->
            <FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern> <!-- days of history to keep -->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%msg%n</pattern>
        </encoder> <!-- maximum size of a log file -->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>10MB</MaxFileSize>
        </triggeringPolicy>
    </appender> <!-- asynchronous logging -->
    <appender name="ASYNC_FILE"
              class="ch.qos.logback.classic.AsyncAppender"> <!-- do not drop logs; by default TRACE/DEBUG/INFO events are discarded once the queue is 80% full -->
        <discardingThreshold>0</discardingThreshold> <!-- queue depth; affects performance, default 256 -->
        <queueSize>512</queueSize> <!-- downstream appender; at most one can be attached -->
        <appender-ref ref="FILE"/>
    </appender> <!-- root log level -->
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
    </root>
</configuration>
  • data-flume:pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>    
  • hive-function:pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.4.3 Event Beans

In the data-producer project.

2.4.3.1 Common Log Class
import lombok.Data;
/** 
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54 
* @describe Common log bean
*/
@Data
public class AppBase {
    private String mid; // (String) unique device id
    private String uid; // (String) user id
    private String vc; // (String) versionCode, app version number
    private String vn; // (String) versionName, app version name
    private String l; // (String) system language
    private String sr; // (String) channel number the app was installed from
    private String os; // (String) Android OS version
    private String ar; // (String) area/region
    private String md; // (String) phone model
    private String ba; // (String) phone brand
    private String sv; // (String) sdkVersion
    private String g; // (String) gmail account
    private String hw; // (String) heightXwidth, screen size
    private String t; // (String) time the log was produced on the client
    private String nw; // (String) network mode
    private String ln; // (double) lng, longitude
    private String la; // (double) lat, latitude
}
2.4.3.2 Startup Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Startup log bean
 */
@Data
public class AppStart extends AppBase {
    private String entry;// entry: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5
    private String open_ad_type;// splash ad type: native splash ad=1, interstitial splash ad=2
    private String action;// status: success=1, failure=2
    private String loading_time;// load duration: from pull-down start to API response (0 at load start; duration reported only on success or failure)
    private String detail;// failure code (empty if none)
    private String extend1;// failure message (empty if none)
    private String en;// marks the log type as "start"
}
2.4.3.3 Error Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Error log bean
 */
@Data
public class AppErrorLog {
    private String errorBrief; // error summary
    private String errorDetail; // error detail
}
2.4.3.4 Product Click Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Product click log bean
 */
@Data
public class AppDisplay {
    private String action;// action: product impression=1, product click=2
    private String goodsid;// product ID (issued by the server)
    private String place;// position (index in the list; the first is 0, the second is 1, and so on)
    private String extend1;// impression type: 1 = first impression, 2 = repeat impression (unused)
    private String category;// category ID (defined by the server)
}
2.4.3.5 Product Detail Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Product detail bean
 */
@Data
public class AppNewsDetail {
    private String entry;// page entry source: app home=1, push=2, detail-page recommendation=3
    private String action;// action: start load=1, load succeeded=2 (pv), load failed=3, leave page=4
    private String goodsid;// product ID (issued by the server)
    private String showtype;// product style: 0 no image, 1 one large image, 2 two images, 3 three small images, 4 one small image, 5 one large plus two small images; products reached via detail-page recommendations always report style 0 (text left, image right)
    private String news_staytime;// page dwell time: from load start until the user closes the page; pauses while the user is on another page and resumes on return; discarded (not reported) if the user is away more than 10 minutes; empty if the user exits before the load succeeds
    private String loading_time;// load duration: from page load start until the API returns data (0 at load start; duration reported only on success or failure)
    private String type1;// failure code (empty means the load succeeded)
    private String category;// category ID (defined by the server)
}
2.4.3.6 Product List Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Product list bean
 */
@Data
public class AppLoading {
    private String action;// action: start load=1, load succeeded=2, load failed=3
    private String loading_time;// load duration: from pull-down start to API response (0 at load start; duration reported only on success or failure)
    private String loading_way;// load mode: 1 = read from cache, 2 = pull fresh data from the API (reported only on success)
    private String extend1;// extension field Extend1
    private String extend2;// extension field Extend2
    private String type;// load type: automatic=1, user pull-down=2, bottom load=3 (triggered by tapping the bottom hint bar / tapping back-to-top load)
    private String type1;// failure code (empty means the load succeeded)
}
2.4.3.7 Ad Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Ad bean
 */
@Data
public class AppAd {
    private String entry;// entry: product list page=1, app home=2, product detail page=3
    private String action;// action: ad shown=1, ad clicked=2
    private String contentType;// type: 1 product, 2 marketing campaign
    private String displayMills;// display duration in milliseconds
    private String itemId; // product id
    private String activityId; // campaign id
}
2.4.3.8 Notification Log Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Notification log bean
 */
@Data
public class AppNotification {
    private String action;// action: created=1, popped up=2, clicked=3, persistent notification shown (reported at most once per day)=4
    private String type;// notification id: alert=1, weather forecast (morning=2, evening=3), persistent=4
    private String ap_time;// client-side popup time
    private String content;// reserved field
}
2.4.3.9 Background Activity Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Background activity bean
 */
@Data
public class AppActive {
    private String active_source;// 1=upgrade, 2=download, 3=plugin_upgrade
}
2.4.3.10 User Comment Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe User comment bean
 */
@Data
public class AppComment {
    private int comment_id;// comment id
    private int userid;// user id
    private int p_comment_id;// parent comment id (0 = top-level comment, non-zero = reply)
    private String content;// comment body
    private String addtime;// creation time
    private int other_id;// id of the commented object
    private int praise_count;// number of likes
    private int reply_count;// number of replies
}
2.4.3.11 User Favorites Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe User favorites bean
 */
@Data
public class AppFavorites {
    private int id;// primary key
    private int course_id;// product id
    private int userid;// user ID
    private String add_time;// creation time
}
2.4.3.12 User Like Class
import lombok.Data;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe User like bean
 */
@Data
public class AppPraise {
    private int id; // primary key id
    private int userid;// user id
    private int target_id;// id of the liked object
    private int type;// like type: 1 Q&A like, 2 Q&A comment like, 3 article like, 4 comment like
    private String add_time;// time added
}

2.4.4 Main Class

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.Random;
/**
 * @author Heaton
 * @email [email protected]
 * @date 2020/4/25 14:54
 * @describe Main class
 */
public class App {

    private final static Logger logger = LoggerFactory.getLogger(App.class);
    private static Random rand = new Random();
    // device id counter
    private static int s_mid = 0;

    // user id counter
    private static int s_uid = 0;

    // product id counter
    private static int s_goodsid = 0;

    public static void main(String[] args) {

        // arg 1: delay between records in milliseconds; default 0
        Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

        // arg 2: number of records to generate
        int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

        // generate the data
        generateLog(delay, loop_len);
    }

    private static void generateLog(Long delay, int loop_len) {

        for (int i = 0; i < loop_len; i++) {

            int flag = rand.nextInt(2);

            switch (flag) {
                case (0):
                    // app startup
                    AppStart appStart = generateStart();
                    String jsonString = JSON.toJSONString(appStart);

                    // print to the console/log
                    logger.info(jsonString);
                    break;

                case (1):

                    JSONObject json = new JSONObject();

                    json.put("ap", "app");
                    json.put("cm", generateComFields());

                    JSONArray eventsArray = new JSONArray();

                    // event logs
                    // product click / impression
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateDisplay());
                        json.put("et", eventsArray);
                    }

                    // product detail page
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewsDetail());
                        json.put("et", eventsArray);
                    }

                    // product list page
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNewList());
                        json.put("et", eventsArray);
                    }

                    // ad
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateAd());
                        json.put("et", eventsArray);
                    }

                    // notification
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateNotification());
                        json.put("et", eventsArray);
                    }

                    // background activity
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateBackground());
                        json.put("et", eventsArray);
                    }

                    // error log
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateError());
                        json.put("et", eventsArray);
                    }

                    // user comment
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateComment());
                        json.put("et", eventsArray);
                    }

                    // user favorite
                    if (rand.nextBoolean()) {
                        eventsArray.add(generateFavorites());
                        json.put("et", eventsArray);
                    }

                    // user like
                    if (rand.nextBoolean()) {
                        eventsArray.add(generatePraise());
                        json.put("et", eventsArray);
                    }

                    // timestamp
                    long millis = System.currentTimeMillis();

                    // print to the console/log
                    logger.info(millis + "|" + json.toJSONString());
                    break;
            }

            // delay
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Populate the common fields
     */
    private static JSONObject generateComFields() {

        AppBase appBase = new AppBase();

        // device id
        appBase.setMid(s_mid + "");
        s_mid++;

        // user id
        appBase.setUid(s_uid + "");
        s_uid++;

        // app version code, e.g. 5 or 6
        appBase.setVc("" + rand.nextInt(20));

        // app version name, e.g. v1.1.1
        appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // Android OS version
        appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        // language: es, en, pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appBase.setL("es");
                break;
            case (1):
                appBase.setL("en");
                break;
            case (2):
                appBase.setL("pt");
                break;
        }

        // channel number the install came from
        appBase.setSr(getRandomChar(1));

        // region
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appBase.setAr("BR");
                break;
            case 1:
                appBase.setAr("MX");
                break;
        }

        // phone brand (ba) and phone model (md); just use a 2-digit suffix
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setBa("Sumsung");
                appBase.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appBase.setBa("Huawei");
                appBase.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appBase.setBa("HTC");
                appBase.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // embedded SDK version
        appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // screen size hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appBase.setHw("640*960");
                break;
            case 1:
                appBase.setHw("640*1136");
                break;
            case 2:
                appBase.setHw("750*1134");
                break;
            case 3:
                appBase.setHw("1080*1920");
                break;
        }

        // client-side log timestamp
        long millis = System.currentTimeMillis();
        appBase.setT("" + (millis - rand.nextInt(99999999)));

        // network mode: 3G, 4G, WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appBase.setNw("3G");
                break;
            case 1:
                appBase.setNw("4G");
                break;
            case 2:
                appBase.setNw("WIFI");
                break;
        }

        // Latin America: longitude 34°46′W to 117°09′W; latitude 32°42′N to 53°54′S
        // longitude
        appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // latitude
        appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        return (JSONObject) JSON.toJSON(appBase);
    }

    /**
     * Product impression event
     */
    private static JSONObject generateDisplay() {

        AppDisplay appDisplay = new AppDisplay();

        boolean boolFlag = rand.nextInt(10) < 7;

        // action: impression = 1, click = 2
        if (boolFlag) {
            appDisplay.setAction("1");
        } else {
            appDisplay.setAction("2");
        }

        // product id
        String goodsId = s_goodsid + "";
        s_goodsid++;

        appDisplay.setGoodsid(goodsId);

        // position: cap the list at 6 entries
        int flag = rand.nextInt(6);
        appDisplay.setPlace("" + flag);

        // impression type
        flag = 1 + rand.nextInt(2);
        appDisplay.setExtend1("" + flag);

        // category
        flag = 1 + rand.nextInt(100);
        appDisplay.setCategory("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);

        return packEventJson("display", jsonObject);
    }

    /**
     * Product detail page
     */
    private static JSONObject generateNewsDetail() {

        AppNewsDetail appNewsDetail = new AppNewsDetail();

        // page entry source
        int flag = 1 + rand.nextInt(3);
        appNewsDetail.setEntry(flag + "");

        // action
        appNewsDetail.setAction("" + (rand.nextInt(4) + 1));

        // product id
        appNewsDetail.setGoodsid(s_goodsid + "");

        // product style (0-5)
        flag = rand.nextInt(6);
        appNewsDetail.setShowtype("" + flag);

        // page dwell time
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setNews_staytime(flag + "");

        // load duration
        flag = rand.nextInt(10) * rand.nextInt(7);
        appNewsDetail.setLoading_time(flag + "");

        // load failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appNewsDetail.setType1("102");
                break;
            case 2:
                appNewsDetail.setType1("201");
                break;
            case 3:
                appNewsDetail.setType1("325");
                break;
            case 4:
                appNewsDetail.setType1("433");
                break;
            case 5:
                appNewsDetail.setType1("542");
                break;
            default:
                appNewsDetail.setType1("");
                break;
        }

        // category
        flag = 1 + rand.nextInt(100);
        appNewsDetail.setCategory("" + flag);

        JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);

        return packEventJson("newsdetail", eventJson);
    }

    /**
     * Product list
     */
    private static JSONObject generateNewList() {

        AppLoading appLoading = new AppLoading();

        // action
        int flag = rand.nextInt(3) + 1;
        appLoading.setAction(flag + "");

        // load duration
        flag = rand.nextInt(10) * rand.nextInt(7);
        appLoading.setLoading_time(flag + "");

        // failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appLoading.setType1("102");
                break;
            case 2:
                appLoading.setType1("201");
                break;
            case 3:
                appLoading.setType1("325");
                break;
            case 4:
                appLoading.setType1("433");
                break;
            case 5:
                appLoading.setType1("542");
                break;
            default:
                appLoading.setType1("");
                break;
        }

        // load mode (cache vs API)
        flag = 1 + rand.nextInt(2);
        appLoading.setLoading_way("" + flag);

        // extension field 1
        appLoading.setExtend1("");

        // extension field 2
        appLoading.setExtend2("");

        // load type (auto/pull/bottom)
        flag = 1 + rand.nextInt(3);
        appLoading.setType("" + flag);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);

        return packEventJson("loading", jsonObject);
    }

    /**
     * Ad fields
     */
    private static JSONObject generateAd() {

        AppAd appAd = new AppAd();

        // entry
        int flag = rand.nextInt(3) + 1;
        appAd.setEntry(flag + "");

        // action
        flag = rand.nextInt(5) + 1;
        appAd.setAction(flag + "");

        // content type
        flag = rand.nextInt(6) + 1;
        appAd.setContentType(flag + "");

        // display duration in milliseconds
        flag = rand.nextInt(120000) + 1000;
        appAd.setDisplayMills(flag + "");

        // choose between a product ad (with an item id) and a campaign ad (with an activity id)
        flag = rand.nextInt(2);
        if (flag == 1) {
            appAd.setContentType(flag + "");
            flag = rand.nextInt(6);
            appAd.setItemId(flag + "");
        } else {
            appAd.setContentType(flag + "");
            flag = rand.nextInt(1) + 1;
            appAd.setActivityId(flag + "");
        }

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);

        return packEventJson("ad", jsonObject);
    }

    /**
     * Startup log
     */
    private static AppStart generateStart() {

        AppStart appStart = new AppStart();

        // device id
        appStart.setMid(s_mid + "");
        s_mid++;

        // user id
        appStart.setUid(s_uid + "");
        s_uid++;

        // app version code, e.g. 5 or 6
        appStart.setVc("" + rand.nextInt(20));

        // app version name, e.g. v1.1.1
        appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));

        // Android OS version
        appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));

        // set the log type
        appStart.setEn("start");

        // language: es, en, pt
        int flag = rand.nextInt(3);
        switch (flag) {
            case (0):
                appStart.setL("es");
                break;
            case (1):
                appStart.setL("en");
                break;
            case (2):
                appStart.setL("pt");
                break;
        }

        // channel number the install came from
        appStart.setSr(getRandomChar(1));

        // region
        flag = rand.nextInt(2);
        switch (flag) {
            case 0:
                appStart.setAr("BR");
                break;
            case 1:
                appStart.setAr("MX");
                break;
        }

        // phone brand (ba) and phone model (md); just use a 2-digit suffix
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setBa("Sumsung");
                appStart.setMd("sumsung-" + rand.nextInt(20));
                break;
            case 1:
                appStart.setBa("Huawei");
                appStart.setMd("Huawei-" + rand.nextInt(20));
                break;
            case 2:
                appStart.setBa("HTC");
                appStart.setMd("HTC-" + rand.nextInt(20));
                break;
        }

        // embedded SDK version
        appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
        // gmail
        appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");

        // screen size hw
        flag = rand.nextInt(4);
        switch (flag) {
            case 0:
                appStart.setHw("640*960");
                break;
            case 1:
                appStart.setHw("640*1136");
                break;
            case 2:
                appStart.setHw("750*1134");
                break;
            case 3:
                appStart.setHw("1080*1920");
                break;
        }

        // client-side log timestamp
        long millis = System.currentTimeMillis();
        appStart.setT("" + (millis - rand.nextInt(99999999)));

        // network mode: 3G, 4G, WIFI
        flag = rand.nextInt(3);
        switch (flag) {
            case 0:
                appStart.setNw("3G");
                break;
            case 1:
                appStart.setNw("4G");
                break;
            case 2:
                appStart.setNw("WIFI");
                break;
        }

        // Latin America: longitude 34°46′W to 117°09′W; latitude 32°42′N to 53°54′S
        // longitude
        appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
        // latitude
        appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");

        // entry
        flag = rand.nextInt(5) + 1;
        appStart.setEntry(flag + "");

        // splash ad type
        flag = rand.nextInt(2) + 1;
        appStart.setOpen_ad_type(flag + "");

        // status
        flag = rand.nextInt(10) > 8 ? 2 : 1;
        appStart.setAction(flag + "");

        // load duration
        appStart.setLoading_time(rand.nextInt(20) + "");

        // failure code
        flag = rand.nextInt(10);
        switch (flag) {
            case 1:
                appStart.setDetail("102");
                break;
            case 2:
                appStart.setDetail("201");
                break;
            case 3:
                appStart.setDetail("325");
                break;
            case 4:
                appStart.setDetail("433");
                break;
            case 5:
                appStart.setDetail("542");
                break;
            default:
                appStart.setDetail("");
                break;
        }

        // extension field
        appStart.setExtend1("");

        return appStart;
    }

    /**
     * Notification
     */
    private static JSONObject generateNotification() {

        AppNotification appNotification = new AppNotification();

        int flag = rand.nextInt(4) + 1;

        // action
        appNotification.setAction(flag + "");

        // notification id
        flag = rand.nextInt(4) + 1;
        appNotification.setType(flag + "");

        // client-side popup time
        appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        // reserved field
        appNotification.setContent("");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);

        return packEventJson("notification", jsonObject);
    }

    /**
     * Background activity
     */
    private static JSONObject generateBackground() {

        AppActive appActive_background = new AppActive();

        // activation source
        int flag = rand.nextInt(3) + 1;
        appActive_background.setActive_source(flag + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);

        return packEventJson("active_background", jsonObject);
    }

    /**
     * Error log data
     */
    private static JSONObject generateError() {

        AppErrorLog appErrorLog = new AppErrorLog();

        String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"};        // error summaries
        String[] errorDetails = {"java.lang.NullPointerException\\n    " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"};        // error details

        // error summary
        appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
        // error detail
        appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);

        JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);

        return packEventJson("error", jsonObject);
    }

    /**
     * Wrap an event payload with the fields common to every event type (timestamp, event name, JSON payload)
     */
    private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {

        JSONObject eventJson = new JSONObject();

        eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
        eventJson.put("en", eventName);
        eventJson.put("kv", jsonObject);

        return eventJson;
    }

    /**
     * Random string of letters
     *
     * @param length string length
     */
    private static String getRandomChar(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {
            // letters
            str.append((char) (65 + random.nextInt(26)));// uppercase letter
        }

        return str.toString();
    }

    /**
     * Random string of letters and digits
     *
     * @param length string length
     */
    private static String getRandomCharAndNumr(Integer length) {

        StringBuilder str = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < length; i++) {

            boolean b = random.nextBoolean();

            if (b) { // letter
                // int choice = random.nextBoolean() ? 65 : 97; 65 = uppercase, 97 = lowercase
                str.append((char) (65 + random.nextInt(26)));// uppercase letter
            } else { // digit
                str.append(String.valueOf(random.nextInt(10)));
            }
        }

        return str.toString();
    }

    /**
     * Favorite
     */
    private static JSONObject generateFavorites() {

        AppFavorites favorites = new AppFavorites();

        favorites.setCourse_id(rand.nextInt(10));
        favorites.setUserid(rand.nextInt(10));
        favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);

        return packEventJson("favorites", jsonObject);
    }

    /**
     * Like
     */
    private static JSONObject generatePraise() {

        AppPraise praise = new AppPraise();

        praise.setId(rand.nextInt(10));
        praise.setUserid(rand.nextInt(10));
        praise.setTarget_id(rand.nextInt(10));
        praise.setType(rand.nextInt(4) + 1);
        praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);

        return packEventJson("praise", jsonObject);
    }

    /**
     * Comment
     */
    private static JSONObject generateComment() {

        AppComment comment = new AppComment();

        comment.setComment_id(rand.nextInt(10));
        comment.setUserid(rand.nextInt(10));
        comment.setP_comment_id(rand.nextInt(5));

        comment.setContent(getCONTENT());
        comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");

        comment.setOther_id(rand.nextInt(10));
        comment.setPraise_count(rand.nextInt(1000));
        comment.setReply_count(rand.nextInt(200));

        JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);

        return packEventJson("comment", jsonObject);
    }

    /**
     * Generate a single Chinese character
     */
    private static char getRandomChar() {
        String str = "";
        int hightPos; //
        int lowPos;

        Random random = new Random();

        // randomly generate the two GBK bytes of a Chinese character
        hightPos = (176 + Math.abs(random.nextInt(39)));
        lowPos = (161 + Math.abs(random.nextInt(93)));

        byte[] b = new byte[2];
        b[0] = (Integer.valueOf(hightPos)).byteValue();
        b[1] = (Integer.valueOf(lowPos)).byteValue();

        try {
            str = new String(b, "GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            System.out.println("error");
        }
        return str.charAt(0);
    }

    /**
     * Concatenate several Chinese characters
     */
    private static String getCONTENT() {

        StringBuilder str = new StringBuilder();

        for (int i = 0; i < rand.nextInt(100); i++) {
            str.append(getRandomChar());
        }
        return str.toString();
    }
}

2.4.5 Launch Test

Note: the log simulator must be deployed on two servers. Each simulated stream mixes common (startup) logs and event logs, so a Flume interceptor is needed to route the logs by type; accordingly, two flume-ng agents handle this job.

Package and upload the jar to the two server nodes to generate data for the tests that follow; here it lives in the test folder under the user's home directory.

Parameters control the generation rate and volume (below: one record every 2 seconds, 1000 records in total):

# control the interval and the number of records
nohup java -jar data-producer-1.0-SNAPSHOT-jar-with-dependencies.jar 2000 1000 &
# watch the logs
tail -F /root/logs/*.log

Check the data format with www.json.cn.

3 Create the Kafka Topics

  • Create the startup-log topic: topic_start
  • Create the event-log topic: topic_event (for example, with the commands below)
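A minimal sketch with the Kafka CLI (the partition and replication counts are illustrative assumptions; with vanilla Kafka the tool is kafka-topics.sh rather than the CDH wrapper):

# create the two topics
kafka-topics --zookeeper cdh01.cm:2181 --create --topic topic_start --partitions 3 --replication-factor 2
kafka-topics --zookeeper cdh01.cm:2181 --create --topic topic_event --partitions 3 --replication-factor 2
# verify
kafka-topics --zookeeper cdh01.cm:2181 --list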

4 Flume Preparation

There are two groups of Flume agents.

Group 1 collects the server logs and sends them through Kafka Channels to the appropriate Kafka topics; interceptors split common (startup) logs from event logs.

Group 2 consumes the Kafka data, buffers it in File Channels, and finally delivers it to HDFS for storage.

4.1 Flume: File -> Kafka Configuration

  • vim /root/test/file-flume-kafka.conf
#1 define the agent's components
a1.sources=r1
a1.channels=c1 c2

# 2 source config: TAILDIR source; positionFile records the read offsets; filegroups lists the directories to tail; app.+ matches the file-name prefix; channels are the destinations
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/test/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#3 interceptors: both are custom; the multiplexing selector routes by type, the topic header distinguishes the type, and mapping matches header values to channels
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.heaton.bigdata.flume.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.heaton.bigdata.flume.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

#4 channel config: KafkaChannel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type =org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

Create this Flume configuration file on the two log-producing server nodes.

LogETLInterceptor and LogTypeInterceptor are the custom interceptors.

4.2 Custom Interceptors

In the data-flume project.

  • LogUtils
import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {
    public static boolean validateEvent(String log) {
        /** server timestamp | json
         1588319303710|{
         "cm":{
         "ln":"-51.5","sv":"V2.0.7","os":"8.0.8","g":"[email protected]","mid":"13",
         "nw":"4G","l":"en","vc":"7","hw":"640*960","ar":"MX","uid":"13","t":"1588291826938",
         "la":"-38.2","md":"Huawei-14","vn":"1.3.6","ba":"Huawei","sr":"Y"
         },
         "ap":"app",
         "et":[{
                "ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}
                },{
                "ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}
                },{
                "ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}
                },{
                "ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}
                },{
                "ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}
                }]
         }
         */
        // 1 split
        String[] logContents = log.split("\\|");
        // 2 validate the shape
        if (logContents.length != 2) {
            return false;
        }
        // 3 validate the server timestamp
        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }
        // 4 validate the json
        if (!logContents[1].trim().startsWith("{")
                || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }

    public static boolean validateStart(String log) {
        /**
         {
         "action":"1","ar":"MX","ba":"HTC","detail":"201","en":"start","entry":"4","extend1":"",
         "g":"[email protected]","hw":"750*1134","l":"pt","la":"-29.7","ln":"-48.1","loading_time":"0",
         "md":"HTC-18","mid":"14","nw":"3G","open_ad_type":"2","os":"8.0.8","sr":"D","sv":"V2.8.2",
         "t":"1588251833523","uid":"14","vc":"15","vn":"1.2.9"
         }
        */
        if (log == null) {
            return false;
        }
        // 校驗 json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }
        return true;
    }
}
  • LogETLInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
    // initialization
    }

    @Override
    public Event intercept(Event event) {
        // 1 get the event body
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        // 2 validate according to the log type
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        // 3 events that fail validation are dropped
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept1 = intercept(event);
            if (intercept1 != null) {
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {
    // cleanup
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
  • LogTypeInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // route by log type: inspect the body, set the header
        // 1 get the body
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        // 2 get the headers
        Map<String, String> headers = event.getHeaders();
        // 3 determine the log type and set the topic header
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Build the project and put the jar into Flume's lib directory (on all nodes):

CDH reference path: /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/flume-ng/lib
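One way to push the jar to every node is a small loop; a sketch, assuming the jar is named data-flume-1.0-SNAPSHOT.jar and passwordless SSH is set up:

for i in cdh01.cm cdh02.cm cdh03.cm
do
  scp data-flume-1.0-SNAPSHOT.jar $i:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/flume-ng/lib/
done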

4.3 Flume Start/Stop Script

  • vim /root/log-kafka-flume.sh
#! /bin/bash

case $1 in
"start"){
	for i in cdh02.cm cdh03.cm
	do
		echo " -------- starting Flume on $i -------"
		ssh $i "nohup flume-ng agent --conf-file /root/test/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/file-flume-kafka.log 2>&1 &"
	done
};;
"stop"){
	for i in cdh02.cm cdh03.cm
	do
		echo " -------- stopping Flume on $i -------"
		ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
	done
};;
esac


4.4 Flume: Kafka -> HDFS Configuration

Prepare this on the third server.

  • vim /root/test/kafka-flume-hdfs.conf
## components
a1.sources=r1 r2
a1.channels=c1 c2 
a1.sinks=k1 k2
    
## Kafka-source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers= cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r1.kafka.topics = topic_start
## Kafka-source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r2.kafka.topics = topic_event
    
## channel1
a1.channels.c1.type = file
## checkpoint (index) file path
a1.channels.c1.checkpointDir=/root/test/flume/checkpoint/behavior1
## data persistence path
a1.channels.c1.dataDirs = /root/test/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
## checkpoint (index) file path
a1.channels.c2.checkpointDir=/root/test/flume/checkpoint/behavior2
## data persistence path
a1.channels.c2.dataDirs = /root/test/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
    
## HDFS-sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
## HDFS-sink2       
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
    
## avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 50
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## output file type: compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k2.hdfs.codeC = snappy
    
## wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.5 Flume Start/Stop Script

Prepare this on the third server.

  • vim /root/test/kafka-hdfs-flume.sh
#! /bin/bash

case $1 in
"start"){
	for i in cdh01.cm
	do
		echo " -------- starting consumer Flume on $i -------"
		ssh $i "nohup flume-ng agent --conf-file /root/test/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/kafka-flume-hdfs.log 2>&1 &"
	done
};;
"stop"){
	for i in cdh01.cm
	do
		echo " -------- stopping consumer Flume on $i -------"
		ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
	done
};;
esac


5 Business Data

This module targets enterprise reporting and decision-making: it provides the data backbone for analytics, solves the problem that large data volumes make reports slow to produce, and supports quick turnaround of ad-hoc business requests. It separates the company's offline and real-time business, manages data presentation intuitively through the offline path, and lays a solid foundation for the real-time solution.

5.1 E-commerce Business Flow

5.2 SKU vs. SPU

  • SKU (Stock Keeping Unit): the basic unit of inventory management, now used by extension as a product's unique code; every product variant has exactly one SKU.
  • SPU (Standard Product Unit): the smallest unit of aggregated product information; a reusable, easily searchable set of standardized attributes.
  • In short: the Black Shark 3 phone is an SPU; an Armor Gray, 256 GB unit is a SKU.

5.3 Business Table Structures

5.3.1 Order table (order_info)

5.3.2 Order detail table (order_detail)

5.3.3 SKU table (sku_info)

5.3.4 User table (user_info)

5.3.5 Level-1 product category table (base_category1)

5.3.6 Level-2 product category table (base_category2)

5.3.7 Level-3 product category table (base_category3)

5.3.8 Payment flow table (payment_info)

5.3.9 Province table (base_province)

5.3.10 Region table (base_region)

5.3.11 Brand table (base_trademark)

5.3.12 Order status table (order_status_log)

5.3.13 SPU table (spu_info)

5.3.14 Product review table (comment_info)

5.3.15 Refund table (order_refund_info)

5.3.16 Cart table (cart_info)

5.3.17 Product favorites table (favor_info)

5.3.18 Coupon usage table (coupon_use)

5.3.19 Coupon table (coupon_info)

5.3.20 Campaign table (activity_info)

5.3.21 Campaign-order link table (activity_order)

5.3.22 Promotion rule table (activity_rule)

5.3.23 Code dictionary table (base_dic)

5.3.24 Campaign product table (activity_sku)

5.4 Date Table Structures

5.4.1 Date table (date_info)

5.4.2 Holiday table (holiday_info)

5.4.3 Holiday year table (holiday_year)

6 Sync Strategies and Warehouse Layers

Data synchronization strategies include: full tables, incremental tables, and new-and-changed tables.

  • Full table: one partition per day, each holding a complete copy of the data.

  • Incremental table: one partition per day, each holding only that day's newly added data.

  • New-and-changed table: one partition per day, each holding that day's new and modified data.

  • Special table: no partitions; the data is stored only once.

6.1 Full Strategy

Daily full load: store a complete copy of the data every day, one partition per day.

Suitable for: tables of modest size whose rows may be inserted or updated.

Examples: brand table, code dictionary, product category tables, promotion rule table, campaign table, product tables, cart table, favorites table, SKU/SPU tables.

6.2 Incremental Strategy

Daily increment: store each day's new data as one partition.

Suitable for: large tables that only ever receive new rows.

Examples: refund table, order status table, payment flow table, order detail table, campaign-order link table, product review table.

6.3 New-and-Changed Strategy

Daily new-and-changed: store the rows whose creation time or modification time falls on the current day, one partition per day.

Suitable for: large tables with both inserts and updates.

Examples: user table, order table, coupon usage table. A selection sketch follows.
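The day's slice can be selected with a predicate on both timestamps; a minimal sketch against the MySQL side (the table name, column names, and connection details are illustrative assumptions):

# rows created or modified on the target day
do_date=2020-05-18
mysql -h cdh03.cm -uroot -p -e "
select *
from   mall.user_info
where  date(create_time) = '$do_date'
   or  date(operate_time) = '$do_date';"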

6.4 Special Strategy

Some special dimension tables need not follow the strategies above: they are synchronized into the warehouse once, and not refreshed while the data does not change.

Suitable for: tables whose data almost never changes.

1. Objective-world dimensions: dimensions of the physical world that do not change (gender, region, ethnicity, political affiliation, shoe sizes) can be stored once as fixed values.

2. Date dimension: can be loaded in one pass for a year or several years.

3. Region dimensions: province table, region table.

6.5 Sync Strategies for the Business Tables

Since even the "special" tables can change slowly (when territory changes hands, the region table may change), this project also uses the partitioned full-sync strategy for them.

6.6 Warehouse Layers

  • Why layer the warehouse:
    • Simplicity: break a complex job into layers, each handling its own task, which makes problems easier to localize.
    • Less duplicated development: standardized layers and shared intermediate data greatly reduce recomputation and make results reusable.
    • Data isolation: decouple the real data from derived statistics, whether the concern is data anomalies or data sensitivity.
    • Dimensional modeling is generally done in the DWD layer.
  • ODS layer: raw (operational) data, stored as ingested.
  • DWD layer: ODS data cleaned (nulls and dirty records removed, types converted, etc.), with degenerate dimensions and masking of sensitive data.
  • DWS layer: daily aggregations built on top of DWD.
  • DWT layer: subject-area aggregations built on top of DWS.
  • ADS layer: data serving the various analytics reports.

7 Syncing Data with Sqoop

Sqoop caveats:

Hive stores Null as "\N" on disk, while MySQL stores Null simply as Null; to keep the two sides consistent:

  • When exporting data, use --input-null-string and --input-null-non-string
  • When importing data, use --null-string and --null-non-string

The approach in this example: Sqoop pulls the MySQL data onto HDFS, stored as parquet files; Hive ODS tables are then built on top of the corresponding data.

The scripts are scheduled and executed with DolphinScheduler. A sketch of one such import follows.
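A minimal import sketch (the connection string, credentials, database name, and path layout are assumptions for illustration; the null-handling flags follow the note above):

do_date=2020-05-18
sqoop import \
  --connect jdbc:mysql://cdh03.cm:3306/mall \
  --username root \
  --password '******' \
  --query "select * from base_trademark where \$CONDITIONS" \
  --target-dir /origin_data/mall/db/base_trademark/$do_date \
  --delete-target-dir \
  --num-mappers 1 \
  --as-parquetfile \
  --null-string '\\N' \
  --null-non-string '\\N'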

  • Column types as Sqoop moves data between MySQL and Hive
MySQL column type   Hive ODS column type   Hive DWD-ADS column type
tinyint tinyint tinyint
int int int
bigint bigint bigint
varchar string string
datetime bigint string
bit boolean int
double double double
decimal decimal decimal

8 Building the ODS Layer

8.1 ODS Tables

Create the ods database in Hive, then create the data source in DolphinScheduler; the Hive database must be selected when building the DAG.

While you are at it, create the dwd, dws, dwt, and ads databases as well, as sketched below.
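A sketch of creating all five databases in one pass (assuming the hive CLI is available; beeline works the same way):

hive -e "
create database if not exists ods;
create database if not exists dwd;
create database if not exists dws;
create database if not exists dwt;
create database if not exists ads;"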

  1. base_dic
drop table if exists ods.mall__base_dic;

CREATE EXTERNAL TABLE `ods.mall__base_dic`(
  `dic_code` string COMMENT 'code',
  `dic_name` string COMMENT 'code name',
  `parent_code`  string COMMENT 'parent code',
  `create_time` bigint COMMENT 'creation time',
  `operate_time` bigint COMMENT 'modification time'
  ) COMMENT 'code dictionary table'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_dic/'
tblproperties ("parquet.compression"="snappy");
  2. base_trademark
drop table if exists ods.mall__base_trademark;

CREATE EXTERNAL TABLE `ods.mall__base_trademark`(
  `tm_id` string COMMENT 'brand id',
  `tm_name` string COMMENT 'brand name'
  ) COMMENT 'brand table'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_trademark/'
tblproperties ("parquet.compression"="snappy");
  3. base_category3
drop table if exists ods.mall__base_category3;

CREATE EXTERNAL TABLE `ods.mall__base_category3`(
  `id` bigint COMMENT 'id',
  `name` string COMMENT 'level-3 category name',
  `category2_id` bigint COMMENT 'level-2 category id'
  ) COMMENT 'level-3 category table'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category3/'
tblproperties ("parquet.compression"="snappy");

  4. base_category2
drop table if exists ods.mall__base_category2

CREATE EXTERNAL TABLE `ods.mall__base_category2`(
  `id` bigint COMMENT '編號',
  `name` string COMMENT '二級分類名稱',
  `category1_id` bigint COMMENT '一級分類編號'
  ) COMMENT '二級分類表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category2/'
tblproperties ("parquet.compression"="snappy") 
  5. base_category1
drop table if exists ods.mall__base_category1

CREATE EXTERNAL TABLE `ods.mall__base_category1`(
  `id` bigint COMMENT '編號',
  `name` string COMMENT '分類名稱'
  ) COMMENT '一級分類表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category1/'
tblproperties ("parquet.compression"="snappy") 
  6. activity_rule
drop table if exists ods.mall__activity_rule

CREATE EXTERNAL TABLE `ods.mall__activity_rule`(
  `id` int COMMENT '編號',
  `activity_id` int COMMENT '類型',
  `condition_amount` decimal(16,2) COMMENT '滿減金額',
  `condition_num` bigint COMMENT '滿減件數',
  `benefit_amount` decimal(16,2) COMMENT '優惠金額',
  `benefit_discount` bigint COMMENT '優惠折扣',
  `benefit_level` bigint COMMENT '優惠級別'
  ) COMMENT '優惠規則'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_rule/'
tblproperties ("parquet.compression"="snappy") 
  7. activity_info
drop table if exists ods.mall__activity_info

CREATE EXTERNAL TABLE `ods.mall__activity_info`(
  `id` bigint COMMENT '活動id',
  `activity_name` string COMMENT '活動名稱',
  `activity_type` string COMMENT '活動類型',
  `start_time` bigint COMMENT '開始時間',
  `end_time` bigint COMMENT '結束時間',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '活動表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_info/'
tblproperties ("parquet.compression"="snappy") 
  8. activity_sku
drop table if exists ods.mall__activity_sku

CREATE EXTERNAL TABLE `ods.mall__activity_sku`(
  `id` bigint COMMENT '編號',
  `activity_id` bigint COMMENT '活動id',
  `sku_id` bigint COMMENT 'sku_id',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '活動參與商品'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_sku/'
tblproperties ("parquet.compression"="snappy") 
  9. cart_info
drop table if exists ods.mall__cart_info

CREATE EXTERNAL TABLE `ods.mall__cart_info`(
  `id` bigint COMMENT '編號',
  `user_id` bigint COMMENT '用戶id',
  `sku_id` bigint COMMENT 'sku_id',
  `cart_price` decimal(10,2) COMMENT '放入購物車時價格',
  `sku_num` bigint COMMENT '數量',
  `sku_name` string COMMENT 'sku名稱',
  `create_time` bigint COMMENT '創建時間',
  `operate_time` bigint COMMENT '修改時間',
  `is_ordered` bigint COMMENT '是否已經下單',
  `order_time` bigint COMMENT '下單時間'
  ) COMMENT '購物車表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/cart_info/'
tblproperties ("parquet.compression"="snappy") 
  10. favor_info
drop table if exists ods.mall__favor_info

CREATE EXTERNAL TABLE `ods.mall__favor_info`(
  `id` bigint COMMENT '編號',
  `user_id` bigint COMMENT '用戶id',
  `sku_id` bigint COMMENT 'sku_id',
  `spu_id` bigint COMMENT '商品id',
  `is_cancel` string COMMENT '是否已取消 0 正常 1 已取消',
  `create_time` bigint COMMENT '創建時間',
  `cancel_time` bigint COMMENT '修改時間'
  ) COMMENT '商品收藏表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/favor_info/'
tblproperties ("parquet.compression"="snappy") 
  11. coupon_info
drop table if exists ods.mall__coupon_info

CREATE EXTERNAL TABLE `ods.mall__coupon_info`(
  `id` bigint COMMENT '購物券編號',
  `coupon_name` string COMMENT '購物券名稱',
  `coupon_type` string COMMENT '購物券類型 1 現金券 2 折扣券 3 滿減券 4 滿件打折券',
  `condition_amount` decimal(10,2) COMMENT '滿額數',
  `condition_num` bigint COMMENT '滿件數',
  `activity_id` bigint COMMENT '活動編號',
  `benefit_amount` decimal(16,2) COMMENT '減金額',
  `benefit_discount` bigint COMMENT '折扣',
  `create_time` bigint COMMENT '創建時間',
  `range_type` string COMMENT '範圍類型 1、商品 2、品類 3、品牌',
  `spu_id` bigint COMMENT '商品id',
  `tm_id` bigint COMMENT '品牌id',
  `category3_id` bigint COMMENT '品類id',
  `limit_num` int COMMENT '最多領用次數',
  `operate_time` bigint COMMENT '修改時間',
  `expire_time` bigint COMMENT '過期時間'
  ) COMMENT '優惠券表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_info/'
tblproperties ("parquet.compression"="snappy") 
  12. sku_info
drop table if exists ods.mall__sku_info

CREATE EXTERNAL TABLE `ods.mall__sku_info`(
  `id` bigint COMMENT 'skuid',
  `spu_id` bigint COMMENT 'spuid',
  `price` decimal(10,0) COMMENT '價格',
  `sku_name` string COMMENT 'sku名稱',
  `sku_desc` string COMMENT '商品規格描述',
  `weight` decimal(10,2) COMMENT '重量',
  `tm_id` bigint COMMENT '品牌',
  `category3_id` bigint COMMENT '三級分類id',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '庫存單元表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/sku_info/'
tblproperties ("parquet.compression"="snappy") 
  13. spu_info
drop table if exists ods.mall__spu_info

CREATE EXTERNAL TABLE `ods.mall__spu_info`(
  `id` bigint COMMENT '商品id',
  `spu_name` string COMMENT '商品名稱',
  `category3_id` bigint COMMENT '三級分類id',
  `tm_id` bigint COMMENT '品牌id'
  ) COMMENT '商品表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/spu_info/'
tblproperties ("parquet.compression"="snappy") 
  14. base_province
drop table if exists ods.mall__base_province

CREATE EXTERNAL TABLE `ods.mall__base_province`(
  `id` bigint COMMENT 'id',
  `name` string COMMENT '省名稱',
  `region_id` string COMMENT '大區id',
  `area_code` string COMMENT '行政區位碼',
  `iso_code` string COMMENT '國際編碼'
  ) COMMENT '省份表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_province/'
tblproperties ("parquet.compression"="snappy") 
  15. base_region
drop table if exists ods.mall__base_region

CREATE EXTERNAL TABLE `ods.mall__base_region`(
  `id` string COMMENT '大區id',
  `region_name` string COMMENT '大區名稱'
  ) COMMENT '地區表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_region/'
tblproperties ("parquet.compression"="snappy") 
  16. order_refund_info
drop table if exists ods.mall__order_refund_info

CREATE EXTERNAL TABLE `ods.mall__order_refund_info`(
  `id` bigint COMMENT '編號',
  `user_id` bigint COMMENT '用戶id',
  `order_id` bigint COMMENT '訂單編號',
  `sku_id` bigint COMMENT 'skuid',
  `refund_type` string COMMENT '退款類型',
  `refund_num` bigint COMMENT '退貨件數',
  `refund_amount` decimal(16,2) COMMENT '退款金額',
  `refund_reason_type` string COMMENT '原因類型',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '退單表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_refund_info/'
tblproperties ("parquet.compression"="snappy") 
  17. order_status_log
drop table if exists ods.mall__order_status_log

CREATE EXTERNAL TABLE `ods.mall__order_status_log`(
  `id` bigint COMMENT '編號',
  `order_id` bigint COMMENT '訂單編號',
  `order_status` string COMMENT '訂單狀態',
  `operate_time` bigint COMMENT '操作時間'
  ) COMMENT '訂單狀態表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_status_log/'
tblproperties ("parquet.compression"="snappy") 
  18. payment_info
drop table if exists ods.mall__payment_info

CREATE EXTERNAL TABLE `ods.mall__payment_info`(
  `id` bigint COMMENT '編號',
  `out_trade_no` string COMMENT '對外業務編號',
  `order_id` bigint COMMENT '訂單編號',
  `user_id` bigint COMMENT '用戶編號',
  `alipay_trade_no` string COMMENT '支付寶交易流水編號',
  `total_amount` decimal(16,2) COMMENT '支付金額',
  `subject` string COMMENT '交易內容',
  `payment_type` string COMMENT '支付方式',
  `payment_time` bigint COMMENT '支付時間'
  ) COMMENT '支付流水表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/payment_info/'
tblproperties ("parquet.compression"="snappy") 
  19. order_detail
drop table if exists ods.mall__order_detail

CREATE EXTERNAL TABLE `ods.mall__order_detail`(
  `id` bigint COMMENT '編號',
  `order_id` bigint COMMENT '訂單編號',
  `user_id` bigint COMMENT '用戶id',
  `sku_id` bigint COMMENT 'sku_id',
  `sku_name` string COMMENT 'sku名稱',
  `order_price` decimal(10,2) COMMENT '購買價格(下單時sku價格)',
  `sku_num` string COMMENT '購買個數',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '訂單明細表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_detail/'
tblproperties ("parquet.compression"="snappy") 
  20. activity_order
drop table if exists ods.mall__activity_order

CREATE EXTERNAL TABLE `ods.mall__activity_order`(
  `id` bigint COMMENT '編號',
  `activity_id` bigint COMMENT '活動id',
  `order_id` bigint COMMENT '訂單編號',
  `create_time` bigint COMMENT '發生日期'
  ) COMMENT '活動與訂單關聯表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_order/'
tblproperties ("parquet.compression"="snappy") 
  21. comment_info
drop table if exists ods.mall__comment_info

CREATE EXTERNAL TABLE `ods.mall__comment_info`(
  `id` bigint COMMENT '編號',
  `user_id` bigint COMMENT '用戶名稱',
  `sku_id` bigint COMMENT 'skuid',
  `spu_id` bigint COMMENT '商品id',
  `order_id` bigint COMMENT '訂單編號',
  `appraise` string COMMENT '評價 1 好評 2 中評 3 差評',
  `comment_txt` string COMMENT '評價內容',
  `create_time` bigint COMMENT '創建時間'
  ) COMMENT '商品評論表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/comment_info/'
tblproperties ("parquet.compression"="snappy") 
  22. coupon_use
drop table if exists ods.mall__coupon_use

CREATE EXTERNAL TABLE `ods.mall__coupon_use`(
  `id` bigint COMMENT '編號',
  `coupon_id` bigint COMMENT '購物券ID',
  `user_id` bigint COMMENT '用戶ID',
  `order_id` bigint COMMENT '訂單ID',
  `coupon_status` string COMMENT '購物券狀態',
  `get_time` bigint COMMENT '領券時間',
  `using_time` bigint COMMENT '使用時間',
  `used_time` bigint COMMENT '過期時間'
  ) COMMENT '優惠券領用表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_use/'
tblproperties ("parquet.compression"="snappy")
  23. user_info
drop table if exists ods.mall__user_info

CREATE EXTERNAL TABLE `ods.mall__user_info`(
  `id` bigint COMMENT '編號',
  `name` string COMMENT '用戶姓名',
  `email` string COMMENT '郵箱',
  `user_level` string COMMENT '用戶級別',
  `birthday` bigint COMMENT '用戶生日',
  `gender` string COMMENT '性別 M男,F女',
  `create_time` bigint COMMENT '創建時間',
  `operate_time` bigint COMMENT '修改時間'
  ) COMMENT '用戶表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/user_info/'
tblproperties ("parquet.compression"="snappy") 
  24. order_info
drop table if exists ods.mall__order_info

CREATE EXTERNAL TABLE `ods.mall__order_info`(
  `id` bigint COMMENT '編號',
  `final_total_amount` decimal(16,2) COMMENT '總金額',
  `order_status` string COMMENT '訂單狀態',
  `user_id` bigint COMMENT '用戶id',
  `out_trade_no` string COMMENT '訂單交易編號(第三方支付用)',
  `create_time` bigint COMMENT '創建時間',
  `operate_time` bigint COMMENT '操作時間',
  `province_id` int COMMENT '地區',
  `benefit_reduce_amount` decimal(16,2) COMMENT '優惠金額',
  `original_total_amount` decimal(16,2) COMMENT '原價金額',
  `feight_fee` decimal(16,2) COMMENT '運費'
  ) COMMENT '訂單表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_info/'
tblproperties ("parquet.compression"="snappy")
  25. start_log

This is the launch log table for the tracking data

drop table if exists ods.mall__start_log

CREATE EXTERNAL TABLE `ods.mall__start_log`(
  `line` string COMMENT '啟動日誌'
  ) COMMENT '啟動日誌表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/start_log/'
  26. event_log

This is the event log table for the tracking data

drop table if exists ods.mall__event_log

CREATE EXTERNAL TABLE `ods.mall__event_log`(
  `line` string COMMENT '事件日誌'
  ) COMMENT '事件日誌表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/event_log/'
  27. date_info

This is the date table

drop table if exists ods.mall__date_info

CREATE EXTERNAL TABLE `ods.mall__date_info`(
`date_id` int COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第幾天',
`day` int COMMENT '每月的第幾天',
`month` int COMMENT '第幾月',
`quarter` int COMMENT '第幾季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是節假日'
  ) COMMENT '時間維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/date_info/'
tblproperties ("parquet.compression"="snappy") 

8.2 MySQL data extraction

  • Base Sqoop extraction script
#!/bin/bash

db_date=${date}
mysql_db_name=${db_name}
mysql_db_addr=${db_addr}
mysql_db_user=${db_user}
mysql_db_password=${db_password}

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

echo "日期:"$db_date
echo "mysql庫名:"$mysql_db_name


import_data() {
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/sqoop import \
--connect jdbc:mysql://$mysql_db_addr:3306/$mysql_db_name?tinyInt1isBit=false \
--username $mysql_db_user \
--password $mysql_db_password \
--target-dir /origin_data/$mysql_db_name/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--null-string '' \
--null-non-string '\\n' \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;' \
--as-parquetfile 
}
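Note the literal $CONDITIONS placeholder: Sqoop requires it in every free-form --query and substitutes its own split predicate there (with --num-mappers 1 it degenerates to a constant). A typical call of the function above:

import_data "base_region" "select
id,
region_name
from base_region
where 1=1"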
  • DolphinScheduler global parameters

date — defaults to yesterday when not passed
db_name — database name
db_addr — database IP address
db_user — database user
db_password — database password

The earliest data in the source dates from 2020-03-15

The import snippets below are appended to the base script above and executed

  • Full-table snippets
import_data "base_dic" "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"

import_data "base_trademark" "select
tm_id,
tm_name
from base_trademark
where 1=1"

import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"

import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"

import_data "base_category1" "select
id,
name
from base_category1 where 1=1"

import_data "activity_rule" "select
id,
activity_id,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"

import_data "activity_info" "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"

import_data "activity_sku" "select
id,
activity_id,
sku_id,
create_time
FROM
activity_sku
where 1=1"

import_data "cart_info" "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from cart_info
where 1=1"

import_data "favor_info" "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"

import_data "coupon_info" "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from coupon_info
where 1=1"

import_data "sku_info" "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info where 1=1"

import_data "spu_info" "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
  • Special-table snippets
import_data "base_province" "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"

import_data "base_region" "select
id,
region_name
from base_region
where 1=1"

import_data "date_info" "select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from date_info
where 1=1"
  • Incremental-table snippets
import_data "order_refund_info" "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from order_refund_info
where
date_format(create_time,'%Y-%m-%d')='$db_date'"

import_data "order_status_log" "select
id,
order_id,
order_status,
operate_time
from order_status_log
where
date_format(operate_time,'%Y-%m-%d')='$db_date'"

import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
where
DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"

import_data "order_detail" "select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time
from order_detail od
join order_info oi
on od.order_id=oi.id
where
DATE_FORMAT(od.create_time,'%Y-%m-%d')='$db_date'"

import_data "activity_order" "select
id,
activity_id,
order_id,
create_time
from activity_order
where
date_format(create_time,'%Y-%m-%d')='$db_date'"

import_data "comment_info" "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$db_date'"
  • New-and-changed-table snippets
import_data "coupon_use" "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$db_date'
or date_format(using_time,'%Y-%m-%d')='$db_date'
or date_format(used_time,'%Y-%m-%d')='$db_date')"

import_data "user_info" "select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"

import_data "order_info" "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$db_date'
or date_format(operate_time,'%Y-%m-%d')='$db_date')"

8.3 ODS data load

  • Per table, only $table_name in the script needs changing

Note the source data directories of the two tracking-log tables, which are not produced by the Sqoop extraction

#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ods
table_name=base_dic
hive_table_name=$APP2.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
load data inpath '/origin_data/$APP1/$table_name/$db_date' OVERWRITE into table $hive_table_name partition(dt='$db_date');
"
$hive -e "$sql"
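The ${date} the scripts read is a DolphinScheduler global parameter; for a standalone test it can be supplied as an environment variable (script name here hypothetical):

export date=2020-03-15
bash ods_load_base_dic.sh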

9 DWD Layer

9.1 DWD layer (launch and event logs)

9.1.1 Launch log table

  • Create table
drop table if exists dwd.mall__start_log

CREATE EXTERNAL TABLE `dwd.mall__start_log`(
  `mid_id` string COMMENT '設備唯一標識',
  `user_id` string COMMENT '用戶標識',
  `version_code` string COMMENT '程式版本號',
  `version_name` string COMMENT '程式版本名',
  `lang` string COMMENT '系統語言',
  `source` string COMMENT '渠道號',
  `os` string COMMENT '系統版本',
  `area` string COMMENT '區域',
  `model` string COMMENT '手機型號',
  `brand` string COMMENT '手機品牌',
  `sdk_version` string COMMENT 'sdkVersion',
  `gmail` string COMMENT 'gmail',
  `height_width` string COMMENT '螢幕寬高',
  `app_time` string COMMENT '客戶端日誌產生時的時間',
  `network` string COMMENT '網路模式',
  `lng` string COMMENT '經度',
  `lat` string COMMENT '緯度',
  `entry` string COMMENT '入口: push=1,widget=2,icon=3,notification=4,lockscreen_widget=5',
  `open_ad_type` string COMMENT '開屏廣告類型: 開屏原生廣告=1, 開屏插屏廣告=2',
  `action` string COMMENT '狀態:成功=1 失敗=2',
  `loading_time` string COMMENT '載入時長',
  `detail` string COMMENT '失敗碼',
  `extend1` string COMMENT '失敗的 message'
  ) COMMENT '啟動日誌表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/start_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=start_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	get_json_object(line,'$.mid') mid_id,
	get_json_object(line,'$.uid') user_id,
	get_json_object(line,'$.vc') version_code,
	get_json_object(line,'$.vn') version_name,
	get_json_object(line,'$.l') lang,
	get_json_object(line,'$.sr') source,
	get_json_object(line,'$.os') os,
	get_json_object(line,'$.ar') area,
	get_json_object(line,'$.md') model,
	get_json_object(line,'$.ba') brand,
	get_json_object(line,'$.sv') sdk_version,
	get_json_object(line,'$.g') gmail,
	get_json_object(line,'$.hw') height_width,
	get_json_object(line,'$.t') app_time,
	get_json_object(line,'$.nw') network,
	get_json_object(line,'$.ln') lng,
	get_json_object(line,'$.la') lat,
	get_json_object(line,'$.entry') entry,
	get_json_object(line,'$.open_ad_type') open_ad_type,
	get_json_object(line,'$.action') action,
	get_json_object(line,'$.loading_time') loading_time,
	get_json_object(line,'$.detail') detail,
	get_json_object(line,'$.extend1') extend1
from $hive_origin_table_name
where dt='$db_date';
"
$hive -e "$sql"

9.1.2 Event log table

  • Create table
drop table if exists dwd.mall__event_log

CREATE EXTERNAL TABLE `dwd.mall__event_log`(
  `mid_id` string COMMENT '設備唯一標識',
  `user_id` string COMMENT '用戶標識',
  `version_code` string COMMENT '程式版本號',
  `version_name` string COMMENT '程式版本名',
  `lang` string COMMENT '系統語言',
  `source` string COMMENT '渠道號',
  `os` string COMMENT '系統版本',
  `area` string COMMENT '區域',
  `model` string COMMENT '手機型號',
  `brand` string COMMENT '手機品牌',
  `sdk_version` string COMMENT 'sdkVersion',
  `gmail` string COMMENT 'gmail',
  `height_width` string COMMENT '螢幕寬高',
  `app_time` string COMMENT '客戶端日誌產生時的時間',
  `network` string COMMENT '網路模式',
  `lng` string COMMENT '經度',
  `lat` string COMMENT '緯度',
  `event_name` string COMMENT '事件名稱',
  `event_json` string COMMENT '事件詳情',
  `server_time` string COMMENT '伺服器時間'
  ) COMMENT '事件日誌表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/event_log/'
tblproperties ("parquet.compression"="snappy") 
9.1.2.1 Building the UDF and UDTF
  • UDF


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {
    public String evaluate(String line, String key) throws JSONException {
        String[] log = line.split("\\|");
        if (log.length != 2 || StringUtils.isBlank(log[1])) {
            return "";
        }
        JSONObject baseJson = new JSONObject(log[1].trim());
        String result = "";
        // extract the server-side timestamp
        if ("st".equals(key)) {
            result = log[0].trim();
        } else if ("et".equals(key)) {
        // extract the event array
            if (baseJson.has("et")) {
                result = baseJson.getString("et");
            }
        } else {
            JSONObject cm = baseJson.getJSONObject("cm");
        // look up the key among the common (cm) fields
            if (cm.has(key)) {
                result = cm.getString(key);
            }
        }
        return result;
    }

    public static void main(String[] args) throws JSONException {
        String line = "         1588319303710|{\n" +
                "        \"cm\":{\n" +
                "            \"ln\":\"-51.5\",\"sv\":\"V2.0.7\",\"os\":\"8.0.8\",\"g\":\"[email protected]\",\"mid\":\"13\",\n" +
                "                    \"nw\":\"4G\",\"l\":\"en\",\"vc\":\"7\",\"hw\":\"640*960\",\"ar\":\"MX\",\"uid\":\"13\",\"t\":\"1588291826938\",\n" +
                "                    \"la\":\"-38.2\",\"md\":\"Huawei-14\",\"vn\":\"1.3.6\",\"ba\":\"Huawei\",\"sr\":\"Y\"\n" +
                "        },\n" +
                "        \"ap\":\"app\",\n" +
                "                \"et\":[{\n" +
                "            \"ett\":\"1588228193191\",\"en\":\"ad\",\"kv\":{\"activityId\":\"1\",\"displayMills\":\"113201\",\"entry\":\"3\",\"action\":\"5\",\"contentType\":\"0\"}\n" +
                "        },{\n" +
                "            \"ett\":\"1588300304713\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1588277440794\",\"action\":\"2\",\"type\":\"3\",\"content\":\"\"}\n" +
                "        },{\n" +
                "            \"ett\":\"1588249203743\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}\n" +
                "        },{\n" +
                "            \"ett\":\"1588225856101\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":0,\"addtime\":\"1588263895040\",\"praise_count\":231,\"other_id\":5,\"comment_id\":5,\"reply_count\":62,\"userid\":7,\"content\":\"骸汞\"}\n" +
                "        },{\n" +
                "            \"ett\":\"1588254200122\",\"en\":\"favorites\",\"kv\":{\"course_id\":5,\"id\":0,\"add_time\":\"1588264138625\",\"userid\":0}\n" +
                "        },{\n" +
                "            \"ett\":\"1588281152824\",\"en\":\"praise\",\"kv\":{\"target_id\":4,\"id\":3,\"type\":3,\"add_time\":\"1588307696417\",\"userid\":8}\n" +
                "        }]\n" +
                "    }";
        String s = new BaseFieldUDF().evaluate(line, "mid");
        String ss = new BaseFieldUDF().evaluate(line, "st");
        String sss = new BaseFieldUDF().evaluate(line, "et");
        System.out.println(s);
        System.out.println(ss);
        System.out.println(sss);
    }
}

Output:

13
1588319303710
[{"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}},{"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}},{"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}},{"ett":"1588225856101","en":"comment","kv":{"p_comment_id":0,"addtime":"1588263895040","praise_count":231,"other_id":5,"comment_id":5,"reply_count":62,"userid":7,"content":"骸汞"}},{"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}},{"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}}]

  • UDTF


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF {
    // declare the names and types of the output columns:
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                fieldOIs);
    }

    // one input record may yield several output rows
    @Override
    public void process(Object[] objects) throws HiveException {
        // the incoming et array, as a JSON string
        String input = objects[0].toString();
        // blank input: emit nothing, i.e. filter the record out
        if (StringUtils.isBlank(input)) {
            return;
        } else {
            try {
                // parse the event array (e.g. ad/favorites)
                JSONArray ja = new JSONArray(input);
                if (ja == null)
                    return;
                // iterate over each event
                for (int i = 0; i < ja.length(); i++) {
                    String[] result = new String[2];
                    try {
                        // the event name (e.g. ad/favorites)
                        result[0] = ja.getJSONObject(i).getString("en");
                        // the whole event object as JSON
                        result[1] = ja.getString(i);
                    } catch (JSONException e) {
                        continue;
                    }
                    // emit one row
                    forward(result);
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    // called once no records remain; for cleanup or trailing output
    @Override
    public void close() throws HiveException {
    }
}
9.1.2.2 Registering the UDFs permanently
  • Upload the UDF jar

Upload the hive-function-1.0-SNAPSHOT jar to /user/hive/jars on HDFS

hdfs dfs -mkdir -p /user/hive/jars
hdfs dfs -put hive-function-1.0-SNAPSHOT.jar /user/hive/jars/hive-function-1.0-SNAPSHOT.jar

Create the permanent UDFs in Hive

create function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';

create function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
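To sanity-check the registration, something like the following can be run in Hive (a hedged sketch):

show functions like '*analizer*';
select base_analizer(line,'st') from ods.mall__event_log limit 1;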
9.1.2.3 Using the UDFs from DolphinScheduler

In the DAG's SQL task the matching UDF function can be selected directly; however, in Dolphin 1.2.0 saving the function association currently has no effect.

Instead, upload the JAR to HDFS via the UDF management feature and register temporary functions from the script, which works just as well.

Temporary function statements:

create temporary function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
create temporary function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
9.1.2.4 Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=event_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	base_analizer(line,'mid') as mid_id,
	base_analizer(line,'uid') as user_id,
	base_analizer(line,'vc') as version_code,
	base_analizer(line,'vn') as version_name,
	base_analizer(line,'l') as lang,
	base_analizer(line,'sr') as source,
	base_analizer(line,'os') as os,
	base_analizer(line,'ar') as area,
	base_analizer(line,'md') as model,
	base_analizer(line,'ba') as brand,
	base_analizer(line,'sv') as sdk_version,
	base_analizer(line,'g') as gmail,
	base_analizer(line,'hw') as height_width,
	base_analizer(line,'t') as app_time,
	base_analizer(line,'nw') as network,
	base_analizer(line,'ln') as lng,
	base_analizer(line,'la') as lat,
	event_name,
	event_json,
	base_analizer(line,'st') as server_time
from $hive_origin_table_name lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='$db_date' and base_analizer(line,'et')<>'';
"
$hive -e "$sql"

9.1.3 Product click table

  • Create table
drop table if exists dwd.mall__display_log

CREATE EXTERNAL TABLE `dwd.mall__display_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`action` string,
	`goodsid` string,
	`place` string,
	`extend1` string,
	`category` string,
	`server_time` string
  ) COMMENT '商品點擊表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/display_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=display_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
	select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.action') action,
	get_json_object(event_json,'$.kv.goodsid') goodsid,
	get_json_object(event_json,'$.kv.place') place,
	get_json_object(event_json,'$.kv.extend1') extend1,
	get_json_object(event_json,'$.kv.category') category,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='display';
"
$hive -e "$sql"

9.1.4 Product list table

  • Create table
drop table if exists dwd.mall__loading_log

CREATE EXTERNAL TABLE `dwd.mall__loading_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`action` string,
	`loading_time` string,
	`loading_way` string,
	`extend1` string,
	`extend2` string,
	`type` string,
	`type1` string,
	`server_time` string
  ) COMMENT '商品列表表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/loading_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=loading_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.action') action,
	get_json_object(event_json,'$.kv.loading_time') loading_time,
	get_json_object(event_json,'$.kv.loading_way') loading_way,
	get_json_object(event_json,'$.kv.extend1') extend1,
	get_json_object(event_json,'$.kv.extend2') extend2,
	get_json_object(event_json,'$.kv.type') type,
	get_json_object(event_json,'$.kv.type1') type1,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='loading';
"
$hive -e "$sql"

9.1.5 Ad table

  • Create table
drop table if exists dwd.mall__ad_log

CREATE EXTERNAL TABLE `dwd.mall__ad_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`entry` string,
	`action` string,
	`contentType` string,
	`displayMills` string,
	`itemId` string,
	`activityId` string,
	`server_time` string
  ) COMMENT '廣告表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/ad_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=ad_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.entry') entry,
	get_json_object(event_json,'$.kv.action') action,
	get_json_object(event_json,'$.kv.contentType') contentType,
	get_json_object(event_json,'$.kv.displayMills') displayMills,
	get_json_object(event_json,'$.kv.itemId') itemId,
	get_json_object(event_json,'$.kv.activityId') activityId,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='ad';
"
$hive -e "$sql"

9.1.6 Notification table

  • Create table
drop table if exists dwd.mall__notification_log

CREATE EXTERNAL TABLE `dwd.mall__notification_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`action` string,
	`noti_type` string,
	`ap_time` string,
	`content` string,
	`server_time` string
  ) COMMENT '消息通知表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/notification_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=notification_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.action') action,
	get_json_object(event_json,'$.kv.noti_type') noti_type,
	get_json_object(event_json,'$.kv.ap_time') ap_time,
	get_json_object(event_json,'$.kv.content') content,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='notification';
"
$hive -e "$sql"

9.1.7 Background-active table

  • Create table
drop table if exists dwd.mall__active_background_log

CREATE EXTERNAL TABLE `dwd.mall__active_background_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`active_source` string,
	`server_time` string
  ) COMMENT '用戶後台活躍表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/active_background_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=active_background_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.active_source') active_source,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='active_background';
"
$hive -e "$sql"

9.1.8 Comment table

  • Create table
drop table if exists dwd.mall__comment_log

CREATE EXTERNAL TABLE `dwd.mall__comment_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`comment_id` int,
	`userid` int,
	`p_comment_id` int,
	`content` string,
	`addtime` string,
	`other_id` int,
	`praise_count` int,
	`reply_count` int,
	`server_time` string
  ) COMMENT '評論表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/comment_log/'
tblproperties ("parquet.compression"="snappy")  
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=comment_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.comment_id') comment_id,
	get_json_object(event_json,'$.kv.userid') userid,
	get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
	get_json_object(event_json,'$.kv.content') content,
	get_json_object(event_json,'$.kv.addtime') addtime,
	get_json_object(event_json,'$.kv.other_id') other_id,
	get_json_object(event_json,'$.kv.praise_count') praise_count,
	get_json_object(event_json,'$.kv.reply_count') reply_count,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='comment';
"
$hive -e "$sql"

9.1.9 Favorites table

  • Create table
drop table if exists dwd.mall__favorites_log

CREATE EXTERNAL TABLE `dwd.mall__favorites_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`id` int,
	`course_id` int,
	`userid` int,
	`add_time` string,
	`server_time` string
  ) COMMENT '收藏表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/favorites_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=favorites_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.id') id,
	get_json_object(event_json,'$.kv.course_id') course_id,
	get_json_object(event_json,'$.kv.userid') userid,
	get_json_object(event_json,'$.kv.add_time') add_time,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='favorites';
"
$hive -e "$sql"

9.1.10 Praise (like) table

  • Create table
drop table if exists dwd.mall__praise_log

CREATE EXTERNAL TABLE `dwd.mall__praise_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`id` string,
	`userid` string,
	`target_id` string,
	`type` string,
	`add_time` string,
	`server_time` string
  ) COMMENT '點贊表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/praise_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=praise_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.id') id,
	get_json_object(event_json,'$.kv.userid') userid,
	get_json_object(event_json,'$.kv.target_id') target_id,
	get_json_object(event_json,'$.kv.type') type,
	get_json_object(event_json,'$.kv.add_time') add_time,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='praise';
"
$hive -e "$sql"

9.1.11 Error log table

  • Create table
drop table if exists dwd.mall__error_log

CREATE EXTERNAL TABLE `dwd.mall__error_log`(
	`mid_id` string,
	`user_id` string,
	`version_code` string,
	`version_name` string,
	`lang` string,
	`source` string,
	`os` string,
	`area` string,
	`model` string,
	`brand` string,
	`sdk_version` string,
	`gmail` string,
	`height_width` string,
	`app_time` string,
	`network` string,
	`lng` string,
	`lat` string,
	`errorBrief` string,
	`errorDetail` string,
	`server_time` string
  ) COMMENT '錯誤日誌表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/error_log/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=error_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name

# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	user_id,
	version_code,
	version_name,
	lang,
	source,
	os,
	area,
	model,
	brand,
	sdk_version,
	gmail,
	height_width,
	app_time,
	network,
	lng,
	lat,
	get_json_object(event_json,'$.kv.errorBrief') errorBrief,
	get_json_object(event_json,'$.kv.errorDetail') errorDetail,
	server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='error';
"
$hive -e "$sql"

9.2 DWD layer (business tables)

When this layer is first built, the incremental tables need dynamic partitioning so that historical data lands in the proper date partitions

Fact tables, their candidate dimensions (time, user, region, product, coupon, activity, dictionary), and measures:

| Fact | Measures |
| --- | --- |
| Order | count / amount |
| Order detail | count / amount |
| Payment | count / amount |
| Cart add | count / amount |
| Favorites | count |
| Comment | count |
| Refund | count / amount |
| Coupon use | count |

9.2.1 Product dimension table (full)

  • Create table
drop table if exists dwd.mall__dim_sku_info
    
CREATE EXTERNAL TABLE `dwd.mall__dim_sku_info`(
`id` string COMMENT '商品 id',
`spu_id` string COMMENT 'spuid',
`price` double COMMENT '商品價格',
`sku_name` string COMMENT '商品名稱',
`sku_desc` string COMMENT '商品描述',
`weight` double COMMENT '重量',
`tm_id` string COMMENT '品牌 id',
`tm_name` string COMMENT '品牌名稱',
`category3_id` string COMMENT '三級分類 id',
`category2_id` string COMMENT '二級分類 id',
`category1_id` string COMMENT '一級分類 id',
`category3_name` string COMMENT '三級分類名稱',
`category2_name` string COMMENT '二級分類名稱',
`category1_name` string COMMENT '一級分類名稱',
`spu_name` string COMMENT 'spu 名稱',
`create_time` string COMMENT '創建時間'
  ) COMMENT '商品維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_sku_info/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_sku_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	sku.id,
	sku.spu_id,
	sku.price,
	sku.sku_name,
	sku.sku_desc,
	sku.weight,
	sku.tm_id,
	ob.tm_name,
	sku.category3_id,
	c2.id category2_id,
	c1.id category1_id,
	c3.name category3_name,
	c2.name category2_name,
	c1.name category1_name,
	spu.spu_name,
	from_unixtime(cast(sku.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__sku_info where dt='$db_date'
)sku
join
(
select * from ods.mall__base_trademark where dt='$db_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ods.mall__spu_info where dt='$db_date'
)spu on spu.id = sku.spu_id
join
(
select * from ods.mall__base_category3 where dt='$db_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ods.mall__base_category2 where dt='$db_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ods.mall__base_category1 where dt='$db_date'
)c1 on c2.category1_id=c1.id;
"
$hive -e "$sql"

9.2.2 Coupon dimension table (full)

  • Create table
drop table if exists dwd.mall__dim_coupon_info
    
CREATE EXTERNAL TABLE `dwd.mall__dim_coupon_info`(
`id` string COMMENT '購物券編號',
`coupon_name` string COMMENT '購物券名稱',
`coupon_type` string COMMENT '購物券類型 1 現金券 2 折扣券 3 滿減券 4 滿件打折券',
`condition_amount` string COMMENT '滿額數',
`condition_num` string COMMENT '滿件數',
`activity_id` string COMMENT '活動編號',
`benefit_amount` string COMMENT '減金額',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '創建時間',
`range_type` string COMMENT '範圍類型 1、商品 2、品類 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品類 id',
`limit_num` string COMMENT '最多領用次數',
`operate_time` string COMMENT '修改時間',
`expire_time` string COMMENT '過期時間'
  ) COMMENT '優惠券資訊維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_coupon_info/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_coupon_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	id,
	coupon_name,
	coupon_type,
	condition_amount,
	condition_num,
	activity_id,
	benefit_amount,
	benefit_discount,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
	range_type,
	spu_id,
	tm_id,
	category3_id,
	limit_num,
	from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
	from_unixtime(cast(expire_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') expire_time
from ods.mall__coupon_info
where dt='$db_date';
"
$hive -e "$sql"

9.2.3 Activity dimension table (full)

  • Create table
drop table if exists dwd.mall__dim_activity_info
    
CREATE EXTERNAL TABLE `dwd.mall__dim_activity_info`(
`id` string COMMENT '編號',
`activity_name` string COMMENT '活動名稱',
`activity_type` string COMMENT '活動類型',
`condition_amount` string COMMENT '滿減金額',
`condition_num` string COMMENT '滿減件數',
`benefit_amount` string COMMENT '優惠金額',
`benefit_discount` string COMMENT '優惠折扣',
`benefit_level` string COMMENT '優惠級別',
`start_time` string COMMENT '開始時間',
`end_time` string COMMENT '結束時間',
`create_time` string COMMENT '創建時間'
  ) COMMENT '活動維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_activity_info/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_activity_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	info.id,
	info.activity_name,
	info.activity_type,
	rule.condition_amount,
	rule.condition_num,
	rule.benefit_amount,
	rule.benefit_discount,
	rule.benefit_level,
	from_unixtime(cast(info.start_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') start_time,
	from_unixtime(cast(info.end_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') end_time,
	from_unixtime(cast(info.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__activity_info where dt='$db_date'
)info
left join
(
select * from ods.mall__activity_rule where dt='$db_date'
)rule on info.id = rule.activity_id;
"
$hive -e "$sql"

9.2.4 Region dimension table (special)

  • Create table
drop table if exists dwd.mall__dim_base_province
    
CREATE EXTERNAL TABLE `dwd.mall__dim_base_province`(
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名稱',
`area_code` string COMMENT '地區編碼',
`iso_code` string COMMENT 'ISO 編碼',
`region_id` string COMMENT '地區 id',
`region_name` string COMMENT '地區名稱'
  ) COMMENT '地區維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_base_province/'
tblproperties ("parquet.compression"="snappy")
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_base_province
hive_table_name=$APP2.mall__$table_name


# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	bp.id,
	bp.name,
	bp.area_code,
	bp.iso_code,
	bp.region_id,
	br.region_name
from ods.mall__base_province bp
join ods.mall__base_region br
on bp.region_id=br.id;
"
$hive -e "$sql"

9.2.5 Date dimension table (special)

  • Create table
drop table if exists dwd.mall__dim_date_info
    
CREATE EXTERNAL TABLE `dwd.mall__dim_date_info`(
`date_id` string COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第幾天',
`day` int COMMENT '每月的第幾天',
`month` int COMMENT '第幾月',
`quarter` int COMMENT '第幾季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是節假日'
  ) COMMENT '時間維度表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_date_info/'
tblproperties ("parquet.compression"="snappy") 
  • Data load
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_date_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in, use it; otherwise default to the day before today
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	date_id,
	week_id,
	week_day,
	day,
	month,
	quarter,
	year,
	is_workday,
	holiday_id
from ods.mall__date_info
"
$hive -e "$sql"

9.2.6 User dimension table (new-and-changed: slowly changing dimension, zipper table)

9.2.6.1 Zipper tables

A zipper table records the life cycle of each record; when a record's life cycle ends, a new record starts, with the current date as its effective start date.

While a record is still current, its effective end date holds a sentinel maximum (e.g. 9999-99-99). The table below traces Zhang San's phone-number changes:

| User ID | Name | Phone | Start date | End date |
| --- | --- | --- | --- | --- |
| 1 | Zhang San | 134XXXX5050 | 2019-01-01 | 2019-01-02 |
| 1 | Zhang San | 139XXXX3232 | 2019-01-03 | 2020-01-01 |
| 1 | Zhang San | 137XXXX7676 | 2020-01-02 | 9999-99-99 |
  • Suitable for: data that changes, but mostly stays the same (i.e. slowly changing dimensions)

For example, user records do change, but only a small share changes each day, so a daily full copy would be inefficient

  • How to query one: filtering on start_date <= some day and end_date >= that same day yields a full snapshot of the data as of that day, for example:
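A hedged sketch of such a snapshot query against the user zipper table defined below:

select *
from dwd.mall__dim_user_info_his
where start_date <= '2020-03-15'
  and end_date >= '2020-03-15';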

  • 拉鏈表形成過程

  • 製作流程

用戶當日全部數據MySQL中每天變化的數據拼接在一起,形成一個<新的臨時拉鏈表

臨時拉鏈表覆蓋舊的拉鏈表數據

從而解決Hive中數據不能更新的問題
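As a concrete illustration of the slicing rule above, here is a minimal query sketch (against the user zipper table dwd.mall__dim_user_info_his built in the next subsection) that reconstructs the full user snapshot as it looked on 2020-03-10:

-- Point-in-time slice of a zipper table: keep every row whose validity
-- window covers the target date; rows with end_date='9999-99-99' are the
-- currently valid versions.
select id, name, user_level
from dwd.mall__dim_user_info_his
where start_date <= '2020-03-10'
  and end_date >= '2020-03-10';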

9.2.6.2 User dimension table

Rows in the user table may be both inserted and updated on any given day, which makes it a slowly changing dimension; user dimension data is therefore stored as a zipper table.

  • Create table
drop table if exists dwd.mall__dim_user_info_his
    
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his`(
`id` string COMMENT '用戶 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性別',
`email` string COMMENT '郵箱',
`user_level` string COMMENT '用戶等級',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '操作時間',
`start_date` string COMMENT '有效開始日期',
`end_date` string COMMENT '有效結束日期'
  ) COMMENT '用戶拉鏈表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his/'
tblproperties ("parquet.compression"="snappy") 
  • Create the temporary table (same structure as the main table)
drop table if exists dwd.mall__dim_user_info_his_tmp
    
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his_tmp`(
`id` string COMMENT '用戶 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性別',
`email` string COMMENT '郵箱',
`user_level` string COMMENT '用戶等級',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '操作時間',
`start_date` string COMMENT '有效開始日期',
`end_date` string COMMENT '有效結束日期'
  ) COMMENT '用戶拉鏈表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="snappy") 
  • First, initialize the main table (run once only)
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	id,
	name,
	from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
	gender,
	email,
	user_level,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
	from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
	'$db_date',
	'9999-99-99'
from ods.mall__user_info oi
where oi.dt='$db_date';
"
$hive -e "$sql"
  • Compute and load the temporary table (run after the main-table initialization)
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his_tmp
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name

select
* 
from
(      -- full snapshot as of the current date
	select
		cast(id as string) id,
		name,
		from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
		gender,
		email,
		user_level,
		from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
		from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
		'$db_date' start_date,
		'9999-99-99' end_date
	from ods.mall__user_info where dt='$db_date'
	union all
	 -- rows that changed today: close out their end_date
	select
		uh.id,
		uh.name,
		from_unixtime(cast(uh.birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
		uh.gender,
		uh.email,
		uh.user_level,
		from_unixtime(cast(uh.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
		from_unixtime(cast(uh.operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
		uh.start_date,
		if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1),uh.end_date) end_date
	from dwd.mall__dim_user_info_his uh left join
	(
        -- full snapshot as of the current date
	select
		cast(id as string) id,
		name,
		from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
		gender,
		email,
		user_level,
		from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
		from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
		dt
	from ods.mall__user_info
	where dt='$db_date'
	) ui on uh.id=ui.id
)his
order by his.id, start_date;
"
$hive -e "$sql"
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select * from dwd.mall__dim_user_info_his_tmp;
"
$hive -e "$sql"

9.2.7 Order detail fact table (transactional snapshot fact table, incremental)

  • Create table
drop table if exists dwd.mall__fact_order_detail
    
CREATE EXTERNAL TABLE `dwd.mall__fact_order_detail`(
  `id` bigint COMMENT '編號',
  `order_id` bigint COMMENT '訂單編號',
  `user_id` bigint COMMENT '用戶id',
  `sku_id` bigint COMMENT 'sku_id',
  `sku_name` string COMMENT 'sku名稱',
  `order_price` decimal(10,2) COMMENT '購買價格(下單時sku價格)',
  `sku_num` string COMMENT '購買個數',
  `create_time` bigint COMMENT '創建時間',
  `province_id` string COMMENT '省份ID',
  `total_amount` decimal(20,2) COMMENT '訂單總金額'
  ) COMMENT '訂單明細表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_detail/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_detail
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	od.id, 
	od.order_id, 
	od.user_id, 
	od.sku_id, 
	od.sku_name, 
	od.order_price, 
	od.sku_num, 
	od.create_time, 
	oi.province_id, 
	od.order_price*od.sku_num 
from (select * from ods.mall__order_detail where dt='$db_date' ) od 
join (select * from ods.mall__order_info where dt='$db_date' ) oi 
on od.order_id=oi.id;
"
$hive -e "$sql"

9.2.8 Payment fact table (transactional snapshot fact table, incremental)

  • Create table
drop table if exists dwd.mall__fact_payment_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_payment_info`(
`id` string COMMENT '',
`out_trade_no` string COMMENT '對外業務編號',
`order_id` string COMMENT '訂單編號',
`user_id` string COMMENT '用戶編號',
`alipay_trade_no` string COMMENT '支付寶交易流水編號',
`payment_amount` decimal(16,2) COMMENT '支付金額',
`subject` string COMMENT '交易內容',
`payment_type` string COMMENT '支付類型',
`payment_time` string COMMENT '支付時間',
`province_id` string COMMENT '省份 ID'
  ) COMMENT '支付事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_payment_info/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_payment_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	pi.id,
	pi.out_trade_no,
	pi.order_id,
	pi.user_id,
	pi.alipay_trade_no,
	pi.total_amount,
	pi.subject,
	pi.payment_type,
	from_unixtime(cast(pi.payment_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') payment_time,
	oi.province_id
from
(
select * from ods.mall__payment_info where dt='$db_date'
)pi
join
(
select id, province_id from ods.mall__order_info where dt='$db_date'
)oi
on pi.order_id = oi.id;
"
$hive -e "$sql"

9.2.9 Refund fact table (transactional snapshot fact table, incremental)

  • Create table
drop table if exists dwd.mall__fact_order_refund_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_order_refund_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 ID',
`order_id` string COMMENT '訂單 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款類型',
`refund_num` bigint COMMENT '退款件數',
`refund_amount` decimal(16,2) COMMENT '退款金額',
`refund_reason_type` string COMMENT '退款原因類型',
`create_time` string COMMENT '退款時間'
  ) COMMENT '退款事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_refund_info/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_refund_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	id,
	user_id,
	order_id,
	sku_id,
	refund_type,
	refund_num,
	refund_amount,
	refund_reason_type,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__order_refund_info
where dt='$db_date';
"
$hive -e "$sql"

9.2.10 Review fact table (transactional snapshot fact table, incremental)

  • Create table
drop table if exists dwd.mall__fact_comment_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_comment_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '訂單 ID',
`appraise` string COMMENT '評價',
`create_time` string COMMENT '評價時間'
  ) COMMENT '評價事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_comment_info/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_comment_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	id,
	user_id,
	sku_id,
	spu_id,
	order_id,
	appraise,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__comment_info
where dt='$db_date';
"
$hive -e "$sql"

9.2.11 Add-to-cart fact table (periodic snapshot fact table, full load)

  • Create table
drop table if exists dwd.mall__fact_cart_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_cart_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入購物車時價格',
`sku_num` string COMMENT '數量',
`sku_name` string COMMENT 'sku 名稱 (冗餘)',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '修改時間',
`is_ordered` string COMMENT '是否已經下單。1 為已下單;0 為未下單',
`order_time` string COMMENT '下單時間'
  ) COMMENT '加購事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_cart_info/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_cart_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	id,
	user_id,
	sku_id,
	cart_price,
	sku_num,
	sku_name,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
	from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
	is_ordered,
	from_unixtime(cast(order_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') order_time
from ods.mall__cart_info
where dt='$db_date';
"
$hive -e "$sql"

9.2.12 Favorites fact table (periodic snapshot fact table, full load)

  • Create table
drop table if exists dwd.mall__fact_favor_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_favor_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏時間',
`cancel_time` string COMMENT '取消時間'
  ) COMMENT '收藏事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_favor_info/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_favor_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	id,
	user_id,
	sku_id,
	spu_id,
	is_cancel,
	from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
	from_unixtime(cast(cancel_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') cancel_time
from ods.mall__favor_info
where dt='$db_date';
"
$hive -e "$sql"

9.2.13 Coupon redemption fact table (accumulating snapshot fact table, new & changed)

  • Create table
drop table if exists dwd.mall__fact_coupon_use
    
CREATE EXTERNAL TABLE `dwd.mall__fact_coupon_use`(
`id` string COMMENT '編號',
`coupon_id` string COMMENT '優惠券 ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '訂單 id',
`coupon_status` string COMMENT '優惠券狀態',
`get_time` string COMMENT '領取時間',
`using_time` string COMMENT '使用時間(下單)',
`used_time` string COMMENT '使用時間(支付)'
  ) COMMENT '優惠券領用事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_coupon_use/'
tblproperties ("parquet.compression"="snappy") 

The dt partition is keyed on the coupon claim time get_time, not on the load date.

get_time is the claim time: once a coupon is claimed its row must persist in that partition, and the order (using_time) and payment (used_time) timestamps are layered on top as later updates
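Because each row must land in the partition of its own claim date, the script below writes through a Hive dynamic partition: the set statement relaxes the partition mode, and an extra final column in the select list supplies the partition value. A tiny self-contained sketch of the mechanism, using a hypothetical demo table:

-- The last select column feeds the dt partition of each row
set hive.exec.dynamic.partition.mode=nonstrict;
create table tmp_dyn_demo (id string) partitioned by (dt string);
insert overwrite table tmp_dyn_demo partition (dt)
select '1' id, '2020-03-10' dt;  -- this row lands in partition dt=2020-03-10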

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_coupon_use
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table $hive_table_name
PARTITION (dt)
select
	if(new.id is null,old.id,new.id) id,
	if(new.coupon_id is null,old.coupon_id,new.coupon_id) coupon_id,
	if(new.user_id is null,old.user_id,new.user_id) user_id,
	if(new.order_id is null,old.order_id,new.order_id) order_id,
	if(new.coupon_status is null,old.coupon_status,new.coupon_status) coupon_status,
	if(new.get_time is null,old.get_time,from_unixtime(cast(new.get_time/1000 as bigint),'yyyy-MM-dd')) get_time,
	if(new.using_time is null,old.using_time,from_unixtime(cast(new.using_time/1000 as bigint),'yyyy-MM-dd')) using_time,
	if(new.used_time is null,old.used_time,from_unixtime(cast(new.used_time/1000 as bigint),'yyyy-MM-dd')) used_time,
	if(new.get_time is null,old.get_time,from_unixtime(cast(new.get_time/1000 as bigint),'yyyy-MM-dd')) -- dynamic partition value (dt): only the new ods values are epoch millis; old values are already formatted dates
from
(
	select
		id,
		coupon_id,
		user_id,
		order_id,
		coupon_status,
		get_time,
		using_time,
		used_time
	from dwd.mall__fact_coupon_use
	where dt in
	(
	select
		from_unixtime(cast(get_time/1000 as bigint),'yyyy-MM-dd')
	from ods.mall__coupon_use
	where dt='$db_date'
	)
)old
full outer join
(
	select
		id,
		coupon_id,
		user_id,
		order_id,
		coupon_status,
		get_time,
		using_time,
		used_time
	from ods.mall__coupon_use
	where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"

9.2.14 Order fact table (accumulating snapshot fact table, new & changed)

  • Create table
drop table if exists dwd.mall__fact_order_info
    
CREATE EXTERNAL TABLE `dwd.mall__fact_order_info`(
`id` string COMMENT '訂單編號',
`order_status` string COMMENT '訂單狀態',
`user_id` string COMMENT '用戶 id',
`out_trade_no` string COMMENT '支付流水號',
`create_time` string COMMENT '創建時間(未支付狀態)',
`payment_time` string COMMENT '支付時間(已支付狀態)',
`cancel_time` string COMMENT '取消時間(已取消狀態)',
`finish_time` string COMMENT '完成時間(已完成狀態)',
`refund_time` string COMMENT '退款時間(退款中狀態)',
`refund_finish_time` string COMMENT '退款完成時間(退款完成狀態)',
`province_id` string COMMENT '省份 ID',
`activity_id` string COMMENT '活動 ID',
`original_total_amount` string COMMENT '原價金額',
`benefit_reduce_amount` string COMMENT '優惠金額',
`feight_fee` string COMMENT '運費',
`final_total_amount` decimal(10,2) COMMENT '訂單金額'
  ) COMMENT '訂單事實表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_info/'
tblproperties ("parquet.compression"="snappy") 

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_info
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	if(new.id is null,old.id,new.id),
	if(new.order_status is null,old.order_status,new.order_status),
	if(new.user_id is null,old.user_id,new.user_id),
	if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
	if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001 = the unpaid status; old values read back from this table are already formatted dates
	if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
	if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
	if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
	if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
	if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
	if(new.province_id is null,old.province_id,new.province_id),
	if(new.activity_id is null,old.activity_id,new.activity_id),
	if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
	if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
	if(new.feight_fee is null,old.feight_fee,new.feight_fee),
	if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
	if(new.tms['1001'] is null,old.create_time,new.tms['1001']) -- dynamic partition value (dt): the order creation date
from
(
	select
		id,
		order_status,
		user_id,
		out_trade_no,
		create_time,
		payment_time,
		cancel_time,
		finish_time,
		refund_time,
		refund_finish_time,
		province_id,
		activity_id,
		original_total_amount,
		benefit_reduce_amount,
		feight_fee,
		final_total_amount
	from dwd.mall__fact_order_info
	where dt in 
	(
	select
		from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd')
	from ods.mall__order_info
	where dt='$db_date'
	)
)old
full outer join
(
	select
		info.id,
		info.order_status,
		info.user_id,
		info.out_trade_no,
		info.province_id,
		act.activity_id,
		log.tms,
		info.original_total_amount,
		info.benefit_reduce_amount,
		info.feight_fee,
		info.final_total_amount
	from
	(
	select
		order_id,
		str_to_map(concat_ws(',',collect_set(concat(order_status,'=',from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd')))),',','=')
		tms
	from ods.mall__order_status_log
	where dt='$db_date'
	group by order_id
	)log
	join
	(
	select * from ods.mall__order_info where dt='$db_date'
	)info
	on log.order_id=info.id
	left join
	(
	select * from ods.mall__activity_order where dt='$db_date'
	)act
	on log.order_id=act.order_id
)new
on old.id=new.id;
"
$hive -e "$sql"
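The core trick in this load is the tms map: for each order, collect_set gathers 'status=date' strings from the status log, and str_to_map turns them into a status-to-date map that the outer query probes with tms['1001'], tms['1002'], and so on. A self-contained sketch with hypothetical status rows:

-- Produces tms = {'1001':'2020-03-10','1002':'2020-03-11'} for one order
select str_to_map(
	concat_ws(',', collect_set(concat(order_status,'=',operate_date))),
	',', '=') tms
from (
	select '1001' order_status, '2020-03-10' operate_date
	union all
	select '1002' order_status, '2020-03-11' operate_date
) t;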

10 DWS layer

Compression is no longer applied at this layer: compression is kind to disk but costly for CPU, and DWS tables are queried constantly, so computation efficiency is what matters. This layer aggregates each day's behavior by topic
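The device table in 10.1 below collapses many log rows into one row per device per day: concat_ws('|', collect_set(col)) deduplicates a column's values and joins the distinct ones with '|'. A minimal illustration with hypothetical values:

-- collect_set deduplicates; concat_ws flattens the array into one string
select concat_ws('|', collect_set(nw)) network
from (
	select 'WIFI' nw
	union all
	select '4G' nw
	union all
	select 'WIFI' nw
) t;  -- returns 'WIFI|4G'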

10.1 Daily device activity (user behavior)

  • Create table
drop table if exists dws.mall__uv_detail_daycount

CREATE EXTERNAL TABLE `dws.mall__uv_detail_daycount`(
`mid_id` string COMMENT '設備唯一標識',
`user_id` string COMMENT '用戶標識',
`version_code` string COMMENT '程式版本號',
`version_name` string COMMENT '程式版本名',
`lang` string COMMENT '系統語言',
`source` string COMMENT '渠道號',
`os` string COMMENT 'Android系統版本',
`area` string COMMENT '區域',
`model` string COMMENT '手機型號',
`brand` string COMMENT '手機品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '螢幕寬高',
`app_time` string COMMENT '客戶端日誌產生時的時間',
`network` string COMMENT '網路模式',
`lng` string COMMENT '經度',
`lat` string COMMENT '緯度',
`login_count` bigint COMMENT '活躍次數'
  ) COMMENT '每日設備行為表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/uv_detail_daycount/'
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=uv_detail_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
	mid_id,
	concat_ws('|', collect_set(user_id)) user_id,
	concat_ws('|', collect_set(version_code)) version_code,
	concat_ws('|', collect_set(version_name)) version_name,
	concat_ws('|', collect_set(lang))lang,
	concat_ws('|', collect_set(source)) source,
	concat_ws('|', collect_set(os)) os,
	concat_ws('|', collect_set(area)) area,
	concat_ws('|', collect_set(model)) model,
	concat_ws('|', collect_set(brand)) brand,
	concat_ws('|', collect_set(sdk_version)) sdk_version,
	concat_ws('|', collect_set(gmail)) gmail,
	concat_ws('|', collect_set(height_width)) height_width,
	concat_ws('|', collect_set(app_time)) app_time,
	concat_ws('|', collect_set(network)) network,
	concat_ws('|', collect_set(lng)) lng,
	concat_ws('|', collect_set(lat)) lat,
	count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
group by mid_id;
"
$hive -e "$sql"

10.2 Daily member activity (business)

  • Create table
drop table if exists dws.mall__user_action_daycount

CREATE EXTERNAL TABLE `dws.mall__user_action_daycount`(
user_id string comment '用戶 id',
login_count bigint comment '登錄次數',
cart_count bigint comment '加入購物車次數',
cart_amount double comment '加入購物車金額',
order_count bigint comment '下單次數',
order_amount decimal(16,2) comment '下單金額',
payment_count bigint comment '支付次數',
payment_amount decimal(16,2) comment '支付金額'
  ) COMMENT '每日會員行為表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/user_action_daycount/'
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=user_action_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
with
tmp_login as
(
	select
		user_id,
		count(*) login_count
	from dwd.mall__start_log
	where dt='$db_date'
	and user_id is not null
	group by user_id
),
tmp_cart as
(
	select
		user_id,
		count(*) cart_count,
		sum(cart_price*sku_num) cart_amount
	from dwd.mall__fact_cart_info
	where dt='$db_date'
	and user_id is not null
	and date_format(create_time,'yyyy-MM-dd')='$db_date'
	group by user_id
),
tmp_order as
(
	select
		user_id,
		count(*) order_count,
		sum(final_total_amount) order_amount
	from dwd.mall__fact_order_info
	where dt='$db_date'
	group by user_id
) ,
tmp_payment as
(
	select
		user_id,
		count(*) payment_count,
		sum(payment_amount) payment_amount
	from dwd.mall__fact_payment_info
	where dt='$db_date'
	group by user_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
	user_actions.user_id,
	sum(user_actions.login_count),
	sum(user_actions.cart_count),
	sum(user_actions.cart_amount),
	sum(user_actions.order_count),
	sum(user_actions.order_amount),
	sum(user_actions.payment_count),
	sum(user_actions.payment_amount)
from
(
	select
		user_id,
		login_count,
		0 cart_count,
		0 cart_amount,
		0 order_count,
		0 order_amount,
		0 payment_count,
		0 payment_amount
	from
	tmp_login
union all
	select
		user_id,
		0 login_count,
		cart_count,
		cart_amount,
		0 order_count,
		0 order_amount,
		0 payment_count,
		0 payment_amount
	from
	tmp_cart
union all
	select
		user_id,
		0 login_count,
		0 cart_count,
		0 cart_amount,
		order_count,
		order_amount,
		0 payment_count,
		0 payment_amount
	from tmp_order
union all
	select
		user_id,
		0 login_count,
		0 cart_count,
		0 cart_amount,
		0 order_count,
		0 order_amount,
		payment_count,
		payment_amount
	from tmp_payment
) user_actions
group by user_id;
"
$hive -e "$sql"

10.3 Daily SKU activity (business)

  • Create table
drop table if exists dws.mall__sku_action_daycount

CREATE EXTERNAL TABLE `dws.mall__sku_action_daycount`(
sku_id string comment 'sku_id',
order_count bigint comment '被下單次數',
order_num bigint comment '被下單件數',
order_amount decimal(16,2) comment '被下單金額',
payment_count bigint comment '被支付次數',
payment_num bigint comment '被支付件數',
payment_amount decimal(16,2) comment '被支付金額',
refund_count bigint comment '被退款次數',
refund_num bigint comment '被退款件數',
refund_amount decimal(16,2) comment '被退款金額',
cart_count bigint comment '被加入購物車次數',
cart_num bigint comment '被加入購物車件數',
favor_count bigint comment '被收藏次數',
appraise_good_count bigint comment '好評數',
appraise_mid_count bigint comment '中評數',
appraise_bad_count bigint comment '差評數',
appraise_default_count bigint comment '默認評價數'
  ) COMMENT '每日商品行為表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sku_action_daycount/'
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sku_action_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
with
tmp_order as
(
	select
		cast(sku_id as string) sku_id,
		count(*) order_count,
		sum(sku_num) order_num,
		sum(total_amount) order_amount
	from dwd.mall__fact_order_detail
	where dt='$db_date'
	group by sku_id
),
tmp_payment as
(
	select
		cast(sku_id as string) sku_id,
		count(*) payment_count,
		sum(sku_num) payment_num,
		sum(total_amount) payment_amount
	from dwd.mall__fact_order_detail
	where dt='$db_date'
	and order_id in
	(
	select
		id
	from dwd.mall__fact_order_info
	where (dt='$db_date' or dt=date_add('$db_date',-1))
	and date_format(payment_time,'yyyy-MM-dd')='$db_date'
	)
	group by sku_id
),
tmp_refund as
(
	select
		cast(sku_id as string) sku_id,
		count(*) refund_count,
		sum(refund_num) refund_num,
		sum(refund_amount) refund_amount
	from dwd.mall__fact_order_refund_info
	where dt='$db_date'
	group by sku_id
),
tmp_cart as
(
	select
		cast(sku_id as string) sku_id,
		count(*) cart_count,
		sum(sku_num) cart_num
	from dwd.mall__fact_cart_info
	where dt='$db_date'
	and date_format(create_time,'yyyy-MM-dd')='$db_date'
	group by sku_id
),
tmp_favor as
(
	select
		cast(sku_id as string) sku_id,
		count(*) favor_count
	from dwd.mall__fact_favor_info
	where dt='$db_date'
	and date_format(create_time,'yyyy-MM-dd')='$db_date'
	group by sku_id
),
tmp_appraise as
(
	select
		cast(sku_id as string) sku_id,
		sum(if(appraise='1201',1,0)) appraise_good_count,
		sum(if(appraise='1202',1,0)) appraise_mid_count,
		sum(if(appraise='1203',1,0)) appraise_bad_count,
		sum(if(appraise='1204',1,0)) appraise_default_count
	from dwd.mall__fact_comment_info
	where dt='$db_date'
	group by sku_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
	sku_id,
	sum(order_count),
	sum(order_num),
	sum(order_amount),
	sum(payment_count),
	sum(payment_num),
	sum(payment_amount),
	sum(refund_count),
	sum(refund_num),
	sum(refund_amount),
	sum(cart_count),
	sum(cart_num),
	sum(favor_count),
	sum(appraise_good_count),
	sum(appraise_mid_count),
	sum(appraise_bad_count),
	sum(appraise_default_count)
from
(
	select
		sku_id,
		order_count,
		order_num,
		order_amount,
		0 payment_count,
		0 payment_num,
		0 payment_amount,
		0 refund_count,
		0 refund_num,
		0 refund_amount,
		0 cart_count,
		0 cart_num,
		0 favor_count,
		0 appraise_good_count,
		0 appraise_mid_count,
		0 appraise_bad_count,
		0 appraise_default_count
	from tmp_order
union all
	select
		sku_id,
		0 order_count,
		0 order_num,
		0 order_amount,
		payment_count,
		payment_num,
		payment_amount,
		0 refund_count,
		0 refund_num,
		0 refund_amount,
		0 cart_count,
		0 cart_num,
		0 favor_count,
		0 appraise_good_count,
		0 appraise_mid_count,
		0 appraise_bad_count,
		0 appraise_default_count
	from tmp_payment
union all
	select
		sku_id,
		0 order_count,
		0 order_num,
		0 order_amount,
		0 payment_count,
		0 payment_num,
		0 payment_amount,
		refund_count,
		refund_num,
		refund_amount,
		0 cart_count,
		0 cart_num,
		0 favor_count,
		0 appraise_good_count,
		0 appraise_mid_count,
		0 appraise_bad_count,
		0 appraise_default_count
	from tmp_refund
union all
	select
		sku_id,
		0 order_count,
		0 order_num,
		0 order_amount,
		0 payment_count,
		0 payment_num,
		0 payment_amount,
		0 refund_count,
		0 refund_num,
		0 refund_amount,
		cart_count,
		cart_num,
		0 favor_count,
		0 appraise_good_count,
		0 appraise_mid_count,
		0 appraise_bad_count,
		0 appraise_default_count
	from tmp_cart
union all
	select
		sku_id,
		0 order_count,
		0 order_num,
		0 order_amount,
		0 payment_count,
		0 payment_num,
		0 payment_amount,
		0 refund_count,
		0 refund_num,
		0 refund_amount,
		0 cart_count,
		0 cart_num,
		favor_count,
		0 appraise_good_count,
		0 appraise_mid_count,
		0 appraise_bad_count,
		0 appraise_default_count
	from tmp_favor
union all
	select
		sku_id,
		0 order_count,
		0 order_num,
		0 order_amount,
		0 payment_count,
		0 payment_num,
		0 payment_amount,
		0 refund_count,
		0 refund_num,
		0 refund_amount,
		0 cart_count,
		0 cart_num,
		0 favor_count,
		appraise_good_count,
		appraise_mid_count,
		appraise_bad_count,
		appraise_default_count
	from tmp_appraise
)tmp
group by sku_id;
"
$hive -e "$sql"

10.4 Daily coupon statistics (business)

  • Create table
drop table if exists dws.mall__coupon_use_daycount

CREATE EXTERNAL TABLE `dws.mall__coupon_use_daycount`(
`coupon_id` string COMMENT '優惠券 ID',
`coupon_name` string COMMENT '購物券名稱',
`coupon_type` string COMMENT '購物券類型 1 現金券 2 折扣券 3 滿減券 4 滿件打折券',
`condition_amount` string COMMENT '滿額數',
`condition_num` string COMMENT '滿件數',
`activity_id` string COMMENT '活動編號',
`benefit_amount` string COMMENT '減金額',
`benefit_discount` string COMMENT '折扣',
`create_time` string COMMENT '創建時間',
`range_type` string COMMENT '範圍類型 1、商品 2、品類 3、品牌',
`spu_id` string COMMENT '商品 id',
`tm_id` string COMMENT '品牌 id',
`category3_id` string COMMENT '品類 id',
`limit_num` string COMMENT '最多領用次數',
`get_count` bigint COMMENT '領用次數',
`using_count` bigint COMMENT '使用(下單)次數',
`used_count` bigint COMMENT '使用(支付)次數'
  ) COMMENT '每日優惠券統計表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/coupon_use_daycount/'
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=coupon_use_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name partition(dt='$db_date')
select
	cu.coupon_id,
	ci.coupon_name,
	ci.coupon_type,
	ci.condition_amount,
	ci.condition_num,
	ci.activity_id,
	ci.benefit_amount,
	ci.benefit_discount,
	ci.create_time,
	ci.range_type,
	ci.spu_id,
	ci.tm_id,
	ci.category3_id,
	ci.limit_num,
	cu.get_count,
	cu.using_count,
	cu.used_count
from
(
	select
		coupon_id,
		sum(if(date_format(get_time,'yyyy-MM-dd')='$db_date',1,0))
		get_count,
		sum(if(date_format(using_time,'yyyy-MM-dd')='$db_date',1,0))
		using_count,
		sum(if(date_format(used_time,'yyyy-MM-dd')='$db_date',1,0))
		used_count
	from dwd.mall__fact_coupon_use
	where dt='$db_date'
	group by coupon_id
)cu
left join
(
	select
		*
	from dwd.mall__dim_coupon_info
	where dt='$db_date'
)ci on cu.coupon_id=ci.id;
"
$hive -e "$sql"

10.5 Daily campaign statistics (business)

  • Create table
drop table if exists dws.mall__activity_info_daycount

CREATE EXTERNAL TABLE `dws.mall__activity_info_daycount`(
`id` string COMMENT '編號',
`activity_name` string COMMENT '活動名稱',
`activity_type` string COMMENT '活動類型',
`start_time` string COMMENT '開始時間',
`end_time` string COMMENT '結束時間',
`create_time` string COMMENT '創建時間',
`order_count` bigint COMMENT '下單次數',
`payment_count` bigint COMMENT '支付次數'
  ) COMMENT '每日活動統計表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/activity_info_daycount/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=activity_info_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name partition(dt='$db_date')
select
	oi.activity_id,
	ai.activity_name,
	ai.activity_type,
	ai.start_time,
	ai.end_time,
	ai.create_time,
	oi.order_count,
	oi.payment_count
from
(
	select
		activity_id,
		sum(if(date_format(create_time,'yyyy-MM-dd')='$db_date',1,0))
		order_count,
		sum(if(date_format(payment_time,'yyyy-MM-dd')='$db_date',1,0))
		payment_count
	from dwd.mall__fact_order_info
	where (dt='$db_date' or dt=date_add('$db_date',-1))
	and activity_id is not null
	group by activity_id
)oi
join
(
	select
		*
	from dwd.mall__dim_activity_info
	where dt='$db_date'
)ai
on oi.activity_id=ai.id;
"
$hive -e "$sql"

10.6 Daily purchase behavior (business)

  • Create table
drop table if exists dws.mall__sale_detail_daycount

CREATE EXTERNAL TABLE `dws.mall__sale_detail_daycount`(
user_id string comment '用戶 id',
sku_id string comment '商品 id',
user_gender string comment '用戶性別',
user_age string comment '用戶年齡',
user_level string comment '用戶等級',
order_price decimal(10,2) comment '商品價格',
sku_name string comment '商品名稱',
sku_tm_id string comment '品牌 id',
sku_category3_id string comment '商品三級品類 id',
sku_category2_id string comment '商品二級品類 id',
sku_category1_id string comment '商品一級品類 id',
sku_category3_name string comment '商品三級品類名稱',
sku_category2_name string comment '商品二級品類名稱',
sku_category1_name string comment '商品一級品類名稱',
spu_id string comment '商品 spu',
sku_num int comment '購買個數',
order_count bigint comment '當日下單單數',
order_amount decimal(16,2) comment '當日下單金額'
  ) COMMENT '每日購買行為表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sale_detail_daycount/'
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sale_detail_daycount
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name partition(dt='$db_date')
select
	op.user_id,
	op.sku_id,
	ui.gender,
	floor(months_between('$db_date', ui.birthday)/12) age,
	ui.user_level,
	si.price,
	si.sku_name,
	si.tm_id,
	si.category3_id,
	si.category2_id,
	si.category1_id,
	si.category3_name,
	si.category2_name,
	si.category1_name,
	si.spu_id,
	op.sku_num,
	op.order_count,
	op.order_amount
from
(
	select
		user_id,
		sku_id,
		sum(sku_num) sku_num,
		count(*) order_count,
		sum(total_amount) order_amount
	from dwd.mall__fact_order_detail
	where dt='$db_date'
	group by user_id, sku_id
)op
join
(
	select
		*
	from dwd.mall__dim_user_info_his
	where end_date='9999-99-99'
)ui on op.user_id = ui.id
join
(
	select
		*
	from dwd.mall__dim_sku_info
	where dt='$db_date'
)si on op.sku_id = si.id;
"
$hive -e "$sql"

11 DWT layer

This layer accumulates the daily DWS data by topic; no partitions are created, no compression is applied, and the data is fully overwritten every day
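Every load in this layer follows the same merge pattern: full outer join yesterday's topic table (old) with today's DWS day-counts (new); nvl(new.x, old.x) keeps whichever side exists, and nvl(old.total,0)+nvl(new.day,0) accumulates the running totals. A self-contained sketch of the pattern with hypothetical inline data:

-- old holds keys a,b with running totals; today's new data covers b,c
select
	nvl(new.key_id, old.key_id) key_id,
	nvl(old.total_count,0) + nvl(new.day_count,0) total_count
from (select 'a' key_id, 10 total_count union all select 'b', 5) old
full outer join
(select 'b' key_id, 2 day_count union all select 'c', 1) new
on old.key_id = new.key_id;
-- returns a=10 (carried over), b=7 (accumulated), c=1 (newly added)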

11.1 Device topic wide table

  • Create table
drop table if exists dwt.mall__uv_topic

CREATE EXTERNAL TABLE `dwt.mall__uv_topic`(
`mid_id` string COMMENT '設備唯一標識',
`user_id` string COMMENT '用戶標識',
`version_code` string COMMENT '程式版本號',
`version_name` string COMMENT '程式版本名',
`lang` string COMMENT '系統語言',
`source` string COMMENT '渠道號',
`os` string COMMENT 'Android系統版本',
`area` string COMMENT '區域',
`model` string COMMENT '手機型號',
`brand` string COMMENT '手機品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '螢幕寬高',
`app_time` string COMMENT '客戶端日誌產生時的時間',
`network` string  COMMENT '網路模式',
`lng` string COMMENT '經度',
`lat` string COMMENT '緯度',
`login_date_first` string comment '首次活躍時間',
`login_date_last` string comment '末次活躍時間',
`login_day_count` bigint comment '當日活躍次數',
`login_count` bigint comment '累積活躍天數'
  ) COMMENT '設備主題寬表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/uv_topic/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=uv_topic
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	nvl(new.mid_id,old.mid_id),
	nvl(new.user_id,old.user_id),
	nvl(new.version_code,old.version_code),
	nvl(new.version_name,old.version_name),
	nvl(new.lang,old.lang),
	nvl(new.source,old.source),
	nvl(new.os,old.os),
	nvl(new.area,old.area),
	nvl(new.model,old.model),
	nvl(new.brand,old.brand),
	nvl(new.sdk_version,old.sdk_version),
	nvl(new.gmail,old.gmail),
	nvl(new.height_width,old.height_width),
	nvl(new.app_time,old.app_time),
	nvl(new.network,old.network),
	nvl(new.lng,old.lng),
	nvl(new.lat,old.lat),
	if(old.mid_id is null,'$db_date',old.login_date_first),
	if(new.mid_id is not null,'$db_date',old.login_date_last),
	if(new.mid_id is not null, new.login_count,0),
	nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
	select
		*
	from dwt.mall__uv_topic
)old
full outer join
(
	select
		*
	from dws.mall__uv_detail_daycount
	where dt='$db_date'
)new
on old.mid_id=new.mid_id;
"
$hive -e "$sql"

11.2 Member topic wide table

  • Create table
drop table if exists dwt.mall__user_topic

CREATE EXTERNAL TABLE `dwt.mall__user_topic`(
user_id string comment '用戶 id',
login_date_first string comment '首次登錄時間',
login_date_last string comment '末次登錄時間',
login_count bigint comment '累積登錄天數',
login_last_30d_count bigint comment '最近 30 日登錄天數',
order_date_first string comment '首次下單時間',
order_date_last string comment '末次下單時間',
order_count bigint comment '累積下單次數',
order_amount decimal(16,2) comment '累積下單金額',
order_last_30d_count bigint comment '最近 30 日下單次數',
order_last_30d_amount decimal(16,2) comment '最近 30 日下單金額',
payment_date_first string comment '首次支付時間',
payment_date_last string comment '末次支付時間',
payment_count bigint comment '累積支付次數',
payment_amount decimal(16,2) comment '累積支付金額',
payment_last_30d_count bigint comment '最近 30 日支付次數',
payment_last_30d_amount decimal(16,2) comment '最近 30 日支付金額'
  ) COMMENT '會員主題寬表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/user_topic/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=user_topic
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	nvl(new.user_id,old.user_id),
	if(old.login_date_first is null and
	new.login_count>0,'$db_date',old.login_date_first),
	if(new.login_count>0,'$db_date',old.login_date_last),
	nvl(old.login_count,0)+if(new.login_count>0,1,0),
	nvl(new.login_last_30d_count,0),
	if(old.order_date_first is null and
	new.order_count>0,'$db_date',old.order_date_first),
	if(new.order_count>0,'$db_date',old.order_date_last),
	nvl(old.order_count,0)+nvl(new.order_count,0),
	nvl(old.order_amount,0)+nvl(new.order_amount,0),
	nvl(new.order_last_30d_count,0),
	nvl(new.order_last_30d_amount,0),
	if(old.payment_date_first is null and
	new.payment_count>0,'$db_date',old.payment_date_first),
	if(new.payment_count>0,'$db_date',old.payment_date_last),
	nvl(old.payment_count,0)+nvl(new.payment_count,0),
	nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
	nvl(new.payment_last_30d_count,0),
	nvl(new.payment_last_30d_amount,0)
from
dwt.mall__user_topic old
full outer join
(
	select
		user_id,
		sum(if(dt='$db_date',login_count,0)) login_count,
		sum(if(dt='$db_date',order_count,0)) order_count,
		sum(if(dt='$db_date',order_amount,0)) order_amount,
		sum(if(dt='$db_date',payment_count,0)) payment_count,
		sum(if(dt='$db_date',payment_amount,0)) payment_amount,
		sum(if(login_count>0,1,0)) login_last_30d_count,
		sum(order_count) order_last_30d_count,
		sum(order_amount) order_last_30d_amount,
		sum(payment_count) payment_last_30d_count,
		sum(payment_amount) payment_last_30d_amount
	from dws.mall__user_action_daycount
	where dt>=date_add( '$db_date',-30)
	group by user_id
)new
on old.user_id=new.user_id;
"
$hive -e "$sql"
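Note how the new subquery above scans the last 30 days only once and derives both the single-day and the 30-day metrics through conditional aggregation: sum(if(dt='$db_date',x,0)) isolates today, while a plain sum(x) covers the whole window. A tiny self-contained illustration:

-- One pass yields today's count and the 30-day count per user
select
	user_id,
	sum(if(dt='2020-03-10', order_count, 0)) order_count_today,
	sum(order_count) order_count_30d
from (
	select 'u1' user_id, '2020-03-09' dt, 1 order_count
	union all
	select 'u1' user_id, '2020-03-10' dt, 2 order_count
) t
group by user_id;  -- u1: today=2, 30d=3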

11.3 SKU topic wide table

  • Create table
drop table if exists dwt.mall__sku_topic

CREATE EXTERNAL TABLE `dwt.mall__sku_topic`(
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment '最近 30 日被下單次數',
order_last_30d_num bigint comment '最近 30 日被下單件數',
order_last_30d_amount decimal(16,2) comment '最近 30 日被下單金額',
order_count bigint comment '累積被下單次數',
order_num bigint comment '累積被下單件數',
order_amount decimal(16,2) comment '累積被下單金額',
payment_last_30d_count bigint comment '最近 30 日被支付次數',
payment_last_30d_num bigint comment '最近 30 日被支付件數',
payment_last_30d_amount decimal(16,2) comment '最近 30 日被支付金額',
payment_count bigint comment '累積被支付次數',
payment_num bigint comment '累積被支付件數',
payment_amount decimal(16,2) comment '累積被支付金額',
refund_last_30d_count bigint comment '最近三十日退款次數',
refund_last_30d_num bigint comment '最近三十日退款件數',
refund_last_30d_amount decimal(10,2) comment '最近三十日退款金額',
refund_count bigint comment '累積退款次數',
refund_num bigint comment '累積退款件數',
refund_amount decimal(10,2) comment '累積退款金額',
cart_last_30d_count bigint comment '最近 30 日被加入購物車次數',
cart_last_30d_num bigint comment '最近 30 日被加入購物車件數',
cart_count bigint comment '累積被加入購物車次數',
cart_num bigint comment '累積被加入購物車件數',
favor_last_30d_count bigint comment '最近 30 日被收藏次數',
favor_count bigint comment '累積被收藏次數',
appraise_last_30d_good_count bigint comment '最近 30 日好評數',
appraise_last_30d_mid_count bigint comment '最近 30 日中評數',
appraise_last_30d_bad_count bigint comment '最近 30 日差評數',
appraise_last_30d_default_count bigint comment '最近 30 日默認評價數',
appraise_good_count bigint comment '累積好評數',
appraise_mid_count bigint comment '累積中評數',
appraise_bad_count bigint comment '累積差評數',
appraise_default_count bigint comment '累積默認評價數'
  ) COMMENT '商品主題寬表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/sku_topic/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=sku_topic
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	nvl(new.sku_id,old.sku_id), sku_info.spu_id,
	nvl(new.order_count30,0),
	nvl(new.order_num30,0),
	nvl(new.order_amount30,0),
	nvl(old.order_count,0) + nvl(new.order_count,0),
	nvl(old.order_num,0) + nvl(new.order_num,0),
	nvl(old.order_amount,0) + nvl(new.order_amount,0),
	nvl(new.payment_count30,0),
	nvl(new.payment_num30,0),
	nvl(new.payment_amount30,0),
	nvl(old.payment_count,0) + nvl(new.payment_count,0),
	nvl(old.payment_num,0) + nvl(new.payment_num,0),
	nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
	nvl(new.refund_count30,0),
	nvl(new.refund_num30,0),
	nvl(new.refund_amount30,0),
	nvl(old.refund_count,0) + nvl(new.refund_count,0),
	nvl(old.refund_num,0) + nvl(new.refund_num,0),
	nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
	nvl(new.cart_count30,0),
	nvl(new.cart_num30,0),
	nvl(old.cart_count,0) + nvl(new.cart_count,0),
	nvl(old.cart_num,0) + nvl(new.cart_num,0),
	nvl(new.favor_count30,0),
	nvl(old.favor_count,0) + nvl(new.favor_count,0),
	nvl(new.appraise_good_count30,0),
	nvl(new.appraise_mid_count30,0),
	nvl(new.appraise_bad_count30,0),
	nvl(new.appraise_default_count30,0) ,
	nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
	nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
	nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
	nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
	select
		sku_id,
		spu_id,
		order_last_30d_count,
		order_last_30d_num,
		order_last_30d_amount,
		order_count,
		order_num,
		order_amount ,
		payment_last_30d_count,
		payment_last_30d_num,
		payment_last_30d_amount,
		payment_count,
		payment_num,
		payment_amount,
		refund_last_30d_count,
		refund_last_30d_num,
		refund_last_30d_amount,
		refund_count,
		refund_num,
		refund_amount,
		cart_last_30d_count,
		cart_last_30d_num,
		cart_count,
		cart_num,
		favor_last_30d_count,
		favor_count,
		appraise_last_30d_good_count,
		appraise_last_30d_mid_count,
		appraise_last_30d_bad_count,
		appraise_last_30d_default_count,
		appraise_good_count,
		appraise_mid_count,
		appraise_bad_count,
		appraise_default_count
	from dwt.mall__sku_topic
)old
full outer join
(
	select
		sku_id,
		sum(if(dt='$db_date', order_count,0 )) order_count,
		sum(if(dt='$db_date',order_num ,0 )) order_num,
		sum(if(dt='$db_date',order_amount,0 )) order_amount ,
		sum(if(dt='$db_date',payment_count,0 )) payment_count,
		sum(if(dt='$db_date',payment_num,0 )) payment_num,
		sum(if(dt='$db_date',payment_amount,0 )) payment_amount,
		sum(if(dt='$db_date',refund_count,0 )) refund_count,
		sum(if(dt='$db_date',refund_num,0 )) refund_num,
		sum(if(dt='$db_date',refund_amount,0 )) refund_amount,
		sum(if(dt='$db_date',cart_count,0 )) cart_count,
		sum(if(dt='$db_date',cart_num,0 )) cart_num,
		sum(if(dt='$db_date',favor_count,0 )) favor_count,
		sum(if(dt='$db_date',appraise_good_count,0 )) appraise_good_count,
		sum(if(dt='$db_date',appraise_mid_count,0 ) ) appraise_mid_count ,
		sum(if(dt='$db_date',appraise_bad_count,0 )) appraise_bad_count,
		sum(if(dt='$db_date',appraise_default_count,0 )) appraise_default_count,
		sum(order_count) order_count30 ,
		sum(order_num) order_num30,
		sum(order_amount) order_amount30,
		sum(payment_count) payment_count30,
		sum(payment_num) payment_num30,
		sum(payment_amount) payment_amount30,
		sum(refund_count) refund_count30,
		sum(refund_num) refund_num30,
		sum(refund_amount) refund_amount30,
		sum(cart_count) cart_count30,
		sum(cart_num) cart_num30,
		sum(favor_count) favor_count30,
		sum(appraise_good_count) appraise_good_count30,
		sum(appraise_mid_count) appraise_mid_count30,
		sum(appraise_bad_count) appraise_bad_count30,
		sum(appraise_default_count) appraise_default_count30
	from dws.mall__sku_action_daycount
	where dt >= date_add ('$db_date', -30)
	group by sku_id
)new
on new.sku_id = old.sku_id
left join
(
	select * from dwd.mall__dim_sku_info where dt='$db_date'
) sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
"
$hive -e "$sql"

11.4 Coupon topic wide table

  • Create table
drop table if exists dwt.mall__coupon_topic

CREATE EXTERNAL TABLE `dwt.mall__coupon_topic`(
`coupon_id` string COMMENT '優惠券 ID',
`get_day_count` bigint COMMENT '當日領用次數',
`using_day_count` bigint COMMENT '當日使用(下單)次數',
`used_day_count` bigint COMMENT '當日使用(支付)次數',
`get_count` bigint COMMENT '累積領用次數',
`using_count` bigint COMMENT '累積使用(下單)次數',
`used_count` bigint COMMENT '累積使用(支付)次數'
  ) COMMENT '優惠券主題寬表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/coupon_topic/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=coupon_topic
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	nvl(new.coupon_id,old.coupon_id),
	nvl(new.get_count,0),
	nvl(new.using_count,0),
	nvl(new.used_count,0),
	nvl(old.get_count,0)+nvl(new.get_count,0),
	nvl(old.using_count,0)+nvl(new.using_count,0),
	nvl(old.used_count,0)+nvl(new.used_count,0)
from
(
	select
		*
	from dwt.mall__coupon_topic
)old
full outer join
(
	select
		coupon_id,
		get_count,
		using_count,
		used_count
	from dws.mall__coupon_use_daycount
	where dt='$db_date'
)new
on old.coupon_id=new.coupon_id;
"
$hive -e "$sql"

11.5 Campaign topic wide table

  • Create table
drop table if exists dwt.mall__activity_topic

CREATE EXTERNAL TABLE `dwt.mall__activity_topic`(
`id` string COMMENT '活動 id',
`activity_name` string COMMENT '活動名稱',
`order_day_count` bigint COMMENT '當日日下單次數',
`payment_day_count` bigint COMMENT '當日支付次數',
`order_count` bigint COMMENT '累積下單次數',
`payment_count` bigint COMMENT '累積支付次數'
  ) COMMENT '活動主題寬表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/activity_topic/'

  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=activity_topic
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert overwrite table $hive_table_name
select
	nvl(new.id,old.id),
	nvl(new.activity_name,old.activity_name),
	nvl(new.order_count,0),
	nvl(new.payment_count,0),
	nvl(old.order_count,0)+nvl(new.order_count,0),
	nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(
	select
		*
	from dwt.mall__activity_topic
)old
full outer join
(
	select
		id,
		activity_name,
		order_count,
		payment_count
	from dws.mall__activity_info_daycount
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"

12 ADS layer

This is the final, requirement-facing layer. Whether to compress is decided from the export targets and data volume; the tables need no partitions, and a fresh batch of results is written every day

12.1 Device topics

12.1.1 Active device count (daily, weekly, monthly)

Daily active: the number of devices active that day

Weekly active: the number of devices active that week

Monthly active: the number of devices active that month
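The weekly and monthly windows in the script below are derived from Hive's next_day and last_day built-ins. A sketch of how the boundaries work out (2020-03-10 is a Tuesday):

select
	date_add(next_day('2020-03-10','MO'),-7) monday_this_week,  -- 2020-03-09
	date_add(next_day('2020-03-10','MO'),-1) sunday_this_week,  -- 2020-03-15
	last_day('2020-03-10') month_end;                           -- 2020-03-31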

  • Create table
drop table if exists ads.mall__uv_count

CREATE EXTERNAL TABLE `ads.mall__uv_count`(
`dt` string COMMENT '統計日期',
`day_count` bigint COMMENT '當日用戶數量',
`wk_count` bigint COMMENT '當周用戶數量',
`mn_count` bigint COMMENT '當月用戶數量',
`is_weekend` string COMMENT 'Y,N 是否是周末,用於得到本周最終結果',
`is_monthend` string COMMENT 'Y,N 是否是月末,用於得到本月最終結果'
  ) COMMENT '活躍設備數表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/uv_count/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=uv_count
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	daycount.ct,
	wkcount.ct,
	mncount.ct,
	if(date_add(next_day('$db_date','MO'),-1)='$db_date','Y','N') ,
	if(last_day('$db_date')='$db_date','Y','N')
from
(
	select
		'$db_date' dt,
		count(*) ct
	from dwt.mall__uv_topic
	where login_date_last='$db_date'
)daycount join
(
	select
	'$db_date' dt,
	count (*) ct
	from dwt.mall__uv_topic
	where login_date_last>=date_add(next_day('$db_date','MO'),-7)
	and login_date_last<= date_add(next_day('$db_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
	select
		'$db_date' dt,
		count (*) ct
	from dwt.mall__uv_topic
	where
	date_format(login_date_last,'yyyy-MM')=date_format('$db_date','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
"
$hive -e "$sql"

12.1.2 Daily new devices

  • Create table
drop table if exists ads.mall__new_mid_count

CREATE EXTERNAL TABLE `ads.mall__new_mid_count`(
`create_date` string comment '創建時間' ,
`new_mid_count` bigint comment '新增設備數量'
  ) COMMENT '每日新增設備表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/new_mid_count/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=new_mid_count
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	login_date_first,
	count(*)
from dwt.mall__uv_topic
where login_date_first='$db_date'
group by login_date_first;
"
$hive -e "$sql"

12.1.3 Silent user count

Silent users: devices that launched the app only on the day it was installed, and that day is more than 7 days ago

  • Create table
drop table if exists ads.mall__silent_count

CREATE EXTERNAL TABLE `ads.mall__silent_count`(
`dt` string COMMENT '統計日期',
`silent_count` bigint COMMENT '沉默設備數'
  ) COMMENT '沉默用戶數表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/silent_count/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=silent_count
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	count(*)
from dwt.mall__uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('$db_date',-7);
"
$hive -e "$sql"

12.1.4 Weekly returning user count

Returning users this week: devices not active last week but active this week, excluding devices that are new this week

  • Create table
drop table if exists ads.mall__back_count

CREATE EXTERNAL TABLE `ads.mall__back_count`(
`wk_dt` string COMMENT '統計日期所在周',
`back_count` bigint COMMENT '迴流設備數'
  ) COMMENT '本周迴流用戶數表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/back_count/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=back_count
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	count(*)
from
(
	select
		mid_id
	from dwt.mall__uv_topic
	where login_date_last>=date_add(next_day('$db_date','MO'),-7)
	and login_date_last<= date_add(next_day('$db_date','MO'),-1)
	and login_date_first<date_add(next_day('$db_date','MO'),-7)
)current_wk
left join
(
	select
		mid_id
	from dws.mall__uv_detail_daycount
	where dt>=date_add(next_day('$db_date','MO'),-7*2)
	and dt<= date_add(next_day('$db_date','MO'),-7-1)
	group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
"
$hive -e "$sql"

12.1.5 Churned user count

Churned users: devices with no activity in the last 7 days

  • Create table
drop table if exists ads.mall__wastage_count

CREATE EXTERNAL TABLE `ads.mall__wastage_count`(
`dt` string COMMENT '統計日期',
`wastage_count` bigint COMMENT '流失設備數'
  ) COMMENT '流失用戶數表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/wastage_count/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=wastage_count
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	count(*)
from
(
	select
		mid_id
	from dwt.mall__uv_topic
	where login_date_last<=date_add('$db_date',-7)
	group by mid_id
)t1;
"
$hive -e "$sql"

12.1.6 Retention rate

  • Create table
drop table if exists ads.mall__user_retention_day_rate

CREATE EXTERNAL TABLE `ads.mall__user_retention_day_rate`(
`stat_date` string comment '統計日期',
`create_date` string comment '設備新增日期',
`retention_day` int comment '截止當前日期留存天數',
`retention_count` bigint comment '留存數量',
`new_mid_count` bigint comment '設備新增數量',
`retention_ratio` decimal(10,2) comment '留存率'
  ) COMMENT '留存率表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_retention_day_rate/'
tblproperties ("parquet.compression"="snappy") 
  • Load data
#!/bin/bash

db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_retention_day_rate
hive_table_name=$APP2.mall__$table_name


# If a date was passed in via the date variable, use it; otherwise default to yesterday
if [ -n "${date}" ] ;then
        db_date=${date}
else 
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',-- stat date
	date_add('$db_date',-1),-- device-new date
	1,-- retention days
	sum(if(login_date_first=date_add('$db_date',-1) and
	login_date_last='$db_date',1,0)),-- 1-day retention count for devices new the day before
	sum(if(login_date_first=date_add('$db_date',-1),1,0)),-- devices new the day before
	sum(if(login_date_first=date_add('$db_date',-1) and
	login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-1),1,0))*100
from dwt.mall__uv_topic
union all
select
	'$db_date',-- stat date
	date_add('$db_date',-2),-- device-new date
	2,-- retention days
	sum(if(login_date_first=date_add('$db_date',-2) and
	login_date_last='$db_date',1,0)),-- 2-day retention count for devices new two days earlier
	sum(if(login_date_first=date_add('$db_date',-2),1,0)),-- devices new two days earlier
	sum(if(login_date_first=date_add('$db_date',-2) and
	login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-2),1,0))*100
from dwt.mall__uv_topic
union all
select
	'$db_date',-- stat date
	date_add('$db_date',-3),-- device-new date
	3,-- retention days
	sum(if(login_date_first=date_add('$db_date',-3) and
	login_date_last='$db_date',1,0)),-- 3-day retention count for devices new three days earlier
	sum(if(login_date_first=date_add('$db_date',-3),1,0)),-- devices new three days earlier
	sum(if(login_date_first=date_add('$db_date',-3) and
	login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-3),1,0))*100
from dwt.mall__uv_topic;
"
$hive -e "$sql"
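The three UNION ALL branches above differ only in the day offset. A more compact sketch (not a drop-in replacement for the script; it assumes Hive's lateral view explode over an inline array, and a hard-coded example date) generates the offsets and scans dwt.mall__uv_topic once:

insert into table ads.mall__user_retention_day_rate
select
	'2020-03-10',                         -- placeholder statistics date
	date_add('2020-03-10',-day_offset),   -- first-seen date for this offset
	day_offset,                           -- retention days: 1, 2, 3
	sum(if(login_date_first=date_add('2020-03-10',-day_offset) and
	login_date_last='2020-03-10',1,0)),
	sum(if(login_date_first=date_add('2020-03-10',-day_offset),1,0)),
	sum(if(login_date_first=date_add('2020-03-10',-day_offset) and
	login_date_last='2020-03-10',1,0))/sum(if(login_date_first=date_add('2020-03-10',-day_offset),1,0))*100
from dwt.mall__uv_topic
lateral view explode(array(1,2,3)) offsets as day_offset
group by day_offset, date_add('2020-03-10',-day_offset);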

12.1.7 Users active in each of the last three weeks

Idea: deduplicate device IDs within each of the three weekly windows, union the three sets, and keep the devices that appear exactly three times.

  • Create the table
drop table if exists ads.mall__continuity_wk_count;

CREATE EXTERNAL TABLE `ads.mall__continuity_wk_count`(
`dt` string COMMENT 'statistics date; usually the Sunday ending the window, or the current date if computed daily',
`wk_dt` string COMMENT 'covered period',
`continuity_count` bigint COMMENT 'active device count'
  ) COMMENT 'users active in each of the last three weeks'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_wk_count/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_wk_count
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	concat(date_add(next_day('$db_date','MO'),-7*3),'_',date_add(next_day('$db_date','MO'),-1)),
	count(*)
from
(
	select
		mid_id
	from
	(
		select
			mid_id
		from dws.mall__uv_detail_daycount
		where dt>=date_add(next_day('$db_date','MO'),-7)
		and dt<=date_add(next_day('$db_date','MO'),-1)
		group by mid_id
		union all
		select
			mid_id
		from dws.mall__uv_detail_daycount
		where dt>=date_add(next_day('$db_date','MO'),-7*2)
		and dt<=date_add(next_day('$db_date','MO'),-7-1)
		group by mid_id
		union all
		select
			mid_id
		from dws.mall__uv_detail_daycount
		where dt>=date_add(next_day('$db_date','MO'),-7*3)
		and dt<=date_add(next_day('$db_date','MO'),-7*2-1)
		group by mid_id
	)t1
	group by mid_id
	having count(*)=3
)t2;
"
$hive -e "$sql"

12.1.8 Users active on 3+ consecutive days within the last 7 days

Idea: rank each device's active days by date; within a run of consecutive days, the date minus the rank is constant, so grouping by that difference exposes streaks, and devices with a streak of three or more days are kept (see the illustration after the load script).

  • Create the table
drop table if exists ads.mall__continuity_uv_count;

CREATE EXTERNAL TABLE `ads.mall__continuity_uv_count`(
`dt` string COMMENT 'statistics date',
`wk_dt` string COMMENT 'date range of the last 7 days',
`continuity_count` bigint COMMENT 'active device count'
  ) COMMENT 'users active on 3+ consecutive days within the last 7 days'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_uv_count/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_uv_count
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	concat(date_add('$db_date',-6),'_','$db_date'),
	count(*)
from
(
	select 
		mid_id
	from
	(
		select 
			mid_id
		from
		(
			select
				mid_id,
				date_sub(dt,rank) date_dif
			from
			(
				select
					mid_id,
					dt,
					rank() over(partition by mid_id order by dt) rank
				from dws.mall__uv_detail_daycount
				where dt>=date_add('$db_date',-6) and
				dt<='$db_date'
			)t1
		)t2
		group by mid_id,date_dif
		having count(*)>=3
	)t3
	group by mid_id
)t4;
"
$hive -e "$sql"
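The illustration below (sample rows fabricated with Hive's stack UDTF; device ID and dates are arbitrary) shows why grouping by date_sub(dt, rank) finds streaks: within a run of consecutive days the difference is constant, and it shifts as soon as a day is skipped.

select
	mid_id,
	date_dif,
	count(*) streak_len   -- m1 yields a streak of 3 (03-07..03-09) plus a lone 03-11
from
(
	select
		mid_id,
		dt,
		date_sub(dt,rank() over(partition by mid_id order by dt)) date_dif
	from
	(
		select stack(4,
			'm1','2020-03-07',
			'm1','2020-03-08',
			'm1','2020-03-09',
			'm1','2020-03-11') as (mid_id,dt)
		from (select 1 as one) dummy
	)sample_days
)t
group by mid_id,date_dif;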

12.2 Member topic

12.2.1 Member topic overview

  • Create the table
drop table if exists ads.mall__user_topic;

CREATE EXTERNAL TABLE `ads.mall__user_topic`(
`dt` string COMMENT 'statistics date',
`day_users` string COMMENT 'active member count',
`day_new_users` string COMMENT 'new member count',
`day_new_payment_users` string COMMENT 'new paying member count',
`payment_users` string COMMENT 'total paying member count',
`users` string COMMENT 'total member count',
`day_users2users` decimal(10,2) COMMENT 'member activity ratio',
`payment_users2users` decimal(10,2) COMMENT 'member payment ratio',
`day_new_users2users` decimal(10,2) COMMENT 'member freshness'
  ) COMMENT 'member topic overview table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_topic/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_topic
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	sum(if(login_date_last='$db_date',1,0)),
	sum(if(login_date_first='$db_date',1,0)),
	sum(if(payment_date_first='$db_date',1,0)),
	sum(if(payment_count>0,1,0)),
	count(*),
	sum(if(login_date_last='$db_date',1,0))/count(*),
	sum(if(payment_count>0,1,0))/count(*),
	sum(if(login_date_first='$db_date',1,0))/sum(if(login_date_last='$db_date',1,0))
from dwt.mall__user_topic
"
$hive -e "$sql"
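As a quick worked example of the three ratios (all numbers made up): with 10,000 members in total, 1,200 active on the statistics date, 300 of them new that day, and 2,500 who have ever paid:

select
	cast(1200/10000 as decimal(10,2)) day_users2users,     -- 0.12 member activity ratio
	cast(2500/10000 as decimal(10,2)) payment_users2users, -- 0.25 member payment ratio
	cast(300/1200   as decimal(10,2)) day_new_users2users; -- 0.25 member freshness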

12.2.2 Funnel analysis

Measure the conversion rates of the "view -> cart -> order -> payment" funnel.

Idea: count the number of users at each step, then compute the ratios between adjacent steps.

  • Create the table
drop table if exists ads.mall__user_action_convert_day;

CREATE EXTERNAL TABLE `ads.mall__user_action_convert_day`(
`dt` string COMMENT 'statistics date',
`total_visitor_m_count` bigint COMMENT 'total visitor count',
`cart_u_count` bigint COMMENT 'users who added to cart',
`visitor2cart_convert_ratio` decimal(10,2) COMMENT 'visit-to-cart conversion rate',
`order_u_count` bigint COMMENT 'users who placed an order',
`cart2order_convert_ratio` decimal(10,2) COMMENT 'cart-to-order conversion rate',
`payment_u_count` bigint COMMENT 'users who paid',
`order2payment_convert_ratio` decimal(10,2) COMMENT 'order-to-payment conversion rate'
  ) COMMENT 'funnel analysis table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_action_convert_day/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_action_convert_day
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	uv.day_count,
	ua.cart_count,
	cast(ua.cart_count/uv.day_count as decimal(10,2)) visitor2cart_convert_ratio,
	ua.order_count,
	cast(ua.order_count/ua.cart_count as decimal(10,2)) cart2order_convert_ratio,
	ua.payment_count,
	cast(ua.payment_count/ua.order_count as decimal(10,2)) order2payment_convert_ratio
from
(
	select
		dt,
		sum(if(cart_count>0,1,0)) cart_count,
		sum(if(order_count>0,1,0)) order_count,
		sum(if(payment_count>0,1,0)) payment_count
	from dws.mall__user_action_daycount
	where dt='$db_date'
	group by dt
)ua join ads.mall__uv_count uv on uv.dt=ua.dt;
"
$hive -e "$sql"
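For intuition, with made-up step counts of 1,000 visitors, 300 add-to-cart users, 120 order users and 100 paying users, the three ratios come out as:

select
	cast(300/1000 as decimal(10,2)) visitor2cart_convert_ratio,  -- 0.30
	cast(120/300  as decimal(10,2)) cart2order_convert_ratio,    -- 0.40
	cast(100/120  as decimal(10,2)) order2payment_convert_ratio; -- 0.83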

12.3 Product topic

12.3.1 Product counts

  • Create the table
drop table if exists ads.mall__product_info;

CREATE EXTERNAL TABLE `ads.mall__product_info`(
`dt` string COMMENT 'statistics date',
`sku_num` string COMMENT 'SKU count',
`spu_num` string COMMENT 'SPU count'
  ) COMMENT 'product counts table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_info/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_info
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	sku_num,
	spu_num
from
(
	select
		'$db_date' dt,
		count(*) sku_num
	from
	dwt.mall__sku_topic
) tmp_sku_num
join
(
	select
		'$db_date' dt,
		count(*) spu_num
	from
	(
		select
			spu_id
		from
		dwt.mall__sku_topic
		group by
		spu_id
	) tmp_spu_id
) tmp_spu_num
on
tmp_sku_num.dt=tmp_spu_num.dt;
"
$hive -e "$sql"
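The self-join above is the classic way to sidestep count(distinct), which funnels all rows through a single reducer; a group-by subquery joined back parallelizes better on large or skewed data. At small scale, the equivalent single-scan form below (a sketch against the same table; the date is a placeholder) is fine:

select
	'2020-03-10' dt,               -- placeholder statistics date
	count(*) sku_num,
	count(distinct spu_id) spu_num
from dwt.mall__sku_topic;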

12.3.2 Product sales ranking

  • Create the table
drop table if exists ads.mall__product_sale_topN;

CREATE EXTERNAL TABLE `ads.mall__product_sale_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'product ID',
`payment_amount` bigint COMMENT 'payment amount'
  ) COMMENT 'product sales ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_sale_topN/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_sale_topN
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	sku_id,
	payment_amount
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by payment_amount desc
limit 10;
"
$hive -e "$sql"

12.3.3 Product favorites ranking

  • Create the table
drop table if exists ads.mall__product_favor_topN;

CREATE EXTERNAL TABLE `ads.mall__product_favor_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'product ID',
`favor_count` bigint COMMENT 'favorites count'
  ) COMMENT 'product favorites ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_favor_topN/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_favor_topN
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	sku_id,
	favor_count
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by favor_count desc
limit 10;
"
$hive -e "$sql"

12.3.4 Product add-to-cart ranking

  • Create the table
drop table if exists ads.mall__product_cart_topN;

CREATE EXTERNAL TABLE `ads.mall__product_cart_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'product ID',
`cart_num` bigint COMMENT 'add-to-cart count'
  ) COMMENT 'product add-to-cart ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_cart_topN/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_cart_topN
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	sku_id,
	cart_num
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by cart_num desc
limit 10;
"
$hive -e "$sql"

12.3.5 Product refund-rate ranking (last 30 days)

  • Create the table
drop table if exists ads.mall__product_refund_topN;

CREATE EXTERNAL TABLE `ads.mall__product_refund_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'product ID',
`refund_ratio` decimal(10,2) COMMENT 'refund rate'
  ) COMMENT 'product refund-rate ranking (last 30 days) table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_refund_topN/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_refund_topN
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	sku_id,
	refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt.mall__sku_topic
order by refund_ratio desc
limit 10;
"
$hive -e "$sql"
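One caveat: for SKUs with no payments in the window, payment_last_30d_count is 0 and Hive returns NULL for the division, so such rows sort to the bottom of the DESC ordering rather than erroring out. A defensive variant (a sketch; the date is a placeholder) filters them explicitly:

select
	'2020-03-10',                 -- placeholder statistics date
	sku_id,
	refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt.mall__sku_topic
where payment_last_30d_count>0    -- skip SKUs that would produce NULL ratios
order by refund_ratio desc
limit 10;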

12.3.6 Product negative-review rate

  • Create the table
drop table if exists ads.mall__appraise_bad_topN;

CREATE EXTERNAL TABLE `ads.mall__appraise_bad_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'product ID',
`appraise_bad_ratio` decimal(10,2) COMMENT 'negative-review rate'
  ) COMMENT 'product negative-review rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/appraise_bad_topN/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=appraise_bad_topN
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date' dt,
	sku_id,
	appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by appraise_bad_ratio desc
limit 10;
"
$hive -e "$sql"

12.4 Marketing topic

12.4.1 Order statistics

  • Create the table
drop table if exists ads.mall__order_daycount;

CREATE EXTERNAL TABLE `ads.mall__order_daycount`(
dt string comment 'statistics date',
order_count bigint comment 'orders placed that day',
order_amount bigint comment 'order amount that day',
order_users bigint comment 'users who ordered that day'
  ) COMMENT 'order statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/order_daycount/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=order_daycount
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	'$db_date',
	sum(order_count),
	sum(order_amount),
	sum(if(order_count>0,1,0))
from dws.mall__user_action_daycount
where dt='$db_date';
"
$hive -e "$sql"

12.4.2 Payment statistics

  • Create the table
drop table if exists ads.mall__payment_daycount;

CREATE EXTERNAL TABLE `ads.mall__payment_daycount`(
dt string comment 'statistics date',
order_count bigint comment 'payments that day',
order_amount bigint comment 'payment amount that day',
payment_user_count bigint comment 'paying users that day',
payment_sku_count bigint comment 'SKUs paid for that day',
payment_avg_time double comment 'average time from order to payment, in minutes'
  ) COMMENT 'payment statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/payment_daycount/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=payment_daycount
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	tmp_payment.dt,
	tmp_payment.payment_count,
	tmp_payment.payment_amount,
	tmp_payment.payment_user_count,
	tmp_skucount.payment_sku_count,
	tmp_time.payment_avg_time
from
(
	select
		'$db_date' dt,
		sum(payment_count) payment_count,
		sum(payment_amount) payment_amount,
		sum(if(payment_count>0,1,0)) payment_user_count
	from dws.mall__user_action_daycount
	where dt='$db_date'
)tmp_payment
join
(
	select
		'$db_date' dt,
		sum(if(payment_count>0,1,0)) payment_sku_count
	from dws.mall__sku_action_daycount
	where dt='$db_date'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(
	select
		'$db_date' dt,
		sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60
		payment_avg_time
	from dwd.mall__fact_order_info
	where dt='$db_date'
	and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt
"
$hive -e "$sql"
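The average-time expression above divides a summed difference by count(*); an equivalent and perhaps more readable formulation (a sketch; it assumes payment_time and create_time parse with Hive's default timestamp format, and the date is a placeholder) uses avg directly:

select
	avg((unix_timestamp(payment_time)-unix_timestamp(create_time))/60) payment_avg_time
from dwd.mall__fact_order_info
where dt='2020-03-10'              -- placeholder statistics date
and payment_time is not null;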

12.4.3 Repurchase rate

  • Create the table
drop table if exists ads.mall__sale_tm_category1_stat_mn;

CREATE EXTERNAL TABLE `ads.mall__sale_tm_category1_stat_mn`(
tm_id string comment 'brand ID',
category1_id string comment 'level-1 category ID',
category1_name string comment 'level-1 category name',
buycount bigint comment 'buyer count',
buy_twice_last bigint comment 'buyers with 2+ purchases',
buy_twice_last_ratio decimal(10,2) comment 'single repurchase rate',
buy_3times_last bigint comment 'buyers with 3+ purchases',
buy_3times_last_ratio decimal(10,2) comment 'multiple repurchase rate',
stat_mn string comment 'statistics month',
stat_date string comment 'statistics date'
  ) COMMENT 'repurchase rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/sale_tm_category1_stat_mn/'
tblproperties ("parquet.compression"="snappy");
  • Load data
#!/bin/bash

date=$1
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=sale_tm_category1_stat_mn
hive_table_name=$APP2.mall__$table_name

# Use the date passed as the first argument; fall back to yesterday if none is given
if [ -n "${date}" ] ;then
        db_date=${date}
else
        db_date=`date -d "-1 day" +%F`
fi

sql=" 
insert into table $hive_table_name
select
	mn.sku_tm_id,
	mn.sku_category1_id,
	mn.sku_category1_name,
	sum(if(mn.order_count>=1,1,0)) buycount,
	sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
	sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
	sum(if(mn.order_count>=3,1,0)) buy3timeLast ,
	sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio,
	date_format('$db_date' ,'yyyy-MM') stat_mn,
	'$db_date' stat_date
from
(
	select
		user_id,
		sd.sku_tm_id,
		sd.sku_category1_id,
		sd.sku_category1_name,
		sum(order_count) order_count
	from dws.mall__sale_detail_daycount sd
	where date_format(dt,'yyyy-MM')=date_format('$db_date' ,'yyyy-MM')
	group by user_id, sd.sku_tm_id, sd.sku_category1_id, sd.sku_category1_name
) mn
group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;
"
$hive -e "$sql"
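To read the result: if, within a month, 100 users bought a given brand, 30 of them bought it at least twice and 12 at least three times, then (numbers made up):

select
	cast(30/100 as decimal(10,2)) buy_twice_last_ratio,  -- 0.30 single repurchase rate
	cast(12/100 as decimal(10,2)) buy_3times_last_ratio; -- 0.12 multiple repurchase rate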