好文分享|一文帶你搞清楚什麼是「數據傾斜」
- 2019 年 12 月 4 日
- 筆記

什麼是數據傾斜
我們在用hive取數的時候,有的時候只是跑一個簡單的join語句,但是卻跑了很長的時間,有的時候我們會覺得是集群資源不夠導致的,但是很大情況下就是出現了"數據傾斜"的情況。
在了解數據傾斜之前,我們應該有一個常識,就是現實生活中的數據分佈是不均勻的,俗話說"28定理",80%的財富集中在20%的人手中之類的故事相信大家都看得不少。所以,在我們日常處理的現實數據中,也是符合這種數據分佈的,數據傾斜一般有兩種情況:
- 變量值很少: 單個變量值的佔比極大,常見的字段如性別、學歷、年齡等。
- 變量值很多: 單個變量值的佔比極小,常見的字段如收入、訂單金額之類的。
數據傾斜,在MapReduce編程模型中十分常見,就是大量的相同key被partition分配到一個分區里,造成了"一個人累死,其他人閑死"的情況,這違背了並行計算的初衷,整體的效率是十分低下的。
數據傾斜的原因
當我們看任務進度長時間維持在99%(或100%),查看任務監控頁面就會發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大,這就是數據傾斜的直接表現。
而導致這個的原因,大致可以分為下面幾點:
- key分佈不均勻
- 業務數據本身的特性
- 建表時考慮不周
- 某些SQL語句本身就有數據傾斜
具體可以體現在下面的常見操作:

