Using ProcessFunction and SideOutput to Replace Split for Stream Splitting

  • January 13, 2020
  • Notes

Define a custom ProcessFunction:

        // 3.2 Add the task. Using {@link ProcessFunction} makes it easy to: 1. drop null data, 2. divert records via side output
        DetaiEventRuleExecutingProcessor executingProcessor = new DetaiEventRuleExecutingProcessor();

Inside the function, the data is split with a simple threshold check: records whose randomNum is below 5 are sent to the side output, while the rest are converted to DetailData and emitted on the main stream:

public class DetaiEventRuleExecutingProcessor extends ProcessFunction<StartupInfoData, DetailData> {

    // Public so that the code calling getSideOutput can reference the same tag
    public static final OutputTag<StartupInfoData> APP_LOG_TAG =
            new OutputTag<>("RandomLessFive", TypeInformation.of(StartupInfoData.class));

    private void sideOut(OutputTag<StartupInfoData> tag, StartupInfoData inputKV, Context ctx) {
        ctx.output(tag, inputKV);
    }

    @Override
    public void processElement(StartupInfoData startupInfoData, Context context, Collector<DetailData> collector) throws Exception {
        if (startupInfoData.getRandomNum() < 5) {
            // Values below 5 go to the side output
            sideOut(APP_LOG_TAG, startupInfoData, context);
        } else if (startupInfoData.getRandomNum() >= 5) {
            // Values of 5 and above are converted and emitted on the main stream
            DetailData detailData = new DetailData();
            detailData.setAppId(startupInfoData.getAppId());
            detailData.setAppName(startupInfoData.getAppName());
            detailData.setMsgtime(System.currentTimeMillis());
            detailData.setRandomNum(startupInfoData.getRandomNum());
            collector.collect(detailData);
        }
    }
}
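The StartupInfoData and DetailData POJOs are not shown in the post; the following is only a minimal sketch, with field names and types inferred from the getters and setters used above, so the actual classes may differ:

// Minimal POJO sketches inferred from the processor above; field types are assumptions.
// (Each class lives in its own file.)
public class StartupInfoData {
    private String appId;
    private String appName;
    private int randomNum;

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public int getRandomNum() { return randomNum; }
    public void setRandomNum(int randomNum) { this.randomNum = randomNum; }
}

public class DetailData {
    private String appId;
    private String appName;
    private long msgtime;
    private int randomNum;

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public long getMsgtime() { return msgtime; }
    public void setMsgtime(long msgtime) { this.msgtime = msgtime; }
    public int getRandomNum() { return randomNum; }
    public void setRandomNum(int randomNum) { this.randomNum = randomNum; }
}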

Write the main output and the side output to files: values of 4 and below go to u4, values of 5 and above go to b5. Because getSideOutput is called outside the processor class, APP_LOG_TAG is declared public static in the processor and referenced through the class name:

        // Custom ProcessFunction that also emits a side output
        SingleOutputStreamOperator<DetailData> executeMainStream = startupInfoData
                .process(executingProcessor)
                .name("processExecuteProcessor");

        // Main output: values of 5 and above
        executeMainStream.writeAsText("D:\\all\\b5.txt").setParallelism(1);

        logger.info("Discarded-records separator..........................");

        // Retrieve the diverted original records
        DataStream<StartupInfoData> leftLogStream =
                executeMainStream.getSideOutput(DetaiEventRuleExecutingProcessor.APP_LOG_TAG);
        leftLogStream.writeAsText("D:\\all\\u4.txt").setParallelism(1);

Main method configuration and reading the data from Kafka:

        logger.info("------------------------------------------------------");
        logger.info("------------- FlinkKafkaSource Start -----------------");
        logger.info("------------------------------------------------------");

        String ZOOKEEPER_HOST = kafkaConf.getZkhosts();
        String KAFKA_BROKER = kafkaConf.getServers();
        String TRANSACTION_GROUP = kafkaConf.getGroup();
        String topic = kafkaConf.getTopic();

        Properties prop = new Properties();
        prop.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
        prop.setProperty("bootstrap.servers", KAFKA_BROKER);
        prop.setProperty("group.id", TRANSACTION_GROUP);

        // todo Flink streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // todo Checkpoint configuration
        // Start a checkpoint every 1000 ms (checkpoint interval)
        env.enableCheckpointing(1000);
        // Advanced options:
        // Set the mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Make sure at least 500 ms pass between checkpoints (minimum pause between checkpoints)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoints must complete within one minute or they are discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Only one checkpoint may be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data after the job is cancelled, so it can later be restored from a chosen checkpoint
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data after the job is cancelled
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data on cancellation; it is kept only when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        // Create the consumer
        logger.info("%%%%%%% consuming topic: {}", topic);
        FlinkKafkaConsumer011<String> consumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

        // todo Default consumption strategy: continue from the consumer group's last committed offsets
        consumer011.setStartFromGroupOffsets();

        DataStreamSource<String> text = env.addSource(consumer011);

        // todo map: each input record produces one StartupInfoData record
        DataStream<StartupInfoData> startupInfoData = text.map(new MapFunction<String, StartupInfoData>() {
            @Override
            public StartupInfoData map(String input) throws Exception {
                return JSON.parseObject(input, StartupInfoData.class);
            }
        });
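One step the excerpt leaves out is submitting the job: the process/sink wiring shown earlier attaches to this startupInfoData stream, and none of it runs until the environment is executed. A minimal closing step, with a placeholder job name:

        // Submit the job; nothing above runs until execute() is called. The job name is a placeholder.
        env.execute("FlinkKafkaSourceSideOutputJob");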

Verify that the results are correct:

Source: the data produced to Kafka:
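To generate test input, here is a hedged sketch of a small Kafka producer that writes JSON-encoded StartupInfoData records, reusing fastjson (already used by the map function) and the POJO sketched earlier; the broker address, topic name, and field values are placeholders:

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class StartupInfoDataTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Random random = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 20; i++) {
                StartupInfoData data = new StartupInfoData();
                data.setAppId("app-" + i);              // placeholder values
                data.setAppName("demoApp");
                data.setRandomNum(random.nextInt(10));  // 0..9, so both branches receive data
                producer.send(new ProducerRecord<>("startupInfo", JSON.toJSONString(data)));  // placeholder topic
            }
        }
    }
}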

Output files:

——————————————————————————————————————————————————-

Example two:

private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));

    private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
        return rawLogStream
                .process(new ProcessFunction<LogEntity, LogEntity>() {
                    @Override
                    public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
                        // Tag each record according to its log level
                        if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
                            ctx.output(ANALYZE_METRIC_TAG, entity);
                        } else {
                            ctx.output(APP_LOG_TAG, entity);
                        }
                    }
                })
                .name("RawLogEntitySplitStream");
    }

    // Call the function to tag the records in the raw stream
    SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);
    // Use the tags to obtain the separate streams for further analysis
    DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
    DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
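LogEntity and ANALYZE_LOG_LEVEL are not defined in this excerpt; the following is a minimal sketch of what they could look like, where the level value and the extra field are assumptions:

// Assumed constant; the real level string used for analytics logs is not shown in the excerpt.
private static final String ANALYZE_LOG_LEVEL = "METRIC";

// Minimal LogEntity sketch; only getLevel() is required by the ProcessFunction above.
public class LogEntity {
    private String level;
    private String message;  // assumed payload field

    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}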