使用Apache Flink 和 Apache Hudi 创建低延迟数据湖管道

近年来出现了从单体架构向微服务架构的转变。微服务架构使应用程序更容易扩展和更快地开发,支持创新并加快新功能上线时间。但是这种方法会导致数据存在于不同的孤岛中,这使得执行分析变得困难。为了获得更深入和更丰富的见解,企业应该将来自不同孤岛的所有数据集中到一个地方。
AWS 提供复制工具,例如 AWS Database Migration Service (AWS DMS),用于将数据更改从各种源数据库复制到各种目标,包括 Amazon Simple Storage Service (Amazon S3)。但是需要将数据湖中的数据与源系统上的更新和删除同步的客户仍然面临一些挑战:

  • 当记录存储在 Amazon S3 上的开放数据格式文件(例如 JSON、ORC 或 Parquet)中时,很难应用记录级更新或删除。
  • 在流式使用案例中,作业需要以低延迟写入数据,JSON 和 Avro 等基于行的格式最适合。但是使用这些格式扫描许多小文件会降低读取查询性能。
  • 在源数据模式频繁更改的用例中,通过自定义代码维护目标数据集的模式既困难又容易出错。

Apache Hudi 提供了解决这些挑战的好方法。 Hudi 在第一次写入记录时会建立索引。 Hudi 使用这些索引来定位更新(或删除)所属的文件。这使 Hudi 能够通过避免扫描整个数据集来执行快速更新插入(或删除)操作。 Hudi 提供了两种表类型,每种都针对特定场景进行了优化:

  • Copy-On-Write (COW) – 这些表在批处理中很常见。在这种类型中,数据以列格式(Parquet)存储,每次更新(或删除)都会在写入过程中创建一个新版本的文件。
  • Merge-On-Read (MOR) – 使用列(例如 Parquet)和基于行(例如 Avro)文件格式的组合存储数据,旨在让数据更加实时。

存储在 Amazon S3 中的 Hudi 数据集提供与其他 AWS 服务的原生集成。例如可以使用 AWS Glue(请参阅使用 AWS Glue 自定义连接器写入 Apache Hudi 表)或 Amazon EMR(请参阅 Amazon EMR 中提供的 Apache Hudi 的新功能)写入 Apache Hudi 表。但这些方法需要对 Hudi 的 Spark API 和编程技能有深入的掌握,才能构建和维护数据管道。
这篇文章中将展示一种以最少编码处理流数据的不同方式。本文中的步骤演示了如何在没有 Flink 或 Hudi 知识的情况下使用 SQL 语言构建完全可扩展的管道。可以通过编写熟悉的 SELECT 查询来查询和探索多个数据流中的数据,还可以连接来自多个流的数据并将结果物化到 Amazon S3 上的 Hudi 数据集。

解决方案概述

下图提供了本文中描述的解决方案的整体架构。接下来的部分将会详细描述描述组件和步骤。

image.png

使用 Amazon Aurora MySQL 数据库作为源,使用带有 MSK Connect Lab 中描述的 Debezium MySQL 连接器作为变更数据捕获 (CDC) 复制器。本实验将引导完成设置堆栈的步骤,以使用带有 MySql Debezium 源 Kafka 连接器的 Amazon MSK Connect 将 Aurora 数据库 salesdb 复制到 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群。
2021 年 9 月,AWS 宣布 MSK Connect 用于运行完全托管的 Kafka Connect 集群。只需单击几下,MSK Connect 即可轻松部署、监控和扩展连接器,将数据从数据库、文件系统和搜索索引等外部系统移入和移出 Apache Kafka 和 MSK 集群。用户现在可以使用 MSK Connect 构建从许多数据库源到 MSK 集群的完整 CDC 管道。
Amazon MSK 是一项完全托管的服务,可以轻松构建和运行使用 Apache Kafka 处理流数据的应用程序。使用 Apache Kafka 可以从数据库更改事件或网站点击流等来源捕获实时数据。然后构建管道(使用流处理框架,如 Apache Flink)将它们交付到目标,如持久存储或 Amazon S3。
Apache Flink 是一个流行的框架,用于构建有状态的流和批处理管道。 Flink 带有不同级别的抽象,以涵盖广泛的用例。
Flink 还根据选择的资源提供者(Hadoop YARN、Kubernetes 或独立)提供不同的部署模式。
这篇文章将使用 SQL 客户端工具作为一种交互式方式以 SQL 语法创建 Flink 作业。 sql-client.sh将作业编译并提交到 Amazon EMR 上长时间运行的 Flink 集群(session 模式)。根据脚本,sql-client.sh要么实时显示作业的表格格式输出,要么返回长时间运行的作业的作业 ID。
可以通过以下步骤实施解决方案:

  • 创建 EMR 集群
  • 使用 Kafka 和 Hudi 表连接器配置 Flink
  • 开发实时提取、转换和加载 (ETL) 作业
  • 将管道部署到生产环境

先决条件

本文假设环境中有一个正在运行的 MSK Connect ,其中包含以下组件:

  • Aurora MySQL 托管数据库。这篇文章中将使用示例数据库 salesdb
  • 在 MSK Connect 上运行的 Debezium MySQL 连接器,在 Amazon Virtual Private Cloud (Amazon VPC) 中以 Amazon MSK 结尾。
  • 在 VPC 中运行的 MSK 集群

如果没有 MSK Connect ,请按照 MSK Connect 实验室设置中的说明进行操作,并验证源连接器是否将数据更改复制到 MSK 主题。
还需要能够直接连接到 EMR Leader节点。Session Manager 是 AWS Systems Manager 的一项功能,可提供基于浏览器的交互式一键式 shell 窗口。会话管理器还允许对受管节点进行受控访问的公司策略。
如果不使用 Session Manager ,也可以使用 Amazon Elastic Compute Cloud (Amazon EC2) 私有密钥对,但需要在公有子网中启动集群并提供入站 SSH 访问。

创建 EMR 集群

在撰写本文时最新发布的 Apache Hudi 版本是 0.10.0。 Hudi 发布版本 0.10.0 兼容 Flink 发布版本 1.13。 因此需要 Amazon EMR 发布版本 emr-6.4.0 及更高版本,它与 Flink 发布版本 1.13 一起提供。 要使用 AWS 命令行界面 (AWS CLI) 启动安装了 Flink 的集群,请完成以下步骤:

  1. 创建一个文件,configurations.json,包含以下内容:
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.numberOfTaskSlots":"4"
    }
  }
]
  1. 在私有子网(推荐)或托管 MSK 集群的同一 VPC 的公有子网中创建 EMR 集群。 使用 --name 选项输入集群的名称,并使用 --ec2-attributes 选项指定 EC2 密钥对的名称以及子网 ID。 请参阅以下代码:
aws emr create-cluster --release-label emr-6.4.0 \
--applications Name=Flink \
--name FlinkHudiCluster \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://yourLogUri \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \ 
--ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole, SubnetId=A SubnetID of Amazon MSK VPC 
  1. 等到集群状态变更为 Running。
  2. 使用 Amazon EMR 控制台或 AWS CLI 检索Leader节点的 DNS 名称。
  3. 通过 Session Manager 或在 Linux、Unix 和 Mac OS X 上使用 SSH 和 EC2 私钥连接到Leader节点。
  4. 使用 SSH 连接时,领导节点的安全组必须允许端口 22。
  5. 确保 MSK 集群的安全组具有接收来自 EMR 集群安全组的流量的入站规则。

使用 Kafka 和 Hudi 表连接器配置 Flink

Flink 表连接器允许使用 Table API 编程流操作时连接到外部系统。源连接器提供对流服务的访问,包括作为数据源的 Kinesis 或 Apache Kafka。Sink 连接器允许 Flink 将流处理结果发送到外部系统或 Amazon S3 等存储服务。
在 Amazon EMR Leader节点上下载以下连接器并将它们保存在 /lib/flink/lib 目录中:

  • 源连接器——从 Apache 仓库下载 flink-connector-kafka_2.11-1.13.1.jar。 Apache Kafka SQL 连接器允许 Flink 从 Kafka 主题中读取数据。
  • 接收器连接器 – Amazon EMR 发布版本 emr-6.4.0 随附 Hudi 发布版本 0.8.0。但是在这篇文章中需要 Hudi Flink 捆绑连接器发布版本 0.10.0,它与 Flink 发布版本 1.13 兼容。从 Apache 仓库下载 hudi-flink-bundle_2.11-0.10.0.jar。它还包含多个文件系统客户端,包括用于与 Amazon S3 集成的 S3A。

开发实时 ETL 作业

这篇文章使用 Debezium 源 Kafka 连接器将示例数据库 salesdb的数据更改流式传输到 MSK 集群。连接器以 JSON 格式生成数据更改。 Flink Kafka 连接器可以通过在表选项中使用 debezium-json设置 value.format来反序列化 JSON 格式的事件。除了插入之外,此配置还完全支持数据更新和删除。
使用 Flink SQL API 构建一个新作业。这些 API 允许使用流数据,类似于关系数据库中的表。此方法中指定的 SQL 查询在源流中的数据事件上连续运行。因为 Flink 应用程序从流中消费无限数据,所以输出不断变化。为了将输出发送到另一个系统,Flink 向下游 sink 操作员发出更新或删除事件。因此当使用 CDC 数据或编写需要更新或删除输出行的 SQL 查询时,必须提供支持这些操作的接收器连接器。否则Flink 作业将出现如下错误信息

Target Table doesn't support consuming update or delete changes which is produced by {your query statement} …

使用之前在配置文件中指定的配置在 EMR 集群上启动 Flink YARN 应用程序:

cd /lib/flink && ./bin/yarn-session.sh --detached

命令成功运行后就可以创建第一个作业了。运行以下命令以启动 sql-client:

./bin/sql-client.sh

终端窗口类似于以下屏幕截图。

image.png

设置作业参数

运行以下命令来设置此会话的检查点间隔:

SET execution.checkpointing.interval = 1min;

定义源表

从概念上讲使用 SQL 查询处理流需要将事件解释为表中的逻辑记录。 因此使用 SQL API 读取或写入数据之前的第一步是创建源表和目标表。 表定义包括连接设置和配置以及定义流中对象的结构和序列化格式的模式。
这篇文章中将创建三个源表,每个对应于 Amazon MSK 中的一个主题。还可以创建一个目标表,将输出数据记录写入存储在 Amazon S3 上的 Hudi 数据集。
"properties.bootstrap.servers"选项中将 BOOTSTRAP SERVERS ADDRESSES替换为自己的 Amazon MSK 集群信息,并在 sql-client终端中运行以下命令:

CREATE TABLE CustomerKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `NAME` STRING,
      `MKTSEGMENT` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER', -- created by debezium connector, corresponds to CUSTOMER table in Amazon Aurora database. 
      'properties.bootstrap.servers' = '<PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup1',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE CustomerSiteKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `CUST_ID` BIGINT,
      `SITE_ID` BIGINT,
      `STATE` STRING,
      `CITY` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.CUSTOMER_SITE',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup2',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

