基於Delta Lake構建數據湖倉體系

直播回放地址//developer.aliyun.com/live/249789

導讀: 今天很高興能與大家分享如何通過 Delta Lake 構建湖倉架構。

全文將圍繞以下四個部分展開:

  • Delta Lake 的基本概念和發展歷程,以及 2.0 版本的關鍵特性
  • Delta Lake 的內核解析以及關鍵技術
  • 圍繞 Delta Lake 湖格式的生態建設
  • Delta Lake 在數倉領域的經典案例

Delta Lake 及 2.0 特性

關於數據湖,數倉以及數據湖倉的概念已經在很多文章及分享中介紹得比較多了,相信大家也都有所了解,在此就不過多重複了,讓我們直接來看由 Databricks 提出的數據湖倉 Lakehouse 的關鍵特性有哪些。

  • ACID 事務。一張表可以被多個工作流來讀寫,事務可以保證數據的正確性。
  • Schema Enforcement 和數據管理。Schema Enforcement 也可稱作 Schema Validation,在數據寫入時,檢驗數據的 schema 是否能被表所接受,從而來保證數據質量。同時,我們還會對錶做一些管理運維操作。
  • 支持 BI。湖倉中存儲的數據可以直接對接到 BI 系統進行數據分析。
  • 支持結構化、半結構化、非結構化數據。數據湖倉提供了統一的、中心化的存儲,能夠支持各類型的數據。
  • 開放性。使用開放、開源的存儲格式,如 Parquet 和 ORC 等作為底層的存儲格式。
  • 支持多類 API。除了 SQL 以外還可以支持如 dataframe 或者機器學習的 API,用以解決 SQL 無法實現的場景。
  • 批流一體。簡化流式和離線兩條數據 ETL 鏈路,同時降低存在的管理和運維成本。
  • 存算分離。每個公司和團隊都會關心成本問題。存儲和計算分離,按需伸縮,可以更好地實現成本管控。

如上,我們可以發現湖倉大部分的特性是由湖格式來承載和支持的,這就是當前 Delta Lake,Iceberg 和 Hudi 能夠興起的主要背景和原因。

我們接下來看一下 Delta Lake 的功能迭代和發展歷程

如圖上半部分是社區近年的發展,下半部分是 EMR 在 Delta Lake 上的一些進展。我們來介紹幾個關鍵點,首先在 2019 年 6 月份 Databricks 將 0.2 版本作為第一個 release 版本,2020 年的 0.6 和 0.7 版本分別是 Spark2 的最後一個版本和 Spark3 的第一個版本,並從 0.7 版本之後開始支持 DML SQL 的語法,今年(2022 年)的 1.2 和剛剛發佈的 2.0 版本放出了比較重大的一些新特性。

阿里雲 EMR 對 Delta Lake 的跟進是比較早的,我們從 2019 年就實現了一些比較關鍵的特性,包括常用 SQL 的覆蓋,Z-Order 和 data skipping 的能力,同時我們也逐步解決了 Metastore 同步的問題,實現了無縫與其他產品的訪問。Time Travel 也是我們在 Spark2 上就較早支持的功能,目前社區是在 Spark3.3 之後才開始支持 Time Travel 的特性。EMR 這邊也提供了自動的 Vacuum 和自動 Compaction 的能力。在數倉場景支持上,EMR 提出了 G-SCD 方案,通過 Delta Lake,藉助其 Time Travel 的能力在保留原表結構上,實現了 SCD (Slowly Changing Dimension) Type2 的場景,同時 EMR 也支持了 Delta Lake CDC,使得可以將 Delta 表作為 CDC 的 source 實現增量數倉。

我們再來介紹一下大家比較關注的 Delta Lake 2.0 的一些關鍵特性

  • Change Data Feed
  • Z-Order clustering
  • Idempotent Writes
  • Drop Column
  • Dynamic Partition Overwrite
  • Multi-Part Checkpoint

