Using ProcessFunction and SideOutput (Replacing Split) for Stream Splitting
- January 13, 2020
- Notes
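Flink's side outputs replace the older DataStream.split()/select() API for splitting one stream into several. For comparison, here is a minimal sketch of the legacy approach, assuming a DataStream<Integer> named input (split/select was deprecated in later 1.x releases and eventually removed):

// Legacy split/select (deprecated, replaced by side outputs):
// an OutputSelector assigns one or more tag names to each record
SplitStream<Integer> splitStream = input.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> tags = new ArrayList<>();
        if (value < 5) {
            tags.add("lessThanFive");
        } else {
            tags.add("fiveOrMore");
        }
        return tags;
    }
});

// select() pulls out the sub-streams by tag name
DataStream<Integer> lessThanFive = splitStream.select("lessThanFive");
DataStream<Integer> fiveOrMore = splitStream.select("fiveOrMore");

Side outputs avoid split's limitations (consecutive splits could not be chained, and every sub-stream had to have the same type as the input), which is why they are the recommended replacement.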
Define a custom ProcessFunction:
// 3.2 Add the task; a {@link ProcessFunction} makes it easy to: 1. drop null records, 2. emit a side output
DetaiEventRuleExecutingProcessor executingProcessor = new DetaiEventRuleExecutingProcessor();
Inside the function, records are split on randomNum: values below 5 go to the side output, values of 5 and above are converted to DetailData and emitted on the main output:
public class DetaiEventRuleExecutingProcessor extends ProcessFunction<StartupInfoData, DetailData> {

    // Tag for the side output; public so the main program can reference it in getSideOutput()
    public static final OutputTag<StartupInfoData> APP_LOG_TAG =
            new OutputTag<>("RandomLessFive", TypeInformation.of(StartupInfoData.class));

    private void sideOut(OutputTag<StartupInfoData> tag, StartupInfoData inputKV, Context ctx) {
        ctx.output(tag, inputKV);
    }

    @Override
    public void processElement(StartupInfoData startupInfoData, Context context, Collector<DetailData> collector) throws Exception {
        if (startupInfoData.getRandomNum() < 5) {
            // randomNum < 5: emit to the side output
            sideOut(APP_LOG_TAG, startupInfoData, context);
        } else {
            // randomNum >= 5: convert to DetailData and emit on the main output
            DetailData detailData = new DetailData();
            detailData.setAppId(startupInfoData.getAppId());
            detailData.setAppName(startupInfoData.getAppName());
            detailData.setMsgtime(System.currentTimeMillis());
            detailData.setRandomNum(startupInfoData.getRandomNum());
            collector.collect(detailData);
        }
    }
}
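The StartupInfoData and DetailData types are not shown in the post; below is a minimal sketch of what they are assumed to look like, with the fields inferred from the getters and setters used above (field names and types are assumptions):

// Assumed input POJO (fields inferred from the getters used above)
public class StartupInfoData {
    private String appId;
    private String appName;
    private int randomNum;

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public int getRandomNum() { return randomNum; }
    public void setRandomNum(int randomNum) { this.randomNum = randomNum; }
}

// Assumed output POJO (fields inferred from the setters used above)
public class DetailData {
    private String appId;
    private String appName;
    private long msgtime;
    private int randomNum;

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public long getMsgtime() { return msgtime; }
    public void setMsgtime(long msgtime) { this.msgtime = msgtime; }
    public int getRandomNum() { return randomNum; }
    public void setRandomNum(int randomNum) { this.randomNum = randomNum; }
}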
Write the main output and the side-output data to files: values of 4 and below go to u4.txt, values of 5 and above go to b5.txt:
// Apply the custom ProcessFunction; the side output is produced inside it
SingleOutputStreamOperator<DetailData> executeMainStream =
        startupInfoData.process(executingProcessor).name("processExecuteProcessor");

// Main output: values of 5 and above (note the escaped backslashes in the Windows path)
executeMainStream.writeAsText("D:\\all\\b5.txt").setParallelism(1);

logger.info("Side-output (discarded data) divider..........................");

// Retrieve the side-output records; the tag must be the one declared in the processor
// (or an OutputTag with the same id and type)
DataStream<StartupInfoData> leftLogStream = executeMainStream.getSideOutput(DetaiEventRuleExecutingProcessor.APP_LOG_TAG);
leftLogStream.writeAsText("D:\\all\\u4.txt").setParallelism(1);
Main program configuration and reading the data from Kafka:
logger.info("------------------------------------------------------"); logger.info("------------- FlinkKafkaSource Start -----------------"); logger.info("------------------------------------------------------"); String ZOOKEEPER_HOST = kafkaConf.getZkhosts(); String KAFKA_BROKER = kafkaConf.getServers(); String TRANSACTION_GROUP = kafkaConf.getGroup(); String topic = kafkaConf.getTopic(); Properties prop = new Properties(); prop.setProperty("zookeeper.connect", ZOOKEEPER_HOST); prop.setProperty("bootstrap.servers", KAFKA_BROKER); prop.setProperty("group.id", TRANSACTION_GROUP); //todo Flink的流運行環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //todo checkpoint配置 每5s checkpoint一次 // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式為exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程式被cancel後,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備註】 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程式被cancel後,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程式被cancel後,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //設置statebackend //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true)); //創建消費者 logger.info("%%%%%%% 消費topic: {}", topic); FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop); //todo 默認消費策略--從上次消費組的偏移量進行繼續消費 consumer011.setStartFromGroupOffsets(); DataStreamSource<String> text = env.addSource(consumer011); //todo map 輸入一個數據源,產生一個數據源 DataStream<StartupInfoData> startupInfoData = text.map(new MapFunction<String, StartupInfoData>() { @Override public StartupInfoData map(String input) throws Exception { return JSON.parseObject(input, StartupInfoData.class); } });
Verify that the results are correct:
Source data produced to Kafka:

Output files:


——————————————————————————————————————————————————
Example 2:
private static final OutputTag<LogEntity> APP_LOG_TAG =
        new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG =
        new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));
private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
    return rawLogStream
            .process(new ProcessFunction<LogEntity, LogEntity>() {
                @Override
                public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
                    // Tag each record according to its log level
                    if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
                        ctx.output(ANALYZE_METRIC_TAG, entity);
                    } else {
                        ctx.output(APP_LOG_TAG, entity);
                    }
                }
            })
            .name("RawLogEntitySplitStream");
}

// Call the function to tag the records in the raw stream
SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);

// Retrieve each stream by its tag for further analysis
DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
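Both examples pass TypeInformation.of(...) explicitly when constructing the OutputTag, because the tag is generic and its type would otherwise be lost to erasure. The alternative shown in the Flink documentation is to declare the tag as an anonymous subclass so the type is captured automatically, e.g. (same tag as above, different declaration style):

// The trailing {} creates an anonymous subclass, letting Flink capture the generic type,
// so TypeInformation does not need to be passed explicitly
private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<LogEntity>("appLog") {};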