CREATE TABLE SalesOrderAllKafka (
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
      `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
      `record_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      `ORDER_ID` BIGINT,
      `SITE_ID` BIGINT,
      `ORDER_DATE` BIGINT,
      `SHIP_MODE` STRING,
       WATERMARK FOR event_time AS event_time
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'salesdb.salesdb.SALES_ORDER_ALL',
      'properties.bootstrap.servers' = '< PLAINTEXT BOOTSTRAP SERVERS ADDRESSES>',
      'properties.group.id' = 'ConsumerGroup3',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

默认情况下 sql-client将这些表存储在内存中,它们仅在活动会话期间存在,每当 sql-client 会话到期或退出时都需要重新创建表。

定义目标Sink表

以下命令创建目标表。 指定 'hudi'作为此表中的连接器。 其余的 Hudi 配置在 CREATE TABLE语句的 with(...)部分中设置。将 S3URI OF HUDI DATASET LOCATION替换为在 Amazon S3 中的 Hudi 数据集位置并运行以下代码:

CREATE TABLE CustomerHudi (
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'write.tasks' = '4',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
      'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

对于 select查询,sql-client将作业提交到 Flink 集群,然后将结果实时显示在屏幕上。 运行以下选择查询以查看 Amazon MSK 数据:

SELECT Count(O.order_id) AS order_count,
       C.cust_id,
       C.NAME,
       C.mktsegment
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment; 

此查询连接三个流并聚合按每个客户记录分组的客户订单计数,几秒钟后会在终端中看到结果。 请注意终端输出如何随着 Flink 作业从源流中消耗更多事件而发生变化。

将结果写入 Hudi 数据集

要拥有完整的管道,需要将结果写到 Amazon S3 上的 Hudi 数据集。 为此请在查询前面添加一个插入 CustomerHudi 语句:

INSERT INTO customerhudi
SELECT Count(O.order_id),
       C.cust_id,
       C.NAME,
       C.mktsegment,
       Proctime()
FROM   customerkafka C
       JOIN customersitekafka CS
         ON C.cust_id = CS.cust_id
       JOIN salesorderallkafka O
         ON O.site_id = CS.site_id
GROUP  BY C.cust_id,
          C.NAME,
          C.mktsegment;

这一次 sql-client提交作业后与集群断开连接,客户端不必等待作业的结果,因为它会将结果写入 Hudi 数据集。即使停止了 sql-client会话,该作业也会继续在 Flink 集群上运行。
等待几分钟,直到作业将 Hudi 提交日志文件生成到 Amazon S3。然后导航到为 CustomerHudi 表指定的 Amazon S3 中的位置,其中包含按 MKTSEGMENT 列分区的 Hudi 数据集。在每个分区中,您还可以找到 Hudi 提交日志文件。这是因为表类型定义为 MERGE_ON_READ。在此模式下使用默认配置,Hudi 会在出现五个 delta 提交日志后将提交日志合并到更大的 Parquet 文件中。可以通过将表类型更改为 COPY_ON_WRITE 或指定自定义压缩配置来更改此设置。

查询 Hudi 数据集

可以使用 Hudi Flink 连接器作为源连接器来读取存储在 Amazon S3 上的 Hudi 数据集。为此可以针对 CustomerHudi 表运行 select 语句,或者使用为连接器指定的 hudi 创建一个新表。该路径必须指向 Amazon S3 上现有 Hudi 数据集的位置。将 S3URI OF HUDI DATASET LOCATION 替换并运行以下命令以创建新表:

CREATE TABLE `CustomerHudiReadonly` (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `order_count` BIGINT,
      `customer_id` BIGINT,
      `name` STRING,
      `mktsegment` STRING,
      `ts` TIMESTAMP(3),
      PRIMARY KEY (`customer_id`) NOT Enforced
    )
    PARTITIONED BY (`mktsegment`)
    WITH (
      'connector' = 'hudi',
      'hoodie.datasource.query.type' = 'snapshot',
      'path' = '<S3URI OF HUDI DATASET LOCATION>',
     'table.type' = 'MERGE_ON_READ' --  MERGE_ON_READ table or, by default is COPY_ON_WRITE
    );

请注意以 _hoodie_为前缀的附加列名,这些列是 Hudi 在写入过程中添加的,用于维护每条记录的元数据。另请注意在表定义的 WITH 部分中传递的额外"hoodie.datasource.query.type"读取配置,这可确保从 Hudi 数据集的实时视图中读取数据。运行以下命令:

select * from CustomerHudiReadonly where customer_id <= 5;

终端会在 30 秒内显示结果。导航到 Flink Web 界面可以在其中观察由 select 查询启动的新 Flink 作业(有关如何找到 Flink Web 界面,请参见下文)。它扫描 Hudi 数据集中已提交的文件,并将结果返回给 Flink SQL 客户端。
使用 mysql CLI或其他 IDE 连接到托管在 Aurora MySQL 上的 salesdb 数据库。针对 SALES_ORDER_ALL表运行一些插入语句:

insert into SALES_ORDER_ALL values (29001, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29002, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29003, 2, now(), 'STANDARD');
insert into SALES_ORDER_ALL values (29004, 2, now(), 'TWO-DAY');
insert into SALES_ORDER_ALL values (29005, 2, now(), 'STANDARD');

几秒钟后一个新的提交日志文件会出现在 Amazon S3 上的 Hudi 数据集中。 Debezium for MySQL Kafka 连接器捕获更改并为 MSK 主题生成事件。 Flink 应用程序使用来自主题的新事件并相应地更新 customer_count 列。然后它将更改的记录发送到 Hudi 连接器以与 Hudi 数据集合并。
Hudi 支持不同的写操作类型。默认操作是 upsert,它最初在数据集中插入记录。当具有现有键的记录到达流程时,它被视为更新。此操作在希望将数据集与源数据库同步且不希望出现重复记录的情况下很有用。

Flink Web 界面可帮助您查看 Flink 作业的配置、图表、状态、异常错误、资源利用率等。要访问它首先需要在浏览器中设置 SSH 隧道并激活代理,以连接到 YARN 资源管理器。连接到资源管理器后,选择托管 Flink 会话的 YARN 应用程序。选择 Tracking UI 列下的链接以导航到 Flink Web 界面。

image.png

将管道部署到生产环境

对于实验、开发或测试数据管道来说,使用 Flink sql-client以交互方式快速构建数据管道,这是一个不错的选择。但是对于生产环境,建议将 SQL 脚本嵌入 Flink Java 应用程序并在 Amazon Kinesis Data Analytics 上运行。 Kinesis Data Analytics 是用于运行 Flink 应用程序的完全托管服务;它具有内置的自动扩展和容错功能,可为生产应用程序提供所需的可用性和可扩展性。GitHub 上提供了一个 Flink Hudi 应用程序,其中包含这篇文章中的脚本,用户可以访问此存储库,并比较在 sql-clientKinesis Data Analytics中运行之间的差异。

清理

为避免产生持续费用,请完成以下清理步骤:

  • 停止 EMR 集群
  • 删除 MSK Connect Lab 设置创建的 AWS CloudFormation

结论

构建数据湖是打破数据孤岛和运行分析以从所有数据中获取洞察力的第一步。在数据湖上的事务数据库和数据文件之间同步数据并非易事,而且需要大量工作。在 Hudi 添加对 Flink SQL API 的支持之前,Hudi 客户必须具备编写 Apache Spark 代码并在 AWS Glue 或 Amazon EMR 上运行它的必要技能。在这篇文章中展示了一种新方法,可以使用 SQL 查询以交互方式探索流服务中的数据,并加快数据管道的开发过程。