使用Flink进行实时日志聚合:第一部分

  • 2020 年 3 月 10 日
  • 笔记

由 Gyula Fora 和Matyas Orhidi 撰写

介绍

我们中的许多人都经历过无可奈何地挖掘多个服务器上的日志文件以解决严重生产问题的感觉。我们可能都同意这远非理想。在处理实时处理应用程序时,查找和搜索日志文件更具挑战性,因为调试过程本身对时间非常敏感。

分布式数据处理中的一个常见挑战是从不同的计算节点收集日志,并以一种可以在以后进行有效搜索以进行监视和调试的方式来组织日志。用于描述此日志收集过程的术语是 日志聚合。

日志聚合: 从不同来源收集日志,以提供整个系统的整体视图。

市场上有几种用于日志聚合的现成解决方案,它们带有自己的组件堆栈和操作困难。例如,行业中广泛使用的著名日志记录框架是ELK stack和Graylog。

不幸的是,没有适用于每个应用程序的明确解决方案,不同的日志记录解决方案可能更适合某些用例。例如,实时应用程序的日志处理也应实时进行,否则,我们会丢失及时信息,而这些信息可能无法成功运行系统。

在此博客文章中,我们将深入研究实时应用程序的日志记录。更具体地说,我们将:

a) 讨论流式应用程序的日志记录要求

b) 检查通用日志聚合系统的组件

c) 从头开始构建可扩展的日志聚合框架

d) 将我们的定制解决方案与现成的工具进行比较

记录流应用程序

在进入分布式流应用程序的日志记录需求之前,让我们退后一步,看看更传统的批处理。这些应用程序定期运行,处理大量数据,并产生关键的输出。在处理期间出现错误时,我们需要能够对其进行调试,并且我们的日志记录堆栈应始终为解决方案提供支持。

我们希望日志记录堆栈中有一些关键特性可以用于批处理:

• 从大量进程中收集日志

• 日志被索引以启用自由文本搜索

• 处理完成(完成或失败)后,日志立即可用

基于标准文件的日志记录通常适用于批处理应用程序,其一次性日志聚合步骤可在数据处理结束时收集和索引日志。从概念上讲,我们可以将日志聚合过程视为只是另一个批处理应用程序,该应用程序在另一个完成或失败时触发。

不幸的是,流应用程序的情况有所不同。与批处理应用程序相比,这些作业以24/7运行,产生连续的低延迟输出。出现问题时,我们需要尽快开始调试过程。希望在它表现为我们的生产系统停机之前。由于当我们不看时可能会发生奇怪的事情,因此理想情况下,我们还希望在日志记录框架中内置一些监视和警报功能。

让我们总结流应用程序的其他日志记录要求:

• 低延迟日志访问

• 随着时间的推移可扩展到大日志大小

• 监控/仪表板功能

分解日志堆栈

现在,我们已经清楚地了解了要解决的挑战,下面让我们看一下日志聚合堆栈中所需的组件。

大多数可用的日志记录框架由以下四个组件组成:

  • 日志追加程序
  • 日志提取
  • 存储和搜索层
  • 仪表板和警报层

日志追加程序 负责从应用程序进程中收集日志(在整个群集中运行),并确保将日志发送到下游进行提取。有各种追加程序可用,例如文件、控制台、数据库、消息队列等。

日志提取 是获取由附加程序收集的日志并将其放入存储层的步骤。这通常意味着清理和转换日志,然后将它们编入搜索引擎以方便用户使用。

存储和搜索层 通常是一个分布式搜索引擎,或者更简单的情况下,分布在日志存储和访问文件系统或数据库。

仪表板与警报层 就座于存储层的顶部。它为用户提供了交互式图形界面,用于搜索日志和可视化重要信息。它还通常包括警报功能。

这些组成我们的日志记录功能的组件本身也是生产应用程序。在理想情况下,各部分之间只是松散耦合,因此我们可以独立管理和操作它们而不会影响整个管道。整个日志系统的操作复杂性在很大程度上取决于各个组件。

现在,我们已经对生产级日志聚合堆栈的需求有了一个很好的概述,让我们动手做事,并使用从头开始就已经知道的系统来设置整个管道。

使用Flink、Kafka和Solr进行日志聚合