Change Data Feed 和 Z-Order 這些比較重要的特性我們會在之後再重點介紹。

這裡着重介紹 Drop Column,它可以結合 1.2 版本發佈的一個特性 Rename Column 一起來講,這類的 Schema 演化都是依賴於 Column-Mapping 的能力。讓我們先通過對比 Add Column 和 Change Column 來思考一下,數據寫入了 Delta 表之後,Delta 保存了 Schema 信息,同樣 Parquet 層面也會保留相同的 Schema 信息,兩者是完全一致的,都是以字段名來做標識和存儲的。在這樣的實現下,單純的 drop column 還是可以實現的,但是如果在 drop 之後緊跟着 add 一個同名的 column 會是如何呢?這就需要我們在 Delta Schema 和 Parquet Schema 之間做一層映射關係,將每一個字段都能映射到一個全局唯一的標識符,而 Parquet 則保存這些唯一的標識信息。這樣的實現下,當我們進行一個重命名列的操作,就可以轉化成了對 mapping 配置的修改。

Dynamic Partition Overwrite 只是一個社區一直沒支持的語法,大家都比較了解不過多解釋。Multi-Part Checkpoint 是用來提高元數據加載效率的特性,Checkpoint 具體是什麼我們接下來會進一步討論。

Delta Lake 內核剖析及關鍵技術

1. Delta Lake 文件布局

Delta Lake 的元數據由自身管理,不依賴於 Hive Metastore 這樣的外部元數據存儲。圖中的介紹文字分為綠字和橙字兩部分,下部橙色標識的是普通的數據或者目錄文件,它和普通表沒有什麼區別,也是以分區的結構管理。區別在於上面元數據的部分,這部分有三類文件,第一類是 json 文件,它記錄的是每次 commit 之後產生的信息,每次 commit 會生成一個新的 json 文件;第二類是 checkpoint.parquet,它是由前一次的 checkpoint 文件及其之後的 json 文件合併而來,它的作用是用於加速元數據解析;第三類是 _last_checkpoint 文件,它存儲的是上次 checkpoint 的版本號,來快速定位到需要讀取的 check point 文件,可以看到前兩類文件是元數據的核心。

接下來讓我們深入了解 Delta Lake 元數據的組成。

2. Delta Lake 元數據——元素

首先我們來介紹一下基本概念,一張表通常是由兩部分組成的,一部分是數據,一部分是元數據。元數據通常存儲在 Hive Metastore 中,數據存儲在文件系統上。Delta Lake 表也是一樣的。它與普通表的區別在於它的元數據是自己管理的,和數據一起存放在自己的文件系統的目錄下;另外表路徑下存放的數據文件並不是全部有效的,我們需要通過元數據來標出來哪些數據文件是有效的,哪些是無效的。Delta Lake 所有的元數據操作都被抽象成了相應的 Action 操作,也就是所有表的元數據都是由 Action 子類實現的,讓我們看目前都有哪些 Action:

  • Metadata:保存表的 Schema,Partition 列,及表配置等信息。
  • AddFile:commit 中新加入的有效的數據文件。
  • RemoveFile:commit 中刪除標記為無效的文件。
  • AddCDCFile:commit 中新增的 CDC 文件。
  • Protocol:Delta 讀寫協議,用來管理 Delta 不同版本的兼容問題。
  • CommitInfo:記錄 commit 操作的統計信息,做一些簡單的審計工作。
  • SetTransaction:存儲 Streaming Sink 信息。

3. DDL/DML 組織

在認識了 Action 的元素之後,我們就需要知道不同的操作會對應到哪些 Action 集合,我們以圖中的幾個例子來說明。首先我們可以看到表中的所有操作都會生成 CommitInfo,它更多起到的是審計作用,而沒有實際作用。

