使用Flink進行實時日誌聚合:第一部分

  • 2020 年 3 月 10 日
  • 筆記

由 Gyula Fora 和Matyas Orhidi 撰寫

介紹

我們中的許多人都經歷過無可奈何地挖掘多個服務器上的日誌文件以解決嚴重生產問題的感覺。我們可能都同意這遠非理想。在處理實時處理應用程序時,查找和搜索日誌文件更具挑戰性,因為調試過程本身對時間非常敏感。

分佈式數據處理中的一個常見挑戰是從不同的計算節點收集日誌,並以一種可以在以後進行有效搜索以進行監視和調試的方式來組織日誌。用於描述此日誌收集過程的術語是 日誌聚合。

日誌聚合: 從不同來源收集日誌,以提供整個系統的整體視圖。

市場上有幾種用於日誌聚合的現成解決方案,它們帶有自己的組件堆棧和操作困難。例如,行業中廣泛使用的著名日誌記錄框架是ELK stack和Graylog。

不幸的是,沒有適用於每個應用程序的明確解決方案,不同的日誌記錄解決方案可能更適合某些用例。例如,實時應用程序的日誌處理也應實時進行,否則,我們會丟失及時信息,而這些信息可能無法成功運行系統。

在此博客文章中,我們將深入研究實時應用程序的日誌記錄。更具體地說,我們將:

a) 討論流式應用程序的日誌記錄要求

b) 檢查通用日誌聚合系統的組件

c) 從頭開始構建可擴展的日誌聚合框架

d) 將我們的定製解決方案與現成的工具進行比較

記錄流應用程序

在進入分佈式流應用程序的日誌記錄需求之前,讓我們退後一步,看看更傳統的批處理。這些應用程序定期運行,處理大量數據,併產生關鍵的輸出。在處理期間出現錯誤時,我們需要能夠對其進行調試,並且我們的日誌記錄堆棧應始終為解決方案提供支持。

我們希望日誌記錄堆棧中有一些關鍵特性可以用於批處理:

• 從大量進程中收集日誌

• 日誌被索引以啟用自由文本搜索

• 處理完成(完成或失敗)後,日誌立即可用

基於標準文件的日誌記錄通常適用於批處理應用程序,其一次性日誌聚合步驟可在數據處理結束時收集和索引日誌。從概念上講,我們可以將日誌聚合過程視為只是另一個批處理應用程序,該應用程序在另一個完成或失敗時觸發。

不幸的是,流應用程序的情況有所不同。與批處理應用程序相比,這些作業以24/7運行,產生連續的低延遲輸出。出現問題時,我們需要儘快開始調試過程。希望在它表現為我們的生產系統停機之前。由於當我們不看時可能會發生奇怪的事情,因此理想情況下,我們還希望在日誌記錄框架中內置一些監視和警報功能。

讓我們總結流應用程序的其他日誌記錄要求:

• 低延遲日誌訪問

• 隨着時間的推移可擴展到大日誌大小

• 監控/儀錶板功能

分解日誌堆棧

現在,我們已經清楚地了解了要解決的挑戰,下面讓我們看一下日誌聚合堆棧中所需的組件。

大多數可用的日誌記錄框架由以下四個組件組成:

  • 日誌追加程序
  • 日誌提取
  • 存儲和搜索層
  • 儀錶板和警報層

日誌追加程序 負責從應用程序進程中收集日誌(在整個群集中運行),並確保將日誌發送到下游進行提取。有各種追加程序可用,例如文件、控制台、數據庫、消息隊列等。

日誌提取 是獲取由附加程序收集的日誌並將其放入存儲層的步驟。這通常意味着清理和轉換日誌,然後將它們編入搜索引擎以方便用戶使用。

存儲和搜索層 通常是一個分佈式搜索引擎,或者更簡單的情況下,分佈在日誌存儲和訪問文件系統或數據庫。

儀錶板與警報層 就座於存儲層的頂部。它為用戶提供了交互式圖形界面,用於搜索日誌和可視化重要信息。它還通常包括警報功能。

這些組成我們的日誌記錄功能的組件本身也是生產應用程序。在理想情況下,各部分之間只是鬆散耦合,因此我們可以獨立管理和操作它們而不會影響整個管道。整個日誌系統的操作複雜性在很大程度上取決於各個組件。

現在,我們已經對生產級日誌聚合堆棧的需求有了一個很好的概述,讓我們動手做事,並使用從頭開始就已經知道的系統來設置整個管道。

