hudi clustering 數據聚集(一)
- 2021 年 11 月 11 日
- 筆記
- clustering, Hudi, spark, 數據聚集
概要
數據湖的業務場景主要包括對資料庫、日誌、文件的分析,而管理數據湖有兩點比較重要:寫入的吞吐量和查詢性能,這裡主要說明以下問題:
1、為了獲得更好的寫入吞吐量,通常把數據直接寫入文件中,這種情況下會產生很多小的數據文件。雖然小文件的使用可以增加寫入的並行度,且能夠並行讀取文件以提高讀取速度,但會出現一個數據量很小,需要從多個小文件中讀取數據,增加了很多IO。
2、數據按照進入數據湖的方式寫入到文件中,在同一個文件上,數據局部性不是最佳的。 數據之間,與傳入批次相關,相近的批次的數據會相關聯,而不是與經常要查詢的數據相關聯。所以小文件的大小和缺乏數據局部性會降低查詢性能。
3、此外,許多文件系統(包括 hdfs),當有很多小文件時,性能會下降。
hudi clustering
hudi支援clustering功能,在不影響查詢性能的情況下提高寫入吞吐量。該功能可以以不同方式重寫數據:
1、數據先寫入小文件,在滿足某些條件後(例如經過的時間、小文件數量、commit次數等),將小文件拼接成大文件。
2、通過對不同列上的數據進行排序,來更改磁碟上的數據布局,已提高數據間的相關性,可以提高查詢性能。
實現
(用戶可以將小文件的限制 hoodie.parquet.small.file.limit 配置為 0,這樣可以強制將數據進入新的文件組。)
cow表的timeline
在上面的示例流程圖中,顯示了隨時間(t5 到 t9)的分區狀態。 主要有以下步驟:
- 在 t5,表中的一個分區有 5 個文件組 f0、f1、f2、f3、f4,分別在 t0、t1、t2、t3、t4時刻被創建。 假設每個文件組為 100MB。 所以分區中的總數據為 500MB。
- 在 t6 請求 clustering 操作。 與壓縮類似,我們在帶有「ClusteringPlan」的元數據中創建了一個「t6.clustering.requested」文件,其中包含跨所有分區的集群操作涉及的所有文件組。例如:{ partitionPath: {「datestr」}, oldfileGroups: [ {fileId: 「f0」, time: 「t0」}, { fileId: 「f1」, time: 「t1」}, … ], newFileGroups: [「c1」, 「c2」] }
- 假設clustering後的最大文件大小配置為 250MB。 集群會將分區中的所有數據重新分配到兩個文件組中:c1、c2。 此時這些文件組是「虛假」的,在 t8 clustering 完成之前,對查詢不可見。
- 請注意,文件組中的記錄可以拆分為多個文件組。 在此示例中,來自 f4 文件組的一些記錄同時轉到了新文件組 c1、c2。
- 當集群正在進行時(t6 到 t8),任何涉及到這些文件組的更新插入都會被拒絕。
- 在寫入新的數據文件 c1-t6.parquet 和 c2-t6.parquet 後,如果配置了全局索引,我們會在記錄級索引中為所有具有新位置的鍵添加條目。 新的索引條目對其他寫入將不可見,因為還沒有關聯的提交。
- 最後,我們創建一個提交元數據文件「t6.commit」,其中包含由此次提交修改的文件組(f0、f1、f2、f3、f4)。
- 註:文件組(f0 到 f4)不會立即從磁碟中刪除。 cleaner 會在歸檔 t6.commit 之前清理這些文件。 並且,clustering 還會更新所有視圖和源數據文件。
mor表的時間線
這種方法同樣支援mor表,且過程與cow 表非常相似。
clustering 的為 parquet 格式文件。
Clustering 操作步驟
總體來說,需要兩步:
- clustering 調度:創建 clustering 計劃
- 執行 clustering:執行計劃。創建新的文件,並替換舊的文件。
clustering 調度
- 識別符合集群條件的文件
- 過濾特定分區(根據配置優先考慮最新分區或舊分區)
- 任何大小 > targetFileSize 的文件都不符合條件
- 任何有待定壓縮/clustering計劃的文件都不符合條件
- 任何具有日誌文件的文件組都不符合集群條件(該限制以後可能會被取消)
- 根據特定條件對符合聚類條件的文件進行分組。 每個組的數據大小預計是「targetFileSize」的倍數。 分組是作為計劃中定義的「策略」的一部分完成的:
- 根據記錄鍵範圍對文件進行分組。因為鍵值範圍存儲在parquet footer中,這個可用於某些查詢/更新。
- 根據提交時間對文件進行分組。
- 對自定義列,且具有重疊值的文件進行分組(指定列進行排序)
- 分組隨機文件
- 我們可以限制組大小以提高並行性
- 根據特定條件過濾組(類似於 CompactionStrategy 中的 orderAndFilter)
- 最後,clustering計劃被保存到timeline中。
執行 clustering
- 讀取clustering計劃,查看「clusteringGroups」的數量(用於並行性)。
- 創建 inflight狀態的 clustering 文件
- 對於每組:
- 使用 strategyParams 實例化適當的策略類(例如:sortColumns)
- 策略類定義了分區器,我們可以用它來創建桶並寫入數據。
- 創建 replacecommit:
- operationType 設置為「clustering」。
- 擴展元數據,並存儲附加欄位以跟蹤重要資訊(策略類可以返回這些額外的元數據資訊)
- 用於合併文件的策略
- 跟蹤替換文件
【參考】
//hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategyclass