從 demo 到生產 – 手把手寫出實戰需求的 Flink 廣播程序
Flink 廣播變量在實時處理程序中扮演着很重要的角色,適當的使用廣播變量會大大提升程序處理效率。
本文從簡單的 demo 場景出發,引入生產中實際的需求並提出思路與部分示例代碼,應對一般需求應該沒有什麼問題,話不多說,趕緊來看看這篇乾貨滿滿的廣播程序使用實戰吧。
1 啥是廣播
Flink 支持廣播變量,允許在每台機器上保留一個只讀的緩存變量,數據存在內存中,在不同的 task 所在的節點上的都能獲取到,可以減少大量的 shuffle 操作。
換句話說,廣播變量可以理解為一個公共的共享變量,可以把一個 dataset 的數據集廣播出去,然後不同的 task 在節點上都能夠獲取到,這個數據在每個節點上只會存在一份。
如果不使用 broadcast,則在每個節點中的每個 task 中都需要拷貝一份 dataset 數據集,比較浪費內存 (也就是一個節點中可能會存在多份 dataset 數據)
2 用法總結
//1 初始化數據 DataSet<Integer> toBroadcast = env.fromElements(1,2,3) //2 廣播數據 api withBroadcastSet(toBroadcast,"broadcastSetName") //3 獲取數據 Collection<integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意:
-
廣播變量由於要常駐內存,程序結束時才會失效,所以數據量不宜過大
-
廣播變量廣播在初始化後不支持修改 (修改場景也有辦法)
3 基礎案例演示
-
基礎案例廣播變量使用
這種場景下廣播變量就是加載參數表,參數表不會變化,記住第二部分常用總結公式即可。
/** * @author 大數據江湖 * @version 1.0 * @date 2021/5/17. * */ public class BaseBroadCast { /** * broadcast廣播變量 * 需求: * flink會從數據源中獲取到用戶的姓名 * 最終需要把用戶的姓名和年齡信息打印出來 * 分析: * 所以就需要在中間的map處理的時候獲取用戶的年齡信息 * 建議吧用戶的關係數據集使用廣播變量進行處理 * */ public static void main(String[] args) throws Exception { //獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1:準備需要廣播的數據 ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>(); broadData.add(new Tuple2<>("zs", 18)); broadData.add(new Tuple2<>("ls", 20)); broadData.add(new Tuple2<>("ww", 17)); DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData); //1.1:處理需要廣播的數據,把數據集轉換成map類型,map中的key就是用戶姓名,value就是用戶年齡 DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() { @Override public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception { HashMap<String, Integer> res = new HashMap<>(); res.put(value.f0, value.f1); return res; } }); //源數據 DataSource<String> data = env.fromElements("zs", "ls", "ww"); //注意:在這裡需要使用到RichMapFunction獲取廣播變量 DataSet<String> result = data.map(new RichMapFunction<String, String>() { List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>(); HashMap<String, Integer> allMap = new HashMap<String, Integer>(); /** * 這個方法只會執行一次 * 可以在這裡實現一些初始化的功能 * 所以,就可以在open方法中獲取廣播變量數據 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //3:獲取廣播數據 this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName"); for (HashMap map : broadCastMap) { allMap.putAll(map); } } @Override public String map(String value) throws Exception { Integer age = allMap.get(value); return value + "," + age; } }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:執行廣播數據的操作 result.print(); } }
4 生產案例演示
實際生產中有時候是需要更新廣播變量的,但不是實時更新的,一般會設置一個更新周期,幾分鐘,幾小時的都很常見,根據業務而定。
由於廣播變量需要更新,解決辦法一般是需要將廣播變量做成另一個 source,進行流與流之間的 connect 操作,定時刷新廣播的source,從而達到廣播變量修改的目的。
4.1.1 使用 redis 中的數據作為廣播變量的思路:
消費 kafka 中的數據,使用 redis 中的數據作為廣播數據,進行數據清洗後 寫到 kafka中。
示例代碼分為三個部分:kafka 生產者,redis 廣播數據源,執行入口類
-
構建 kafka 生成者,模擬數據 (以下代碼的消費消息來源均是此處生產)
/** * 模擬數據源 */ public class kafkaProducer { public static void main(String[] args) throws Exception{ Properties prop = new Properties(); //指定kafka broker地址 prop.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); //指定key value的序列化方式 prop.put("key.serializer", StringSerializer.class.getName()); prop.put("value.serializer", StringSerializer.class.getName()); //指定topic名稱 String topic = "data_flink_bigdata_test"; //創建producer鏈接 KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop); //{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} while(true){ String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}"; System.out.println(message); //同步的方式,往Kafka裏面生產數據 producer.send(new ProducerRecord<String, String>(topic,message)); Thread.sleep(2000); } //關閉鏈接 //producer.close(); } public static String getCurrentTime(){ SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); return sdf.format(new Date()); } public static String getCountryCode(){ String[] types = {"US","TW","HK","PK","KW","SA","IN"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static String getRandomType(){ String[] types = {"s1","s2","s3","s4","s5"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static double getRandomScore(){ double[] types = {0.3,0.2,0.1,0.5,0.8}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static String getRandomLevel(){ String[] types = {"A","A+","B","C","D"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } }
-
redis 數據作為廣播數據
/** * redis中準備的數據源 * source: * * hset areas AREA_US US * hset areas AREA_CT TW,HK * hset areas AREA_AR PK,KW,SA * hset areas AREA_IN IN * * result: * * HashMap * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * */ public class BigDataRedisSource implements SourceFunction<HashMap<String,String>> { private Logger logger= LoggerFactory.getLogger(BigDataRedisSource.class); private Jedis jedis; private boolean isRunning=true; @Override public void run(SourceContext<HashMap<String, String>> cxt) throws Exception { this.jedis = new Jedis("localhost",6379); HashMap<String, String> map = new HashMap<>(); while(isRunning){ try{ map.clear(); Map<String, String> areas = jedis.hgetAll("areas"); /** * AREA_CT TT,AA * * map: * TT,AREA_CT * AA,AREA_CT */ for(Map.Entry<String,String> entry: areas.entrySet()){ String area = entry.getKey(); String value = entry.getValue(); String[] fields = value.split(","); for(String country:fields){ map.put(country,area); } } if(map.size() > 0 ){ cxt.collect(map); } Thread.sleep(60000); }catch (JedisConnectionException e){ logger.error("redis連接異常",e.getCause()); this.jedis = new Jedis("localhost",6379); }catch (Exception e){ logger.error("數據源異常",e.getCause()); } } } @Override public void cancel() { isRunning=false; if(jedis != null){ jedis.close(); } } }
-
程序入口類
/** * @author 大數據江湖 * @version 1.0 * @date 2021/4/25. * * * 使用 kafka 輸出流和 redis 輸出流 進行合併清洗 * * */ public class 廣播方式1分兩個流進行connnect操作 { public static void main(String[] args) throws Exception { //1 獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);//並行度取決於 kafka 中的分區數 保持與kafka 一致 //2 設置 checkpoint //開啟checkpoint 一分鐘一次 env.enableCheckpointing(60000); //設置checkpoint 僅一次語義 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //兩次checkpoint的時間間隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //最多只支持1個checkpoint同時執行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //checkpoint超時的時間 env.getCheckpointConfig().setCheckpointTimeout(60000); // 任務失敗後也保留 checkPonit數據 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啟的次數 Time.of(10, TimeUnit.SECONDS) // 間隔 )); // 設置 checkpoint 路徑 // env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint")); //3 設置 kafka Flink 消費 //創建 Kafka 消費信息 String topic="data_flink_bigdata_test"; Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); consumerProperties.put("group.id","data_test_new_1"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("auto.offset.reset","earliest"); //4 獲取 kafka 與 redis 數據源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource<String> kafkaSourceData = env.addSource(consumer); //直接使用廣播的方式 後續作為兩個數據流來操作 DataStream<HashMap<String, String>> redisSourceData = env.addSource(new NxRedisSource()).broadcast(); //5 兩個數據源進行 ETL 處理 使用 connect 連接處理 SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).flatMap(new MyETLProcessFunction()); //6 新創建一個 kafka 生產者 進行發送 String outputTopic="allDataClean"; // 輸出給下游 kafka Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), producerProperties); etlData.addSink(producer); //7 提交任務執行 env.execute("DataClean"); } /** * in 1 kafka source : * * {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} * * * in 2 redis source * * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * * * * out 合併後的source */ private static class MyETLProcessFunction implements CoFlatMapFunction<String,HashMap<String,String>,String> { //用來存儲 redis 中的數據 HashMap<String,String> allMap = new HashMap<String,String>(); @Override public void flatMap1(String line, Collector<String> collector) throws Exception { //將 kafka 數據 按 redis 數據進行替換 // s -> kafka 數據 //allMap -> redis 數據 JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根據countryCode獲取大區的名字 String area = allMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大區:"+area); dataObject.put("dt", dt); dataObject.put("area", area); //下游獲取到數據的時候,也就是一個json格式的數據 collector.collect(dataObject.toJSONString()); } } @Override public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector) throws Exception { //將 redis 中 數據進行賦值 allMap = stringStringHashMap; } } }
4.1.2 使用 MapState 進行廣播程序優化:
優化的點在於 (下面代碼中 TODO 標識點):
-
進行數據廣播時需要使用 MapStateDescriptor 進行註冊
-
進行兩個流合併處理時 使用 process 函數
-
處理函數中使用 MapState 來存取 redis 中的數據
/** * @author 大數據江湖 * @version 1.0 * @date 2021/4/25. * <p> * 使用 kafka 輸出流和 redis 輸出流 進行合併清洗 * <p> * 線上使用的方式 */ public class 廣播方式2使用MapState對方式1改造 { public static void main(String[] args) throws Exception { //1 獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);//並行度取決於 kafka 中的分區數 保持與kafka 一致 //2 設置 checkpoint //開啟checkpoint 一分鐘一次 env.enableCheckpointing(60000); //設置checkpoint 僅一次語義 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //兩次checkpoint的時間間隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //最多只支持1個checkpoint同時執行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //checkpoint超時的時間 env.getCheckpointConfig().setCheckpointTimeout(60000); // 任務失敗後也保留 checkPonit數據 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啟的次數 Time.of(10, TimeUnit.SECONDS) // 間隔 )); // 設置 checkpoint 路徑 //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint")); //3 設置 kafka Flink 消費 //創建 Kafka 消費信息 String topic = "data_flink_bigdata_test"; Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); consumerProperties.put("group.id", "data_flink_fpy_test_consumer"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("auto.offset.reset", "earliest"); //4 獲取 kafka 與 redis 數據源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource<String> kafkaSourceData = env.addSource(consumer); // 獲取 redis 數據源並且進行廣播 線上的廣播也是 source + 廣播方法 MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String>( "RedisBdStream", String.class, String.class ); //5 兩個數據源進行 ETL 處理 使用 connect 連接處理 TODO process 替換 FlatMap //TODO 使用 MapState 來進行廣播 BroadcastStream<HashMap<String, String>> redisSourceData = env.addSource(new NxRedisSource()).broadcast(descriptor); SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).process(new MyETLProcessFunction()); //6 新創建一個 kafka 生產者 進行發送 String outputTopic = "allDataClean"; // 輸出給下游 kafka Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), producerProperties); etlData.addSink(producer); etlData.print(); //7 提交任務執行 env.execute("DataClean"); } /** * in 1 kafka source * in 2 redis source * <p> * out 合併後的source */ private static class MyETLProcessFunction extends BroadcastProcessFunction<String, HashMap<String, String>, String> { // TODO 注意此處 descriptor 的名稱需要與 廣播時 (99行代碼) 名稱一致 MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String>( "RedisBdStream", String.class, String.class ); //邏輯的處理方法 kafka 的數據 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception { //將 kafka 數據 按 redis 數據進行替換 // s -> kafka 數據 //allMap -> redis 數據 System.out.println("into processElement "); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根據countryCode獲取大區的名字 // String area = allDataMap.get(countryCode); //TODO 從MapState中獲取對應的Code String area = readOnlyContext.getBroadcastState(descriptor).get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大區:" + area); dataObject.put("dt", dt); dataObject.put("area", area); //下游獲取到數據的時候,也就是一個json格式的數據 collector.collect(dataObject.toJSONString()); } } //廣播流的處理方法 @Override public void processBroadcastElement(HashMap<String, String> stringStringHashMap, Context context, Collector<String> collector) throws Exception { // 將接收到的控制數據放到 broadcast state 中 //key , flink // 將 RedisMap中的值放入 MapState 中 for (Map.Entry<String, String> entry : stringStringHashMap.entrySet()) { //TODO 使用 MapState 存儲 redis 數據 context.getBroadcastState(descriptor).put(entry.getKey(), entry.getValue()); System.out.println(entry); } } } }
4.2 關係型數據庫廣播變量案例思路:
需求:
在 flink 流式處理中常常需要加載數據庫中的數據作為條件進行數據處理,有些表作為系統表,實時查詢效率很低,這時候就需要將這些數據作為廣播數據,而同時這些數據可能也需要定期的更新。
思路:
數據庫表的廣播變量思路同redis等緩存廣播數據的思路類似,也是使用 兩個source 進行 connect 處理 , 在數據庫表的 source 中定時刷新數據就可以了。
不同點在於這裡把數據庫查詢的操作轉成另一個工具類,在初始化時使用了靜態代碼塊,在廣播時使用了流的 connect 操作。
示例代碼分為三個部分:數據庫表廣播源,數據庫操作類,執行入口類
-
數據庫表廣播源
/** * @author 大數據江湖 * @Date:2021-5-17 * DB source 源頭 進行廣播 */ public class BigDataDBBroadSource extends RichSourceFunction<Map<String,Object>> { private final Logger logger = LoggerFactory.getLogger(BigDataDBBroadSource.class); private volatile boolean isRunning = true; public BigDataDBBroadSource() { } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public void run(SourceContext<Map<String,Object>> sourceContext) throws Exception { while (isRunning) { //TODO 使用的是一個 DB 源頭的 source 60 s 刷新一次 進行往下游發送 TimeUnit.SECONDS.sleep(60); Map<String,Object> map = new HashMap<String,Object>(); //規則匹配關鍵詞 final DbBroadCastListInitUtil.Build ruleListInitUtil = new DbBroadCastListInitUtil.Build(); ruleListInitUtil.reloadRule(); map.put("dbsource", ruleListInitUtil); if(map.size() > 0) { sourceContext.collect(map); } } } @Override public void cancel() { this.isRunning = false; } @Override public void close() throws Exception { super.close(); } }
-
執行數據庫操作類
/** * 數據庫規則表初始化 * * @author 大數據江湖 * @Date:2021-5-17 * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * */ public class DbBroadCastListInitUtil implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DbBroadCastListInitUtil.class); // 數據庫規則信息 public static Map<String, String> areasMap = new HashMap<String, String>(); static { LOG.info("初始化 db 模塊"); Connection dbConn = null; try { if (dbConn == null || dbConn.isClosed()) { LOG.info("init dbConn start...."); LOG.info("init dbConn end...."); } HashMap<String, String> map = Maps.newHashMap(); map.put("US","AREA_US"); map.put("TW","AREA_CT"); map.put("HK","AREA_CT"); areasMap = map; } catch (Exception e) { LOG.error("init database [status:error]", e); throw new RuntimeException(" static article rule list db select error! , "+e.getMessage()) ; } finally { if(dbConn != null) { try { dbConn.close(); } catch (SQLException e) { LOG.error("dbConn conn close error!",e); } } } } public static class Build { // 數據庫規則信息 public static Map<String, String> newAreasMap = new HashMap<String, String>(); public void reloadRule() throws Exception { LOG.info("重新初始化 DB reloadRule 模塊"); Connection dbConn = null; try { if (dbConn == null || dbConn.isClosed()) { LOG.info("init dbConn start...."); LOG.info("init dbConn end...."); } HashMap<String, String> map = Maps.newHashMap(); map.put("US","AREA_US"); map.put("TW","AREA_CT"); map.put("HK","AREA_CT"); map.put("AM","AREA_CT"); newAreasMap = map; } catch (Exception e) { LOG.error("init database [status:error]", e); throw e; } finally { if(dbConn != null) { try { dbConn.close(); } catch (SQLException e) { LOG.error("dbConn conn close error!",e); } } } } public static Map<String, String> getNewAreasMap() { return newAreasMap; } } public static Build build() throws Exception { final DbBroadCastListInitUtil.Build build = new DbBroadCastListInitUtil.Build(); build.reloadRule(); return build; } }
-
程序入口類
/** * @author 大數據江湖 * @version 1.0 * @date 2021/4/25. * <p> * 使用 kafka 輸出流和 redis 輸出流 進行合併清洗 * <p> * 線上使用的方式 */ public class 廣播方式3使用DB對方式廣播 { public static void main(String[] args) throws Exception { //1 獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);//並行度取決於 kafka 中的分區數 保持與kafka 一致 //2 設置 checkpoint //開啟checkpoint 一分鐘一次 env.enableCheckpointing(60000); //設置checkpoint 僅一次語義 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //兩次checkpoint的時間間隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //最多只支持1個checkpoint同時執行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //checkpoint超時的時間 env.getCheckpointConfig().setCheckpointTimeout(60000); // 任務失敗後也保留 checkPonit數據 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啟的次數 Time.of(10, TimeUnit.SECONDS) // 間隔 )); // 設置 checkpoint 路徑 //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint")); //3 設置 kafka Flink 消費 //創建 Kafka 消費信息 String topic = "data_flink_bigdata_test"; Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); consumerProperties.put("group.id", "data_flink_bigdata_test_consumer"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("auto.offset.reset", "earliest"); //4 獲取 kafka 與 redis 數據源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource<String> kafkaSourceData = env.addSource(consumer); // 獲取 redis 數據源並且進行廣播 線上的廣播也是 source + 廣播方法 MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String>( "RedisBdStream", String.class, String.class ); //使用 數據庫源 來進行廣播 BroadcastStream<Map<String, Object>> broadcast = env.addSource(new BigDataDBBroadSource()).broadcast(descriptor); //5 兩個數據源進行 ETL 處理 使用 connect 連接處理 數據庫表信息進行廣播 SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(broadcast).process(new MyETLProcessFunction()); //6 新創建一個 kafka 生產者 進行發送 String outputTopic = "allDataClean"; // 輸出給下游 kafka /* Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), producerProperties); etlData.addSink(producer);*/ etlData.print(); //7 提交任務執行 env.execute("DataClean"); } /** * in 1 kafka source * in 2 redis source * <p> * out 合併後的source * * * TODO 程序啟動後發生的事: * * 1 運行 open 方法 ,觸發靜態方法給 areasMap 賦值 * 2 運行 processElement 方法前, areasMap 肯定是值的,正常進行處理 * 3 當到 BigDataDBBroadSource 輪訓的時間後 ,刷新數據庫表數據到 areasMap ,此時 areasMap 會加入新值,完成廣播變量的更新 * 4 廣播變量更新後 繼續進行 processElement 數據處理 * */ private static class MyETLProcessFunction extends BroadcastProcessFunction<String, Map<String, Object>, String> { public Map<String, String> areasMap = new HashMap<String, String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //觸發靜態方法去賦值 areasMap = DbBroadCastListInitUtil.areasMap; } //邏輯的處理方法 kafka 的數據 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception { //將 kafka 數據 按 redis 數據進行替換 // s -> kafka 數據 //allMap -> redis 數據 System.out.println("into processElement "); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根據countryCode獲取大區的名字 // String area = allDataMap.get(countryCode); //從MapState中獲取對應的Code String area =areasMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大區:" + area); dataObject.put("dt", dt); dataObject.put("area", area); //下游獲取到數據的時候,也就是一個json格式的數據 collector.collect(dataObject.toJSONString()); } } @Override public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<String> out) throws Exception { //廣播算子定時刷新後 將數據發送到下游 if (value != null && value.size() > 0) { Object obj = value.getOrDefault("dbsource", null); if (obj != null) { DbBroadCastListInitUtil.Build biulder = (DbBroadCastListInitUtil.Build) obj; //更新了 數據庫數據 areasMap = biulder.getNewAreasMap(); System.out.println("數據庫刷新算子運行完成!"); } } } } }
注意看最後處理函數啟動後發生的事:
-
運行 open 方法 ,觸發數據庫操作工具類靜態方法給 areasMap 賦值
-
運行執行類 processElement 方法前,此時 areasMap 肯定是值的,正常進行處理
-
當到數據庫源輪訓的時間後 ,刷新數據庫表數據到 areasMap ,此時 areasMap 會加入新值,完成廣播變量的更新
-
廣播變量更新後 繼續進行執行類 processElement 數據處理
至此 廣播程序的使用介紹完了, 對於廣播數據不需要改變的情況 參考基礎樣例;對於從緩存或數據庫等獲取廣播變量,同時又需要改變的情況,參考生成樣例即可。
PS: 文中代碼地址 —- //gitee.com/fanpengyi0922/flink-window-broadcast
— THE END —