實際業務處理 Kafka 消息丟失、重複消費和順序消費的問題

關於 Kafka 消息丟失、重複消費和順序消費的問題

消息丟失,消息重複消費,消息順序消費等問題是我們使用 MQ 時不得不考慮的一個問題,下面我結合實際的業務來和你分享一下解決方案。

消息丟失問題

比如我們使用 Kakfa 時,以下場景都會發生消息丟失:

  • producer -> broker (生產者生產消息)
  • broker -> broker (集群環境,broker 同步給其他 broker)
  • broker -> consumer (消費者消費消息)

解決方案也很簡單,設置 acks(消息確認機制)retries(重試機制)factor(設置 partition 數量)…

一般來說,最常見的消息丟失場景就是:consumer 消費消息

要保證 consumer 消費消息時不丟失消息,必須使用手動提交 ack

我們業務是這樣實現的:

  1. Kafka 拉取消息(一次批量拉取 100條)
  2. 為每條消息分配一個 msgId(遞增)
  3. msgId 存入內存隊列(sortSet)
  4. 使用 Map 存儲 msgIdmsg (包含 offset)的映射關係
  5. 當業務處理完消息後,獲取當前消息的 msgId,然後從 sortSet 中刪除該 msgId(表示該消息已經處理過了)
  6. ack 時,如果當前 msgId <= sortSet(msgId 在 sortSet 中是從小到大排列) ,就提交當前 offset
  7. 就算 consumer 在處理消息時掛了,下次重啟時就會從 sortSet 隊首的消息開始拉取,實現至少處理一次語義。
  8. 步驟 7 存在一個問題:當消息處理完後,還沒從 sortSet 中刪除該 msgId,系統就掛了,當系統重啟時,又會重新處理一次剛剛已處理過的消息,這就引出消息重複消費的問題了。

消息重複消費

要解決消息重複消費,也就是要實現冪等(冪等就是:多次請求,但結果保持不變,舉一個例子你就明白了:在 http 中,你發送同一個 get 請求,無論發送多少次,返回結果都是一樣的

回到我們的業務場景上,我以處理訂單消息為例:

  • 冪等Key 由我們的訂單Id + 訂單狀態組成(一筆訂單的狀態只會處理一次)

  • 在處理之前,我們首先會去 Redis 查詢是否存在這個 Key

    如果存在,說明我們已經處理過了,直接丟掉;

    ​ 如果不存在,說明沒處理過,繼續往下處理;

  • 最終的邏輯是:將處理過的數據存到DB上,再把 冪等Key 存到 Redis

顯然一般場景下 Redis 是無法保證冪等的

所以Redis只是一個前置處理,最終的冪等性依賴 DB唯一Key(訂單Id+訂單狀態)

總的來說就是:通過 Redis 做前置處理,DB 唯一索引做最終保證實現冪等性

消息順序消費

消息的順序性很好理解,還是以訂單處理為例

訂單的狀態有:支付、確認收貨、完成等等,而訂單下還有計費、退款的消息報

理論上來說:支付的消息肯定要比退款的消息先到。

但是程序處理的過程就不一定了,所以我們處理消息順序消費的流程如下:

  • 寬表:創建一張寬表,唯一索引是 訂單Id,將訂單的每個狀態拆分為一個列,當消息來了,只更新對應的字段就好,消息只會存在短暫的狀態不一致問題,但是最終狀態是一致的
  • 消息補償機制
  • 把相同的 userID/orderId 發送到相同的 partition(因為一個 consumer 消費一個 partition)
Tags: