好文分享|一文帶你搞清楚什麼是「數據傾斜」

  • 2019 年 12 月 4 日
  • 筆記

什麼是數據傾斜

我們在用hive取數的時候,有的時候只是跑一個簡單的join語句,但是卻跑了很長的時間,有的時候我們會覺得是集群資源不夠導致的,但是很大情況下就是出現了"數據傾斜"的情況。

在了解數據傾斜之前,我們應該有一個常識,就是現實生活中的數據分佈是不均勻的,俗話說"28定理",80%的財富集中在20%的人手中之類的故事相信大家都看得不少。所以,在我們日常處理的現實數據中,也是符合這種數據分佈的,數據傾斜一般有兩種情況:

  • 變量值很少: 單個變量值的佔比極大,常見的字段如性別、學歷、年齡等。
  • 變量值很多: 單個變量值的佔比極小,常見的字段如收入、訂單金額之類的。

數據傾斜,在MapReduce編程模型中十分常見,就是大量的相同key被partition分配到一個分區里,造成了"一個人累死,其他人閑死"的情況,這違背了並行計算的初衷,整體的效率是十分低下的。

數據傾斜的原因

當我們看任務進度長時間維持在99%(或100%),查看任務監控頁面就會發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大,這就是數據傾斜的直接表現。

而導致這個的原因,大致可以分為下面幾點:

  • key分佈不均勻
  • 業務數據本身的特性
  • 建表時考慮不周
  • 某些SQL語句本身就有數據傾斜

具體可以體現在下面的常見操作:

備註:圖片文字內容來自網絡

Hadoop計算框架的特點

在了解如何避免數據傾斜之前,我們先來看看Hadoop框架的特性:

  • 大數據量不是大問題,數據傾斜才是大問題;
  • jobs數比較多的作業效率相對比較低,比如即使有幾百萬的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的;
  • sum,count,max,min等UDAF(User Defined Aggregate Function:自定義函數),不怕數據傾斜問題,hadoop在map端的匯總並優化,使數據傾斜不成問題;
  • count(distinct),在數據量大的情況下,效率較低,如果是多count(distinct)效率更低,因為count(distinct)是按group by字段分組,按distinct字段排序,一般這種分佈式是很傾斜的,比如男uv,女uv,淘寶一天30億的pv,如果按性別分組,分配2個reduce,每個reduce處理15億數據。

✌️ 優化的常用手段

說的是優化手段,但更多的是"踩坑"的經驗之談。

☝️ 優化之道:

  • 首先要了解數據分佈,自己動手解決數據傾斜問題是個不錯的選擇;
  • 增加jvm(Java Virtual Machine:Java虛擬機)內存,這適用於變量值非常少的情況,這種情況下,往往只能通過硬件的手段來進行調優,增加jvm內存可以顯著的提高運行效率;
  • 增加reduce的個數,這適用於變量值非常多的情況,這種情況下最容易造成的結果就是大量相同key被partition到一個分區,從而一個reduce執行了大量的工作;
  • 重新設計key,有一種方案是在map階段時給key加上一個隨機數,有了隨機數的key就不會被大量的分配到同一節點(小几率),待到reduce後再把隨機數去掉即可;
  • 使用combiner合併。combinner是在map階段,reduce之前的一個中間階段,在這個階段可以選擇性的把大量的相同key數據先進行一個合併,可以看做是local reduce,然後再交給reduce來處理,減輕了map端向reduce端發送的數據量(減輕了網絡帶寬),也減輕了map端和reduce端中間的shuffle階段的數據拉取數量(本地化磁盤IO速率);(hive.map.aggr=true)
  • 設置合理的map reduce的task數,能有效提升性能。(比如,10w+級別的計算,用160個reduce,那是相當的浪費,1個足夠);
  • 數據量較大的情況下,慎用count(distinct),count(distinct)容易產生傾斜問題;
  • hive.groupby.skewindata=true 有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。

✌️ SQL語句調節:

  • 如何Join: 關於驅動表的選取,選用join key分佈最均勻的表作為驅動表; 做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
  • 大小表Join: 使用map join讓小的維度表(1000條以下的記錄條數) 先進內存。在map端完成reduce。
  • 大表Join大表: 把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理後並不影響最終結果。
  • count distinct大量相同特殊值: count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最後結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
  • group by維度過小: 採用sum() group by的方式來替換count(distinct)完成計算。
  • 特殊情況特殊處理: 在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最後union回去。

看完上面的經驗總結還是有點懵逼?說實話我也是的,太多的信息量衝擊,不過我們可以收藏起來以後繼續看多幾次,加深印象。

接下來,我們將從一些具體的案例來講講SQL語句優化的技巧,非常常用,對我們日常寫SQL很有幫助。

優化案例

場景1:RAC常用(real application clusters的縮寫,譯為「實時應用集群」)

有一張user表,為賣家每天收入表,user_id,dt(日期)為key,屬性有主營類目(cat),指標有交易金額,交易比數。每天要取每個user的主營類目在過去10天的總收入,總比數。

常規做法:取每個user_id最近一天的主營類目,存入臨時表t1,匯總過去10天的總交易金額,交易比數,存入臨時表t2,連接t1,t2,得到最終的結果。

優化做法:

SELECT user_id    , substr(MAX(concat(dt, cat)), 9) AS main_cat    , SUM(qty), SUM(amt)  FROM users  WHERE dt BETWEEN 20101201 AND 20101210  GROUP BY user_id;

場景2:空值產生的數據傾斜(最常見的現象)

日誌中,常會有信息丟失的問題,比如全網日誌中的 user_id,如果取其中的 user_id 和 bmw_users 關聯,會碰到數據傾斜的問題。

解決方式1:user_id為空的不參與關聯

SELECT *  FROM log a    JOIN bmw_users b    ON a.user_id IS NOT NULL        AND a.user_id = b.user_id  UNION ALL  SELECT *  FROM log a  WHERE a.user_id IS NULL;

解決方式2:賦與空值分新的key值

SELECT *  FROM log a    LEFT JOIN bmw_users b ON CASE            WHEN a.user_id IS NULL THEN concat(『dp_hive』, rand())            ELSE a.user_id        END = b.user_id;

結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1 log表被讀取了兩次,jobs是2。這個優化適合無效 id (比如 -99 , 』』, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。

場景3:不同數據類型關聯產生數據傾斜

一張表 s8_log,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題。s8_log 中有字符串商品 id,也有數字的商品 id。字符串商品 id 類型是 string 的,但商品中的數字 id 是 bigint 的。問題的原因是把 s8_log 的商品 id 轉成數字 id 做 Hash(數字的 Hash 值為其本身,相同的字符串的 Hash 也不同)來分配 Reducer,所以相同字符串 id 的 s8_log,都到一個 Reducer 上了。

解決方式:把數字類型轉換成字符串類型

SELECT *  FROM s8_log a    LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);

場景4:多表 union all 會優化成一個 job

推廣效果表要和商品表關聯,效果表中的 auction id 列既有商品 id,也有數字 id,和商品表關聯得到商品的信息。

SELECT *  FROM effect a    JOIN (        SELECT auction_id AS auction_id        FROM auctions        UNION ALL        SELECT auction_string_id AS auction_id        FROM auctions    ) b    ON a.auction_id = b.auction_id;

結論:這樣子比分別過濾數字 id,字符串 id ,然後分別和商品表關聯性能要好。這樣寫的好處:1個 MR 作業,商品表只讀取一次,推廣效果表只讀取一次。把這個 sql 換成 MR 代碼的話,map 的時候,把 a 表的記錄打上標籤 a ,商品表記錄每讀取一條,打上標籤 t,變成兩個<key,value> 對,<t,數字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 讀只會是一次。

場景5:消滅子查詢內的 group by

原寫法:

SELECT *  FROM (    SELECT *    FROM t1    GROUP BY c1, c2, c3    UNION ALL    SELECT *    FROM t2    GROUP BY c1, c2, c3  ) t3  GROUP BY c1, c2, c3;

優化寫法:

SELECT *  FROM (    SELECT *    FROM t1    UNION ALL    SELECT *    FROM t2  ) t3  GROUP BY c1, c2, c3;

