ElasticSearch結合Logstash(三)
一、Logstash簡介
1,什麼是Logstash
Logstash 是開源的伺服器端數據處理管道,能夠同時從多個來源採集數據,轉換數據,然後將數據發送到您最喜歡的「存儲庫」中。
2,為什麼使用Logstash
如果某台伺服器部署了多個實例,則需要去每個應用實例的日誌目錄下去找日誌文件。每個應用實例還會設置日誌滾動策略(如:每天生成一個文件),還有日誌壓縮歸檔策略等。
如果我們能把這些日誌集中管理,並提供集中檢索功能,不僅可以提高診斷的效率,同時對系統情況有個全面的理解,避免事後救火的被動。所以日誌集中管理功能就可以使用ELK技術棧進行實現。Elasticsearch只有數據存儲和分析的能力,Kibana就是可視化管理平台。還缺少數據收集和整理的角色,這個功能就是Logstash負責的。
3,Logstash工作原理
a)Data Source
Logstash 支援的數據源有很多。例如對於日誌功能來說只要有日誌記錄和日誌傳遞功能的日誌都支援,Spring Boot中默認推薦logback支援日誌輸出功能(輸出到資料庫、數據出到文件)。
b)Logstash Pipeline
在Logstash中包含非常重要的三個功能:
- Input:輸入源,一般配置為自己監聽的主機及埠。DataSource向指定的ip及埠輸出日誌,Input 輸入源監聽到數據資訊就可以進行收集。
- Filter:過濾功能,對收集到的資訊進行過濾(額外處理),也可以省略這個配置(不做處理)
- Output:把收集到的資訊發送給誰。在ELK技術棧中都是輸出給Elasticsearch,後面數據檢索和數據分析的過程就給Elasticsearch了。
最終效果:通過整體步驟就可以把原來一行日誌資訊轉換為Elasticsearch支援的Document形式(鍵值對形式)的數據進行存儲。
二、採用Logstash收集SpringBoot日誌
1,安裝logstash
- 解壓logstash-6.8.4.tar.gz(與當前es的版本匹配)
- 創建一個conf配置文件
#輸入採用tcp監控 ,當前監控埠為5044 input { tcp { mode => "server" host => "0.0.0.0" port => 5044 } } #輸出到es中,配置index索引為my-es-log-年月日 output { elasticsearch { hosts => ["//hadoop208:9200","//hadoop209:9200"] index => "my-es-log-%{+YYYY.MM.dd}" } }
-
啟動:./logstash -f /usr/local/logstash/config/mylogstash.conf
2,在Springboot添加logback
a)添加maven
<!-- 使用logback往logstash中寫入日誌 -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
b)配置logback.xml(在resource目錄下)


<?xml version="1.0" encoding="UTF-8"?> <!-- 小技巧: 在根pom裡面設置統一存放路徑,統一管理方便維護 <properties> <log-path>/Users/lengleng</log-path> </properties> 1. 其他模組加日誌輸出,直接copy本文件放在resources 目錄即可 2. 注意修改 <property name="${log-path}/log.path" value=""/> 的value模組 --> <configuration debug="false" scan="false"> <property name="log.path" value="logs/${project.artifactId}"/> <!-- 彩色日誌格式 --> <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> <!-- 彩色日誌依賴的渲染類 --> <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/> <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/> <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/> <!-- Console log output --> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> </encoder> </appender> <!-- Log file debug output --> <appender name="debug" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${log.path}/debug.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>${log.path}/%d{yyyy-MM, aux}/debug.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern> <maxFileSize>50MB</maxFileSize> <maxHistory>30</maxHistory> </rollingPolicy> <encoder> <pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern> </encoder> </appender> <!-- Log file error output --> <appender name="error" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${log.path}/error.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>${log.path}/%d{yyyy-MM}/error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern> <maxFileSize>50MB</maxFileSize> <maxHistory>30</maxHistory> </rollingPolicy> <encoder> <pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern> </encoder> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>ERROR</level> </filter> </appender> <!--輸出到logstash的appender--> <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <!--可以訪問的logstash日誌收集埠--> <destination>hadoop208:3116</destination> <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <logger name="org.activiti.engine.impl.db" level="DEBUG"> <appender-ref ref="debug"/> </logger> <!--nacos 心跳 INFO 屏蔽--> <logger name="com.alibaba.nacos" level="OFF"> <appender-ref ref="error"/> </logger> <!-- Level: FATAL 0 ERROR 3 WARN 4 INFO 6 DEBUG 7 --> <root level="INFO"> <appender-ref ref="console"/> <appender-ref ref="debug"/> <appender-ref ref="LOGSTASH"/> </root> </configuration>
View Code
c)啟動並查看
3,使用kibana查看
創建索引規則
查看Discover
三、Logstash完成MySQL數據增量導入es
1 ,上傳jdbc連接jar
上傳到:$LogStash_HOME/logstash_core/lib/jars/
2,創建mysql


