ELK技術-Logstash
1.背景
1.1 簡介
Logstash 是一個功能強大的工具,可與各種部署集成。 它提供了大量插件,可幫助業務做解析,豐富,轉換和緩衝來自各種來源的數據。

Logstash 是一個數據流引擎
- 它是用於數據物流的開源流式 ETL(Extract-Transform-Load)引擎
- 在幾分鐘內建立數據流管道
- 具有水平可擴展及韌性且具有自適應緩衝
- 不可知的數據源
- 具有 200 多個集成和處理器的插件生態系統
- 使用 Elastic Stack 監視和管理部署
Logstash 幾乎可以攝入各種類別的數據
它可以攝入日誌,文件,指標或者網路真實數據。經過 Logstash 的處理,變為可以使用的 Web Apps 可以消耗的數據,也可以存儲於數據中心,或變為其它的流式數據。
Logstash 相關概念
- Logstash 實例是一個正在運行的 Logstash 進程。建議在 Elasticsearch 的單獨主機上運行 Logstash,以確保兩個組件有足夠的計算資源可用。
- 管道(pipeline)是配置為處理給定工作負載的插件集合。一個 Logstash 實例可以運行多個管道。(彼此獨立)
- 輸入插件(input plugins)用於從給定的源系統中提取或接收數據。 Logstash 參考指南中提供了支持的輸入插件列表://www.elastic.co/guide/en/logstash/current/input-plugins.html
- 過濾器插件(filter plugin)用於對傳入事件應用轉換和豐富。 Logstash 參考指南中提供了支持的過濾器插件列表:Filter plugins | Logstash Reference [8.3] | Elastic
- 輸出插件(output plugin)用於將數據加載或發送到給定的目標系統。 Logstash 參考指南中提供了支持的輸出插件列表://www.elastic.co/guide/en/logstash/current/output-plugins.html
Logstash 包含3個主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)。 你必須定義這些過程的配置才能使用 Logstash,儘管不是每一個都必須的。在有些情況下,可以甚至沒有過濾器。在過濾器的部分,它可以對數據源的數據進行分析,豐富,處理等。
1.2 學習參考
- Logstash官方文檔:《Logstash官方文檔》
- 中國社區官方博客:《Logstash入門教程》
- 其他參考技術博客:《通過Logstash實現mysql數據定時增量同步到ES》
- Logstash解析:《解析插件-Grok》
1.3 本例測試版本
[root@dev1613 study]# sudo -u logstash ../bin/logstash --version Using bundled JDK: /opt/logstash/jdk logstash 7.12.1
2.功能應用
2.1 基礎測試
輸入測試命令,../bin為當前執行命令所在文件夾,與logstash安裝後bin的相對目錄位置。
sudo -u logstash ../bin/logstash -e 'input { stdin { } } output { stdout {} }'
執行命令後,輸出結果如圖:

2.2 Logstash解析日誌文件
最原始的 Log 數據,經過 Logstash 的處理,可以把非結構化的數據變成結構化的數據。甚至可以使用 Logstash 強大的 Filter 來對數據繼續加工。最終將加工後的數據存儲下來,用於分析和搜索。

日誌原始內容
2022-07-06 18:48:37.453 ERROR 14677 --- [ dispatcher 108] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104506000&endTime=1657104512000&refetch=false>: socket timeout 2022-07-06 18:48:44.439 ERROR 14677 --- [ dispatcher 109] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104513000&endTime=1657104519000&refetch=false>: socket timeout 2022-07-06 18:48:51.514 ERROR 14677 --- [ dispatcher 110] c.a.c.s.dashboard.metric.MetricFetcher : Failed to fetch metric from <http://10.32.4.230:8719/metric?startTime=1657104520000&endTime=1657104526000&refetch=false>: socket timeout
Logstash配置文件
編寫日誌解析配置文件,並解析時間,錯誤級別,錯誤行,錯誤信息。提取出來變為結構化數據。編寫配置文件如下:
配置相關節點參考官方文檔:《plugins-inputs-file》
input { file { path => "/opt/logstash/study/outlog.log" start_position => "beginning" stat_interval => "3" type => "sentinel-log" } } filter { grok { match => ["message","%{TIMESTAMP_ISO8601:datetime} %{LOGLEVEL:loglevel} %{NUMBER:textid} %{GREEDYDATA:errormsg}"] } json { source => "request" } } output { stdout { codec => rubydebug } }
Grok日誌解析在線測試
基於elastic在線網頁,可編寫解析日誌測試demo。

日誌解析結構化輸出
運行命令:sudo -u logstash ../bin/logstash -f study-file-es.conf
運行logstash加載配置文件命令,啟動測試輸出結構化內容如下:

2.3 Logstash-數據庫同步
本例將MySql數據表中的數據,基於修改時間同步到es數據存儲中心。
基礎數據內容
數據源-mysql數據表建表語句:
CREATE TABLE `study_logstash_es` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增主鍵', `study_code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '編碼', `study_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '名稱', `study_tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '標籤', `study_level` smallint NOT NULL DEFAULT '0' COMMENT '等級,如1,2,3', `is_delete` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0 未刪除 1 刪除', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間', `operate_user` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作人', PRIMARY KEY (`id`), UNIQUE KEY `uniq_study_code` (`study_code`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='學習-logstash同步msql數據到es';
目標源-es索引創建腳本:
PUT /study_logstash_es { "settings": { "index": { "number_of_shards": 1, "number_of_replicas": 1 } }, "mappings": { "properties": { "id": { "type": "integer" }, "study_code": { "type": "text" }, "study_name": { "type": "text" }, "operate_user": { "type": "text" }, "study_tag": { "type": "keyword" }, "is_delete": { "type": "integer" }, "study_level": { "type": "integer" }, "mark_time": { "type": "date", "format": "epoch_millis" }, "update_time": { "type": "date" } } } }
Logstash配置文件
本例測試的數據庫地址,es地址,已經基於xxx脫敏。更多jdbc的配置,請參考官方文檔:《plugins-inputs-jdbc》。
jdbc_driver_library:為mysql連接包,可在Maven上下載,下載地址參考:《mysql-connector-java.jar 包下載》。
input { jdbc { jdbc_driver_library => "/opt/logstash/study/mysql-connector-java-8.0.30.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://xxx.xxx.xx.x:3306/study_database?serverTimezone=Asia/Shanghai&allowMultiQueries=true&characterEncoding=utf-8" jdbc_user => "root" jdbc_password => "xxxxx" jdbc_paging_enabled => true jdbc_page_size => "2" use_column_value => true tracking_column => "mark_time" tracking_column_type => "numeric" schedule => "* * * * *" statement => "SELECT id,study_code,study_name,study_tag,study_level,operate_user,update_time,UNIX_TIMESTAMP(update_time) as mark_time from study_logstash_es where UNIX_TIMESTAMP(update_time)>:sql_last_value AND update_time < NOW()" } } output{ elasticsearch{ hosts => ["xxx.xxx.16.4:9200","xxx.xxx.16.xx:9200","192.xxx.xx.xx:9200"] index => "study_logstash_es" timeout => 300 user => "xxx" password => "xxxxx" } }
數據同步es
運行命令:sudo -u logstash ../bin/logstash -f study-mysql-es.conf
運行logstash加載配置文件命令,啟動運行日誌,es同步的數據如下:

es數據查詢如下:

2.4 Logstash-kafka消息同步
Logstash的輸入項可以監聽kafka消息,消費消息記錄。
input { kafka { bootstrap_servers => "xxx.xxx.xx.4:9092,xxx.xxx.16.4:9093,xxx.xxx.16.4:9094" #kafka服務器地址 topics => "xxxlog" # batch_size => 5 codec => "json" group_id => "logstash" consumer_threads => 3 } } filter { # 丟棄所有的header請求 if [request][method] == "HEAD" { drop { } } # 因為[request][querystring]這個玩意中的字段類型可能不一樣,所以全部干成字符串 ruby { code => "event.set('[request][querystring]', event.get('[request][querystring]').to_s) if event.get('[request][querystring]')" } if [request][uri] =~ "^/ucenter-admin-view/v3(.*)" { mutate { add_field => { "log_source" => "用戶中心管理後台" } add_field => { "log_source_id" => "1" } } } else if [request][uri] =~ "^/ucenter-org-view/v3/(.*)" { mutate { add_field => { "log_source" => "用戶中心工作台" } add_field => { "log_source_id" => "2" } } } else if [request][uri] =~ "^/safety-admin-api(.*)" { mutate { add_field => { "log_source" => "安全管理平台" } add_field => { "log_source_id" => "3" } } } else{ mutate { add_field => { "log_source" => "其他" } add_field => { "log_source_id" => "0" } } } grok { match => { "[request][uri]" => "%{URIPATH:[request][path]}" } named_captures_only => false } } output{ # stdout { # codec => json # } elasticsearch{ hosts => ["xxx.xxx.xx.4:9200","xxx.xxx.16.13:9200","xxx.xxx.16.14:9200"] index => "apisixlog" timeout => 300 user => "elastic" password => "HApn2xCJMuRlg0UOIV0P" }
3.總結
- Logstash基於 輸入(inputs),過濾器(filters)和輸出(outputs)可以方便快捷的處理數據,將一些非結構化數據,處理為結構化數據。Logstash支持數據中轉,數據同步等場景的應用。本例只是簡要測試,在實際業務使用時,可基於某一個輸入插件/輸出插件參考官方文檔,結合項目使用。
- Logstash收集大量日誌時,存在耗內存的情況,建議參考官方推薦的FileBeat模式。詳情參考文檔:《開源日誌管理方案 ELK 和 EFK 的區別》,《通過Filebeat把日誌傳入到Elasticsearch》。
- Logstash在配置文件調整後,啟動命令,可能出現如下報錯:

刪除掉Logstash/data文件下的緩存文件,即可重新啟動成功。

- Logstash啟動命名常用如下:
sudo -u logstash ../bin/logstash -f study-file-es.conf 表示當前窗口啟動,關閉或退出命令行時,logstash實例關閉。 sudo -u logstash ../bin/logstash -f study-file-es.conf --config.reload.automatic 表示當前窗口啟動,配置文件變化時,不用重新啟動實例,可自動加載。關閉或退出命令行時,logstash實例關閉。 sudo -u logstash ../bin/logstash -f study-mysql-es.conf & test.out --config.reload.automatic 表示後台啟動,關閉退出命令,實例在後台一直運行。 ps -ef|grep logstash kill-9 進程號, 關閉對應的實例
- Logstash運行日誌查看
查看cat logstash-plain.log 文件,可查看Logstash運行日誌記錄。
