Kafka消息隊列
之前也學習過消息隊列,但一直沒有使用的場景,今天項目中遇到了 kafka 那便有了應用場景
1. Kafka
Kafka 是一個分散式、支援分區,多副本的基於 zookeeper 的消息隊列。使用消息隊列,是應用 A 將要處理的資訊發送到消息隊列然後繼續下面的任務,需要該資訊的應用 B 從消息隊列裡面獲取資訊再做處理,這樣做像是多此一舉,應用 A 直接發資訊給應用 B 不就可以了嗎?存在即合理,使用消息隊列其作用如下:
- 非同步處理:用戶註冊後發送郵件、簡訊、驗證碼等可以非同步處理,使註冊這個過程寫入資料庫後就可立即返回
- 流量消峰:秒殺活動超過閾值的請求丟棄轉向錯誤頁面,然後根據消息隊列的消息做業務處理
- 日誌處理:可以將error的日誌單獨給消息隊列進行持久化處理
- 應用解耦:購物的下單操作,訂單系統與庫存系統中間加消息隊列,使二者解耦,若後者故障也不會導致消息丟失
之前 筆者也寫過 RabbitMQ 的筆記,傳送門
2. 生產消費模型
結合 kafka 的下面這些名詞來解釋其模型會更加容易理解
名稱 | 解釋 |
---|---|
Broker | kafka 的實例,部署多台 kafka 就是有多個 broker |
Topic | 消息訂閱的話題,是這些消息的分類,類似於消息訂閱的頻道 |
Producer | 生產者,負責往 kafka 發送消息 |
Consumer | 消費者,從 kafka 讀取消息來進行消費 |
3. 安裝部署
kafka 和依賴的 zookeeper 是 java 編寫的工具,其需要 jdk8 及其以上。筆者這裡使用 Docker 安裝,偷懶了貪圖方便快捷
# 使用 wurstmeister 製作的鏡像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# 啟動 zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 單機啟動 kafka
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
4. Quickstart
kafka 官網也有很好的介紹,quickstart
# 進入kafka容器
docker exec -it kafka /bin/sh
# 進入 bin 目錄
cd /opt/kafka_2.13-2.8.1/bin
# partitions 分區
# replication 副本因子
# 創建一個主題(參數不懂可直接填寫,後面會講解說明)
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092
# 查看
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# 寫入 topic(回車表示一條消息,ctrl + c 結束輸入)
# 消息默認存儲 7 天,下一步的消費可以驗證
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
# 讀取 topic(運行多次可以讀取消息,因為默認存儲 7 天)
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
5. SpringBoot 集成
SpringBoot 集成了 Kafka,添加依賴後可使用內置的 KafkaTemplate 模板方法來操作 kafka 消息隊列
5.1 添加依賴
<!-- sprinboot版本管理中有kafka可不寫版本號 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
5.2 配置文件
server:
port: 8080
spring:
# 消息隊列
kafka:
producer:
# broker地址,重試次數,確認接收個數,消息的編解碼方式
bootstrap-servers: 101.200.197.22:9092
retries: 3
acks: 1
key-serializer: org.springframework.kafka.support.serializer.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.StringSerializer
consumer:
# broker地址,自動提交,分區offset設置
bootstrap-servers: 101.200.197.22:9092
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5.3 生產者
@RestController
@RequestMapping("/kafka")
public class Producer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/producer1")
public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
SendResult<String, Object> sendResult = future.get();
return sendResult.toString();
}
@GetMapping("/producer2")
public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("faile");
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("success");
}
});
return "";
}
}
5.4 消費者
@Component
public class Consumer {
@KafkaListener(topics = {"topic1"})
public void onMessage(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
6. 存儲目錄結構
kafka
|____kafka-logs
|____topic1
| |____00000000000000000000.log(存儲接收的消息)
| |____consumer_offsets-01(消費者偏移量)
| |____consumer_offsets-02
|____topic2
|____00000000000000000000.log
|____consumer_offsets-01
|____consumer_offsets-02
每台 broker 實例接收到消息後將之存儲到 00000.log 裡面,保存的方式是先入先出。消息被消費後不會被刪除,相反可以設置 topic 的消息保留時間,重要的是 Kafka 的性能在數據大小方面實際上是恆定的,因此長時間存儲數據是完全沒問題的
消費者會將自己消費偏移量 offset 提交給 topic 在 _consumer_offsets 裡面保存,然後通過偏移量來確定消息的位置,默認從上次消費的位置開始,添加參數 –frombeginning 則從頭開始消費,可獲取之前所有存儲的消息。kafka 也會定期清除內部的消息,直到保存最新的一條(文件保存的消息默認保存 7 天)
7. 消費組
這個在筆者配置消費者的時候發現的問題,啟動時報錯說沒有指定消費組
- 每條分區消息只能被同組的一個消費者消費,consumer1 和 consumer2 同組,所以只有其中一個能消費同條消息
- 每條分區消息能被不同組的單個消費者消費,consumer2 和 consumer4 不同組,所以都能消費同條消息
- 以上二個規則同時成立
- 其作用是可以保證消費順序,同個分區里的消息會被同個消費者順序消費
8. 分區和副本
topic 消息保存的文件 0000.log 可以進行物理切分,這就是分區的概念,類似於資料庫的分庫分表。這樣做的好處在於單個保存的文件不會太大從而影響性能,最重要的是分區後不是單個文件串列執行了,而是多區多文件可並行執行提高了並發能力
分區:消費者會消費同一 topic 的不同分區,所以會保存不同分區的偏移量,其格式為:GroupId + topic + 分區號
副本:副本是對分區的備份,集群中不同的分區在不同的 broker 上,但副本會對該分區備份到指定數量的 broker 上,這些副本有 leader 和 follower 的區別,leader負責讀寫,掛了再重新選舉,副本為了保持數據一致性
9. 常見問題
9.1 生產者同步和非同步消息
生產者發送消息給 broker,之後 broker 會響應 ack 給生產者,生產者等待接收 ack 訊號 3 秒,超時則重試 3 次
生產者 ack 確認配置:
- ack = 0:不需要同步消息
- ack = 1:則 leader 收到消息,並保存到本地 log 之後才響應 ack 資訊
- ack 默認配置為 2
9.2 消費者自動提交和手動提交
- 自動提交:消費者 pull 消息之後馬上將自身的偏移量提交到 broker 中,這個過程是自動的
- 手動提交:消費者 pull 消息時或之後,在程式碼里將偏移量提交到 broker
- 二者區別:防止消費者 pull 消息之後掛掉,在消息還沒消費但又提交了偏移量
9.3 消息丟失和重複消費
- 消息丟失
- 生產者:配置 ack ,以及配置副本和分區數值一致
- 消費者:設置手動提交
- 重複消費
- 設置唯一主鍵,Mysql 主鍵唯一則插入失敗
- 分散式鎖
9.4 順序消費方案
- 生產者:關閉重試,使用同步發送,成功了再發下一條
- 消費者:消息發送到一個分區中,只有一個消費組的消費者能接收消息