LogStash的安装部署与应用

  • 2019 年 10 月 5 日
  • 筆記

LogStash的安装部署与应用

介绍

1、Logstash是一个接收,处理,转发日志的工具; 2、Logstash支持网络日志、系统日志、应用日志、apache日志等等,总之可以处理所有日志类型; 3、典型应用场景ELK:logstash负责采集、解析日志,elasticsearch负责数据存储,kibana负责前端报表展示。

下载

https://www.elastic.co/cn/downloads/logstash

安装部署

上传服务器、解压即可使用。

配置

根据需要修改配置文件config/logstash.yml,默认不修改即可运行

  • 节点名称:node.name
  • 日志级别:log.level(默认debug,如果想要看详细日志改为trace)

根据需要调整jvm.options配置文件:

  • -Xms256m
  • -Xmx1g

测试安装是否成功: 快速启动,标准输入输出作为input和output

./bin/logstash -e 'input { stdin {} } output { stdout {} }'  

使用

主要组件

主要组件

  • Input组件:负责采集日志数据,包括文件、syslog、collectd、kafka、redis等等;
  • Filter:负责解析日志数据,包括解析、加工、转换数据等;
  • Output:负责输出日志数据,对接到redis、kafka、elasticsearch、hdfs等存储组件;

常用启动参数

-e    立即执行,使用命令行里的配置参数启动实例      ./bin/logstash -e ‘input {stdin {}} output {stdout {}}'  -f    指定启动实例的配置文件      ./bin/logstash -f config/test.conf  -t    测试配置文件的正确性      ./bin/logstash-f config/test.conf -t  -l    指定日志文件名称      ./bin/logstash-f config/test.conf -l logs/test.log  -w    指定filter线程数量,默认线程数是5      ./bin/logstash-f config/test.conf -w  

常用input配置

File

文件读取插件主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。

input    file {          #监听文件的路径          path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"]          #排除不想监听的文件          exclude => "1.log"            #添加自定义的字段          add_field => {"test"=>"test"}          #增加标签          tags => "tag1"            #设置新事件的标志          delimiter => "n"            #设置多长时间扫描目录,发现新文件          discover_interval => 15          #设置多长时间检测文件是否修改          stat_interval => 1            #监听文件的起始位置,默认是end          start_position => beginning            #监听文件读取信息记录的位置          sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"          #设置多长时间会写入读取的位置信息          sincedb_write_interval => 15    }  }  

kafka

input    kafka {          #集群地址          bootstrap_servers => ["10.142.134.179:9092"]          #消费组id          group_id => "ete_serv_common"          #数组类型,可配置多个topic topics_pattern => ["BO_TOPO_d{1,}$"]正则匹配          topics => ["logq","loge"]          #从最早的偏移量开始消费latest从最新的开始消费          auto_offset_reset => "earliest"          #起多少个input线程数          consumer_threads => 5          #此属性会将当前topic、offset、group、partition等信息也带到message中          decorate_events => true      }  }  

Beats

Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events;

配置示例

input {      beats {          port => 5044      }  }  

TCP

TCP插件有两种工作模式,"Client"和"Server",分别用于发送网络数据和监听网络数据。

配置示例

tcp {      port => 41414  }  

Redis

input {    redis {      host => "127.0.0.1"      port => 6379      data_type => "list"      key => "logstash-list"    }  }  

常用的Filter配置

丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式。

grok 过滤器

grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们 语法如下:%{SYNTAX:SEMANTIC}

  • SYNTAX:表示已经安装的正则表达式的名称
  • SEMANTIC:表示从Event中匹配到的内容的名称

例如:Event的内容为"[debug] 127.0.0.1 – test log content",匹配%{IP:client}将获得"client: 127.0.0.1"的结果,前提安装了IP表达式;

通过配置grok可以把 [debug] 127.0.0.1 – test log content 这样的非结构化数据转为: "cllient":"127.0.0.1".

一个完整的例子

 日志文件http.log内容:192.168.1.11 GET /index.html 15994 0.053   表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}    配置文件内容:    input {    file {      path => "/var/log/http.log"    }  }  filter {    grok {      match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}    }  }    输出结果:  client: 192.168.1.11  method: GET  request: /index.html  bytes: 15994  duration: 0.053  

