­

Docker實戰之Kafka集群

1. 概述

Apache Kafka 是一個快速、可擴展的、高吞吐、可容錯的分散式發布訂閱消息系統。其具有高吞吐量、內置分區、支援數據副本和容錯的特性,適合在大規模消息處理場景中使用。

筆者之前在物聯網公司工作,其中 Kafka 作為物聯網 MQ 選型的事實標準,這裡優先給大家搭建 Kafka 集群環境。由於 Kafka 的安裝需要依賴 Zookeeper,對 Zookeeper 還不了解的小夥伴可以在 這裡 先認識下 Zookeeper。

Kafka 能解決什麼問題呢?先說一下消息隊列常見的使用場景吧,其實場景有很多,但是比較核心的有 3 個:解耦、非同步、削峰。

2. Kafka 基本概念

Kafka 部分名詞解釋如下:

  • Broker:消息中間件處理結點,一個 Kafka 節點就是一個 broker,多個 broker 可以組成一個 Kafka 集群。
  • Topic:一類消息,例如 page view 日誌、click 日誌等都可以以 topic 的形式存在,Kafka 集群能夠同時負責多個 topic 的分發。
  • Partition:topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列。
  • Segment:partition 物理上由多個 segment 組成,下面有詳細說明。
  • offset:每個 partition 都由一系列有序的、不可變的消息組成,這些消息被連續的追加到 partition 中。partition 中的每個消息都有一個連續的序列號叫做 offset,用於 partition 唯一標識一條消息.每個 partition 中的消息都由 offset=0 開始記錄消息。

3. Docker 環境搭建

配合上一節的 Zookeeper 環境,計劃搭建一個 3 節點的集群。宿主機 IP 為 192.168.124.5

docker-compose-kafka-cluster.yml

version: '3.7'    networks:    docker_net:      external: true    services:      kafka1:      image: wurstmeister/kafka      restart: unless-stopped      container_name: kafka1      ports:        - "9093:9092"      external_links:        - zoo1        - zoo2        - zoo3      environment:        KAFKA_BROKER_ID: 1        KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                   ## 修改:宿主機IP        KAFKA_ADVERTISED_PORT: 9093                                 ## 修改:宿主機映射port        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093    ## 綁定發布訂閱的埠。修改:宿主機IP        KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"      volumes:        - "./kafka/kafka1/docker.sock:/var/run/docker.sock"        - "./kafka/kafka1/data/:/kafka"      networks:        - docker_net        kafka2:      image: wurstmeister/kafka      restart: unless-stopped      container_name: kafka2      ports:        - "9094:9092"      external_links:        - zoo1        - zoo2        - zoo3      environment:        KAFKA_BROKER_ID: 2        KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                 ## 修改:宿主機IP        KAFKA_ADVERTISED_PORT: 9094                               ## 修改:宿主機映射port        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094   ## 修改:宿主機IP        KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"      volumes:        - "./kafka/kafka2/docker.sock:/var/run/docker.sock"        - "./kafka/kafka2/data/:/kafka"      networks:        - docker_net      kafka3:      image: wurstmeister/kafka      restart: unless-stopped      container_name: kafka3      ports:        - "9095:9092"      external_links:        - zoo1        - zoo2        - zoo3      environment:        KAFKA_BROKER_ID: 3        KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                 ## 修改:宿主機IP        KAFKA_ADVERTISED_PORT: 9095                              ## 修改:宿主機映射port        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095   ## 修改:宿主機IP        KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"      volumes:        - "./kafka/kafka3/docker.sock:/var/run/docker.sock"        - "./kafka/kafka3/data/:/kafka"      networks:        - docker_net      kafka-manager:      image: sheepkiller/kafka-manager:latest      restart: unless-stopped      container_name: kafka-manager      hostname: kafka-manager      ports:        - "9000:9000"      links:            # 連接本compose文件創建的container        - kafka1        - kafka2        - kafka3      external_links:   # 連接本compose文件以外的container        - zoo1        - zoo2        - zoo3      environment:        ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181                 ## 修改:宿主機IP        TZ: CST-8      networks:        - docker_net

執行以下命令啟動

docker-compose -f docker-compose-kafka-cluster.yml up -d

可以看到 kafka 集群已經啟動成功。

