hudi clustering 數據聚集(二)
- 2021 年 11 月 12 日
- 筆記
- clustering, Hudi, spark
小文件合併解析
執行程式碼:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val t1 = "t1"
val basePath = "file:///tmp/hudi_data/"
val dataGen = new DataGenerator(Array("2020/03/11"))
// 生成隨機數據100條
val updates = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, t1).
// 每次寫入的數據都生成一個新的文件
option("hoodie.parquet.small.file.limit", "0").
// 每次操作之後都會進行clustering操作
option("hoodie.clustering.inline", "true").
// 每4次提交就做一次clustering操作
option("hoodie.clustering.inline.max.commits", "4").
// 指定生成文件最大大小
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
// 指定小文件大小限制,當文件小於該值時,可用於被 clustering 操作
option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
mode(Append).
save(basePath+t1);
// 創建臨時視圖,查看當前表內數據總個數
spark.read.format("hudi").load(basePath+t1).createOrReplaceTempView("t1_table")
spark.sql("select count(*) from t1_table").show()
以上示例中,指定了進行 clustering 的觸發頻率:每4次提交就觸發一次,並指定了文件相關大小:生成新文件的最大大小、小文件最小大小。
執行步驟:
1、生成數據,插入數據。
查看當前磁碟上的文件:
查看錶內數據個數:
查看 spark-web 上 該 sql 執行讀取的文件個數:
所以,當前表中共100條數據,磁碟上生成一個數據文件,在查詢該表數據時,只讀取了一個文件。
2、重複上面操作兩次。
查看當前磁碟上的文件:
查看錶內數據個數:
查看 spark-web 上 該 sql 執行讀取的文件個數:
所以,目前為止,我們提交了3次寫操作,每次生成1個數據文件,共生成了3個數據文件,當查詢所有的數據時,需要從3個文件中讀取數據。
3、再進行一次數據插入:
查看當前磁碟上的文件:
查看錶內數據個數:
查看 spark-web 上 該 sql 執行讀取的文件個數:
結論:
1、配置了hoodie.parquet.small.file.limit之後,每次提交新數據,都會生成一個數據文件。
2、在 clustering 之前,每次讀取表所有數據的時候,都需要讀取所有文件。
3、提交第4次數據之後,觸發了 clustering ,生成了一個更大的文件,此時再讀取所有數據的時候,就只需要讀取合併後的大文件即可。在.hoodie文件夾下,也可以看到 replacecommit 的提交:
小文件合併+sort columns解析
執行程式碼:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val t1 = "t1"
val basePath = "file:///tmp/hudi_data/"
val dataGen = new DataGenerator(Array("2020/03/11"))
var a = 0;
for (a <- 1 to 8) {
val updates = convertToStringList(dataGen.generateInserts(10000))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, t1).
// 每次寫入的數據都生成一個新的文件
option("hoodie.parquet.small.file.limit", "0").
// 每次操作之後都會進行clustering操作
option("hoodie.clustering.inline", "true").
// 每4次提交就做一次clustering操作
option("hoodie.clustering.inline.max.commits", "8").
// 指定生成文件最大大小
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1400000").
// 指定小文件大小限制,當文件小於該值時,可用於被 clustering 操作
option("hoodie.clustering.plan.strategy.small.file.limit", "1400000").
// 指定排序的列
option("hoodie.clustering.plan.strategy.sort.columns", "fare").
mode(Append).
save(basePath+t1);
// 創建臨時視圖,查看當前表內數據總個數
spark.read.format("hudi").load(basePath+t1).createOrReplaceTempView("t1_table")
spark.sql("select count(*) from t1_table where fare > 50").show()
}
執行程式碼分析
該程式碼比之前程式碼修改了幾個地方:
1、增加了for循環:
因為我們已經知道了在8次提交之後,小文件會合併大文件,所以一個for循環,做8次提交,我們直接看結果就行。
2、增加了 hoodie.clustering.plan.strategy.sort.columns 配置:
這是本次主要的測試點。該配置可以對指定的列進行排序。
即,當做 clustering 的時候,hudi 會重新讀取所有文件,並根據指定的列做排序,這樣可以把相關的數據聚集在一起,可以做更好的查詢過濾(後面會演示說明),而我們要做的對比,就是以 fare 為條件查詢數據,觀察在 clustering 前後,hudi 會讀取的文件個數。
我們想要的結果是,在 clustering 之前,由於沒有根據 fare 對數據任何處理,符合過濾條件的數據會分布在各個文件,所以會讀取的文件個數很多,過濾效果差。而在 clustering 之後,會根據 fare 列對數據做重新分布,符合過濾條件的數據較為集中,那麼讀取的數據就會比較少,過濾效果較好。
3、修改了 hoodie.clustering.plan.strategy.target.file.max.bytes 和 hoodie.clustering.plan.strategy.small.file.limit
我們想測的是,clustering 前後過濾的效果,所以文件個數不能夠被改變(否則4個文件合併成1個文件後,讀取數據時也只會讀取1個文件,就看不出來sort是否有效果),所以這裡把該值設置成兩個較為近似的值,使其既能夠觸發 clustering,又能夠在 clustering 前後文件個數相同。
執行結果:
查看當前磁碟文件:
查看第5次的sql過濾結果:
查看第6次的sql過濾結果:
查看第7次的sql過濾結果:
查看最後一次的sql過濾結果:
結論:
1、在 clustering 之前,過濾 fare 列時,會讀取所有的數據。
比如,在執行第5次過濾時,此時表總共有50000行數據,hudi就會掃描50000行數據;在執行第6次過濾時,此時表總共有60000行數據,hudi就會掃描60000行數據;在執行第7次過濾時,此時表總共有70000行數據,hudi就會掃描70000行數據,
2、在 clustering 之後,數據文件個數不變的情況下(前後都是8個數據文件),在第8次過濾時,能夠有效應用sort columns的重排列數據,將本應掃描80000行數據降低到只掃描了50405行數據,過濾效果明顯提升很多!!