Apache Hudi核心概念一網打盡

1. 場景

//hudi.apache.org/docs/use_cases.html

  • 近實時寫入
    • 減少碎片化工具的使用
    • CDC 增量導入 RDBMS 數據
    • 限制小文件的大小和數量
  • 近實時分析
    • 相對於秒級存儲 (Druid, OpenTSDB) ,節省資源
    • 提供分鐘級別時效性,支撐更高效的查詢
    • Hudi 作為 lib,非常輕量
  • 增量 pipeline
    • 區分 arrivetime 和 event time 處理延遲數據
    • 更短的調度 interval 減少端到端延遲 (小時 -> 分鐘) => Incremental Processing
  • 增量導出
    • 替代部分 Kafka 的場景,數據導出到在線服務存儲 e.g. ES

2. 概念/術語

//hudi.apache.org/docs/concepts.html

2.1 Timeline

Timeline 是 HUDI 用來管理提交(commit)的抽象,每個 commit 都綁定一個固定時間戳,分散到時間線上。在 Timeline 上,每個 commit 被抽象為一個 HoodieInstant,一個 instant 記錄了一次提交 (commit) 的行為、時間戳、和狀態。
HUDI 的讀寫 API 通過 Timeline 的介面可以方便的在 commits 上進行條件篩選,對 history 和 on-going 的 commits 應用各種策略,快速篩選出需要操作的目標 commit。

2.2 Time

Arrival time: 數據到達 Hudi 的時間,commit time

Event time: record 中記錄的時間

上圖中採用時間(小時)作為分區欄位,從 10:00 開始陸續產生各種 commits,10:20 來了一條 9:00 的數據,該數據仍然可以落到 9:00 對應的分區,通過 timeline 直接消費 10:00 之後的增量更新(只消費有新 commits 的 group),那麼這條延遲的數據仍然可以被消費到。

2.3 文件管理

2.3.1 文件版本

一個新的 base commit time 對應一個新的 FileSlice,實際就是一個新的數據版本。HUDI 通過 TableFileSystemView 抽象來管理 table 對應的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 讀)或者 base + log files(Merge On Read 讀)。
通過 Timeline 和 TableFileSystemView 抽象,HUDI 實現了非常便捷和高效的表文件查找。

2.3.3 文件格式

Hoodie 的每個 FileSlice 中包含一個 base file (merge on read 模式可能沒有)和多個 log file (copy on write 模式沒有)。

每個文件的文件名都帶有其歸屬的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通過文件名的 group id 組織 FileGroup 的 logical 關係;通過文件名的 base commit time 組織 FileSlice 的邏輯關係。

HUDI 的 base file (parquet 文件) 在 footer 的 meta 去記錄了 record key 組成的 BloomFilter,用於在 file based index 的實現中實現高效率的 key contains 檢測。只有不在 BloomFilter 的 key 才需要掃描整個文件消滅假陽。

HUDI 的 log (avro 文件)是自己編碼的,通過積攢數據 buffer 以 LogBlock 為單位寫出,每個 LogBlock 包含 magic number、size、content、footer 等資訊,用於數據讀、校驗和過濾。

2.4 Index

Hoodie key (record key + partition path) 和 file id (FileGroup) 之間的映射關係,數據第一次寫入文件後保持不變,所以,一個 FileGroup 包含了一批 record 的所有版本記錄。Index 用於區分消息是 INSERT 還是 UPDATE。

2.4.1 Index的創建過程

1. BloomFilter Index
  • 新增 records 找到映射關係:record key => target partition
  • 當前最新的數據 找到映射關係:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)
  • 新增 records 找到需要搜索的映射關係:fileID => HoodieKey(record key + partition path) LIST,key 是候選的 fileID
  • 通過 HoodieKeyLookupHandle 查找目標文件(通過 BloomFilter 加速)

HUDI 在 0.8.0 版本中實現的 Flink witer,採用了 Flink 的 state 作為底層的 index 存儲,每個 records 在寫入之前都會先計算目標 bucket ID,不同於 BloomFilter Index,避免了每次重複的文件 index 查找。

2.5 Table 類型

2.5.1 Copy On Write

Copy On Write 類型表每次寫入都會生成一個新的持有 base file(對應寫入的 instant time ) 的 FileSlice。

用戶在 snapshot 讀取的時候會掃描所有最新的 FileSlice 下的 base file。

2.5.2 Merge On Read

Merge On Read 表的寫入行為,依據 index 的不同會有細微的差別:

  • 對於 BloomFilter 這種無法對 log file 生成 index 的索引方案,對於 INSERT 消息仍然會寫 base file (parquet format),只有 UPDATE 消息會 append log 文件(因為 base file 總已經記錄了該 UPDATE 消息的 FileGroup ID)。
  • 對於可以對 log file 生成 index 的索引方案,例如 Flink writer 中基於 state 的索引,每次寫入都是 log format,並且會不斷追加和 roll over。

