使用Apache Flink 和 Apache Hudi 創建低延遲數據湖管道

近年來出現了從單體架構向微服務架構的轉變。微服務架構使應用程序更容易擴展和更快地開發,支持創新並加快新功能上線時間。但是這種方法會導致數據存在於不同的孤島中,這使得執行分析變得困難。為了獲得更深入和更豐富的見解,企業應該將來自不同孤島的所有數據集中到一個地方。
AWS 提供複製工具,例如 AWS Database Migration Service (AWS DMS),用於將數據更改從各種源數據庫複製到各種目標,包括 Amazon Simple Storage Service (Amazon S3)。但是需要將數據湖中的數據與源系統上的更新和刪除同步的客戶仍然面臨一些挑戰:

  • 當記錄存儲在 Amazon S3 上的開放數據格式文件(例如 JSON、ORC 或 Parquet)中時,很難應用記錄級更新或刪除。
  • 在流式使用案例中,作業需要以低延遲寫入數據,JSON 和 Avro 等基於行的格式最適合。但是使用這些格式掃描許多小文件會降低讀取查詢性能。
  • 在源數據模式頻繁更改的用例中,通過自定義代碼維護目標數據集的模式既困難又容易出錯。

Apache Hudi 提供了解決這些挑戰的好方法。 Hudi 在第一次寫入記錄時會建立索引。 Hudi 使用這些索引來定位更新(或刪除)所屬的文件。這使 Hudi 能夠通過避免掃描整個數據集來執行快速更新插入(或刪除)操作。 Hudi 提供了兩種表類型,每種都針對特定場景進行了優化:

  • Copy-On-Write (COW) – 這些表在批處理中很常見。在這種類型中,數據以列格式(Parquet)存儲,每次更新(或刪除)都會在寫入過程中創建一個新版本的文件。
  • Merge-On-Read (MOR) – 使用列(例如 Parquet)和基於行(例如 Avro)文件格式的組合存儲數據,旨在讓數據更加實時。

存儲在 Amazon S3 中的 Hudi 數據集提供與其他 AWS 服務的原生集成。例如可以使用 AWS Glue(請參閱使用 AWS Glue 自定義連接器寫入 Apache Hudi 表)或 Amazon EMR(請參閱 Amazon EMR 中提供的 Apache Hudi 的新功能)寫入 Apache Hudi 表。但這些方法需要對 Hudi 的 Spark API 和編程技能有深入的掌握,才能構建和維護數據管道。
這篇文章中將展示一種以最少編碼處理流數據的不同方式。本文中的步驟演示了如何在沒有 Flink 或 Hudi 知識的情況下使用 SQL 語言構建完全可擴展的管道。可以通過編寫熟悉的 SELECT 查詢來查詢和探索多個數據流中的數據,還可以連接來自多個流的數據並將結果物化到 Amazon S3 上的 Hudi 數據集。

解決方案概述

下圖提供了本文中描述的解決方案的整體架構。接下來的部分將會詳細描述描述組件和步驟。

image.png

使用 Amazon Aurora MySQL 數據庫作為源,使用帶有 MSK Connect Lab 中描述的 Debezium MySQL 連接器作為變更數據捕獲 (CDC) 複製器。本實驗將引導完成設置堆棧的步驟,以使用帶有 MySql Debezium 源 Kafka 連接器的 Amazon MSK Connect 將 Aurora 數據庫 salesdb 複製到 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群。
2021 年 9 月,AWS 宣布 MSK Connect 用於運行完全託管的 Kafka Connect 集群。只需單擊幾下,MSK Connect 即可輕鬆部署、監控和擴展連接器,將數據從數據庫、文件系統和搜索索引等外部系統移入和移出 Apache Kafka 和 MSK 集群。用戶現在可以使用 MSK Connect 構建從許多數據庫源到 MSK 集群的完整 CDC 管道。
Amazon MSK 是一項完全託管的服務,可以輕鬆構建和運行使用 Apache Kafka 處理流數據的應用程序。使用 Apache Kafka 可以從數據庫更改事件或網站點擊流等來源捕獲實時數據。然後構建管道(使用流處理框架,如 Apache Flink)將它們交付到目標,如持久存儲或 Amazon S3。
Apache Flink 是一個流行的框架,用於構建有狀態的流和批處理管道。 Flink 帶有不同級別的抽象,以涵蓋廣泛的用例。
Flink 還根據選擇的資源提供者(Hadoop YARN、Kubernetes 或獨立)提供不同的部署模式。
這篇文章將使用 SQL 客戶端工具作為一種交互式方式以 SQL 語法創建 Flink 作業。 sql-client.sh將作業編譯並提交到 Amazon EMR 上長時間運行的 Flink 集群(session 模式)。根據腳本,sql-client.sh要麼實時顯示作業的表格格式輸出,要麼返回長時間運行的作業的作業 ID。
可以通過以下步驟實施解決方案:

  • 創建 EMR 集群
  • 使用 Kafka 和 Hudi 表連接器配置 Flink
  • 開發實時提取、轉換和加載 (ETL) 作業
  • 將管道部署到生產環境

先決條件

本文假設環境中有一個正在運行的 MSK Connect ,其中包含以下組件:

  • Aurora MySQL 託管數據庫。這篇文章中將使用示例數據庫 salesdb
  • 在 MSK Connect 上運行的 Debezium MySQL 連接器,在 Amazon Virtual Private Cloud (Amazon VPC) 中以 Amazon MSK 結尾。
  • 在 VPC 中運行的 MSK 集群

如果沒有 MSK Connect ,請按照 MSK Connect 實驗室設置中的說明進行操作,並驗證源連接器是否將數據更改複製到 MSK 主題。
還需要能夠直接連接到 EMR Leader節點。Session Manager 是 AWS Systems Manager 的一項功能,可提供基於瀏覽器的交互式一鍵式 shell 窗口。會話管理器還允許對受管節點進行受控訪問的公司策略。
如果不使用 Session Manager ,也可以使用 Amazon Elastic Compute Cloud (Amazon EC2) 私有密鑰對,但需要在公有子網中啟動集群並提供入站 SSH 訪問。

創建 EMR 集群

在撰寫本文時最新發佈的 Apache Hudi 版本是 0.10.0。 Hudi 發佈版本 0.10.0 兼容 Flink 發佈版本 1.13。 因此需要 Amazon EMR 發佈版本 emr-6.4.0 及更高版本,它與 Flink 發佈版本 1.13 一起提供。 要使用 AWS 命令行界面 (AWS CLI) 啟動安裝了 Flink 的集群,請完成以下步驟:

  1. 創建一個文件,configurations.json,包含以下內容:
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.numberOfTaskSlots":"4"
    }
  }
]
  1. 在私有子網(推薦)或託管 MSK 集群的同一 VPC 的公有子網中創建 EMR 集群。 使用 --name 選項輸入集群的名稱,並使用 --ec2-attributes 選項指定 EC2 密鑰對的名稱以及子網 ID。 請參閱以下代碼:
aws emr create-cluster --release-label emr-6.4.0 \
--applications Name=Flink \
--name FlinkHudiCluster \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://yourLogUri \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole, SubnetId=A SubnetID of Amazon MSK VPC 
  1. 等到集群狀態變更為 Running。
  2. 使用 Amazon EMR 控制台或 AWS CLI 檢索Leader節點的 DNS 名稱。
  3. 通過 Session Manager 或在 Linux、Unix 和 Mac OS X 上使用 SSH 和 EC2 私鑰連接到Leader節點。
  4. 使用 SSH 連接時,領導節點的安全組必須允許端口 22。
  5. 確保 MSK 集群的安全組具有接收來自 EMR 集群安全組的流量的入站規則。

使用 Kafka 和 Hudi 表連接器配置 Flink

Flink 表連接器允許使用 Table API 編程流操作時連接到外部系統。源連接器提供對流服務的訪問,包括作為數據源的 Kinesis 或 Apache Kafka。Sink 連接器允許 Flink 將流處理結果發送到外部系統或 Amazon S3 等存儲服務。
在 Amazon EMR Leader節點上下載以下連接器並將它們保存在 /lib/flink/lib 目錄中:

  • 源連接器——從 Apache 倉庫下載 flink-connector-kafka_2.11-1.13.1.jar。 Apache Kafka SQL 連接器允許 Flink 從 Kafka 主題中讀取數據。
  • 接收器連接器 – Amazon EMR 發佈版本 emr-6.4.0 隨附 Hudi 發佈版本 0.8.0。但是在這篇文章中需要 Hudi Flink 捆綁連接器發佈版本 0.10.0,它與 Flink 發佈版本 1.13 兼容。從 Apache 倉庫下載 hudi-flink-bundle_2.11-0.10.0.jar。它還包含多個文件系統客戶端,包括用於與 Amazon S3 集成的 S3A。

開發實時 ETL 作業