接下來讓我們看具體操作:

  • Create Table。由於只是定義了表,那麼僅使用 Metadata 來保存表的元數據信息即可。
  • CTAS(Create Table As Select) 。在創建表的同時會加載數據,所以同時會有 Metadata 和 AddFile 的 Action。
  • Alter Table。除了 Drop Partition 以外其他的 Alter Table 操作都只是修改元數據,所以這裡也只需要修改 Metadata。
  • Insert/Update/Delete/Merge。 在沒有涉及到 Schema Evolution 的 DML 情況下不會修改元數據,因此不會有 Metadata。我們以 Update 為例,Delta Lake 會先讀取 Update 語句中 where 條件可能涉及到的文件,加載這部分數據,然後使用 Update 語句中 set 部分修改掉需要修改的部分,然後連同原文件中不需要修改的部分一起重新寫到一個新的文件。這就意味着我們會把讀取的文件標記為老文件,也就是 RemoveFile,而新的寫入文件我們會使用 AddFile 來標識。

4. 元數據加載

接下來我們來看一下基於 Action 元素如何構建一個表的快照。

首先嘗試尋找 _last_checkpoint 文件,如果不存在就從 0 號 commit json 文件讀到最新的 json 元數據文件;如果存在,會獲取 last_checkpoint 記錄的版本號,找到這個版本號對應的 checkpoint 文件,及其之後版本的 commit json 文件,按照版本依次解析元數據文件。通過圖中的 6 條規則得到最新的 snapshot 的元數據。最終我們會得到一個最新的 Protocol,Metadata 與一組有效的 AddFile,只要有了這三個我們就知道了這個表的元數據和數據文件,從而組成當前的一個比較完整的快照。

5. Delta Lake 事務

ACID 事務是湖倉一個比較重要的特性,對於 Delta Lake 來說它的 ACID 事務性是以成功提交 json 文件到文件系統來標識此次 commit 執行成功來保證的,也就是對於多個並發寫入流能首先將 json 文件的某個版本提交到文件系統的就是提交成功的流。如果大家對存儲有所了解,就可以看出來 Delta Lake 事務的核心依賴於數據所在的文件系統是否具備原子性和持久性。這點讓我們稍作解釋:

  • 一個文件一旦被寫入,一定是一個完全可見或者完全不可見的,不會存在讀取正在寫入的不完整數據文件。
  • 同一時刻只有一個寫入端能夠完成對某個文件的創建或者重命名操作。
  • 一個文件一旦被寫入,後續的 List 操作是一定可見的。

對於並發控制協議,Delta Lake 採用的 OCC,關於該協議的具體原理就不過多展開了。

對於衝突檢測,Delta Lake 是支持多個流的同時寫入的,這也就造成了勢必會有衝突的可能。讓我們以一個例子來說明。假設用戶當前已經讀到了版本號為 10 的文件,並且想將更改後的數據向版本 11 提交,這時發現已經有其他用戶提交了版本 11,此時就需要去檢測版本 11 與自己提交版本信息是否用衝突。檢測衝突的方式在於判斷兩個提交之間是否操作了相同的文件集合,如果沒有就會讓用戶嘗試提交為版本 12,如果版本 12 在這個過程中也被提交,那麼對繼續檢測。如果有衝突的話,會直接報錯,判斷當前的寫操作失敗,而不是強行寫入造成臟數據的產生。

6. Z-Order

Z-Order 是大家目前比較關注的一個技術。它是一個存在較早的概念,即一種空間的索引曲線,連續且無交叉,能夠讓點在空間位置上更加聚集。它的核心能力是能夠實現多維到單維的映射關係。

