Kafka消息隊列

之前也學習過消息隊列,但一直沒有使用的場景,今天項目中遇到了 kafka 那便有了應用場景

1. Kafka

Kafka 是一個分散式、支援分區,多副本的基於 zookeeper 的消息隊列。使用消息隊列,是應用 A 將要處理的資訊發送到消息隊列然後繼續下面的任務,需要該資訊的應用 B 從消息隊列裡面獲取資訊再做處理,這樣做像是多此一舉,應用 A 直接發資訊給應用 B 不就可以了嗎?存在即合理,使用消息隊列其作用如下:

  • 非同步處理:用戶註冊後發送郵件、簡訊、驗證碼等可以非同步處理,使註冊這個過程寫入資料庫後就可立即返回
  • 流量消峰:秒殺活動超過閾值的請求丟棄轉向錯誤頁面,然後根據消息隊列的消息做業務處理
  • 日誌處理:可以將error的日誌單獨給消息隊列進行持久化處理
  • 應用解耦:購物的下單操作,訂單系統與庫存系統中間加消息隊列,使二者解耦,若後者故障也不會導致消息丟失

之前 筆者也寫過 RabbitMQ 的筆記,傳送門

2. 生產消費模型

結合 kafka 的下面這些名詞來解釋其模型會更加容易理解

名稱 解釋
Broker kafka 的實例,部署多台 kafka 就是有多個 broker
Topic 消息訂閱的話題,是這些消息的分類,類似於消息訂閱的頻道
Producer 生產者,負責往 kafka 發送消息
Consumer 消費者,從 kafka 讀取消息來進行消費

3. 安裝部署

kafka 和依賴的 zookeeper 是 java 編寫的工具,其需要 jdk8 及其以上。筆者這裡使用 Docker 安裝,偷懶了貪圖方便快捷

# 使用 wurstmeister 製作的鏡像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka


# 啟動 zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper


# 單機啟動 kafka
docker run  -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

4. Quickstart

kafka 官網也有很好的介紹,quickstart

# 進入kafka容器
docker exec -it kafka /bin/sh


# 進入 bin 目錄
cd /opt/kafka_2.13-2.8.1/bin


# partitions 分區
# replication 副本因子
# 創建一個主題(參數不懂可直接填寫,後面會講解說明)
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092


# 查看
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092


# 寫入 topic(回車表示一條消息,ctrl + c 結束輸入)
# 消息默認存儲 7 天,下一步的消費可以驗證
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event


# 讀取 topic(運行多次可以讀取消息,因為默認存儲 7 天)
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

5. SpringBoot 集成

SpringBoot 集成了 Kafka,添加依賴後可使用內置的 KafkaTemplate 模板方法來操作 kafka 消息隊列

5.1 添加依賴

<!--  sprinboot版本管理中有kafka可不寫版本號  -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

5.2 配置文件

server:
  port: 8080

spring:
  # 消息隊列
  kafka:
    producer:
      # broker地址,重試次數,確認接收個數,消息的編解碼方式
      bootstrap-servers: 101.200.197.22:9092
      retries: 3
      acks: 1
      key-serializer: org.springframework.kafka.support.serializer.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.StringSerializer
    consumer:
      # broker地址,自動提交,分區offset設置
      bootstrap-servers: 101.200.197.22:9092
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

5.3 生產者

@RestController
@RequestMapping("/kafka")
public class Producer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/producer1")
    public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        SendResult<String, Object> sendResult = future.get();
        return sendResult.toString();
    }

    @GetMapping("/producer2")
    public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("faile");
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("success");
            }
        });
        return "";
    }
}

5.4 消費者

@Component
public class Consumer {

    @KafkaListener(topics = {"topic1"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

6. 存儲目錄結構

kafka
|____kafka-logs
    |____topic1
    |	  |____00000000000000000000.log(存儲接收的消息)
    |	  |____consumer_offsets-01(消費者偏移量)
    |	  |____consumer_offsets-02
    |____topic2
    	  |____00000000000000000000.log
    	  |____consumer_offsets-01
    	  |____consumer_offsets-02

每台 broker 實例接收到消息後將之存儲到 00000.log 裡面,保存的方式是先入先出。消息被消費後不會被刪除,相反可以設置 topic 的消息保留時間,重要的是 Kafka 的性能在數據大小方面實際上是恆定的,因此長時間存儲數據是完全沒問題的

消費者會將自己消費偏移量 offset 提交給 topic 在 _consumer_offsets 裡面保存,然後通過偏移量來確定消息的位置,默認從上次消費的位置開始,添加參數 –frombeginning 則從頭開始消費,可獲取之前所有存儲的消息。kafka 也會定期清除內部的消息,直到保存最新的一條(文件保存的消息默認保存 7 天)

7. 消費組

這個在筆者配置消費者的時候發現的問題,啟動時報錯說沒有指定消費組

  • 每條分區消息只能被同組的一個消費者消費,consumer1 和 consumer2 同組,所以只有其中一個能消費同條消息
  • 每條分區消息能被不同組的單個消費者消費,consumer2 和 consumer4 不同組,所以都能消費同條消息
  • 以上二個規則同時成立
  • 其作用是可以保證消費順序,同個分區里的消息會被同個消費者順序消費

8. 分區和副本

topic 消息保存的文件 0000.log 可以進行物理切分,這就是分區的概念,類似於資料庫的分庫分表。這樣做的好處在於單個保存的文件不會太大從而影響性能,最重要的是分區後不是單個文件串列執行了,而是多區多文件可並行執行提高了並發能力

分區:消費者會消費同一 topic 的不同分區,所以會保存不同分區的偏移量,其格式為:GroupId + topic + 分區號

副本:副本是對分區的備份,集群中不同的分區在不同的 broker 上,但副本會對該分區備份到指定數量的 broker 上,這些副本有 leader 和 follower 的區別,leader負責讀寫,掛了再重新選舉,副本為了保持數據一致性

9. 常見問題

9.1 生產者同步和非同步消息

生產者發送消息給 broker,之後 broker 會響應 ack 給生產者,生產者等待接收 ack 訊號 3 秒,超時則重試 3 次

生產者 ack 確認配置:

  • ack = 0:不需要同步消息
  • ack = 1:則 leader 收到消息,並保存到本地 log 之後才響應 ack 資訊
  • ack 默認配置為 2

9.2 消費者自動提交和手動提交

  • 自動提交:消費者 pull 消息之後馬上將自身的偏移量提交到 broker 中,這個過程是自動的
  • 手動提交:消費者 pull 消息時或之後,在程式碼里將偏移量提交到 broker
  • 二者區別:防止消費者 pull 消息之後掛掉,在消息還沒消費但又提交了偏移量

9.3 消息丟失和重複消費

  • 消息丟失
    • 生產者:配置 ack ,以及配置副本和分區數值一致
    • 消費者:設置手動提交
  • 重複消費
    • 設置唯一主鍵,Mysql 主鍵唯一則插入失敗
    • 分散式鎖

9.4 順序消費方案

  • 生產者:關閉重試,使用同步發送,成功了再發下一條
  • 消費者:消息發送到一個分區中,只有一個消費組的消費者能接收消息
Tags: