李呈祥:bilibili在湖倉一體查詢加速上的實踐與探索

file


導讀: 本文主要介紹嗶哩嗶哩在數據湖與數據倉庫一體架構下,探索查詢加速以及索引增強的一些實踐。主要內容包括:

  • 什麼是湖倉一體架構
  • 嗶哩嗶哩目前的湖倉一體架構
  • 湖倉一體架構下,數據的排序組織優化
  • 湖倉一體架構下,索引增強與優化的實踐探索

01 什麼是湖倉一體

當我們講湖倉一體時,涉及到數據湖和數據倉庫兩個概念。

什麼是數據湖?通常來說,它有以下幾個特點:

  • 有一個統一的存儲系統,所有的數據都放到這個統一的存儲系統里,沒有數據孤島。
  • 支援任意數據類型,比較自由,包括結構化、半結構化和非結構化的數據。這些不同類型的數據都可以統一放到存儲系統里。
  • 對於多個計算引擎是開放的,包括實時、離線的分析等,計算引擎很豐富。
  • 有比較靈活的數據處理介面。有基於SQL這種級別的數據處理,也可以像基於Spark,提供更底層的,基於Dataset甚至RDD這種層面的,更甚者對於機器學習一些場景去對數據進行解析。也可以直接用文件系統,通過API直接讀取文件。
  • 因其靈活性,數據品質相對較低,比較難管理。

因此,基於數據湖的靈活與易用,它非常適合用於基於未知數據的探索與創新。

什麼是數據倉庫?它有以下幾個特性:

  • 強格式(schema),事前對數據進行建模
  • 有封閉的數據格式與存儲,不對其他引擎開放
  • 查詢效率高:存儲與計算的緊密結合優化,豐富的索引/預計算等支援
  • 數據入倉,無論是實時的還是離線的,或者說數據的存儲組織由數倉內部而非寫入任務決定
  • 數據品質高,容易運維管理,建設成本高

因此,高效和可靠的特性使得數據倉庫非常適用於基於已知數據的分析與決策。

嗶哩嗶哩和大部分互聯網公司一樣,之前的大數據平台都是基於開源的Hadoop生態系統。存儲用的是HDFS,計算引擎則有Hive,Spark以及Presto等。在我看來,這是一個比較典型的數據湖的架構。但是,我們也有很多交互分析的需求,為了解決這些需求,我們會引入特定的分散式數倉,引擎使用的是ClickHouse。

這樣就會引入新的問題。

比如,針對某個數據產品或者數據服務,為了對其提供比較好的查詢效率和性能,需要先把數據從HDFS入倉到ClickHouse,使得整個的流程變長。其次,會帶來數據冗餘的問題,因為數據在HDFS上有一份,在ClickHouse上也有一份。第三,在入倉過程中,可能會對數據做一些操作,使得數據發生變化,導致很難與HFDS里的其他數據進行關聯,導致數據孤島出現。這兩年,像Iceberg,Hudi以及Delta Lake這種數據湖的存儲格式也逐漸被很多公司引入去解決上述問題。對我們來說,我們主要使用的是Iceberg這種引擎去解決數據湖和數據倉庫之間存在gap的問題。

在我們看來,湖倉一體的目標有三個:

第一,希望它還是像數據湖一樣靈活。主要是我們還是用統一的HDFS存儲,和之前的SQL on Hadoop生態系統無縫兼容,包括基於Spark或者Flink ETL數據接入,SQL/ML/DataSet等各層次API訪問以及Presto/Spark/Hive等多種計算引擎的支援。

第二,希望它像數據倉庫一樣高效。針對於寫成Iceberg格式的表,我們希望它能夠做到或者接近於專用的分散式數倉的高效查詢效率,包括數據分布組織、索引、預計算、計算存儲一體化/快取等整體的增強和優化。像Iceberg本身提供了粗粒度這種事務的能力,使得我們能夠擁有支援更多的變化讀寫,實時進實時出倉的額外能力。

第三,希望它能像風一樣自由,這個是對用戶而言,希望可以做到智慧化,用戶的使用門檻更低,易用性更高。之前,用戶想要使其ETL結果的數據分析查詢效率更高的話,需要關注很多方面,比如寫出的數據是不是小文件會非常多,ETL里的SQL邏輯怎麼寫使得寫出去的數據怎樣排序,是否需要做預計算等,並且這些方面和用戶的業務邏輯沒有關係。

用戶想追求比較高的查詢效率,這些在原來的方案中,用戶需要考慮這些事情,也就是說,用戶需要自己變成一個大數據專家才能解決這些問題,並且還要自己去開發,有額外的ETL任務,因此,對用戶的門檻就比較高。第二步,它需要做到更自動化。對用戶來說,只需要考慮常用的過濾欄位,聚合維度和統計項等是哪些。後台的這種自動服務可以幫助解決數據的組織、排序等問題。第三階段,希望能做到智慧化。用戶只需要關注表裡有哪些欄位,算什麼類型等業務資訊。後面的這些數據的組織,索引的構建以及預計算等全部都能被自動化。用戶不需要關心這些,只需關注基於某張表,怎麼寫SQL表達業務邏輯就行。

file

02 湖倉一體架構

在嗶哩嗶哩,湖倉一體架構的核心是Iceberg,這是我們在Hudi,Delta Lake以及Iceberg這三個中進行選擇的最終結果。整個的數據處理流程架構大概是這樣的:實時的數據在Kafka里,通過Flink實施ETL,將數據以Iceberg格式寫到HDFS,離線的數據則通過Spark寫入HDFS。對於用戶來說,數據寫到Iceberg時,我們的後台有一個自研的Magnus服務,會針對數據落地到HDFS上的Iceberg表進行持續的整體的組織優化。具體如何進行優化,會在下一個部分詳細介紹,主要是運用Spark任務。

在分析端,我們用的是Trino做查詢引擎,它是PrestoSQL改名後的稱呼。同時,我們還用了Alluxio,因為Iceberg里元數據以及索引數據相較於原始數據來說,數據量都比較小,所以我們將其快取到Alluxio,方便快取加速。Magnus則是我們基於湖倉一體核心Iceberg的數據管理服務。它主要的任務是對於Iceberg數據做優化和管理,包括基本資訊的展示,比如表/分區/文件以及Snapshot等。在其內部還有一個scheduler,用於數據優化作業的調度。無論是離線的批任務,還是實時的任務,把數據寫到Iceberg的表裡的時候,都會把這些commit event發送到Magnus,裡面會有一個隊列,scheduler會根據制定的特定policy去消費隊列,去決定對哪些Iceberg表做相應工作,拉起對應的Spark任務做具體事情。

file
file

之前,基於Iceberg,Hudi或者Delta Lake的湖倉一體架構,大家比較關注或者說應用場景比較多的就是實時數倉。因為它們的對於粗粒度的事務的支援,去解決提供(近)實時數倉的能力。對於我們來說,除了一些比較重要的,比較獨立的數據產品服務我們會放到專門的像ClickHouse這樣的分散式數倉里去做刪除和查詢。實際上,我們在數倉建設過程中,還是有大量的業務場景還是基於之前的Hadoop的數據湖的架構上,我們數倉開發部門的同學基於這種數據湖架構去做數倉的建設,比如從ods到dwd等不同的分層建模。實際上,大部分數倉建模工作,其數據還是寫到HFDS上,然後應用Presto或者Spark去做分析。

在這種場景之下,我們湖倉一體架構的目標是如何加速查詢性能,使其效率可以達到或者接近專門的分散式數倉那樣。我們分析里開源的湖倉一體方案以及分散式數倉的性能Gap,這就涉及到Runtime引擎、存儲以及預計算等性能方面上的比較,這裡面涉及到的性能相關的因素非常多。本文主要分享在存儲里的排序組織以及索引上的一些探索和實踐。為什麼選擇這兩個因素,主要是我們的調研結果認為它們是開源的湖倉一體方案與分散式數倉的性能gap里最明顯的部分。

file

03 數據的排序組織

首先,我們來看數據的排序組織。

說到典型的數據分析場景,我們這次做的分享是基於star schema 作為一個benchmark的多維分析場景,整個數據模型就是一個事實表外加多個維度表,查詢的模式也是比較固定的。先關聯,再過濾,接著聚合,最後對結果做排序。其中的過濾條件可以是等值過濾也可能是範圍過濾,而過濾欄位可以是高基數欄位也可能是低基數欄位。

因此,在這種典型的多維分析場景下,也是我們實際業務中會經常遇到的問題是:我們如何在各種類型欄位及各種過濾條件下,執行查詢時只讀取需要的數據,而不是做全表的掃描?這裡有兩個比較關鍵的點,通過數據的組織外加索引,使得我們只查詢讀取SQL邏輯上所需要的那部分數據。

file
file

就索引來說,比如Iceberg,它已經默認提供文件級別的MinMax索引,在Meta文件里,它會記錄每列的Min和Max值。

舉個例子,我們有四個文件,在Meta文件里就會記錄相應的Max和Min值,那麼對於下圖的查詢案例,我們可以通過age=17對文件進行過濾,按照age對數據文件之間的數據進行排序,那麼在這個查詢中,讀取時的過濾效果會非常好,可以不用讀取其中的三個文件,因為通過Iceberg的Meta文件,可以判斷出來我們只需要讀取文件2,提高了查詢效率。

file
file

但是,我們做好排序之後,比如說我們另外有一個查詢,需要根據obd這個欄位去做過濾,這時候就會產生問題。通常,用戶的查詢里過濾的欄位不止一個,我們在做排序設定的時候,當全局依次用a,b,c三個欄位做排序,對a的數據聚集性是最好的,越往後的欄位聚集性越不好。如果欄位a的基數比較高,那麼對於欄位b,甚至後面的欄位c等,可能就完全沒有過濾效果。在用這些欄位進行具體查詢時,過濾就基本無效。

這是MinMax索引排序經常會遇到的一個問題,一種解決方案是使用projection的方式——按照另外欄位再做一次排序來保存數據。而在嗶哩嗶哩,採用的是引入Z-Order排序的方式。

Z-Order具體是什麼呢?舉例來說,我們有a,b,c三個欄位,用這三個欄位進行排序的時候,我們希望同時可以保證它們的聚集性,而不是像global order那樣優先保證a的,其次是b和c的。我們希望在最終的排序結果里三個欄位都能有一定的聚集性。

具體怎麼做呢?實際上,Z-Order是把天然沒有有序性的多維數據以某種方式映射成一維數據進行比較。映射後的一維數據,能夠保證各個原始維度按照同種程度去保證其聚集性。舉一個簡單的例子,如下圖所示,對X,Y這兩個維度進行比特位的交叉組值,形成了Interleave Index進而得出一個新的值,這個值被稱作Z-Value。從圖中,可以看到針對X,Y這兩個欄位的數據,生成的z-value會呈現出一個Z形嵌套。這樣的一個結構,在進行切分時,能夠同時保證X,Y兩個欄位的聚集性。

file
file

這種聚集性,又會帶來一個新的問題:我們支援的數據類型不止Int類型,如何將
Int/Long/String/Date/Timestamp等各種類型數據進行正整型轉化,進行Interleave Index計算以及計算出相應Z-Value?

因此,實現Z-Order 的一個前提是需要保證數據以保序的方式映射成一個正整型。對於Int類型的數據,可以做首位比特逆轉來實現;對於其它類型的數據,實現的方式都不太一樣,比如對String類型,會取固定的前幾位來進行排序。

但是保序映射也存在問題:

第一,從原始值到映射值的過程中可能會丟失數值資訊。比如String類型的數據,如果只取前幾位的話,後面的資訊就丟失掉了。

第二,映射值的分布無法保證從0開始是正整形,導致z-value不符合Z-Order曲線的嵌套分布。比如,X的取值是0,1,2,3,4,5,6,7,Y的取值是8,16,24,32這種,計算出來的z-value排序效果實際上和數據按照order by y,x的效果是一樣的。也就是說這種排序並沒有帶來額外的好處,對於X的聚集性無法保證。

file

因此,我們引入了Boundary-based interleave Index這種計算方式。它主要對Spark RangePartitioner進行了一些改造,實現了一個新的排序方法。以下圖為例,我們需要對city和age兩個欄位進行Z-Order排序,我們對這兩個欄位進行數據取樣,取樣之後,對每個欄位進行排序後再繼續取樣。把boundary取樣出來之後,對於進來的數據,Spark的shuffle partition會把這個值和boundary進行比較,取boundary的index值去進行計算出它的z-value值。因為我們是按照boundary的index值進行計算,所以z-value肯定是從零開始的正整型。

file

下圖是Z-Order的一個效果呈現,具體來說,我們對所示的三個欄位進行Z-Order排序,然後我們發現它可以做到百分之八十多的data skipping,即百分之八十的數據在在查詢時不用被讀取。

file

另外,我們還支援了基於Hibert曲線的排序,Z-Order排序存在一個缺陷就是它會存在跨度較大的連接線,這樣會導致在文件切割時,如果大跨度連接線被包含在某個文件里時,會導致這個文件的Min,Max跨度很大,data skippingde效果就大打折扣。Hibert曲線就不存在連接線跨度非常大的問題,效果也就比Z-Order更好,如下圖所示。

file

我們支援了Z-Order排序組織之後,再加上MinMax索引,它們在Star Schema Benchmark(SSB)的效果如下圖所示。可以發現文件讀取數量和查詢速度都有非常大的提升。但是Z-Order排序欄位越多,排序效果也會越差,因此我們建議2-4個。如果不進行數據的組織排序,MinMax索引的過濾效果就會非常有限。

file

04 索引的增強

