如何優雅的升級 Flink Job?

Flink 作為有狀態計算的流批一體分佈式計算引擎,會在運行過程中保存很多的「狀態」數據,並依賴這些數據完成任務的 Failover 以及任務的重啟恢復。

那麼,請思考一個問題:如果程序升級迭代調整了這些「狀態」的數據結構以及類型,Flink 能不能從舊的「狀態」文件(一般就是 Savepoint 文件)中恢復?

數據類型

上一篇我們介紹過 Flink 內置的一些用於狀態存儲的集合工具,如 ValueState、ListState、MapState 等。這些只是裝數據的容器,具體能存儲哪些類型的數據或許你還不清楚。

實際上,Flink 支持以下一些數據類型:

內建類型狀態數據結構更新

Flink 中默認提供對一些特定條件下的狀態數據結構升級的自動兼容,無需用戶介入。

POJO 類型

Flink 基於下面的規則來支持 POJO 類型結構的升級:

  • 可以刪除字段。一旦刪除,被刪除字段的前值將會在將來的 checkpoints 以及 savepoints 中刪除。
  • 可以添加字段。新字段會使用類型對應的默認值進行初始化,比如 Java 類型。
  • 不可以修改字段的聲明類型。
  • 不可以改變 POJO 類型的類名,包括類的命名空間。

其中,比較重要的是,對於一個 POJO 對象的某些字段的類型修改是不被支持的,因為 Savepoint 文件是按照二進制位緊湊存儲的,不同類型佔用的 bit 位長度是不一樣的。

按照目前的 Flink 內置支持能力,最多對於 POJO 類型增加或者刪除字段等基本操作。

Avro 類型

Avro 的 Schema 用 JSON 表示。Schema 定義了簡單數據類型和複雜數據類型。Flink 完全支持 Avro 的 Schema 升級。

因為 Avro 本身就是一個高性能的數據序列化框架,它使用JSON 來定義數據類型和通訊協議,使用壓縮二進制格式來序列化數據。

Flink 中相當於藉助它完成數據的序列化和反序列化,那麼理論上只要用戶的 Schema 升級是 Avro 支持的,那麼 Flink 也是完全支持的。

非內建類型狀態數據結構更新

除了上述兩種 Flink 內置支持的兩種類型外,其餘所有類型均不支持 Schema 升級。那麼我們就只有通過自定義狀態序列化器來完成對狀態 Schema 升級的兼容。

序列化反序列化的流程

HashMapStateBackend 這種基於內存的狀態後端和 EmbeddedRocksDBStateBackend 這種基於 RocksDb 的狀態後端的序列化與反序列化流程稍有不同。

基於內存狀態後端的序列化反序列化流程:

  • Job的相關狀態的數據是以Object的形式存儲在JVM內存堆中
  • 通過Checkpoint/Savepoint機制將內存中的狀態數據序列化到外部存儲介質
  • 新序列化器反序列化的時候會通過舊的序列化器反序列化數據到內存
  • 基於內存中狀態更新後再通過新序列化器序列化數據到外部存儲介質

基於RocksDb狀態後端的序列化反序列化流程:

  • Job的相關狀態數據直接經過序列化器序列化好存儲在JVM堆外內存中
  • 通過Checkpoint/Savepoint機制將內存中序列化好的數據原樣傳輸到外部存儲介質
  • 新序列化器反序列化的時候會從外部介質直接讀取狀態數據到內存(不做反序列化操作)
  • 對於使用到的狀態數據會使用舊序列化器先反序列化,再修改,再使用新序列化器序列化

其中,對於後面兩個步驟與內存狀態後端是有區別的,相當於是一種 lazy 的模式,只有用到才會去反序列化。

舉個例子:狀態中有個 KeyedState(我們知道每個Key會對應一個狀態),那麼如果某些 Key 的狀態數據恢復到內存後沒有被程序使用或者更新,那麼下一次序列化的時候就不會使用新序列化器操作。

那麼,結果就是:對於一個 Job 的 Checkpoint/Savepoint 文件里是存在多個版本的。這也是待會兒要提到的,對於每一次序列化都會把序列化器的相關配置以快照的形式和數據一起存儲,這樣才保證了多個版本狀態數據存在的可能。

演示一個 Schema 升級狀態恢復失敗的 Demo

模擬一個訂單系統上報數據的場景,計算每十秒系統的訂單量以及下單數最多的用戶

1、自定義一個 SourceFunction 模擬上游源源不斷產生數據:

public class MakeDataSource extends RichSourceFunction<OrderModel> {

