Apache Hudi又雙叕被中國頂級雲服務提供商集成了!
- 2020 年 5 月 9 日
- 筆記
是的,最近中國雲服務提供商騰訊雲在其EMR-V2.2.0版本中優先集成了Hudi 0.5.1版本作為其雲上的數據湖解決方案對外提供服務
Apache Hudi 在 HDFS 的數據集上提供了插入更新和增量拉取的流原語。
一般來說,我們會將大量數據存儲到 HDFS,新數據增量寫入,而舊數據鮮有改動,特別是在經過數據清洗,放入數據倉庫的場景。而且在數據倉庫如 hive 中,對於 update 的支援非常有限,計算昂貴。另一方面,若是有僅對某段時間內新增數據進行分析的場景,則 hive、presto、hbase 等也未提供原生方式,而是需要根據時間戳進行過濾分析。
在此需求下,Hudi 可以提供這兩種需求的實現。第一個是對 record 級別的更新,另一個是僅對增量數據的查詢。且 Hudi 提供了對 Hive、presto、Spark 的支援,可以直接使用這些組件對 Hudi 管理的數據進行查詢。
Hudi 是一個通用的大數據存儲系統,主要特性:
- 攝取和查詢引擎之間的快照隔離,包括 Apache Hive、Presto 和 Apache Spark。
- 支援回滾和存儲點,可以恢複數據集。
- 自動管理文件大小和布局,以優化查詢性能准實時攝取,為查詢提供最新數據。
- 實時數據和列數據的非同步壓縮。
時間軸
在它的核心,Hudi 維護一條包含在不同的即時時間所有對數據集操作的時間軸,從而提供了從不同時間點出發得到不同的視圖下的數據集。
Hudi 即時包含以下組件:
- 操作類型:對數據集執行的操作類型。
- 即時時間:即時時間通常是一個時間戳(例如:20190117010349),該時間戳按操作開始時間的順序單調增加。
- 狀態:即時的狀態。
文件組織
Hudi 將 DFS 上的數據集組織到基本路徑
下的目錄結構中。數據集分為多個分區,這些分區是包含該分區的數據文件的文件夾,這與 Hive 表非常相似。
每個分區被相對於基本路徑的特定分區路徑
區分開來。
在每個分區內,文件被組織為文件組
,由文件id
唯一標識。
每個文件組包含多個文件切片
,其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件*.parquet
以及一組日誌文件*.log*
,該文件包含自生成基本文件以來對基本文件的插入/更新。
Hudi 採用 MVCC 設計,其中壓縮操作將日誌和基本文件合併以產生新的文件片,而清理操作則將未使用的/較舊的文件片刪除以回收 DFS 上的空間。
Hudi 通過索引機制將給定的 hoodie 鍵(記錄鍵+分區路徑)映射到文件組,從而提供了高效的 Upsert。
一旦將記錄的第一個版本寫入文件,記錄鍵和文件組
/文件id
之間的映射就永遠不會改變。簡而言之,映射的文件組包含一組記錄的所有版本。
存儲類型
Hudi 支援以下存儲類型:
- 寫時複製:僅使用列文件格式(例如 parquet)存儲數據。通過在寫入過程中執行同步合併以更新版本並重寫文件。
- 讀時合併:使用列式(例如 parquet)+ 基於行(例如 avro)的文件格式組合來存儲數據。 更新記錄到增量文件中,然後進行同步或非同步壓縮以生成列文件的新版本。
下表總結了這兩種存儲類型之間的權衡:
權衡 | 寫時複製 | 讀時合併 |
---|---|---|
數據延遲 | 更高 | 更低 |
更新代價(I/O) | 更高(重寫整個parquet文件) | 更低(追加到增量日誌) |
Parquet文件大小 | 更小(高更新代價(I/o)) | 更大(低更新代價) |
寫放大 | 更高 | 更低(取決於壓縮策略) |
hudi 對 EMR 底層存儲支援
- HDFS
- COS
安裝 Hudi
進入 EMR 購買頁,選擇【產品版本】為 EMR-V2.2.0,選擇【可選組件】為【hudi 0.5.1】。hudi 組件默認安裝在 master 和 router 節點上。
! hudi 組件依賴 hive 和 spark 組件, 如果選擇安裝 hudi 組件,EMR 將自動安裝 hive 和 spark 組件。
使用示例
可參考 hudi 官網示例:
- 登錄 master 節點,切換為 hadoop 用戶。
- 載入 spark 配置。
cd /usr/local/service/hudi
ln -s /usr/local/service/spark/conf/spark-defaults.conf /usr/local/service/hudi/demo/config/spark-defaults.conf
上傳配置到 hdfs:
hdfs dfs -mkdir -p /hudi/config
hdfs dfs -copyFromLocal demo/config/* /hudi/config/
- 修改 kafka 數據源。
/usr/local/service/apps/hudi-0.5.1/demo/config/kafka-source.properties
bootstrap.servers=kafka_ip:kafka_port
上傳第一批次數據:
cat demo/data/batch_1.json | kafkacat -b [kafka_ip] -t stock_ticks -P
- 攝取第一批數據。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
- 查看 hdfs 數據。
hdfs dfs -ls /usr/hive/warehouse/
- 同步 hive 元數據。
bin/run_sync_tool.sh --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass [password] --partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
bin/run_sync_tool.sh --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass [password]--partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix
- 使用計算引擎查詢數據。
- hive 引擎
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
或者 spark 引擎
spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false
hive/spark 引擎執行如下 sql 語句:
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
- 進入 presto 引擎
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop
presto 查詢有下劃線的欄位需要用雙引號,例如 "_hoodie_commit_time"
,執行如下 sql 語句:
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
- 上傳第二批數據。
cat demo/data/batch_2.json | kafkacat -b 10.0.1.70 -t stock_ticks -P
- 攝取第二批增量數據。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
- 查詢增量數據,查詢方法同步驟7。
- 使用 hive-cli 工具。
cli/bin/hudi-cli.sh
connect --path /usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
合併執行計劃
compaction run --compactionInstant [requestID] --parallelism 2 --sparkMemory 1G --schemaFilePath /hudi/config/schema.avsc --retry 1
- 使用 tez/spark 引擎查詢。
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
set hive.execution.engine=tez;
set hive.execution.engine=spark;
然後執行 sql 查詢,可參考步驟7。
與對象存儲結合使用
與 hdfs 類似,需要在存儲路徑前加上cosn://[bucket]
。參考如下操作:
bin/kafka-server-start.sh config/server.properties &
cat demo/data/batch_1.json | kafkacat -b kafkaip -t stock_ticks -P
cat demo/data/batch_2.json | kafkacat -b kafkaip -t stock_ticks -P
kafkacat -b kafkaip -L
hdfs dfs -mkdir -p cosn://[bucket]/hudi/config
hdfs dfs -copyFromLocal demo/config/* cosn://[bucket]/hudi/config/
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
bin/run_sync_tool.sh --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass isd@cloud --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
bin/run_sync_tool.sh --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass hive --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false
hivesqls:
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
prestosqls:
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
cli/bin/hudi-cli.sh
connect --path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
compaction run --compactionInstant [requestid] --parallelism 2 --sparkMemory 1G --schemaFilePath cosn://[bucket]/hudi/config/schema.avsc --retry 1