聊聊流計算系統中的核心問題:狀態管理

本文選自《實時流計算系統設計與實現》 文末有驚喜

狀態管理是流計算系統的核心問題之一。在實現流數據的關聯操作時,流計算系統需要先將窗口內的數據臨時保存起來,然後在窗口結束時,再對窗口內的數據做關聯計算。在實現時間維度聚合特徵計算和關聯圖譜特徵計算時,更是需要創建大量的寄存用於記錄聚合的結果。而CEP的實現,本身就與常說的有限狀態機(Finite-state machine,FSM)是密切相關的。不管是為了關聯計算而臨時保存的數據,還是為了保存聚合計算的數據,抑或是CEP里的有限狀態機,這些數據都是流計算應用開始運行之後才創建和積累起來。如果沒有做持久化操作,這些數據在流計算應用重啟後會被完全清空。正因為如此,我們將這些數據稱之為流計算應用的「狀態」。從各種開源流計算框架的發展歷史來看,大家對實時流計算中的「狀態」問題也是一點點逐步弄清楚的。

我們將流在執行過程中涉及到的狀態分為兩類:流數據狀態和流信息狀態。

  • 流數據狀態。在流數據處理的過程中,可能需要處理事件窗口、時間亂序、多流關聯等問題,在解決這些問題的過程中,通常會涉及到對部分流數據的臨時緩存,並在處理完後將其清理。我們將臨時保存的部分流數據稱為「流數據狀態」。
  • 流信息狀態。在對流數據的分析過程中,會得到一些我們感興趣的信息,比如時間維度的聚合數據、關聯圖譜中的一度關聯節點數、CEP中的有限狀態機等,這些信息可能會在後續的流數據分析過程中被繼續使用,從而需要將這些信息保存下來。同時在後續的流數據處理過程中,這些信息還會被不斷地訪問和更新。我們將這些分析所得並保存下來的數據稱為「流信息狀態」。

圖1: 流數據狀態和流信息狀態

為什麼區分這兩種狀態非常重要?思考這麼一個問題,如果我們要計算「用戶過去7天交易的總金額」,該如何做?一種顯而易見的方法,是直接使用在各種流計算框架中都提供的窗口函數來實現。比如在Flink中如下:

userTransactions.keyBy(0)// 滑動窗口,每1秒鐘計算一次7天窗口內的交易金額.timeWindow(Time.days(7), Time.seconds(1)).sum(1);

上面的Flink示例代碼使用timeWindow窗口,每1秒鐘計算一次7天窗口內的總交易金額。其它流計算平台如Spark Streaming、Storm等也有類似的方法。但這樣做有以下幾點非常不妥:

  • 這個計算是每1秒鐘才能輸出結果,而如果是需要每來一個事件就要計算一次該事件所代表的用戶在「過去7天交易的總金額」,這種做法顯然就不可行。
  • 窗口為7天,滑動步長為1秒,這兩個時間相差的數量級也太大了。這也意味着需要在「7天除以1秒」這麼多個窗口中被重複計算!當然,這裡設置1秒是因為要儘可能地「實時」。如果覺得1秒太「過分」,也可以設置滑動步長為30秒、60秒等,但這並不能改變重複計算的本質,且滑動步長越長,離「實時計算」越遠。
  • 窗口為7天,就需要在實時流計算系統中緩存7天的流數據。而我們想要得到的其實只是一個聚合值而已,所以保存7天完整的流數據似乎有些殺雞用牛刀。當然,Flink對諸如sum、max、min之類的窗口聚合計算做了優化,可以不用保存窗口裡的全部數據,只需要保留聚合結果即可。但是如果用戶需要做些定製化操作(比如自定義Evictor)的話,就需要保存窗口內的全量數據了。
  • 如果要在一個事件上,計算幾十個類似於「用戶過去7天交易的總金額」這樣的特徵,按照timeWindow的實現方法,每個特徵可能會有不同的時間窗口和滑動步長,該怎樣同步這幾十個特徵計算的結果呢?

所以說,直接使用由流計算框架提供的窗口函數來實現諸如「時間維度聚合特徵」的計算問題,我們在很多情況下都會遇到問題。究其根本原因,是因為混淆了「對流的管理」和「對數據信息的管理」這兩者本身。因為「窗口」實際上是對「流數據」的分塊管理,我們用「窗口」來將「無窮無盡」的流數據分割成一個個的「數據塊」,然後在「數據塊」上做各種計算。這屬於對流數據的「分而治之」處理。我們不能將這種針對「流數據」本身的分治管理模式,與我們對數據的業務信息分析窗口耦和起來。

因此,我們需要將「對流的管理」和「對數據信息的管理」這兩者分離開來。其中「對流的管理」需要解決諸如窗口、亂序、多流關聯等問題,其中也會涉及對數據的臨時緩存,它緩存的是流數據本身,因此我們稱之為「流數據狀態」。而「對數據信息的管理」則是為了在我們在分析和挖掘數據內含信息時,幫助我們記錄和保存業務分析結果,因而稱之為「流信息狀態」。

