Apache Hudi又雙叕被中國頂級雲服務提供商集成了!

是的,最近中國雲服務提供商騰訊雲在其EMR-V2.2.0版本中優先集成了Hudi 0.5.1版本作為其雲上的數據湖解決方案對外提供服務

Apache Hudi 在 HDFS 的數據集上提供了插入更新和增量拉取的流原語。

一般來說,我們會將大量數據存儲到 HDFS,新數據增量寫入,而舊數據鮮有改動,特別是在經過數據清洗,放入數據倉庫的場景。而且在數據倉庫如 hive 中,對於 update 的支援非常有限,計算昂貴。另一方面,若是有僅對某段時間內新增數據進行分析的場景,則 hive、presto、hbase 等也未提供原生方式,而是需要根據時間戳進行過濾分析。

在此需求下,Hudi 可以提供這兩種需求的實現。第一個是對 record 級別的更新,另一個是僅對增量數據的查詢。且 Hudi 提供了對 Hive、presto、Spark 的支援,可以直接使用這些組件對 Hudi 管理的數據進行查詢。

Hudi 是一個通用的大數據存儲系統,主要特性:

  • 攝取和查詢引擎之間的快照隔離,包括 Apache Hive、Presto 和 Apache Spark。
  • 支援回滾和存儲點,可以恢複數據集。
  • 自動管理文件大小和布局,以優化查詢性能准實時攝取,為查詢提供最新數據。
  • 實時數據和列數據的非同步壓縮。

時間軸

在它的核心,Hudi 維護一條包含在不同的即時時間所有對數據集操作的時間軸,從而提供了從不同時間點出發得到不同的視圖下的數據集。

Hudi 即時包含以下組件:

  • 操作類型:對數據集執行的操作類型。
  • 即時時間:即時時間通常是一個時間戳(例如:20190117010349),該時間戳按操作開始時間的順序單調增加。
  • 狀態:即時的狀態。

文件組織

Hudi 將 DFS 上的數據集組織到基本路徑下的目錄結構中。數據集分為多個分區,這些分區是包含該分區的數據文件的文件夾,這與 Hive 表非常相似。

每個分區被相對於基本路徑的特定分區路徑區分開來。

在每個分區內,文件被組織為文件組,由文件id唯一標識。

每個文件組包含多個文件切片,其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件*.parquet以及一組日誌文件*.log*,該文件包含自生成基本文件以來對基本文件的插入/更新。

Hudi 採用 MVCC 設計,其中壓縮操作將日誌和基本文件合併以產生新的文件片,而清理操作則將未使用的/較舊的文件片刪除以回收 DFS 上的空間。

Hudi 通過索引機制將給定的 hoodie 鍵(記錄鍵+分區路徑)映射到文件組,從而提供了高效的 Upsert。

一旦將記錄的第一個版本寫入文件,記錄鍵和文件組/文件id之間的映射就永遠不會改變。簡而言之,映射的文件組包含一組記錄的所有版本。

存儲類型

Hudi 支援以下存儲類型:

  • 寫時複製:僅使用列文件格式(例如 parquet)存儲數據。通過在寫入過程中執行同步合併以更新版本並重寫文件。
  • 讀時合併:使用列式(例如 parquet)+ 基於行(例如 avro)的文件格式組合來存儲數據。 更新記錄到增量文件中,然後進行同步或非同步壓縮以生成列文件的新版本。

下表總結了這兩種存儲類型之間的權衡:

權衡 寫時複製 讀時合併
數據延遲 更高 更低
更新代價(I/O) 更高(重寫整個parquet文件) 更低(追加到增量日誌)
Parquet文件大小 更小(高更新代價(I/o)) 更大(低更新代價)
寫放大 更高 更低(取決於壓縮策略)

hudi 對 EMR 底層存儲支援

  • HDFS
  • COS

安裝 Hudi

進入 EMR 購買頁,選擇【產品版本】為 EMR-V2.2.0,選擇【可選組件】為【hudi 0.5.1】。hudi 組件默認安裝在 master 和 router 節點上。

