Flink cep的初步使用
- 2019 年 10 月 3 日
- 筆記
一、CEP是什麼
在應用系統中,總會發生這樣或那樣的事件,有些事件是用戶觸發的,有些事件是系統觸發的,有些可能是第三方觸發的,但它們都可以被看做系統中可觀察的狀態改變,例如用戶登陸應用失敗、用戶下了一筆訂單或RFID感測器回報的消息。應對狀態改變的策略可以分為兩類,一類是簡單事件處理(Simple event processing),一般簡單事件處理會有兩個步驟,過濾和路由,決定是否要處理,由誰處理,另一類是複雜事件處理(Complex event processing),複雜事件處理本身也會處理單一的事件,但其典型特質是需要對多個事件組成的是事件流進行檢測分析並響應。
在維基百科中也對CEP做了定義,“CEP是一種事件處理模式,它從若干源中獲取事件,並偵測複雜環境的事件或模式,CEP的目的是確認一些有意義的事件(比如某種威脅或某種機會),並儘快對其作出響應”,可見CEP的主要特點包括:複雜性,需要在多源的事件流中進行檢測;低延遲,秒級或毫秒級的響應,比如應對威脅;高吞吐,需要迅速對大量或者超大量事件流作出響應。
以往的CEP框架往往處理大量收集到的事件,不能處理正在收集的事件,這時,Flink來了。
二、Flink CEP
Flink作為目前大數據領域實時計算的主流計算框架,天然支援低延遲、高吞吐等特性,再加上Flink中的窗口模型和狀態模型,更是對CEP提供了非常強大的支撐。Flink中專門實現了複雜事件處理的庫——Flink CEP,用來方便的進行在事件流中檢測事件模式。
以下是一個簡單的例子,說明在Flink中如何實現CEP:
1 public class CepEvent { 2 public static void main(String[] args) throws Exception { 3 StreamExecutionEnvironment env 4 = StreamExecutionEnvironment.getExecutionEnvironment(); 5 DataStream<Tuple3<Integer, String, String>> eventStream = env.fromElements( 6 Tuple3.of(1500, "login", "fail"), 7 Tuple3.of(1500, "login", "fail"), 8 Tuple3.of(1500, "login", "fail"), 9 Tuple3.of(1320, "login", "success"), 10 Tuple3.of(1450, "exit", "success"), 11 Tuple3.of(982, "login", "fail")); 12 AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent(); 13 Pattern<Tuple3<Integer, String, String>, ?> loginFail = 14 Pattern.<Tuple3<Integer, String, String>>begin("begin", skipStrategy) 15 .where(new IterativeCondition<Tuple3<Integer, String, String>>() { 16 @Override 17 public boolean filter(Tuple3<Integer, String, String> s, 18 Context<Tuple3<Integer, String, String>> context) throws Exception { 19 return s.f2.equalsIgnoreCase("fail"); 20 } 21 }).times(3).within(Time.seconds(5)); 22 PatternStream<Tuple3<Integer, String, String>> patternStream = 23 CEP.pattern(eventStream.keyBy(x -> x.f0), loginFail); 24 DataStream<String> alarmStream = 25 patternStream.select(new PatternSelectFunction<Tuple3<Integer, String, String>, String>() { 26 @Override 27 public String select(Map<String, List<Tuple3<Integer, String, String>>> map) throws Exception { 28 String msg = String.format("ID %d has login failed 3 times in 5 seconds." 29 , map.values().iterator().next().get(0).f0); 30 return msg; 31 } 32 }); 33 34 alarmStream.print(); 35 36 env.execute("cep event test"); 37 } 38 }
運行結果如下:
可見成功了捕獲了ID為1500的用戶。
在Flink中實現一個CEP可以總結為四步:一,構建需要的數據流,二,構造正確的模式,三,將數據流和模式進行結合,四,在模式流中獲取匹配到的數據。其中第一步和第三步一般會是標準操作,核心在於第二部構建模式,需要利用Flink CEP支援的特性,構造出正確反映業務需求的匹配模式。
三、Flink CEP中對CEP的支撐
Flink CEP的核心在於模式匹配,對於不同模式匹配特性的支援,往往決定相應的CEP框架是否能夠得到廣泛應用。Flink CEP對模式提供了如下的一些支援:
(一) 支援匹配模式
模式匹配具有一些共同的基礎模式,對不同的模式匹配的語義的表達和實現,意味著這種模式能在多大範圍內得到應用。
Flink CEP對模式匹配的語義支援具有如下特點:
- 支援匹配數量,提供匹配次數的支援,可以指定匹配一次或多次(oneOrMore),可以指定匹配固定數量次(times(n)),也可以指定範圍固定數量次(times(n,m))。
- 支援歷史匹配,在匹配的過程中,既可以對當前事件進行屬性判斷,也可以對匹配事件組中的歷史匹配結果進行回溯來進行判定。
- 支援組匹配,支援將單模式進行組合成為模式組,支援不同的組合模式和語義,比如,or,until,begin,next,followBy,otNext,notFollowBy。
- 支援窗口匹配,支援時間窗口,可以方便的在某個時間窗口內進行模式匹配。
(二) 支援不同臨近條件
如果只是單模式匹配,則無需考慮臨近條件。在模式組的執行中,即 多個模式組合執行的過程中,臨近條件指的是如何將一組事件匹配到特定的模式組中的不同模式。不同的臨近條件的使用,會顯著的改變最終匹配的結果。
Flink CEP中支援如下三種臨近條件:
A. Strict Contiguity,嚴格臨近指的是匹配事件必須具有嚴格的前後相鄰關係,即匹配事件之間沒有非匹配事件。
B. Relaxed Contiguity,寬鬆匹配指的是匹配事件可以有非匹配事件,非匹配事件的存在不阻擋非連續事件被匹配成功。
C. Non-Deterministic Relaxed Contiguity,非確定性寬鬆匹配在寬鬆匹配的基礎上,即使某個事件被某個模式匹配完畢還可以參加後面其他模式的匹配。
(三) 支援不同匹配後策略
匹配後策略指的是當某一組事件成功匹配了某個模式之後,這組事件以何種方式參與後續的模式匹配。不同的匹配後策略會導致大相徑庭的匹配結果,所以在實際開發中,需要小心的選擇合適的匹配後策略。
Flink CEP支援如下五種匹配後策略:
A. NO_SKIP策略,意即當前事件組中的事件還會不受約束的參與後續的模式匹配。
B. SKIP_TO_NEXT策略,意即當前事件組中除了第一個事件之外,其他事件可以不受約束的參與後續的模式匹配。
C. SKIP_PAST_LAST_EVENT策略,意即當前事件組中的任意一個事件都不參與後續的模式匹配。
D. SKIP_TO_FIRST策略,此種策略需要指定一個模式,當前事件組中的任何子匹配如果包含指定模式匹配事件組中的最大匹配事件組,則此子匹配會被丟棄。
E. SKIP_TO_LAST策略,此種策略需要指定一個模式,當前事件組中的任何子匹配如果包含指定模式匹配事件組中的最小匹配事件組,則此子匹配會被丟棄。
(四) 支援事件時間與亂序
在CEP的處理過程中,事件到達的順序至關重要,因為事件到達的順序會真正決定是否可以與相應的模式成功匹配。目前業界已有的CEP計算框架一般都採用事件到達的自然順序,即處理時間作為模式匹配的基礎,這種模式不能滿足目前分散式環境下CEP的要求。在大數據分散式環境下,事件到達的順序和事件發生的順序往往不匹配,存在延遲到達和亂序等情況,這時往往需要依靠事件時間來進行相應的模式匹配,不然就會發生匹配錯誤或者匹配失效。
Flink特有的事件時間模型,包括event time特性與watermark機制同樣可以在Flink CEP中發揮作用。Flink CEP會將事件進行快取,不在一開始就進行模式匹配,在相應的watermark到底之後,Flink CEP將快取中的事件按照事件時間進行排序,然後再進行相應的模式匹配,能夠在很大程度上解決分散式環境下的CEP難題。
四、結尾
CEP在生活中的各行各業可以有很多應用,比如金融行業的風險控制、欺詐識別、行情策略等等,比如安全領域的攻擊告警、危險建模、漏洞發現等等,再比如智慧交通、用戶漏斗等等,再關聯目前的IOT,其可應用的場景不勝枚舉。需要進一步挖掘和改進Flink CEP的能力,為各種業務場景賦能和輸出價值。