Flink State 可以代替資料庫嗎?

  • 2019 年 11 月 28 日
  • 筆記

有狀態的計算作為容錯以及數據一致性的保證,是當今實時計算必不可少的特性之一,流行的實時計算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對內置 State 的支援。State 的引入使得實時應用可以不依賴外部資料庫來存儲元數據及中間數據,部分情況下甚至可以直接用 State 存儲結果數據,這讓業界不禁思考: State 和 Database 是何種關係?有沒有可能用 State 來代替資料庫呢?

在這個課題上,Flink 社區是比較早就開始探索的。總體來說,Flink 社區的努力可以分為兩條線: 一是在作業運行時通過作業查詢介面訪問 State 的能力,即 QueryableState;二是通過 State 的離線 dump 文件(Savepoint)來離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API

QueryableState

在 2017 年發布的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以允許用戶通過特定的 client 查詢作業 State 的內容 [1],這意味著 Flink 應用可以在完全不依賴 State 存儲介質以外的外部存儲的情況下提供實時訪問計算結果的能力。

只通過 Queryable State 提供實時數據訪問

然而,QueryableState 雖然設想上比較理想化,但由於依賴底層架構的改動較多且功能也比較受限,它一直處於 Beta 版本並不能用於生產環境。針對這個問題,在前段時間騰訊的工程師楊華提出 QueryableState 的改進計劃 [2]。在郵件列表中,社區就 QueryableState 是否可以用於代替資料庫作了討論並出現了不同的觀點。筆者結合個人見解將 State as Database 的主要優缺點整理如下。

優點:

  • 更低的數據延遲。一般情況下 Flink 應用的計算結果需要同步到外部的資料庫,比如定時觸發輸出窗口計算結果,而這種同步通常是定時的會帶來一定的延遲,導致計算是實時的而查詢卻不是實時的尷尬局面,而直接 State 則可以避免這個問題。
  • 更強的數據一致性保證。根據外部存儲的特性不同,Flink Connector 或者自定義的 SinkFunction 提供的一致性保障也有所差別。比如對於不支援多行事務的 HBase,Flink 只能通過業務邏輯的冪等性來保障 Exactly-Once 投遞。相比之下 State 則有妥妥的 Exactly-Once 投遞保證。
  • 節省資源。因為減少了同步數據到外部存儲的需要,我們可以節省序列化和網路傳輸的成本,另外當然還可以節省資料庫成本。

缺點:

  • SLA 保障不足。資料庫技術已經非常成熟,在可用性、容錯性和運維上都很多的積累,在這點上 State 還相當於是處於原始人時期。另外從定位上來看,Flink 作業有版本迭代維護或者遇到錯誤自動重啟帶來的 down time,並不能達到資料庫在數據訪問上的高可用性。
  • 可能導致作業的不穩定。未經過考慮的 Ad-hoc Query 可能會要求掃描並返回誇張量級的數據,這會系統帶來很大的負荷,很可能影響作業的正常執行。即使是合理的 Query,在並發數較多的情況下也可能影響作業的執行效率。
  • 存儲數據量不能太大。State 運行時主要存儲在 TaskManager 本地記憶體和磁碟,State 過大會造成 TaskManager OOM 或者磁碟空間不足。另外 State 大意味著 checkpoint 大,導致 checkpoint 可能會超時並顯著延長作業恢復時長。
  • 只支援最基礎的查詢。State 只能進行最簡單的數據結構查詢,不能像關係型資料庫一樣提供函數等計算能力,也不支援謂詞下推等優化技術。
  • 只可以讀取,不能修改。State 在運行時只可以被作業本身修改,如果實在要修改 State 只能通過下文的 Savepoint Processor API 來實現。

總體來說,目前 State 代替資料庫的缺點還是遠多於其優點,不過對於某些對數據可用性要求不高的作業來說,使用 State 作為資料庫還是完全合理的。由於定位上的不同,Flink State 在短時間內很難看到可以完全替代資料庫的可能性,但在數據訪問特性上 State 往資料庫方向發展是無需質疑的。

Savepoint Processor API

Savepoint Processor API 是社區最近提出的一個新特性(見 FLIP-42 [3]),用於離線對 State 的 dump 文件 Savepoint 進行分析、修改或者直接根據數據構建出一個初始的 Savepoint。Savepoint Processor API 屬於 Flink State Evolution 的 State Management。如果說 QueryableState 是 DSL 的話,Flink State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最為重要的部分。

Savepoint Processor API 的前身是第三方的 Bravo 項目 [4],主要思路提供 Savepoint 和 DataSet 相互轉換的能力,典型應用是 Savepoint 讀取成 DataSet,在 DataSet 上進行修改,然後再寫為一個新的 Savepoint。這適合用於以下的場景:

  • 分析作業 State 以研究其模式和規律
  • 排查問題或者審計
  • 為新的應用構建的初始 State
  • 修改 Savepoint,比如:
    • 改變作業最大並行度
    • 進行巨大的 Schema 改動
    • 修正有問題的 State

Savepoint 作為 State 的 dump 文件,通過 Savepoint Processor API 可以暴露數據查詢和修改功能,類似於一個離線的資料庫,但 State 的概念和典型關係型數據的概念還是有很多不同,FLIP-43 也對這些差異進行了類比和總結。

首先 Savepoint 是多個 operator 的 state 的物理存儲集合,不同 operator 的 state 是獨立的,這類似於資料庫下不同 namespace 之間的 table。我們可以得到 Savepoint 對應資料庫,單個 operator 對應 Namespace。

Database

Savepoint

Namespace

Uid

Table

State

但就 table 而言,其在 Savepoint 里對應的概念根據 State 類型的不同而有所差別。State 有 Operator State、Keyed State 和 Broadcast State 三種,其中 Operator State 和 Broadcast State 屬於 non-partitioned state,即沒有按 key 分區的 state,而相反地 Keyed State 則屬於 partitioned state。對於 non-partitioned state 來說,state 是一個 table,state 的每個元素即是 table 里的一行;而對於 partitioned state 來說,同一個 operator 下的所有 state 對應一個 table。這個 table 像是 HBase 一樣有個 row key,然後每個具體的 state 對應 table 里的一個 column。

舉個例子,假設有一個遊戲玩家得分和在線時長的數據流,我們需要用 Keyed State 來記錄玩家所在組的分數和遊戲時長,用 Operator State 記錄玩家的總得分和總時長。

在一段時間內數據流的輸入如下:

user_id

user_name

user_group

score

1001

Paul

A

5,000

1002

Charlotte

A

3,600

1003

Kate

C

2,000

1004

Robert

B

3,900

user_id

user_name

user_group

time

1001

Paul

A

1,800

1002

Charlotte

A

1,200

1003

Kate

C

600

1004

Robert

B

2,000

用 Keyed State ,我們分別註冊 group_score 和 group_time 兩個 MapState 表示組總得分和組總時長,並根據 user_group keyby 數據流之後將兩個指標的累積值更新到 State 里,得到的表如下:

user_group

group_score

group_time

A

8,600

3,000

C

2,00

600

B

3,900

2,000

相對地,假如用 Operator State 來記錄總得分和總時長(並行度設為 1),我們註冊 total_score 和 total_time 兩個 State,得到的表有兩個:

total_score

14,500

total_time

5,600

至此 Savepoint 和 Database 的對應關係應該是比較清晰明了的。而對於 Savepoint 來說還有不同的 StateBackend 來決定 State 具體如何持續化,這顯然對應的是資料庫的存儲引擎。在 MySQL 中,我們可以通過簡單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來改變存儲引擎,在背後 MySQL 會自動完成繁瑣的格式轉換工作。而對於 Savepoint 來說,由於 StateBackend 各自的存儲格式不兼容,目前尚不能方便地切換 StateBackend。為此,社區在不久前創建 FLIP-41 [5] 來進一步完善 Savepoint 的可操作性。

總 結

State as Database 是實時計算髮展的大趨勢,它並不是要代替資料庫的使用,而是借鑒資料庫領域的經驗拓展 State 介面使其操作方式更接近我們熟悉的資料庫。對於 Flink 而言,State 的外部使用可以分為在線的實時訪問和離線的訪問和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個特性支援。

參考文獻

  • Queryable State in Apache Flink® 1.2.0: An Overview & Demo https://www.ververica.com/blog/queryable-state-use-case-demo
  • Improve Queryable State and Introduce a QueryServerProxy Component http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-Queryable-State-and-introduce-a-QueryServerProxy-component-td28578.html#a28581
  • FLIP-43: Savepoint Processor API https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+State+Processor+API
  • Bravo: Utilities for processing Flink checkpoints/savepoints https://github.com/king/bravo
  • FLIP-41: Unify Keyed State Snapshot Binary Format for Savepoints https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State

作者介紹:

林小鉑,網易遊戲高級開發工程師,負責遊戲數據中心實時平台的開發及運維工作,目前專註於 Apache Flink 的開發及應用。探究問題本來就是一種樂趣。