【译】如何调整ApacheFlink®集群的大小How To Size Your Apache Flink® Cluster: A Back-of-the-Envelope Calculation

  • 2019 年 10 月 4 日
  • 筆記

来自Flink Forward Berlin 2017的最受欢迎的会议是Robert Metzger的“坚持下去:如何可靠,高效地操作Apache Flink”。 Robert所涉及的主题之一是如何粗略地确定Apache Flink集群的大小。 Flink Forward的与会者提到他的群集大小调整指南对他们有帮助,因此我们将他的谈话部分转换为博客文章。 请享用!

Flink社区中最常见的问题之一是如何在从开发阶段转向生产阶段时确定群集的大小。 对这个问题的明确答案当然是“它取决于”,但这不是一个有用的答案。 这篇文章概述了一系列问题,要求您提供一些可用作指导的数字。

做计算并建立基线

第一步是仔细考虑应用程序的运营指标,以获得所需资源的基线。

要考虑的关键指标是:

  • 每秒记录数和每条记录的大小
  • 您拥有的不同key的数量以及每个key的状态大小
  • 状态更新的数量和状态后端的访问模式

最后,更实际的问题是您的服务水平协议(SLA)与客户的停机时间,延迟和最大吞吐量有关,因为这些直接影响您的容量规划。

接下来,根据您的预算查看您可用的资源。例如:

  • 网络容量,考虑到也使用网络的任何外部服务,如Kafka,HDFS等。
  • 您的磁盘带宽,如果您依赖于基于磁盘的状态后端(如RocksDB)(并考虑其他磁盘使用,如Kafka或HDFS)
  • 机器的数量以及它们可用的CPU和内存

基于所有这些因素,您现在可以构建正常操作的基线,以及用于恢复追赶或处理负载峰值的资源缓冲区。我建议您在建立基线时考虑检查点期间使用的资源。

示例:让我们举一些例子

我现在将计划在假设的集群上部署作业,以可视化建立资源使用基准的过程。 这些数字是粗略的“背后”值,并且它们并不全面 – 在帖子的最后,我还将确定在进行此计算时我忽略的一些方面。

示例Flink流式处理作业和硬件

示例Flink Streaming作业拓扑

对于此示例,我将部署一个典型的Flink流式作业,该作业使用Flink的Kafka使用者从Kafka主题读取数据。 然后使用键控聚合窗口运算符来变换流。 窗口操作符在5分钟的时间窗口上执行聚合。 由于总是有新数据,我将窗口配置为一个滑动窗口,滑动时间为1分钟。

这意味着我将获得每分钟更新过去5分钟的聚合。 流式传输作业为每个userId创建一个聚合。 从Kafka主题消耗的消息的大小(平均)为2 KB。

吞吐量是每秒100万条消息。 要了解窗口运算符的状态大小,您需要知道不同键的数量。 在这种情况下,它是userIds的数量,即500,000,000个唯一身份用户。 对于每个用户,您计算四个数字,存储为长(8个字节)。

让我们总结一下这项工作的关键指标:

  • Message size: 2KB
  • Throughput: 1,000,000 msg/sec
  • Distinct keys: 500,000,000 (aggregation in window: 4 longs per key)
  • Checkpointing: Once every minute.

假设硬件设置

运行该作业的机器有五台,每台机器都运行Flink TaskManager(Flink的工作节点)。 磁盘是网络连接的(在云设置中很常见),从主交换机到运行TaskManager的每台机器都有一个10千兆以太网连接。 Kafka broker分布在不同的机器上运行。

每台机器有16个CPU核心。 为简单起见,我不会考虑CPU和内存要求。 在现实世界中,根据您的应用程序逻辑和使用中的状态后端,您需要注意内存。 此示例使用基于RocksDB的状态后端,该后端功能强大且内存要求低。

单机的视角

要了解整个作业部署的资源需求,最简单的方法是首先关注一台机器和一台TaskManager中的操作。 然后,您可以使用从一台计算机派生的数字来计算总体资源需求。

默认情况下(如果所有运算符具有相同的并行性且没有特殊的调度限制),则每个计算机上都会运行流式作业的所有运算符。

在这种情况下,Kafka源(或消费者),窗口操作符和Kafka接收器(或生产者)都在五台机器中的每台机器上运行。

机器视角 – TaskManager n

keyBy是上图中的一个单独的运算符,因此计算资源需求更容易。 实际上,keyBy是一个API构造,并转换为Kafka源和窗口运算符之间连接的配置属性。

我现在将从上到下遍历每个运算符,以了解他们的网络资源需求。

The Kafka source

要计算单个Kafka源接收的数据量,首先计算聚合Kafka输入。 源每秒接收1,000,000条消息,每条消息2KB。

2KB x 1,000,000/s = 2GB/s

将2GB / s除以机器数量(5)会产生以下结果:

2GB/s ÷ 5 machines = 400MB/s

群集中运行的5个Kafka源中的每一个都接收平均吞吐量为400 MB / s的数据。

The Kafka source calculation

混洗和分区

接下来,您需要确保具有相同key的所有事件(在本例中为userId)最终位于同一台计算机上。 您正在读取的Kafka主题中的数据可能会根据不同的分区方案进行分区。

混洗过程将具有相同key的所有数据发送到一台计算机,因此您将来自Kafka的400MB / s数据流拆分为userId分区流:

400MB/s ÷ 5 machines = 80MB/s