流數據狀態管理中,比較重要的就是事件窗口、時間亂序和流的關聯操作。

事件窗口是產生流數據狀態的主要原因。比如「每30秒鐘計算一次過去五分鐘交易總額」、「每滿100個事件計算平均交易金額」、「統計用戶在一次活躍期間點擊過的商品數量」等。對於這些以「窗口」為單元來處理事件的方式,我們需要用一個緩衝區(buffer)臨時地存儲過去一段時間接收到的事件,等觸發窗口計算的條件滿足時,再觸發處理窗口內的事件。當處理完成後,還需要將過期和以後不再使用的數據清除掉。另外,在實際生產環境中,可能會出現故障恢復、重啟等情況,這些「緩衝區」的數據在必要時需要被寫入磁盤,並在重新計算或重啟時恢復。

解決時間亂序問題是使用流數據狀態的另一個重要原因。由於網絡傳輸和並發處理的原因,在流計算系統接收到事件時,非常有可能事件已經在時間上亂序了。比如時間戳為1532329665005的事件,比時間戳為1532329665001的事件先到達流計算系統。怎樣處理這種事件在時間上亂序的問題呢?通常的做法就是將收到的事件先保存起來,等過一段時間後亂序的事件到達時,再將其和保存的事件按時間排序,這樣就恢復了事件的時間順序。當然,上面的過程存在一個問題,就是「等過一段時間」到底是怎樣等以及等多久?針對這個問題有一個非常優秀的解決方案,就是水印(watermark)。使用水印解決時間亂序的原理如下,在流計算數據中,按照一定的規律(比如以特定周期)插入「水印」,水印是一個時間戳,當處理單元接收到「水印」時,表示應該處理所有時間戳在該水印之前的事件。我們通常將水印設置為事件的時間戳減去一段時間的值,這樣就給先到的時間戳較大的事件一個等待晚到的時間戳較小的事件的機會,而且確保了不會沒完沒了地等待下去。

流的關聯操作也會涉及流數據狀態的管理。常見的關聯操作有join和union。特別是在實現join操作時,需要先將參與join操作的各個流的相應窗口內的數據緩存在流計算系統內,然後以這些窗口內的數據為基礎,做類似於關係型數據庫中表與表之間的join計算,得到join計算的結果,之後再將這些結果以流的方式輸出。很顯然,流的關聯操作也是需要臨時保存部分流數據的,故而也是一種「流數據狀態」的運用。

除了以上三種「流數據狀態」的主要用途外,還有些地方也會涉及流數據狀態的管理,比如排序(sorting)、分組(group by)等。但不管怎樣,這些操作都有個共同的特點,即它們需要緩存的是部分原始的流數據。換言之,這些操作要保存的狀態是部分「流數據」本身。這也正是將這類狀態取名為「流數據狀態」的原因。流信息狀態是為了記錄流數據的處理和分析過程中獲得的我們感興趣的信息,這些信息會在後續的流處理過程中會被繼續使用和更新。以「實時計算每個交易事件在發生時過去7天交易的總金額」這個計算為例,可以將每小時的交易金額記錄為一條狀態,這樣,當一個交易事件到來時,計算「過去7天的交易總金額」,就是將過去7天每個小時的總交易金額讀取出來,然後對這些金額記錄求總和即可。在上面這個例子中,將每小時的交易金額記錄為一條狀態,就是我們說的「流信息狀態」。

流信息狀態的管理通常依賴於數據庫完成。這是因為對於從流分析出來的信息,我們可能需要保存較長時間,而且數據量會很大,如果將這些信息狀態放在內存中,勢必會佔用過多的內存,這是不必要的。對於保存的流信息狀態,我們並不是在每次計算中都會用到,它會存在冷數據和過期淘汰的問題。所以,對於流信息狀態的管理,交給專門的數據庫是非常明智的。畢竟目前為止,各種數據庫的選擇十分豐富,而且許多數據庫對熱數據緩存和TTL機制都有非常好的支持。

實時流計算應用中的「流數據狀態」和「流信息狀態」。可以說是分別從兩個不同的維度對「流」進行了管理。前者「流數據狀態」是從「時間」角度對流進行管理,而後者「流信息狀態」則是從「空間」角度對流的管理。「流信息狀態」彌補了「流數據狀態」彌補了「流數據狀態」只是對事件在時間序列上做管理的不足,將流的狀態擴展到了任意的空間。

作者簡介:周爽,本碩畢業於華中科技大學,先後在華為2012實驗室高斯部門和上海行邑信息科技有限公司工作。開發過實時分析型內存數據庫RTANA、華為公有雲RDS服務、移動反欺詐MoFA等產品。目前但任公司技術部架構師一職。著有《實時流計算系統設計與實現》一書。

本次聯合【機械工業出版社華章公司】為大家送上1本作者的正版書籍《實時流計算系統設計與實現》

請在關注「實時流式計算」 並在後台回復 「抽獎」參與活動

更多實時數據分析相關博文與科技資訊,歡迎關注 「實時流式計算」