CREATE TABLE `tb_item` ( `id` bigint(20) NOT NULL COMMENT '商品id,同時也是商品編號', `title` varchar(100) NOT NULL COMMENT '商品標題', `sell_point` varchar(500) DEFAULT NULL COMMENT '商品賣點', `price` bigint(20) NOT NULL COMMENT '商品價格,單位為:分', `num` int(10) NOT NULL COMMENT '庫存數量', `barcode` varchar(30) DEFAULT NULL COMMENT '商品條形碼', `image` varchar(500) DEFAULT NULL COMMENT '商品圖片', `cid` bigint(10) NOT NULL COMMENT '所屬類目,葉子類目', `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '商品狀態,1-正常,2-下架,3-刪除', `created` datetime NOT NULL COMMENT '創建時間', `updated` datetime NOT NULL COMMENT '更新時間', PRIMARY KEY (`id`), KEY `cid` (`cid`), KEY `status` (`status`), KEY `updated` (`updated`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='商品表';
View Code
LogStash實現增量導入,需要有一個定位欄位,這個欄位的數據,可以表示數據的新舊,代表這個數據是否是一個需要導入到ES中的數據。案例中使用表格的updated欄位作為定位欄位,每次讀取數據的時候,都會記錄一個最大的updated時間,每次讀取數據的時候,都讀取updated大於等於記錄的定位欄位數據。每次查詢的就都是最新的,要導入到ES中的數據。
3,編寫logstash配置
在$LogStash_home/config/目錄中,編寫配置文件ego-items-db2es.conf
input { jdbc { # 連接地址 jdbc_connection_string => "jdbc:mysql://hadop202:3306/ego?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC" # 資料庫用戶名和密碼 jdbc_user => "root" jdbc_password => "123456" # 驅動類,如果使用低版本的logstash,需要再增加配置 jdbc_driver_library,配置驅動包所在位置 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 是否開啟分頁邏輯 jdbc_paging_enabled => true # 分頁的長度是多少 jdbc_page_size => "2000" # 時區 jdbc_default_timezone => "Asia/Shanghai" # 執行的SQL statement => "select id, title, sell_point, price, image, updated from tb_item where updated >= :sql_last_value order by updated asc" # 執行SQL的周期, [秒] 分鐘 小時 天 月 年 schedule => "* * * * *" # 是否使用欄位的值作為比較策略 use_column_value => true # 作為比較策略的欄位名稱 tracking_column => "updated" # 作為比較策略的欄位類型,可選為numberic和timestamp tracking_column_type => "timestamp" # 記錄最近的比較策略欄位值的文件是什麼,相對定址路徑是logstash的安裝路徑 last_run_metadata_path => "./ego-items-db2es-last-value" # 是否每次執行SQL的時候,都刪除last_run_metadata_path文件內容 clean_run => false # 是否強制把ES中的欄位名都定義為小寫。 lowercase_column_names => false } } output { elasticsearch { hosts => ["//hadoop208:9200", "//hadoop209:9200"] index => "ego-items-index" action => "index" #指定document_id為資料庫主鍵,保證更新 document_id => "%{id}" } }
4,安裝logstash-input-jdbc插件
$Logstash_HOME/bin/logstash-plugin install logstash-input-jdbc
5,啟動測試
bin/logstash -f 配置文件