使用Flink、Kafka和Solr進行日誌聚合

在此初始解決方案中,讓我們使用Cloudera平台中可用的處理框架來構建可伸縮且完全可自定義的日誌聚合堆棧。

我們的目標是建立一個日誌聚合管道,以服務於我們的實時數據處理應用程序以及任何數據處理或其他類型的應用程序。

我們使用以下系統實現日誌聚合組件:

a) Apache Kafka日誌附加程序,用於可伸縮和低延遲的日誌收集

b) 使用Apache Flink進行日誌提取、索引編製和自定義監視

c) Apache Solr用於存儲和搜索功能

d) Hue用於記錄儀錶板

在深入了解細節之前,讓我們看一個高級示例,說明日誌消息如何從我們的應用程序一直流向日誌記錄儀錶板:

由於我們的數據處理作業在多台服務器上運行,因此每個工作節點(在Flink情況下為TaskManager)都將產生連續的日誌流。這些日誌將使用預先配置的日誌附加程序自動發送到指定的Kafka主題。同時,與產生日誌的應用程序完全分離,我們還有另一個Apache Flink流應用程序,它監聽來自Kafka的日誌消息。此攝取器流作業將接收傳入的日誌消息、對其進行解析、然後通過我們的Solr搜索引擎對其進行索引。負責流應用程序平穩運行的工程師可以直接在Solr中與索引日誌交互,也可以使用Hue作為儀錶板工具進行交互。

登錄到Kafka

要解決的第一個挑戰是將日誌從生產應用程序收集到傳輸到攝取器組件。通常,有幾種方法可以解決此問題,每種方法都有其起伏。

通過直接將日誌索引到存儲層,我們可以完全跳過整個日誌收集/傳輸步驟。從理論上講,這將給我們帶來非常低的延遲,但是它將日誌記錄與獲取和存儲本身緊密結合在一起,從而導致系統脆弱:

• 攝取/存儲邏輯的更改要求日誌記錄應用程序的更改

• 存儲層的停機時間可能會影響正在運行的應用程序(或丟失日誌)

• 存儲系統本身可能無法擴展到傳入連接的數量

由於這些原因,我們強烈希望將日誌記錄與攝取分開。鑒於這個關鍵的設計決策,我們仍然有不同的方法來將日誌消息發送到日誌接收器。

默認情況下,大多數應用程序都會寫入日誌文件,這些文件存儲在主機本地。可以定期收集這些文件,但是不幸的是,隨着越來越多的應用程序,它變得相當複雜,並且它也不能為我們的實時需求提供足夠的延遲。

為了立即解決所有這些問題,我們決定將記錄的消息視為任何其他實時數據源,並使用Apache Kafka作為傳輸層。Kafka在行業中被廣泛用作實時數據的消息總線,並提供了我們記錄的消息所需的所有功能:

• 可擴展到大量生產者應用程序和日誌消息

• 易於與現有應用程序集成

• 提供低延遲的日誌傳輸

大多數數據處理框架(包括Flink)都使用slf4j API,因此我們可以在幕後使用我們喜歡的Java日誌記錄框架來配置附加器邏輯。

kafka-log4j-appender模塊實現了一個簡單的log4j附加程序,該附加程序將應用程序日誌發送到所需的Kafka主題。在本文中,我們將重點放在log4j上,但是這裡的概念可以輕鬆地應用於其他日誌記錄框架。

我們引用了Cloudera Maven 存儲庫中 的kafka附加程序:

<dependency>   <groupId>org.apache.kafka</groupId>   <artifactId>kafka-log4j-appender</artifactId>   <version>2.3.0.7.0.3.0-79/</version></dependency>

要開始記錄到Kafka,將以下內容添加到記錄配置文件(log4j.properties):

log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppenderlog4j.appender.kafka.topic=flink.logslog4j.appender.kafka.brokerList=<your_broker_list>

在這個簡單的配置片段中,我們配置了appender類,kafka代理和主題。選擇主題時,我們可以決定讓多個應用共享同一主題或使用特定於應用的主題。只要可以將應用程序日誌彼此區分開(稍後會詳細介紹),我們建議共享日誌記錄主題以簡化提取,除非公司政策要求按職位或部門分開。

為了簡化下游處理,我們決定將日誌存儲在JSON布局中。為此,我們使用以下依賴項:

<dependency>    <groupId>net.logstash.log4j</groupId>    <artifactId>jsonevent-layout</artifactId>    <version>1.7</version></dependency>

下載完所需的jar之後,可以將布局配置添加到相同的log4j.properties 文件中:

log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1

開箱即用的日誌只是帶有時間戳的簡單消息,其中包含有關源類,主機名等的一些信息。不幸的是,如果我們運行類似的應用程序,或者同一數據處理作業的多個工作容器在同一運行主機,按實際應用程序對記錄的消息進行分組非常困難。作為有效的解決方案,我們將yarnContainerId 附加到每個日誌消息中,以唯一標識應用程序和工作程序。

我們使用了UserFields 可選設置來在我們的JSON日誌中顯示yarnContainerId 。

log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

為此,log4j希望將yarnContainerId 設置為JVM上的系統屬性(logback和log4j 2也支持解析環境變量)。運行Flink時不會自動填充該字段,但可以使用-DyarnContainerId =… 設置將其添加。

對於Flink,容器ID已存儲在$ CONTAINER_ID 環境變量中,因此我們修改env.java.opts 使其包含以下額外的java屬性:

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID ....java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID ...env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID ....java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID ...

現在已經設置了所有詳細信息,讓我們快速看一下完整的log4j配置文件,該文件保留了原始的基於文件的日誌記錄,並添加了額外的Kafka 記錄器:

log4j.rootLogger=INFO, file, kafka    # Avoid deadlock on appender startlog4j.logger.cloudera.shaded.org.apache.kafka=INFO, filelog4j.additivity.cloudera.shaded.org.apache.kafka=false  # Log all infos in the given filelog4j.appender.file=org.apache.log4j.FileAppenderlog4j.appender.file.file=${log.file}log4j.appender.file.append=falselog4j.appender.file.layout=org.apache.log4j.PatternLayoutlog4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n    log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppenderlog4j.appender.kafka.topic=flink.logslog4j.appender.kafka.brokerList=<your_broker_list>log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

我們同時保留了基於文件和基於kafka的附加程序。這將使用最新數據填充Flink / YARN UI日誌選項卡,並且仍將所有日誌也定向到Kafka。

請注意,Kafka日誌附加程序及其自己的日誌可能存在死鎖。為了避免出現這種極端情況,我們構建了kafka附加程序的陰影版本,其中kafka依賴項已重定位到:cloudera.shaded.org.apache.kafka 。這些類的日誌僅定向到文件記錄器。如果您使用香草kafka附加程序依賴項作為解決方法,則可以從kafka日誌附加程序中排除所有kafka日誌。

一旦啟動應用程序,日誌應該由flink.logs 主題接收。我們可以輕鬆地檢查Kafka控制台使用者的使用情況:

kafka-console-consumer --bootstrap-server <broker>:9092 --topic flink.logs-console-consumer --bootstrap-server <broker>:9092 --topic flink.logs

正確設置所有內容後,我們應該會看到一些類似於以下內容的新消息:

{  "source_host": "gyula-2.gce.cloudera.com",  "method": "completePendingCheckpoint",  "level": "INFO",  "message": "Completed checkpoint 1 for job 5e70cf704ed010372e2007333db10cf0 (50738 bytes in 2721 ms).",  "mdc": {},  "yarnContainerId": "container_1571051884501_0001_01_000001",  "@timestamp": "2019-10-14T11:21:07.400Z",  "file": "CheckpointCoordinator.java",  "line_number": "906",  "thread_name": "jobmanager-future-thread-1",  "@version": 1,  "logger_name": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",  "class": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}

快速檢查yarnContainerId 字段應確保我們正確設置了flink配置。

至此,我們已經在日誌記錄方面進行了所有設置。我們的應用程序所有日誌最終都存儲在Kafka中,可以進行提取了。

圓滿完成

在這一點上,我們對分佈式數據處理應用程序的日誌記錄的挑戰有一個很好的概述。我們探討了實時流處理應用程序的特定要求,並查看了端到端日誌記錄解決方案所需的組件。

承擔在Cloudera平台上自行構建定製的日誌聚合管道的任務,我們已經制定了計劃並開始實施日誌附加器和收集邏輯。我們已經使用JSON日誌格式為Flink應用程序成功配置了基於Kafka的日誌記錄,當我們提取這些日誌時,將在下一步中派上用場。

在第2部分中,我們將使用攝取和儀錶板組件來完善日誌聚合管道,並研究如何將現成的框架與我們的自定義解決方案進行比較。

原文鏈接:https://blog.cloudera.com/real-time-log-aggregation-with-flink-part-1/