4. Kafka 初認識

4.1 可視化管理

細心的小夥伴發現上邊的配置除了 kafka 外還有一個 kafka-manager 模組。它是 kafka 的可視化管理模組。因為 kafka 的元數據、配置資訊由 Zookeeper 管理,這裡我們在 UI 頁面做下相關配置。

1. 訪問 http:localhost:9000,按圖示添加相關配置

2. 配置後我們可以看到默認有一個 topic(__consumer_offsets),3 個 brokers。該 topic 分 50 個 partition,用於記錄 kafka 的消費偏移量。

4.2 Zookeeper 在 kafka 環境中做了什麼

1. 首先觀察下根目錄

kafka 基於 zookeeper,kafka 啟動會將元數據保存在 zookeeper 中。查看 zookeeper 節點目錄,會發現多了很多和 kafka 相關的目錄。結果如下:

➜  docker zkCli -server 127.0.0.1:2183  Connecting to 127.0.0.1:2183  Welcome to ZooKeeper!  JLine support is enabled    WATCHER::    WatchedEvent state:SyncConnected type:None path:null  [zk: 127.0.0.1:2183(CONNECTED) 0] ls /  [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]

2. 查看我們映射的 kafka 目錄,新版本的 kafka 偏移量不再存儲在 zk 中,而是在 kafka 自己的環境中。

我們節選了部分目錄(包含 2 個 partition)

├── kafka1  │   ├── data  │   │   └── kafka-logs-c4e2e9edc235  │   │       ├── __consumer_offsets-1  │   │       │   ├── 00000000000000000000.index       // segment索引文件  │   │       │   ├── 00000000000000000000.log         // 數據文件  │   │       │   ├── 00000000000000000000.timeindex   // 消息時間戳索引文件  │   │       │   └── leader-epoch-checkpoint  ...  │   │       ├── __consumer_offsets-7  │   │       │   ├── 00000000000000000000.index  │   │       │   ├── 00000000000000000000.log  │   │       │   ├── 00000000000000000000.timeindex  │   │       │   └── leader-epoch-checkpoint  │   │       ├── cleaner-offset-checkpoint  │   │       ├── log-start-offset-checkpoint  │   │       ├── meta.properties  │   │       ├── recovery-point-offset-checkpoint  │   │       └── replication-offset-checkpoint  │   └── docker.sock

結果與 Kafka-Manage 顯示結果一致。圖示的文件是一個 Segment,00000000000000000000.log 表示 offset 從 0 開始,隨著數據不斷的增加,會有多個 Segment 文件。

5. 生產與消費

5.1 創建主題

➜  docker docker exec -it kafka1 /bin/bash   # 進入容器  bash-4.4# cd /opt/kafka/   # 進入安裝目錄  bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181   # 查看主題列表  __consumer_offsets  bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test    # 新建主題  Created topic test.

說明:
–replication-factor 副本數;
–partitions 分區數;
replication<=broker(一定);
有效消費者數<=partitions 分區數(一定);

新建主題後, 再次查看映射目錄, 由圖可見,partition 在 3 個 broker 上均勻分布。

5.2 生產消息

bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092  --topic test  >msg1  >msg2  >msg3  >msg4  >msg5  >msg6

5.3 消費消息

bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning  msg1  msg3  msg2  msg4  msg6  msg5

–from-beginning 代表從頭開始消費

5.4 消費詳情

查看消費者組

bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list  KafkaManagerOffsetCache  console-consumer-86137

消費組偏移量

bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache

查看 topic 詳情

bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test  Topic: test PartitionCount: 3   ReplicationFactor: 2    Configs:      Topic: test Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1      Topic: test Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2      Topic: test Partition: 2    Leader: 2   Replicas: 2,3   Isr: 2,3

查看.log 數據文件

bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log  --print-data-log  Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log  Starting offset: 0  baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true  | offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2  baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true  | offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4  baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true  | offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6

這裡需要看下自己的文件路徑是什麼,別直接 copy 我的哦

查看.index 索引文件

bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index  Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index  offset: 0 position: 0

查看.timeindex 索引文件

bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex  --verify-index-only  Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex  Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex    Index timestamp: 0, log timestamp: 1583317546421

6. SpringBoot 集成