備註:圖片文字內容來自網絡
Hadoop計算框架的特點
在了解如何避免數據傾斜之前,我們先來看看Hadoop框架的特性:
- 大數據量不是大問題,數據傾斜才是大問題;
- jobs數比較多的作業效率相對比較低,比如即使有幾百萬的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的;
- sum,count,max,min等UDAF(User Defined Aggregate Function:自定義函數),不怕數據傾斜問題,hadoop在map端的匯總並優化,使數據傾斜不成問題;
- count(distinct),在數據量大的情況下,效率較低,如果是多count(distinct)效率更低,因為count(distinct)是按group by字段分組,按distinct字段排序,一般這種分佈式是很傾斜的,比如男uv,女uv,淘寶一天30億的pv,如果按性別分組,分配2個reduce,每個reduce處理15億數據。
✌️ 優化的常用手段
說的是優化手段,但更多的是"踩坑"的經驗之談。
☝️ 優化之道:
- 首先要了解數據分佈,自己動手解決數據傾斜問題是個不錯的選擇;
- 增加jvm(Java Virtual Machine:Java虛擬機)內存,這適用於變量值非常少的情況,這種情況下,往往只能通過硬件的手段來進行調優,增加jvm內存可以顯著的提高運行效率;
- 增加reduce的個數,這適用於變量值非常多的情況,這種情況下最容易造成的結果就是大量相同key被partition到一個分區,從而一個reduce執行了大量的工作;
- 重新設計key,有一種方案是在map階段時給key加上一個隨機數,有了隨機數的key就不會被大量的分配到同一節點(小几率),待到reduce後再把隨機數去掉即可;
- 使用combiner合併。combinner是在map階段,reduce之前的一個中間階段,在這個階段可以選擇性的把大量的相同key數據先進行一個合併,可以看做是local reduce,然後再交給reduce來處理,減輕了map端向reduce端發送的數據量(減輕了網絡帶寬),也減輕了map端和reduce端中間的shuffle階段的數據拉取數量(本地化磁盤IO速率);(hive.map.aggr=true)
- 設置合理的map reduce的task數,能有效提升性能。(比如,10w+級別的計算,用160個reduce,那是相當的浪費,1個足夠);
- 數據量較大的情況下,慎用count(distinct),count(distinct)容易產生傾斜問題;
- hive.groupby.skewindata=true 有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。
✌️ SQL語句調節:
- 如何Join: 關於驅動表的選取,選用join key分佈最均勻的表作為驅動表; 做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
- 大小表Join: 使用map join讓小的維度表(1000條以下的記錄條數) 先進內存。在map端完成reduce。
- 大表Join大表: 把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理後並不影響最終結果。
- count distinct大量相同特殊值: count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最後結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
- group by維度過小: 採用sum() group by的方式來替換count(distinct)完成計算。
- 特殊情況特殊處理: 在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最後union回去。
看完上面的經驗總結還是有點懵逼?說實話我也是的,太多的信息量衝擊,不過我們可以收藏起來以後繼續看多幾次,加深印象。
接下來,我們將從一些具體的案例來講講SQL語句優化的技巧,非常常用,對我們日常寫SQL很有幫助。
優化案例
場景1:RAC常用(real application clusters的縮寫,譯為「實時應用集群」)
有一張user表,為賣家每天收入表,user_id,dt(日期)為key,屬性有主營類目(cat),指標有交易金額,交易比數。每天要取每個user的主營類目在過去10天的總收入,總比數。
常規做法:取每個user_id最近一天的主營類目,存入臨時表t1,匯總過去10天的總交易金額,交易比數,存入臨時表t2,連接t1,t2,得到最終的結果。
優化做法:
SELECT user_id , substr(MAX(concat(dt, cat)), 9) AS main_cat , SUM(qty), SUM(amt) FROM users WHERE dt BETWEEN 20101201 AND 20101210 GROUP BY user_id;
場景2:空值產生的數據傾斜(最常見的現象)
日誌中,常會有信息丟失的問題,比如全網日誌中的 user_id,如果取其中的 user_id 和 bmw_users 關聯,會碰到數據傾斜的問題。
解決方式1:user_id為空的不參與關聯
SELECT * FROM log a JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL;
解決方式2:賦與空值分新的key值
SELECT * FROM log a LEFT JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN concat(『dp_hive』, rand()) ELSE a.user_id END = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1 log表被讀取了兩次,jobs是2。這個優化適合無效 id (比如 -99 , 』』, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
場景3:不同數據類型關聯產生數據傾斜
一張表 s8_log,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題。s8_log 中有字符串商品 id,也有數字的商品 id。字符串商品 id 類型是 string 的,但商品中的數字 id 是 bigint 的。問題的原因是把 s8_log 的商品 id 轉成數字 id 做 Hash(數字的 Hash 值為其本身,相同的字符串的 Hash 也不同)來分配 Reducer,所以相同字符串 id 的 s8_log,都到一個 Reducer 上了。
解決方式:把數字類型轉換成字符串類型
SELECT * FROM s8_log a LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);
場景4:多表 union all 會優化成一個 job
推廣效果表要和商品表關聯,效果表中的 auction id 列既有商品 id,也有數字 id,和商品表關聯得到商品的信息。
SELECT * FROM effect a JOIN ( SELECT auction_id AS auction_id FROM auctions UNION ALL SELECT auction_string_id AS auction_id FROM auctions ) b ON a.auction_id = b.auction_id;
結論:這樣子比分別過濾數字 id,字符串 id ,然後分別和商品表關聯性能要好。這樣寫的好處:1個 MR 作業,商品表只讀取一次,推廣效果表只讀取一次。把這個 sql 換成 MR 代碼的話,map 的時候,把 a 表的記錄打上標籤 a ,商品表記錄每讀取一條,打上標籤 t,變成兩個<key,value> 對,<t,數字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 讀只會是一次。
場景5:消滅子查詢內的 group by
原寫法:
SELECT * FROM ( SELECT * FROM t1 GROUP BY c1, c2, c3 UNION ALL SELECT * FROM t2 GROUP BY c1, c2, c3 ) t3 GROUP BY c1, c2, c3;
優化寫法:
SELECT * FROM ( SELECT * FROM t1 UNION ALL SELECT * FROM t2 ) t3 GROUP BY c1, c2, c3;
結論:從業務邏輯上說,子查詢內的 group by 功能與外層的 group by 重複,除非子查詢內有 count(distinct)。經過測試,並未出現 union all 的 hive bug,數據是一致的。MR 的作業數由3減少到1。t1 相當於一個目錄,t2 相當於一個目錄,對map reduce程序來說,t1,t2 可以做為 map reduce 作業的 mutli inputs。這可以通過一個 map reduce 來解決這個問題。Hadoop的計算框架,不怕數據多,怕作業數多。
場景6:消滅子查詢內的count(distinct),max,min
原寫法:
SELECT c1, c2, c3, sum(pv) FROM ( SELECT c1, c2, c3, COUNT(c4) FROM t1 GROUP BY c1, c2, c3 UNION ALL SELECT c1, c2, c3, COUNT(DISTINCT c4) FROM t2 GROUP BY c1, c2, c3 ) t3 GROUP BY c1, c2, c3;
這種我們不能直接union 再groupby,因為其中有一個表的操作用到了去重,這種情況,我們可以通過建立臨時表來消滅這種數據傾斜問題。
優化寫法:
INSERT INTO t4 SELECT c1, c2, c3, COUNT(DISTINCT c4) FROM t2 GROUP BY c1, c2, c3; SELECT c1, c2, c3, SUM(pv) FROM ( SELECT c1, c2, c3, COUNT(c4) FROM t1 UNION ALL SELECT * FROM t4 ) t3 GROUP BY c1, c2, c3;
場景7:兩張大表join
有兩張表,一張是用戶訪問日誌表log,一張是用戶表users,其中log表上T,user表也上G,如何每日做到快速連接呢?
解決方法:
SELECT * FROM log a LEFT JOIN ( SELECT d.* FROM ( SELECT DISTINCT memberid FROM log ) c JOIN users d ON c.memberid = d.memberid ) x ON a.memberid = b.memberid;
上面代碼的意思,就是我們可以通過縮小主鍵的範圍來達到減少表的連接操作,比如說限值某段時間,這樣子,memberid就會有所減少了,而不是全量數據。
場景8:reduce的時間過長
還是場景7的例子,假設一個memberid對應的log里有很多數據,那麼最後合併的時候,也是十分耗時的,所以,這裡需要找到一個方法來解決這種reduce分配不均的問題。
解決方法:
SELECT * FROM log a LEFT JOIN ( SELECT memberid, number FROM users d JOIN num e ) b ON a.memberid = b.memberid AND mod(a.pvtime, 30) + 1 = b.number;
解釋一下,上面的num是一張1列30行的表,對應1-30的正整數,把users表膨脹成N份(基於傾斜程度做一個合適的選擇),然後把log數據根據memberid和pvtime分到不同的reduce里去,這樣可以保證每個reduce分配到的數據可以相對均勻。
場景9:過多的where條件
有的時候,我們會寫超級多的where條件來限制查詢,其實這樣子是非常低效的,主要原因是因為這個and條件hive在生成執行計劃時產生了一個嵌套層次很多的算子。
解決方案:
1)把篩選條件對應的值寫入一張小表,再一次性join到主表;
2)或者寫個udf(user-defined function,用戶定義函數),把這些預設值讀取進去,udf來完成這個and數據過濾操作。
場景10:分組結果很多,但是你只需要topK
原寫法:
SELECT mid, url, COUNT(1) AS cnt FROM ( SELECT * FROM r_atpanel_log WHERE pt = '20190610' AND pagetype = 'normal' ) subq GROUP BY mid, url ORDER BY cnt DESC LIMIT 15;
優化寫法:
SELECT * FROM ( SELECT mid, url, COUNT(1) AS cnt FROM ( SELECT * FROM r_atpanel_log WHERE pt = '20190610' AND pagetype = 'normal' ) subq GROUP BY mid, url ) subq2 WHERE cnt > 100 ORDER BY cnt DESC LIMIT 15;
可以看出,我們先過濾掉無關的內容,再進行排序,這樣子快很多。
References
- 百度百科
- Hive優化案例(很好):https://blog.csdn.net/u011500419/article/details/90266428
- 數據傾斜是什麼以及造成的原因?:https://blog.csdn.net/wyz0516071128/article/details/80997158