Flink狀態編程-學習小結
- 2020 年 3 月 3 日
- 筆記
一、基礎概念
在Flink架構體系中,有狀態計算可以說是Flink非常重要的特性之一。

有狀態計算是指:
在程式計算過程中,在Flink程式內部存儲計算產生的中間結果,並提供給後續Function或運算元計算結果使用。(如下圖所示)

無狀態計算實現的複雜度相對較低,實現起來較容易,但是無法完成提到的比較複雜的業務場景:
- CEP(複雜事件處理):獲取符合某一特定事件規則的事件,狀態計算就可以將接入的事件進行存儲,然後等待符合規則的事件觸發
- 最大值、均值等聚合指標(如pv,uv): 需要利用狀態來維護當前計算過程中產生的結果,例如事件的總數、總和以及最大,最小值等
- 機器學習場景,維護當前版本模型使用的參數
- 其他需要使用歷史數據的計算
二、Flink狀態編程
1、支援的狀態類型
Flink根據數據集是否根據Key進行分區,將狀態分為Keyed State和Operator State(Non-keyed State)兩種類型。
其中Keyed State是Operator State的特例,可以通過Key Groups進行管理,主要用於當運算元並行度發生變化時,自動重新分布Keyed Sate數據
同時在Flink中Keyed State和Operator State均具有兩種形式:
- 一種為託管狀態(ManagedState)形式,由Flink Runtime中控制和管理狀態數據,並將狀態數據轉換成為記憶體Hashtables或RocksDB的對象存儲,然後將這些狀態數據通過內部的介面持久化到Checkpoints中,任務異常時可以通過這些狀態數據恢復任務。
- 另外一種是原生狀態(Raw State)形式,由運算元自己管理數據結構,當觸發Checkpoint過程中,Flink並不知道狀態數據內部的數據結構,只是將數據轉換成bytes數據存儲在Checkpoints中,當從Checkpoints恢復任務時,運算元自己再反序列化出狀態的數據結構。
在Flink中推薦用戶使用Managed State管理狀態數據,主要原因是Managed State能夠更好地支援狀態數據的重平衡以及更加完善的記憶體管理。
2、Managed Keyed State
六種類型
Managed Keyed State 又分為如下六種類型:

FoldingState已經被標註為deprecated
基本API
在Flink中需要通過創建StateDescriptor來獲取相應State的操作類。如下方程式碼,構建一個ValueState:
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))
其中對ValueState可以增刪改查:
# 獲取狀態值 val isPayed = isPayedState.value() # 更新狀態值 isPayedState.update(true) # 釋放狀態值 isPayedState.clear()
狀態的生命周期
對於任何類型Keyed State都可以設定狀態的生命周期(TTL),以確保能夠在規定時間內及時地清理狀態數據。
實現方法:
1、生成StateTtlConfig配置
2、將StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可
import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.time.Time val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String]) stateDescriptor.enableTimeToLive(ttlConfig)
StateTtlConfig的詳細配置見: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
3、Managed Operator State
Operator State是一種non-keyed state,與並行的操作運算元實例相關聯,例如在KafkaConnector中,每個Kafka消費端運算元實例都對應到Kafka的一個分區中,維護Topic分區和Offsets偏移量作為運算元的Operator State。在Flink中可以實現Checkpointed-Function
或者ListCheckpointed<T extends Serializable>
兩個介面來定義操作Managed Operator State的函數。
(待補充……)
三、案例:訂單延遲告警統計
1、需求描述
需求與數據來自《大數據技術之電商用戶行為分析》
在電商平台中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以表明用戶對商品的需求,但在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間後,用戶支付的意願會降低。
所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防範訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如 15 分鐘),如果下單後一段時間仍未支付,訂單就會被取消。
此時需要給用戶發送一個資訊提醒用戶,提高支付轉換率!
2、需求分析
本需求可以使用CEP來實現,但這裡推薦使用process function原生的狀態編程。
問題可以簡化成: 在pay事件超時未發生的情況下,輸出超時報警資訊。
一個簡單的思路是:
- 在訂單的 create 事件到來後註冊定時器,15分鐘後觸發;
- 用一個布爾類型的 Value 狀態來作為標識位,表明 pay 事件是否發生過。
- 如果 pay 事件已經發生,狀態被置為true,那麼就不再需要做什麼操作;
- 而如果 pay 事件一直沒來,狀態一直為false,到定時器觸發時,就應該輸出超時報警資訊。
3、數據與模型
示例數據:
34729,create,,1558430842 34730,create,,1558430843 34729,pay,sd76f87d6,1558430844 34730,modify,3hu3k2432,1558430845 34731,create,,1558430846 34731,pay,35jue34we,1558430849 34732,create,,1558430852 34733,create,,1558430855 34734,create,,1558430859 34734,create,,1558431000 34733,pay,,1558431000 34732,pay,,1558449999
我們可以得到Flink的輸入與輸出類
// 定義輸入訂單事件的樣例類 case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long) // 定義輸出結果樣例類 case class OrderResult(orderId: Long, resultMsg: String)
4、詳細實現
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object OrderTimeout { val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout") def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val orderEventStream = env.socketTextStream("127.0.0.1", 9999) .map(data => { val dataArray = data.split(",") OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong) }) .assignAscendingTimestamps(_.eventTime * 1000L) .keyBy(_.orderId) val orderResultStream = orderEventStream.process(new OrderPayMatch()) orderResultStream.print("payed") orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order") env.execute("order timeout without cep job") } class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]() { lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean])) lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-state", classOf[Long])) override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = { val isPayed = isPayedState.value() if (isPayed) { ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but no create")) } else { // 典型場景,只create,沒有pay ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, " order timeout")) } isPayedState.clear() timerState.clear() } override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = { val isPayed = isPayedState.value() val timerTs = timerState.value() if (value.eventType == "create") { // 亂序行為,先到pay再到create if (isPayed) { out.collect(OrderResult(value.orderId, "payed successfully")) ctx.timerService().deleteEventTimeTimer(timerTs) isPayedState.clear() timerState.clear() } else { // 已創建訂單未支付,設置定時器 val ts = value.eventTime * 1000L + 15 * 60 * 1000L ctx.timerService().registerEventTimeTimer(ts) timerState.update(ts) } } else if (value.eventType == "pay") { //假如有定時器,說明create過 if (timerTs > 0) { // timerTs 是 認為超時後的時間戳 if (timerTs > value.eventTime * 1000L) { out.collect(OrderResult(value.orderId, "payed successfully")) } else { ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "this order is timeout")) } ctx.timerService().deleteEventTimeTimer(timerTs) isPayedState.clear() timerState.clear() } else { // 先來pay isPayedState.update(true) // 等待watermark時間 ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L) timerState.update(value.eventTime * 1000L) } } } } }