LogStash的安装部署与应用
- 2019 年 10 月 5 日
- 筆記
LogStash的安装部署与应用
介绍
1、Logstash是一个接收,处理,转发日志的工具; 2、Logstash支持网络日志、系统日志、应用日志、apache日志等等,总之可以处理所有日志类型; 3、典型应用场景ELK:logstash负责采集、解析日志,elasticsearch负责数据存储,kibana负责前端报表展示。
下载
https://www.elastic.co/cn/downloads/logstash
安装部署
上传服务器、解压即可使用。
配置
根据需要修改配置文件config/logstash.yml,默认不修改即可运行
- 节点名称:node.name
- 日志级别:log.level(默认debug,如果想要看详细日志改为trace)
根据需要调整jvm.options配置文件:
- -Xms256m
- -Xmx1g
测试安装是否成功: 快速启动,标准输入输出作为input和output
./bin/logstash -e 'input { stdin {} } output { stdout {} }'
使用
主要组件
主要组件
- Input组件:负责采集日志数据,包括文件、syslog、collectd、kafka、redis等等;
- Filter:负责解析日志数据,包括解析、加工、转换数据等;
- Output:负责输出日志数据,对接到redis、kafka、elasticsearch、hdfs等存储组件;
常用启动参数
-e 立即执行,使用命令行里的配置参数启动实例 ./bin/logstash -e ‘input {stdin {}} output {stdout {}}' -f 指定启动实例的配置文件 ./bin/logstash -f config/test.conf -t 测试配置文件的正确性 ./bin/logstash-f config/test.conf -t -l 指定日志文件名称 ./bin/logstash-f config/test.conf -l logs/test.log -w 指定filter线程数量,默认线程数是5 ./bin/logstash-f config/test.conf -w
常用input配置
File
文件读取插件主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。
input file { #监听文件的路径 path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"] #排除不想监听的文件 exclude => "1.log" #添加自定义的字段 add_field => {"test"=>"test"} #增加标签 tags => "tag1" #设置新事件的标志 delimiter => "n" #设置多长时间扫描目录,发现新文件 discover_interval => 15 #设置多长时间检测文件是否修改 stat_interval => 1 #监听文件的起始位置,默认是end start_position => beginning #监听文件读取信息记录的位置 sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt" #设置多长时间会写入读取的位置信息 sincedb_write_interval => 15 } }
kafka
input kafka { #集群地址 bootstrap_servers => ["10.142.134.179:9092"] #消费组id group_id => "ete_serv_common" #数组类型,可配置多个topic topics_pattern => ["BO_TOPO_d{1,}$"]正则匹配 topics => ["logq","loge"] #从最早的偏移量开始消费latest从最新的开始消费 auto_offset_reset => "earliest" #起多少个input线程数 consumer_threads => 5 #此属性会将当前topic、offset、group、partition等信息也带到message中 decorate_events => true } }
Beats
Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events;
配置示例
input { beats { port => 5044 } }
TCP
TCP插件有两种工作模式,"Client"和"Server",分别用于发送网络数据和监听网络数据。
配置示例
tcp { port => 41414 }
Redis
input { redis { host => "127.0.0.1" port => 6379 data_type => "list" key => "logstash-list" } }
常用的Filter配置
丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式。
grok 过滤器
grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们 语法如下:%{SYNTAX:SEMANTIC}
- SYNTAX:表示已经安装的正则表达式的名称
- SEMANTIC:表示从Event中匹配到的内容的名称
例如:Event的内容为"[debug] 127.0.0.1 – test log content",匹配%{IP:client}将获得"client: 127.0.0.1"的结果,前提安装了IP表达式;
通过配置grok可以把 [debug] 127.0.0.1 – test log content 这样的非结构化数据转为: "cllient":"127.0.0.1".
一个完整的例子
日志文件http.log内容:192.168.1.11 GET /index.html 15994 0.053 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} 配置文件内容: input { file { path => "/var/log/http.log" } } filter { grok { match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } } 输出结果: client: 192.168.1.11 method: GET request: /index.html bytes: 15994 duration: 0.053
自定义表达式 与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像于定义的表达式一样使用;
语法:(?<field_name>the pattern here) 举例:捕获10或11和长度的十六进制queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11}) 安装自定义表达式 1、在Logstash根目录下创建文件夹"patterns",在"patterns"文件夹中创建文件"extra"(文件名称无所谓,可自己选择有意义的文件名称); 2、在文件"extra"中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下: # contents of ./patterns/postfix: POSTFIX_QUEUEID [0-9A-F]{10,11} 3、使用自定义的表达式时需要指定"patterns_dir"变量,变量内容指向表达式文件所在的目录 举例如下: ## 日志内容 Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: [email protected]> ## Logstash配置 filter { grok { patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } } ## 运行结果 ## timestamp: Jan 1 06:25:43 logsource: mailserver14 program: postfix/cleanup pid: 21403 queue_id: BEF25A72965 -Grok表达式在线debug地址: http://grokdebug.herokuapp.com -预定义正则表达式参考地址: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
date 时间处理过滤器
该插件用于时间字段的格式转换,比如将"Apr 17 09:32:01"(MMM dd HH:mm:ss)转换为"MM-dd HH:mm:ss"。 而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。
#日志内容 { "agent":"Windows 7", "client_time":"2017-11-20 12:00:00", "client_ip":"123.10.91.106" } # Filter 配置 filter { date { match => ["client_time", "yyyy-MM-dd HH:mm:ss"] #这里是如果client_time跟后面的格式匹配上了就会去替换,替换什么呢? #target默认指的就是@timestamp,所以就是以client_time的时间更新@timestamp的时间 } }
mutate数据修改过滤器
mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。 可以重命名,删除,替换和修改事件中的字段。
- 重命名 — rename 对于已经存在的字段,重命名其字段名称
filter { mutate { rename => ["syslog_host", "host"] } }
- 更新字段内容 — update 更新字段内容,如果字段不存在,不会新建
filter { mutate { update => { "sample" => "My new message" } } }
- 替换字段内容 — replace 与 update 功能相同,区别在于如果字段不存在则会新建字段
filter { mutate { replace => { "message" => "%{source_host}: My new message" } } }
- 数据类型转换 — convert
filter { mutate { convert => ["request_time", "float"] } }
JSON过滤器
JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下
配置示例 json { source => ... } ## 示例配置,message是JSON格式的字符串:"{"uid":3081609001,"type":"signal"}" ## filter { json { source => "message" target => "jsoncontent" } } ## 输出结果 ## { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{"uid":333333333,"type":"signal"}", "jsoncontent": { "uid": 333333333, "type": "signal" } } ## 如果从示例配置中删除`target`,输出结果如下 ## { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{"uid":333333333,"type":"signal"}", "uid": 333333333, "type": "signal" }
Output
ElasticSearch输出插件
用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件
配置事例
output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "filebeat-%{type}-%{+yyyy.MM.dd}" user => "upuptop" password => "upuptop" template => "/dd/c/caaaa.json" template_name => "index_scene_control" template_overwrite => true } }
File输出插件
用于将Event输出到文件内
配置事例
output { file { path => "/home/upuptop/test.txt " } }