大數據下一代變革之必研究數據湖技術Hudi原理實戰雙管齊下-上
@
概述
定義
Apache Hudi 官網地址 //hudi.apache.org/
Apache Hudi 官網文檔 //hudi.apache.org/docs/overview
Apache Hudi GitHub源碼地址 //github.com/apache/hudi
Apache Hudi是可以在資料庫層上使用增量數據管道構建流數據湖,滿足記錄級更新/刪除和更改流,並實現自我管理,支援流批一體並在此基礎上持續優化。最新版本為0.12.1
Apache Hudi(發音為「hoodie」)是下一代流數據湖平台,將核心倉庫和資料庫功能引入數據湖中。Hudi提供了表、事務、高效的upsert /delete、高級索引、流攝入服務、數據集群/壓縮優化和並發性,同時將數據保持為開源文件格式,在分散式文件存儲(雲存儲,HDFS或任何Hadoop文件系統兼容的存儲)上管理大型分析數據集的存儲;不僅非常適合於流工作負載,還允許創建高效的增量處理管道;得益於其高級性能優化,使得分析工作能否較好的支援流行的查詢引擎如Spark、Flink、Presto、Trino、Hive。總體框架及周邊關係如下:
Apache Hudi是一個快速發展的多元化社區,下面為使用和貢獻Hudi的小部分公司示例:
發展歷史
- 2015 年:發表了增量處理的核心思想/原則(O’reilly 文章)。
- 2016 年:由 Uber 創建並為所有資料庫/關鍵業務提供支援。
- 2017 年:由 Uber 開源,並支撐 100PB 數據湖。
- 2018 年:吸引大量使用者,並因雲計算普及。
- 2019 年:成為 ASF 孵化項目,並增加更多平台組件。
- 2020 年:畢業成為 Apache 頂級項目,社區、下載量、採用率增長超過 10 倍。
- 2021 年:支援 Uber 500PB 數據湖,SQL DML、Flink 集成、索引、元伺服器、快取。
特性
- 支援可插拔、快速索引的Upserts/Delete。
- 支援增量拉取表變更以進行增量查詢、記錄級別更改流等處理。
- 支援事務提交、回滾和並發控制,具有回滾支援的原子式發布數據。
- 支援Spark、 Flink、Presto、 Trino、Hive等引擎的SQL讀/寫。
- 自我管理小文件,數據聚簇、壓縮(行和列數據的非同步壓縮)和清理,使用統計資訊管理文件大小和布局,利用聚類優化數據湖布局。
- 流式攝入,內置CDC源和工具。
- 內置可擴展的存儲訪問的時間軸元數據跟蹤。
- 向後兼容的模式實現表結構變更的支援。寫入器和查詢之間的快照隔離,用於數據恢復的保存點。
使用場景
- 近實時寫入
- 減少碎片化工具的使用,直接使用內置工具。
- 通過CDC工具增量導入RDBMS數據。
- 限制小文件的大小和數量。
- 近實時分析
- 相對於秒級的存儲(Druid、時序資料庫)節省了資源。
- 提供了分鐘級別的時效性,支撐更高效的查詢。
- Hudi作為lib,非常輕量。
- 增量pipeline
- 區分arrivetime和eventtime處理延遲數據。
- 更短的調度間隔減少端到端的延遲(從小時級別到分鐘級別)的增量處理。
- 增量導出
- 替換部分Kafka的場景,數據導出到在線服務存儲如ES。
編譯安裝
編譯環境
-
組件版本
- Hadoop
- Hive
- Spark(Scala-2.12)
- Flink(Scala-2.12)
-
準備編譯環境Maven
編譯Hudi
- 上傳源碼包
# 可以在github中下載
wget //github.com/apache/hudi/archive/refs/tags/release-0.12.1.tar.gz
# 解壓
tar -xvf release-0.12.1.tar.gz
# 進入根目錄
cd hudi-release-0.12.1/
- 修改根目錄下的pom文件的組件版本和加速倉庫依賴下載,vim pom.xml
<hadoop.version>3.3.4</hadoop.version>
<hive.version>3.1.3</hive.version>
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>//maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
- 執行編譯命令
mvn clean package -DskipTests -Dspark3.3 -Dflink1.15 -Dscala-2.12 -Dhadoop.version=3.3.4 -Pflink-bundle-shade-hive3
編譯報錯
- 修改源碼(110行位置),vim hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos,null)) {
- 手動安裝Kafka依賴
由於kafka-schema-registry-client-5.3.4.jar、common-utils-5.3.4.jar、common-config-5.3.4.jar、kafka-avro-serializer-5.3.4.jar這四個包一直沒有安裝成功,因此我們手動下載安裝到本地maven倉庫
# 下載confluent包
wget //packages.confluent.io/archive/5.3/confluent-5.3.4-2.12.zip
# 解壓
unzip confluent-5.3.4-2.12.zip
# 通過find命令找到存儲位置
find share/ -name kafka-schema-registry-client-5.3.4.jar
# 安裝到本地maven倉庫
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.3.4 -Dpackaging=jar -Dfile=./share/java/confluent-common/common-config-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.3.4 -Dpackaging=jar -Dfile=./share/java/confluent-common/common-utils-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.3.4 -Dpackaging=jar -Dfile=./share/java/confluent-control-center/kafka-schema-registry-client-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serialize -Dversion=5.3.4 -Dpackaging=jar -Dfile=./share/java/confluent-control-center/kafka-avro-serializer-5.3.4.jar
-
解決spark模組依賴衝突(修改了Hive版本為3.1.2,其攜帶的jetty是0.9.3,hudi本身用的0.9.4)存在依賴衝突
- 修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty。vim packaging/hudi-spark-bundle/pom.xml
在hive-service中376行之後增加如下內容
<exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.pentaho</groupId> <artifactId>*</artifactId> </exclusion>
在hive-jdbc中排除下面依賴
<exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> </exclusions>
在hive-metastore中排除下面依賴
<exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-core</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> </exclusions>
在hive-commons中排除下面依賴
<exclusions> <exclusion> <groupId>org.eclipse.jetty.orbit</groupId> <artifactId>javax.servlet</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> </exclusions>
增加Hudi依賴的jetty版本
<!-- 增加hudi配置版本的jetty --> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-webapp</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> <version>${jetty.version}</version> </dependency>
- 修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty(否則在使用DeltaStreamer工具向hudi表插入數據時,也會報Jetty的錯誤)vim ./packaging/hudi-utilities-bundle/pom.xml
在hive-service中396行之後增加如下內容
<exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.pentaho</groupId> <artifactId>*</artifactId> </exclusion>
在hive-jdbc中排除下面依賴
<exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> </exclusions>
在hive-metastore中排除下面依賴
<exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-core</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion>
在hive-commons中排除下面依賴
<exclusions> <exclusion> <groupId>org.eclipse.jetty.orbit</groupId> <artifactId>javax.servlet</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>*</artifactId> </exclusion> </exclusions>
增加Hudi依賴的jetty版本
<!-- 增加hudi配置版本的jetty --> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-webapp</artifactId> <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> <version>${jetty.version}</version> </dependency>
-
重新執行編譯命令,等待5~10分鐘時間
- 驗證編譯:上一步編譯成功後,執行hudi-cli/hudi-cli.sh 能進入hudi-cli說明成功
- 編譯完成後,相關的包在packaging目錄的各個模組中,比如flink與hudi的包
關鍵概念
TimeLine(時間軸)
Hudi的核心是維護表上在不同時刻執行的所有操作的時間軸,這有助於提供表的瞬時視圖,同時還有效地支援按到達順序檢索數據。TimeLine是Hudi實現管理事務和其他表服務,一個Hudi瞬間由以下幾個部分組成:
- Instant action(即時動作):在表上執行的動作類型;Hudi保證在時間軸上執行的操作是原子的,並且是基於即時時間的時間軸一致的。
- COMMITS:表示將一批記錄原子地寫入表。
- CLEANS:清除表中不再需要的舊版本文件的後台活動。
- DELTA_COMMIT:增量提交是指將一批記錄原子地寫入MergeOnRead類型的表,其中一些/所有數據可以直接寫入增量日誌。
- COMPACTION :協調Hudi中不同數據結構的後台活動,例如:將更新從基於行的日誌文件移動到柱狀格式。在內部,壓縮表現為時間軸上的特殊提交。
- ROLLBACK:指示提交/增量提交失敗並回滾,刪除在此寫入過程中產生的所有部分文件。
- SAVEPOINT:將某些文件組標記為「已保存」,以便清理器不會刪除它們。在發生災難/數據恢復場景時,它有助於將表恢復到時間軸上的某個點。
- Instant time(即時時間):即時時間通常是一個時間戳(例如:20190117010349),它按動作開始時間的順序單調增加。有兩個重要時間概念
- Arrival time:數據到達Hudi的時間。
- Event Time:數據記錄中的時間。
- State:瞬時的當前狀態。
- REQUESTED:表示一個action已經調度,但尚未執行。
- INFLIGHT:表示當前action正在執行。
- COMPLETED:表示時間軸上action已完成。
File Layouts(文件布局)
Apache Hudi 文件在存儲上的總體布局方式如下:
- Hudi將數據表組織到分散式文件系統的基本路徑下的目錄結構中。
- 表被分成多幾個分區,這些分區是包含該分區的數據文件的文件夾,非常類似Hive表。
- 在每個分區中,文件被組織到文件組中,由文件ID唯一標識。
- 每個文件組包含幾個文件片(FileSlice)。
- 每個文件片都包含在某個 commit/compaction 瞬間時間生成的一個BaseFile(MOR可能沒有),以及一組LogFile文件(COW可能沒有),其中包含自BaseFile生成以來對BaseFile的插入/更新。Hudi將一個表映射為如下文件結構:
- 元數據:.hoodie目錄對應著表的元數據資訊,包括表的版本管理(Timeline)、歸檔目錄(存放過時的instant也就是版本),一個instant記錄了一次提交的行為、時間戳和狀態;Hudi以時間軸的形式維護了在數據集上執行的所有操作的元數據。
- 數據:和hive一樣,以分區方式存放數據;分區裡面存放著BaseFile(.parquet)和LogFile(.log.*)。
- Hudi採用多版本並發控制(MVCC)
- compaction 操作:合併日誌和基本文件以產生新的文件片。
- clean 操作:清除不使用的/舊的文件片以回收文件系統上的空間。
- Hudi的BaseFile在 footer 的 meta記錄了 record key 組成的 BloomFilter,用於在 file based index 實現高效率的 key contains 檢測。只有不在 BloomFilter 的 key 才需要掃描整個文件——索引檢測key是否存在。
- Hudi 的 log 文件通過積攢數據 buffer 以 LogBlock 為單位寫出,每個 LogBlock 包含 magic number、size、content、footer 等資訊,用於數據讀、校驗和過濾。
索引
-
原理:Hudi通過索引機制提供高效的upserts,具體是將hoodie key(record key+partition path)與文件id(文件組)建立唯一映射,映射的文件組包含一組記錄的所有版本。
- 數據第一次寫入文件後保持不變,一個FileGroup包含了一批record的所有版本記錄。index用於區分消息是insert還是update;此做法的意義在於,當更新的數據到了之後可以快速定位到對應的FileGroup,避免了不必要的更新,只需要在FileGroup內做合併。
- 對於Copy-On-Write tables 可以實現快速的upsert/delete操作,避免了需要針對整個數據集進行關聯來確定要重寫哪些文件。
- 對於 Merge-On-Read tables 這種設計允許Hudi限制需要合併的任何給定基文件的記錄數量。具體地說,給定的基本文件只需要針對作為該基本文件一部分的記錄的更新進行合併。
下圖中黃色塊為更新文件,白色塊為基本文件
-
索引的類型
- Bloom Index(默認):使用布隆過濾器來判斷記錄存在與否,也可以選擇使用record key範圍修剪候選文件。
- 優點:效率高,不依賴外部系統,數據和索引保持一致性。
- 缺點:因偽正率問題,還需回溯原文件再查找一遍。
- Simple Index:根據從存儲上的表中提取的鍵,把update/delete操作的新數據和老數據進行join。
- 優點:實現最簡單,無需額外的資源。
- 缺點:性能比較差。
- HBase Index:管理外部Apache HBase表的索引映射,把index存放在HBase裡面,在插入 File Group定位階段所有task向HBase發送 Batch Get 請求,獲取 Record Key 的 Mapping 資訊。
- 優點:對於小批次的keys,查詢效率高。
- 缺點:需要外部的系統,增加了運維壓力。
- 自帶實現:您可以擴展這個公共API來實現自定義索引。
- Bloom Index(默認):使用布隆過濾器來判斷記錄存在與否,也可以選擇使用record key範圍修剪候選文件。
-
全局索引/非全局索引
- 全局索引:全局索引在全表的所有分區範圍下強制要求鍵的唯一性,也就是確保對給定的鍵有且只有一個對應的記錄。全局索引提供了更強的保證,但是隨著表增大,update/delete 操作損失的性能越高,因此更適用於小表。
- 非全局索引:默認的索引實現,只能保證數據在分區的唯一性。非全局索引依靠寫入器為同一個記錄的update/delete提供一致的分區路徑,同時大幅提高了效率,更適用於大表。
- HBase索引本質上是一個全局索引,bloom和simple index都有全局選項:
hoodie.index.type=GLOBAL_BLOOM hoodie.index.type=GLOBAL_SIMPLE
-
索引的選擇策略
- 對事實表的延遲更新:許多公司在NoSQL資料庫上存儲大量交易數據,例如共享的行程數據、股票交易數據、電商的訂單數據,這些表大部分的更新會隨機發生在較新的時間記錄上,而對舊的數據有著長尾分布型的更新。也即是只有小部分會在舊的分區,這種可以使用布隆索引,如果record key是有序的,那就可以通過範圍進一步篩選;如果更加高效的使用布隆過濾器進行比對,hudi快取了輸入記錄並且使用了自定義的分區器和統計的規律來解決了數據的傾斜,如果偽正率較高,查詢會增加數據的打亂操作,也會根據數據量來調整大小從而達到設定的假陽性率。
- 對事件表的去重:事件流數據無所不在,比如從kafka或者其他消息件發出的數據,插入和更新只存在於最新的幾個分區中,重複事件較多,所以在入湖之前去重是一個常見的需求;雖然可以使用hbase索引進行去重,但索引存儲的消耗還是會隨著事件的增長而線性增長,所以有範圍裁剪的布隆索引才是最佳的解決方案,可以使用事件時間戳+事件id組成的鍵作為去重條件。
- 對維度表的隨機更新:使用布隆裁剪就不合適,直接使用普通簡單索引就合適,直接將所有的文件的所需欄位連接;也可以採用HBase索引,其對這些表能提供更加優越的查詢效率;當遇到分區內數據需要更新時,較為適合採用Merge-On-Read表。
表類型
Hudi表類型定義了如何在DFS上對數據進行索引和布局,以及如何在這種組織之上實現上述原語和時間軸活動(即如何寫入數據)。反過來,查詢類型定義了如何向查詢公開底層數據(即如何讀取數據)。Hudi表類型分為COPY_ON_WRITE(寫時複製)和MERGE_ON_READ(讀時合併)。
-
Copy On Write
- 使用專門的列式格式存儲數據(例如parquet),通過在寫過程中執行同步合併,簡單地更新文件的版本和重寫。
- 只有數據文件/基本文件(.parquet),沒有增量日誌文件(.log.*)。
- 對於每一個新批次的寫入都將創建相應數據文件的版本(新的FileSlice),也就是第一次寫入文件為fileslice1,第二次更新追加操作就是fileslice2。
- data_file1 和 data_file2 都將創建更新的版本,data_file1 V2 是data_file1 V1 的內容與data_file1 中傳入批次匹配記錄的記錄合併。
- cow是在寫入期間進行合併,因此會產生一些延時,但是它最大的特點在於簡單性,不需要其他表的服務,也相對容易調試。
當數據寫入寫入即寫複製表並在其上運行兩個查詢時
- Merge On Read
- 使用列式存儲(如parquet) +基於行(如avro)的文件格式組合存儲數據,更新被記錄到增量文件,然後壓縮以同步或非同步生成新版本的列式文件。
- 可能包含列存的基本文件(.parquet)和行存的增量日誌文件(基於行的avro格式,.log文件)。
- 所以對於初始的文件也是追加的avro文件,後續修改追加的文件是avro文件,而且只有在讀的時候或者compaction才會合併成列文件。
- compaction可以選擇內聯或者非同步方式,比如可以將壓縮的最大增量日誌配置為 4。這意味著在進行 4 次增量寫入後,將對數據文件進行壓縮並創建更新版本的數據文件。
- 不同索引寫文件會有差異,布隆索引插入還是寫入parquet文件,只有更新才會寫入avro文件,因為當parquet文件記錄了要更新消息的FileGroupID;而對於Flink索引可以直接寫入avro文件。
在讀表上合併的目的是支援直接在DFS上進行接近實時的處理,而不是將數據複製到可能無法處理數據量的專門系統。這個表還有一些次要的好處,比如通過避免數據的同步合併減少了寫量的增加,即在批處理中每1個位元組的數據寫入的數據量。下面為兩種類型的查詢—快照查詢和讀取優化查詢的圖說明
- COW適合批次處理,MOR適合批流一體但更適合流式計算,COW與MOR的對比如下
CopyOnWrite | MergeOnRead | |
---|---|---|
數據延遲 | 高 | 低 |
查詢延遲 | 低 | 高 |
更新 (I/O)成本 | 高(重寫整個 parquet文件) | 低 (追加到增量日誌) |
Parquet 文件大小 | 小 | 較大 |
寫擴大 | 高 | 低(依賴合併或壓縮策略) |
查詢類型
-
查詢類型:支援快照查詢、增量查詢、讀優化查詢三種查詢類型。
-
快照查詢:提供對實時數據的快照查詢,使用基於列和基於行的存儲的組合(例如Parquet + Avro)。針對全量最新數據COW表直接查最新的parquet文件,而MOR表需要做一個合併(最新全量數據)。
-
增量查詢:提供一個更改流,其中包含在某個時間點之後插入或更新的記錄。可以查詢給定commit/delta commit即時操作以來新寫入的數據。有效的提供變更流來啟用增量數據管道(最新增量數據)。
-
讀優化查詢:通過純列存儲(例如Parquet)提供出色的快照查詢性能。可查看給定的commit/compact即時操作的表的最新快照。僅將最新文件片的基本/列文件暴露給查詢,並保證與非Hudi表相同的列查詢性能(並不是全量最新),只是合併時文件。
-
不同表支援查詢類型
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
不同查詢類型之間的權衡
快照 | 讀優化 | |
---|---|---|
數據延遲 | 低 | 高 |
查詢延遲 | 高 (合併基本文件/列式文件 + 基於行的 delta 日誌文件) | 低(行原始 / 列式文件性能) |
**本人部落格網站 **IT小神 www.itxiaoshen.com