如何編寫 Pipeline 腳本

前言

Pipeline 編寫較為麻煩,為此,DataKit 中內置了簡單的調試工具,用以輔助大家來編寫 Pipeline 腳本。

調試 grok 和 pipeline

指定 pipeline 腳本名稱,輸入一段文本即可判斷提取是否成功

Pipeline 腳本必須放在 /pipeline 目錄下。

$ datakit pipeline your_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs): # 表示切割成功
{    
"code"   : "io/io.go: 458",       # 對應代碼位置    
"level"  : "DEBUG",               # 對應日誌等級    
"module" : "io",                  # 對應代碼模塊    
"msg"    : "post cost 6.87021ms", # 純日誌內容    
"time"   : 1610358231887000000    # 日誌時間(Unix 納秒時間戳)    "message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
}

提取失敗示例(只有 message 留下了,說明其它字段並未提取出來):

$ datakit pipeline other_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms'
{    
"message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
} 

如果調試文本比較複雜,可以將它們寫入一個文件(sample.log),用如下方式調試:

$ datakit pipeline your_pipeline.p -F sample.log

更多 Pipeline 調試命令,參見 datakit help pipeline。

Grok 通配搜索

由於 Grok pattern 數量繁多,人工匹配較為麻煩。DataKit 提供了交互式的命令行工具 grokq(grok query):

datakit tool --grokq
grokq > Mon Jan 25 19:41:17 CST 2021   # 此處輸入你希望匹配的文本        
2 %{DATESTAMP_OTHER: ?}        # 工具會給出對應對的建議,越靠前匹配月精確(權重也越大)。前面的數字表明權重。        
0 %{GREEDYDATA: ?}

grokq > 2021-01-25T18:37:22.016+0800        
4 %{TIMESTAMP_ISO8601: ?}      # 此處的 ? 表示你需要用一個字段來命名匹配到的文本        
0 %{NOTSPACE: ?}       
0 %{PROG: ?}        
0 %{SYSLOGPROG: ?}        
0 %{GREEDYDATA: ?}             # 像 GREEDYDATA 這種範圍很廣的 pattern,權重都較低                                       # 權重越高,匹配的精確度越大
grokq > Q                              # Q 或 exit 退出
Bye!

Windows 下,請在 Powershell 中執行調試。

多行如何處理

在處理一些調用棧相關的日誌時,由於其日誌行數不固定,直接用 GREEDYDATA 這個 pattern 無法處理如下情況的日誌:

 

 1 2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task
 2 
 3     java.lang.NullPointerException: null    
 4 
 5 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)    
 6 
 7 at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)    
 8 
 9 at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)    
10 
11 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)    
12 
13 at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)    
14 
15 at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)    
16 
17 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)    
18 
19 at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)    
20 
21 at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)
22  
此處可以使用 GREEDYLINES 規則來通配,如(/usr/local/datakit/pipeline/test.p):
add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')
grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})'

# 此處移除 message 字段便於調試
drop_origin_data()

將上述多行日誌存為 multi-line.log,調試一下:

$ datakit --pl test.p --txt "$(<multi-line.log)" 

得到如下切割結果:

{  
"Level": "ERROR",  "Level_value": "1629881",  
"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task",  
"log_time": "2022-02-10 16:27:36.116",  
"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)",  

"thread_name": "scheduling-1"
}

Pipeline 字段命名注意事項

在所有 Pipeline 切割出來的字段中,它們都是指標(field)而不是標籤(tag)。由於行協議約束,我們不應該切割出任何跟 tag 同名的字段。這些 Tag 包含如下幾類:

  • DataKit 中的全局 Tag

  • 日誌採集器中自定義的 Tag

另外,所有採集上來的日誌,均存在如下多個保留字段。我們不應該去覆蓋這些字段,否則可能導致數據在查看器頁面顯示不正常。

字段名 類型 說明
source string(tag) 日誌來源
service string(tag) 日誌對應的服務,默認跟 service 一樣
status string(tag) 日誌對應的等級
message string(field) 原始日誌
time int 日誌對應的時間戳

 

當然我們可以通過特定的 Pipeline 函數覆蓋上面這些 tag 的值。

一旦 Pipeline 切割出來的字段跟已有 Tag 重名(大小寫敏感),都會導致如下數據報錯。故建議在 Pipeline 切割中,繞開這些字段命名。

# 該錯誤在 DataKit monitor 中能看到
same key xxx in tag and field

完整 Pipeline 示例

這裡以 DataKit 自身的日誌切割為例。DataKit 自身的日誌形式如下:

2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms 

編寫對應 pipeline:

# pipeline for datakit log
# Mon Jan 11 10:42:41 CST 2021
# auth: tanb

grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')
rename("time", log_time) # 將 log_time 重名命名為 time
default_time(time)       # 將 time 字段作為輸出數據的時間戳
drop_origin_data()       # 丟棄原始日誌文本(不建議這麼做)

這裡引用了幾個用戶自定義的 pattern,如 _dklog_date、_dklog_level。我們將這些規則存放 <datakit安裝目錄>/pipeline/pattern 下。

 

注意,用戶自定義 pattern 如果需要==全局生效==(即在其它 Pipeline 腳本中應用),必須放置在 <DataKit安裝目錄/pipeline/pattern/> 目錄下):

$ cat pipeline/pattern/datakit
# 注意:自定義的這些 pattern,命名最好加上特定的前綴,以免跟內置的命名衝突(內置 pattern 名稱不允許覆蓋)
# 自定義 pattern 格式為:
#    <pattern-name><空格><具體 pattern 組合>
_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}
_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)
_dklog_mod %{WORD}
_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?
_dklog_msg %{GREEDYDATA}

現在 pipeline 以及其引用的 pattern 都有了,就能通過 DataKit 內置的 pipeline 調試工具,對這一行日誌進行切割:

# 提取成功示例
$ ./datakit --pl dklog_pl.p --txt '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs):
{    
"code": "io/io.go:458",    
"level": "DEBUG",    
"module": "io",    
"msg": "post cost 6.87021ms",    
"time": 1610358231887000000
}

FAQ

Pipeline 調試時,為什麼變量無法引用?

Pipeline 為:

json(_, message, "message")
json(_, thread_name, "thread")
json(_, level, "status")
json(_, @timestamp, "time")

其報錯如下:

[E] new piepline failed: 4:8 parse error: unexpected character: '@' 

A: 對於有特殊字符的變量,需將其用兩個 ` 修飾一下:

json(_, `@timestamp`, "time") 

參見【 Pipeline 的基本語法規則 】//docs.guance.com/developers/pipeline/#basic-syntax

 

Pipeline 調試時,為什麼找不到對應的 Pipeline 腳本?

命令如下:

$ datakit pipeline test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory

A: 調試用的 Pipeline 腳本,需將其放置到 /pipeline 目錄下。

 

如何在一個 Pipeline 中切割多種不同格式的日誌?

在日常的日誌中,因為業務的不同,日誌會呈現出多種形態,此時,需寫多個 Grok 切割,為提高 Grok 的運行效率,可根據日誌出現的頻率高低,優先匹配出現頻率更高的那個 Grok,這樣,大概率日誌在前面幾個 Grok 中就匹配上了,避免了無效的匹配。

 

在日誌切割中,Grok 匹配是性能開銷最大的部分,故避免重複的 Grok 匹配,能極大的提高 Grok 的切割性能。

grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")
if client_ip != nil {    
# 證明此時上面的 grok 已經匹配上了,那麼就按照該日誌來繼續後續處理    
...
} else {    
# 這裡說明是不同的日誌來了,上面的 grok 沒有匹配上當前的日誌    
grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")

    if status != nil {       
 # 此處可再檢查上面的 grok 是否匹配上...    
} else {        
# 未識別的日誌,或者,在此可再加一個 grok 來處理,如此層層遞進    
}
}

如何丟棄字段切割

在某些情況下,我們需要的只是日誌==中間的幾個字段==,但不好跳過前面的部分,比如

200 356 1 0 44 30032 other messages

其中,我們只需要 44 這個值,它可能代碼響應延遲,那麼可以這樣切割(即 Grok 中不附帶 :some_field 這個部分):

grok(_, "%{INT} %{INT} %{INT} %{INT:response_time} %{GREEDYDATA}") 

add_pattern() 轉義問題

大家在使用 add_pattern() 添加局部模式時,容易陷入轉義問題,比如如下這個 pattern(用來通配文件路徑以及文件名):

(/?[\w_%!$@:.,-]?/?)(\S+)? 

如果我們將其放到全局 pattern 目錄下(即 pipeline/pattern 目錄),可這麼寫:

# my-testsource_file (/?[\w_%!$@:.,-]?/?)(\S+)?

  

 

如果使用 add_pattern(),就需寫成這樣:

 

# my-test.padd_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')

 

即這裏面反斜杠需要轉義。