針對Iceberg,我們引入了基於文件級別的BloomFilter索引的支援,每個表的每個欄位可以創建BloomFilter過濾器。針對SSB,我們增加了兩個額外測試,一個是等值的數據查詢,另一個則是範圍過濾的數據查詢。如圖所示,加了BloomFilter後的等值數據查詢,讀取的文件數量大大減少,查詢速度也有很大的提升。但是對於範圍查詢,BloomFilter這種索引並不支援根據範圍過濾條件過濾數據文件。

file

因此,我們引入了BitMap索引的支援。

如下圖的totalprice欄位,我們引入BitMap索引,過濾條件為totoalprice小於19時,可以把2和18的BitMap進行一次或運算的操作,進而判斷操作結果是否大於零。大於零就說明這個文件里包含所需要的查詢數據,需要讀取這個數據。

BitMap還有一個額外的好處在於:當過濾條件有兩個的時候,我們需要查詢totalprice小於19並且city是「United ST005」的數據。對於這兩個查詢,單獨查詢需要的數據可能在某個文件里都存在,但是當我們這對這兩個條件做BitMap且運算後會發現某個文件里同時滿足這兩個條件的數據並不存在,因此我們可以不用讀取這個文件。也就是說,BitMap的交並運算可以更好地在複雜過濾條件的情況下過濾掉更多的數據文件。

file
file

但是BitMap有兩個比較重要的、功能實現上需要解決的問題。

其一,進行範圍過濾時,比如需要查詢price小於51的,需要把2到50的這些數據進行編碼和計算,大量的讀寫和計算會非常影響查詢效率。其二,針對每一個基數,都需要存儲對應的BitMap,存儲的代價比較大,尤其是高基數的欄位。

針對第一個問題,我們引入了Range Encoded的BitMap去解決問題。舉例來說,針對price欄位數值為18的值,我們存儲的不是它的BitMap,而是存儲了它與數值為2的BitMap的或運算結果。簡單說來,如果BitMap欄位里有一個值是1,那麼其後面的所有值都是1。通過這種編碼的BitMap,我們就可以保證對於範圍查詢而言,我們都可以優化成只需要最多兩個BitMap的值就能取出任意條件範圍內的值。

file

為了解決第二個問題,我們引入了Bit-Slice Encoded的BitMap索引。

在下圖的例子中,我們看到price的取值有2,18,20,33等具體值以及相應的BitMap,對這些具體值進行按位切分,圖中的「Comp 0」代表第零位。比如對於第零位為8的值,有18和188,那麼就對這兩個值進行BitMap的或運算並且將其結果存儲到Comp 0里值為8的對應Bit Map里。對於其它的位,也是以同樣的方式進行存儲。這樣之下,如果我們之前有256個基數,經過Bit-SLice Encoded之後,我們就只需要30個BitMap了,而不是之前的256個BitMap。

file

我們還可以把Bit-Slice Encoded的BitMap從十進位進化成二進位的位數表示,此處不做詳細介紹。

總的來說,我們可以把Bit-Slice Encoded和Range Encoded的BitMap進行結合,對於二進位的Bit-Slice Range Encoded的BitMap,可以把256個基數的BitMap轉化成只需要9個BitMap的結果。

file

基於以上兩點,BitMap很好的解決了沒有排序的數據組織中的高基數的範圍查詢的問題。它的SSB結果中顯示查詢效率有1-10倍的提升,讀取文件數量則有0-400倍的減少。也就是說,不僅是查詢性能的提升,也能使計算引擎的負載有很大程度的減少,硬體資源可以有更多的存儲。

file

以上這些工作,不僅是在Iceberg這個項目以及相關Spark項目上做了對應的擴展和改造去實現的,也有SQL層面上的介面擴展。比如支援新的API,包括在Iceberg里,對Spark 3進行的語法擴展,通過distributed by支援文件間的排序,排序方式有哈希, Range, Z-Order以及Hibert曲線。Locally ordered by則是支援文件內的排序。也就是說用戶可以自定義文件間和文件內的排序方式。後續的就是Magnus通過optimizer具體做相應數據優化任務。我們支援上文提到過的BitMap和BloomFilter文件級別的索引,通過Iceberg的Actions可以拉起相應Spark任務,去做對應的寫索引和刪除索引的操作。

file

通過對於數據的排序的組織,以及索引的支援,我們也總結了在多維分析的數據場景下的配置策略,如下圖所示。這些策略使得我們能夠支援任意多欄位,任意過濾類型,在絕大部分多維分析場景下,只訪問盡量少的文件,加速查詢。

file


今天的分享就到這裡,謝謝大家。
本文首發於微信公眾號「DataFunTalk」。