[源碼分析] 從實例和源碼入手看 Flink 之廣播 Broadcast

  • 2020 年 3 月 29 日
  • 筆記

[源碼分析] 從實例和源碼入手看 Flink 之廣播 Broadcast

0x00 摘要

本文將通過源碼分析和實例講解,帶領大家熟悉Flink的廣播變數機制。

0x01 業務需求

1. 場景需求

對黑名單中的IP進行檢測過濾。IP黑名單的內容會隨時增減,因此是可以隨時動態配置的。

該黑名單假設存在mysql中,Flink作業啟動時候會把這個黑名單從mysql載入,作為一個變數由Flink運算元使用。

2. 問題

我們不想重啟作業以便重新獲取這個變數。所以就需要一個能夠動態修改運算元里變數的方法。

3. 解決方案

使用廣播的方式去解決。去做配置的動態更新。

廣播和普通的流數據不同的是:廣播流的1條流數據能夠被運算元的所有分區所處理,而數據流的1條流數據只能夠被運算元的某一分區處理。因此廣播流的特點也決定適合做配置的動態更新。

0x02 概述

廣播這部分有三個難點:使用步驟;如何自定義函數;如何存取狀態。下面就先為大家概述下。

1. broadcast的使用步驟

  • 建立MapStateDescriptor
  • 通過DataStream.broadcast方法返回廣播數據流BroadcastStream
  • 通過DataStream.connect方法,把業務數據流和BroadcastStream進行連接,返回BroadcastConnectedStream
  • 通過BroadcastConnectedStream.process方法分別進行processElement及processBroadcastElement處理

2. 用戶自定義處理函數

  • BroadcastConnectedStream.process接收兩種類型的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction
  • 兩種類型的function都定義了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定義了一個onTimer方法,默認是空操作,允許子類重寫
  • processElement處理業務數據流
  • processBroadcastElement處理廣播數據流

3. Broadcast State

  • Broadcast State始終表示為MapState,即map format。這是Flink提供的最通用的狀態原語。是託管狀態的一種,託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等
  • 用戶必須創建一個 MapStateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱, 狀態所持有值的類型,並且可能包含用戶指定的函數
  • checkpoint的時候也會checkpoint broadcast state
  • Broadcast State只在記憶體有,沒有RocksDB state backend
  • Flink 會將state廣播到每個task,注意該state並不會跨task傳播,對其修改僅僅是作用在其所在的task
  • downstream tasks接收到broadcast event的順序可能不一樣,所以依賴其到達順序來處理element的時候要小心

0x03. 示例程式碼

1. 示例程式碼

我們直接從Flink源碼入手可以找到理想的示例。 以下程式碼直接摘錄 Flink 源碼 StatefulJobWBroadcastStateMigrationITCase,我會在裡面加上注釋說明。

  @Test    def testRestoreSavepointWithBroadcast(): Unit = {        val env = StreamExecutionEnvironment.getExecutionEnvironment      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)        // 以下兩個變數是為了確定廣播流發出的數據類型,廣播流可以同時發出多種類型的數據      lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](        "broadcast-state-1",        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])        lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](        "broadcast-state-2",        BasicTypeInfo.STRING_TYPE_INFO,        BasicTypeInfo.STRING_TYPE_INFO)        env.setStateBackend(new MemoryStateBackend)      env.enableCheckpointing(500)      env.setParallelism(4)      env.setMaxParallelism(4)        // 數據流,這裡數據流和廣播流的Source都是同一種CheckpointedSource。數據流這裡做了一系列運算元操作,比如flatMap      val stream = env        .addSource(          new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")        .keyBy(          new KeySelector[(Long, Long), Long] {            override def getKey(value: (Long, Long)): Long = value._1          }        )        .flatMap(new StatefulFlatMapper)        .keyBy(          new KeySelector[(Long, Long), Long] {            override def getKey(value: (Long, Long)): Long = value._1          }        )        // 廣播流      val broadcastStream = env        .addSource(          new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")        .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)        // 把數據流和廣播流結合起來      stream        .connect(broadcastStream)        .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))        .addSink(new AccumulatorCountingSink)    }  }    // 用戶自定義的處理函數  class TestBroadcastProcessFunction    extends KeyedBroadcastProcessFunction      [Long, (Long, Long), (Long, Long), (Long, Long)] {      // 重點說明,這裡的 firstBroadcastStateDesc,secondBroadcastStateDesc 其實和之前廣播流的那兩個MapStateDescriptor無關。      // 這裡兩個MapStateDescriptor是為了存取BroadcastState,這樣在 processBroadcastElement和processElement之間就可以傳遞變數了。我們完全可以定義新的MapStateDescriptor,只要processBroadcastElement和processElement之間認可就行。      // 這裡參數 "broadcast-state-1" 是name, flink就是用這個 name 來從Flink運行時系統中存取MapStateDescriptor    lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](      "broadcast-state-1",      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])      val secondBroadcastStateDesc = new MapStateDescriptor[String, String](      "broadcast-state-2",      BasicTypeInfo.STRING_TYPE_INFO,      BasicTypeInfo.STRING_TYPE_INFO)      override def processElement(                  value: (Long, Long),                  ctx: KeyedBroadcastProcessFunction                  [Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,                  out: Collector[(Long, Long)]): Unit = {        // 這裡Flink源碼中是直接把接受到的業務變數直接再次轉發出去      out.collect(value)    }      override def processBroadcastElement(                  value: (Long, Long),                  ctx: KeyedBroadcastProcessFunction                  [Long, (Long, Long), (Long, Long), (Long, Long)]#Context,                  out: Collector[(Long, Long)]): Unit = {      // 這裡是把最新傳來的廣播變數存儲起來,processElement中可以取出再次使用. 具體是通過firstBroadcastStateDesc 的 name 來獲取 BroadcastState      ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)      ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)    }  }    // 廣播流和數據流的Source  private class CheckpointedSource(val numElements: Int)    extends SourceFunction[(Long, Long)] with CheckpointedFunction {      private var isRunning = true    private var state: ListState[CustomCaseClass] = _      // 就是簡單的定期發送    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {      ctx.emitWatermark(new Watermark(0))      ctx.getCheckpointLock synchronized {        var i = 0        while (i < numElements) {          ctx.collect(i, i)          i += 1        }      }      // don't emit a final watermark so that we don't trigger the registered event-time      // timers      while (isRunning) Thread.sleep(20)    }  }  

2. 技術難點

MapStateDescriptor

首先要說明一些概念:

  • Flink中包含兩種基礎的狀態:Keyed State和Operator State。
  • Keyed State和Operator State又可以 以兩種形式存在:原始狀態和託管狀態。
  • 託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
  • raw state即原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。
  • MapState是託管狀態的一種:即狀態值為一個map。用戶通過putputAll方法添加元素。

回到我們的例子,廣播變數就是OperatorState的一部分,是以託管狀態的MapState形式保存的。具體getBroadcastState函數就是DefaultOperatorStateBackend中的實現

所以我們需要用MapStateDescriptor描述broadcast state,這裡MapStateDescriptor的使用比較靈活,因為是key,value類似使用,所以個人覺得value直接使用類,這樣更方便,尤其是對於從其他語言轉到scala的同學。

processBroadcastElement

// 因為主要起到控制作用,所以這個函數的處理相對簡單  override def processBroadcastElement(): Unit = {      // 這裡可以把最新傳來的廣播變數存儲起來,processElement中可以取出再次使用,比如      ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)  }  

processElement

// 這個函數需要和processBroadcastElement配合起來使用  override def processElement(): Unit = {      // 可以取出processBroadcastElement之前存儲的廣播變數,然後用此來處理業務變數,比如     val secondBroadcastStateDesc = new MapStateDescriptor[String, String](      "broadcast-state-2",      BasicTypeInfo.STRING_TYPE_INFO,      BasicTypeInfo.STRING_TYPE_INFO)        var actualSecondState = Map[String, String]()      for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {        val v = secondExpectedBroadcastState.get(entry.getKey).get        actualSecondState += (entry.getKey -> entry.getValue)      }       // 甚至這裡只要和processBroadcastElement一起關聯好,可以存儲任意類型的變數。不必須要和廣播變數的類型一致。重點是聲明新的對應的MapStateDescriptor     // MapStateDescriptor繼承了StateDescriptor,其中state為MapState類型,value為Map類型  }  