筆者 SpringBoot 版本是 2.2.2.RELEASE

pom.xml 添加依賴

        <dependency>              <groupId>org.springframework.kafka</groupId>              <artifactId>spring-kafka</artifactId>              <version>2.4.0.RELEASE</version>          </dependency>

生產者配置

@Configuration  public class KafkaProducerConfig {        /**       * producer配置       * @return       */      public Map<String, Object> producerConfigs() {          Map<String, Object> props = new HashMap<>();          // 指定多個kafka集群多個地址 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");          // 重試次數,0為不啟用重試機制          props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);          // acks=0 把消息發送到kafka就認為發送成功          // acks=1 把消息發送到kafka leader分區,並且寫入磁碟就認為發送成功          // acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功          props.put(ProducerConfig.ACKS_CONFIG,"all");          // 生產者空間不足時,send()被阻塞的時間,默認60s          props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);          // 控制批處理大小,單位為位元組          props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);          // 批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,從而提高並發量          props.put(ProducerConfig.LINGER_MS_CONFIG, 1);          // 生產者可以使用的總記憶體位元組來緩衝等待發送到伺服器的記錄          props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);          // 消息的最大大小限制,也就是說send的消息大小不能超過這個限制, 默認1048576(1MB)          props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);          // 客戶端id          props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.topinfo");          // 鍵的序列化方式          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          // 值的序列化方式          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          // 壓縮消息,支援四種類型,分別為:none、lz4、gzip、snappy,默認為none。          // 消費者默認支援解壓,所以壓縮設置在生產者,消費者無需設置。          props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");          return props;      }        /**       * producer工廠配置       * @return       */      public ProducerFactory<String, String> producerFactory() {          return new DefaultKafkaProducerFactory<>(producerConfigs());      }        /**       * Producer Template 配置       */      @Bean(name="kafkaTemplate")      public KafkaTemplate<String, String> kafkaTemplate() {          return new KafkaTemplate<>(producerFactory());      }  }

消費者配置

@Configuration  public class KafkaConsumerConfig {          private static final String GROUP0_ID = "group0";      private static final String GROUP1_ID = "group1";        /**       * 1. setAckMode: 消費者手動提交ack       *       * RECORD: 每處理完一條記錄後提交。       * BATCH(默認): 每次poll一批數據後提交一次,頻率取決於每次poll的調用頻率。       * TIME: 每次間隔ackTime的時間提交。       * COUNT: 處理完poll的一批數據後並且距離上次提交處理的記錄數超過了設置的ackCount就提交。       * COUNT_TIME: TIME和COUNT中任意一條滿足即提交。       * MANUAL: 手動調用Acknowledgment.acknowledge()後,並且處理完poll的這批數據後提交。       * MANUAL_IMMEDIATE: 手動調用Acknowledgment.acknowledge()後立即提交。       *       * 2. factory.setConcurrency(3);       * 此處設置的目的在於:假設 topic test 下有 0、1、2三個 partition,Spring Boot中只有一個 @KafkaListener() 消費者訂閱此 topic,此處設置並發為3,       * 啟動後 會有三個不同的消費者分別訂閱 p0、p1、p2,本地實際有三個消費者執行緒。       * 而 factory.setConcurrency(1); 的話 本地只有一個消費者執行緒, p0、p1、p2被同一個消費者訂閱。       * 由於 一個partition只能被同一個消費者組下的一個消費者訂閱,對於只有一個 partition的topic,即使設置 並發為3,也只會有一個消費者,多餘的消費者沒有 partition可以訂閱。       *       * 3. factory.setBatchListener(true);       * 設置批量消費 ,每個批次數量在Kafka配置參數ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置,       * 限制的是 一次批量接收的最大條數,而不是 等到達到最大條數才接收,這點容易被誤解。       * 實際測試時,接收是實時的,當生產者大量寫入時,一次批量接收的消息數量為 配置的最大條數。       */      @Bean      KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {          ConcurrentKafkaListenerContainerFactory<Integer, String>                  factory = new ConcurrentKafkaListenerContainerFactory<>();          // 設置消費者工廠          factory.setConsumerFactory(consumerFactory());          // 設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG          factory.setBatchListener(true);          // 消費者組中執行緒數量,消費者數量<=partition數量,即使配置的消費者數量大於partition數量,多餘消費者無法消費到數據。          factory.setConcurrency(4);          // 拉取超時時間          factory.getContainerProperties().setPollTimeout(3000);          // 手動提交          factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);          return factory;      }        @Bean      public ConsumerFactory<Integer, String> consumerFactory() {          Map<String, Object> map = consumerConfigs();          map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);          return new DefaultKafkaConsumerFactory<>(consumerConfigs());      }    //    @Bean  //    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory1() {  //        ConcurrentKafkaListenerContainerFactory<Integer, String>  //                factory = new ConcurrentKafkaListenerContainerFactory<>();  //        // 設置消費者工廠  //        factory.setConsumerFactory(consumerFactory1());  //        // 設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG  //        factory.setBatchListener(true);  //        // 消費者組中執行緒數量,消費者數量<=partition數量,即使配置的消費者數量大於partition數量,多餘消費者無法消費到數據。  //        factory.setConcurrency(3);  //        // 拉取超時時間  //        factory.getContainerProperties().setPollTimeout(3000);  //        // 手動提交  //        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);  //        return factory;  //    }  //  //    public ConsumerFactory<Integer, String> consumerFactory1() {  //        Map<String, Object> map = consumerConfigs();  //        map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID);  //        return new DefaultKafkaConsumerFactory<>(consumerConfigs());  //    }        @Bean      public Map<String, Object> consumerConfigs() {          Map<String, Object> props = new HashMap<>();          // Kafka地址          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");          // 是否自動提交offset偏移量(默認true)          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);          // 批量消費          props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");          // 消費者組          props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-default");          // 自動提交的頻率(ms)  //        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");          // Session超時設置          props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");          // 鍵的反序列化方式          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          // 值的反序列化方式          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);          // offset偏移量規則設置:          // (1)、earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費          // (2)、latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據          // (3)、none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");          return props;      }    }

主題配置

@Configuration  public class KafkaTopicConfig {        /**       * 定義一個KafkaAdmin的bean,可以自動檢測集群中是否存在topic,不存在則創建       */      @Bean      public KafkaAdmin kafkaAdmin() {          Map<String, Object> configs = new HashMap<>();          // 指定多個kafka集群多個地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092          configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");          return new KafkaAdmin(configs);      }        /**       * 創建 Topic       */      @Bean      public NewTopic topicinfo() {          // 創建topic,需要指定創建的topic的"名稱"、"分區數"、"副本數量(副本數數目設置要小於Broker數量)"          return new NewTopic("test", 3, (short) 2);      }    }

消費者服務

@Slf4j  @Service  public class KafkaConsumerService {      //    /**  //     * 單條消費  //     * @param message  //     */  //    @KafkaListener(id = "id0", topics = {Constant.TOPIC}, containerFactory="kafkaListenerContainerFactory")  //    public void kafkaListener0(String message){  //        log.info("consumer:group0 --> message:{}", message);  //    }  //  //    @KafkaListener(id = "id1", topics = {Constant.TOPIC}, groupId = "group1")  //    public void kafkaListener1(String message){  //        log.info("consumer:group1 --> message:{}", message);  //    }  //    /**  //     * 監聽某個 Topic 的某個分區示例,也可以監聽多個 Topic 的分區  //     * 為什麼找不到group2呢?  //     * @param message  //     */  //    @KafkaListener(id = "id2", groupId = "group2", topicPartitions = { @TopicPartition(topic = Constant.TOPIC, partitions = { "0" }) })  //    public void kafkaListener2(String message) {  //        log.info("consumer:group2 --> message:{}", message);  //    }  //  //    /**  //     * 獲取監聽的 topic 消息頭中的元數據  //     * @param message  //     * @param topic  //     * @param key  //     */  //    @KafkaListener(id = "id3", topics = Constant.TOPIC, groupId = "group3")  //    public void kafkaListener(@Payload String message,  //                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,  //                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,  //                              @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {  //        Long threadId = Thread.currentThread().getId();  //        log.info("consumer:group3 --> message:{}, topic:{}, partition:{}, key:{}, threadId:{}", message, topic, partition, key, threadId);  //    }  //  //    /**  //     * 監聽 topic 進行批量消費  //     * @param messages  //     */  //    @KafkaListener(id = "id4", topics = Constant.TOPIC, groupId = "group4")  //    public void kafkaListener(List<String> messages) {  //        for(String msg:messages){  //            log.info("consumer:group4 --> message:{}", msg);  //        }  //    }  //  //    /**  //     * 監聽topic並手動提交偏移量  //     * @param messages  //     * @param acknowledgment  //     */  //    @KafkaListener(id = "id5", topics = Constant.TOPIC,groupId = "group5")  //    public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) {  //        for(String msg:messages){  //            log.info("consumer:group5 --> message:{}", msg);  //        }  //        // 觸發提交offset偏移量  //        acknowledgment.acknowledge();  //    }  //  //    /**  //     * 模糊匹配多個 Topic  //     * @param message  //     */  //    @KafkaListener(id = "id6", topicPattern = "test.*",groupId = "group6")  //    public void annoListener2(String message) {  //        log.error("consumer:group6 --> message:{}", message);  //    }        /**       * 完整consumer       * @return       */      @KafkaListener(id = "id7", topics = {Constant.TOPIC}, groupId = "group7")      public boolean consumer4(List<ConsumerRecord<?, ?>> data) {          for (int i=0; i<data.size(); i++) {              ConsumerRecord<?, ?> record = data.get(i);              Optional<?> kafkaMessage = Optional.ofNullable(record.value());                Long threadId = Thread.currentThread().getId();              if (kafkaMessage.isPresent()) {                  Object message = kafkaMessage.get();                  log.info("consumer:group7 --> message:{}, topic:{}, partition:{}, key:{}, offset:{}, threadId:{}", message.toString(), record.topic(), record.partition(), record.key(), record.offset(), threadId);              }          }            return true;      }    }

生產者服務

@Service  public class KafkaProducerService {        @Autowired      private KafkaTemplate kafkaTemplate;        /**       * producer 同步方式發送數據       * @param topic    topic名稱       * @param key      一般用業務id,相同業務在同一partition保證消費順序       * @param message  producer發送的數據       */      public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {          // 默認輪詢partition          kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);  //        // 根據key進行hash運算,再將運算結果寫入到不同partition  //        kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);  //        // 第二個參數為partition,當partition和key同時設置時partition優先。  //        kafkaTemplate.send(topic, 0, key, message);  //        // 組裝消息  //        Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")  //                .setHeader(KafkaHeaders.MESSAGE_KEY, key)  //                .setHeader(KafkaHeaders.TOPIC, topic)  //                .setHeader(KafkaHeaders.PREFIX,"kafka_")  //                .build();  //        kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS);  //        // 組裝消息  //        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");  //        kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);      }        /**       * producer 非同步方式發送數據       * @param topic    topic名稱       * @param message  producer發送的數據       */      public void sendMessageAsync(String topic, String message) {          ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);            // 設置非同步發送消息獲取發送結果後執行的動作          ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() {              @Override              public void onSuccess(SendResult<Integer, String> result) {                  System.out.println("success");              }                @Override              public void onFailure(Throwable ex) {                  System.out.println("failure");              }          };            // 將listenableFutureCallback與非同步發送消息對象綁定          future.addCallback(listenableFutureCallback);      }        public void test(String topic, Integer partition, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {          kafkaTemplate.send(topic, partition, key, message).get(10, TimeUnit.SECONDS);      }  }

web 測試

@RestController  public class KafkaProducerController {        @Autowired      private KafkaProducerService producerService;        @GetMapping("/sync")      public void sendMessageSync(@RequestParam String topic) throws InterruptedException, ExecutionException, TimeoutException {          producerService.sendMessageSync(topic, null, "同步發送消息測試");      }        @GetMapping("/async")      public void sendMessageAsync(){          producerService.sendMessageAsync("test","非同步發送消息測試");      }        @GetMapping("/test")      public void test(@RequestParam String topic, @RequestParam(required = false) Integer partition, @RequestParam(required = false) String key, @RequestParam String message) throws InterruptedException, ExecutionException, TimeoutException {          producerService.test(topic, partition, key, message);      }    }

7. AD

如果您覺得寫的還不錯,請關注公眾號 【當我遇上你】, 您的支援是我最大的動力。