! hudi 組件依賴 hive 和 spark 組件, 如果選擇安裝 hudi 組件,EMR 將自動安裝 hive 和 spark 組件。

使用示例

可參考 hudi 官網示例

  1. 登錄 master 節點,切換為 hadoop 用戶。
  2. 載入 spark 配置。
cd /usr/local/service/hudi
ln -s /usr/local/service/spark/conf/spark-defaults.conf /usr/local/service/hudi/demo/config/spark-defaults.conf 

上傳配置到 hdfs:

hdfs dfs -mkdir -p /hudi/config
hdfs dfs -copyFromLocal demo/config/* /hudi/config/
  1. 修改 kafka 數據源。
/usr/local/service/apps/hudi-0.5.1/demo/config/kafka-source.properties
bootstrap.servers=kafka_ip:kafka_port

上傳第一批次數據:

cat demo/data/batch_1.json | kafkacat -b [kafka_ip] -t stock_ticks -P
  1. 攝取第一批數據。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  1. 查看 hdfs 數據。
 hdfs dfs -ls /usr/hive/warehouse/
  1. 同步 hive 元數據。
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port]  --user hadoop --pass [password] --partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port]   --user hadoop --pass [password]--partitioned-by dt --base-path /usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix
  1. 使用計算引擎查詢數據。
  • hive 引擎
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false

或者 spark 引擎

spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false

hive/spark 引擎執行如下 sql 語句:

select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';
  • 進入 presto 引擎
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop

presto 查詢有下劃線的欄位需要用雙引號,例如 "_hoodie_commit_time",執行如下 sql 語句:

select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
  1. 上傳第二批數據。
cat demo/data/batch_2.json | kafkacat -b 10.0.1.70 -t stock_ticks -P
  1. 攝取第二批增量數據。
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path /usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
  1. 查詢增量數據,查詢方法同步驟7。
  2. 使用 hive-cli 工具。
 cli/bin/hudi-cli.sh
connect --path /usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
合併執行計劃
 compaction run --compactionInstant [requestID] --parallelism 2 --sparkMemory 1G  --schemaFilePath /hudi/config/schema.avsc --retry 1
  1. 使用 tez/spark 引擎查詢。
beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false
set hive.execution.engine=tez;
set hive.execution.engine=spark;

然後執行 sql 查詢,可參考步驟7。

與對象存儲結合使用

與 hdfs 類似,需要在存儲路徑前加上cosn://[bucket]。參考如下操作:

bin/kafka-server-start.sh config/server.properties &
cat     demo/data/batch_1.json | kafkacat -b kafkaip -t stock_ticks -P
cat     demo/data/batch_2.json | kafkacat -b kafkaip -t stock_ticks -P
kafkacat -b kafkaip  -L
hdfs dfs -mkdir -p cosn://[bucket]/hudi/config
hdfs dfs -copyFromLocal demo/config/*  cosn://[bucket]/hudi/config/


spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar   --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider


spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --master yarn ./hudi-utilities-bundle_2.11-0.5.1-incubating.jar  --table-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts  --target-base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props cosn://[bucket]/hudi/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction


bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass isd@cloud --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow

bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://[hiveserver2_ip:hiveserver2_port] --user hadoop --pass hive --partitioned-by dt --base-path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor --skip-ro-suffix


beeline -u jdbc:hive2://[hiveserver2_ip:hiveserver2_port] -n hadoop --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat --hiveconf hive.stats.autogather=false

spark-sql --master yarn --conf spark.sql.hive.convertMetastoreParquet=false


hivesqls:
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';

prestosqls:
/usr/local/service/presto-client/presto --server localhost:9000 --catalog hive --schema default --user Hadoop
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor where  symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG';


cli/bin/hudi-cli.sh
connect --path cosn://[bucket]/usr/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule
compaction run --compactionInstant [requestid]  --parallelism 2 --sparkMemory 1G  --schemaFilePath cosn://[bucket]/hudi/config/schema.avsc --retry 1