結合起來使用

因為某些限制,所以下面只能從網上找一個例子給大家講講。

// 模式始終存儲在MapState中,並將null作為鍵。broadcast state始終表示為MapState,這是Flink提供的最通用的狀態原語。  MapStateDescriptor<Void, Pattern> bcStateDescriptor =    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));    // 能看到的是,在處理廣播變數時候,存儲廣播變數到BroadcastState   public void processBroadcastElement(Pattern pattern, Context ctx,       Collector<Tuple2<Long, Pattern>> out) throws Exception {     // store the new pattern by updating the broadcast state     BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);     // storing in MapState with null as VOID default value     bcState.put(null, pattern);   }    // 能看到的是,在處理業務變數時候,從BroadcastState取出廣播變數,存取時候實際都是用"patterns"這個name字元串來作為key。    public void processElement(Action action, ReadOnlyContext ctx,       Collector<Tuple2<Long, Pattern>> out) throws Exception {     // get current pattern from broadcast state     Pattern pattern = ctx.getBroadcastState(this.patternDesc)       // access MapState with null as VOID default value       .get(null);     // get previous action of current user from keyed state     String prevAction = prevActionState.value();     if (pattern != null && prevAction != null) {       // user had an action before, check if pattern matches       if (pattern.firstAction.equals(prevAction) &&           pattern.secondAction.equals(action.action)) {         // MATCH         out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));       }     }     // update keyed state and remember action for next pattern evaluation     prevActionState.update(action.action);   }  

1. 廣播的邏輯流程

 * The life cycle of the Broadcast:   * {@code   *  -- 初始化邏輯 -> 用一個BroadcastConnectedStream把數據流和廣播流結合起來進行拓撲轉換   *        |   *        +---->  businessStream = DataStream.filter.map....   *        |       // 處理業務邏輯的數據流,businessStream 是普通DataStream   *        +---->  broadcastStream = DataStream.broadcast(broadcastStateDesc)   *        |       // 處理配置邏輯的廣播數據流,broadcastStream是BroadcastStream類型   *        +---->  businessStream.connect(broadcastStream)   *        |                     .process(new processFunction(broadcastStateDesc))   *        |       // 把業務流,廣播流 結合起來,生成一個BroadcastConnectedStream,然後進行 process   *        +----------> process @ BroadcastConnectedStream   *        |                TwoInputStreamOperator<IN1, IN2, OUT> operator =   *        |                new CoBroadcastWithNonKeyedOperator<>(clean(function),   *        |                broadcastStateDescriptors);   *        |                return transform(outTypeInfo, operator);   *        |       // 生成一個類型是TwoInputStreamOperator 的 operator,進行 transform   *        +----------------> transform @ BroadcastConnectedStream   *        |                      transform = new TwoInputTransformation<>(   *        |       			  	       inputStream1.getTransformation(), // 業務流   *        |       			  	       inputStream2.getTransformation(), // 廣播流   *        |       			  	       ifunctionName, // 用戶的UDF   *        |       			  	       operator, // 運算元 CoBroadcastWithNonKeyedOperator   *        |       			  	       outTypeInfo);  // 輸出類型   *        |       	      		   returnStream = new SingleOutputStreamOperator(transform);   *        |       			         getExecutionEnvironment().addOperator(transform)   *        |       // 將業務流,廣播流與拓撲聯合起來形成一個轉換,加到 Env 中,這就完成了拓撲轉換   *        |       // 最後返回結果是一個SingleOutputStreamOperator。   * }       *  數據結構:   *  -- BroadcastStream.   *  就是簡單封裝一個DataStream,然後記錄這個廣播流對應的StateDescriptors   public class BroadcastStream<T> {  	private final StreamExecutionEnvironment environment;  	private final DataStream<T> inputStream;  	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;   }     *  數據結構:   *  -- BroadcastConnectedStream.   *  把業務流,廣播流 結合起來,然後會生成運算元和拓撲  public class BroadcastConnectedStream<IN1, IN2> {  	private final StreamExecutionEnvironment environment;  	private final DataStream<IN1> inputStream1;  	private final BroadcastStream<IN2> inputStream2;  	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;  }    *  真實計算:  *  -- CoBroadcastWithNonKeyedOperator -> 真正對BroadcastProcessFunction的執行,是在這裡完成的  public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>  		extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>  		implements TwoInputStreamOperator<IN1, IN2, OUT> {      private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;  	private transient TimestampedCollector<OUT> collector;  	private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;  	private transient ReadWriteContextImpl rwContext;  	private transient ReadOnlyContextImpl rContext;    	@Override  	public void processElement1(StreamRecord<IN1> element) throws Exception {  		collector.setTimestamp(element);  		rContext.setElement(element);      // 當上游有最新業務數據來的時候,調用用戶自定義的processElement      // 在這可以把之前存儲的廣播配置資訊取出,然後對業務數據流進行處理  		userFunction.processElement(element.getValue(), rContext, collector);  		rContext.setElement(null);  	}    	@Override  	public void processElement2(StreamRecord<IN2> element) throws Exception {  		collector.setTimestamp(element);  		rwContext.setElement(element);      // 當上游有數據來的時候,調用用戶自定義的processBroadcastElement      // 在這可以把最新傳送的廣播配置資訊存起來  		userFunction.processBroadcastElement(element.getValue(), rwContext, collector);  		rwContext.setElement(null);  	}  }  

2. DataStream的關鍵函數

// 就是connect,broadcast,分別生成對應的數據流  public class DataStream<T> {    protected final StreamExecutionEnvironment environment;    protected final Transformation<T> transformation;    	@PublicEvolving  	public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {  		return new BroadcastConnectedStream<>(  				environment,  				this,  				Preconditions.checkNotNull(broadcastStream),  				broadcastStream.getBroadcastStateDescriptor());  	}    	@PublicEvolving  	public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {  		final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());  		return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);  	}  }  

3. 關鍵數據結構MapStateDescriptor

主要是用來聲明各種元數據資訊。後續可以看出來,系統是通過MapStateDescriptor的name,即第一個參數來存儲 / 獲取MapStateDescriptor對應的State。

public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {  	/**  	 * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.  	 *  	 * @param name The name of the {@code MapStateDescriptor}.  	 * @param keySerializer The type serializer for the keys in the state.  	 * @param valueSerializer The type serializer for the values in the state.  	 */  	public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {  		super(name, new MapSerializer<>(keySerializer, valueSerializer), null);  	}    	/**  	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.  	 *  	 * @param name The name of the {@code MapStateDescriptor}.  	 * @param keyTypeInfo The type information for the keys in the state.  	 * @param valueTypeInfo The type information for the values in the state.  	 */  	public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {  		super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);  	}    	/**  	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.  	 *  	 * <p>If this constructor fails (because it is not possible to describe the type via a class),  	 * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.  	 *  	 * @param name The name of the {@code MapStateDescriptor}.  	 * @param keyClass The class of the type of keys in the state.  	 * @param valueClass The class of the type of values in the state.  	 */  	public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {  		super(name, new MapTypeInfo<>(keyClass, valueClass), null);  	}  }  

4. 狀態存取

在processBroadcastElement和processElement之間傳遞的狀態,是通過MapStateDescriptor的name為key,來存儲在Flink中。即類似調用如下ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)。所以我們接下來需要介紹下Flink的State概念。

State vs checkpoint

首先區分一下兩個概念,state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。Flink通過定期地做checkpoint來實現容錯和恢復。

Flink中包含兩種基礎的狀態:Keyed State和Operator State。

Keyed State

顧名思義,就是基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。

Operator State

與Keyed State不同,Operator State跟一個特定operator的一個並發實例綁定,整個operator只對應一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應多個keyed state。

舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。

原始狀態和Flink託管狀態 (Raw and Managed State)

