Hive 集成 Hudi 實踐(含代碼)| 可能是全網最詳細的數據湖系列
公眾號後台越來越多人問關於數據湖相關的內容,看來大家對新技術還是很感興趣的。關於數據湖的資料網絡上還是比較少的,特別是實踐系列,對於新技術來說,基礎的入門文檔還是很有必要的,所以這一篇希望能夠幫助到想使用Hudi的同學入門。
本篇的Hudi使用的是孵化版本 0.5.2;其他依賴 Spark-2.4.4,Hive-1.1.0
Hudi 服務器環境準備
wget //github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gz
tar zxvf release-0.5.2-incubating.tar.gz
cd release-0.5.2-incubating
mvn clean package -DskipTests -DskipITs
cp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/
拷貝依賴包到 Hive 路徑是為了 Hive 能夠正常讀到 Hudi 的數據,至此服務器環境準備完畢。
用 Spark 寫一段數據
一切準備完畢先寫一段數據到 Hudi 里,首先數據源 ods.ods_user_event 的表結構為:
CREATE TABLE ods.ods_user_event(
uuid STRING,
name STRING,
addr STRING,
update_time STRING,
date STRING)
stored as parquet;
然後是 Maven 的依賴,詳細代碼關注公眾號【老懞大數據】回復 hudi 後即可獲取。
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark_2.11</artifactId>
<version>0.5.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>0.5.2-incubating</version>
</dependency>
代碼邏輯:
- 初始化 SparkSession,配置相關配置項
- 構建 DataFrame,大家可以自由發揮,這裡的案例是從Hive讀數據構建。
- DataFrame寫入Hudi,這一塊說到底就是把數據寫入 HDFS 路徑下,但是需要一堆配置,這些配置就體現了 Hudi 的特性:
- DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY:指定唯一id的列名
- DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY:指定更新時間,該字段數值大的數據會覆蓋小的
- DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY:指定分區列,和Hive的分區概念類似
- HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH:設置當分區變更時,當前數據的分區目錄是否變更
- HoodieIndexConfig.INDEX_TYPE_PROP:設置索引類型目前有 HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四種索引
上述例子中,選擇了 HoodieGlobalBloomIndex(全局索引),會在所有分區內查找指定的 recordKey。而 HoodieBloomIndex 只在指定的分區內查找。
def main(args: Array[String]): Unit = {
val sss = SparkSession.builder.appName("hudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("hive.metastore.uris", "thrift://ip:port")
.enableHiveSupport().getOrCreate()
val sql = "select * from ods.ods_user_event"
val df: DataFrame = sss.sql(sql)
df.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date")
.option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
.option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
.option("hoodie.insert.shuffle.parallelism", "10")
.option("hoodie.upsert.shuffle.parallelism", "10")
.option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi")
.mode(SaveMode.Append)
.save("/user/hudi/lake/ods.db/ods_user_event_hudi")
}
執行成功後會有如下結果,因為我們是按照date分區,每一天的數據會生成一個文件夾和Hive類似。
[hadoop@hadoop31 ~]# hdfs dfs -ls /user/hudi/lake/ods.db/ods_user_event_hudi/
Found 4 items
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504
另外,注意 recordKey 必須唯一,不然數據會被覆蓋,且值不能為 null,否則會有以下報錯。
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty.
Hive 創建外部表讀數據
上一步中 Spark 將數據寫到了 hudi,想要通過Hive訪問到這塊數據,就需要創建一個Hive外部表了,因為 Hudi 配置了分區,所以為了能讀到所有的數據,咱們的外部表也得分區,分區字段名可隨意配置。
CREATE TABLE ods.ods_user_event_hudi(
uuid STRING,
name STRING,
addr STRING,
update_time STRING,
date STRING)
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'/user/hudi/lake/ods.db/ods_user_event_hudi'
至此,直接讀數據肯定是空的,因為我們創建的是個分區表,所以還需要指定分區
alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504'
那麼這個時候問題來了,一年有365個分區,要一個一個建立手動創建分區嗎?
抱歉我也沒發現更好的辦法,只能送你個簡單的腳本了。
#!/bin/bash
start_date=20190101
end_date=20200520
start=`date -d "$start_date" "+%s"`
end=`date -d "$end_date" "+%s"`
for((i=start;i<=end;i+=86400)); do
dt=$(date -d "@$i" "+%Y%m%d")
hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}';
"
done
後記
最後,執行 select * from ods.ods_user_event_hudi 要是沒有數據你來找我。另外值得注意的是,如果此時直接用 Hive 將數據 insert into ods.ods_user_event_hudi,雖然數據會寫入到 hudi 的目錄下,但是相同的 recordKey 是不會覆蓋原有數據的。
下一篇詳細寫 Spark 操作 Hudi 的相關內容,敬請期待。本篇詳細代碼關注公眾號【老懞大數據】回復 hudi 後即可獲取。
推薦閱讀
3000字長文教你大數據該怎麼學!
選方向?大數據的職位你了解多少