    private boolean flag = true;

    @Override
    public void run(SourceContext<OrderModel> sourceContext) throws Exception {
        List<String> userIdSet = Arrays.asList("joha""nina""gru""andi");
        Random random = new Random();
        while(flag){
            OrderModel order = new OrderModel();
            order.setCreateTs(System.currentTimeMillis());
            order.setOrderId(UUID.randomUUID().toString());
            order.setUserId(userIdSet.get(random.nextInt(4)));
            sourceContext.collect(order);
        }
    }
    @Override
    public void cancel() {
        flag = false;
    }
}

2、寫一個 job 每十秒聚合一次窗口,輸出用戶產生的訂單數量:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000 * 60);
        env.getCheckpointConfig().setCheckpointStorage("file:///data/");

        env.addSource(new MakeDataSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderModel>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((SerializableTimestampAssigner<OrderModel>) (orderModel, l) -> orderModel.getCreateTs())
                )
                .keyBy((KeySelector<OrderModel, String>) orderModel -> orderModel.getUserId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .process(new ProcessWindowFunction<OrderModel, Tuple2<Long,String>, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<OrderModel> elements, Collector<Tuple2<Long, String>> out) throws Exception {
                Iterator<OrderModel> iterator = elements.iterator();
                int count = 0;
                while (iterator.hasNext()){
                    iterator.next();
                    count++;
                }
                logger.info("userid:{},count:{}",key,count);
            }
        });

        env.execute("test_job");
    }

WebUi 上看大概就是這個樣子,由於我是本地 docker 起的集群,所以資源不是特別充足,並行度都是 1。

我們執行以下命令停止任務並生成 Savepoint:

root@ae894850e6ae:/opt/flink# flink stop 613c3662a4a4f5affa8eb8fb04bf4592
Suspending job "613c3662a4a4f5affa8eb8fb04bf4592" with a savepoint.
Savepoint completed. Path: file:/data/flink-savepoints/savepoint-613c36-3f0f01590c70

然後我們給我們的狀態對象 OrderModel 的 orderId 字段類型從 String 給他改成 Integer。

再指定 Savepoint 重啟 Job,不出意外的話,你應該也會得到這麼個錯誤:

自定義狀態序列化器

1、需要繼承 TypeSerializer 並實現其中相關方法,其中比較重要的有這麼幾個:

//創建一個待序列化的數據類型實例
public abstract T createInstance();
//序列化操作
public abstract void serialize(T record, DataOutputView target) throws IOException;
//反序列化操作
public abstract T deserialize(DataInputView source) throws IOException;
//這個比較重要,用於對序列化器進行快照存儲
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();

2、對於 TypeSerializerSnapshot 來說它實際上就是提供了對序列化器的快照存儲以及版本兼容處理,核心方法有這麼幾個:

//把當前序列化器以二進制格式和數據寫到一起
void writeSnapshot(DataOutputView out) throws IOException;
//從當前輸入流中,讀出序列化器,一般會有一個類私有變量來存儲
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)throws IOException;
//檢驗當前序列化器是否能兼容之前版本
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
//重置當前序列化器為之前的一個序列化器  
TypeSerializer<T> restoreSerializer();

這裏面有兩個比較核心,resolveSchemaCompatibility 傳入一個新的序列化器,然後判斷這個序列化器是否能夠兼容反序列化之前版本序列化器序列化的數據,TypeSerializerSchemaCompatibility.type 枚舉定義了可返回的類型:

enum Type {
  //兼容,並且今後使用用戶新定義的 Serializer
  COMPATIBLE_AS_IS
  //不兼容,需要重置之前序列化器反序列化後再使用新序列化器序列化
  COMPATIBLE_AFTER_MIGRATION
  //兼容,需要返回一個reconfiguredNewSerializer,替換傳入的序列化器
  COMPATIBLE_WITH_RECONFIGURED_SERIALIZER
  //不兼容,作業拋異常退出
  INCOMPATIBLE
}

那麼對於我們上面的案例,把 POJO 中字段類型從 String 改成 Integer 的情況,其實只要重寫 TypeSerializerSnapshot.resolveSchemaCompatibility 方法,返回 COMPATIBLE_AFTER_MIGRATION 類型,然後再 resolveSchemaCompatibility 中返回上一個版本的序列化器(可以反序列化String)即可。

限於篇幅就不再演示了,歡迎交流!

本文所有測試來自本地 docker 起的 Flink session 集群,如有需要 docker-compose.yml 文件的可以公眾號回復「flink-docker」領取

Tags: