數據的另一種展示形式,Hive 拉鏈表實踐
- 2020 年 4 月 11 日
- 筆記
背景
拉鏈表是一種數據模型,主要是針對數據倉庫設計中表存儲數據的方式而定義的;顧名思義,所謂拉鏈表,就是記錄歷史。記錄一個事務從開始一直到當前狀態的所有變化的資訊。
拉鏈表可以避免按每一天存儲所有記錄造成的海量存儲問題,同時也是處理緩慢變化數據(SCD2)的一種常見方式。
應用場景
現假設有如下場景:一個企業擁有5000萬會員資訊,每天有20萬會員資料變更,需要在數倉中記錄會員錶的歷史變化以備分析使用,即每天都要保留一個快照供查詢,反映歷史數據的情況。
在此場景中,需要反映5000萬會員的歷史變化,如果保留快照,存儲兩年就需要2X365X5000W條數據存儲空間,數據量為365億,如果存儲更長時間,則無法估計需要的存儲空間。而利用拉鏈演算法存儲,每日只向歷史表中添加新增和變化的數據,每日不過20萬條,存儲4年也只需要3億存儲空間。
實現步驟
在拉鏈表中,每一條數據都有一個生效日期(effective_date)和失效日期(expire_date)。假設在一個用戶表中,在2019年11月8日新增了兩個用戶,如下表所示,則這兩條記錄的生效時間為當天,由於到2019年11月8日為止,這兩條就還沒有被修改過,所以失效時間為一個給定的比較大的值,比如:3000-12-31
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13300000001 |
2019-11-08 |
3000-12-31 |
10002 |
13500000002 |
2019-11-08 |
3000-12-31 |
第二天(2019-11-09),用戶10001被刪除了,用戶10002的電話號碼被修改成13600000002.為了保留歷史狀態,用戶10001的失效時間被修改為2019-11-09,用戶10002則變成了兩條記錄,如下表所示:
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13300000001 |
2019-11-08 |
2019-11-09 |
10002 |
13500000002 |
2019-11-08 |
2019-11-09 |
10002 |
13600000002 |
2019-11-09 |
3000-12-31 |
第三天(2019-11-10),又新增了用戶10003,則用戶表數據如小表所示:
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13300000001 |
2019-11-08 |
2019-11-09 |
10002 |
13500000002 |
2019-11-08 |
2019-11-09 |
10002 |
13600000002 |
2019-11-09 |
3000-12-31 |
10003 |
13300000006 |
2019-11-10 |
3000-12-31 |
如果要查詢最新的數據,那麼只要查詢失效時間為3000-12-31的數據即可,如果要查11月8號的歷史數據,則篩選生效時間<= 2019-11-08並且失效時間>2019-11-08的數據即可。如果查詢11月9號的數據,那麼篩選條件則是生效時間<=2019-11-09並且失效時間>2019-11-09
表結構
- MySQL源member表
CREATE TABLE member( member_id VARCHAR ( 64 ), phoneno VARCHAR ( 20 ), create_time datetime, update_time datetime );
- ODS層增量表member_delta,每天一個分區
CREATE TABLE member_delta (member_id string, phoneno string, create_time string, update_time string) PARTITIONED BY (DAY string);
- 臨時表
CREATE TABLE member_his_tmp (member_id string, phoneno string, effective_date date, expire_date date );
- DW層歷史拉鏈表
CREATE TABLE member_his (member_id string, phoneno string, effective_date date, expire_date date);
Demo數據準備
2019-11-08的數據為:
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13500000001 |
2019-11-08 14:47:55 |
2019-11-08 14:47:55 |
10002 |
13500000002 |
2019-11-08 14:48:33 |
2019-11-08 14:48:33 |
10003 |
13500000003 |
2019-11-08 14:48:53 |
2019-11-08 14:48:53 |
10004 |
13500000004 |
2019-11-08 14:49:02 |
2019-11-08 14:49:02 |
2019-11-09的數據為:其中藍色代表新增數據,紅色代表修改的數據
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13500000001 |
2019-11-08 14:47:55 |
2019-11-08 14:47:55 |
10002 |
13600000002 |
2019-11-08 14:48:33 |
2019-11-09 14:48:33 |
10003 |
13500000003 |
2019-11-08 14:48:53 |
2019-11-08 14:48:53 |
10004 |
13500000004 |
2019-11-08 14:49:02 |
2019-11-08 14:49:02 |
10005 |
13500000005 |
2019-11-09 08:54:03 |
2019-11-09 08:54:03 |
10006 |
13500000006 |
2019-11-09 09:54:25 |
2019-11-09 09:54:25 |
2019-11-10的數據:其中藍色代表新增數據,紅色代表修改的數據
member_id |
phoneno |
create_time |
update_time |
---|---|---|---|
10001 |
13500000001 |
2019-11-08 14:47:55 |
2019-11-08 14:47:55 |
10002 |
13600000002 |
2019-11-08 14:48:33 |
2019-11-09 14:48:33 |
10003 |
13500000003 |
2019-11-08 14:48:53 |
2019-11-08 14:48:53 |
10004 |
13600000004 |
2019-11-08 14:49:02 |
2019-11-10 14:49:02 |
10005 |
13500000005 |
2019-11-09 08:54:03 |
2019-11-09 08:54:03 |
10006 |
13500000006 |
2019-11-09 09:54:25 |
2019-11-09 09:54:25 |
10007 |
13500000007 |
2019-11-10 17:41:49 |
2019-11-10 17:41:49 |
全量初始裝載
在啟用拉鏈表時,先對其進行初始裝載,比如以2019-11-08為開始時間,那麼將MySQL源表全量抽取到ODS層member_delta表的2018-11-08的分區中,然後初始裝載DW層的拉鏈表member_his
INSERT overwrite TABLE member_his SELECT member_id, phoneno, to_date ( create_time ) AS effective_date, '3000-12-31' FROM member_delta WHERE DAY = '2019-11-08'
查詢初始的歷史拉鏈表數據
增量抽取數據
每天,從源系統member表中,將前一天的增量數據抽取到ODS層的增量數據表member_delta對應的分區中。這裡的增量需要通過member表中的創建時間和修改時間來確定,或者使用sqoop job監控update時間來進行增聯抽取。比如,本案例中2019-11-09和2019-11-10為兩個分區,分別存儲了2019-11-09和2019-11-10日的增量數據。2019-11-09分區的數據為:
2019-11-10分區的數據為:
增量刷新歷史拉鏈數據
- 2019-11-09增量刷新歷史拉鏈表將數據放進臨時表
INSERT overwrite TABLE member_his_tmp SELECT * FROM ( -- 2019-11-09增量數據,代表最新的狀態,該數據的生效時間是2019-11-09,過期時間為3000-12-31 -- 這些增量的數據需要被全部載入到歷史拉鏈表中 SELECT member_id, phoneno, '2019-11-09' effective_date, '3000-12-31' expire_date FROM member_delta WHERE DAY='2019-11-09' UNION ALL -- 用當前為生效狀態的拉鏈數據,去left join 增量數據, -- 如果匹配得上,則表示該數據已發生了更新, -- 此時,需要將發生更新的數據的過期時間更改為當前時間. -- 如果匹配不上,則表明該數據沒有發生更新,此時過期時間不變 SELECT a.member_id, a.phoneno, a.effective_date, if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date FROM (SELECT * FROM member_his WHERE expire_date='3000-12-31') a LEFT JOIN (SELECT * FROM member_delta WHERE DAY='2019-11-09') b ON a.member_id=b.member_id)his
將數據覆蓋到歷史拉鏈表
INSERT overwrite TABLE member_his SELECT * FROM member_his_tmp
查看歷史拉鏈表
- 2019-11-10增量刷新歷史拉鏈表
將數據放進臨時表
INSERT overwrite TABLE member_his_tmp SELECT * FROM ( -- 2019-11-10增量數據,代表最新的狀態,該數據的生效時間是2019-11-10,過期時間為3000-12-31 -- 這些增量的數據需要被全部載入到歷史拉鏈表中 SELECT member_id, phoneno, '2019-11-10' effective_date, '3000-12-31' expire_date FROM member_delta WHERE DAY='2019-11-10' UNION ALL -- 用當前為生效狀態的拉鏈數據,去left join 增量數據, -- 如果匹配得上,則表示該數據已發生了更新, -- 此時,需要將發生更新的數據的過期時間更改為當前時間. -- 如果匹配不上,則表明該數據沒有發生更新,此時過期時間不變 SELECT a.member_id, a.phoneno, a.effective_date, if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date FROM (SELECT * FROM member_his WHERE expire_date='3000-12-31') a LEFT JOIN (SELECT * FROM member_delta WHERE DAY='2019-11-10') b ON a.member_id=b.member_id)his
查看歷史拉鏈表
將以上腳本封裝成shell調度的腳本
#!/bin/bash #如果是輸入的日期按照取輸入日期;如果沒輸入日期取當前時間的前一天 if [ -n "$1" ] ;then do_date=$1 else do_date=`date -d "-1 day" +%F` fi sql=" INSERT overwrite TABLE member_his_tmp SELECT * FROM ( -- 2019-11-10增量數據,代表最新的狀態,該數據的生效時間是2019-11-10,過期時間為3000-12-31 -- 這些增量的數據需要被全部載入到歷史拉鏈表中 SELECT member_id, phoneno, '$do_date' effective_date, '3000-12-31' expire_date FROM member_delta WHERE DAY='$do_date' UNION ALL -- 用當前為生效狀態的拉鏈數據,去left join 增量數據, -- 如果匹配得上,則表示該數據已發生了更新, -- 此時,需要將發生更新的數據的過期時間更改為當前時間. -- 如果匹配不上,則表明該數據沒有發生更新,此時過期時間不變 SELECT a.member_id, a.phoneno, a.effective_date, if(b.member_id IS NULL, to_date(a.expire_date), to_date(b.day)) expire_date FROM (SELECT * FROM member_his WHERE expire_date='3000-12-31') a LEFT JOIN (SELECT * FROM member_delta WHERE DAY='$do_date') b ON a.member_id=b.member_id)his; " $hive -e "$sql"
—END—
如果感覺文章有用,記得分享給你的朋友哦(在看也是鼓勵!)