這篇文章使用 Debezium 源 Kafka 連接器將示例數據庫 salesdb的數據更改流式傳輸到 MSK 集群。連接器以 JSON 格式生成數據更改。 Flink Kafka 連接器可以通過在表選項中使用 debezium-json設置 value.format來反序列化 JSON 格式的事件。除了插入之外,此配置還完全支持數據更新和刪除。
使用 Flink SQL API 構建一個新作業。這些 API 允許使用流數據,類似於關係數據庫中的表。此方法中指定的 SQL 查詢在源流中的數據事件上連續運行。因為 Flink 應用程序從流中消費無限數據,所以輸出不斷變化。為了將輸出發送到另一個系統,Flink 向下游 sink 操作員發出更新或刪除事件。因此當使用 CDC 數據或編寫需要更新或刪除輸出行的 SQL 查詢時,必須提供支持這些操作的接收器連接器。否則Flink 作業將出現如下錯誤信息

Target Table doesn't support consuming update or delete changes which is produced by {your query statement} …

使用之前在配置文件中指定的配置在 EMR 集群上啟動 Flink YARN 應用程序:

cd /lib/flink && ./bin/yarn-session.sh --detached

命令成功運行後就可以創建第一個作業了。運行以下命令以啟動 sql-client:

./bin/sql-client.sh

終端窗口類似於以下屏幕截圖。

image.png

設置作業參數

運行以下命令來設置此會話的檢查點間隔:

SET execution.checkpointing.interval = 1min;

定義源表

從概念上講使用 SQL 查詢處理流需要將事件解釋為表中的邏輯記錄。 因此使用 SQL API 讀取或寫入數據之前的第一步是創建源表和目標表。 表定義包括連接設置和配置以及定義流中對象的結構和序列化格式的模式。
這篇文章中將創建三個源表,每個對應於 Amazon MSK 中的一個主題。還可以創建一個目標表,將輸出數據記錄寫入存儲在 Amazon S3 上的 Hudi 數據集。
"properties.bootstrap.servers"選項中將 BOOTSTRAP SERVERS ADDRESSES替換為自己的 Amazon MSK 集群信息,並在 sql-client終端中運行以下命令:

CREATE TABLE CustomerKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `NAME` STRING,
      `MKTSEGMENT` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER', -- created by debezium connector, corresponds to CUSTOMER table in Amazon Aurora database. 
      'properties.bootstrap.servers' = '<PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup1',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE CustomerSiteKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `SITE_ID` BIGINT,
      `STATE` STRING,
      `CITY` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER_SITE',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup2',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE SalesOrderAllKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `ORDER_ID` BIGINT,
      `SITE_ID` BIGINT,
      `ORDER_DATE` BIGINT,
      `SHIP_MODE` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.SALES_ORDER_ALL',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup3',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

默認情況下 sql-client將這些表存儲在內存中,它們僅在活動會話期間存在,每當 sql-client 會話到期或退出時都需要重新創建表。

定義目標Sink表

以下命令創建目標表。 指定 'hudi'作為此表中的連接器。 其餘的 Hudi 配置在 CREATE TABLE語句的 with(...)部分中設置。將 S3URI OF HUDI DATASET LOCATION替換為在 Amazon S3 中的 Hudi 數據集位置並運行以下代碼:

CREATE TABLE CustomerHudi (
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'write.tasks' = '4',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
      'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

對於 select查詢,sql-client將作業提交到 Flink 集群,然後將結果實時顯示在屏幕上。 運行以下選擇查詢以查看 Amazon MSK 數據:

SELECT Count(O.order_id) AS order_count,
       C.cust_id,
       C.NAME,
       C.mktsegment
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment; 

此查詢連接三個流並聚合按每個客戶記錄分組的客戶訂單計數,幾秒鐘後會在終端中看到結果。 請注意終端輸出如何隨着 Flink 作業從源流中消耗更多事件而發生變化。

將結果寫入 Hudi 數據集

要擁有完整的管道,需要將結果寫到 Amazon S3 上的 Hudi 數據集。 為此請在查詢前面添加一個插入 CustomerHudi 語句:

INSERT INTO customerhudi
SELECT Count(O.order_id),
       C.cust_id,
       C.NAME,
       C.mktsegment,
       Proctime()
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment;

這一次 sql-client提交作業後與集群斷開連接,客戶端不必等待作業的結果,因為它會將結果寫入 Hudi 數據集。即使停止了 sql-client會話,該作業也會繼續在 Flink 集群上運行。
等待幾分鐘,直到作業將 Hudi 提交日誌文件生成到 Amazon S3。然後導航到為 CustomerHudi 表指定的 Amazon S3 中的位置,其中包含按 MKTSEGMENT 列分區的 Hudi 數據集。在每個分區中,您還可以找到 Hudi 提交日誌文件。這是因為表類型定義為 MERGE_ON_READ。在此模式下使用默認配置,Hudi 會在出現五個 delta 提交日誌後將提交日誌合併到更大的 Parquet 文件中。可以通過將表類型更改為 COPY_ON_WRITE 或指定自定義壓縮配置來更改此設置。

查詢 Hudi 數據集

可以使用 Hudi Flink 連接器作為源連接器來讀取存儲在 Amazon S3 上的 Hudi 數據集。為此可以針對 CustomerHudi 表運行 select 語句,或者使用為連接器指定的 hudi 創建一個新表。該路徑必須指向 Amazon S3 上現有 Hudi 數據集的位置。將 S3URI OF HUDI DATASET LOCATION 替換並運行以下命令以創建新表:

CREATE TABLE `CustomerHudiReadonly` (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'hoodie.datasource.query.type' = 'snapshot',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
     'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

請注意以 _hoodie_為前綴的附加列名,這些列是 Hudi 在寫入過程中添加的,用於維護每條記錄的元數據。另請注意在表定義的 WITH 部分中傳遞的額外"hoodie.datasource.query.type"讀取配置,這可確保從 Hudi 數據集的實時視圖中讀取數據。運行以下命令:

select * from CustomerHudiReadonly where customer_id <= 5;

終端會在 30 秒內顯示結果。導航到 Flink Web 界面可以在其中觀察由 select 查詢啟動的新 Flink 作業(有關如何找到 Flink Web 界面,請參見下文)。它掃描 Hudi 數據集中已提交的文件,並將結果返回給 Flink SQL 客戶端。
使用 mysql CLI或其他 IDE 連接到託管在 Aurora MySQL 上的 salesdb 數據庫。針對 SALES_ORDER_ALL表運行一些插入語句:

insert into SALES_ORDER_ALL values (29001, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29002, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29003, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29004, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29005, 2, now(), 'STANDARD');

幾秒鐘後一個新的提交日誌文件會出現在 Amazon S3 上的 Hudi 數據集中。 Debezium for MySQL Kafka 連接器捕獲更改並為 MSK 主題生成事件。 Flink 應用程序使用來自主題的新事件並相應地更新 customer_count 列。然後它將更改的記錄發送到 Hudi 連接器以與 Hudi 數據集合併。
Hudi 支持不同的寫操作類型。默認操作是 upsert,它最初在數據集中插入記錄。當具有現有鍵的記錄到達流程時,它被視為更新。此操作在希望將數據集與源數據庫同步且不希望出現重複記錄的情況下很有用。

Flink Web 界面可幫助您查看 Flink 作業的配置、圖表、狀態、異常錯誤、資源利用率等。要訪問它首先需要在瀏覽器中設置 SSH 隧道並激活代理,以連接到 YARN 資源管理器。連接到資源管理器後,選擇託管 Flink 會話的 YARN 應用程序。選擇 Tracking UI 列下的鏈接以導航到 Flink Web 界面。

image.png

將管道部署到生產環境

對於實驗、開發或測試數據管道來說,使用 Flink sql-client以交互方式快速構建數據管道,這是一個不錯的選擇。但是對於生產環境,建議將 SQL 腳本嵌入 Flink Java 應用程序並在 Amazon Kinesis Data Analytics 上運行。 Kinesis Data Analytics 是用於運行 Flink 應用程序的完全託管服務;它具有內置的自動擴展和容錯功能,可為生產應用程序提供所需的可用性和可擴展性。GitHub 上提供了一個 Flink Hudi 應用程序,其中包含這篇文章中的腳本,用戶可以訪問此存儲庫,並比較在 sql-clientKinesis Data Analytics中運行之間的差異。

清理

為避免產生持續費用,請完成以下清理步驟:

  • 停止 EMR 集群
  • 刪除 MSK Connect Lab 設置創建的 AWS CloudFormation

結論

構建數據湖是打破數據孤島和運行分析以從所有數據中獲取洞察力的第一步。在數據湖上的事務數據庫和數據文件之間同步數據並非易事,而且需要大量工作。在 Hudi 添加對 Flink SQL API 的支持之前,Hudi 客戶必須具備編寫 Apache Spark 代碼並在 AWS Glue 或 Amazon EMR 上運行它的必要技能。在這篇文章中展示了一種新方法,可以使用 SQL 查詢以交互方式探索流服務中的數據,並加快數據管道的開發過程。