Flink状态专题:keyed state和Operator state

  • 2019 年 10 月 3 日
  • 筆記

        ?????flink????????????flink???????
        ??????????????flink????????????????????????
        ?????????????????????????????????????
??????????????????????????????????????????????
??????????????
???????????flink????????????????flink????????????????????Function??????????
????????????????
??

1. 状态类型

在Flink中，根据数据集是否按照Key进行分区，将状态分为Keyed State和Operator State两种类型。

（1）Keyed State

表示和key相关的一种state，只能用于KeyedStream类型数据集对应的Functions和Operators之上。Keyed State是Operator State的特例，区别在于Keyed State事先按照key对数据集进行了分区，每个Key State仅对应一个Operator和Key的组合。Keyed State可以通过Key Groups进行管理，主要用于当算子并行度发生变化时，自动重新分布Keyed State数据。在系统运行过程中，一个Keyed算子实例可能运行一个或者多个Key Groups的keys。

（2）Operator State

与Keyed State不同的是，Operator State只和并行的算子实例绑定，和数据元素中的key无关，每个算子实例中持有所有数据元素中的一部分状态数据。Operator State支持当算子实例并行度发生变化时自动重新分配状态数据。
 
同时在Flink中，Keyed State和Operator State均具有两种形式：托管状态（Managed State）和原生状态（Raw State），其中推荐优先使用托管状态。
 

2. Managed Keyed State

Flink中支持的Managed Keyed State类型主要包括：ValueState<T>、ListState<T>、MapState<K, V>。

（1）Stateful Function定义

下面的示例通过创建RichFlatMapFunction并在其中使用ValueState，统计并输出每个Key对应的最小值。

代码如下：

// Managed Keyed State example: for every key, keep the smallest value seen so far in a
// ValueState and emit (key, current value, smallest value so far).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create the input data set of (key, value) pairs.
DataStream<Tuple2<Integer, Long>> inputStream =
    env.fromElements(Tuple2.of(2, 21L), Tuple2.of(4, 1L), Tuple2.of(5, 4L));

// Key by the first tuple field (f0). Keyed state is scoped to the current key, so each key
// sees its own leastValueState.
inputStream
    .keyBy(t -> t.f0)
    .flatMap(
        // RichFlatMapFunction<IN, OUT>: the first type argument is the input type, the second
        // the output type. A "rich" function is needed to reach getRuntimeContext().
        new RichFlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>() {
          // State handles are obtained in open(), not serialized with the function.
          private transient ValueState<Long> leastValueState;

          @Override
          public void open(Configuration parameters) {
            // Describe the state: its name and its element type.
            ValueStateDescriptor<Long> leastValueStateDescriptor =
                new ValueStateDescriptor<>("leastValueState", Long.class);
            leastValueState = getRuntimeContext().getState(leastValueStateDescriptor);
          }

          @Override
          public void flatMap(
              Tuple2<Integer, Long> t, Collector<Tuple3<Integer, Long, Long>> collector)
              throws Exception {
            // value() is null until update() has been called for the current key.
            Long leastValue = leastValueState.value();
            if (leastValue != null && t.f1 > leastValue) {
              // Not a new minimum: emit the element together with the stored minimum.
              collector.collect(Tuple3.of(t.f0, t.f1, leastValue));
            } else {
              // First element for this key or a new minimum: store it and emit it as the minimum.
              leastValueState.update(t.f1);
              collector.collect(Tuple3.of(t.f0, t.f1, t.f1));
            }
          }
        })
    .print();

// Nothing runs until the job is submitted.
env.execute("least-value-state");

3. Managed Operator State

Operator State是一种non-keyed state，与并行的操作算子实例相关联。例如在Kafka Connector中，每个Kafka消费端算子实例都对应到Kafka的一个分区中，维护Topic分区和Offsets偏移量作为算子的Operator State。在Flink中可以实现CheckpointedFunction或者ListCheckpointed两个接口来定义操作Managed Operator State的函数。

（1）通过CheckpointedFunction接口操作Operator State

CheckpointedFunction接口定义：