結論:從業務邏輯上說,子查詢內的 group by 功能與外層的 group by 重複,除非子查詢內有 count(distinct)。經過測試,並未出現 union all 的 hive bug,數據是一致的。MR 的作業數由3減少到1。t1 相當於一個目錄,t2 相當於一個目錄,對map reduce程序來說,t1,t2 可以做為 map reduce 作業的 mutli inputs。這可以通過一個 map reduce 來解決這個問題。Hadoop的計算框架,不怕數據多,怕作業數多。

場景6:消滅子查詢內的count(distinct),max,min

原寫法:

SELECT c1, c2, c3, sum(pv)  FROM (      SELECT c1, c2, c3, COUNT(c4)      FROM t1      GROUP BY c1, c2, c3      UNION ALL      SELECT c1, c2, c3, COUNT(DISTINCT c4)      FROM t2      GROUP BY c1, c2, c3  ) t3  GROUP BY c1, c2, c3;

這種我們不能直接union 再groupby,因為其中有一個表的操作用到了去重,這種情況,我們可以通過建立臨時表來消滅這種數據傾斜問題。

優化寫法:

INSERT INTO t4  SELECT c1, c2, c3, COUNT(DISTINCT c4)  FROM t2  GROUP BY c1, c2, c3;    SELECT c1, c2, c3, SUM(pv)  FROM (      SELECT c1, c2, c3, COUNT(c4)      FROM t1      UNION ALL      SELECT *      FROM t4  ) t3  GROUP BY c1, c2, c3;

場景7:兩張大表join

有兩張表,一張是用戶訪問日誌表log,一張是用戶表users,其中log表上T,user表也上G,如何每日做到快速連接呢?

解決方法:

SELECT *  FROM log a      LEFT JOIN (          SELECT d.*          FROM (              SELECT DISTINCT memberid              FROM log          ) c              JOIN users d ON c.memberid = d.memberid      ) x      ON a.memberid = b.memberid;

上面代碼的意思,就是我們可以通過縮小主鍵的範圍來達到減少表的連接操作,比如說限值某段時間,這樣子,memberid就會有所減少了,而不是全量數據。

場景8:reduce的時間過長

還是場景7的例子,假設一個memberid對應的log里有很多數據,那麼最後合併的時候,也是十分耗時的,所以,這裡需要找到一個方法來解決這種reduce分配不均的問題。

解決方法:

SELECT *  FROM log a      LEFT JOIN (          SELECT memberid, number          FROM users d              JOIN num e      ) b      ON a.memberid = b.memberid          AND mod(a.pvtime, 30) + 1 = b.number;

解釋一下,上面的num是一張1列30行的表,對應1-30的正整數,把users表膨脹成N份(基於傾斜程度做一個合適的選擇),然後把log數據根據memberid和pvtime分到不同的reduce里去,這樣可以保證每個reduce分配到的數據可以相對均勻。

場景9:過多的where條件

有的時候,我們會寫超級多的where條件來限制查詢,其實這樣子是非常低效的,主要原因是因為這個and條件hive在生成執行計劃時產生了一個嵌套層次很多的算子。

解決方案:

1)把篩選條件對應的值寫入一張小表,再一次性join到主表;

2)或者寫個udf(user-defined function,用戶定義函數),把這些預設值讀取進去,udf來完成這個and數據過濾操作。

場景10:分組結果很多,但是你只需要topK

原寫法:

SELECT mid, url, COUNT(1) AS cnt  FROM (      SELECT *      FROM r_atpanel_log      WHERE pt = '20190610'          AND pagetype = 'normal'  ) subq  GROUP BY mid, url  ORDER BY cnt DESC  LIMIT 15;

優化寫法:

SELECT *  FROM (      SELECT mid, url, COUNT(1) AS cnt      FROM (          SELECT *          FROM r_atpanel_log          WHERE pt = '20190610'              AND pagetype = 'normal'      ) subq      GROUP BY mid, url  ) subq2  WHERE cnt > 100  ORDER BY cnt DESC  LIMIT 15;

可以看出,我們先過濾掉無關的內容,再進行排序,這樣子快很多。

References

  • 百度百科
  • Hive優化案例(很好):https://blog.csdn.net/u011500419/article/details/90266428
  • 數據傾斜是什麼以及造成的原因?:https://blog.csdn.net/wyz0516071128/article/details/80997158