接下來我們以一個例子說明。如圖所示有 X,Y 兩列,X,Y∈[0,7],圖中所展示的是 Z-Order 的排序方式,將數據分為了 16 個文件。對於傳統排序來說,如果我們先對 X 再對 Y 做線性排序的話,我們會發現與 X 更靠近的元素會分到一個文件。例如, X 為 0,Y∈[0,3],4 個豎著的元素將存儲在一個文件中,這樣我們也就同樣可以生成 16 個文件。這時如果我們想查詢 4<=Y<=5,這時就需要我們將全部掃描下半部分的 8 個文件(豎著的 4 個元素為一個文件,Y∈[4,7])。如果我們使用 Z-Order 來排序的話可以看到我們就只需要掃描四個文件。

讓我們再舉一個例子,如果我們要查詢 2<=X<=3 and 4<=Y<=5,如果按照 Z-Order 來排序的話就只要掃描一個文件,按照傳統線性排序的方式需要掃描 2 個文件(X∈[2,3],Y∈[4,7])。可以看到在我們使用 Z-Order 之後需要掃描的數據量減少了一半,也就是說在同等計算資源的情況下我們的查詢時間可以減少一半,使性能提升一倍。從以上例子我們可以看出線性排序更關注的是當前排序的字段的聚集效果而不是空間的聚集效果。

7. Z-Order+Dataskipping

Z-Order 只是幫我們做了一個文件布局,我們要結合 data skipping 才能發揮它真正的效果。這兩個在功能上是各司其職互不干擾的,它們沒有任何的在功能上的耦合,但是它們卻必須是需要相互輔助的。我們可以想像一下,如果沒有 Z-Order 這樣的擁有良好聚集效果的文件布局,單獨 data skipping 是不能實現較好的文件過濾效果的,同樣只有 Z-Order 沒有 data skipping,其單純的文件布局也起不到任何的讀取加速的作用。具體的使用過程是:在寫入時,完成對數據 Z-Order 排列,寫入文件系統,並以文件粒度提取文件對應字段的 min-max 值,寫入如圖所示的 AddFile 的元數據 stats 中。在查詢時,使用 min-max 值做過濾,選出符合查詢條件的需要加載的文件,之後對數據再做過濾,從而減少文件和數據的讀取。

這裡有一個需要關注的點是如果當查詢模式改變了,比如說原來是基於 a,b 兩個字段做 Z-Order,但是一段時間之後主要查詢的是 c 字段,或者文件經過了多次寫入,它的聚集效果都會產生退化,這時就需要我們定期重新執行 Z-Order 來保證聚集效果。

Delta Lake 生態建設

上面我們提到了 Delta Lake 的一些基本概念,大家也可以看到基於目前的大數據架構我們沒法通過一個單一的系統來構建整體的大數據生態,接下來我們就來了解一下 Delta Lake 目前的生態系統如何輔助我們搭建大數據體系。

首先我們來看開源的生態。對於大數據組件我們可以粗略地分為存儲、計算以及元數據管理。元數據管理的事實標準是 Hive Metastore,存儲主要有 HDFS 及各雲廠商的對象存儲,各種計算引擎都有相對應的存儲接口。對於查詢來講,由於各個引擎的框架語義或者 API 的不同會導致每個湖格式都需要和查詢/計算引擎一對一的對接支持。

我們結合幾個典型的引擎來介紹一下目前的開源生態。

Delta Lake 本身就是 Databricks 公司開源的,所以它們對 Spark 的支持從底層代碼實現到性能上的表現都比較好的,只不過對於開源版本來說某些 SQL 功能還沒有完全開放或支持。阿里雲 EMR 的 Delta Lake 版本目前已經覆蓋了常用 SQL。

對於 Hive,Presto 和 Trino 來講目前社區已經實現了查詢的功能,寫的能力目前還不支持。以上三種引擎接口的實現都是基於 Delta Standalone 項目來實現和拓展的,該項目內部抽象了一個 Standalone 的功能來對接非 Spark 的計算和查詢引擎的讀寫功能。