平均而言,您必须向每台计算机发送80 MB / s的数据。 这个分析是从一台机器的角度来看的,这意味着一些数据已经在指定的目标机器上,因此减去80MB / s来解释:

400MB/s – 80MB = 320MB/s

每台机器以320MB / s的速率接收和发送用户数据。

混洗计算

Window Emit and Kafka Sink

接下来要问的问题是窗口操作员发出多少数据并将其发送到Kafka接收器。 它是67MB / s,让我们解释一下我们是如何达到这个数字的。

窗口运算符为每个键保留4个数字(表示为长整数)的汇总。 每分钟一次,操作员发出当前的聚合值。 每个key从聚合中发出2个int(user_id,window_ts)和4个long:

(2 x 4 bytes) + (4 x 8 bytes) = 40 bytes per key

然后考虑key(500,000,000除以机器数量):

100,000,000 keys x 40 bytes = 4GB

……来自每台机器。

然后计算每秒大小:

4GB/min ÷ 60 = 67MB/s

…由每个TaskManager发出。

这意味着每个TaskManager平均从窗口运算符发出67 MB / s的用户数据。 由于每个TaskManager上都运行一个Kafka接收器(窗口运算符旁边),并且没有进一步的重新分区,这是从Flink发送到Kafka的数据量。

用户数据:从Kafka,洗牌到窗口运算符,然后回到Kafka

窗口运算符的数据发射预计是“突发性的”,因为它们每分钟发出一次数据。 实际上,运营商不会以67 MB / s的恒定速率发送数据,而是每分钟最多可用带宽几秒钟。

这总计为:

  • Data in: 720MB/s (400 + 320) per machine
  • Data out: 387MB/s (320 + 67) per machine

状态访问和检查点

这不是一切。 到目前为止,我只查看了Flink正在处理的用户数据。 您需要将存储状态和检查点保存在RocksDB中而进行的磁盘访问的开销包括在内。 要了解磁盘访问成本,请查看窗口运算符如何访问状态。 Kafka源也保持一些状态,但与窗口运算符相比,它可以忽略不计。

要了解窗口运算符的状态大小,请从不同的角度查看它。 Flink正在计算5分钟的窗户,只需1分钟的幻灯片。 Flink通过维护五个窗口来实现滑动窗口,每个窗口对应一个“幻灯片”。如前所述,当使用执行急切聚合的窗口实现时,每个窗口和聚合的每个key保持40个字节的状态。 对于每个传入事件,首先需要从磁盘检索当前聚合值(读取40个字节),更新聚合,然后再写入新值(写入40个字节)。

窗口状态

这意味着:

40 bytes of state x 5 windows x 200,000 msg/s per machine = 40MB/s

…每台机器的读写磁盘访问权限。 如开头所述,磁盘是网络连接的,因此我需要将这些数字添加到整体吞吐量计算中。 总数现在是:

  • Data in: 760MB/s (400 MB/s data in + 320 MB/s shuffle + 40 MB/s state)
  • Data out: 427MB/s (320 MB/s shuffle + 67 MB/s data out + 40 MB/s state)

以上考虑用于状态访问,当新事件到达窗口操作符时,该访问一致地发生。 您还可以启用容错检查点。 如果计算机或其他任何其他设备出现故障,您需要恢复窗口内容并继续处理。

检查点设置为每分钟一个检查点的间隔,每个检查点将作业的整个状态复制到网络附加文件系统中。

让我们快速了解每台机器上的整个状态有多大:

40bytes of state x 5 windows x 100,000,000 keys = 20GB

并且,要获得每秒值:

20GB ÷ 60 = 333 MB/s.

与窗口运算符类似,检查点具有突发模式,每分钟一次,它会尝试将其数据全速发送到外部存储。 检查点导致对RocksDB的额外状态访问(在此示例中位于网络连接磁盘上)。 自Flink 1.3以来,RocksDB状态后端支持增量检查点,减少了每个检查点上所需的网络传输,从概念上讲,仅发送自上一个检查点以来的“diff”,但此示例中未使用此功能。

这会将总计更新为:

  • Data in: 760MB/s (400 + 320 + 40)
  • Data out: 760MB/s (320 + 67 + 40 + 333)

这意味着整体网络流量为:

760 + 760 x 5 + 400 + 2335 = 10335 MB/s

400是整个5台机器上80MB状态访问(读写)进程的总和,2335是整个集群中Kafka进出流程的总和。

或者只是上面硬件设置中可用网络容量的一半以上。

网络要求

我想补充一下免责声明。 这些计算都不包括协议开销,例如来自Flink,Kafka或文件系统的TCP,以太网和RPC调用。 这仍然是了解工作所需的硬件类型以及性能指标的良好起点。

扩大你的方式

根据我的分析,此示例使用5节点集群,并且在典型操作中,每台计算机需要处理760 MB / s的数据,包括输入和输出,总容量为1250 MB / s。 这为我所掩盖的复杂性保留了大约40%的网络容量,例如网络协议开销,从检查点恢复时事件重放期间的高负载,以及由数据偏差导致的集群内不均衡的负载平衡。

对于40%是否是适当的余量,没有一个通用的答案,但这个算术应该给你一个很好的起点。 尝试上面的计算,更换机器数量,key数量或每秒消息数,以便选择要考虑的值,然后根据预算和运营因素进行平衡。 快乐缩放!

原文连接:https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

欢迎来腾讯云社区:https://cloud.tencent.com/developer/support-plan?invite_code=1q904aao4zdgs