自定义表达式 与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像于定义的表达式一样使用;

语法:(?<field_name>the pattern here)    举例:捕获10或11和长度的十六进制queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11})    安装自定义表达式    1、在Logstash根目录下创建文件夹"patterns",在"patterns"文件夹中创建文件"extra"(文件名称无所谓,可自己选择有意义的文件名称);    2、在文件"extra"中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下:                # contents of ./patterns/postfix:              POSTFIX_QUEUEID [0-9A-F]{10,11}      3、使用自定义的表达式时需要指定"patterns_dir"变量,变量内容指向表达式文件所在的目录    举例如下:        ## 日志内容  Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: [email protected]>        ## Logstash配置      filter {        grok {          patterns_dir => ["./patterns"]          match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }        }      }        ## 运行结果 ##      timestamp: Jan 1 06:25:43      logsource: mailserver14      program: postfix/cleanup      pid: 21403      queue_id: BEF25A72965    -Grok表达式在线debug地址:      http://grokdebug.herokuapp.com  -预定义正则表达式参考地址:      https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns  

date 时间处理过滤器

该插件用于时间字段的格式转换,比如将"Apr 17 09:32:01"(MMM dd HH:mm:ss)转换为"MM-dd HH:mm:ss"。 而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。

#日志内容  {      "agent":"Windows 7",      "client_time":"2017-11-20 12:00:00",      "client_ip":"123.10.91.106"  }    # Filter 配置  filter {     date {         match => ["client_time", "yyyy-MM-dd HH:mm:ss"]         #这里是如果client_time跟后面的格式匹配上了就会去替换,替换什么呢?         #target默认指的就是@timestamp,所以就是以client_time的时间更新@timestamp的时间     }  }  

mutate数据修改过滤器

mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。 可以重命名,删除,替换和修改事件中的字段。

  • 重命名 — rename 对于已经存在的字段,重命名其字段名称
filter {      mutate {          rename => ["syslog_host", "host"]      }  }  
  • 更新字段内容 — update 更新字段内容,如果字段不存在,不会新建
filter {      mutate {          update => { "sample" => "My new message" }      }  }  
  • 替换字段内容 — replace 与 update 功能相同,区别在于如果字段不存在则会新建字段
filter {      mutate {          replace => { "message" => "%{source_host}: My new message" }      }  }  
  • 数据类型转换 — convert
filter {      mutate {          convert => ["request_time", "float"]      }  }  

JSON过滤器

JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下

配置示例    json {      source => ...  }    ## 示例配置,message是JSON格式的字符串:"{"uid":3081609001,"type":"signal"}" ##    filter {      json {          source => "message"          target => "jsoncontent"      }  }    ## 输出结果 ##  {      "@version": "1",      "@timestamp": "2014-11-18T08:11:33.000Z",      "host": "web121.mweibo.tc.sinanode.com",      "message": "{"uid":333333333,"type":"signal"}",      "jsoncontent": {          "uid": 333333333,          "type": "signal"      }  }    ## 如果从示例配置中删除`target`,输出结果如下 ##  {      "@version": "1",      "@timestamp": "2014-11-18T08:11:33.000Z",      "host": "web121.mweibo.tc.sinanode.com",      "message": "{"uid":333333333,"type":"signal"}",      "uid": 333333333,      "type": "signal"  }  

Output

ElasticSearch输出插件

用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件

配置事例

output {      elasticsearch {          hosts => ["127.0.0.1:9200"]          index => "filebeat-%{type}-%{+yyyy.MM.dd}"          user => "upuptop"          password => "upuptop"          template => "/dd/c/caaaa.json"          template_name => "index_scene_control"                   template_overwrite => true              }  }  

File输出插件

用于将Event输出到文件内

配置事例

output {      file {          path => "/home/upuptop/test.txt "        }  }