這裡提幾個目前社區還沒有很好支持的點:

  • 通過 Spark 建表之後,是不能直接使用 Hive 等引擎去查詢的,需要用戶在 Hive 側手動創建一個外表才能做查詢。其原因是 Hive 查詢 Delta 表需要通過 InputFormat 來實現,而 Spark 側創建的 Delta 表在將元數據同步到 Hive Metastore 時,沒有獲取到正確的相關信息(其他表類型如 Parquet 和 ORC 等是在 Spark 源碼內硬編碼到了 HiveSerde 類中),也就沒法實現正確的元數據同步。我理解這個主要原因是 Spark 沒有考慮到這些場景,實現比較好的拓展能力,同時 Delta Lake 社區也沒有想將同步元數據相關的邏輯嵌入到其代碼實現中。

  • 在 Hive 中創建 Delta 外表不能指定分區字段,即使本身 Delta 是一個分區表,對於 Hive 引擎來言也將其視為普通表。這裡提一點,這樣的設計並不會引起性能的差異,Delta Standalone 內部依然會根據查詢條件進行分區裁剪。

對於以上兩點,阿里雲 EMR 已經做了比較好的支持:使用 Spark 建表會自動同步元數據到 metastore,然後直接通過 Hive,Presto,Trino 去查詢,不需要任何額外的操作。同時我們支持將表的分區特性正確的顯示在 Hive Metastore 中,避免用戶使用時的困惑。

另外,在 Hive 等基於 standalone 模塊實現的查詢引擎上查詢 Delta 表會存在元數據加載效率問題。如 Hive 查詢,Delta 表的元數據加載是在 Hive CLI 本地去完成的。元數據比較大的情況下,會佔用大量的內存和時間。在 EMR 上我們實現了 emr manifest 元數據加速的能力,在每次寫入時將最新快照關聯到的 AddFile 信息提前寫入到文件系統中,查詢時跳過元數據加載來解決該場景下的元數據加速問題。

同時我們在 Presto/Trino 上支持了 TimeTravel 查詢和 dataskipping 優化。

最後對於 Flink 的寫入,Delta 在 0.4 版本開始社區發佈了 Flink sink 的功能,在 0.5 發佈了 Flink source 的功能。

接下來我們來介紹一下阿里雲生態對 Delta Lake 的支持。我們目前已經實現了 Dataworks,MaxCompute,Hologres 對 Delta 表的查詢;對接並支持使用阿里雲數據湖構建 DLF 作為元數據,助力實現更好的湖倉一體。同時我們也對接了 DLF 的湖表自動管理模塊,這點我們展開介紹一下。

在湖格式中我們引入了版本的概念和批流一體的功能,這會造成有一些歷史版本的數據在當前的快照下已經失效,或者在流式場景下產生一些小文件,再加上我們剛剛提到的 Z-Order 的效果會隨着時間退化,這些問題都需要我們對湖表進行一些管理的操作,如我們需要定期做歷史文件的清理,重新執行 Z-Order,以及做一些文件合併的操作等。DLF 這裡我們實現了自動化的湖表管理模塊,會實時感知表的版本更新,實時分析表的狀態(如有效文件佔比、平均文件大小等指標),結合策略中心預定義的策略來採取相應的操作,透明地幫助用戶實現表的管理。同時我們也拓展了對湖表生命周期的管理,對於一些老的分區如果我們使用頻率較低我們可以對其進行壓縮或者移到低成本的存儲中去。同時 DLF 的 data profiling 模塊也會實時統計表級別或者分區級別的各個維度的統計信息,更新到指標庫,用於進一步的查詢加速或者湖表管理等。

Delta Lake 經典數倉案例

最後我們來看一下 Delta Lake 經典數倉的案例。

Slowly Changing Dimension(SCD,緩慢變化維),SCD 是用來解決在數倉場景中隨着時間緩慢變化的維度數據的。根據對變化之後的新值的處理方式,定義了不同的 SCD 類型,這裡我們着重討論一下 Type2:通過新增一行記錄的方式來保存歷史值的這種類型。通常情況下我們在傳統數據庫內我們首先會在表中添加 Start 和 End 列來標識當前維度值的生效範圍,如果 End 值為空表示當前維度在最新版本是生效的,我們也可以再添加一列狀態列來表示當前維度值是否生效。更多的時候我們不會關注每一次變化,而只關心一個固定的業務周期或者一個時間段內最新的值是什麼。舉一個例子:我們將用戶和其所在地做成的一個維度表,假設用戶 A 從北京遷到杭州、武漢,在表中用戶 A 不同時間就會有不同的地址。我們想知道 2022 年 7 月 16 號用戶 A 的所在地,也就是它最終的所在地武漢,而不是關注用戶 A 早上從北京到了杭州,中午又從杭州到遷了武漢的過程。

SCD Type2 的傳統方案大致如下:通過實時流不斷獲取增量數據寫入到增量表中,當 T+1 的數據全部處理完後我們會和離線表的 T 分區做合併,從而生成離線表的 T+1 分區。在使用過程中我們只需要基於離線表,通過分區字段來指定到一個固定的粒度(如天)去查詢相關數據。這裡存在的缺點是離線表的 T 和 T+1 數據時高度重複大量冗餘的,這就造成了很明顯的存儲浪費,同時離線和事實兩條工作流也增加了管理和運營的成本。

那麼我們來看 Delta Lake 是如何解決以上問題的。剛才提到了我們更關心的是一個固定時間段內的最新值,所以我們將其命名為 G-SCD——基於固定粒度的緩慢變化維。Delta Lake 這樣的湖格式是具備多版本的概念的,那麼就可以利用 Time Travel 的能力查詢到歷史的某一個快照的數據,同時保障查詢性能和數據不重複存儲,EMR G-SCD 就利用上以上的特性來進行構建,讓我們來看具體的解決方案:

首先 MySQL 會將 binlog 同步到 Kafka,之後會由 Spark Streaming 來消費,最終我們會將數據提交給 Delta Lake。

整個流程看着和普通的流式寫入沒有什麼區別,但關鍵在於:

① 最終將數據和業務快照信息一起提交。

② Spark Streaming 會對 batch 數據按照業務快照進行切分,保證每次提交的僅包含一個業務快照內的數據,同時會將已經處理完的快照做 save point 來永久保留某版本。

在 G-SCD 的實現上有兩個核心的問題要解決:

① 業務快照與 Delta 版本之間的映射。如圖所示,通過每次 commit 關聯到一個具體的業務快照(Delta 版本 V7 和 V8 提交的都是業務快照 T 的數據),並且要求業務快照隨着 Delta 版本遞增(從 T-1,到 T,再到 T+1 業務快照)。這樣就可以將查詢某個業務的快照例如 7 月 15 號的數據,映射轉化成對某一個具體的版本的 Time Travel 去做查詢。

② Savepoint&Rollback。對於傳統方案來講,只要不主動刪除分區,分區是不會丟失的,而湖表則具備自動化清理歷史版本的能力。G-SCD 方案下我們並不是需要保留所有的版本,而是希望能夠指定某一個版本能夠保留而不被刪除,所以在這裡我們需要 save point 的功能。另外的一點是數據難免是有錯誤的,我們也就需要版本回溯的功能,能夠回溯到某一天的數據從而重新修補數據。這裡 rollback 的功能相當於社區 2.0 版本發佈的 restore 功能。