Merge On Read 表的讀在 READ OPTIMIZED 模式下,只會讀最近的經過 compaction 的 commit。

3. 數據寫

3.1 寫操作

  • UPSERT:默認行為,數據先通過 index 打標(INSERT/UPDATE),有一些啟發式演算法決定消息的組織以優化文件的大小 => CDC 導入
  • INSERT:跳過 index,寫入效率更高 => Log Deduplication
  • BULK_INSERT:寫排序,對大數據量的 Hudi 表初始化友好,對文件大小的限制 best effort(寫 HFile)

3.1.1 寫流程(UPSERT)

1. Copy On Write
  • 先對 records 按照 record key 去重
  • 首先對這批數據創建索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
  • 對於 update 消息,會直接找到對應 key 所在的最新 FileSlice 的 base 文件,並做 merge 後寫新的 base file (新的 FileSlice)
  • 對於 insert 消息,會掃描當前 partition 的所有 SmallFile(小於一定大小的 base file),然後 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice
2. Merge On Read
  • 先對 records 按照 record key 去重(可選)
  • 首先對這批數據創建索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)
  • 如果是 insert 消息,如果 log file 不可建索引(默認),會嘗試 merge 分區內最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個 FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個 FileGroup + FileSlice + base file
  • 如果是 update 消息,寫對應的 file group + file slice,直接 append 最新的 log file(如果碰巧是當前最小的小文件,會 merge base file,生成新的 file slice)log file 大小達到閾值會 roll over 一個新的

3.1.2 寫流程(INSERT)

1. Copy On Write
  • 先對 records 按照 record key 去重(可選)
  • 不會創建 Index
  • 如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file
2. Merge On Read
  • 先對 records 按照 record key 去重(可選)
  • 不會創建 Index
  • 如果 log file 可索引,並且有小的 FileSlice,嘗試追加或寫最新的 log file;如果 log file 不可索引,寫一個新的 FileSlice + base file

3.1.3 工具

  • DeltaStreamer
  • Datasource Writer
  • Flink SQL API

3.1.4 Key 生成策略

用來生成 HoodieKey(record key + partition path),目前支援以下策略:

  • 支援多個欄位組合 record keys
  • 支援多個欄位組合的 parition path (可訂製時間格式,Hive style path name)
  • 非分區表

3.1.5 刪除策略

  • 邏輯刪:將 value 欄位全部標記為 null
  • 物理刪:
    • 通過 OPERATION_OPT_KEY 刪除所有的輸入記錄
    • 配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 刪除所有的輸入記錄
    • 在輸入記錄添加欄位:_hoodie_is_deleted

4. 數據讀

4.1 Snapshot 讀

讀取所有 partiiton 下每個 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表讀 parquet 文件,Merge On Read 表讀 parquet + log 文件

4.2 Incremantal 讀

//hudi.apache.org/docs/querying_data.html#spark-incr-query,當前的 Spark data source 可以指定消費的起始和結束 commit 時間,讀取 commit 增量的數據集。但是內部的實現不夠高效:拉取每個 commit 的全部目標文件再按照系統欄位 hoodie_commit_time apply 過濾條件。

4.3 Streaming 讀

0.8.0 版本的 HUDI Flink writer 支援實時的增量訂閱,可用於同步 CDC 數據,日常的數據同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監控新增的改動文件,將讀取任務下派給讀 task。

5. Compaction

  • 沒有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 並寫 base file
  • 有 base file:走 copy on write upsert 流程,* 先讀 log file 建 index,再讀 base file,最後讀 log file 寫新的 base file

Flink 和 Spark streaming 的 writer 都可以 apply 非同步的 compaction 策略,按照間隔 commits 數或者時間來觸發 compaction 任務,在獨立的 pipeline 中執行。

6. 總結

通過對寫流程的梳理我們了解到 HUDI 相對於其他數據湖方案的核心優勢:

  • 寫入過程充分優化了文件存儲的小文件問題,Copy On Write 寫會一直將一個 bucket (FileGroup)的 base 文件寫到設定的閾值大小才會劃分新的 bucket;Merge On Read 寫在同一個 bucket 中,log file 也是一直 append 直到大小超過設定的閾值 roll over。
  • 對 UPDATE 和 DELETE 的支援非常高效,一條 record 的整個生命周期操作都發生在同一個 bucket,不僅減少小文件數量,也提升了數據讀取的效率(不必要的 join 和 merge)。

0.8.0 的 HUDI Flink 支援了 streaming 消費 HUDI 表,在後續版本還會支援 watermark 機制,讓 HUDI Flink 承擔 streaming ETL pipeline 的中間層,成為數據湖/倉建設中流批一體的中間計算層。