這又是另外一個維度。

Keyed StateOperator State 分別有兩種存在形式:managed and raw, 即原始狀態和託管狀態。

託管狀態是由 Flink框架運行時 管理的狀態,比如內部的 hash table 或者 RocksDB。 比如 「ValueState」, 「ListState」 等。Flink runtime 會對這些狀態進行編碼並寫入 checkpoint。

比如managed keyed state 介面提供不同類型狀態的訪問介面,這些狀態都作用於當前輸入數據的 key 下。換句話說,這些狀態僅可在 KeyedStream 上使用,可以通過 stream.keyBy(...) 得到 KeyedStream。而我們可以通過實現 CheckpointedFunctionListCheckpointed 介面來使用 managed operator state。

Raw state即原始狀態,由用戶自行管理狀態具體的數據結構,保存在運算元自己的數據結構中。checkpoint 的時候,Flink 並不知曉具體的內容,僅僅寫入一串位元組序列到 checkpoint。

通常在DataStream上的狀態推薦使用託管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。

回到我們的例子,廣播變數就是OperatorState的一部分,是以託管狀態的MapState形式保存的。具體getBroadcastState函數就是DefaultOperatorStateBackend中的實現

StateDescriptor

你必須創建一個 StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,並且它們必須具有唯一的名稱以便可以引用它們), 狀態所持有值的類型,並且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

狀態通過 RuntimeContext 進行訪問,因此只能在 rich functions 中使用。

OperatorStateBackEnd

OperatorStateBackEnd 主要管理OperatorState. 目前只有一種實現: DefaultOperatorStateBackend。

DefaultOperatorStateBackend

DefaultOperatorStateBackend狀態是以Map方式來存儲的。其構造出一個 PartitionableListState (屬於ListState)。OperatorState都保存在記憶體中。

public class DefaultOperatorStateBackend implements OperatorStateBackend {    	/**  	 * Map for all registered operator states. Maps state name -> state  	 */  	private final Map<String, PartitionableListState<?>> registeredOperatorStates;    	/**  	 * Map for all registered operator broadcast states. Maps state name -> state  	 */  	private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;      /**  	 * Cache of already accessed states.  	 *  	 * <p>In contrast to {@link #registeredOperatorStates} which may be repopulated  	 * with restored state, this map is always empty at the beginning.  	 *  	 * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.  	 */  	private final Map<String, PartitionableListState<?>> accessedStatesByName;    	private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;  // 這裡用來快取廣播變數      // 這裡就是前文中所說的,存取廣播變數的API  	public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {    		String name = Preconditions.checkNotNull(stateDescriptor.getName());        // 如果之前有,就取出來  		BackendWritableBroadcastState<K, V> previous =  			(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(        name);    		if (previous != null) {  			return previous;  		}    		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());  		TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());  		TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());    		BackendWritableBroadcastState<K, V> broadcastState =  			(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);    		if (broadcastState == null) {  			broadcastState = new HeapBroadcastState<>(  					new RegisteredBroadcastStateBackendMetaInfo<>(  							name,  							OperatorStateHandle.Mode.BROADCAST,  							broadcastStateKeySerializer,  							broadcastStateValueSerializer));  			registeredBroadcastStates.put(name, broadcastState);  		} else {  			// has restored state; check compatibility of new state access    			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();    			// check whether new serializers are incompatible  			TypeSerializerSchemaCompatibility<K> keyCompatibility =  				restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);    			TypeSerializerSchemaCompatibility<V> valueCompatibility =  				restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);    			broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);  		}    		accessedBroadcastStatesByName.put(name, broadcastState); // 如果之前沒有,就存入  		return broadcastState;  	}  }  

0x05. 參考

Flink原理與實現:詳解Flink中的狀態管理 https://yq.aliyun.com/articles/225623

Flink使用廣播實現配置動態更新 https://www.jianshu.com/p/c8c99f613f10

Flink Broadcast State實用指南 https://blog.csdn.net/u010942041/article/details/93901918

聊聊flink的Broadcast State https://www.jianshu.com/p/d6576ae67eae

Working with State https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html