public interface CheckpointedFunction{  //??checkpoint??    void snapshotState(FunctionSnapshotContext context)throws Exception;  //??????????????    void initializeState(FunctionInitializationContext context)throws Exception;  }

在每个算子中，Managed Operator State都是以List形式存储，算子和算子之间的状态数据相互独立。List存储比较适合于状态数据的重新分布，Flink目前支持对Managed Operator State的两种重分布策略，分别是Even-split Redistribution和Union Redistribution。

以下通过实现FlatMapFunction和CheckpointedFunction，完成对输入数据中每个key的数据元素数量以及算子中元素数量的统计。

通过在initializeState方法中分别定义keyedState和operator State两种状态，分别统计每个Key对应的元素数量以及整个算子实例接入的元素数量。

private class CheckpointCount(int numElements)extends FlatMapFunction<Map(int,long),Map(int,Map(long,long))>with CheckpointedFunction{  //?????????????Operator????  private long operatorCount = null;  //??keyedState????key??????  private ValueState keyedState =null;  //??operatorState,????????  private ListState operatorState = null;  @Override  flatMap(Tuple(int,long)t,Collector collector){  long keyedCount okeyedState.value() +1;  //??keyedState??  keyedState.update(keyedCount);  //??????operatorCount?  operatorCount =operatorCount+1;  //???????id,id???????keyedCount,???????????operatorCount  collector.collect(t.f0,keyedCount,operatorCount);    }  //???????  @Override  initializeState(FunctionInitializationContext context){  //?????keyedState  ValueStateDescriptor KeyedDescriptor =new ValueStateDescriptor ("keyedState",createTypeInformation);  keyedState = context.getKeyedStateStore.getState(KeyedDescriptor );  //?????operatorState  ValueStateDescriptor OperatorDescriptor =new ValueStateDescriptor ("OperatorState",createTypeInformation);  operatorState = context.getOperatorStateStore.getListState();  //???Restored?????operatorState????????  if(context.isRestored){    operatorCount = operatorState.get()  }  //???snapshot???operatorCount???operatorState?  @Override  snapshotState(FunctionSnapshotContext context){  operatorState.clear();  operatorState.add(operatorCount);  }  }  }

从以上代码可以看出，在snapshotState()方法中先清理掉上一次checkpoint中存储的operatorState数据，然后再添加并更新本次算子中需要checkpoint的operatorCount状态变量。当系统重启时会调用initializeState方法，重新恢复keyedState和OperatorState，其中operatorCount数据可以从最新的operatorState中恢复。

（2）通过ListCheckpointed接口定义Operator State

ListCheckpointed接口和CheckpointedFunction接口相比，在灵活性上相对弱一些：它只能支持List类型的状态，并且在数据恢复的时候仅支持even-redistribution策略。

需要实现以下两个方法来操作Operator State：

??

/**
 * Simplified interface for functions that keep Managed Operator State as a list. On restore the
 * list entries are redistributed evenly across parallel instances.
 *
 * <p>Note: Flink deprecated this interface in 1.11 in favour of {@code CheckpointedFunction}.
 *
 * @param <T> element type of the checkpointed list; must be serializable
 */
public interface ListCheckpointed<T extends Serializable> {

  /**
   * Returns the current state of the function as a list; it is written to the checkpoint.
   *
   * @param checkpointId id of the checkpoint being taken
   * @param timestamp timestamp of the checkpoint
   * @return the list of state entries to store
   * @throws Exception if the snapshot cannot be produced
   */
  List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

  /**
   * Restores the function's state from a previously checkpointed list. After rescaling, the list
   * may contain entries produced by other parallel instances.
   *
   * @param state the list of state entries assigned to this instance
   * @throws Exception if the state cannot be restored
   */
  void restoreState(List<T> state) throws Exception;
}

其中snapshotState方法定义数据元素List存储到checkpoints的逻辑，restoreState方法则定义从checkpoints中恢复状态的逻辑。

class numberRecordsCount extends FlatMapFunction(Map(String,long),Map(String,long))with ListCheckpointed{    private long numberRecords =0L;  @Override  flatMap(Tuple2(String,long)t,Collector collector){  //???????????????  numberRecords +=1;  collector.collect(t.f0,numberRecords);  }  @Override  snapshotState(long checkpointId){    Collections.singletonList(numberRecords);  }  @Override  restoreState(List<long> list){   numberRecords =0L;  for(count <list){   //??????numberRecords??  numberRecords +=count  }  }  }

 

????????????????????????????????????????????
??
??????????????flink?????????????????????scala??????????????java?????????java???????????????
 
????????????
 
???????flink???????????????????????????????flink???????
 
???????????