對流式數據處理比較熟悉的同學能會發現這裡存在數據漂移的問題,這個現象產生的原因就是前一個快照的數據到了下一個快照周期才到,那麼這個情況下我們該怎麼處理?G-SCD 會要求業務快照在 Delta 版本上是遞增的,這點已經提到。同時該方案也會要求上游 Kafka 的 Partition 是按照業務快照嚴格有序的,同時同一個 ID 的數據只能落到同一個 partition 內,這樣在處理某一主鍵的數據上就永遠不會出現錯序的情況。之後在 Streaming 的層面上我們會判斷每一個 batch 是否屬於同一個業務快照,如果是的話就直接提交,如果不是的話我們就僅僅提交業務快照周期小一點的數據,而將另一部分數據先做緩存。對於緩存機制,我們會先將首次出現的下一個快照數據暫存,先去處理由於合理數據漂移的前一個快照數據。在一定時間之後,當我們認為不會再有漂移數據的情況下我們才會將這部分數據提交。通過這樣的切分可以保證 Delta 側每一個 commit 只會對應到一個業務快照的數據。

接下來讓我們看 G-SCD 方案所具備的優點:

  • 批流一體,降低管理成本
  • 充分節省存儲資源
  • 充分利用 Spark/Delta 的查詢優化
  • 不需要像其他 SCD Type2 的實現方案那樣添加多個輔助字段;同時保留了傳統方案中使用 dt 作為分區方式,從而可以復用原有 SQL,使用戶遷移無成本。

該方案已經被阿里雲的客戶廣泛地應用到了生產實際當中。

Change Data Capture(CDC, 變化數據捕捉)。最後讓我們來講一下 CDC 場景,這裡涉及到 Delta 2.0 發佈的非常重要的 CDF 特性。CDC 是一個用來捕捉和識別數據的變化,並將變化的數據交給下游來做進一步處理的場景。CDF 是一種能讓表或者數據庫能夠具備吐出變化數據的能力。CDF 的輸出結果可以標識出數據做了什麼樣的變化,比如 insert,update 或者 delete,以及可以讓我們知道數據在變更前後的內容,CDF 同時也包含了版本數據變化的時間點和版本號信息。在 Delta Lake 中開啟 CDF 只需將 delta.enableChangeDateFeed 設為 true。

在沒有 CDF 之前,一般我們只能通過 MySQL 取 binlog 的形式實現到 ODS 層的增量數據更新,但是到下游 DWD,DWS 層我們只能通過低效的全量的方式去做數據更新了。當我們具有可 CDF 的能力之後我們就能夠實現將湖格式作為 CDC 的一個源實現從 ODS 到 DWD,DWS 的全鏈路的增量實時數倉。接下來讓我們來看一個具體的案例。

如圖我們定義了一個數據源和三張表。user_dim 是一張維表,user_city_tbl 表示用戶所在地,city_population_tbl 用來統計城市常駐人口。user_city_tbl 表的更新需要 source 源和 user_dim 表做 join 後寫入。city_population_tbl 表是通過對 city 字段做聚合產生的。現在讓我們將兩張表都開啟 CDF,看一下會有什麼數據產生。比如當前在上游來了兩條數據,user1 來自杭州,user5 來自武漢,通過 Merge 語句將數據加載到 user_city_tbl 中,如圖,user1 已經存在所以會更新地址信息,user5 為新用戶所以插入數據。對於更新操作會有兩條數據來表示,一條是 pre_update,表示更新前的舊值,一條是 post_update,表示更新後的新值。對於新插入的數據我們只要一條數據來表示插入操作,沒有舊值。對於刪除操作,CDC 當前的值表示它的一個舊值,沒有新值。

可以看到這裡輸出的格式採用了一種不同於大家比較熟悉的 MySQL binlog 或者 debezium 的格式,相比較而言,CDF 的實現方案對下游去做數據處理是更加友好的,它同時也包含了我們所需要的所有的信息,不需要做過多轉換。如果我們使用 binlog 或者 Debezium 的話還需要從 json 字符串中提取出我們需要的列的信息。

使用 user_city_tbl 的 change data 對下游 city_population_tbl 做增量更新,最終實現對 city_population_tbl 表中 bj 城市的人口數減一,對 hz 和 wh 的城市人口數加一。從這裡我們也看以看出 CDC 的輸出數據是需要包含 update 或 delete 數據的舊記錄的詳細信息的,不然就無法增量更新 bj 城市的人口數,準確的實現數據聚合的操作。

