Big Data Series: A Data Warehouse Case Study
Offline Data Warehouse
A data warehouse (Data Warehouse) is a strategic collection of data from all of an enterprise's systems, built to support every decision-making process.
Analyzing the data in the warehouse helps the enterprise improve business processes, control costs, and raise product quality.
The warehouse is not the data's final destination; it prepares data for that destination through cleansing, transformation, classification, reorganization, merging, splitting, aggregation, and so on.
1 Project Overview
1.1 Requirements
- Build a user-behavior data collection platform
- Build a business data collection platform
- Dimensional modeling of the data warehouse
- Analysis of themes such as users, traffic, members, goods, sales, regions, and activities
- An ad-hoc query tool for on-demand metric analysis
- Cluster performance monitoring with alerting on anomalies
- Metadata management
- Data quality monitoring
1.2 Technology Selection
- Options for data ingestion
Framework | Main capability |
---|---|
Sqoop | Import/export between big-data platforms and relational databases |
DataX | Import/export between big-data platforms and relational databases |
Flume | Collecting and parsing log data |
Logstash | Collecting and parsing log data |
Maxwell | Commonly used to parse MySQL binlogs in real time |
Canal | Commonly used to parse MySQL binlogs in real time |
Waterdrop | General data import/export tool |
- Options for message middleware
Open-source MQ | Overview |
---|---|
RabbitMQ | Implemented by LShift in Erlang; multi-protocol; broker architecture; heavyweight |
ZeroMQ | From iMatix, the original designer of AMQP; lightweight messaging kernel; brokerless design; implemented in C++ |
Kafka | Implemented by LinkedIn in Scala; supports parallel data loading into Hadoop |
ActiveMQ | Apache's JMS implementation; supports broker and P2P deployment; multi-protocol; implemented in Java |
Redis | Key-value NoSQL database with MQ capabilities |
MemcacheQ | A message queue developed in China on the memcache buffer-queue protocol; implemented in C/C++ |
- Options for persistent data storage
Framework | Main use |
---|---|
HDFS | Distributed file storage system |
HBase | Key-value NoSQL database |
Kudu | Cloudera's open-source data store, similar to HBase |
Hive | MR-based data warehouse tool |
- Options for offline computation (as Hive engines)
Framework | Introduction |
---|---|
MapReduce | The earliest distributed computing framework |
Spark | In-memory engine; solves batch and stream processing in one stack |
Flink | Stream-native engine; solves batch and stream processing in one stack |
- Options for the analytical database
Criterion | Druid | Kylin | Presto | Impala | ES |
---|---|---|---|---|---|
Sub-second response | √ | √ | × | × | × |
Tens of billions of rows | √ | √ | √ | √ | √ |
SQL support | √ | √ | √ | √ | √ (via plugin) |
Offline | √ | √ | √ | √ | √ |
Real-time | √ | √ | × | × | × |
Exact distinct count | × | √ | √ | √ | × |
Multi-table join | × | √ | √ | √ | × |
JDBC for BI | × | √ | √ | √ | × |
- Other selections
- Task scheduling: DolphinScheduler
- Cluster monitoring: CM + CDH
- Metadata management: Atlas
- BI tools: Zeppelin, Superset
1.3 Architecture
1.4 Cluster Resource Planning
- How to size the cluster (assume each server has 8 TB of disk and 128 GB of RAM)
- 1,000,000 daily active users, ~100 log lines each per day: 1,000,000 × 100 = 100 million lines
- At roughly 1 KB per line: 100,000,000 KB / 1024 / 1024 ≈ 100 GB per day
- No server expansion for half a year: 100 GB × 180 days ≈ 18 TB
- 3 replicas: 18 TB × 3 = 54 TB
- 20%–30% buffer reserved: 54 TB / 0.7 ≈ 77 TB
- Conclusion: roughly 10 servers
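The sizing arithmetic above can be sketched in code. This is a back-of-the-envelope helper, not part of the project; `ClusterSizing` and its method names are illustrative, and exact arithmetic lands slightly below the rounded figures used in the text.

```java
public class ClusterSizing {
    // 1,000,000 DAU x 100 log lines per user per day
    static long dailyLines(long dau, long linesPerUser) {
        return dau * linesPerUser;
    }

    // ~1 KB per line, converted to GB/day
    static double dailyGB(long lines, double kbPerLine) {
        return lines * kbPerLine / 1024 / 1024;
    }

    // half a year of data, 3 HDFS replicas, 20-30% headroom reserved
    static double totalTB(double gbPerDay, int days, int replicas, double usableFraction) {
        return gbPerDay * days * replicas / 1024 / usableFraction;
    }

    public static void main(String[] args) {
        long lines = dailyLines(1_000_000, 100);   // 100 million lines/day
        double gb = dailyGB(lines, 1.0);           // ~95 GB/day
        double tb = totalTB(gb, 180, 3, 0.7);      // ~72 TB
        System.out.printf("%.0f GB/day, %.0f TB total, ~%d servers with 8 TB disks%n",
                gb, tb, Math.round(tb / 8));
    }
}
```

Exact arithmetic gives about 72 TB and 9 servers; rounding 95 GB/day up to 100 GB/day, as the text does, yields the quoted 77 TB and roughly 10 servers.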
Since resources are limited, three servers are used here.
Service | Subservice | Server cdh01.cm | Server cdh02.cm | Server cdh03.cm |
---|---|---|---|---|
HDFS | NameNode | √ | | |
HDFS | DataNode | √ | √ | √ |
HDFS | SecondaryNameNode | | | √ |
Yarn | NodeManager | √ | √ | √ |
Yarn | ResourceManager | | √ | |
Zookeeper | Zookeeper Server | √ | √ | √ |
Flume | Flume (log collection) | √ | √ | |
Flume | Flume (Kafka consumer) | | | √ |
Kafka | Kafka | √ | √ | √ |
Hive | Hive | √ | | |
MySQL | MySQL | √ | | |
Sqoop | Sqoop | √ | | |
Presto | Coordinator | √ | | |
Presto | Worker | | √ | √ |
DolphinScheduler | DolphinScheduler | √ | | |
Druid | Druid | √ | √ | √ |
Kylin | Kylin | √ | | |
HBase | HMaster | √ | | |
HBase | HRegionServer | √ | √ | √ |
Superset | Superset | √ | | |
Atlas | Atlas | √ | | |
Solr | Solr | √ | | |
2 Data Generation Module
This module covers the collection of user behavior data. Why collect it?
Because to a business, users are revenue. User habits are collected so that derived big-data products, such as a user-profile tagging system, can analyze them. User data is generally analyzed offline; the results can later be stored in an inverted-index ecosystem such as ES and matched against user habits by real-time computation for personalized recommendation, and, going further, deep learning can recommend to similar users.
2.1 Basic Tracking Data Format
- Common fields: fields carried by essentially every Android phone
- Business fields: fields reported by the tracking point, specific to the business type
{
"ap":"xxxxx",//data source: app or pc
"cm": { //common fields
"mid": "", // (String) unique device id
"uid": "", // (String) user id
"vc": "1", // (String) versionCode, app version code
"vn": "1.0", // (String) versionName, app version name
"l": "zh", // (String) system language
"sr": "", // (String) channel the app was installed from
"os": "7.1.1", // (String) Android version
"ar": "CN", // (String) area / region
"md": "BBB100-1", // (String) phone model
"ba": "blackberry", // (String) phone brand
"sv": "V2.2.1", // (String) sdkVersion
"g": "", // (String) gmail account
"hw": "1620x1080", // (String) heightXwidth, screen size
"t": "1506047606608", // (String) client timestamp when the log was produced
"nw": "WIFI", // (String) network mode
"ln": 0, // (double) lng longitude
"la": 0 // (double) lat latitude
},
"et": [ //events
{
"ett": "1506047605364", //client event timestamp
"en": "display", //event name
"kv": { //event payload, free-form key-value pairs
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
}
]
}
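Every entry in the `et` array shares the same envelope: a client timestamp `ett`, an event name `en`, and a free-form `kv` payload. A minimal sketch of that envelope using only the standard library (`EventShape` and `wrap` are hypothetical names, not project code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EventShape {
    // Wrap an event payload in the envelope shared by all tracking events.
    public static Map<String, Object> wrap(String ett, String en, Map<String, Object> kv) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("ett", ett); // client-side event timestamp
        event.put("en", en);   // event name, e.g. "display"
        event.put("kv", kv);   // business-specific key-value payload
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> kv = new LinkedHashMap<>();
        kv.put("goodsid", "236");
        kv.put("action", "1");
        System.out.println(wrap("1506047605364", "display", kv));
    }
}
```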
- Example log (server timestamp | log); the timestamp can be used to gauge communication time with the network service:
1540934156385| {
"ap": "gmall", //warehouse name
"cm": {
"uid": "1234",
"vc": "2",
"vn": "1.0",
"l": "EN",
"sr": "",
"os": "7.1.1",
"ar": "CN",
"md": "BBB100-1",
"ba": "blackberry",
"sv": "V2.2.1",
"g": "[email protected]",
"hw": "1620x1080",
"t": "1506047606608",
"nw": "WIFI",
"ln": 0,
"la": 0
},
"et": [
{
"ett": "1506047605364", //client event timestamp
"en": "display", //event name
"kv": { //event payload, free-form key-value pairs
"goodsid": "236",
"action": "1",
"extend1": "1",
"place": "2",
"category": "75"
}
},{
"ett": "1552352626835",
"en": "active_background",
"kv": {
"active_source": "1"
}
}
]
}
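Downstream consumers first have to separate the server timestamp from the JSON payload at the first `|`. A minimal sketch (`LogLineSplitter` is an illustrative name, not project code):

```java
public class LogLineSplitter {
    // Split "serverTimestamp|jsonPayload" at the first '|' only,
    // since the payload itself may contain '|' characters.
    public static String[] split(String line) {
        int idx = line.indexOf('|');
        if (idx < 0) {
            // startup logs carry no timestamp prefix
            return new String[]{null, line};
        }
        return new String[]{line.substring(0, idx), line.substring(idx + 1)};
    }

    public static void main(String[] args) {
        String[] parts = split("1540934156385|{\"ap\":\"gmall\"}");
        System.out.println(parts[0] + " -> " + parts[1]);
    }
}
```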
2.2 Tracking Event Logs
2.2.1 Goods List Page
- Event name: loading
Tag | Meaning |
---|---|
action | Action: start loading=1, load succeeded=2, load failed=3 |
loading_time | Load duration: time from pull-down start until the API returns data (report 0 on load start; report the duration only on success or failure) |
loading_way | Load type: 1 = read from cache, 2 = pull fresh data from the API (reported only on success) |
extend1 | Extension field Extend1 |
extend2 | Extension field Extend2 |
type | Load trigger: automatic=1, user pull-down=2, bottom load=3 (triggered by tapping the bottom hint bar / tapping back-to-top) |
type1 | Failure code: the load-failure status code (empty means success, no failure) |
2.2.2 Goods Click
- Event tag: display
Tag | Meaning |
---|---|
action | Action: goods shown=1, goods clicked=2 |
goodsid | Goods ID (issued by the server) |
place | Position (the first item is 0, the second 1, and so on) |
extend1 | Exposure type: 1 = first exposure, 2 = repeat exposure |
category | Category ID (defined by the server) |
2.2.3 Goods Detail Page
- Event tag: newsdetail
Tag | Meaning |
---|---|
entry | Page entry source: app home=1, push=2, detail-page related recommendation=3 |
action | Action: start loading=1, load succeeded=2 (pv), load failed=3, leave page=4 |
goodsid | Goods ID (issued by the server) |
showtype | Goods style: 0=no image, 1=one large image, 2=two images, 3=three small images, 4=one small image, 5=one large plus two small images |
news_staytime | Page dwell time: counted from when the goods starts loading until the user closes the page. If the user jumps to another page, timing pauses and resumes on return; if the user is away for more than 10 minutes, the measurement is discarded and not reported. Report empty if the page never loaded successfully. |
loading_time | Load duration: time from page load start until the API returns data (report 0 on load start; report the duration only on success or failure) |
type1 | Failure code: the load-failure status code (empty means success, no failure) |
category | Category ID (defined by the server) |
2.2.4 Ads
- Event name: ad
Tag | Meaning |
---|---|
entry | Entry: goods list page=1, app home=2, goods detail page=3 |
action | Action: ad shown=1, ad clicked=2 |
contentType | Type: 1 = goods, 2 = marketing activity |
displayMills | Display duration in milliseconds |
itemId | Goods id |
activityId | Marketing activity id |
2.2.5 Notifications
- Event tag: notification
Tag | Meaning |
---|---|
action | Action: notification created=1, popped up=2, clicked=3, persistent notification shown=4 (not re-reported; at most once per day) |
type | Notification id: alert=1, weather forecast (morning=2, evening=3), persistent=4 |
ap_time | Client pop-up time |
content | Reserved field |
2.2.6 User Background Activity
- Event tag: active_background
Tag | Meaning |
---|---|
active_source | 1=upgrade, 2=download, 3=plugin_upgrade |
2.2.7 Comments
- Description: comment table (comment)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | comment_id | comment id | int | 10,0 | | |
2 | userid | user id | int | 10,0 | √ | 0 |
3 | p_comment_id | parent comment id (0 = top-level comment, non-zero = reply) | int | 10,0 | √ | |
4 | content | comment content | string | 1000 | √ | |
5 | addtime | creation time | string | | √ | |
6 | other_id | related id of the comment | int | 10,0 | √ | |
7 | praise_count | like count | int | 10,0 | √ | 0 |
8 | reply_count | reply count | int | 10,0 | √ | 0 |
2.2.8 Favorites
- Description: favorites table (favorites)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | id | primary key | int | 10,0 | | |
2 | course_id | goods id | int | 10,0 | √ | 0 |
3 | userid | user ID | int | 10,0 | √ | 0 |
4 | add_time | creation time | string | | √ | |
2.2.9 Likes
- Description: like table (praise)
No. | Field | Description | Type | Length | Nullable | Default |
---|---|---|---|---|---|---|
1 | id | primary key id | int | 10,0 | | |
2 | userid | user id | int | 10,0 | √ | |
3 | target_id | id of the liked object | int | 10,0 | √ | |
4 | type | like type: 1 = Q&A like, 2 = Q&A comment like, 3 = article like, 4 = comment like | int | 10,0 | √ | |
5 | add_time | time added | string | | √ | |
2.2.10 Error Logs
Tag | Meaning |
---|---|
errorBrief | error summary |
errorDetail | error detail |
2.3 Tracking Startup Log Data
{
"action":"1",
"ar":"MX",
"ba":"HTC",
"detail":"",
"en":"start",
"entry":"2",
"extend1":"",
"g":"[email protected]",
"hw":"640*960",
"l":"en",
"la":"20.4",
"ln":"-99.3",
"loading_time":"2",
"md":"HTC-2",
"mid":"995",
"nw":"4G",
"open_ad_type":"2",
"os":"8.1.2",
"sr":"B",
"sv":"V2.0.6",
"t":"1561472502444",
"uid":"995",
"vc":"10",
"vn":"1.3.4"
}
- Event tag: start
Tag | Meaning |
---|---|
entry | Entry point: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5 |
open_ad_type | Splash ad type: native splash ad=1, interstitial splash ad=2 |
action | Status: success=1, failure=2 |
loading_time | Load duration: time from pull-down start until the API returns data (report 0 on load start; report the duration only on success or failure) |
detail | Failure code (empty if none) |
extend1 | Failure message (empty if none) |
en | Log type: start |
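Startup logs are a bare JSON object carrying "en":"start" with no timestamp prefix, while event logs arrive as "timestamp|json". A minimal routing sketch based on those two shapes (`LogClassifier` and its string checks are illustrative and assume compact single-line records; a real pipeline would use a JSON parser):

```java
public class LogClassifier {
    // Distinguish the two record shapes produced by the generator:
    //   event log:   "1589695383284|{...}"
    //   startup log: "{..., \"en\":\"start\", ...}"
    public static String classify(String line) {
        int brace = line.indexOf('{');
        if (brace > 0 && line.charAt(brace - 1) == '|') {
            return "event";
        }
        return line.contains("\"en\":\"start\"") ? "start" : "unknown";
    }
}
```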
2.4 Data Generation Script
The following case skips the log-generation flow highlighted in the original figure and builds the logFile directly with a Java program.
2.4.1 Generated Data Formats
- Startup log
{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"4","extend1":"","g":"[email protected]","hw":"1080*1920","l":"pt","la":"-11.0","ln":"-70.0","loading_time":"9","md":"sumsung-5","mid":"244","nw":"3G","open_ad_type":"1","os":"8.2.3","sr":"D","sv":"V2.1.3","t":"1589612165914","uid":"244","vc":"16","vn":"1.2.1"}
- Event log (due to a conversion issue, the figure lacked the leading "timestamp|"):
1589695383284|{"cm":{"ln":"-79.4","sv":"V2.5.3","os":"8.0.6","g":"[email protected]","mid":"245","nw":"WIFI","l":"pt","vc":"6","hw":"1080*1920","ar":"MX","uid":"245","t":"1589627025851","la":"-39.6","md":"HTC-7","vn":"1.3.5","ba":"HTC","sr":"N"},"ap":"app","et":[{"ett":"1589650631883","en":"display","kv":{"goodsid":"53","action":"2","extend1":"2","place":"3","category":"50"}},{"ett":"1589690866312","en":"newsdetail","kv":{"entry":"3","goodsid":"54","news_staytime":"1","loading_time":"6","action":"4","showtype":"0","category":"78","type1":""}},{"ett":"1589641734037","en":"loading","kv":{"extend2":"","loading_time":"0","action":"1","extend1":"","type":"2","type1":"201","loading_way":"2"}},{"ett":"1589687684878","en":"ad","kv":{"activityId":"1","displayMills":"92030","entry":"3","action":"5","contentType":"0"}},{"ett":"1589632980772","en":"active_background","kv":{"active_source":"1"}},{"ett":"1589682030324","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1589675065650","en":"comment","kv":{"p_comment_id":2,"addtime":"1589624299628","praise_count":509,"other_id":6,"comment_id":7,"reply_count":35,"userid":3,"content":"關色蘆候佰間綸珊斑禁尹贊滌仇彭企呵姜毅"}},{"ett":"1589631359459","en":"favorites","kv":{"course_id":7,"id":0,"add_time":"1589681240066","userid":7}},{"ett":"1589616574187","en":"praise","kv":{"target_id":1,"id":7,"type":3,"add_time":"1589642497314","userid":8}}]}
2.4.2 Create the Maven Projects
- data-producer:pom.xml
<!-- unified dependency versions -->
<properties>
<slf4j.version>1.7.20</slf4j.version>
<logback.version>1.0.7</logback.version>
</properties>
<dependencies> <!-- Alibaba's open-source JSON parsing framework -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency> <!-- logging framework -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- main class -->
<mainClass>com.heaton.bigdata.datawarehouse.app.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- data-producer:logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false"> <!-- Defines where log files are stored; do not use relative paths in a Logback configuration -->
<property name="LOG_HOME" value="/root/logs/"/> <!-- console output -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- Output format: %d = date, %thread = thread name, %-5level = level padded to 5 characters, %msg = log message, %n = newline -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender> <!-- Roll a new log file every day; stores the event logs -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- <File>${LOG_HOME}/app.log</File> would set the save path while the log stays under ${log.max.size}; note that in a web project logs would land in Tomcat's bin directory -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- output log file name pattern -->
<FileNamePattern>${LOG_HOME}/app-%d{yyyy-MM-dd}.log</FileNamePattern> <!-- days of log history to keep -->
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%msg%n</pattern>
</encoder> <!-- maximum size of a log file -->
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender> <!-- asynchronous logging -->
<appender name="ASYNC_FILE"
class="ch.qos.logback.classic.AsyncAppender"> <!-- Do not drop logs: by default, TRACE/DEBUG/INFO events are discarded once the queue is 80% full -->
<discardingThreshold>0</discardingThreshold> <!-- queue depth; affects performance, default 256 -->
<queueSize>512</queueSize> <!-- attach the downstream appender; at most one -->
<appender-ref ref="FILE"/>
</appender> <!-- root log level -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="ASYNC_FILE"/>
</root>
</configuration>
- data-flume:pom.xml
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
- hive-function:pom.xml
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.4.3 Event Beans
data-producer project
2.4.3.1 Common Log Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Common log bean
*/
@Data
public class AppBase {
private String mid; // (String) unique device id
private String uid; // (String) user id
private String vc; // (String) versionCode, app version code
private String vn; // (String) versionName, app version name
private String l; // (String) system language
private String sr; // (String) channel the app was installed from
private String os; // (String) Android version
private String ar; // (String) area / region
private String md; // (String) phone model
private String ba; // (String) phone brand
private String sv; // (String) sdkVersion
private String g; // (String) gmail account
private String hw; // (String) heightXwidth, screen size
private String t; // (String) client timestamp when the log was produced
private String nw; // (String) network mode
private String ln; // (double) lng longitude
private String la; // (double) lat latitude
}
2.4.3.2 Startup Log Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Startup log bean
*/
@Data
public class AppStart extends AppBase {
private String entry;//entry point: push=1, widget=2, icon=3, notification=4, lockscreen_widget=5
private String open_ad_type;//splash ad type: native splash ad=1, interstitial splash ad=2
private String action;//status: success=1, failure=2
private String loading_time;//load duration: time from pull-down start until the API returns data (report 0 on load start; report the duration only on success or failure)
private String detail;//failure code (empty if none)
private String extend1;//failure message (empty if none)
private String en;//startup log type marker
}
2.4.3.3 Error Log Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Error log bean
*/
@Data
public class AppErrorLog {
private String errorBrief; //error summary
private String errorDetail; //error detail
}
2.4.3.4 Goods Click Log Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Goods click log bean
*/
@Data
public class AppDisplay {
private String action;//action: goods shown=1, goods clicked=2
private String goodsid;//goods ID (issued by the server)
private String place;//position (the first item is 0, the second 1, and so on)
private String extend1;//exposure type: 1 = first exposure, 2 = repeat exposure (unused)
private String category;//category ID (defined by the server)
}
2.4.3.5 Goods Detail Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Goods detail bean
*/
@Data
public class AppNewsDetail {
private String entry;//page entry source: app home=1, push=2, detail-page related recommendation=3
private String action;//action: start loading=1, load succeeded=2 (pv), load failed=3, leave page=4
private String goodsid;//goods ID (issued by the server)
private String showtype;//goods style: 0=no image, 1=one large image, 2=two images, 3=three small images, 4=one small image, 5=one large plus two small images; goods from detail-page related recommendations always report style 0 (text left, image right)
private String news_staytime;//page dwell time: counted from when the goods starts loading until the user closes the page; paused on jumping to another page and resumed on return; discarded and not reported if the user is away for over 10 minutes; empty if the page never loaded successfully
private String loading_time;//load duration: time from page load start until the API returns data (report 0 on load start; report the duration only on success or failure)
private String type1;//failure code: the load-failure status code (empty means success, no failure)
private String category;//category ID (defined by the server)
}
2.4.3.6 Goods List Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Goods list bean
*/
@Data
public class AppLoading {
private String action;//action: start loading=1, load succeeded=2, load failed=3
private String loading_time;//load duration: time from pull-down start until the API returns data (report 0 on load start; report the duration only on success or failure)
private String loading_way;//load type: 1 = read from cache, 2 = pull fresh data from the API (reported only on success)
private String extend1;//extension field Extend1
private String extend2;//extension field Extend2
private String type;//load trigger: automatic=1, user pull-down=2, bottom load=3 (triggered by tapping the bottom hint bar / tapping back-to-top)
private String type1;//failure code: the load-failure status code (empty means success, no failure)
}
2.4.3.7 Ad Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Ad bean
*/
@Data
public class AppAd {
private String entry;//entry: goods list page=1, app home=2, goods detail page=3
private String action;//action: ad shown=1, ad clicked=2
private String contentType;//type: 1 = goods, 2 = marketing activity
private String displayMills;//display duration in milliseconds
private String itemId; //goods id
private String activityId; //marketing activity id
}
2.4.3.8 Notification Log Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Notification log bean
*/
@Data
public class AppNotification {
private String action;//action: notification created=1, popped up=2, clicked=3, persistent notification shown=4 (not re-reported; at most once per day)
private String type;//notification id: alert=1, weather forecast (morning=2, evening=3), persistent=4
private String ap_time;//client pop-up time
private String content;//reserved field
}
2.4.3.9 User Background Activity Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe User background-activity bean
*/
@Data
public class AppActive {
private String active_source;//1=upgrade, 2=download, 3=plugin_upgrade
}
2.4.3.10 User Comment Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe User comment bean
*/
@Data
public class AppComment {
private int comment_id;//comment id
private int userid;//user id
private int p_comment_id;//parent comment id (0 = top-level comment, non-zero = reply)
private String content;//comment content
private String addtime;//creation time
private int other_id;//related id of the comment
private int praise_count;//like count
private int reply_count;//reply count
}
2.4.3.11 User Favorites Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe User favorites bean
*/
@Data
public class AppFavorites {
private int id;//primary key
private int course_id;//goods id
private int userid;//user ID
private String add_time;//creation time
}
2.4.3.12 User Like Class
import lombok.Data;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe User like bean
*/
@Data
public class AppPraise {
private int id; //primary key id
private int userid;//user id
private int target_id;//id of the liked object
private int type;//like type: 1 = Q&A like, 2 = Q&A comment like, 3 = article like, 4 = comment like
private String add_time;//time added
}
2.4.4 Main Class
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.Random;
/**
* @author Heaton
* @email [email protected]
* @date 2020/4/25 14:54
* @describe Main class (entry point)
*/
public class App {
private final static Logger logger = LoggerFactory.getLogger(App.class);
private static Random rand = new Random();
// device id
private static int s_mid = 0;
// user id
private static int s_uid = 0;
// goods id
private static int s_goodsid = 0;
public static void main(String[] args) {
// arg 1: delay between records in milliseconds, default 0
Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
// arg 2: number of loop iterations, default 1000
int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
// generate the data
generateLog(delay, loop_len);
}
private static void generateLog(Long delay, int loop_len) {
for (int i = 0; i < loop_len; i++) {
int flag = rand.nextInt(2);
switch (flag) {
case (0):
//app startup
AppStart appStart = generateStart();
String jsonString = JSON.toJSONString(appStart);
//print to console
logger.info(jsonString);
break;
case (1):
JSONObject json = new JSONObject();
json.put("ap", "app");
json.put("cm", generateComFields());
JSONArray eventsArray = new JSONArray();
// event logs
// goods click / display
if (rand.nextBoolean()) {
eventsArray.add(generateDisplay());
json.put("et", eventsArray);
}
// goods detail page
if (rand.nextBoolean()) {
eventsArray.add(generateNewsDetail());
json.put("et", eventsArray);
}
// goods list page
if (rand.nextBoolean()) {
eventsArray.add(generateNewList());
json.put("et", eventsArray);
}
// ad
if (rand.nextBoolean()) {
eventsArray.add(generateAd());
json.put("et", eventsArray);
}
// notification
if (rand.nextBoolean()) {
eventsArray.add(generateNotification());
json.put("et", eventsArray);
}
// user background activity
if (rand.nextBoolean()) {
eventsArray.add(generateBackground());
json.put("et", eventsArray);
}
//error log
if (rand.nextBoolean()) {
eventsArray.add(generateError());
json.put("et", eventsArray);
}
// user comment
if (rand.nextBoolean()) {
eventsArray.add(generateComment());
json.put("et", eventsArray);
}
// user favorites
if (rand.nextBoolean()) {
eventsArray.add(generateFavorites());
json.put("et", eventsArray);
}
// user like
if (rand.nextBoolean()) {
eventsArray.add(generatePraise());
json.put("et", eventsArray);
}
//timestamp
long millis = System.currentTimeMillis();
//print to console
logger.info(millis + "|" + json.toJSONString());
break;
}
// delay
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Build the common fields
*/
private static JSONObject generateComFields() {
AppBase appBase = new AppBase();
//device id
appBase.setMid(s_mid + "");
s_mid++;
// user id
appBase.setUid(s_uid + "");
s_uid++;
// app version code, e.g. 5, 6
appBase.setVc("" + rand.nextInt(20));
//app version name, e.g. v1.1.1
appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));
// Android version
appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));
// language: es, en, pt
int flag = rand.nextInt(3);
switch (flag) {
case (0):
appBase.setL("es");
break;
case (1):
appBase.setL("en");
break;
case (2):
appBase.setL("pt");
break;
}
// channel the app came from
appBase.setSr(getRandomChar(1));
// region
flag = rand.nextInt(2);
switch (flag) {
case 0:
appBase.setAr("BR");
break;
case 1:
appBase.setAr("MX");
break;
}
// phone brand ba and model md; the model suffix is a 2-digit number
flag = rand.nextInt(3);
switch (flag) {
case 0:
appBase.setBa("Sumsung");
appBase.setMd("sumsung-" + rand.nextInt(20));
break;
case 1:
appBase.setBa("Huawei");
appBase.setMd("Huawei-" + rand.nextInt(20));
break;
case 2:
appBase.setBa("HTC");
appBase.setMd("HTC-" + rand.nextInt(20));
break;
}
// embedded SDK version
appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
// gmail
appBase.setG(getRandomCharAndNumr(8) + "@gmail.com");
// screen height/width hw
flag = rand.nextInt(4);
switch (flag) {
case 0:
appBase.setHw("640*960");
break;
case 1:
appBase.setHw("640*1136");
break;
case 2:
appBase.setHw("750*1134");
break;
case 3:
appBase.setHw("1080*1920");
break;
}
// client log timestamp
long millis = System.currentTimeMillis();
appBase.setT("" + (millis - rand.nextInt(99999999)));
// network mode: 3G, 4G, WIFI
flag = rand.nextInt(3);
switch (flag) {
case 0:
appBase.setNw("3G");
break;
case 1:
appBase.setNw("4G");
break;
case 2:
appBase.setNw("WIFI");
break;
}
// Latin America: longitude 34°46′W to 117°09′W; latitude 32°42′N to 53°54′S
// longitude
appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
// latitude
appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");
return (JSONObject) JSON.toJSON(appBase);
}
/**
* Goods display event
*/
private static JSONObject generateDisplay() {
AppDisplay appDisplay = new AppDisplay();
boolean boolFlag = rand.nextInt(10) < 7;
// action: goods shown = 1, goods clicked = 2
if (boolFlag) {
appDisplay.setAction("1");
} else {
appDisplay.setAction("2");
}
// goods id
String goodsId = s_goodsid + "";
s_goodsid++;
appDisplay.setGoodsid(goodsId);
// position: up to 6 items
int flag = rand.nextInt(6);
appDisplay.setPlace("" + flag);
// exposure type
flag = 1 + rand.nextInt(2);
appDisplay.setExtend1("" + flag);
// category
flag = 1 + rand.nextInt(100);
appDisplay.setCategory("" + flag);
JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay);
return packEventJson("display", jsonObject);
}
/**
* Goods detail page event
*/
private static JSONObject generateNewsDetail() {
AppNewsDetail appNewsDetail = new AppNewsDetail();
// page entry source
int flag = 1 + rand.nextInt(3);
appNewsDetail.setEntry(flag + "");
// action
appNewsDetail.setAction("" + (rand.nextInt(4) + 1));
// goods id
appNewsDetail.setGoodsid(s_goodsid + "");
// goods style: 0-5
flag = rand.nextInt(6);
appNewsDetail.setShowtype("" + flag);
// page dwell time
flag = rand.nextInt(10) * rand.nextInt(7);
appNewsDetail.setNews_staytime(flag + "");
// load duration
flag = rand.nextInt(10) * rand.nextInt(7);
appNewsDetail.setLoading_time(flag + "");
// load failure code
flag = rand.nextInt(10);
switch (flag) {
case 1:
appNewsDetail.setType1("102");
break;
case 2:
appNewsDetail.setType1("201");
break;
case 3:
appNewsDetail.setType1("325");
break;
case 4:
appNewsDetail.setType1("433");
break;
case 5:
appNewsDetail.setType1("542");
break;
default:
appNewsDetail.setType1("");
break;
}
// category
flag = 1 + rand.nextInt(100);
appNewsDetail.setCategory("" + flag);
JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail);
return packEventJson("newsdetail", eventJson);
}
/**
* Goods list event
*/
private static JSONObject generateNewList() {
AppLoading appLoading = new AppLoading();
// action
int flag = rand.nextInt(3) + 1;
appLoading.setAction(flag + "");
// load duration
flag = rand.nextInt(10) * rand.nextInt(7);
appLoading.setLoading_time(flag + "");
// failure code
flag = rand.nextInt(10);
switch (flag) {
case 1:
appLoading.setType1("102");
break;
case 2:
appLoading.setType1("201");
break;
case 3:
appLoading.setType1("325");
break;
case 4:
appLoading.setType1("433");
break;
case 5:
appLoading.setType1("542");
break;
default:
appLoading.setType1("");
break;
}
// page load type
flag = 1 + rand.nextInt(2);
appLoading.setLoading_way("" + flag);
// extension field 1
appLoading.setExtend1("");
// extension field 2
appLoading.setExtend2("");
// load trigger type
flag = 1 + rand.nextInt(3);
appLoading.setType("" + flag);
JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading);
return packEventJson("loading", jsonObject);
}
/**
* Ad event fields
*/
private static JSONObject generateAd() {
AppAd appAd = new AppAd();
// entry
int flag = rand.nextInt(3) + 1;
appAd.setEntry(flag + "");
// action: ad shown = 1, ad clicked = 2
flag = rand.nextInt(2) + 1;
appAd.setAction(flag + "");
// content type: 1 = goods, 2 = marketing activity
flag = rand.nextInt(2) + 1;
appAd.setContentType(flag + "");
// display duration in milliseconds
appAd.setDisplayMills((rand.nextInt(120000) + 1000) + "");
if (flag == 1) {
// goods ad: attach a goods id
appAd.setItemId(rand.nextInt(6) + "");
} else {
// activity ad: attach an activity id
appAd.setActivityId((rand.nextInt(1) + 1) + "");
}
JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd);
return packEventJson("ad", jsonObject);
}
/**
* Startup log
*/
private static AppStart generateStart() {
AppStart appStart = new AppStart();
//device id
appStart.setMid(s_mid + "");
s_mid++;
// user id
appStart.setUid(s_uid + "");
s_uid++;
// app version code, e.g. 5, 6
appStart.setVc("" + rand.nextInt(20));
//app version name, e.g. v1.1.1
appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10));
// Android version
appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10));
//log type marker
appStart.setEn("start");
// language: es, en, pt
int flag = rand.nextInt(3);
switch (flag) {
case (0):
appStart.setL("es");
break;
case (1):
appStart.setL("en");
break;
case (2):
appStart.setL("pt");
break;
}
// channel the app came from
appStart.setSr(getRandomChar(1));
// region
flag = rand.nextInt(2);
switch (flag) {
case 0:
appStart.setAr("BR");
break;
case 1:
appStart.setAr("MX");
break;
}
// phone brand ba and model md; the model suffix is a 2-digit number
flag = rand.nextInt(3);
switch (flag) {
case 0:
appStart.setBa("Sumsung");
appStart.setMd("sumsung-" + rand.nextInt(20));
break;
case 1:
appStart.setBa("Huawei");
appStart.setMd("Huawei-" + rand.nextInt(20));
break;
case 2:
appStart.setBa("HTC");
appStart.setMd("HTC-" + rand.nextInt(20));
break;
}
// embedded SDK version
appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10));
// gmail
appStart.setG(getRandomCharAndNumr(8) + "@gmail.com");
// screen height/width hw
flag = rand.nextInt(4);
switch (flag) {
case 0:
appStart.setHw("640*960");
break;
case 1:
appStart.setHw("640*1136");
break;
case 2:
appStart.setHw("750*1134");
break;
case 3:
appStart.setHw("1080*1920");
break;
}
// client log timestamp
long millis = System.currentTimeMillis();
appStart.setT("" + (millis - rand.nextInt(99999999)));
// network mode: 3G, 4G, WIFI
flag = rand.nextInt(3);
switch (flag) {
case 0:
appStart.setNw("3G");
break;
case 1:
appStart.setNw("4G");
break;
case 2:
appStart.setNw("WIFI");
break;
}
// Latin America: longitude 34°46′W to 117°09′W; latitude 32°42′N to 53°54′S
// longitude
appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + "");
// latitude
appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + "");
// entry
flag = rand.nextInt(5) + 1;
appStart.setEntry(flag + "");
// splash ad type
flag = rand.nextInt(2) + 1;
appStart.setOpen_ad_type(flag + "");
// status
flag = rand.nextInt(10) > 8 ? 2 : 1;
appStart.setAction(flag + "");
// load duration
appStart.setLoading_time(rand.nextInt(20) + "");
// failure code
flag = rand.nextInt(10);
switch (flag) {
case 1:
appStart.setDetail("102");
break;
case 2:
appStart.setDetail("201");
break;
case 3:
appStart.setDetail("325");
break;
case 4:
appStart.setDetail("433");
break;
case 5:
appStart.setDetail("542");
break;
default:
appStart.setDetail("");
break;
}
// extension field
appStart.setExtend1("");
return appStart;
}
/**
* Notification event
*/
private static JSONObject generateNotification() {
AppNotification appNotification = new AppNotification();
int flag = rand.nextInt(4) + 1;
// action
appNotification.setAction(flag + "");
// notification id
flag = rand.nextInt(4) + 1;
appNotification.setType(flag + "");
// client pop-up time
appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
// reserved field
appNotification.setContent("");
JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification);
return packEventJson("notification", jsonObject);
}
/**
* Background activity event
*/
private static JSONObject generateBackground() {
AppActive appActive_background = new AppActive();
// activation source
int flag = rand.nextInt(3) + 1;
appActive_background.setActive_source(flag + "");
JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background);
return packEventJson("active_background", jsonObject);
}
/**
* Error log data
*/
private static JSONObject generateError() {
AppErrorLog appErrorLog = new AppErrorLog();
String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}; // error briefs
String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"}; // error details
//error brief
appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]);
//error detail
appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]);
JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog);
return packEventJson("error", jsonObject);
}
/**
* Wrap an event payload with the common event fields (timestamp, event name, JSON data)
*/
private static JSONObject packEventJson(String eventName, JSONObject jsonObject) {
JSONObject eventJson = new JSONObject();
eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + "");
eventJson.put("en", eventName);
eventJson.put("kv", jsonObject);
return eventJson;
}
/**
* Random letter string
*
* @param length string length
*/
private static String getRandomChar(Integer length) {
StringBuilder str = new StringBuilder();
Random random = new Random();
for (int i = 0; i < length; i++) {
// letter
str.append((char) (65 + random.nextInt(26)));// uppercase letter
}
return str.toString();
}
/**
* Random alphanumeric string
*
* @param length string length
*/
private static String getRandomCharAndNumr(Integer length) {
StringBuilder str = new StringBuilder();
Random random = new Random();
for (int i = 0; i < length; i++) {
boolean b = random.nextBoolean();
if (b) { // 字元串
// int choice = random.nextBoolean() ? 65 : 97; 取得65大寫字母還是97小寫字母
str.append((char) (65 + random.nextInt(26)));// 取得大寫字母
} else { // 數字
str.append(String.valueOf(random.nextInt(10)));
}
}
return str.toString();
}
/**
* 收藏
*/
private static JSONObject generateFavorites() {
AppFavorites favorites = new AppFavorites();
favorites.setCourse_id(rand.nextInt(10));
favorites.setUserid(rand.nextInt(10));
favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites);
return packEventJson("favorites", jsonObject);
}
/**
* 點贊
*/
private static JSONObject generatePraise() {
AppPraise praise = new AppPraise();
praise.setId(rand.nextInt(10));
praise.setUserid(rand.nextInt(10));
praise.setTarget_id(rand.nextInt(10));
praise.setType(rand.nextInt(4) + 1);
praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
JSONObject jsonObject = (JSONObject) JSON.toJSON(praise);
return packEventJson("praise", jsonObject);
}
/**
* 評論
*/
private static JSONObject generateComment() {
AppComment comment = new AppComment();
comment.setComment_id(rand.nextInt(10));
comment.setUserid(rand.nextInt(10));
comment.setP_comment_id(rand.nextInt(5));
comment.setContent(getCONTENT());
comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + "");
comment.setOther_id(rand.nextInt(10));
comment.setPraise_count(rand.nextInt(1000));
comment.setReply_count(rand.nextInt(200));
JSONObject jsonObject = (JSONObject) JSON.toJSON(comment);
return packEventJson("comment", jsonObject);
}
/**
* 生成單個漢字
*/
private static char getRandomChar() {
String str = "";
int highPos; // 高位位元組
int lowPos;  // 低位位元組
Random random = new Random();
//隨機生成漢字的兩個位元組(GB2312 一級漢字區:高位 176~214,低位 161~253)
highPos = 176 + random.nextInt(39);
lowPos = 161 + random.nextInt(93);
byte[] b = new byte[2];
b[0] = (byte) highPos;
b[1] = (byte) lowPos;
try {
str = new String(b, "GBK");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println("錯誤");
}
return str.charAt(0);
}
/**
* 拼接成多個漢字
*/
private static String getCONTENT() {
StringBuilder str = new StringBuilder();
for (int i = 0; i < rand.nextInt(100); i++) {
str.append(getRandomChar());
}
return str.toString();
}
}
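上面 getRandomChar 所取的位元組區間(高位 176~214、低位 161~253)正是 GB2312 一級漢字區,因此任意組合都能在 GBK 編碼下解碼為常用漢字。以下是一個輔助理解的驗證草稿(假設系統 iconv 支援 GBK),檢查區間起點 0xB0A1 恰好解碼為一級漢字表中的第一個字「啊」:

```shell
# 0xB0A1 是 GB2312 一級漢字區的起點,GBK 向下相容 GB2312
first=$(printf '\xB0\xA1' | iconv -f GBK -t UTF-8)
echo "$first"   # 啊
```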
2.4.5 啟動測試
注意:需要將日誌模擬程序放到2台伺服器上。模擬產生的日誌中既包括啟動日誌,又包含事件日誌,需要flume攔截器進行日誌分發,因此也需要兩組flume-ng來完成這件事
打包上傳至2台伺服器節點,生產數據為後面的測試做準備,這裡放在用戶目錄的test文件夾下
通過參數控制生成消息速度及產量(如下 2秒一條,列印1000條)
#控制時間及條數
nohup java -jar data-producer-1.0-SNAPSHOT-jar-with-dependencies.jar 2000 1000 &
#監控日誌
tail -F /root/logs/*.log
通過www.json.cn查看數據格式
3 創建Kafka-Topic
- 創建啟動日誌主題:topic_start
- 創建事件日誌主題:topic_event
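建立主題可在任一 broker 節點用 kafka-topics 命令完成。以下為命令拼裝示意,其中 ZooKeeper 地址、分區數與副本數均為示例假設(舊版 kafka-topics 使用 --zookeeper,Kafka 2.2+ 可改用 --bootstrap-server):

```shell
# 假設 ZooKeeper 位於三台 cdh 節點的 2181 埠;分區/副本數請按集群規模調整
ZK="cdh01.cm:2181,cdh02.cm:2181,cdh03.cm:2181"

make_create_cmd() {
  # $1 為主題名,列印出的命令在 broker 節點上執行即可建立主題
  echo "kafka-topics --create --zookeeper $ZK --replication-factor 2 --partitions 3 --topic $1"
}

make_create_cmd topic_start
make_create_cmd topic_event
```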
4 Flume準備
共分為2組flume
第一組:收集伺服器日誌,使用Kafka-Channel將數據發往Kafka不同的Topic,其中使用攔截器進行啟動日誌和事件日誌的分發
第二組:消費Kafka數據,使用File-Channel快取數據,最終發往Hdfs保存
4.1 Flume:File->Kafka配置編寫
- vim /root/test/file-flume-kafka.conf
#1 定義組件
a1.sources=r1
a1.channels=c1 c2
# 2 source配置 type類型 positionFile記錄日誌讀取位置 filegroups讀取哪些目錄 app.+為讀取什麼開頭 channels發往哪裡
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/test/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#3 攔截器 這裡2個為自定義的攔截器 multiplexing為類型區分選擇器 header頭用於區分類型 mapping匹配頭
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.heaton.bigdata.flume.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.heaton.bigdata.flume.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
#4 channel配置 kafkaChannel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type =org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
在生產日誌的2台伺服器節點上創建flume配置文件。
LogETLInterceptor、LogTypeInterceptor為自定義攔截器
4.2 自定義攔截器
data-flume工程
- LogUtils
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateEvent(String log) {
/** 伺服器時間 | json
1588319303710|{
"cm":{
"ln":"-51.5","sv":"V2.0.7","os":"8.0.8","g":"[email protected]","mid":"13",
"nw":"4G","l":"en","vc":"7","hw":"640*960","ar":"MX","uid":"13","t":"1588291826938",
"la":"-38.2","md":"Huawei-14","vn":"1.3.6","ba":"Huawei","sr":"Y"
},
"ap":"app",
"et":[{
"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}
},{
"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}
},{
"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}
},{
"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}
},{
"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}
}]
}
*/
// 1 切割
String[] logContents = log.split("\\|");
// 2 校驗
if (logContents.length != 2) {
return false;
}
//3 校驗伺服器時間
if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
return false;
}
// 4 校驗 json
if (!logContents[1].trim().startsWith("{")
|| !logContents[1].trim().endsWith("}")) {
return false;
}
return true;
}
public static boolean validateStart(String log) {
/**
{
"action":"1","ar":"MX","ba":"HTC","detail":"201","en":"start","entry":"4","extend1":"",
"g":"[email protected]","hw":"750*1134","l":"pt","la":"-29.7","ln":"-48.1","loading_time":"0",
"md":"HTC-18","mid":"14","nw":"3G","open_ad_type":"2","os":"8.0.8","sr":"D","sv":"V2.8.2",
"t":"1588251833523","uid":"14","vc":"15","vn":"1.2.9"
}
*/
if (log == null) {
return false;
}
// 校驗 json
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
return false;
}
return true;
}
}
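LogUtils.validateEvent 的三步校驗(按「|」切割、13位純數字時間戳、JSON首尾花括號)可用下面的shell草稿等價示意,便於先在命令行上理解校驗規則(僅為輔助理解,非項目程式碼):

```shell
# 與 validateEvent 相同的校驗邏輯:切割、時間戳、JSON 邊界
validate_event() {
  line=$1
  ts=${line%%|*}
  json=${line#*|}
  [ "$ts" != "$line" ] || return 1            # 必須包含分隔符 '|'
  [ ${#ts} -eq 13 ] || return 1               # 伺服器時間為 13 位
  case $ts in *[!0-9]*) return 1 ;; esac      # 且為純數字
  case $json in \{*\}) return 0 ;; *) return 1 ;; esac  # JSON 以 { 開頭、} 結尾
}

validate_event '1588319303710|{"cm":{"mid":"13"}}' && echo "合法"
validate_event '158|{"cm":{}}' || echo "時間戳不合法,丟棄"
```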
- LogETLInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
//初始化
}
@Override
public Event intercept(Event event) {
// 1 獲取數據
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 判斷數據類型並向 Header 中賦值
if (log.contains("start")) {
if (LogUtils.validateStart(log)) {
return event;
}
} else {
if (LogUtils.validateEvent(log)) {
return event;
}
}
// 3 返回校驗結果
return null;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
if (intercept1 != null) {
interceptors.add(intercept1);
}
}
return interceptors;
}
@Override
public void close() {
//關閉
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
- LogTypeInterceptor
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 區分日誌類型: body header
// 1 獲取 body 數據
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2 獲取 header
Map<String, String> headers = event.getHeaders();
// 3 判斷數據類型並向 Header 中賦值
if (log.contains("start")) {
headers.put("topic", "topic_start");
} else {
headers.put("topic", "topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceptors = new ArrayList<>();
for (Event event : events) {
Event intercept1 = intercept(event);
interceptors.add(intercept1);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
將項目打包放入Flume/lib目錄下(所有節點):
CDH路徑參考:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/flume-ng/lib
4.3 Flume啟停腳本
- vim /root/log-kafka-flume.sh
#! /bin/bash
case $1 in
"start"){
for i in cdh02.cm cdh03.cm
do
echo " --------啟動 $i 消費 flume-------"
ssh $i "nohup flume-ng agent --conf-file /root/test/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/file-flume-kafka.log 2>&1 &"
done
};;
"stop"){
for i in cdh02.cm cdh03.cm
do
echo " --------停止 $i 消費 flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
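stop分支通過ps管道取出進程PID再kill。注意ssh的雙引號裡寫成 \$2,是為了避免被本地shell先行展開,讓awk在遠端取第二欄。取PID的管道邏輯可用一行模擬的ps輸出演示:

```shell
# 模擬一行 ps -ef 輸出,第二欄即 PID
sample='root     12345     1  0 10:00 ?        00:00:05 java -jar data-producer.jar file-flume-kafka.conf'
pid=$(echo "$sample" | grep file-flume-kafka | grep -v grep | awk '{print $2}')
echo "$pid"   # 12345
```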
4.4 Flume:Kafka->HDFS配置編寫
在第三台伺服器上準備
- vim /root/test/kafka-flume-hdfs.conf
## 組件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## Kafka-source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers= cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r1.kafka.topics = topic_start
## Kafka- source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sources.r2.kafka.topics = topic_event
## channel1
a1.channels.c1.type = file
##索引文件路徑
a1.channels.c1.checkpointDir=/root/test/flume/checkpoint/behavior1
##持久化路徑
a1.channels.c1.dataDirs = /root/test/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
##索引文件路徑
a1.channels.c2.checkpointDir = /root/test/flume/checkpoint/behavior2
##持久化路徑
a1.channels.c2.dataDirs = /root/test/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## HDFS-sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
## HDFS-sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要產生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 50
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制輸出文件為壓縮流,使用 snappy 壓縮
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k2.hdfs.codeC = snappy
## 組件拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
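滾動參數的取值有講究:rollSize 為 134217728 位元組,即 128MB,與 HDFS 預設塊大小對齊;配合 rollCount = 0(不按條數滾動)與較長的 rollInterval,避免產生大量小文件。驗算如下:

```shell
# 134217728 位元組換算為 MB,應恰好等於 HDFS 預設塊大小 128MB
echo $((134217728 / 1024 / 1024))   # 128
```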
4.5 Flume啟停腳本
在第三台伺服器上準備
- vim /root/test/kafka-hdfs-flume.sh
#! /bin/bash
case $1 in
"start"){
for i in cdh01.cm
do
echo " --------啟動 $i 消費 flume-------"
ssh $i "nohup flume-ng agent --conf-file /root/test/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/test/kafka-flume-hdfs.log 2>&1 &"
done
};;
"stop"){
for i in cdh01.cm
do
echo " --------停止 $i 消費 flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
5 業務數據
此模組主要面向企業報表決策,為數據分析提供數據支援,解決大數據量下無法快速產出報表、即席業務需求難以快速展示的問題。劃分企業離線與實時業務,先用離線方式直觀地管理數據呈現,為實時方案奠定良好基礎。
5.1 電商業務流程
5.2 SKU-SPU
- SKU(Stock Keeping Unit):庫存量基本單位,現已被引申為產品統一編號的簡稱,每種產品均對應唯一的 SKU 號。
- SPU(Standard Product Unit):商品資訊聚合的最小單位,是一組可復用、易檢索的標準化資訊集合。
- 總結:黑鯊3 手機就是 SPU;一台鎧甲灰、256G 記憶體的就是 SKU。
5.3 業務表結構
5.3.1 訂單表(order_info)
5.3.2 訂單詳情表(order_detail)
5.3.3 SKU 商品表(sku_info)
5.3.4 用戶表(user_info)
5.3.5 商品一級分類表(base_category1)
5.3.6 商品二級分類表(base_category2)
5.3.7 商品三級分類表(base_category3)
5.3.8 支付流水表(payment_info)
5.3.9 省份表(base_province)
5.3.10 地區表(base_region)
5.3.11 品牌表(base_trademark)
5.3.12 訂單狀態表(order_status_log)
5.3.13 SPU 商品表(spu_info)
5.3.14 商品評論表(comment_info)
5.3.15 退單表(order_refund_info)
5.3.16 加入購物車表(cart_info)
5.3.17 商品收藏表(favor_info)
5.3.18 優惠券領用表(coupon_use)
5.3.19 優惠券表(coupon_info)
5.3.20 活動表(activity_info)
5.3.21 活動訂單關聯表(activity_order)
5.3.22 優惠規則表(activity_rule)
5.3.23 編碼字典表(base_dic)
5.3.24 活動參與商品表(activity_sku)
5.4 時間表結構
5.4.1 時間表(date_info)
5.4.2 假期表(holiday_info)
5.4.3 假期年表(holiday_year)
6 同步策略及數倉分層
數據同步策略的類型包括:全量表、增量表、新增及變化表、特殊表
- 全量表:每天一個分區,存儲完整的數據。
- 增量表:每天新增數據放在一個分區,存儲新增加的數據。
- 新增及變化表:每天新增和變化的數據放在一個分區,存儲新增加和發生變化的數據。
- 特殊表:沒有分區,只需要存儲一次。
6.1 全量策略
每日全量,每天存儲一份完整數據,作為一個分區。
適合場景:表數據量不大,且有新增或修改業務的場景
例如:品牌表、編碼表、商品分類表、優惠規則表、活動表、商品表、加購表、收藏表、SKU/SPU表
6.2 增量策略
每日增量,每天儲存一份增量數據,作為一個分區
適合場景:表數據量大,且只會有新增數據的場景。
例如:退單表、訂單狀態表、支付流水表、訂單詳情表、活動與訂單關聯表、商品評論表
6.3 新增及變化策略
每日新增及變化,儲存創建時間和操作時間都是今天的數據,作為一個分區
適合場景:表數據量大,既會有新增,又會有修改。
例如:用戶表、訂單表、優惠券領用表。
6.4 特殊策略
某些特殊的維度表,可不必遵循上述同步策略,在數倉中只做一次同步,數據不變化不更新
適合場景:表數據幾乎不會變化
1.客觀世界維度:沒變化的客觀世界的維度(比如性別、地區、民族、政治成分、鞋子尺碼)可以只存一份固定值
2.日期維度:日期維度可以一次性導入一年或若干年的數據。
3.地區維度:省份表、地區表
6.5 分析業務表同步策略
考慮到特殊維度表也可能緩慢變化(例如行政區劃調整時,地區表就會發生變化),故同樣選擇分區全量同步策略。
6.6 數倉分層
- 為什麼分層:
- 簡單化:把複雜的任務分解為多層來完成,每層處理各自的任務,方便定位問題。
- 減少重複開發:規範數據分層,通過中間層數據,能夠極大的減少重複計算,增加結果復用性。
- 隔離數據:不論是數據異常還是數據敏感性,使真實數據和統計數據解耦。
- 一般在DWD層進行維度建模
- ODS層:原始數據層,存放原始數據
- DWD層:對ODS層數據進行清洗(去空、臟數據,轉換類型等),維度退化,脫敏(保護隱私)
- DWS層:以DWD為基礎,按天進行匯總
- DWT層:以DWS為基礎,按主題進行匯總
- ADS層:為各種數據分析報表提供數據
7 Sqoop同步數據
Sqoop注意點:
Hive 中的 Null 在底層以「\N」存儲,而 MySQL 中的 Null 在底層就是 NULL,為了保證兩端數據的一致性:
- 導出數據時採用 --input-null-string 和 --input-null-non-string
- 導入數據時採用 --null-string 和 --null-non-string
本例思路為:sqoop抽取mysql數據上傳至Hdfs上,存儲為parquet文件,再建立hive-ods表,使用對應數據。
使用DolphinScheduler調度執行腳本。
- Sqoop採集Mysql和Hive數據格式
mysql欄位類型 | hive:ods欄位類型 | hive:dwd-ads欄位類型 |
---|---|---|
tinyint | tinyint | tinyint |
int | int | int |
bigint | bigint | bigint |
varchar | string | string |
datetime | bigint | string |
bit | boolean | int |
double | double | double |
decimal | decimal | decimal |
8 ods層構建
8.1 ods建表
hive創建ods資料庫,使用DolphinScheduler創建數據源,在創建DAG時需要選擇hive庫。
可順便將dwd、dws、dwt、ads資料庫一併創建
- base_dic
drop table if exists ods.mall__base_dic
CREATE EXTERNAL TABLE `ods.mall__base_dic`(
`dic_code` string COMMENT '編號',
`dic_name` string COMMENT '編碼名稱',
`parent_code` string COMMENT '父編號',
`create_time` bigint COMMENT '創建日期',
`operate_time` bigint COMMENT '修改日期'
) COMMENT '編碼字典表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_dic/'
tblproperties ("parquet.compression"="snappy")
- base_trademark
drop table if exists ods.mall__base_trademark
CREATE EXTERNAL TABLE `ods.mall__base_trademark`(
`tm_id` string COMMENT '品牌id',
`tm_name` string COMMENT '品牌名稱'
) COMMENT '品牌表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_trademark/'
tblproperties ("parquet.compression"="snappy")
- base_category3
drop table if exists ods.mall__base_category3
CREATE EXTERNAL TABLE `ods.mall__base_category3`(
`id` bigint COMMENT '編號',
`name` string COMMENT '三級分類名稱',
`category2_id` bigint COMMENT '二級分類編號'
) COMMENT '三級分類表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category3/'
tblproperties ("parquet.compression"="snappy")
- base_category2
drop table if exists ods.mall__base_category2
CREATE EXTERNAL TABLE `ods.mall__base_category2`(
`id` bigint COMMENT '編號',
`name` string COMMENT '二級分類名稱',
`category1_id` bigint COMMENT '一級分類編號'
) COMMENT '二級分類表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category2/'
tblproperties ("parquet.compression"="snappy")
- base_category1
drop table if exists ods.mall__base_category1
CREATE EXTERNAL TABLE `ods.mall__base_category1`(
`id` bigint COMMENT '編號',
`name` string COMMENT '分類名稱'
) COMMENT '一級分類表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_category1/'
tblproperties ("parquet.compression"="snappy")
- activity_rule
drop table if exists ods.mall__activity_rule
CREATE EXTERNAL TABLE `ods.mall__activity_rule`(
`id` int COMMENT '編號',
`activity_id` int COMMENT '類型',
`condition_amount` decimal(16,2) COMMENT '滿減金額',
`condition_num` bigint COMMENT '滿減件數',
`benefit_amount` decimal(16,2) COMMENT '優惠金額',
`benefit_discount` bigint COMMENT '優惠折扣',
`benefit_level` bigint COMMENT '優惠級別'
) COMMENT '優惠規則'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_rule/'
tblproperties ("parquet.compression"="snappy")
- activity_info
drop table if exists ods.mall__activity_info
CREATE EXTERNAL TABLE `ods.mall__activity_info`(
`id` bigint COMMENT '活動id',
`activity_name` string COMMENT '活動名稱',
`activity_type` string COMMENT '活動類型',
`start_time` bigint COMMENT '開始時間',
`end_time` bigint COMMENT '結束時間',
`create_time` bigint COMMENT '創建時間'
) COMMENT '活動表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_info/'
tblproperties ("parquet.compression"="snappy")
- activity_sku
drop table if exists ods.mall__activity_sku
CREATE EXTERNAL TABLE `ods.mall__activity_sku`(
`id` bigint COMMENT '編號',
`activity_id` bigint COMMENT '活動id',
`sku_id` bigint COMMENT 'sku_id',
`create_time` bigint COMMENT '創建時間'
) COMMENT '活動參與商品'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_sku/'
tblproperties ("parquet.compression"="snappy")
- cart_info
drop table if exists ods.mall__cart_info
CREATE EXTERNAL TABLE `ods.mall__cart_info`(
`id` bigint COMMENT '編號',
`user_id` bigint COMMENT '用戶id',
`sku_id` bigint COMMENT 'sku_id',
`cart_price` decimal(10,2) COMMENT '放入購物車時價格',
`sku_num` bigint COMMENT '數量',
`sku_name` string COMMENT 'sku名稱',
`create_time` bigint COMMENT '創建時間',
`operate_time` bigint COMMENT '修改時間',
`is_ordered` bigint COMMENT '是否已經下單',
`order_time` bigint COMMENT '下單時間'
) COMMENT '購物車表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/cart_info/'
tblproperties ("parquet.compression"="snappy")
- favor_info
drop table if exists ods.mall__favor_info
CREATE EXTERNAL TABLE `ods.mall__favor_info`(
`id` bigint COMMENT '編號',
`user_id` bigint COMMENT '用戶id',
`sku_id` bigint COMMENT 'sku_id',
`spu_id` bigint COMMENT '商品id',
`is_cancel` string COMMENT '是否已取消 0 正常 1 已取消',
`create_time` bigint COMMENT '創建時間',
`cancel_time` bigint COMMENT '修改時間'
) COMMENT '商品收藏表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/favor_info/'
tblproperties ("parquet.compression"="snappy")
- coupon_info
drop table if exists ods.mall__coupon_info
CREATE EXTERNAL TABLE `ods.mall__coupon_info`(
`id` bigint COMMENT '購物券編號',
`coupon_name` string COMMENT '購物券名稱',
`coupon_type` string COMMENT '購物券類型 1 現金券 2 折扣券 3 滿減券 4 滿件打折券',
`condition_amount` decimal(10,2) COMMENT '滿額數',
`condition_num` bigint COMMENT '滿件數',
`activity_id` bigint COMMENT '活動編號',
`benefit_amount` decimal(16,2) COMMENT '減金額',
`benefit_discount` bigint COMMENT '折扣',
`create_time` bigint COMMENT '創建時間',
`range_type` string COMMENT '範圍類型 1、商品 2、品類 3、品牌',
`spu_id` bigint COMMENT '商品id',
`tm_id` bigint COMMENT '品牌id',
`category3_id` bigint COMMENT '品類id',
`limit_num` int COMMENT '最多領用次數',
`operate_time` bigint COMMENT '修改時間',
`expire_time` bigint COMMENT '過期時間'
) COMMENT '優惠券表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_info/'
tblproperties ("parquet.compression"="snappy")
- sku_info
drop table if exists ods.mall__sku_info
CREATE EXTERNAL TABLE `ods.mall__sku_info`(
`id` bigint COMMENT 'skuid',
`spu_id` bigint COMMENT 'spuid',
`price` decimal(10,0) COMMENT '價格',
`sku_name` string COMMENT 'sku名稱',
`sku_desc` string COMMENT '商品規格描述',
`weight` decimal(10,2) COMMENT '重量',
`tm_id` bigint COMMENT '品牌',
`category3_id` bigint COMMENT '三級分類id',
`create_time` bigint COMMENT '創建時間'
) COMMENT '庫存單元表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/sku_info/'
tblproperties ("parquet.compression"="snappy")
- spu_info
drop table if exists ods.mall__spu_info
CREATE EXTERNAL TABLE `ods.mall__spu_info`(
`id` bigint COMMENT '商品id',
`spu_name` string COMMENT '商品名稱',
`category3_id` bigint COMMENT '三級分類id',
`tm_id` bigint COMMENT '品牌id'
) COMMENT '商品表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/spu_info/'
tblproperties ("parquet.compression"="snappy")
- base_province
drop table if exists ods.mall__base_province
CREATE EXTERNAL TABLE `ods.mall__base_province`(
`id` bigint COMMENT 'id',
`name` string COMMENT '省名稱',
`region_id` string COMMENT '大區id',
`area_code` string COMMENT '行政區位碼',
`iso_code` string COMMENT '國際編碼'
) COMMENT '省份表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_province/'
tblproperties ("parquet.compression"="snappy")
- base_region
drop table if exists ods.mall__base_region
CREATE EXTERNAL TABLE `ods.mall__base_region`(
`id` string COMMENT '大區id',
`region_name` string COMMENT '大區名稱'
) COMMENT '地區表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/base_region/'
tblproperties ("parquet.compression"="snappy")
- refund_info
drop table if exists ods.mall__order_refund_info
CREATE EXTERNAL TABLE `ods.mall__order_refund_info`(
`id` bigint COMMENT '編號',
`user_id` bigint COMMENT '用戶id',
`order_id` bigint COMMENT '訂單編號',
`sku_id` bigint COMMENT 'skuid',
`refund_type` string COMMENT '退款類型',
`refund_num` bigint COMMENT '退貨件數',
`refund_amount` decimal(16,2) COMMENT '退款金額',
`refund_reason_type` string COMMENT '原因類型',
`create_time` bigint COMMENT '創建時間'
) COMMENT '退單表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_refund_info/'
tblproperties ("parquet.compression"="snappy")
- order_status_log
drop table if exists ods.mall__order_status_log
CREATE EXTERNAL TABLE `ods.mall__order_status_log`(
`id` bigint COMMENT '編號',
`order_id` bigint COMMENT '訂單編號',
`order_status` string COMMENT '訂單狀態',
`operate_time` bigint COMMENT '操作時間'
) COMMENT '訂單狀態表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_status_log/'
tblproperties ("parquet.compression"="snappy")
- payment_info
drop table if exists ods.mall__payment_info
CREATE EXTERNAL TABLE `ods.mall__payment_info`(
`id` bigint COMMENT '編號',
`out_trade_no` string COMMENT '對外業務編號',
`order_id` bigint COMMENT '訂單編號',
`user_id` bigint COMMENT '用戶編號',
`alipay_trade_no` string COMMENT '支付寶交易流水編號',
`total_amount` decimal(16,2) COMMENT '支付金額',
`subject` string COMMENT '交易內容',
`payment_type` string COMMENT '支付方式',
`payment_time` bigint COMMENT '支付時間'
) COMMENT '支付流水表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/payment_info/'
tblproperties ("parquet.compression"="snappy")
- order_detail
drop table if exists ods.mall__order_detail
CREATE EXTERNAL TABLE `ods.mall__order_detail`(
`id` bigint COMMENT '編號',
`order_id` bigint COMMENT '訂單編號',
`user_id` bigint COMMENT '用戶id',
`sku_id` bigint COMMENT 'sku_id',
`sku_name` string COMMENT 'sku名稱',
`order_price` decimal(10,2) COMMENT '購買價格(下單時sku價格)',
`sku_num` string COMMENT '購買個數',
`create_time` bigint COMMENT '創建時間'
) COMMENT '訂單明細表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_detail/'
tblproperties ("parquet.compression"="snappy")
- activity_order
drop table if exists ods.mall__activity_order
CREATE EXTERNAL TABLE `ods.mall__activity_order`(
`id` bigint COMMENT '編號',
`activity_id` bigint COMMENT '活動id',
`order_id` bigint COMMENT '訂單編號',
`create_time` bigint COMMENT '發生日期'
) COMMENT '活動與訂單關聯表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/activity_order/'
tblproperties ("parquet.compression"="snappy")
- comment_info
drop table if exists ods.mall__comment_info
CREATE EXTERNAL TABLE `ods.mall__comment_info`(
`id` bigint COMMENT '編號',
`user_id` bigint COMMENT '用戶id',
`sku_id` bigint COMMENT 'skuid',
`spu_id` bigint COMMENT '商品id',
`order_id` bigint COMMENT '訂單編號',
`appraise` string COMMENT '評價 1 好評 2 中評 3 差評',
`comment_txt` string COMMENT '評價內容',
`create_time` bigint COMMENT '創建時間'
) COMMENT '商品評論表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/comment_info/'
tblproperties ("parquet.compression"="snappy")
- coupon_use
drop table if exists ods.mall__coupon_use
CREATE EXTERNAL TABLE `ods.mall__coupon_use`(
`id` bigint COMMENT '編號',
`coupon_id` bigint COMMENT '購物券ID',
`user_id` bigint COMMENT '用戶ID',
`order_id` bigint COMMENT '訂單ID',
`coupon_status` string COMMENT '購物券狀態',
`get_time` bigint COMMENT '領券時間',
`using_time` bigint COMMENT '使用時間',
`used_time` bigint COMMENT '過期時間'
) COMMENT '優惠券領用表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/coupon_use/'
tblproperties ("parquet.compression"="snappy")
- user_info
drop table if exists ods.mall__user_info
CREATE EXTERNAL TABLE `ods.mall__user_info`(
`id` bigint COMMENT '編號',
`name` string COMMENT '用戶姓名',
`email` string COMMENT '郵箱',
`user_level` string COMMENT '用戶級別',
`birthday` bigint COMMENT '用戶生日',
`gender` string COMMENT '性別 M男,F女',
`create_time` bigint COMMENT '創建時間',
`operate_time` bigint COMMENT '修改時間'
) COMMENT '用戶表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/user_info/'
tblproperties ("parquet.compression"="snappy")
- order_info
drop table if exists ods.mall__order_info
CREATE EXTERNAL TABLE `ods.mall__order_info`(
`id` bigint COMMENT '編號',
`final_total_amount` decimal(16,2) COMMENT '總金額',
`order_status` string COMMENT '訂單狀態',
`user_id` bigint COMMENT '用戶id',
`out_trade_no` string COMMENT '訂單交易編號(第三方支付用)',
`create_time` bigint COMMENT '創建時間',
`operate_time` bigint COMMENT '操作時間',
`province_id` int COMMENT '地區',
`benefit_reduce_amount` decimal(16,2) COMMENT '優惠金額',
`original_total_amount` decimal(16,2) COMMENT '原價金額',
`feight_fee` decimal(16,2) COMMENT '運費'
) COMMENT '訂單表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/order_info/'
tblproperties ("parquet.compression"="snappy")
- start_log
此為埋點啟動日誌表
drop table if exists ods.mall__start_log
CREATE EXTERNAL TABLE `ods.mall__start_log`(
`line` string COMMENT '啟動日誌'
) COMMENT '啟動日誌表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/start_log/'
- event_log
此為埋點事件日誌表
drop table if exists ods.mall__event_log
CREATE EXTERNAL TABLE `ods.mall__event_log`(
`line` string COMMENT '事件日誌'
) COMMENT '事件日誌表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/ods/mall/event_log/'
- date_info
此為時間表
drop table if exists ods.mall__date_info
CREATE EXTERNAL TABLE `ods.mall__date_info`(
`date_id` int COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第幾天',
`day` int COMMENT '每月的第幾天',
`month` int COMMENT '第幾月',
`quarter` int COMMENT '第幾季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是節假日'
) COMMENT '時間維度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ods/mall/date_info/'
tblproperties ("parquet.compression"="snappy")
8.2 mysql數據抽取
- sqoop抽取腳本基礎
#!/bin/bash
db_date=${date}
mysql_db_name=${db_name}
mysql_db_addr=${db_addr}
mysql_db_user=${db_user}
mysql_db_password=${db_password}
# 如果輸入了日期則取輸入日期;如果沒輸入日期,取當前時間的前一天
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
echo "日期:"$db_date
echo "mysql庫名:"$mysql_db_name
import_data() {
/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/sqoop import \
--connect jdbc:mysql://$mysql_db_addr:3306/$mysql_db_name?tinyInt1isBit=false \
--username $mysql_db_user \
--password $mysql_db_password \
--target-dir /origin_data/$mysql_db_name/$1/$db_date \
--delete-target-dir \
--num-mappers 1 \
--null-string '\\N' \
--null-non-string '\\N' \
--fields-terminated-by "\t" \
--query "$2"' and $CONDITIONS;' \
--as-parquetfile
}
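--query 參數的引號拼接值得注意:前半段用雙引號,讓 $db_date 在本地展開;緊跟的單引號段落讓 $CONDITIONS 保持字面量,交由 Sqoop 在並行切分時替換。拼接效果可以這樣驗證(表名與欄位僅為示意):

```shell
# 雙引號內變數展開,單引號內 $CONDITIONS 保持字面量
db_date=2020-03-15
query="select id from order_info where date_format(create_time,'%Y-%m-%d')='$db_date'"
full="$query"' and $CONDITIONS;'
echo "$full"
```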
- DolphinScheduler全局參數
date | 不傳為昨天 |
---|---|
db_name | 資料庫名字 |
db_addr | 資料庫IP地址 |
db_user | 資料庫用戶 |
db_password | 資料庫密碼 |
元數據中數據開始日期為2020-03-15
如下導入數據程式碼片段,拼接上述的基礎片段執行
- 全量表程式碼片段
import_data "base_dic" "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
import_data "base_trademark" "select
tm_id,
tm_name
from base_trademark
where 1=1"
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
import_data "activity_rule" "select
id,
activity_id,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
import_data "activity_info" "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
import_data "activity_sku" "select
id,
activity_id,
sku_id,
create_time
FROM
activity_sku
where 1=1"
import_data "cart_info" "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from cart_info
where 1=1"
import_data "favor_info" "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
import_data "coupon_info" "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from coupon_info
where 1=1"
import_data "sku_info" "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info where 1=1"
import_data "spu_info" "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
- 特殊表程式碼片段
import_data "base_province" "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"
import_data "base_region" "select
id,
region_name
from base_region
where 1=1"
import_data "date_info" "select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from date_info
where 1=1"
- 增量表程式碼片段
import_data "order_refund_info" "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from order_refund_info
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "order_status_log" "select
id,
order_id,
order_status,
operate_time
from order_status_log
where
date_format(operate_time,'%Y-%m-%d')='$db_date'"
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
where
DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
import_data "order_detail" "select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time
from order_detail od
join order_info oi
on od.order_id=oi.id
where
DATE_FORMAT(od.create_time,'%Y-%m-%d')='$db_date'"
import_data "activity_order" "select
id,
activity_id,
order_id,
create_time
from activity_order
where
date_format(create_time,'%Y-%m-%d')='$db_date'"
import_data "comment_info" "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$db_date'"
- Incremental-and-changed-table code snippet
import_data "coupon_use" "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$db_date'
or date_format(using_time,'%Y-%m-%d')='$db_date'
or date_format(used_time,'%Y-%m-%d')='$db_date')"
import_data "user_info" "select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
import_data "order_info" "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$db_date'
or date_format(operate_time,'%Y-%m-%d')='$db_date')"
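The snippets above all use the same import_data helper with four loading strategies: full tables (`where 1=1`), special/static tables, incremental tables (filter on create_time), and incremental-and-changed tables (filter on create_time or operate_time). As an illustrative sketch (not part of the original scripts), the WHERE-clause builder can be emulated in Python; the strategy names below are assumptions for demonstration only:

```python
# Hypothetical helper: build the WHERE clause used by import_data for each
# synchronization strategy. The strategy names are illustrative assumptions.
def where_clause(strategy, db_date, time_cols=("create_time",)):
    if strategy in ("full", "special"):
        # Full and static tables are re-imported wholesale every day.
        return "where 1=1"
    conds = [f"date_format({c},'%Y-%m-%d')='{db_date}'" for c in time_cols]
    if strategy == "incremental":
        # Incremental tables only pull rows created on the target day.
        return "where " + conds[0]
    if strategy == "new_and_changed":
        # New-and-changed tables also pull rows modified on the target day.
        return "where (" + " or ".join(conds) + ")"
    raise ValueError(f"unknown strategy: {strategy}")

print(where_clause("new_and_changed", "2020-05-01",
                   ("create_time", "operate_time")))
```

This mirrors, for example, the user_info condition above, which matches on either create_time or operate_time.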
8.3 ODS layer data loading
- For each table, just change $table_name in the script
Note the data export directories of the two tracking-log tables
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ods
table_name=base_dic
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
load data inpath '/origin_data/$APP1/$table_name/$db_date' OVERWRITE into table $hive_table_name partition(dt='$db_date');
"
$hive -e "$sql"
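Rather than editing table_name by hand for every table, the LOAD DATA statement the script assembles can be generated for a whole list of tables. A minimal sketch, assuming the same path and naming conventions as the script above (the table list is made up):

```python
# Sketch: generate the ODS "load data" statement built by the script above,
# for a list of tables instead of editing table_name each run.
def ods_load_sql(table_name, db_date, app="mall", db="ods"):
    hive_table = f"{db}.{app}__{table_name}"          # e.g. ods.mall__base_dic
    path = f"/origin_data/{app}/{table_name}/{db_date}"
    return (f"load data inpath '{path}' OVERWRITE into table "
            f"{hive_table} partition(dt='{db_date}');")

# Illustrative table list — in practice this would cover all ODS tables.
for t in ["base_dic", "sku_info", "order_info"]:
    print(ods_load_sql(t, "2020-05-01"))
```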
9 DWD layer construction
9.1 DWD layer (startup and event logs)
9.1.1 Startup log table
- Table creation
drop table if exists dwd.mall__start_log;
CREATE EXTERNAL TABLE `dwd.mall__start_log`(
`mid_id` string COMMENT 'unique device id',
`user_id` string COMMENT 'user id',
`version_code` string COMMENT 'app version code',
`version_name` string COMMENT 'app version name',
`lang` string COMMENT 'system language',
`source` string COMMENT 'channel id',
`os` string COMMENT 'OS version',
`area` string COMMENT 'region',
`model` string COMMENT 'phone model',
`brand` string COMMENT 'phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'screen height and width',
`app_time` string COMMENT 'time the log was produced on the client',
`network` string COMMENT 'network mode',
`lng` string COMMENT 'longitude',
`lat` string COMMENT 'latitude',
`entry` string COMMENT 'entry: push=1,widget=2,icon=3,notification=4,lockscreen_widget=5',
`open_ad_type` string COMMENT 'splash ad type: native splash ad=1, interstitial splash ad=2',
`action` string COMMENT 'status: success=1, failure=2',
`loading_time` string COMMENT 'loading duration',
`detail` string COMMENT 'failure code',
`extend1` string COMMENT 'failure message'
) COMMENT 'Startup log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/start_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=start_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from $hive_origin_table_name
where dt='$db_date';
"
$hive -e "$sql"
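The HQL above maps the short JSON keys of the raw log (mid, uid, vc, ...) onto the DWD column names via get_json_object. The same mapping, sketched in Python for clarity (the sample line is fabricated):

```python
import json

# The short keys in the raw start log and the DWD columns the HQL above
# extracts them into with get_json_object.
KEY_TO_COLUMN = {
    "mid": "mid_id", "uid": "user_id", "vc": "version_code",
    "vn": "version_name", "l": "lang", "sr": "source", "os": "os",
    "ar": "area", "md": "model", "ba": "brand", "sv": "sdk_version",
    "g": "gmail", "hw": "height_width", "t": "app_time", "nw": "network",
    "ln": "lng", "la": "lat", "entry": "entry",
    "open_ad_type": "open_ad_type", "action": "action",
    "loading_time": "loading_time", "detail": "detail", "extend1": "extend1",
}

def parse_start_log(line):
    raw = json.loads(line)
    # Missing keys become None, mirroring get_json_object returning NULL.
    return {col: raw.get(key) for key, col in KEY_TO_COLUMN.items()}

row = parse_start_log('{"mid":"13","uid":"13","vc":"7","entry":"3"}')
```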
9.1.2 Event log table
- Table creation
drop table if exists dwd.mall__event_log;
CREATE EXTERNAL TABLE `dwd.mall__event_log`(
`mid_id` string COMMENT 'unique device id',
`user_id` string COMMENT 'user id',
`version_code` string COMMENT 'app version code',
`version_name` string COMMENT 'app version name',
`lang` string COMMENT 'system language',
`source` string COMMENT 'channel id',
`os` string COMMENT 'OS version',
`area` string COMMENT 'region',
`model` string COMMENT 'phone model',
`brand` string COMMENT 'phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'screen height and width',
`app_time` string COMMENT 'time the log was produced on the client',
`network` string COMMENT 'network mode',
`lng` string COMMENT 'longitude',
`lat` string COMMENT 'latitude',
`event_name` string COMMENT 'event name',
`event_json` string COMMENT 'event detail',
`server_time` string COMMENT 'server time'
) COMMENT 'Event log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/event_log/'
tblproperties ("parquet.compression"="snappy")
9.1.2.1 Building the UDF and UDTF
- udf
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieldUDF extends UDF {
public String evaluate(String line, String key) throws JSONException {
String[] log = line.split("\\|");
if (log.length != 2 || StringUtils.isBlank(log[1])) {
return "";
}
JSONObject baseJson = new JSONObject(log[1].trim());
String result = "";
// Key "st": the server time is the part of the line before '|'
if ("st".equals(key)) {
result = log[0].trim();
} else if ("et".equals(key)) {
// Key "et": return the raw event array
if (baseJson.has("et")) {
result = baseJson.getString("et");
}
} else {
JSONObject cm = baseJson.getJSONObject("cm");
// Any other key: look up its value in the common fields ("cm")
if (cm.has(key)) {
result = cm.getString(key);
}
}
return result;
}
public static void main(String[] args) throws JSONException {
String line = " 1588319303710|{\n" +
" \"cm\":{\n" +
" \"ln\":\"-51.5\",\"sv\":\"V2.0.7\",\"os\":\"8.0.8\",\"g\":\"[email protected]\",\"mid\":\"13\",\n" +
" \"nw\":\"4G\",\"l\":\"en\",\"vc\":\"7\",\"hw\":\"640*960\",\"ar\":\"MX\",\"uid\":\"13\",\"t\":\"1588291826938\",\n" +
" \"la\":\"-38.2\",\"md\":\"Huawei-14\",\"vn\":\"1.3.6\",\"ba\":\"Huawei\",\"sr\":\"Y\"\n" +
" },\n" +
" \"ap\":\"app\",\n" +
" \"et\":[{\n" +
" \"ett\":\"1588228193191\",\"en\":\"ad\",\"kv\":{\"activityId\":\"1\",\"displayMills\":\"113201\",\"entry\":\"3\",\"action\":\"5\",\"contentType\":\"0\"}\n" +
" },{\n" +
" \"ett\":\"1588300304713\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1588277440794\",\"action\":\"2\",\"type\":\"3\",\"content\":\"\"}\n" +
" },{\n" +
" \"ett\":\"1588249203743\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}\n" +
" },{\n" +
" \"ett\":\"1588225856101\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":0,\"addtime\":\"1588263895040\",\"praise_count\":231,\"other_id\":5,\"comment_id\":5,\"reply_count\":62,\"userid\":7,\"content\":\"骸汞\"}\n" +
" },{\n" +
" \"ett\":\"1588254200122\",\"en\":\"favorites\",\"kv\":{\"course_id\":5,\"id\":0,\"add_time\":\"1588264138625\",\"userid\":0}\n" +
" },{\n" +
" \"ett\":\"1588281152824\",\"en\":\"praise\",\"kv\":{\"target_id\":4,\"id\":3,\"type\":3,\"add_time\":\"1588307696417\",\"userid\":8}\n" +
" }]\n" +
" }";
String s = new BaseFieldUDF().evaluate(line, "mid");
String ss = new BaseFieldUDF().evaluate(line, "st");
String sss = new BaseFieldUDF().evaluate(line, "et");
System.out.println(s);
System.out.println(ss);
System.out.println(sss);
}
}
Result:
13
1588319303710
[{"ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"}},{"ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""}},{"ett":"1588249203743","en":"active_background","kv":{"active_source":"3"}},{"ett":"1588225856101","en":"comment","kv":{"p_comment_id":0,"addtime":"1588263895040","praise_count":231,"other_id":5,"comment_id":5,"reply_count":62,"userid":7,"content":"骸汞"}},{"ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0}},{"ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8}}]
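The evaluate logic above — split the line on '|', then return the server time, the event array, or a common field — can be emulated in Python to make the contract easier to test locally. A sketch under the same input format (the sample line here is an abbreviated, made-up version of the one above):

```python
import json

# Sketch of BaseFieldUDF.evaluate: a raw line is "server_time|json", and the
# function returns the server time ("st"), the event array ("et"), or one of
# the common fields under "cm".
def base_field(line, key):
    parts = line.split("|", 1)
    if len(parts) != 2 or not parts[1].strip():
        return ""
    base = json.loads(parts[1].strip())
    if key == "st":
        return parts[0].strip()
    if key == "et":
        return json.dumps(base["et"]) if "et" in base else ""
    cm = base.get("cm", {})
    return cm.get(key, "")

line = '1588319303710|{"cm":{"mid":"13","uid":"13"},"et":[{"en":"ad"}]}'
```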
- udtf
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF {
// Declare the names and types of the output columns
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
// One input record may produce several output rows
@Override
public void process(Object[] objects) throws HiveException {
// The first argument is the "et" event-array string
String input = objects[0].toString();
// Filter out blank input by returning without emitting rows
if (StringUtils.isBlank(input)) {
return;
} else {
try {
// Parse the event array (ad/favorites/...)
JSONArray ja = new JSONArray(input);
if (ja == null)
return;
// Iterate over every event
for (int i = 0; i < ja.length(); i++) {
String[] result = new String[2];
try {
// The event name ("en"), e.g. ad or favorites
result[0] = ja.getJSONObject(i).getString("en");
// The whole event JSON as a string
result[1] = ja.getString(i);
} catch (JSONException e) {
continue;
}
// Emit one (event_name, event_json) row
forward(result);
}
} catch (JSONException e) {
e.printStackTrace();
}
}
}
// Called when there are no more records to process; used for cleanup or extra output
@Override
public void close() throws HiveException {
}
}
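EventJsonUDTF turns one event-array string into one (event_name, event_json) row per event. A Python sketch of the same explode logic (the sample array is made up):

```python
import json

# Sketch of EventJsonUDTF.process: explode an "et" event-array string into
# (event_name, event_json) pairs, skipping malformed entries.
def explode_events(et_string):
    rows = []
    if not et_string or not et_string.strip():
        return rows  # blank input yields no rows, like the UDTF
    for event in json.loads(et_string):
        if "en" in event:
            rows.append((event["en"], json.dumps(event)))
    return rows

events = explode_events('[{"en":"ad","kv":{"action":"5"}},{"en":"praise","kv":{}}]')
```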
9.1.2.2 Registering the UDFs permanently
- Upload the UDF resources
Put the hive-function-1.0-SNAPSHOT jar under /user/hive/jars on HDFS:
hadoop dfs -mkdir /user/hive/jars
hadoop dfs -put hive-function-1.0-SNAPSHOT.jar /user/hive/jars/hive-function-1.0-SNAPSHOT.jar
Create the permanent UDFs in Hive:
create function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
create function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/user/hive/jars/hive-function-1.0-SNAPSHOT.jar';
9.1.2.3 Using the UDFs from DolphinScheduler
Select the corresponding UDF function in the SQL task of the DAG. Note, however, that in Dolphin 1.2.0 saving the function association currently does not take effect.
Instead, you can upload the JAR to HDFS with the UDF management feature and register temporary functions in the script, which works just as well.
Temporary function statements:
create temporary function base_analizer as 'com.heaton.bigdata.udf.BaseFieldUDF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
create temporary function flat_analizer as 'com.heaton.bigdata.udtf.EventJsonUDTF' using jar 'hdfs://cdh01.cm:8020/dolphinscheduler/dolphinscheduler/udfs/hive-function-1.0-SNAPSHOT.jar';
9.1.2.4 Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=event_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from $hive_origin_table_name lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='$db_date' and base_analizer(line,'et')<>'';
"
$hive -e "$sql"
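Putting the two functions together, the lateral view in the HQL above attaches the common fields and server time to every exploded event. An end-to-end sketch of that per-line transformation (the sample line is fabricated):

```python
import json

# Sketch of the HQL above for one raw line: extract the common fields once,
# then emit one output row per event — the effect of the lateral view.
def event_rows(line):
    st, payload = line.split("|", 1)
    base = json.loads(payload)
    common = base.get("cm", {})
    rows = []
    for event in base.get("et", []):
        row = dict(common)                    # common fields repeated per row
        row["event_name"] = event.get("en")
        row["event_json"] = json.dumps(event)
        row["server_time"] = st.strip()
        rows.append(row)
    return rows

rows = event_rows('1588319303710|{"cm":{"mid":"13"},"et":[{"en":"ad"},{"en":"error"}]}')
```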
9.1.3 Product click table
- Table creation
drop table if exists dwd.mall__display_log;
CREATE EXTERNAL TABLE `dwd.mall__display_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
) COMMENT 'Product click table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/display_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=display_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='display';
"
$hive -e "$sql"
9.1.4 Product list table
- Table creation
drop table if exists dwd.mall__loading_log;
CREATE EXTERNAL TABLE `dwd.mall__loading_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
) COMMENT 'Product list table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/loading_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=loading_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='loading';
"
$hive -e "$sql"
9.1.5 Ad table
- Table creation
drop table if exists dwd.mall__ad_log;
CREATE EXTERNAL TABLE `dwd.mall__ad_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
) COMMENT 'Ad table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/ad_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=ad_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='ad';
"
$hive -e "$sql"
9.1.6 Notification table
- Table creation
drop table if exists dwd.mall__notification_log;
CREATE EXTERNAL TABLE `dwd.mall__notification_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
) COMMENT 'Notification table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/notification_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=notification_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.noti_type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='notification';
"
$hive -e "$sql"
9.1.7 Background-active user table
- Table creation
drop table if exists dwd.mall__active_background_log;
CREATE EXTERNAL TABLE `dwd.mall__active_background_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
) COMMENT 'Background-active user table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/active_background_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=active_background_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='active_background';
"
$hive -e "$sql"
9.1.8 Comment table
- Table creation
drop table if exists dwd.mall__comment_log;
CREATE EXTERNAL TABLE `dwd.mall__comment_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
) COMMENT 'Comment table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/comment_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=comment_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='comment';
"
$hive -e "$sql"
9.1.9 Favorites table
- Table creation
drop table if exists dwd.mall__favorites_log;
CREATE EXTERNAL TABLE `dwd.mall__favorites_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
) COMMENT 'Favorites table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/favorites_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=favorites_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='favorites';
"
$hive -e "$sql"
9.1.10 Praise (like) table
- Table creation
drop table if exists dwd.mall__praise_log;
CREATE EXTERNAL TABLE `dwd.mall__praise_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
) COMMENT 'Praise (like) table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/praise_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=praise_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='praise';
"
$hive -e "$sql"
9.1.11 Error log table
- Table creation
drop table if exists dwd.mall__error_log;
CREATE EXTERNAL TABLE `dwd.mall__error_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
) COMMENT 'Error log table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/error_log/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
APP3=ods
table_name=error_log
hive_table_name=$APP2.mall__$table_name
hive_origin_table_name=$APP3.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from dwd.mall__event_log
where dt='$db_date' and event_name='error';
"
$hive -e "$sql"
9.2 DWD layer construction (business data)
When first building this layer, incremental tables need dynamic partitioning by time so that rows land in the correct partition
Fact \ Dimension | Time | User | Region | Product | Coupon | Activity | Code | Measure |
---|---|---|---|---|---|---|---|---|
Order | √ | √ | √ | | | √ | | count/amount |
Order detail | √ | √ | | √ | | | | count/amount |
Payment | √ | √ | | | | | | times/amount |
Add to cart | √ | √ | | √ | | | | count/amount |
Favorites | √ | √ | | √ | | | | count |
Review | √ | √ | | √ | | | | count |
Refund | √ | √ | | √ | | | | count/amount |
Coupon redemption | √ | √ | | | √ | | | count |
9.2.1 Product dimension table (full load)
- Table creation
drop table if exists dwd.mall__dim_sku_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_sku_info`(
`id` string COMMENT 'product id',
`spu_id` string COMMENT 'spu id',
`price` double COMMENT 'product price',
`sku_name` string COMMENT 'product name',
`sku_desc` string COMMENT 'product description',
`weight` double COMMENT 'weight',
`tm_id` string COMMENT 'brand id',
`tm_name` string COMMENT 'brand name',
`category3_id` string COMMENT 'level-3 category id',
`category2_id` string COMMENT 'level-2 category id',
`category1_id` string COMMENT 'level-1 category id',
`category3_name` string COMMENT 'level-3 category name',
`category2_name` string COMMENT 'level-2 category name',
`category1_name` string COMMENT 'level-1 category name',
`spu_name` string COMMENT 'spu name',
`create_time` string COMMENT 'creation time'
) COMMENT 'Product dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_sku_info/'
tblproperties ("parquet.compression"="snappy")
- Data import
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_sku_info
hive_table_name=$APP2.mall__$table_name
# Use the date passed in if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
from_unixtime(cast(sku.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__sku_info where dt='$db_date'
)sku
join
(
select * from ods.mall__base_trademark where dt='$db_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ods.mall__spu_info where dt='$db_date'
)spu on spu.id = sku.spu_id
join
(
select * from ods.mall__base_category3 where dt='$db_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ods.mall__base_category2 where dt='$db_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ods.mall__base_category1 where dt='$db_date'
)c1 on c2.category1_id=c1.id;
"
$hive -e "$sql"
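The five-way join above denormalizes sku_info with its brand, spu, and three category levels into one wide dimension row. A Python sketch of the same lookup chain, on made-up sample rows:

```python
# Illustrative sample rows standing in for the ODS tables joined above.
sku = {"id": "1", "spu_id": "10", "tm_id": "100", "category3_id": "30",
       "sku_name": "phone-x"}
trademark = {"100": "Huawei"}                                  # tm_id -> name
spu = {"10": "phone"}                                          # spu_id -> name
c3 = {"30": {"name": "smartphones", "category2_id": "20"}}     # level-3
c2 = {"20": {"name": "phones", "category1_id": "1"}}           # level-2
c1 = {"1": {"name": "electronics"}}                            # level-1

def dim_sku_row(sku):
    # Walk category3 -> category2 -> category1, like the chained joins.
    cat3 = c3[sku["category3_id"]]
    cat2 = c2[cat3["category2_id"]]
    return {
        **sku,
        "tm_name": trademark[sku["tm_id"]],
        "spu_name": spu[sku["spu_id"]],
        "category3_name": cat3["name"],
        "category2_id": cat3["category2_id"],
        "category2_name": cat2["name"],
        "category1_id": cat2["category1_id"],
        "category1_name": c1[cat2["category1_id"]]["name"],
    }

row = dim_sku_row(sku)
```

Because these are inner joins, a SKU with a dangling brand or category reference would be dropped from the dimension, which is worth monitoring.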
9.2.2 Coupon info dimension table (full load)
- Table creation
drop table if exists dwd.mall__dim_coupon_info;
CREATE EXTERNAL TABLE `dwd.mall__dim_coupon_info`(
`id` string COMMENT 'coupon id',
`coupon_name` string COMMENT 'coupon name',
`coupon_type` string COMMENT 'coupon type: 1 cash, 2 discount, 3 amount-threshold, 4 quantity-threshold discount',
`condition_amount` string COMMENT 'amount threshold',
`condition_num` string COMMENT 'quantity threshold',
`activity_id` string COMMENT 'activity id',
`benefit_amount` string COMMENT 'amount off',
`benefit_discount` string COMMENT 'discount rate',
`create_time` string COMMENT 'creation time',
`range_type` string COMMENT 'scope type: 1 product, 2 category, 3 brand',
`spu_id` string COMMENT 'product id',
`tm_id` string COMMENT 'brand id',
`category3_id` string COMMENT 'category id',
`limit_num` string COMMENT 'maximum redemptions',
`operate_time` string COMMENT 'modification time',
`expire_time` string COMMENT 'expiration time'
) COMMENT 'Coupon info dimension table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_coupon_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_coupon_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
from_unixtime(cast(expire_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') expire_time
from ods.mall__coupon_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.3 Activity dimension table (full load)
- Create the table
drop table if exists dwd.mall__dim_activity_info
CREATE EXTERNAL TABLE `dwd.mall__dim_activity_info`(
`id` string COMMENT '編號',
`activity_name` string COMMENT '活動名稱',
`activity_type` string COMMENT '活動類型',
`condition_amount` string COMMENT '滿減金額',
`condition_num` string COMMENT '滿減件數',
`benefit_amount` string COMMENT '優惠金額',
`benefit_discount` string COMMENT '優惠折扣',
`benefit_level` string COMMENT '優惠級別',
`start_time` string COMMENT '開始時間',
`end_time` string COMMENT '結束時間',
`create_time` string COMMENT '創建時間'
) COMMENT '活動維度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_activity_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_activity_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
from_unixtime(cast(info.start_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') start_time,
from_unixtime(cast(info.end_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') end_time,
from_unixtime(cast(info.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from
(
select * from ods.mall__activity_info where dt='$db_date'
)info
left join
(
select * from ods.mall__activity_rule where dt='$db_date'
)rule on info.id = rule.activity_id;
"
$hive -e "$sql"
9.2.4 Region dimension table (special)
- Create the table
drop table if exists dwd.mall__dim_base_province
CREATE EXTERNAL TABLE `dwd.mall__dim_base_province`(
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名稱',
`area_code` string COMMENT '地區編碼',
`iso_code` string COMMENT 'ISO 編碼',
`region_id` string COMMENT '地區 id',
`region_name` string COMMENT '地區名稱'
) COMMENT '地區維度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_base_province/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_base_province
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ods.mall__base_province bp
join ods.mall__base_region br
on bp.region_id=br.id;
"
$hive -e "$sql"
9.2.5 Date dimension table (special)
- Create the table
drop table if exists dwd.mall__dim_date_info
CREATE EXTERNAL TABLE `dwd.mall__dim_date_info`(
`date_id` string COMMENT '日',
`week_id` int COMMENT '周',
`week_day` int COMMENT '周的第幾天',
`day` int COMMENT '每月的第幾天',
`month` int COMMENT '第幾月',
`quarter` int COMMENT '第幾季度',
`year` int COMMENT '年',
`is_workday` int COMMENT '是否是工作日',
`holiday_id` int COMMENT '是否是節假日'
) COMMENT '時間維度表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_date_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_date_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
date_id,
week_id,
week_day,
day,
month,
quarter,
year,
is_workday,
holiday_id
from ods.mall__date_info
"
$hive -e "$sql"
9.2.6 User dimension table (new and changed records: slowly changing dimension, zipper table)
9.2.6.1 Introduction to zipper tables
A zipper table records the life cycle of each row: once a record's life cycle ends, a new record is opened, with the current date as its effective start date.
If a record is still valid today, its effective end date holds a sentinel maximum value (e.g. 9999-99-99). The table below traces the changes to Zhang San's phone number
User ID | Name | Phone | Start date | End date |
---|---|---|---|---|
1 | Zhang San | 134XXXX5050 | 2019-01-01 | 2019-01-02 |
1 | Zhang San | 139XXXX3232 | 2019-01-03 | 2020-01-01 |
1 | Zhang San | 137XXXX7676 | 2020-01-02 | 9999-99-99 |
- Suitable scenario: the data changes, but most of it stays the same (i.e. a slowly changing dimension).
For example, user records change, but only a small share change on any given day, so a full daily snapshot would be inefficient.
- How to query a zipper table: filtering on start_date <= d AND end_date >= d yields a full snapshot of the data as of date d.
- How the zipper table is formed
- Build process
Join the full current-day data with the day's changed rows from MySQL to form a new temporary zipper table.
Then overwrite the old zipper table with the temporary one.
This works around the fact that Hive data cannot be updated in place.
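The build process above can be sketched in Python. This is a minimal in-memory model with hypothetical dict rows standing in for the Hive tables, not the actual Hive job:

```python
from datetime import date, timedelta

OPEN_END = "9999-99-99"  # sentinel "still valid" end date used by this tutorial

def merge_zipper(old_rows, new_rows, today):
    """Rebuild the zipper table for one day: close out open rows whose id
    changed today, then append today's versions as new open rows."""
    changed_ids = {r["id"] for r in new_rows}
    yesterday = (date.fromisoformat(today) - timedelta(days=1)).isoformat()
    merged = []
    for r in old_rows:
        r = dict(r)  # do not mutate the caller's rows
        # A previously open row whose id changed today has its life cycle closed.
        if r["id"] in changed_ids and r["end_date"] == OPEN_END:
            r["end_date"] = yesterday
        merged.append(r)
    # Today's versions become the new open records.
    for r in new_rows:
        merged.append({**r, "start_date": today, "end_date": OPEN_END})
    return sorted(merged, key=lambda r: (r["id"], r["start_date"]))
```

This mirrors the union-then-overwrite flow of the scripts below: the `if(... end_date='9999-99-99', date_add(dt,-1), end_date)` expression plays the role of the close-out step.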
9.2.6.2 User dimension table
User records may be both inserted and updated on any given day (a slowly changing dimension), so the user dimension is stored as a zipper table.
- Create the table
drop table if exists dwd.mall__dim_user_info_his
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his`(
`id` string COMMENT '用戶 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性別',
`email` string COMMENT '郵箱',
`user_level` string COMMENT '用戶等級',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '操作時間',
`start_date` string COMMENT '有效開始日期',
`end_date` string COMMENT '有效結束日期'
) COMMENT '用戶拉鏈表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his/'
tblproperties ("parquet.compression"="snappy")
- Create the temporary table (same structure as the main table)
drop table if exists dwd.mall__dim_user_info_his_tmp
CREATE EXTERNAL TABLE `dwd.mall__dim_user_info_his_tmp`(
`id` string COMMENT '用戶 id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性別',
`email` string COMMENT '郵箱',
`user_level` string COMMENT '用戶等級',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '操作時間',
`start_date` string COMMENT '有效開始日期',
`end_date` string COMMENT '有效結束日期'
) COMMENT '用戶拉鏈表'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="snappy")
- First, initialize the main table's data (run only once)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date',
'9999-99-99'
from ods.mall__user_info oi
where oi.dt='$db_date';
"
$hive -e "$sql"
- Compute and load the temporary table (run after the main-table initialization)
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his_tmp
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
*
from
( --all records as of the current date
select
cast(id as string) id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
'$db_date' start_date,
'9999-99-99' end_date
from ods.mall__user_info where dt='$db_date'
union all
--rows that changed today, with their end dates updated
select
uh.id,
uh.name,
from_unixtime(cast(uh.birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
uh.gender,
uh.email,
uh.user_level,
from_unixtime(cast(uh.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(uh.operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1),uh.end_date) end_date
from dwd.mall__dim_user_info_his uh left join
(
--all records as of the current date
select
cast(id as string) id,
name,
from_unixtime(cast(birthday/1000 as bigint),'yyyy-MM-dd HH:mm:ss') birthday,
gender,
email,
user_level,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
dt
from ods.mall__user_info
where dt='$db_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
"
$hive -e "$sql"
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=dim_user_info_his
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select * from dwd.mall__dim_user_info_his_tmp;
"
$hive -e "$sql"
9.2.7 Order detail fact table (transactional fact table, inserts only)
- Create the table
drop table if exists dwd.mall__fact_order_detail
CREATE EXTERNAL TABLE `dwd.mall__fact_order_detail`(
`id` bigint COMMENT '編號',
`order_id` bigint COMMENT '訂單編號',
`user_id` bigint COMMENT '用戶id',
`sku_id` bigint COMMENT 'sku_id',
`sku_name` string COMMENT 'sku名稱',
`order_price` decimal(10,2) COMMENT '購買價格(下單時sku價格)',
`sku_num` string COMMENT '購買個數',
`create_time` bigint COMMENT '創建時間',
`province_id` string COMMENT '省份ID',
`total_amount` decimal(20,2) COMMENT '訂單總金額'
) COMMENT '訂單明細表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_detail/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_detail
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from (select * from ods.mall__order_detail where dt='$db_date' ) od
join (select * from ods.mall__order_info where dt='$db_date' ) oi
on od.order_id=oi.id;
"
$hive -e "$sql"
9.2.8 Payment fact table (transactional fact table, inserts only)
- Create the table
drop table if exists dwd.mall__fact_payment_info
CREATE EXTERNAL TABLE `dwd.mall__fact_payment_info`(
`id` string COMMENT '',
`out_trade_no` string COMMENT '對外業務編號',
`order_id` string COMMENT '訂單編號',
`user_id` string COMMENT '用戶編號',
`alipay_trade_no` string COMMENT '支付寶交易流水編號',
`payment_amount` decimal(16,2) COMMENT '支付金額',
`subject` string COMMENT '交易內容',
`payment_type` string COMMENT '支付類型',
`payment_time` string COMMENT '支付時間',
`province_id` string COMMENT '省份 ID'
) COMMENT '支付事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_payment_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_payment_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
from_unixtime(cast(pi.payment_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') payment_time,
oi.province_id
from
(
select * from ods.mall__payment_info where dt='$db_date'
)pi
join
(
select id, province_id from ods.mall__order_info where dt='$db_date'
)oi
on pi.order_id = oi.id;
"
$hive -e "$sql"
9.2.9 Refund fact table (transactional fact table, inserts only)
- Create the table
drop table if exists dwd.mall__fact_order_refund_info
CREATE EXTERNAL TABLE `dwd.mall__fact_order_refund_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 ID',
`order_id` string COMMENT '訂單 ID',
`sku_id` string COMMENT '商品 ID',
`refund_type` string COMMENT '退款類型',
`refund_num` bigint COMMENT '退款件數',
`refund_amount` decimal(16,2) COMMENT '退款金額',
`refund_reason_type` string COMMENT '退款原因類型',
`create_time` string COMMENT '退款時間'
) COMMENT '退款事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_refund_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_refund_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__order_refund_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.10 Comment fact table (transactional fact table, inserts only)
- Create the table
drop table if exists dwd.mall__fact_comment_info
CREATE EXTERNAL TABLE `dwd.mall__fact_comment_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 ID',
`sku_id` string COMMENT '商品 sku',
`spu_id` string COMMENT '商品 spu',
`order_id` string COMMENT '訂單 ID',
`appraise` string COMMENT '評價',
`create_time` string COMMENT '評價時間'
) COMMENT '評價事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_comment_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_comment_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time
from ods.mall__comment_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.11 Cart fact table (periodic snapshot fact table, full load)
- Create the table
drop table if exists dwd.mall__fact_cart_info
CREATE EXTERNAL TABLE `dwd.mall__fact_cart_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入購物車時價格',
`sku_num` string COMMENT '數量',
`sku_name` string COMMENT 'sku 名稱 (冗餘)',
`create_time` string COMMENT '創建時間',
`operate_time` string COMMENT '修改時間',
`is_ordered` string COMMENT '是否已經下單。1 為已下單;0 為未下單',
`order_time` string COMMENT '下單時間'
) COMMENT '加購事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_cart_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_cart_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') operate_time,
is_ordered,
from_unixtime(cast(order_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') order_time
from ods.mall__cart_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.12 Favorites fact table (periodic snapshot fact table, full load)
- Create the table
drop table if exists dwd.mall__fact_favor_info
CREATE EXTERNAL TABLE `dwd.mall__fact_favor_info`(
`id` string COMMENT '編號',
`user_id` string COMMENT '用戶 id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏時間',
`cancel_time` string COMMENT '取消時間'
) COMMENT '收藏事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_favor_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_favor_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') create_time,
from_unixtime(cast(cancel_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss') cancel_time
from ods.mall__favor_info
where dt='$db_date';
"
$hive -e "$sql"
9.2.13 Coupon usage fact table (accumulating snapshot fact table, new and changed records)
- Create the table
drop table if exists dwd.mall__fact_coupon_use
CREATE EXTERNAL TABLE `dwd.mall__fact_coupon_use`(
`id` string COMMENT '編號',
`coupon_id` string COMMENT '優惠券 ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '訂單 id',
`coupon_status` string COMMENT '優惠券狀態',
`get_time` string COMMENT '領取時間',
`using_time` string COMMENT '使用時間(下單)',
`used_time` string COMMENT '使用時間(支付)'
) COMMENT '優惠券領用事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_coupon_use/'
tblproperties ("parquet.compression"="snappy")
dt partitions the table by the coupon acquisition time get_time.
get_time is the acquisition time: a record must exist from the moment the coupon is acquired, and its order and payment timestamps are filled in later as those events occur.
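The load script below merges the previously accumulated rows with today's changes via a full outer join, preferring the new value field by field. A minimal Python sketch of that coalescing rule, using hypothetical dict rows in place of the Hive tables:

```python
def coalesce_rows(old, new):
    """Field-wise coalesce: prefer today's value, fall back to the previously
    accumulated one. Mirrors the if(new.x is null, old.x, new.x) pattern."""
    old, new = old or {}, new or {}
    keys = set(old) | set(new)
    return {k: new.get(k) if new.get(k) is not None else old.get(k)
            for k in keys}
```

A row that only exists on one side of the full outer join passes through unchanged, which is exactly how brand-new acquisitions and untouched old records are handled.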
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_coupon_use
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table $hive_table_name
PARTITION (dt)
select
if(new.id is null,old.id,new.id) id,
if(new.coupon_id is null,old.coupon_id,new.coupon_id) coupon_id,
if(new.user_id is null,old.user_id,new.user_id) user_id,
if(new.order_id is null,old.order_id,new.order_id) order_id,
if(new.coupon_status is null,old.coupon_status,new.coupon_status) coupon_status,
from_unixtime(cast(if(new.get_time is null,old.get_time,new.get_time)/1000 as bigint),'yyyy-MM-dd') get_time,
from_unixtime(cast(if(new.using_time is null,old.using_time,new.using_time)/1000 as bigint),'yyyy-MM-dd') using_time,
from_unixtime(cast(if(new.used_time is null,old.used_time,new.used_time)/1000 as bigint),'yyyy-MM-dd'),
from_unixtime(cast(if(new.get_time is null,old.get_time,new.get_time)/1000 as bigint),'yyyy-MM-dd')
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd.mall__fact_coupon_use
where dt in
(
select
from_unixtime(cast(get_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__coupon_use
where dt='$db_date'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ods.mall__coupon_use
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"
9.2.14 Order fact table (accumulating snapshot fact table, new and changed records)
- Create the table
drop table if exists dwd.mall__fact_order_info
CREATE EXTERNAL TABLE `dwd.mall__fact_order_info`(
`id` string COMMENT '訂單編號',
`order_status` string COMMENT '訂單狀態',
`user_id` string COMMENT '用戶 id',
`out_trade_no` string COMMENT '支付流水號',
`create_time` string COMMENT '創建時間(未支付狀態)',
`payment_time` string COMMENT '支付時間(已支付狀態)',
`cancel_time` string COMMENT '取消時間(已取消狀態)',
`finish_time` string COMMENT '完成時間(已完成狀態)',
`refund_time` string COMMENT '退款時間(退款中狀態)',
`refund_finish_time` string COMMENT '退款完成時間(退款完成狀態)',
`province_id` string COMMENT '省份 ID',
`activity_id` string COMMENT '活動 ID',
`original_total_amount` string COMMENT '原價金額',
`benefit_reduce_amount` string COMMENT '優惠金額',
`feight_fee` string COMMENT '運費',
`final_total_amount` decimal(10,2) COMMENT '訂單金額'
) COMMENT '訂單事實表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/mall/fact_order_info/'
tblproperties ("parquet.compression"="snappy")
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwd
table_name=fact_order_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,from_unixtime(cast(old.create_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1001']),--1001 corresponds to the unpaid status
if(new.tms['1002'] is null,from_unixtime(cast(old.payment_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1002']),
if(new.tms['1003'] is null,from_unixtime(cast(old.cancel_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1003']),
if(new.tms['1004'] is null,from_unixtime(cast(old.finish_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1004']),
if(new.tms['1005'] is null,from_unixtime(cast(old.refund_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1005']),
if(new.tms['1006'] is null,from_unixtime(cast(old.refund_finish_time/1000 as bigint),'yyyy-MM-dd HH:mm:ss'),new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount)
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd.mall__fact_order_info
where dt in
(
select
from_unixtime(cast(create_time/1000 as bigint),'yyyy-MM-dd')
from ods.mall__order_info
where dt='$db_date'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',from_unixtime(cast(operate_time/1000 as bigint),'yyyy-MM-dd')))),',','=') tms
from ods.mall__order_status_log
where dt='$db_date'
group by order_id
)log
join
(
select * from ods.mall__order_info where dt='$db_date'
)info
on log.order_id=info.id
left join
(
select * from ods.mall__activity_order where dt='$db_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
$hive -e "$sql"
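In the script above, the order-status log is pivoted into a status-to-date map with collect_set, concat_ws and str_to_map, so that each status timestamp can be looked up as tms['1001'], tms['1002'], and so on. The same pivot can be sketched in Python with hypothetical row dicts:

```python
def status_log_to_map(rows):
    """Pivot an order-status log into {order_id: {status: date}}, mimicking
    the collect_set + concat_ws + str_to_map trick in the Hive query."""
    tms = {}
    for r in rows:
        # Later rows with the same (order_id, status) overwrite earlier ones,
        # much like the set semantics of collect_set collapsing duplicates.
        tms.setdefault(r["order_id"], {})[r["order_status"]] = r["operate_date"]
    return tms
```

Each status code then maps to the date the order entered that state, and missing codes simply have no key, which corresponds to tms['xxxx'] being null in the query.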
10 Building the DWS layer
The DWS layer is left uncompressed: compression saves disk at the cost of CPU, and DWS tables are queried frequently, so compute efficiency takes priority. This layer mainly aggregates daily behavior by subject.
10.1 Daily device activity (user behavior)
- Create the table
drop table if exists dws.mall__uv_detail_daycount
CREATE EXTERNAL TABLE `dws.mall__uv_detail_daycount`(
`mid_id` string COMMENT '設備唯一標識',
`user_id` string COMMENT '用戶標識',
`version_code` string COMMENT '程式版本號',
`version_name` string COMMENT '程式版本名',
`lang` string COMMENT '系統語言',
`source` string COMMENT '渠道號',
`os` string COMMENT 'Android系統版本',
`area` string COMMENT '區域',
`model` string COMMENT '手機型號',
`brand` string COMMENT '手機品牌',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT '螢幕寬高',
`app_time` string COMMENT '客戶端日誌產生時的時間',
`network` string COMMENT '網路模式',
`lng` string COMMENT '經度',
`lat` string COMMENT '緯度',
`login_count` bigint COMMENT '活躍次數'
) COMMENT '每日設備行為表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/uv_detail_daycount/'
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=uv_detail_daycount
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
PARTITION (dt='$db_date')
select
mid_id,
concat_ws('|', collect_set(user_id)) user_id,
concat_ws('|', collect_set(version_code)) version_code,
concat_ws('|', collect_set(version_name)) version_name,
concat_ws('|', collect_set(lang))lang,
concat_ws('|', collect_set(source)) source,
concat_ws('|', collect_set(os)) os,
concat_ws('|', collect_set(area)) area,
concat_ws('|', collect_set(model)) model,
concat_ws('|', collect_set(brand)) brand,
concat_ws('|', collect_set(sdk_version)) sdk_version,
concat_ws('|', collect_set(gmail)) gmail,
concat_ws('|', collect_set(height_width)) height_width,
concat_ws('|', collect_set(app_time)) app_time,
concat_ws('|', collect_set(network)) network,
concat_ws('|', collect_set(lng)) lng,
concat_ws('|', collect_set(lat)) lat,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
group by mid_id;
"
$hive -e "$sql"
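In the query above, concat_ws('|', collect_set(col)) collapses each device's values for a column into a single de-duplicated, pipe-separated string. A Python sketch of that behavior (note that Hive's collect_set gives no ordering guarantee, while this sketch keeps first-seen order):

```python
def collapse(values, sep="|"):
    """De-duplicate non-null values and join them with a separator,
    like concat_ws('|', collect_set(col)) in the Hive query."""
    seen = []
    for v in values:
        if v is not None and v not in seen:
            seen.append(v)
    return sep.join(seen)
```

If a device reported the same value all day, the result is that single value; distinct values are all preserved, which is why a multi-valued field like user_id can show up as "a|b".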
10.2 Daily member activity (business)
- Create the table
drop table if exists dws.mall__user_action_daycount
CREATE EXTERNAL TABLE `dws.mall__user_action_daycount`(
user_id string comment '用戶 id',
login_count bigint comment '登錄次數',
cart_count bigint comment '加入購物車次數',
cart_amount double comment '加入購物車金額',
order_count bigint comment '下單次數',
order_amount decimal(16,2) comment '下單金額',
payment_count bigint comment '支付次數',
payment_amount decimal(16,2) comment '支付金額'
) COMMENT '每日會員行為表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/user_action_daycount/'
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=user_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_login as
(
select
user_id,
count(*) login_count
from dwd.mall__start_log
where dt='$db_date'
and user_id is not null
group by user_id
),
tmp_cart as
(
select
user_id,
count(*) cart_count,
sum(cart_price*sku_num) cart_amount
from dwd.mall__fact_cart_info
where dt='$db_date'
and user_id is not null
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by user_id
),
tmp_order as
(
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from dwd.mall__fact_order_info
where dt='$db_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd.mall__fact_payment_info
where dt='$db_date'
group by user_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
user_actions.user_id,
sum(user_actions.login_count),
sum(user_actions.cart_count),
sum(user_actions.cart_amount),
sum(user_actions.order_count),
sum(user_actions.order_amount),
sum(user_actions.payment_count),
sum(user_actions.payment_amount)
from
(
select
user_id,
login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_login
union all
select
user_id,
0 login_count,
cart_count,
cart_amount,
0 order_count,
0 order_amount,
0 payment_count,
0 payment_amount
from
tmp_cart
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
order_count,
order_amount,
0 payment_count,
0 payment_amount
from tmp_order
union all
select
user_id,
0 login_count,
0 cart_count,
0 cart_amount,
0 order_count,
0 order_amount,
payment_count,
payment_amount
from tmp_payment
) user_actions
group by user_id;
"
$hive -e "$sql"
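The UNION ALL of zero-padded subqueries followed by GROUP BY user_id with SUM is a standard way to stitch independent per-metric aggregates into one wide row per user. A minimal Python model of the pattern, with hypothetical metric dicts standing in for the CTEs:

```python
from collections import defaultdict

def merge_metrics(*metric_tables):
    """Combine per-user metric dicts by summing, mirroring the
    UNION ALL + zero padding + GROUP BY SUM pattern."""
    totals = defaultdict(lambda: defaultdict(int))
    for table in metric_tables:
        for user_id, metrics in table.items():
            for name, value in metrics.items():
                totals[user_id][name] += value
    return {u: dict(m) for u, m in totals.items()}
```

Because each source contributes zeros for the metrics it does not own, summing per user yields one row per user with every metric filled in, even for users that appear in only some of the sources.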
10.3 Daily SKU activity (business)
- Create the table
drop table if exists dws.mall__sku_action_daycount
CREATE EXTERNAL TABLE `dws.mall__sku_action_daycount`(
sku_id string comment 'sku_id',
order_count bigint comment '被下單次數',
order_num bigint comment '被下單件數',
order_amount decimal(16,2) comment '被下單金額',
payment_count bigint comment '被支付次數',
payment_num bigint comment '被支付件數',
payment_amount decimal(16,2) comment '被支付金額',
refund_count bigint comment '被退款次數',
refund_num bigint comment '被退款件數',
refund_amount decimal(16,2) comment '被退款金額',
cart_count bigint comment '被加入購物車次數',
cart_num bigint comment '被加入購物車件數',
favor_count bigint comment '被收藏次數',
appraise_good_count bigint comment '好評數',
appraise_mid_count bigint comment '中評數',
appraise_bad_count bigint comment '差評數',
appraise_default_count bigint comment '默認評價數'
) COMMENT '每日商品行為表'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sku_action_daycount/'
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sku_action_daycount
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to the day before today
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
with
tmp_order as
(
select
cast(sku_id as string) sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by sku_id
),
tmp_payment as
(
select
cast(sku_id as string) sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(total_amount) payment_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
and order_id in
(
select
id
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and date_format(payment_time,'yyyy-MM-dd')='$db_date'
)
group by sku_id
),
tmp_refund as
(
select
cast(sku_id as string) sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from dwd.mall__fact_order_refund_info
where dt='$db_date'
group by sku_id
),
tmp_cart as
(
select
cast(sku_id as string) sku_id,
count(*) cart_count,
sum(sku_num) cart_num
from dwd.mall__fact_cart_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_favor as
(
select
cast(sku_id as string) sku_id,
count(*) favor_count
from dwd.mall__fact_favor_info
where dt='$db_date'
and date_format(create_time,'yyyy-MM-dd')='$db_date'
group by sku_id
),
tmp_appraise as
(
select
cast(sku_id as string) sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd.mall__fact_comment_info
where dt='$db_date'
group by sku_id
)
insert overwrite table $hive_table_name partition(dt='$db_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(cart_num),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
cart_num,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 cart_num,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;
"
$hive -e "$sql"
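All the loader scripts in this chapter share the same date-defaulting idiom: take the caller-supplied `${date}` if present, otherwise fall back to yesterday. A minimal standalone sketch of just that logic (the `date` assignment here simulates a caller-supplied run date):

```shell
#!/bin/bash
# Default db_date to the caller-supplied ${date} when present,
# otherwise to yesterday in YYYY-MM-DD form (same idiom as the loaders above).
date="2020-03-10"                      # simulate an explicit run date
if [ -n "${date}" ]; then
  db_date=${date}
else
  db_date=$(date -d "-1 day" +%F)
fi
echo "$db_date"
```

When `date` is unset or empty, `date -d "-1 day" +%F` produces yesterday's date, which is the normal case for a nightly scheduled run.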
10.4 Daily Coupon Stats (Business)
- Create the table
drop table if exists dws.mall__coupon_use_daycount;
CREATE EXTERNAL TABLE `dws.mall__coupon_use_daycount`(
`coupon_id` string COMMENT 'coupon ID',
`coupon_name` string COMMENT 'coupon name',
`coupon_type` string COMMENT 'coupon type: 1 cash voucher, 2 discount voucher, 3 amount-threshold voucher, 4 quantity-threshold discount voucher',
`condition_amount` string COMMENT 'threshold amount',
`condition_num` string COMMENT 'threshold item count',
`activity_id` string COMMENT 'activity ID',
`benefit_amount` string COMMENT 'reduction amount',
`benefit_discount` string COMMENT 'discount rate',
`create_time` string COMMENT 'creation time',
`range_type` string COMMENT 'scope type: 1 product, 2 category, 3 brand',
`spu_id` string COMMENT 'product ID',
`tm_id` string COMMENT 'brand ID',
`category3_id` string COMMENT 'category ID',
`limit_num` string COMMENT 'maximum claims per user',
`get_count` bigint COMMENT 'times claimed',
`using_count` bigint COMMENT 'times used (order placed)',
`used_count` bigint COMMENT 'times used (paid)'
) COMMENT 'daily coupon stats table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/coupon_use_daycount/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=coupon_use_daycount
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
cu.coupon_id,
ci.coupon_name,
ci.coupon_type,
ci.condition_amount,
ci.condition_num,
ci.activity_id,
ci.benefit_amount,
ci.benefit_discount,
ci.create_time,
ci.range_type,
ci.spu_id,
ci.tm_id,
ci.category3_id,
ci.limit_num,
cu.get_count,
cu.using_count,
cu.used_count
from
(
select
coupon_id,
sum(if(date_format(get_time,'yyyy-MM-dd')='$db_date',1,0)) get_count,
sum(if(date_format(using_time,'yyyy-MM-dd')='$db_date',1,0)) using_count,
sum(if(date_format(used_time,'yyyy-MM-dd')='$db_date',1,0)) used_count
from dwd.mall__fact_coupon_use
where dt='$db_date'
group by coupon_id
)cu
left join
(
select
*
from dwd.mall__dim_coupon_info
where dt='$db_date'
)ci on cu.coupon_id=ci.id;
"
$hive -e "$sql"
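The `sum(if(date_format(...)='$db_date',1,0))` expressions above are just per-key conditional counts. The same idea in shell/awk, over a few hypothetical `coupon_id get_time` rows, counts only the events whose date matches the run date:

```shell
#!/bin/bash
# Conditional counting: the shell/awk analogue of Hive's sum(if(cond,1,0)).
# Input columns: coupon_id get_time (hypothetical sample data).
db_date="2020-03-10"
printf '%s\n' \
  "c1 2020-03-10" \
  "c1 2020-03-09" \
  "c2 2020-03-10" |
awk -v d="$db_date" '{cnt[$1] += ($2 == d ? 1 : 0)} END {for (c in cnt) print c, cnt[c]}' |
sort
```

Each row contributes 1 when its date equals the target date and 0 otherwise, exactly like the `if(...,1,0)` inside the Hive `sum`.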
10.5 Daily Activity Stats (Business)
- Create the table
drop table if exists dws.mall__activity_info_daycount;
CREATE EXTERNAL TABLE `dws.mall__activity_info_daycount`(
`id` string COMMENT 'ID',
`activity_name` string COMMENT 'activity name',
`activity_type` string COMMENT 'activity type',
`start_time` string COMMENT 'start time',
`end_time` string COMMENT 'end time',
`create_time` string COMMENT 'creation time',
`order_count` bigint COMMENT 'order count',
`payment_count` bigint COMMENT 'payment count'
) COMMENT 'daily activity stats table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/activity_info_daycount/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=activity_info_daycount
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
oi.activity_id,
ai.activity_name,
ai.activity_type,
ai.start_time,
ai.end_time,
ai.create_time,
oi.order_count,
oi.payment_count
from
(
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$db_date',1,0)) order_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$db_date',1,0)) payment_count
from dwd.mall__fact_order_info
where (dt='$db_date' or dt=date_add('$db_date',-1))
and activity_id is not null
group by activity_id
)oi
join
(
select
*
from dwd.mall__dim_activity_info
where dt='$db_date'
)ai
on oi.activity_id=ai.id;
"
$hive -e "$sql"
10.6 Daily Purchase Behavior (Business)
- Create the table
drop table if exists dws.mall__sale_detail_daycount;
CREATE EXTERNAL TABLE `dws.mall__sale_detail_daycount`(
user_id string comment 'user ID',
sku_id string comment 'product ID',
user_gender string comment 'user gender',
user_age string comment 'user age',
user_level string comment 'user level',
order_price decimal(10,2) comment 'product price',
sku_name string comment 'product name',
sku_tm_id string comment 'brand ID',
sku_category3_id string comment 'level-3 category ID',
sku_category2_id string comment 'level-2 category ID',
sku_category1_id string comment 'level-1 category ID',
sku_category3_name string comment 'level-3 category name',
sku_category2_name string comment 'level-2 category name',
sku_category1_name string comment 'level-1 category name',
spu_id string comment 'product SPU',
sku_num int comment 'purchase quantity',
order_count bigint comment 'orders placed that day',
order_amount decimal(16,2) comment 'order amount that day'
) COMMENT 'daily purchase behavior table'
PARTITIONED BY (
`dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dws/mall/sale_detail_daycount/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dws
table_name=sale_detail_daycount
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name partition(dt='$db_date')
select
op.user_id,
op.sku_id,
ui.gender,
months_between('$db_date', ui.birthday)/12 age,
ui.user_level,
si.price,
si.sku_name,
si.tm_id,
si.category3_id,
si.category2_id,
si.category1_id,
si.category3_name,
si.category2_name,
si.category1_name,
si.spu_id,
op.sku_num,
op.order_count,
op.order_amount
from
(
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
sum(total_amount) order_amount
from dwd.mall__fact_order_detail
where dt='$db_date'
group by user_id, sku_id
)op
join
(
select
*
from dwd.mall__dim_user_info_his
where end_date='9999-99-99'
)ui on op.user_id = ui.id
join
(
select
*
from dwd.mall__dim_sku_info
where dt='$db_date'
)si on op.sku_id = si.id;
"
$hive -e "$sql"
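The `months_between('$db_date', ui.birthday)/12` expression derives the user's age at run time instead of storing it. A rough shell equivalent of the same arithmetic, using epoch seconds and a hypothetical birthday:

```shell
#!/bin/bash
# Approximate age in whole years between a birthday and the run date,
# mirroring months_between(db_date, birthday)/12 in the query above.
db_date="2020-03-10"
birthday="1990-03-10"
secs_per_year=31557600   # 365.25 days, averages out leap years
age=$(( ( $(date -d "$db_date" +%s) - $(date -d "$birthday" +%s) ) / secs_per_year ))
echo "$age"
```

Computing age on the fly keeps the dimension table free of a column that would otherwise go stale every day.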
11 Building the DWT Layer
This layer aggregates the DWS layer's daily data. Tables are not partitioned or compressed, and are overwritten in full every day.
11.1 Device-Topic Wide Table
- Create the table
drop table if exists dwt.mall__uv_topic;
CREATE EXTERNAL TABLE `dwt.mall__uv_topic`(
`mid_id` string COMMENT 'unique device ID',
`user_id` string COMMENT 'user ID',
`version_code` string COMMENT 'app version code',
`version_name` string COMMENT 'app version name',
`lang` string COMMENT 'system language',
`source` string COMMENT 'channel ID',
`os` string COMMENT 'Android OS version',
`area` string COMMENT 'region',
`model` string COMMENT 'phone model',
`brand` string COMMENT 'phone brand',
`sdk_version` string COMMENT 'sdkVersion',
`gmail` string COMMENT 'gmail',
`height_width` string COMMENT 'screen height and width',
`app_time` string COMMENT 'time the client log was generated',
`network` string COMMENT 'network mode',
`lng` string COMMENT 'longitude',
`lat` string COMMENT 'latitude',
`login_date_first` string comment 'first active date',
`login_date_last` string comment 'last active date',
`login_day_count` bigint comment 'activity count that day',
`login_count` bigint comment 'cumulative active days'
) COMMENT 'device-topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/uv_topic/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=uv_topic
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.mid_id,old.mid_id),
nvl(new.user_id,old.user_id),
nvl(new.version_code,old.version_code),
nvl(new.version_name,old.version_name),
nvl(new.lang,old.lang),
nvl(new.source,old.source),
nvl(new.os,old.os),
nvl(new.area,old.area),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
nvl(new.sdk_version,old.sdk_version),
nvl(new.gmail,old.gmail),
nvl(new.height_width,old.height_width),
nvl(new.app_time,old.app_time),
nvl(new.network,old.network),
nvl(new.lng,old.lng),
nvl(new.lat,old.lat),
if(old.mid_id is null,'$db_date',old.login_date_first),
if(new.mid_id is not null,'$db_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from dwt.mall__uv_topic
)old
full outer join
(
select
*
from dws.mall__uv_detail_daycount
where dt='$db_date'
)new
on old.mid_id=new.mid_id;
"
$hive -e "$sql"
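The DWT load above is a classic "full outer join old snapshot with today's delta, then `nvl(new.x, old.x)`" merge. GNU `join` can emulate the same shape: `-a1 -a2` keeps unmatched rows from both sides (full outer join), and `-e`/`-o` fill in missing fields, playing the role of `nvl`. The `old.txt`/`new.txt` files below are hypothetical sample snapshots:

```shell
#!/bin/bash
# Emulate the full-outer-join + nvl merge of the DWT load with GNU join.
# old.txt: mid_id login_date_first; new.txt: mid_id marker for "active today".
printf '%s\n' "m1 2020-03-01" "m2 2020-03-05" | sort > old.txt
printf '%s\n' "m2 x" "m3 x" | sort > new.txt
# -a1 -a2: keep unmatched rows from both files (full outer join)
# -e MISSING -o 0,1.2,2.2: fill absent fields, like nvl(new.x, old.x)
join -a1 -a2 -e MISSING -o 0,1.2,2.2 old.txt new.txt
rm -f old.txt new.txt
```

A device only in `old.txt` keeps its history, one only in `new.txt` appears for the first time, and one in both merges — exactly the three cases the `nvl`/`if` expressions in the query distinguish.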
11.2 Member-Topic Wide Table
- Create the table
drop table if exists dwt.mall__user_topic;
CREATE EXTERNAL TABLE `dwt.mall__user_topic`(
user_id string comment 'user ID',
login_date_first string comment 'first login date',
login_date_last string comment 'last login date',
login_count bigint comment 'cumulative login days',
login_last_30d_count bigint comment 'login days in the last 30 days',
order_date_first string comment 'first order date',
order_date_last string comment 'last order date',
order_count bigint comment 'cumulative order count',
order_amount decimal(16,2) comment 'cumulative order amount',
order_last_30d_count bigint comment 'orders in the last 30 days',
order_last_30d_amount bigint comment 'order amount in the last 30 days',
payment_date_first string comment 'first payment date',
payment_date_last string comment 'last payment date',
payment_count decimal(16,2) comment 'cumulative payment count',
payment_amount decimal(16,2) comment 'cumulative payment amount',
payment_last_30d_count decimal(16,2) comment 'payments in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'payment amount in the last 30 days'
) COMMENT 'member-topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/user_topic/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=user_topic
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'$db_date',old.login_date_first),
if(new.login_count>0,'$db_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'$db_date',old.order_date_first),
if(new.order_count>0,'$db_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'$db_date',old.payment_date_first),
if(new.payment_count>0,'$db_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
dwt.mall__user_topic old
full outer join
(
select
user_id,
sum(if(dt='$db_date',login_count,0)) login_count,
sum(if(dt='$db_date',order_count,0)) order_count,
sum(if(dt='$db_date',order_amount,0)) order_amount,
sum(if(dt='$db_date',payment_count,0)) payment_count,
sum(if(dt='$db_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from dws.mall__user_action_daycount
where dt>=date_add( '$db_date',-30)
group by user_id
)new
on old.user_id=new.user_id;
"
$hive -e "$sql"
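The new-side subquery restricts its scan to `dt >= date_add('$db_date', -30)`, a rolling 30-day window ending at the run date. The same boundary computed with GNU date, using a fixed example date:

```shell
#!/bin/bash
# The "last 30 days" window lower bound used by the new-side subquery:
# dt >= date_add(db_date, -30), computed here with GNU date.
db_date="2020-03-10"
window_start=$(date -d "$db_date -30 day" +%F)
echo "$window_start"
```

Because the window slides with `$db_date`, the `*_last_30d_*` columns stay correct without ever rewriting historical partitions.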
11.3 Product (SKU)-Topic Wide Table
- Create the table
drop table if exists dwt.mall__sku_topic;
CREATE EXTERNAL TABLE `dwt.mall__sku_topic`(
sku_id string comment 'sku_id',
spu_id string comment 'spu_id',
order_last_30d_count bigint comment 'times ordered in the last 30 days',
order_last_30d_num bigint comment 'units ordered in the last 30 days',
order_last_30d_amount decimal(16,2) comment 'order amount in the last 30 days',
order_count bigint comment 'cumulative times ordered',
order_num bigint comment 'cumulative units ordered',
order_amount decimal(16,2) comment 'cumulative order amount',
payment_last_30d_count bigint comment 'times paid in the last 30 days',
payment_last_30d_num bigint comment 'units paid in the last 30 days',
payment_last_30d_amount decimal(16,2) comment 'payment amount in the last 30 days',
payment_count bigint comment 'cumulative times paid',
payment_num bigint comment 'cumulative units paid',
payment_amount decimal(16,2) comment 'cumulative payment amount',
refund_last_30d_count bigint comment 'refund count in the last 30 days',
refund_last_30d_num bigint comment 'refund units in the last 30 days',
refund_last_30d_amount decimal(10,2) comment 'refund amount in the last 30 days',
refund_count bigint comment 'cumulative refund count',
refund_num bigint comment 'cumulative refund units',
refund_amount decimal(10,2) comment 'cumulative refund amount',
cart_last_30d_count bigint comment 'times added to cart in the last 30 days',
cart_last_30d_num bigint comment 'units added to cart in the last 30 days',
cart_count bigint comment 'cumulative times added to cart',
cart_num bigint comment 'cumulative units added to cart',
favor_last_30d_count bigint comment 'times favorited in the last 30 days',
favor_count bigint comment 'cumulative times favorited',
appraise_last_30d_good_count bigint comment 'good reviews in the last 30 days',
appraise_last_30d_mid_count bigint comment 'neutral reviews in the last 30 days',
appraise_last_30d_bad_count bigint comment 'bad reviews in the last 30 days',
appraise_last_30d_default_count bigint comment 'default reviews in the last 30 days',
appraise_good_count bigint comment 'cumulative good reviews',
appraise_mid_count bigint comment 'cumulative neutral reviews',
appraise_bad_count bigint comment 'cumulative bad reviews',
appraise_default_count bigint comment 'cumulative default reviews'
) COMMENT 'SKU-topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/sku_topic/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=sku_topic
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.sku_id,old.sku_id), sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(new.cart_num30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(old.cart_num,0) + nvl(new.cart_num,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count,
order_num,
order_amount ,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count,
payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_last_30d_num,
cart_count,
cart_num,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from dwt.mall__sku_topic
)old
full outer join
(
select
sku_id,
sum(if(dt='$db_date', order_count,0 )) order_count,
sum(if(dt='$db_date',order_num ,0 )) order_num,
sum(if(dt='$db_date',order_amount,0 )) order_amount ,
sum(if(dt='$db_date',payment_count,0 )) payment_count,
sum(if(dt='$db_date',payment_num,0 )) payment_num,
sum(if(dt='$db_date',payment_amount,0 )) payment_amount,
sum(if(dt='$db_date',refund_count,0 )) refund_count,
sum(if(dt='$db_date',refund_num,0 )) refund_num,
sum(if(dt='$db_date',refund_amount,0 )) refund_amount,
sum(if(dt='$db_date',cart_count,0 )) cart_count,
sum(if(dt='$db_date',cart_num,0 )) cart_num,
sum(if(dt='$db_date',favor_count,0 )) favor_count,
sum(if(dt='$db_date',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='$db_date',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='$db_date',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='$db_date',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(cart_num) cart_num30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from dws.mall__sku_action_daycount
where dt >= date_add ('$db_date', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(
select * from dwd.mall__dim_sku_info where dt='$db_date'
) sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
"
$hive -e "$sql"
11.4 Coupon-Topic Wide Table
- Create the table
drop table if exists dwt.mall__coupon_topic;
CREATE EXTERNAL TABLE `dwt.mall__coupon_topic`(
`coupon_id` string COMMENT 'coupon ID',
`get_day_count` bigint COMMENT 'claims that day',
`using_day_count` bigint COMMENT 'uses (order placed) that day',
`used_day_count` bigint COMMENT 'uses (paid) that day',
`get_count` bigint COMMENT 'cumulative claims',
`using_count` bigint COMMENT 'cumulative uses (order placed)',
`used_count` bigint COMMENT 'cumulative uses (paid)'
) COMMENT 'coupon-topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/coupon_topic/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=coupon_topic
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.coupon_id,old.coupon_id),
nvl(new.get_count,0),
nvl(new.using_count,0),
nvl(new.used_count,0),
nvl(old.get_count,0)+nvl(new.get_count,0),
nvl(old.using_count,0)+nvl(new.using_count,0),
nvl(old.used_count,0)+nvl(new.used_count,0)
from
(
select
*
from dwt.mall__coupon_topic
)old
full outer join
(
select
coupon_id,
get_count,
using_count,
used_count
from dws.mall__coupon_use_daycount
where dt='$db_date'
)new
on old.coupon_id=new.coupon_id;
"
$hive -e "$sql"
11.5 Activity-Topic Wide Table
- Create the table
drop table if exists dwt.mall__activity_topic;
CREATE EXTERNAL TABLE `dwt.mall__activity_topic`(
`id` string COMMENT 'activity ID',
`activity_name` string COMMENT 'activity name',
`order_day_count` bigint COMMENT 'orders that day',
`payment_day_count` bigint COMMENT 'payments that day',
`order_count` bigint COMMENT 'cumulative order count',
`payment_count` bigint COMMENT 'cumulative payment count'
) COMMENT 'activity-topic wide table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwt/mall/activity_topic/';
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=dwt
table_name=activity_topic
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table $hive_table_name
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.order_count,0),
nvl(new.payment_count,0),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.payment_count,0)+nvl(new.payment_count,0)
from
(
select
*
from dwt.mall__activity_topic
)old
full outer join
(
select
id,
activity_name,
order_count,
payment_count
from dws.mall__activity_info_daycount
where dt='$db_date'
)new
on old.id=new.id;
"
$hive -e "$sql"
12 Building the ADS Layer
This layer serves the final data requirements. Whether to compress is decided by export needs and data volume; tables are not partitioned and are rewritten every day.
12.1 Device Topic
12.1.1 Active Devices (Daily, Weekly, Monthly)
DAU: devices active that day
WAU: devices active that week
MAU: devices active that month
- Create the table
drop table if exists ads.mall__uv_count;
CREATE EXTERNAL TABLE `ads.mall__uv_count`(
`dt` string COMMENT 'statistics date',
`day_count` bigint COMMENT 'daily active devices',
`wk_count` bigint COMMENT 'weekly active devices',
`mn_count` bigint COMMENT 'monthly active devices',
`is_weekend` string COMMENT 'Y/N: whether it is the weekend, used to pick the final weekly figure',
`is_monthend` string COMMENT 'Y/N: whether it is month-end, used to pick the final monthly figure'
) COMMENT 'active device count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/uv_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=uv_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
daycount.ct,
wkcount.ct,
mncount.ct,
if(date_add(next_day('$db_date','MO'),-1)='$db_date','Y','N') ,
if(last_day('$db_date')='$db_date','Y','N')
from
(
select
'$db_date' dt,
count(*) ct
from dwt.mall__uv_topic
where login_date_last='$db_date'
)daycount join
(
select
'$db_date' dt,
count (*) ct
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<= date_add(next_day('$db_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
select
'$db_date' dt,
count (*) ct
from dwt.mall__uv_topic
where
date_format(login_date_last,'yyyy-MM')=date_format('$db_date','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
"
$hive -e "$sql"
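The query leans on two boundary expressions: `date_add(next_day('$db_date','MO'),-1)` is the Sunday ending the Monday-based week containing the run date, and `last_day('$db_date')` is month-end. The same boundaries in shell, for a fixed example date (2020-03-10, a Tuesday):

```shell
#!/bin/bash
# Hive's date_add(next_day(d,'MO'),-1) = the Sunday ending d's Monday-based
# week; last_day(d) = end of d's month. Equivalent GNU date arithmetic:
db_date="2020-03-10"
dow=$(date -d "$db_date" +%u)              # ISO weekday, Mon=1 .. Sun=7
week_end=$(date -d "$db_date +$((7 - dow)) day" +%F)
month_end=$(date -d "$(date -d "$db_date" +%Y-%m-01) +1 month -1 day" +%F)
echo "$week_end $month_end"
```

Comparing `$db_date` against these boundaries is how the load decides the `is_weekend` and `is_monthend` flags.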
12.1.2 New Devices per Day
- Create the table
drop table if exists ads.mall__new_mid_count;
CREATE EXTERNAL TABLE `ads.mall__new_mid_count`(
`create_date` string comment 'creation date',
`new_mid_count` bigint comment 'new device count'
) COMMENT 'daily new device table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/new_mid_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=new_mid_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
login_date_first,
count(*)
from dwt.mall__uv_topic
where login_date_first='$db_date'
group by login_date_first;
"
$hive -e "$sql"
12.1.3 Silent Users
Silent user: a device that only launched on the day it was installed, and that day is more than 7 days ago
- Create the table
drop table if exists ads.mall__silent_count;
CREATE EXTERNAL TABLE `ads.mall__silent_count`(
`dt` string COMMENT 'statistics date',
`silent_count` bigint COMMENT 'silent device count'
) COMMENT 'silent user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/silent_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=silent_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from dwt.mall__uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('$db_date',-7);
"
$hive -e "$sql"
12.1.4 Returning Users This Week
Returning user: a device inactive last week but active this week, excluding devices new this week
- Create the table
drop table if exists ads.mall__back_count;
CREATE EXTERNAL TABLE `ads.mall__back_count`(
`wk_dt` string COMMENT 'week containing the statistics date',
`wastage_count` bigint COMMENT 'returning device count'
) COMMENT 'weekly returning user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/back_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=back_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last>=date_add(next_day('$db_date','MO'),-7)
and login_date_last<= date_add(next_day('$db_date','MO'),-1)
and login_date_first<date_add(next_day('$db_date','MO'),-7)
)current_wk
left join
(
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','MO'),-7*2)
and dt<= date_add(next_day('$db_date','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
"
$hive -e "$sql"
12.1.5 Churned Users
Churned user: a device with no activity in the last 7 days
- Create the table
drop table if exists ads.mall__wastage_count;
CREATE EXTERNAL TABLE `ads.mall__wastage_count`(
`dt` string COMMENT 'statistics date',
`wastage_count` bigint COMMENT 'churned device count'
) COMMENT 'churned user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/wastage_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=wastage_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
count(*)
from
(
select
mid_id
from dwt.mall__uv_topic
where login_date_last<=date_add('$db_date',-7)
group by mid_id
)t1;
"
$hive -e "$sql"
12.1.6 Retention Rate
- Create the table
drop table if exists ads.mall__user_retention_day_rate;
CREATE EXTERNAL TABLE `ads.mall__user_retention_day_rate`(
`stat_date` string comment 'statistics date',
`create_date` string comment 'device creation date',
`retention_day` int comment 'days retained as of the statistics date',
`retention_count` bigint comment 'retained device count',
`new_mid_count` bigint comment 'new device count',
`retention_ratio` decimal(10,2) comment 'retention rate'
) COMMENT 'retention rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_retention_day_rate/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_retention_day_rate
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',--statistics date
date_add('$db_date',-1),--cohort (new-device) date
1,--retention days
sum(if(login_date_first=date_add('$db_date',-1) and login_date_last='$db_date',1,0)),--1-day retained devices from the day-1 cohort
sum(if(login_date_first=date_add('$db_date',-1),1,0)),--devices new on day-1
sum(if(login_date_first=date_add('$db_date',-1) and login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-1),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date',--statistics date
date_add('$db_date',-2),--cohort (new-device) date
2,--retention days
sum(if(login_date_first=date_add('$db_date',-2) and login_date_last='$db_date',1,0)),--2-day retained devices from the day-2 cohort
sum(if(login_date_first=date_add('$db_date',-2),1,0)),--devices new on day-2
sum(if(login_date_first=date_add('$db_date',-2) and login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-2),1,0))*100
from dwt.mall__uv_topic
union all
select
'$db_date',--statistics date
date_add('$db_date',-3),--cohort (new-device) date
3,--retention days
sum(if(login_date_first=date_add('$db_date',-3) and login_date_last='$db_date',1,0)),--3-day retained devices from the day-3 cohort
sum(if(login_date_first=date_add('$db_date',-3),1,0)),--devices new on day-3
sum(if(login_date_first=date_add('$db_date',-3) and login_date_last='$db_date',1,0))/sum(if(login_date_first=date_add('$db_date',-3),1,0))*100
from dwt.mall__uv_topic;
"
$hive -e "$sql"
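Each branch of the query computes the same ratio: retained devices divided by the size of that day's cohort, times 100. The arithmetic itself, with hypothetical counts:

```shell
#!/bin/bash
# Retention rate as computed above: retained / cohort_size * 100,
# using made-up counts for a single one-day cohort.
retained=30
cohort=120
awk -v r="$retained" -v c="$cohort" 'BEGIN {printf "%.2f\n", r / c * 100}'
```

The `decimal(10,2)` column type in the DDL matches this two-decimal percentage.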
12.1.7 Users Active Three Consecutive Recent Weeks
- Create the table
drop table if exists ads.mall__continuity_wk_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_wk_count`(
`dt` string COMMENT 'statistics date, usually the Sunday ending the period; if computed daily, the current date can be used',
`wk_dt` string COMMENT 'period covered',
`continuity_count` bigint COMMENT 'number of devices'
) COMMENT 'three-consecutive-week active user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_wk_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_wk_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
concat(date_add(next_day('$db_date','MO'),-7*3),'_',date_add(next_day('$db_date','MO'),-1)),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7)
and dt<=date_add(next_day('$db_date','monday'),-1)
group by mid_id
union all
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7*2)
and dt<=date_add(next_day('$db_date','monday'),-7-1)
group by mid_id
union all
select
mid_id
from dws.mall__uv_detail_daycount
where dt>=date_add(next_day('$db_date','monday'),-7*3)
and dt<=date_add(next_day('$db_date','monday'),-7*2-1)
group by mid_id
)t1
group by mid_id
having count(*)=3
)t2
"
$hive -e "$sql"
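The query intersects three de-duplicated weekly id lists by `UNION ALL`-ing them and keeping ids with `count(*)=3` — an id can appear at most once per list, so a count of 3 means "present in all three weeks". The same trick in shell over hypothetical id lists:

```shell
#!/bin/bash
# Intersect three de-duplicated lists via "union all + having count(*)=3":
# concatenate them and keep the ids that appear exactly 3 times.
wk1="m1
m2
m3"
wk2="m1
m2"
wk3="m1
m3"
printf '%s\n' "$wk1" "$wk2" "$wk3" | sort | uniq -c | awk '$1==3 {print $2}'
```

This avoids chained joins: one shuffle by `mid_id` replaces two week-over-week join stages.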
12.1.8 Users Active Three Consecutive Days in the Last Seven
- Create the table
drop table if exists ads.mall__continuity_uv_count;
CREATE EXTERNAL TABLE `ads.mall__continuity_uv_count`(
`dt` string COMMENT 'statistics date',
`wk_dt` string COMMENT 'last 7-day range',
`continuity_count` bigint COMMENT 'number of devices'
) COMMENT 'three-consecutive-days-in-seven active user count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/continuity_uv_count/'
tblproperties ("parquet.compression"="snappy");
- Load data
#!/bin/bash
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=continuity_uv_count
hive_table_name=${APP2}.${APP1}__${table_name}
# Use the date passed in if one was supplied; otherwise default to yesterday
if [ -n "${date}" ]; then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
concat(date_add('$db_date',-6),'_','$db_date'),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from
(
select
mid_id,
date_sub(dt,rank) date_dif
from
(
select
mid_id,
dt,
rank() over(partition by mid_id order by dt) rank
from dws.mall__uv_detail_daycount
where dt>=date_add('$db_date',-6) and
dt<='$db_date'
)t1
)t2
group by mid_id,date_dif
having count(*)>=3
)t3
group by mid_id
)t4;
"
$hive -e "$sql"
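The key trick in this query is `date_sub(dt,rank)`: within one `mid_id`, consecutive active dates minus their row rank collapse to the same constant, so grouping by that difference isolates runs of consecutive days, and `having count(*)>=3` keeps runs of length 3 or more. A Python sketch of the same idea, with made-up sample activity data:

```python
# The date_sub(dt, rank) trick: within a sorted date list, subtracting each
# date's rank yields a constant for every run of consecutive days.
from datetime import date
from itertools import groupby
from collections import defaultdict

def continuity_uv_count(active, min_run=3):
    """active: (mid_id, date) pairs; counts ids that have a run of at
    least min_run consecutive active days."""
    by_id = defaultdict(list)
    for mid, d in set(active):        # dedupe, like group by mid_id, dt
        by_id[mid].append(d)
    qualified = 0
    for mid, days in by_id.items():
        days.sort()
        # date minus rank: constant within a consecutive run
        keys = [d.toordinal() - rank for rank, d in enumerate(days)]
        if any(len(list(g)) >= min_run for _, g in groupby(keys)):
            qualified += 1
    return qualified

acts = [("m1", date(2020, 3, d)) for d in (1, 2, 3, 5)] + \
       [("m2", date(2020, 3, d)) for d in (1, 3, 5)]
print(continuity_uv_count(acts))  # only m1 has 3 consecutive days -> 1
```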
12.2 Member Topic
12.2.1 Member Topic Summary
- Create the table
drop table if exists ads.mall__user_topic;
CREATE EXTERNAL TABLE `ads.mall__user_topic`(
`dt` string COMMENT 'statistics date',
`day_users` string COMMENT 'active member count',
`day_new_users` string COMMENT 'new member count',
`day_new_payment_users` string COMMENT 'new paying member count',
`payment_users` string COMMENT 'total paying member count',
`users` string COMMENT 'total member count',
`day_users2users` decimal(10,2) COMMENT 'member activity rate',
`payment_users2users` decimal(10,2) COMMENT 'member payment rate',
`day_new_users2users` decimal(10,2) COMMENT 'member freshness'
) COMMENT 'member topic summary table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_topic/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_topic
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sum(if(login_date_last='$db_date',1,0)),
sum(if(login_date_first='$db_date',1,0)),
sum(if(payment_date_first='$db_date',1,0)),
sum(if(payment_count>0,1,0)),
count(*),
sum(if(login_date_last='$db_date',1,0))/count(*),
sum(if(payment_count>0,1,0))/count(*),
sum(if(login_date_first='$db_date',1,0))/sum(if(login_date_last='$db_date',1,0))
from dwt.mall__user_topic
"
$hive -e "$sql"
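The three derived ratios in the query above are: activity rate (members last seen today / all members), payment rate (members with any payment / all members), and freshness (members first seen today / members active today). A small sketch of the same arithmetic over hypothetical member rows:

```python
def user_topic_ratios(members, day):
    """members: dicts with login_date_first, login_date_last, payment_count,
    mirroring the sum(if(...)) aggregates in the SQL above."""
    total = len(members)
    active = sum(m["login_date_last"] == day for m in members)   # active today
    new = sum(m["login_date_first"] == day for m in members)     # new today
    paying = sum(m["payment_count"] > 0 for m in members)        # ever paid
    return active / total, paying / total, new / active

# hypothetical dwt.mall__user_topic rows
rows = [
    {"login_date_first": "2020-03-10", "login_date_last": "2020-03-10", "payment_count": 0},
    {"login_date_first": "2020-03-01", "login_date_last": "2020-03-10", "payment_count": 2},
    {"login_date_first": "2020-03-01", "login_date_last": "2020-03-05", "payment_count": 1},
    {"login_date_first": "2020-02-20", "login_date_last": "2020-03-10", "payment_count": 0},
]
activity, payment, freshness = user_topic_ratios(rows, "2020-03-10")
print(activity, payment)  # 0.75 0.5
```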
12.2.2 Funnel Analysis
Compute the conversion rates of the "view -> cart -> order -> payment" funnel.
Approach: count the users at each stage, then take the ratio between adjacent stages.
- Create the table
drop table if exists ads.mall__user_action_convert_day;
CREATE EXTERNAL TABLE `ads.mall__user_action_convert_day`(
`dt` string COMMENT 'statistics date',
`total_visitor_m_count` bigint COMMENT 'total visitor count',
`cart_u_count` bigint COMMENT 'add-to-cart user count',
`visitor2cart_convert_ratio` decimal(10,2) COMMENT 'visit-to-cart conversion rate',
`order_u_count` bigint COMMENT 'order user count',
`cart2order_convert_ratio` decimal(10,2) COMMENT 'cart-to-order conversion rate',
`payment_u_count` bigint COMMENT 'payment user count',
`order2payment_convert_ratio` decimal(10,2) COMMENT 'order-to-payment conversion rate'
) COMMENT 'funnel analysis table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/user_action_convert_day/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=user_action_convert_day
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
uv.day_count,
ua.cart_count,
cast(ua.cart_count/uv.day_count as decimal(10,2)) visitor2cart_convert_ratio,
ua.order_count,
cast(ua.order_count/ua.cart_count as decimal(10,2)) cart2order_convert_ratio,
ua.payment_count,
cast(ua.payment_count/ua.order_count as decimal(10,2)) order2payment_convert_ratio
from
(
select
dt,
sum(if(cart_count>0,1,0)) cart_count,
sum(if(order_count>0,1,0)) order_count,
sum(if(payment_count>0,1,0)) payment_count
from dws.mall__user_action_daycount
where dt='$db_date'
group by dt
)ua join ads.mall__uv_count uv on uv.dt=ua.dt;
"
$hive -e "$sql"
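The funnel computation above boils down to counting users with at least one action per stage, then dividing each stage by the previous one. A sketch of the same arithmetic, with made-up per-user daycount rows:

```python
def funnel(day_uv, rows):
    """rows: per-user daycount dicts with cart_count/order_count/payment_count;
    day_uv: the total visitor count (the uv_count table in the SQL above)."""
    cart = sum(r["cart_count"] > 0 for r in rows)      # users who carted
    order = sum(r["order_count"] > 0 for r in rows)    # users who ordered
    pay = sum(r["payment_count"] > 0 for r in rows)    # users who paid
    return {
        "visitor2cart": round(cart / day_uv, 2),
        "cart2order": round(order / cart, 2),
        "order2payment": round(pay / order, 2),
    }

# hypothetical dws.mall__user_action_daycount rows for one day
rows = [
    {"cart_count": 2, "order_count": 1, "payment_count": 1},
    {"cart_count": 1, "order_count": 1, "payment_count": 0},
    {"cart_count": 1, "order_count": 0, "payment_count": 0},
    {"cart_count": 0, "order_count": 0, "payment_count": 0},
]
print(funnel(10, rows))  # cart=3, order=2, pay=1 of 10 visitors
```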
12.3 Product Topic
12.3.1 Product Counts
- Create the table
drop table if exists ads.mall__product_info;
CREATE EXTERNAL TABLE `ads.mall__product_info`(
`dt` string COMMENT 'statistics date',
`sku_num` string COMMENT 'number of SKUs',
`spu_num` string COMMENT 'number of SPUs'
) COMMENT 'product count table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_info/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_info
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_num,
spu_num
from
(
select
'$db_date' dt,
count(*) sku_num
from
dwt.mall__sku_topic
) tmp_sku_num
join
(
select
'$db_date' dt,
count(*) spu_num
from
(
select
spu_id
from
dwt.mall__sku_topic
group by
spu_id
) tmp_spu_id
) tmp_spu_num
on
tmp_sku_num.dt=tmp_spu_num.dt;
"
$hive -e "$sql"
12.3.2 Product Sales Ranking
- Create the table
drop table if exists ads.mall__product_sale_topN;
CREATE EXTERNAL TABLE `ads.mall__product_sale_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'SKU ID',
`payment_amount` bigint COMMENT 'payment amount'
) COMMENT 'product sales ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_sale_topN/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_sale_topN
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
payment_amount
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by payment_amount desc
limit 10;
"
$hive -e "$sql"
12.3.3 Product Favorites Ranking
- Create the table
drop table if exists ads.mall__product_favor_topN;
CREATE EXTERNAL TABLE `ads.mall__product_favor_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'SKU ID',
`favor_count` bigint COMMENT 'favorite count'
) COMMENT 'product favorites ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_favor_topN/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_favor_topN
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
favor_count
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by favor_count desc
limit 10;
"
$hive -e "$sql"
12.3.4 Product Add-to-Cart Ranking
- Create the table
drop table if exists ads.mall__product_cart_topN;
CREATE EXTERNAL TABLE `ads.mall__product_cart_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'SKU ID',
`cart_num` bigint COMMENT 'add-to-cart count'
) COMMENT 'product add-to-cart ranking table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_cart_topN/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_cart_topN
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
cart_num
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by cart_num desc
limit 10;
"
$hive -e "$sql"
12.3.5 Product Refund-Rate Ranking (Last 30 Days)
- Create the table
drop table if exists ads.mall__product_refund_topN;
CREATE EXTERNAL TABLE `ads.mall__product_refund_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'SKU ID',
`refund_ratio` decimal(10,2) COMMENT 'refund rate'
) COMMENT 'product refund-rate ranking (last 30 days) table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/product_refund_topN/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=product_refund_topN
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sku_id,
refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from dwt.mall__sku_topic
order by refund_ratio desc
limit 10;
"
$hive -e "$sql"
12.3.6 Product Negative-Review Rate
- Create the table
drop table if exists ads.mall__appraise_bad_topN;
CREATE EXTERNAL TABLE `ads.mall__appraise_bad_topN`(
`dt` string COMMENT 'statistics date',
`sku_id` string COMMENT 'SKU ID',
`appraise_bad_ratio` decimal(10,2) COMMENT 'negative review rate'
) COMMENT 'product negative-review-rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/appraise_bad_topN/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=appraise_bad_topN
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date' dt,
sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
from
dws.mall__sku_action_daycount
where
dt='$db_date'
order by appraise_bad_ratio desc
limit 10;
"
$hive -e "$sql"
12.4 Marketing Topic
12.4.1 Order Count Statistics
- Create the table
drop table if exists ads.mall__order_daycount;
CREATE EXTERNAL TABLE `ads.mall__order_daycount`(
dt string comment 'statistics date',
order_count bigint comment 'daily order count',
order_amount bigint comment 'daily order amount',
order_users bigint comment 'daily ordering user count'
) COMMENT 'daily order statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/order_daycount/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=order_daycount
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
'$db_date',
sum(order_count),
sum(order_amount),
sum(if(order_count>0,1,0))
from dws.mall__user_action_daycount
where dt='$db_date';
"
$hive -e "$sql"
12.4.2 Payment Statistics
- Create the table
drop table if exists ads.mall__payment_daycount;
CREATE EXTERNAL TABLE `ads.mall__payment_daycount`(
dt string comment 'statistics date',
order_count bigint comment 'daily payment count',
order_amount bigint comment 'daily payment amount',
payment_user_count bigint comment 'daily paying user count',
payment_sku_count bigint comment 'daily paid SKU count',
payment_avg_time double comment 'average order-to-payment time, in minutes'
) COMMENT 'payment statistics table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/payment_daycount/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=payment_daycount
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
tmp_payment.dt,
tmp_payment.payment_count,
tmp_payment.payment_amount,
tmp_payment.payment_user_count,
tmp_skucount.payment_sku_count,
tmp_time.payment_avg_time
from
(
select
'$db_date' dt,
sum(payment_count) payment_count,
sum(payment_amount) payment_amount,
sum(if(payment_count>0,1,0)) payment_user_count
from dws.mall__user_action_daycount
where dt='$db_date'
)tmp_payment
join
(
select
'$db_date' dt,
sum(if(payment_count>0,1,0)) payment_sku_count
from dws.mall__sku_action_daycount
where dt='$db_date'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(
select
'$db_date' dt,
sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60
payment_avg_time
from dwd.mall__fact_order_info
where dt='$db_date'
and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt
"
$hive -e "$sql"
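The average order-to-payment latency in the last subquery is the sum of `payment_time - create_time` over paid orders, divided by the order count and by 60 to get minutes. The same arithmetic in a short sketch, with made-up order timestamps:

```python
from datetime import datetime

def payment_avg_minutes(orders):
    """orders: (create_time, payment_time) pairs; payment_time may be None,
    mirroring the 'payment_time is not null' filter in the SQL above."""
    paid = [(c, p) for c, p in orders if p is not None]
    total_sec = sum((p - c).total_seconds() for c, p in paid)
    return total_sec / len(paid) / 60   # seconds -> minutes

# hypothetical fact_order_info rows
orders = [
    (datetime(2020, 3, 10, 12, 0), datetime(2020, 3, 10, 12, 10)),
    (datetime(2020, 3, 10, 13, 0), datetime(2020, 3, 10, 13, 20)),
    (datetime(2020, 3, 10, 14, 0), None),   # unpaid order is excluded
]
print(payment_avg_minutes(orders))  # (10 + 20) / 2 = 15.0 minutes
```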
12.4.3 Repurchase Rate
- Create the table
drop table if exists ads.mall__sale_tm_category1_stat_mn;
CREATE EXTERNAL TABLE `ads.mall__sale_tm_category1_stat_mn`(
tm_id string comment 'brand ID',
category1_id string comment 'level-1 category ID',
category1_name string comment 'level-1 category name',
buycount bigint comment 'buyer count',
buy_twice_last bigint comment 'users buying 2 or more times',
buy_twice_last_ratio decimal(10,2) comment 'repurchase rate (2+ orders)',
buy_3times_last bigint comment 'users buying 3 or more times',
buy_3times_last_ratio decimal(10,2) comment 'repurchase rate (3+ orders)',
stat_mn string comment 'statistics month',
stat_date string comment 'statistics date'
) COMMENT 'repurchase rate table'
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/ads/mall/sale_tm_category1_stat_mn/'
tblproperties ("parquet.compression"="snappy");
- Load the data
#!/bin/bash
db_date=${date}
hive=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/bin/hive
APP1=mall
APP2=ads
table_name=sale_tm_category1_stat_mn
hive_table_name=$APP2.mall__$table_name
# Use the supplied date if one is given; otherwise default to yesterday
if [ -n "${date}" ] ;then
db_date=${date}
else
db_date=`date -d "-1 day" +%F`
fi
sql="
insert into table $hive_table_name
select
mn.sku_tm_id,
mn.sku_category1_id,
mn.sku_category1_name,
sum(if(mn.order_count>=1,1,0)) buycount,
sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
sum(if(mn.order_count>=3,1,0)) buy3timeLast ,
sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio,
date_format('$db_date' ,'yyyy-MM') stat_mn,
'$db_date' stat_date
from
(
select
user_id,
sd.sku_tm_id,
sd.sku_category1_id,
sd.sku_category1_name,
sum(order_count) order_count
from dws.mall__sale_detail_daycount sd
where date_format(dt,'yyyy-MM')=date_format('$db_date' ,'yyyy-MM')
group by user_id, sd.sku_tm_id, sd.sku_category1_id, sd.sku_category1_name
) mn
group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;
"
$hive -e "$sql"
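The query above first sums each user's monthly order count per brand/category, then divides the number of 2+-order (or 3+-order) buyers by all buyers. The same computation for a single brand/category, sketched over made-up per-user order counts:

```python
from collections import Counter

def repurchase_rates(user_orders):
    """user_orders: (user_id, order_count) pairs for one brand/category in
    one month; returns (buyers, 2+ order ratio, 3+ order ratio)."""
    counts = Counter()
    for user, n in user_orders:      # inner group by user_id
        counts[user] += n
    buyers = sum(1 for n in counts.values() if n >= 1)
    twice = sum(1 for n in counts.values() if n >= 2)
    thrice = sum(1 for n in counts.values() if n >= 3)
    return buyers, round(twice / buyers, 2), round(thrice / buyers, 2)

# hypothetical sale_detail_daycount rows for one month
data = [("u1", 1), ("u2", 2), ("u3", 1), ("u3", 2), ("u4", 1)]
print(repurchase_rates(data))  # u3 totals 3 orders, u2 has 2 -> (4, 0.5, 0.25)
```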