「視頻小課堂」Logstash如何成為鎮得住場面的數據管道(文字版)
視頻地址
B站視頻地址:Logstash如何成為鎮得住場面的數據管道
公眾號視頻地址:Logstash如何成為鎮得住場面的數據管道
知乎視頻地址:Logstash如何成為鎮得住場面的數據管道
內容
首先我們延續上一期視頻中日誌採集架構的案例,Filebeat採集日誌並推送Kafka消息隊列進行分發,再由Logstash消費日誌消息,並將日誌數據最終落地在Elasticsearch集群索引當中,Kafka作為消息隊列分發服務需要將收集到的日誌消息繼續分發下去,最終數據落地在Elasticsearch集群索引當中。
那麼連接整個過程的主角Logstash是如何工作的,就是我們今天講解的重點。
Logstash工作過程分為三個部分:Input輸入、Filter過濾、Output輸出,它們共同協作形成了完整的Logstash數據管道傳輸機制
我們先從一個最簡單的例子演示開始,看看Logstash是怎麼輸入和輸出的,這一次先跳過filter過濾環節。
下面查看已經預置好的一個配置文件01-kafka-elastic-nginx.conf
首先是input輸入配置點,從Kafka訂閱消息,Kafka集群地址與filebeat中都指向了一個地址,其他配置我們先略過,後續Kafka專題再說
下來看到要訂閱的Topic主題TestT3,我們先不用json格式解碼消息,默認就是純文本的方式
一樣的,這一步先略過過濾環節,直接看看output輸出配置點,目標是給Elasticsearch輸出數據,並指定了elasticsearch集群的三個節點
輸出環節創建需要寫入的elasticsearch日誌索引,我們先按照默認的filebeat採集時間,進行日期格式化,按照每個小時建立一個索引,這塊會有時間問題,一會兒再說。
讓數據輸出到終端,方便我們調試結果。
通過演示中最簡單的配置方式,這時候的Logstash已經成為連接Kafka和Elastisearch之間的數據管道了!
好,接下來我們將所有系統運行起來,並生成一條nginx請求日誌,看看管道各個階段的數據變化。
首先nginx日誌數據被filebeat採集,是一條典型的無結構的文本日誌數據,大家注意紅色標註的時間是2021年2月21日13時
接着這條日誌數據通過Kafka進入到了Logstash管道的輸入階段,
Logstash為這條日誌生成了更為非常龐大的Json數據,裏面包括了所有被採集主機的信息,以及nginx日誌,實際上這些原始信息並沒有被良好的進行數據清洗與結構化
最後數據被寫入到Elastisearch一個按小時劃分的索引當中,對應時間為2021年2月21日5時
我們發現Logstash對原始數據在沒有任何處理的情況下,會很不方便將來數據的使用;
這次我們利用Logstash json解碼器讓管道重新再來一次,
接下來我們進入Logstash中對應的配置文件,並找到input輸入點的codec配置,刪掉注釋,打開Logstash對輸入數據的json解碼方式·。
我們看看再次進入管道中的日誌數據,Logstash首先對原始日誌數據進行Json解析
這時候我們再看Json解析後的數據,是不是就清晰多了,filebeat採集到的本地機器數據、以及紅色框中Nginx HTTP日誌數據、以及其他標籤數據都進行了字段分離
做到這一步其實還是不夠好,為什麼呢?一方面因為我們依然希望將Nginx HTTP的日誌數據也進行結構化處理,
另一個方面,Filebeat傳遞給Logstash的系統時間是慢了8個小時的UTC時間標準,反而Nginx日誌中的時間是我們本地的北京時間標準,因此我們希望用Nginx日誌時間作為創建Elasticsearch日誌索引的唯一依據
這時候我們就要使用Logstash的過濾機制了,我們繼續進入Logstash對應的配置中,刪掉過濾配置中的注釋,讓Logstash過濾最常用插件grok、date、ruby、mutate起作用
grok插件是專業處理非結構化數據的能手,通過自定義的Nginx日誌正則表達式,就能實現Nginx日誌的結構化解析
date插件用於處理時間問題,我們通過date插件將nginx日誌中的時間轉換成Logstash時間對象,並賦給一個新的臨時時間字段indextime
ruby就是在過濾過程中可以插入ruby腳本語言來進行程序級處理,我們通過ruby語言對indextime時間格式化,生成一個精確到小時的字符串字段index.date,用於elasticsearch索引名稱
mutate是最常用的可以對管道中數據字段進行操作的插件了,我們的目的是刪除臨時時間字段indextime
最後我們還需要將output輸出中的索引生成方式修改一下,注釋掉原來用filebeat生默認時間生成的索引,改成nginx日誌時間生成的索引。
我們重新運行Logstash,數據經過了Input解碼、日誌grok結構化處理、本地時間對象創建,並進行日期格式化,為了生成新的Elasticsearch索引字段,並對臨時字段進行刪除,最終經過Output輸出階段,創建Elasticsearch索引或寫入日誌數據
讓我們看看Elasticsearch最終保存的數據效果,index索引對應的時間來自過濾器創建的index.date字段,index.date字段又來自nginx日誌中分離出的本地時間。這樣我們就不用再去修改Logstash的系統時間了
我們看到菱形標註的字段數據就是由過濾器對nginx http日誌進行結構化抽取的結果,
同樣elasticsearch依然保存着nginx日誌的原始數據以備不時之需
公眾號 “讀位元組” 大數據(技術、架構、應用)的深度,專業解讀