流程繼續,如果 city_population_tbl 表也需要用做 CDC source,開啟 CDF 之後的 CDC 輸出信息如右下圖所示。

最後再讓我們看通過 Delta Lake 實現 CDC 的設計和實現。

Delta Lake 通過 CDF 方案來實現 CDC,其理念是在必要場景下持久化 CDC 數據,儘可能地復用已有的數據文件,來充分平衡讀寫兩端。

不同於一些傳統數據庫它們有自己的常駐服務,可以在不影響寫入效率的情況下直接後台生成相關的數據,Delta Lake 僅僅作為數據存儲層的數據組織方式,數據讀寫的執行還是依賴於計算引擎本身,比如 Flink 或 Spark,其所有額外的開銷也需要在當前 commit 完成,從而會影響寫入效率。

之所以不採用完全依賴於查詢時通過類似版本間 Diff 或者 Join 的方式來實時計算出 Change Data,當然考慮的是查詢性能。在這裡通過一個場景來明確一下 CDC 的一個可能被忽略的點,即 CDC 需要感知到每一次相鄰 commit 間的變化,而不能僅僅是查詢方位內首尾兩個 commit 的變化。湖格式 CDC 是基於單 commit 來講的,也就是說如果有一條數據,第一次 commit 從 1 變到了 2,第二次 commit 從 2 變到了 3,那麼這兩次 commit 的 CDC 數據應該是從 1 到 2 再到 3,而不是直接由 1 到 3,部分 CDC 的實際生產場景要求這樣的能力。

在設計方案上 Delta Lake 提供了僅在無法簡單的通過當前 commit 信息獲取完整數據變更時才會持久化 CDC 的能力,這裡完整的 CDC 包含前值和新值,包含所有的操作以及時間戳和版本號的信息。這意味着可以直接讀取和加載 CDC 數據而不需要通過讀歷史的快照數據來計算得到。

在了解了以上 CDF 設計特點之後我們會發現,有一部分場景需要持久化 CDC,另一部分場景不需要持久化 CDC。我們先來聊一下不需要持久化 CDC 的場景,也就是哪些操作可以通過當前的 commit 信息直接返回 CDC 數據。這裡舉兩個例子,第一個是 Insert into,Insert into 語法新增加的 AddFile 不會對其他的數據有任何影響,其相關的元數據 commit json 文件中的元數據只有 AddFile,所以我們可以直接加載這些 AddFile 文件的數據,對每一條記錄加上 insert 的操作標識,同時添加 timestamp 和 version 信息,轉換成 CDC 的格式返回即可。第二個例子是 Drop Partition,這個功能在社區是沒有支持的,在阿里雲 EMR 上支持。它會將某一分區下的所有有效數據都標識為 RemoveFile,當我們讀取 commit json 文件時我們得到只有 RemoveFile 的文件列表,那麼我們就可以加載 RemoveFile 標識的數據文件,對於每一條數據添加 delete 的操作標識,並且加上 timestamp 和 version 信息。對於類似這樣的操作,CDF 的實現方案沒有增加任何的寫入開銷,直接復用已有的數據完成,加載轉換得到 CDC 數據返回。

那我們再來看一下哪些是需要持久化 CDC 的。如 Update 操作,需要將某個數據文件中的部分數據更新後,連同未更新的部分一起寫入到一個新的數據文件。這樣的場景下,就需要將更新的部分數據直接轉化成要輸出的 CDC 格式數據,然後保存到文件系統。在查詢時,對於這樣的 commit,直接去讀取它包含的 CDC 文件,加載返回 CDC 數據。持久化 CDC 數據的文件,就是通過剛才未詳細解釋的 AddCDCFile 這個 Action 來記錄的。

如上圖,CDF 方案下持久化的 CDC 寫入到單獨的 _change_data 目錄下(圖中紅色部分)。