spring boot整合kafka

  • 2019 年 10 月 6 日
  • 筆記

最近項目需求用到了kafka訊息中間件,在此做一次簡單的記錄,方便以後其它項目用到。

引入依賴

<dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>  </dependency>

配置文件

kafka.consumer.servers=127.0.0.1:9092  kafka.consumer.enable.auto.commit=true  kafka.consumer.session.timeout=6000  kafka.consumer.auto.commit.interval=100  kafka.consumer.auto.offset.reset=latest  kafka.consumer.group.id=kafka-test-group  kafka.consumer.concurrency=10    kafka.producer.servers=127.0.0.1:9092  kafka.producer.retries=1  kafka.producer.batch.size=4096  kafka.producer.linger=1  kafka.producer.buffer.memory=40960

生產者配置類

@Configuration  @EnableKafka  public class KafkaProducerConfig {      @Value("${kafka.producer.servers}")      private String servers;      @Value("${kafka.producer.retries}")      private int retries;      @Value("${kafka.producer.batch.size}")      private int batchSize;      @Value("${kafka.producer.linger}")      private int linger;      @Value("${kafka.producer.buffer.memory}")      private int bufferMemory;      @Bean      public KafkaTemplate<String, String> kafkaTemplate() {          return new KafkaTemplate(producerFactory());      }      public ProducerFactory<String, String> producerFactory() {          return new DefaultKafkaProducerFactory<>(producerConfigs());      }      public Map<String, Object> producerConfigs() {          Map<String, Object> props = new HashMap<>();          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);          props.put(ProducerConfig.RETRIES_CONFIG, retries);          props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);          props.put(ProducerConfig.LINGER_MS_CONFIG, linger);          props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          return props;      }  }

消費者配置類

@Configuration  @EnableKafka  public class KafkaConsumerConfig {      @Value("${kafka.consumer.servers}")      private String servers;      @Value("${kafka.consumer.enable.auto.commit}")      private boolean enableAutoCommit;      @Value("${kafka.consumer.session.timeout}")      private String sessionTimeout;      @Value("${kafka.consumer.auto.commit.interval}")      private String autoCommitInterval;      @Value("${kafka.consumer.group.id}")      private String groupId;      @Value("${kafka.consumer.auto.offset.reset}")      private String autoOffsetReset;      @Value("${kafka.consumer.concurrency}")      private int concurrency;      @Bean      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {          ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();          factory.setConsumerFactory(consumerFactory());          factory.setConcurrency(concurrency);          factory.getContainerProperties().setPollTimeout(1500);          return factory;      }      public ConsumerFactory<String, String> consumerFactory() {          return new DefaultKafkaConsumerFactory<>(consumerConfigs());      }      public Map<String, Object> consumerConfigs() {          Map<String, Object> propsMap = new HashMap<>(8);          propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);          propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);          propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);          propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);          propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);          propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
autoOffsetReset);          return propsMap;      }  }

生產者類

@Component  public class KafkaProducer {      private Logger logger = LoggerFactory.getLogger(getClass());      @Autowired      private KafkaTemplate kafkaTemplate;        public void sendMessage(String topic, String message) {          logger.info("on message:{}", message);          kafkaTemplate.send(topic, message);      }  }

消費者類

@Component  public class VideoCosConsumer {      protected final Logger logger = LoggerFactory.getLogger(this.getClass());        @KafkaListener(topics = {"test-topic"})      public void consumerMessage(String message) {          logger.info("on message:{}", message);      }  }

  以上就是spring boot整合kafka的過程,現在spring讓我們程式碼搬運工越來越沒有活幹了,連複製粘貼都不行了,只能簡單的拼裝需要的實體類。