Flink 消费 Kafka 时如何获取每条消息对应的 topic

  • 2020 年 3 月 26 日
  • 筆記

1. 首先自定义一个 KafkaDeserializationSchema，在反序列化时把消息所属的 topic 和消息内容一起取出

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {  	@Override  	//nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费  	public boolean isEndOfStream(Tuple2<String, String> nextElement) {  		return false;  	}    	@Override  	// 反序列化 kafka 的 record,我们直接返回一个 tuple2<kafkaTopicName,kafkaMsgValue>  	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {  		return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));  	}    	@Override  	//告诉 Flink 我输入的数据类型, 方便 Flink 的类型推断  	public TypeInformation<Tuple2<String, String>> getProducedType() {  		return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);  	}  }

2. 在 FlinkKafkaConsumer 中使用自定义的 KafkaDeserializationSchema 进行消费

public static void main(String[] args) throws Exception {  		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    		Properties properties = new Properties();  		properties.setProperty("bootstrap.servers", "localhost:9092");  		properties.setProperty("group.id", "test");    		FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);  		kafkaConsumer.setStartFromEarliest();  		env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {  			@Override  			public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {  				System.out.println("topic==== " + value.f0);  			}  		});    		// execute program  		env.execute("Flink Streaming Java API Skeleton");  	}