如何編寫 Pipeline 腳本
前言
Pipeline 編寫較為麻煩,為此,DataKit 中內置了簡單的調試工具,用以輔助大家來編寫 Pipeline 腳本。
調試 grok 和 pipeline
指定 pipeline 腳本名稱,輸入一段文本即可判斷提取是否成功
Pipeline 腳本必須放在 /pipeline 目錄下。
$ datakit pipeline your_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms' Extracted data(cost: 421.705µs): # 表示切割成功 { "code" : "io/io.go: 458", # 對應代碼位置 "level" : "DEBUG", # 對應日誌等級 "module" : "io", # 對應代碼模塊 "msg" : "post cost 6.87021ms", # 純日誌內容 "time" : 1610358231887000000 # 日誌時間(Unix 納秒時間戳) "message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms" }
提取失敗示例(只有 message 留下了,說明其它字段並未提取出來):
$ datakit pipeline other_pipeline.p -T '2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms' { "message": "2021-01-11T17:43:51.887+0800 DEBUG io io/io.g o:458 post cost 6.87021ms" }
如果調試文本比較複雜,可以將它們寫入一個文件(sample.log),用如下方式調試:
$ datakit pipeline your_pipeline.p -F sample.log
更多 Pipeline 調試命令,參見 datakit help pipeline。
Grok 通配搜索
由於 Grok pattern 數量繁多,人工匹配較為麻煩。DataKit 提供了交互式的命令行工具 grokq
(grok query):
datakit tool --grokq grokq > Mon Jan 25 19:41:17 CST 2021 # 此處輸入你希望匹配的文本 2 %{DATESTAMP_OTHER: ?} # 工具會給出對應對的建議,越靠前匹配月精確(權重也越大)。前面的數字表明權重。 0 %{GREEDYDATA: ?} grokq > 2021-01-25T18:37:22.016+0800 4 %{TIMESTAMP_ISO8601: ?} # 此處的 ? 表示你需要用一個字段來命名匹配到的文本 0 %{NOTSPACE: ?} 0 %{PROG: ?} 0 %{SYSLOGPROG: ?} 0 %{GREEDYDATA: ?} # 像 GREEDYDATA 這種範圍很廣的 pattern,權重都較低 # 權重越高,匹配的精確度越大 grokq > Q # Q 或 exit 退出 Bye!
Windows 下,請在 Powershell 中執行調試。
多行如何處理
在處理一些調用棧相關的日誌時,由於其日誌行數不固定,直接用 GREEDYDATA
這個 pattern 無法處理如下情況的日誌:
1 2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task 2 3 java.lang.NullPointerException: null 4 5 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442) 6 7 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595) 8 9 at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193) 10 11 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382) 12 13 at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481) 14 15 at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471) 16 17 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708) 18 19 at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234) 20 21 at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499) 22
此處可以使用 GREEDYLINES
規則來通配,如(/usr/local/datakit/pipeline/test.p):
add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}') grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})' # 此處移除 message 字段便於調試 drop_origin_data()
將上述多行日誌存為 multi-line.log,調試一下:
$ datakit --pl test.p --txt "$(<multi-line.log)"
得到如下切割結果:
{ "Level": "ERROR", "Level_value": "1629881", "Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task", "log_time": "2022-02-10 16:27:36.116", "stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)", "thread_name": "scheduling-1" }
Pipeline 字段命名注意事項
在所有 Pipeline 切割出來的字段中,它們都是指標(field)而不是標籤(tag)。由於行協議約束,我們不應該切割出任何跟 tag 同名的字段。這些 Tag 包含如下幾類:
-
DataKit 中的全局 Tag
-
日誌採集器中自定義的 Tag
另外,所有採集上來的日誌,均存在如下多個保留字段。我們不應該去覆蓋這些字段,否則可能導致數據在查看器頁面顯示不正常。
字段名 | 類型 | 說明 |
---|---|---|
source |
string(tag) | 日誌來源 |
service |
string(tag) | 日誌對應的服務,默認跟 service 一樣 |
status |
string(tag) | 日誌對應的等級 |
message |
string(field) | 原始日誌 |
time |
int | 日誌對應的時間戳 |
當然我們可以通過特定的 Pipeline 函數覆蓋上面這些 tag 的值。
一旦 Pipeline 切割出來的字段跟已有 Tag 重名(大小寫敏感),都會導致如下數據報錯。故建議在 Pipeline 切割中,繞開這些字段命名。
# 該錯誤在 DataKit monitor 中能看到
same key xxx in tag and field
完整 Pipeline 示例
這裡以 DataKit 自身的日誌切割為例。DataKit 自身的日誌形式如下:
2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms
編寫對應 pipeline:
# pipeline for datakit log # Mon Jan 11 10:42:41 CST 2021 # auth: tanb grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}') rename("time", log_time) # 將 log_time 重名命名為 time default_time(time) # 將 time 字段作為輸出數據的時間戳 drop_origin_data() # 丟棄原始日誌文本(不建議這麼做)
這裡引用了幾個用戶自定義的 pattern,如 _dklog_date、_dklog_level。我們將這些規則存放 <datakit安裝目錄>/pipeline/pattern 下。
注意,用戶自定義 pattern 如果需要==全局生效==(即在其它 Pipeline 腳本中應用),必須放置在 <DataKit安裝目錄/pipeline/pattern/> 目錄下):
$ cat pipeline/pattern/datakit # 注意:自定義的這些 pattern,命名最好加上特定的前綴,以免跟內置的命名衝突(內置 pattern 名稱不允許覆蓋) # 自定義 pattern 格式為: # <pattern-name><空格><具體 pattern 組合> _dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT} _dklog_level (DEBUG|INFO|WARN|ERROR|FATAL) _dklog_mod %{WORD} _dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)? _dklog_msg %{GREEDYDATA}
現在 pipeline 以及其引用的 pattern 都有了,就能通過 DataKit 內置的 pipeline 調試工具,對這一行日誌進行切割:
# 提取成功示例 $ ./datakit --pl dklog_pl.p --txt '2021-01-11T17:43:51.887+0800 DEBUG io io/io.go:458 post cost 6.87021ms' Extracted data(cost: 421.705µs): { "code": "io/io.go:458", "level": "DEBUG", "module": "io", "msg": "post cost 6.87021ms", "time": 1610358231887000000 }
FAQ
Pipeline 調試時,為什麼變量無法引用?
Pipeline 為:
json(_, message, "message") json(_, thread_name, "thread") json(_, level, "status") json(_, @timestamp, "time")
其報錯如下:
[E] new piepline failed: 4:8 parse error: unexpected character: '@'
A: 對於有特殊字符的變量,需將其用兩個 `
修飾一下:
json(_, `@timestamp`, "time")
參見【 Pipeline 的基本語法規則 】//docs.guance.com/developers/pipeline/#basic-syntax
Pipeline 調試時,為什麼找不到對應的 Pipeline 腳本?
命令如下:
$ datakit pipeline test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory
A: 調試用的 Pipeline 腳本,需將其放置到 /pipeline 目錄下。
如何在一個 Pipeline 中切割多種不同格式的日誌?
在日常的日誌中,因為業務的不同,日誌會呈現出多種形態,此時,需寫多個 Grok 切割,為提高 Grok 的運行效率,可根據日誌出現的頻率高低,優先匹配出現頻率更高的那個 Grok,這樣,大概率日誌在前面幾個 Grok 中就匹配上了,避免了無效的匹配。
在日誌切割中,Grok 匹配是性能開銷最大的部分,故避免重複的 Grok 匹配,能極大的提高 Grok 的切割性能。
grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...") if client_ip != nil { # 證明此時上面的 grok 已經匹配上了,那麼就按照該日誌來繼續後續處理 ... } else { # 這裡說明是不同的日誌來了,上面的 grok 沒有匹配上當前的日誌 grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...") if status != nil { # 此處可再檢查上面的 grok 是否匹配上... } else { # 未識別的日誌,或者,在此可再加一個 grok 來處理,如此層層遞進 } }
如何丟棄字段切割
在某些情況下,我們需要的只是日誌==中間的幾個字段==,但不好跳過前面的部分,比如
200 356 1 0 44 30032 other messages
其中,我們只需要 44 這個值,它可能代碼響應延遲,那麼可以這樣切割(即 Grok 中不附帶 :some_field 這個部分):
grok(_, "%{INT} %{INT} %{INT} %{INT:response_time} %{GREEDYDATA}")
add_pattern() 轉義問題
大家在使用 add_pattern() 添加局部模式時,容易陷入轉義問題,比如如下這個 pattern(用來通配文件路徑以及文件名):
(/?[\w_%!$@:.,-]?/?)(\S+)?
如果我們將其放到全局 pattern 目錄下(即 pipeline/pattern 目錄),可這麼寫:
# my-testsource_file (/?[\w_%!$@:.,-]?/?)(\S+)?
如果使用 add_pattern(),就需寫成這樣:
# my-test.p
add_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')
即這裏面反斜杠需要轉義。