How to Get the Topic of Each Message When Consuming Kafka with Flink
- March 26, 2020
- Notes
1. First, define a custom KafkaDeserializationSchema
```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    // isEndOfStream signals whether nextElement is the last element of the stream;
    // return false so that messages keep being consumed indefinitely.
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    // Deserialize the Kafka record, returning a Tuple2<kafkaTopicName, kafkaMsgValue>.
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    // Tell Flink the type of the produced data, so its type inference works.
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```
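Carrying the topic name in the tuple pays off mainly when a single consumer subscribes to several topics at once. As a minimal sketch (the topic names `orders` and `payments` and the pattern `events-.*` are made-up placeholders), the same schema can be passed to the list-based and pattern-based `FlinkKafkaConsumer` constructors:

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MultiTopicExample {

    // Subscribe to a fixed list of topics; the schema above tags every record
    // with the topic it came from, so downstream code can tell them apart.
    public static FlinkKafkaConsumer<Tuple2<String, String>> buildListConsumer(Properties props) {
        return new FlinkKafkaConsumer<>(
                Arrays.asList("orders", "payments"),
                new CustomKafkaDeserializationSchema(), props);
    }

    // Alternatively, subscribe to every topic whose name matches a regex.
    public static FlinkKafkaConsumer<Tuple2<String, String>> buildPatternConsumer(Properties props) {
        return new FlinkKafkaConsumer<>(
                Pattern.compile("events-.*"),
                new CustomKafkaDeserializationSchema(), props);
    }
}
```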
2. Consume using the custom KafkaDeserializationSchema
```java
import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");

    FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer =
            new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
    kafkaConsumer.setStartFromEarliest();

    env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
        @Override
        public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
            // f0 of the tuple is the name of the topic the record came from.
            System.out.println("topic==== " + value.f0);
        }
    });

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}
```
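Since each record now carries its source topic in `f0`, downstream operators can route on it. A minimal sketch, reusing `env` and `kafkaConsumer` from the main method above (the topic names are placeholders, matching the multi-topic example earlier):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Split one multi-topic stream into per-topic streams by filtering on f0.
// filter() does not change the element type, so Flink's type information
// from the custom schema is preserved without extra hints.
DataStream<Tuple2<String, String>> all = env.addSource(kafkaConsumer);
DataStream<Tuple2<String, String>> orders =
        all.filter(t -> "orders".equals(t.f0));
DataStream<Tuple2<String, String>> payments =
        all.filter(t -> "payments".equals(t.f0));
```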