Flink状态专题:keyed state和Operator state
- 2019 年 10 月 3 日
- 筆記
1.状态类型
在Flink中根据数据集是否根据Key进行区分，将状态分为Keyed State和Operator State两种类型。
（1）Keyed State
表示和key相关的一种state，只能用于KeyedStream类型数据集对应的Functions和Operators之上。Keyed State是Operator State的特例，区别在于Keyed State事先按照key对数据集进行了分区，每个Key State仅对应一个Operator和Key的组合。Keyed State可以通过Key Groups进行管理，主要用于当算子并行度发生变化时，自动重新分布Keyed State数据。在系统运行过程中，一个Keyed算子实例可能运行一个或者多个Key Groups的keys。
（2）Operator State
2.Managed Keyed State
（1）Stateful Function定义
以下示例通过实现RichFlatMapFunction，并在其中定义ValueState，完成对数据集中最小值的获取。
代码如下：
// Runs inside a Flink job (e.g. a main method); Flink imports are omitted here.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// A tiny bounded input of (key, value) pairs.
DataStream<Tuple2<Integer, Long>> inputStream =
    env.fromElements(Tuple2.of(2, 21L), Tuple2.of(4, 1L), Tuple2.of(5, 4L));

// Key by the first field so that the ValueState below is kept separately per key.
inputStream
    .keyBy(value -> value.f0)
    .flatMap(
        // For every element emit (key, value, smallest value seen so far for this key).
        new RichFlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>() {

          /** Smallest value observed so far for the current key. */
          private transient ValueState<Long> leastValueState;

          @Override
          public void open(Configuration parameters) {
            // Register the state handle once per parallel instance.
            ValueStateDescriptor<Long> leastValueStateDescriptor =
                new ValueStateDescriptor<>("leastValueState ", Long.class);
            leastValueState = getRuntimeContext().getState(leastValueStateDescriptor);
          }

          @Override
          public void flatMap(
              Tuple2<Integer, Long> t, Collector<Tuple3<Integer, Long, Long>> collector)
              throws Exception {
            // value() is null until the first element of this key has been stored.
            Long leastValue = leastValueState.value();
            if (leastValue != null && t.f1 > leastValue) {
              collector.collect(Tuple3.of(t.f0, t.f1, leastValue));
            } else {
              // New minimum (or first element of the key): store it and emit the updated minimum.
              leastValueState.update(t.f1);
              collector.collect(Tuple3.of(t.f0, t.f1, t.f1));
            }
          }
        });
3.Managed Operator State
Operator State也称为non-keyed state，与并行的操作算子实例相关联。例如在Kafka Connector中，每个Kafka消费者实例都会维护一份其所消费的Topic分区和Offsets的映射关系，这份数据就是该实例的Operator State。在Flink中可以通过实现CheckpointedFunction或者ListCheckpointed两个接口中的一个，来定义操作Managed Operator State的函数。
（1）通过CheckpointedFunction接口操作Operator State
CheckpointedFunction接口定义：
public interface CheckpointedFunction{ //??checkpoint?? void snapshotState(FunctionSnapshotContext context)throws Exception; //?????????????? void initializeState(FunctionInitializationContext context)throws Exception; }
在每个算子中，Managed Operator State都是以List形式存储的，算子和算子之间的状态数据相互独立，List存储比较适合状态数据的重新分布。Flink目前支持对Managed Operator State两种重分布的策略，分别是Even-split Redistribution和Union Redistribution（前者把所有实例的List合并后平均分配给新的各个实例，后者让每个新实例都获得完整的List）。
以下示例通过实现FlatMapFunction和CheckpointedFunction，完成对输入数据中每个key的数据元素数量和算子的元素数量的统计。
通过在initializeState()方法中分别定义keyedState和operator State两种状态，分别按照Key和算子实例统计输入元素的数量。
private class CheckpointCount(int numElements)extends FlatMapFunction<Map(int,long),Map(int,Map(long,long))>with CheckpointedFunction{ //?????????????Operator???? private long operatorCount = null; //??keyedState????key?????? private ValueState keyedState =null; //??operatorState,???????? private ListState operatorState = null; @Override flatMap(Tuple(int,long)t,Collector collector){ long keyedCount okeyedState.value() +1; //??keyedState?? keyedState.update(keyedCount); //??????operatorCount? operatorCount =operatorCount+1; //???????id,id???????keyedCount,???????????operatorCount collector.collect(t.f0,keyedCount,operatorCount); } //??????? @Override initializeState(FunctionInitializationContext context){ //?????keyedState ValueStateDescriptor KeyedDescriptor =new ValueStateDescriptor ("keyedState",createTypeInformation); keyedState = context.getKeyedStateStore.getState(KeyedDescriptor ); //?????operatorState ValueStateDescriptor OperatorDescriptor =new ValueStateDescriptor ("OperatorState",createTypeInformation); operatorState = context.getOperatorStateStore.getListState(); //???Restored?????operatorState???????? if(context.isRestored){ operatorCount = operatorState.get() } //???snapshot???operatorCount???operatorState? @Override snapshotState(FunctionSnapshotContext context){ operatorState.clear(); operatorState.add(operatorCount); } } }
从代码中可以看出，在snapshotState()方法中先清理掉上一次checkpoint中存储的operatorState的数据，然后再添加并更新本次算子中需要checkpoint的operatorCount数据。当系统重启时会调用initializeState方法，重新恢复keyedState和OperatorState，其中operatorCount数据可以从最新的operatorState中恢复。
（2）通过ListCheckpointed接口定义Operator State
ListCheckpointed接口和CheckpointedFunction接口相比，在灵活性上相对弱一些：只能支持List类型的状态，并且在数据恢复的时候仅支持Even-split Redistribution策略。
需要实现以下两个方法来操作Operator State：
??
/**
 * Returns the function's current state as a list, to be stored in the checkpoint
 * identified by {@code checkpointId} and {@code timestamp}.
 */
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

/** Restores the function's state from a list produced by an earlier snapshotState call. */
void restoreState(List<T> state) throws Exception;
其中snapshotState方法定义将数据元素List存储到checkpoints的逻辑，restoreState方法则定义从checkpoints中恢复状态的逻辑。
class numberRecordsCount extends FlatMapFunction(Map(String,long),Map(String,long))with ListCheckpointed{ private long numberRecords =0L; @Override flatMap(Tuple2(String,long)t,Collector collector){ //??????????????? numberRecords +=1; collector.collect(t.f0,numberRecords); } @Override snapshotState(long checkpointId){ Collections.singletonList(numberRecords); } @Override restoreState(List<long> list){ numberRecords =0L; for(count <list){ //??????numberRecords?? numberRecords +=count } } }