Kafka與ELK實現一個日誌系統
- 2022 年 2 月 28 日
- 筆記
- elasticsearch, ELK, JAVA, kafka
1.概述
客戶端應用程序在運行過程中可能會產生錯誤,例如調用服務端接口超時、客戶端處理業務邏輯發生異常、應用程序突然閃退等。這些異常信息都是會產生日誌記錄的,並通過上報到指定的日誌服務器進行壓縮存儲。 本篇博客以一個應用實時日誌分析平台作為案例來講述ELK(ElasticSearch、LogStash、Kibana)在實際業務中的具體用法,讓讀者能夠從中理解ELK適用的業務場景及實現細節。
2.內容
在傳統的應用場景中,對於這些上報的異常日誌信息。通常適用Linux命令去分析定位問題,如果日誌數據量小,也許不會覺得有什麼不適。假若面對的是海量的異常日誌信息,這時還用Linux命令去逐一查看、定位,這將是災難性的。需要花費大量的時間、精力去查閱這些異常日誌,而且效率也不高。 因此,構建一個應用實時日誌分析平台就顯得很有必要。通過對這些異常日誌進行集中管理(包括採集、存儲、展示),用戶可以在這樣一個平台上按照自己的想法來實現對應的需求。
1.自定義需求
用戶可以通過瀏覽器界面訪問Kibana來制定不同的篩選規則,查詢存儲在ElasticSearch集群中的異常日誌數據。返回的結果在瀏覽器界面通過表格或者JSON對象的形式進行展示,一目了然。
2.命令接口
對於周期較長的歷史數據,如果不需要可以進行刪除。在Kibana中提供了操作ElasticSearch的接口,通過執行刪除命令來清理ElasticSearch中無效的數據。
3.結果導出與共享
在Kibana系統中,分析完異常日誌後可以將這些結果直接導出或者共享。Kibana的瀏覽器界面支持一鍵式結果導出與數據分享,不需要額外的去編寫代碼來實現。
2.1 架構與剖析
搭建實時日誌分析平台涉及到的組件有ElasticSearch、LogStash、Kibana、Kafka,它們各自負責的功能如下:
- ElasticSearch:負責分佈式存儲日誌數據,給Kibana提供可視化的數據源;
- LogStash:負責消費Kafka消息隊列中的原始數據,並將消費的數據上報到ElasticSearch進行存儲;
- Kibana:負責可視化ElasticSearch中存儲的數據,並提供查詢、聚合、圖表、導出等功能;
- Kafka:負責集中管理日誌信息,並做數據分流。例如,Flume、LogStash、Spark Streaming等。
1.架構
將日誌服務器託管的壓縮日誌統計收集到Kafka消息隊列,有Kafka實現數據分流。通過LogStash工具消費Kafka中存儲的消息數據,並將消費後的數據寫入到ElasticSearch進行存儲,最後通過Kibana工具來查詢、分析ElasticSearch中存儲的數據,整個體系架構如圖10-17所示。
數據源的收集可以採用不同的方式,使用Flume Agent採集數據則省略了額外的編碼工作,使用Java API讀取日誌信息則需要額外的編寫代碼來實現。
這兩種方式將採集的數據均輸送到Kafka集群中進行存儲,這裡使用Kafka主要是方便業務拓展,如果直接對接LogStash,那麼後續如果需要使用Spark Streaming來進行消費日誌數據,就能很方便的從Kafka集群中消費Topic來獲取數據。這裡Kafka起到了很好的數據分流作用。
2.模塊分析
實時日誌分析平台可以拆分為幾個核心模塊,它們分別是數據源準備、數據採集、數據分流、數據存儲、數據查看與分析。在整個平台系統中,它們執行的流程需要按照固定的順序來完成,如下圖所示。
a. 數據源準備
數據源是由異常壓縮日誌構成的,這些日誌分別由客戶端執行業務邏輯、調用服務端接口這類操作產生。然後將這些日誌進行壓縮存儲到日誌服務器。
b. 數據採集
採集數據源的方式有很多,可以選擇開源的日誌採集工具(如Apache Flume、LogStash、Beats),使用這些現有的採集工具的好處在於省略了編碼工作,通過編輯工具的配置文件即可快速使用,缺點在於針對一些特定的業務場景,可能無法滿足。
另外一種方式是使用應用編程接口(API)來採集,例如使用Java API讀取待採集的數據源,然後調用Kafka接口將數據寫入到Kafka消息隊列中進行存儲。這種方式的好處在於對於需要的實現是可控的,缺點在於編碼實現時需要考慮很多因素,比如程序的性能、穩定性、可擴展性等。
c. 數據分流
在一個海量數據應用場景中,數據採集的Agent是有很多個的,如果直接將採集的數據寫入到ElasticSearch進行數據存儲,那麼ElasticSearch需要同時處理所有的Agent上報的數據,這樣會給ElasticSearch集群服務端造成很大的壓力。
因此需要有個緩衝區來緩解ElasticSearch集群服務端的壓力。這裡使用Kafka來做數據分流,將Agent上報的數據存儲到消息隊列。然後在通過消費Kafka中的Topic消息數據後存儲到ElasticSearch集群中,這樣不僅可以緩解ElasticSearch集群服務端的壓力,而且還能提高整個系統的性能、穩定性、擴展性。
d. 數據存儲
這裡使用ElasticSearch集群來作為日誌最終的存儲介質。通過消費Kafka集群中的Topic數據,按照不同的索引(Index)和類型(Type)存儲到ElasticSearch集群中。
e. 數據可視化
異常日誌數據落地在ElasticSearch集群中,可以通過Kibana來實現可視化功能。用戶可以自定義規則來查詢ElasticSearch集群中的數據,並將查詢的結果以表格或者JSON對象形式輸出。同時,Kibana還提供了一鍵導出功能,將這些查詢的結果從Kibana瀏覽器界面導出到本地。
3.實戰演練
1. 數據源採集
這裡通過Apache Flume工具將上報的異常日誌數據採集到Kafka集群進行存儲。在日誌服務器部署一個Flume Agent進行數據採集,Flume配置文件所包含的內容見如下代碼:
# 設置代理別名 agent.sources = s1 agent.channels = c1 agent.sinks = k1 # 設置收集方式 agent.sources.s1.type=exec agent.sources.s1.command=tail -F /data/soft/new/error/logs/apps.log agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000 agent.channels.c1.transactionCapacity=100 # 設置Kafka接收器 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink # 設置Kafka的broker地址和端口號 agent.sinks.k1.brokerList=dn1:9092,dn2:9092,dn3:9092 # 設置Kafka的Topic agent.sinks.k1.topic=error_es_apps # 設置序列化方式 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder # 指定管道別名 agent.sinks.k1.channel=c1
然後在Kafka集群上使用命令創建名為「error_es_apps」的Topic,創建命令如下所示:
# 創建Topic,3個副本,6個分區 [hadoop@dn1 ~]$kafka-topics.sh --create –zookeeper\ dn1:2181,dn2:2181,dn3:2181 --replication-factor 3\ --partitions 6 --topic error_es_apps
接着啟動Flume Agent代理服務,具體命令如下所示:
# 在日誌服務器上啟動Agent服務 [hadoop@dn1 ~]$ flume-ng agent -n agent -c conf -f $FLUME_HOME/conf/flume-kafka.properties\ -Dflume.root.logger=DEBUG,CONSOLE
2. 數據分流
將採集的數據存儲到Kafka消息隊列後,可以供其他工具或者應用程序消費來進行數據分流。例如,通過使用LogStash來消費業務數據並將消費後的數據存儲到ElasticSearch集群中。
如果LogStash沒有按照X-Pack插件,這裡可以提前安裝該插件。具體命令如下所示:
# 在線安裝 [hadoop@nna bin]$ ./logstash-plugin install x-pack # 離線安裝 [hadoop@nna bin]$ ./logstash-plugin install file:///tmp/x-pack-6.1.1.zip
安裝成功後,在Linux控制台會打印日誌信息,如下圖所示:
然後,在logstash.yml文件中配置LogStash的用戶名和密碼,具體配置內容見代碼:
# 用戶名 xpack.monitoring.elasticsearch.username: "elastic" # 密碼 xpack.monitoring.elasticsearch.password: "123456"
最後配置LogStash的屬性,連接到Kafka集群進行消費。具體實現內容見代碼:
# 配置輸入源信息 input{ kafka{ bootstrap_servers => "dn1:9092,dn2:9092,dn3:9092" group_id => "es_apps" topics => ["error_es_apps"] } } # 配置輸出信息 output{ elasticsearch{ hosts => ["nna:9200","nns:9200","dn1:9200"] index => "error_es_apps-%{+YYYY.MM.dd}" user => "elastic" password => "123456" } }
在配置輸出到ElasticSearch集群信息時,索引建議以「業務名稱-時間戳」來進行命名,這樣做的好處在於後續刪除數據的時候,可以很方便的根據索引來刪除。由於配置了權限認證,索引需要設置用戶名和密碼。 配置完成後,在Linux控制台執行LogStash命令來消費Kafka集群中的數據。具體操作命令如下所示:
# 啟動LogStash消費命令
[hadoop@nna ~]$ logstash -f $LOGSTASH_HOME/config/kafka2es.conf
如果配置文件內容正確,LogStash Agent將正常啟動消費Kafka集群中的消息數據,並將消費後的數據存儲到ElasticSearch集群中。 啟動LogStash Agent之後,它會一直在Linux操作系統後台運行。如果Kafka集群中Topic有新的數據產生,LogStash Agent會立刻開始消費Kafka集群中的Topic裏面新增的數據。
3. 數據可視化
當數據存儲到ElasticSearch集群後,可以通過Kibana來查詢、分析數據。單擊「Management」模塊後,在跳轉後的頁面中找到「Kibana-Index Patterns」來添加新創建的索引(Index)。在添加完成創建的索引後,單擊「Discover」模塊,然後選擇不同的索引來查詢、分析ElasticSearch集群中的數據
4.結束語
這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。