在此初始解决方案中,让我们使用Cloudera平台中可用的处理框架来构建可伸缩且完全可自定义的日志聚合堆栈。

我们的目标是建立一个日志聚合管道,以服务于我们的实时数据处理应用程序以及任何数据处理或其他类型的应用程序。

我们使用以下系统实现日志聚合组件:

a) Apache Kafka日志附加程序,用于可伸缩和低延迟的日志收集

b) 使用Apache Flink进行日志提取、索引编制和自定义监视

c) Apache Solr用于存储和搜索功能

d) Hue用于记录仪表板

在深入了解细节之前,让我们看一个高级示例,说明日志消息如何从我们的应用程序一直流向日志记录仪表板:

由于我们的数据处理作业在多台服务器上运行,因此每个工作节点(在Flink情况下为TaskManager)都将产生连续的日志流。这些日志将使用预先配置的日志附加程序自动发送到指定的Kafka主题。同时,与产生日志的应用程序完全分离,我们还有另一个Apache Flink流应用程序,它监听来自Kafka的日志消息。此摄取器流作业将接收传入的日志消息、对其进行解析、然后通过我们的Solr搜索引擎对其进行索引。负责流应用程序平稳运行的工程师可以直接在Solr中与索引日志交互,也可以使用Hue作为仪表板工具进行交互。

登录到Kafka

要解决的第一个挑战是将日志从生产应用程序收集到传输到摄取器组件。通常,有几种方法可以解决此问题,每种方法都有其起伏。

通过直接将日志索引到存储层,我们可以完全跳过整个日志收集/传输步骤。从理论上讲,这将给我们带来非常低的延迟,但是它将日志记录与获取和存储本身紧密结合在一起,从而导致系统脆弱:

• 摄取/存储逻辑的更改要求日志记录应用程序的更改

• 存储层的停机时间可能会影响正在运行的应用程序(或丢失日志)

• 存储系统本身可能无法扩展到传入连接的数量

由于这些原因,我们强烈希望将日志记录与摄取分开。鉴于这个关键的设计决策,我们仍然有不同的方法来将日志消息发送到日志接收器。

默认情况下,大多数应用程序都会写入日志文件,这些文件存储在主机本地。可以定期收集这些文件,但是不幸的是,随着越来越多的应用程序,它变得相当复杂,并且它也不能为我们的实时需求提供足够的延迟。

为了立即解决所有这些问题,我们决定将记录的消息视为任何其他实时数据源,并使用Apache Kafka作为传输层。Kafka在行业中被广泛用作实时数据的消息总线,并提供了我们记录的消息所需的所有功能:

• 可扩展到大量生产者应用程序和日志消息

• 易于与现有应用程序集成

• 提供低延迟的日志传输

大多数数据处理框架(包括Flink)都使用slf4j API,因此我们可以在幕后使用我们喜欢的Java日志记录框架来配置附加器逻辑。

kafka-log4j-appender模块实现了一个简单的log4j附加程序,该附加程序将应用程序日志发送到所需的Kafka主题。在本文中,我们将重点放在log4j上,但是这里的概念可以轻松地应用于其他日志记录框架。

我们引用了Cloudera Maven 存储库中 的kafka附加程序:

<dependency>   <groupId>org.apache.kafka</groupId>   <artifactId>kafka-log4j-appender</artifactId>   <version>2.3.0.7.0.3.0-79/</version></dependency>

要开始记录到Kafka,将以下内容添加到记录配置文件(log4j.properties):

log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppenderlog4j.appender.kafka.topic=flink.logslog4j.appender.kafka.brokerList=<your_broker_list>

在这个简单的配置片段中,我们配置了appender类,kafka代理和主题。选择主题时,我们可以决定让多个应用共享同一主题或使用特定于应用的主题。只要可以将应用程序日志彼此区分开(稍后会详细介绍),我们建议共享日志记录主题以简化提取,除非公司政策要求按职位或部门分开。

为了简化下游处理,我们决定将日志存储在JSON布局中。为此,我们使用以下依赖项:

<dependency>    <groupId>net.logstash.log4j</groupId>    <artifactId>jsonevent-layout</artifactId>    <version>1.7</version></dependency>

下载完所需的jar之后,可以将布局配置添加到相同的log4j.properties 文件中:

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

开箱即用的日志只是带有时间戳的简单消息,其中包含有关源类,主机名等的一些信息。不幸的是,如果我们运行类似的应用程序,或者同一数据处理作业的多个工作容器在同一运行主机,按实际应用程序对记录的消息进行分组非常困难。作为有效的解决方案,我们将yarnContainerId 附加到每个日志消息中,以唯一标识应用程序和工作程序。

我们使用了UserFields 可选设置来在我们的JSON日志中显示yarnContainerId 。

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

为此,log4j希望将yarnContainerId 设置为JVM上的系统属性(logback和log4j 2也支持解析环境变量)。运行Flink时不会自动填充该字段,但可以使用-DyarnContainerId =… 设置将其添加。

对于Flink,容器ID已存储在$ CONTAINER_ID 环境变量中,因此我们修改env.java.opts 使其包含以下额外的java属性:

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID ....java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID ...env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID ....java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID ...

现在已经设置了所有详细信息,让我们快速看一下完整的log4j配置文件,该文件保留了原始的基于文件的日志记录,并添加了额外的Kafka 记录器:

log4j.rootLogger=INFO, file, kafka    # Avoid deadlock on appender startlog4j.logger.cloudera.shaded.org.apache.kafka=INFO, filelog4j.additivity.cloudera.shaded.org.apache.kafka=false  # Log all infos in the given filelog4j.appender.file=org.apache.log4j.FileAppenderlog4j.appender.file.file=${log.file}log4j.appender.file.append=falselog4j.appender.file.layout=org.apache.log4j.PatternLayoutlog4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n    log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppenderlog4j.appender.kafka.topic=flink.logslog4j.appender.kafka.brokerList=<your_broker_list>log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

我们同时保留了基于文件和基于kafka的附加程序。这将使用最新数据填充Flink / YARN UI日志选项卡,并且仍将所有日志也定向到Kafka。

请注意,Kafka日志附加程序及其自己的日志可能存在死锁。为了避免出现这种极端情况,我们构建了kafka附加程序的阴影版本,其中kafka依赖项已重定位到:cloudera.shaded.org.apache.kafka 。这些类的日志仅定向到文件记录器。如果您使用香草kafka附加程序依赖项作为解决方法,则可以从kafka日志附加程序中排除所有kafka日志。

一旦启动应用程序,日志应该由flink.logs 主题接收。我们可以轻松地检查Kafka控制台使用者的使用情况:

kafka-console-consumer --bootstrap-server <broker>:9092 --topic flink.logs-console-consumer --bootstrap-server <broker>:9092 --topic flink.logs

正确设置所有内容后,我们应该会看到一些类似于以下内容的新消息:

{  "source_host": "gyula-2.gce.cloudera.com",  "method": "completePendingCheckpoint",  "level": "INFO",  "message": "Completed checkpoint 1 for job 5e70cf704ed010372e2007333db10cf0 (50738 bytes in 2721 ms).",  "mdc": {},  "yarnContainerId": "container_1571051884501_0001_01_000001",  "@timestamp": "2019-10-14T11:21:07.400Z",  "file": "CheckpointCoordinator.java",  "line_number": "906",  "thread_name": "jobmanager-future-thread-1",  "@version": 1,  "logger_name": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",  "class": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}

快速检查yarnContainerId 字段应确保我们正确设置了flink配置。

至此,我们已经在日志记录方面进行了所有设置。我们的应用程序所有日志最终都存储在Kafka中,可以进行提取了。

圆满完成

在这一点上,我们对分布式数据处理应用程序的日志记录的挑战有一个很好的概述。我们探讨了实时流处理应用程序的特定要求,并查看了端到端日志记录解决方案所需的组件。

承担在Cloudera平台上自行构建定制的日志聚合管道的任务,我们已经制定了计划并开始实施日志附加器和收集逻辑。我们已经使用JSON日志格式为Flink应用程序成功配置了基于Kafka的日志记录,当我们提取这些日志时,将在下一步中派上用场。

在第2部分中,我们将使用摄取和仪表板组件来完善日志聚合管道,并研究如何将现成的框架与我们的自定义解决方案进行比较。

原文链接:https://blog.cloudera.com/real-time-log-aggregation-with-flink-part-1/