Apache Hudi 設計與架構最強解讀
- 2020 年 4 月 16 日
- 筆記
感謝 Apache Hudi contributor:王祥虎 翻譯&供稿。
歡迎關注微信公眾號:ApacheHudi
本文將介紹Apache Hudi的基本概念、設計以及總體基礎架構。
1.簡介
Apache Hudi(簡稱:Hudi)使得您能在hadoop兼容的存儲之上存儲大量數據,同時它還提供兩種原語,使得除了經典的批處理之外,還可以在數據湖上進行流處理。這兩種原語分別是:
-
Update/Delete記錄:Hudi使用細粒度的文件/記錄級別索引來支援Update/Delete記錄,同時還提供寫操作的事務保證。查詢會處理最後一個提交的快照,並基於此輸出結果。
-
變更流:Hudi對獲取數據變更提供了一流的支援:可以從給定的時間點獲取給定表中已updated/inserted/deleted的所有記錄的增量流,並解鎖新的查詢姿勢(類別)。
這些原語緊密結合,解鎖了基於DFS抽象的流/增量處理能力。如果您熟悉流處理,那麼這和從kafka主題消費事件,然後使用狀態存儲逐步累加中間結果類似。這在架構上會有以下幾點優勢:1) 效率的提升:攝取數據通常需要處理更新、刪除以及強制唯一鍵約束。然而,由於缺乏像Hudi這樣能對這些功能提供標準支援的系統,數據工程師們通常會採用大批量的作業來重新處理一整天的事件,或者每次運行都重新載入整個上游資料庫,從而導致大量的計算資源浪費。由於Hudi支援記錄級更新,它通過只處理有變更的記錄並且只重寫表中已更新/刪除的部分,而不是重寫整個表分區甚至整個表,為這些操作帶來一個數量級的性能提升。2) 更快的ETL/派生Pipelines:從外部系統攝入數據後,下一步需要使用Apache Spark/Apache Hive或者任何其他數據處理框架來ETL這些數據用於諸如數據倉庫、機器學習或者僅僅是數據分析等一些應用場景。通常,這些處理再次依賴以程式碼或SQL表示的批處理作業,這些作業將批量處理所有輸入數據並重新計算所有輸出結果。通過使用增量查詢而不是快照查詢來查詢一個或多個輸入表,可以大大加速此類數據管道,從而再次像上面一樣僅處理來自上游表的增量更改,然後upsert或者delete目標派生表。3) 新鮮數據的獲取:減少資源還能獲取性能上的提升並不是常見的事。畢竟我們通常會使用更多的資源(例如記憶體)來提升性能(例如查詢延遲)。Hudi通過從根本上擺脫數據集的傳統管理方式,將批量處理增量化帶來了一個附加的好處:與以前的數據湖相比,pipeline運行的時間會更短,數據交付會更快。4) 統一存儲:基於以上三個優點,在現有數據湖之上進行更快速、更輕量的處理意味著僅出於訪問近實時數據的目的時不再需要專門的存儲或數據集市。
2.設計原則
流式讀/寫:Hudi借鑒了資料庫設計的原理,從零設計,應用於大型數據集記錄流的輸入和輸出。為此,Hudi提供了索引實現,可以將記錄的鍵快速映射到其所在的文件位置。同樣,對於流式輸出數據,Hudi通過其特殊列添加並跟蹤記錄級的元數據,從而可以提供所有發生變更的精確增量流。自管理:Hudi注意到用戶可能對數據新鮮度(寫友好)與查詢性能(讀/查詢友好)有不同的期望,它支援了三種查詢類型,這些類型提供實時快照,增量流以及稍早的純列數據。在每一步,Hudi都努力做到自我管理(例如自動優化編寫程式的並行性,保持文件大小)和自我修復(例如:自動回滾失敗的提交),即使這樣做會稍微增加運行時成本(例如:在記憶體中快取輸入數據已分析工作負載)。如果沒有這些內置的操作槓桿/自我管理功能,這些大型流水線的運營成本通常會翻倍。萬物皆日誌:Hudi還具有 append only、雲數據友好的設計,該設計實現了日誌結構化存儲系統的原理,可以無縫管理所有雲提供商的數據。
鍵-值數據模型:在寫方面,Hudi表被建模為鍵值對數據集,其中每條記錄都有一個唯一的記錄鍵。此外,一個記錄鍵還可以包括分區路徑,在該路徑下,可以對記錄進行分區和存儲。這通常有助於減少索引查詢的搜索空間。
3. 表設計
了解了Hudi項目的關鍵技術動機後,現在讓我們更深入地研究Hudi系統本身的設計。在較高的層次上,用於寫Hudi表的組件使用了一種受支援的方式嵌入到Apache Spark作業中,它會在支援DFS的存儲上生成代表Hudi表的一組文件。然後,在具有一定保證的情況下,諸如Apache Spark、Presto、Apache Hive之類的查詢引擎可以查詢該表。 Hudi表的三個主要組件:1) 有序的時間軸元數據。類似於資料庫事務日誌。2) 分層布局的數據文件:實際寫入表中的數據。3) 索引(多種實現方式):映射包含指定記錄的數據集。
Hudi提供了以下功能來對基礎數據進行寫入、查詢,這使其成為大型數據湖的重要模組:1) 支援快速,可插拔索引的upsert();2) 高效、只掃描新數據的增量查詢;3) 原子性的數據發布和回滾,支援恢復的Savepoint;4) 使用mvcc(多版本並發控制)風格設計的讀和寫快照隔離;5) 使用統計資訊管理文件大小;6) 已有記錄update/delta的自管理壓縮;7) 審核數據修改的時間軸元數據;8) 滿足GDPR(通用數據保護條例)、數據刪除功能。
3.1 時間軸
在其核心,Hudi維護了一條包含在不同的即時時間(instant time)對數據集做的所有instant操作的timeline,從而提供表的即時視圖,同時還有效支援按到達順序進行數據檢索。時間軸類似於資料庫的redo/transaction日誌,由一組時間軸實例組成。Hudi保證在時間軸上執行的操作的原子性和基於即時時間的時間軸一致性。時間軸被實現為表基礎路徑下.hoodie元數據文件夾下的一組文件。具體來說,最新的instant被保存為單個文件,而較舊的instant被存檔到時間軸歸檔文件夾中,以限制writers和queries列出的文件數量。一個Hudi 時間軸instant由下面幾個組件構成:1) 操作類型:對數據集執行的操作類型;2) 即時時間:即時時間通常是一個時間戳(例如:20190117010349),該時間戳按操作開始時間的順序單調增加;3) 即時狀態:instant的當前狀態;每個instant都有avro或者json格式的元數據資訊,詳細的描述了該操作的狀態以及這個即時時刻instant的狀態。 關鍵的Instant操作類型有:1) COMMIT:一次提交表示將一組記錄原子寫入到數據集中;2) CLEAN: 刪除數據集中不再需要的舊文件版本的後台活動;3) DELTA_COMMIT:將一批記錄原子寫入到MergeOnRead存儲類型的數據集中,其中一些/所有數據都可以只寫到增量日誌中;4) COMPACTION: 協調Hudi中差異數據結構的後台活動,例如:將更新從基於行的日誌文件變成列格式。在內部,壓縮表現為時間軸上的特殊提交;5) ROLLBACK: 表示提交/增量提交不成功且已回滾,刪除在寫入過程中產生的所有部分文件;6) SAVEPOINT: 將某些文件組標記為”已保存”,以便清理程式不會將其刪除。在發生災難/數據恢復的情況下,它有助於將數據集還原到時間軸上的某個點;任何給定的即時都會處於以下狀態之一:1) REQUESTED:表示已調度但尚未初始化;2) INFLIGHT: 表示當前正在執行該操作;3) COMPLETED: 表示在時間軸上完成了該操作.
3.2 數據文件
Hudi將表組織成DFS上基本路徑下的文件夾結構中。如果表是分區的,則在基本路徑下還會有其他的分區,這些分區是包含該分區數據的文件夾,與Hive表非常類似。每個分區均由相對於基本路徑的分區路徑唯一標識。在每個分區內,文件被組織成文件組,由文件ID唯一標識。其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件(*.parquet)以及一組日誌文件(*.log*),該文件包含自生成基本文件以來對基本文件的插入/更新。Hudi採用了MVCC設計,壓縮操作會將日誌和基本文件合併以產生新的文件片,而清理操作則將未使用的/較舊的文件片刪除以回收DFS上的空間。
3.3 索引
Hudi通過索引機制提供高效的upsert操作,該機制會將一個記錄鍵+分區路徑組合一致性的映射到一個文件ID.這個記錄鍵和文件組/文件ID之間的映射自記錄被寫入文件組開始就不會再改變。簡而言之,這個映射文件組包含了一組文件的所有版本。Hudi當前提供了3種索引實現(HBaseIndex,、HoodieBloomIndex(HoodieGlobalBloomIndex)、InMemoryHashIndex)來映射一個記錄鍵到包含該記錄的文件ID。這將使我們無需掃描表中的每條記錄,就可顯著提高upsert速度。Hudi索引可以根據其查詢分區記錄的能力進行分類:1) 全局索引:不需要分區資訊即可查詢記錄鍵映射的文件ID。比如,寫程式可以傳入null或者任何字元串作為分區路徑(partitionPath),但索引仍然會查找到該記錄的位置。全局索引在記錄鍵在整張表中保證唯一的情況下非常有用,但是查詢的消耗隨著表的大小呈函數式增加。2) 非全局索引:與全局索引不同,非全局索引依賴分區路徑(partitionPath),對於給定的記錄鍵,它只會在給定分區路徑下查找該記錄。這比較適合總是同時生成分區路徑和記錄鍵的場景,同時還能享受到更好的擴展性,因為查詢索引的消耗只與寫入到該分區下數據集大小有關係。
4. 表類型
4.1 Copy On Write表
COW表寫的時候數據直接寫入basefile,(parquet)不寫log文件。所以COW表的文件片只包含basefile(一個parquet文件構成一個文件片)。這種的存儲方式的Spark DAG相對簡單。關鍵目標是是使用partitioner將tagged Hudi記錄RDD(所謂的tagged是指已經通過索引查詢,標記每條輸入記錄在表中的位置)分成一些列的updates和inserts.為了維護文件大小,我們先對輸入進行取樣,獲得一個工作負載profile,這個profile記錄了輸入記錄的insert和update、以及在分區中的分布等資訊。把數據從新打包,這樣:1) 對於updates, 該文件ID的最新版本都將被重寫一次,並對所有已更改的記錄使用新值2) 對於inserts.記錄首先打包到每個分區路徑中的最小文件中,直到達到配置的最大大小。之後的所有剩餘記錄將再次打包到新的文件組,新的文件組也會滿足最大文件大小要求。
4.2 Merge On Read表
MOR表寫數據時,記錄首先會被快速的寫進日誌文件,稍後會使用時間軸上的壓縮操作將其與基礎文件合併。根據查詢是讀取日誌中的合併快照流還是變更流,還是僅讀取未合併的基礎文件,MOR表支援多種查詢類型。在高層次上,MOR writer在讀取數據時會經歷與COW writer 相同的階段。這些更新將追加到最新文件篇的最新日誌文件中,而不會合併。對於insert,Hudi支援兩種模式:1) 插入到日誌文件:有可索引日誌文件的表會執行此操作(HBase索引);2) 插入parquet文件:沒有索引文件的表(例如布隆索引)與寫時複製(COW)一樣,對已標記位置的輸入記錄進行分區,以便將所有發往相同文件id的upsert分到一組。這批upsert會作為一個或多個日誌塊寫入日誌文件。Hudi允許客戶端控制日誌文件大小。對於寫時複製(COW)和讀時合併(MOR)writer來說,Hudi的WriteClient是相同的。幾輪數據的寫入將會累積一個或多個日誌文件。這些日誌文件與基本的parquet文件(如有)一起構成一個文件片,而這個文件片代表該文件的一個完整版本。這種表是用途最廣、最高級的表。為寫(可以指定不同的壓縮策略,吸收突發寫流量)和查詢(例如權衡數據的新鮮度和查詢性能)提供了很大的靈活性。同時它包含一個學習曲線,以便在操作上掌控他。
5. 寫設計
5.1 寫操作
了解Hudi數據源或者deltastreamer工具提供的3種不同寫操作以及如何最好的利用他們可能會有所幫助。這些操作可以在對數據集發出的每個commit/delta commit中進行選擇/更改。1) upsert操作:這是默認操作,在該操作中,首先通過查詢索引將數據記錄標記為插入或更新,然後再運行試探法確定如何最好地將他們打包到存儲,以對文件大小進行優化,最終將記錄寫入。對於諸如資料庫更改捕獲之類的用例,建議在輸入幾乎肯定包含更新的情況下使用此操作。2) insert操作:與upsert相比,insert操作也會運行試探法確定打包方式,優化文件大小,但會完全跳過索引查詢。因此對於諸如日誌重複數據刪除(結合下面提到的過濾重複項選項)的用例而言,它比upsert的速度快得多。這也適用於數據集可以容忍重複項,但只需要Hudi具有事務性寫/增量拉取/存儲管理功能的用例。3) bulk insert操作:upsert 和insert操作都會將輸入記錄保留在記憶體中,以加快存儲啟發式計算速度,因此對於最初載入/引導Hudi數據集的用例而言可能會很麻煩。Bulk insert提供了與insert相同的語義,同時實現了基於排序的數據寫入演算法,該演算法可以很好的擴展數百TB的初始負載。但是這只是在調整文件大小方面進行的最大努力,而不是像insert/update那樣保證文件大小。
5.2 壓縮
壓縮是一個 instant操作,它將一組文件片作為輸入,將每個文件切片中的所有日誌文件與其basefile文件(parquet文件)合併,以生成新的壓縮文件片,並寫為時間軸上的一個commit。壓縮僅適用於讀時合併(MOR)表類型,並且由壓縮策略(默認選擇具有最大未壓縮日誌的文件片)決定選擇要進行壓縮的文件片。這個壓縮策略會在每個寫操作之後評估。從高層次上講,壓縮有兩種方式:1)同步壓縮:這裡的壓縮由寫程式進程本身在每次寫入之後同步執行的,即直到壓縮完成後才能開始下一個寫操作。就操作而言,這個是最簡單的,因為無需安排單獨的壓縮過程,但保證的數據新鮮度最低。不過,如果可以在每次寫操作中壓縮最新的表分區,同時又能延遲遲到/較舊分區的壓縮,這種方式仍然非常有用。2)非同步壓縮:使用這種方式,壓縮過程可以與表的寫操作同時非同步運行。這樣具有明顯的好處,即壓縮不會阻塞下一批數據寫入,從而產生近實時的數據新鮮度。Hudi DeltaStreamer之類的工具支援邊界的連續模式,其中的壓縮和寫入操作就是以這種方式在單個Spark運行時集群中進行的。
5.3 清理
清理是一項基本的即時操作,其執行的目的是刪除舊的文件片,並限制表佔用的存儲空間。清理會在每次寫操作之後自動執行,並利用時間軸伺服器上快取的時間軸元數據來避免掃描整個表來評估清理時機。Hudi支援兩種清理方式:1) 按commits / deltacommits清理:這是增量查詢中最常見且必須使用的模式。以這種方式,Cleaner會保留最近N次commit/delta commit提交中寫入的所有文件切片,從而有效提供在任何即時範圍內進行增量查詢的能力。儘管這對於增量查詢很有幫助,但由於保留了配置範圍內所有版本的文件片,因此,在某些高寫入負載的場景下可能需要更大的存儲空間。2) 按保留的文件片清理:這是一種更為簡單的清理方式,這裡我們僅保存每個文件組中的最後N個文件片。諸如Apache Hive之類的某些查詢引擎會處理非常大的查詢,這些查詢可能需要幾個小時才能完成,在這種情況下,將N設置為足夠大以至於不會刪除查詢仍然可以訪問的文件片是很有用的。此外,清理操作會保證每個文件組下面會一直只有一個文件片(最新的一片)。
5.4 DFS訪問優化
Hudi還對表中存儲的數據執行了幾種秘鑰存儲管理功能。在DFS上存儲數據的關鍵是管理文件大小和計數以及回收存儲空間。例如,HDFS在處理小文件問題上臭名昭著–在NameNode上施加記憶體/RPC壓力,可能破壞整個集群的穩定性。通常,查詢引擎可在適當大小的列文件上提供更好的性能,因為它們可以有效地攤銷獲取列統計資訊等的成本。即使在某些雲數據存儲上,列出包含大量小文件的目錄也會產生成本。下面是一些Hudi高效寫,管理數據存儲的方法:1)小文件處理特性會剖析輸入的工作負載,並將內容分配到現有的文件組,而不是創建新文件組(這會導致生成小文件)。2)在writer中使用一個時間軸快取,這樣只要Spark集群不每次都重啟,後續的寫操作就不需要列出DFS目錄來獲取指定分區路徑下的文件片列表。3)用戶還可以調整基本文件和日誌文件大小之間的比值係數以及期望的壓縮率,以便將足夠數量的insert分到統一文件組,從而生成大小合適的基本文件。4)智慧調整bulk insert並行度,可以再次調整大小合適的初始文件組。實際上,正確執行此操作非常關鍵,因為文件組一旦創建就不能被刪除,而只能如前面所述對其進行擴展。
6.查詢
鑒於這種靈活而全面的數據布局和豐富的時間線,Hudi能夠支援三種不同的查詢表方式,具體取決於表的類型。
查詢類型 | COW | MOR |
快照查詢 | 查詢在給定表或表分區中所有文件片中的最新基本文件上執行,將查看到最新提交的記錄。 | 通過併到給定表或表分區中的所有文件切片中最新的基本文件及其日誌文件合來執行查詢,將看到最新的delta-commit操作寫入的的記錄。 |
增量查詢 | 在給定的開始,結束即時時間範圍內,對最新的基本文件執行查詢(稱為增量查詢窗口),同時僅使用Hudi指定的列提取在此窗口中寫入的記錄。 | 查詢是在增量查詢窗口中對最新的文件片執行的,具體取決於窗口本身,讀取基本塊或日誌塊中讀取記錄的組合。 |
讀優化查詢 | 和快照查詢相同 | 僅訪問基本文件,提供給定文件片自上次執行壓縮操作以來的數據。通常查詢數據的最新程度的保證取決於壓縮策略 |
6.1 快照查詢
可查看給定delta commit或者commit即時操作後表的最新快照。在讀時合併(MOR)表的情況下,它通過即時合併最新文件片的基本文件和增量文件來提供近實時表(幾分鐘)。對於寫時複製(COW),它可以替代現有的parquet表(或相同基本文件類型的表),同時提供upsert/delete和其他寫入方面的功能。
6.2 增量查詢
可查看自給定commit/delta commit即時操作以來新寫入的數據。有效的提供變更流來啟用增量數據管道。
6.3 讀優化查詢
可查看給定的commit/compact即時操作的表的最新快照。僅將最新文件片的基本/列文件暴露給查詢,並保證與非Hudi表相同的列查詢性能。
指標 | 讀優化查詢 | 快照查詢 |
數據延遲 | 高 | 低 |
查詢延遲 | 低 | 高 |