RocketMQ 原理:消息存儲、高可用、消息重試、消息冪等性

消息存儲

消息存儲方式

非持久化

image

  1. 消息生成者發送消息到 MQ

  2. MQ 返回 ACK(Acknowledge Character)給生產者

  3. MQ push 消息給對應的消費者

  4. 消息消費者返回 ACK 給 MQ

持久化

image

  1. 消息生成者發送消息到 MQ

  2. MQ 收到消息,將消息進行持久化,存儲該消息

  3. MQ 返回 ACK 給生產者

  4. MQ push 消息給對應的消費者

  5. 消息消費者返回 ACK 給 MQ

  6. MQ 刪除消息

注意:

①第 5 步 MQ 在指定時間內接到消息消費者返回 ACK,MQ 認定消息消費成功,執行 6 。

②第 5 步 MQ 在指定時間內未接到消息消費者返回 ACK,MQ 認定消息消費失敗,重新執行 4、5、6 。

消息存儲介質

image

數據庫

  • 實現:ActiveMQ

  • 缺點:數據庫瓶頸將成為 MQ 瓶頸

文件系統

  • 實現:RocketMQ/Kafka/RabbitMQ

  • 解決方案:採用消息刷盤機制進行數據存儲

  • 缺點:硬盤損壞的問題無法避免

消息存儲與讀寫方式

SSD(Solid State Disk):固態硬盤

  • 隨機寫(100 KB/s)

  • 順序寫(600 MB/s):1秒1部電影

Linux 系統發送數據的方式

  • 「零拷貝」技術
    • 數據傳輸由傳統的 4 次複製簡化成 3 次複製,減少 1 次複製過程
    • Java 語言中使用 MappedByteBuffer 類實現了該技術
    • 要求:預留存儲空間,用於保存數據(1G 存儲空間起步)

image

消息存儲結構

image

如圖所示,MQ 數據存儲區域包含如下內容:

  • 消息數據存儲區域
    • topic
    • queueId
    • message
  • 消費邏輯隊列
    • minOffset
    • maxOffset
    • consumerOffset
  • 索引
    • key 索引
    • 創建時間索引
    • ……

刷盤機制

同步刷盤

image

  1. 生產者發送消息到 MQ,MQ 接到消息數據

  2. MQ 掛起生產者發送消息的線程

  3. MQ 將消息數據寫入內存

  4. 內存數據寫入硬盤

  5. 磁盤存儲後返回 SUCCESS

  6. MQ 恢復掛起的生產者線程

  7. 發送 ACK 到生產者

異步刷盤

image

  1. 生產者發送消息到 MQ,MQ 接到消息數據

  2. MQ 將消息數據寫入內存

  3. 發送 ACK 到生產者

小結

  • 同步刷盤:安全性高,效率低,速度慢(適用於對數據安全要求較高的業務)
  • 異步刷盤:安全性低,效率高,速度快(適用於對數據處理速度要求較高的業務)
# 刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH

高可用

高可用實現

nameserver

  • 無狀態 + 全服務器註冊

消息服務器

  • 主從架構(2M-2S)

消息生產

  • 生產者將相同的 topic 綁定到多個 group 組,保證即使 broker master 掛掉,其他 master 仍可正常進行消息接收。

消息消費

  • RocketMQ 自身會根據 broker master 的壓力確認是否由 master 承擔消息讀取的功能,當 master 繁忙時候,自動切換由 slave 承擔數據讀取的工作。

主從複製

同步複製

  • master 接到消息後,先複製到 slave,然後反饋給生產者寫操作成功
  • 優點:數據安全,不丟數據,出現故障容易恢復
  • 缺點:影響數據吞吐量,整體性能低

異步複製

  • master 接到消息後,立即返回給生產者寫操作成功,當消息達到一定量後再異步複製到slave
  • 優點:數據吞吐量大,操作延遲低,性能高
  • 缺點:數據不安全,會出現數據丟失的現象,一旦 master 出現故障,從上次數據同步到故障時間的數據將丟失

配置方式

#Broker 的角色
#- ASYNC_MASTER 異步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SYNC_MASTER

負載均衡

Producer 負載均衡:

  • 內部實現了不同 broker 集群中對同一 topic 對應消息隊列的負載均衡

Consumer 兩種負載均衡策略

  • 平均分配
    image

  • 循環平均分配
    image

消息重試

當消息消費後未正常返回消費成功的信息將啟動消息重試機制

兩種消息重試機制:

  • 順序消息重試

  • 無序消息重試

順序消息重試

  • 當消費者消費消息失敗後,RocketMQ 會自動進行消息重試(每次間隔時間為 1 秒)。
  • 注意:應用會出現消息消費被阻塞的情況,因此,要對順序消息的消費情況進行監控,避免阻塞現象的發生。

image

無序消息重試

  • 無序消息包括普通消息、定時消息、延時消息、事務消息。
  • 無序消息重試僅適用於負載均衡(集群)模型下的消息消費,不適用於廣播模式下的消息消費。
  • 為保障無序消息的消費,MQ 設定了合理的消息重試間隔時長。

image

死信隊列

概念

  • 當消息消費重試到達了指定次數(默認 16 次)後,MQ 將無法被正常消費的消息稱為死信消息(Dead-Letter Message)。

  • 死信消息不會被直接拋棄,而是保存到了一個全新的隊列中,該隊列稱為死信隊列(Dead-Letter Queue)。

死信隊列的特徵

  • 歸屬某一個組(Gourp Id),而不歸屬 Topic,也不歸屬消費者。
  • 一個死信隊列中可以包含同一個組下的多個 Topic 中的死信消息。
  • 死信隊列不會進行默認初始化,當第一個死信出現後,此隊列首次初始化。

死信隊列中的消息的特徵

  • 不會被再次重複消費。
  • 死信隊列中的消息有效期為 3 天,達到時限後將被清除。

死信處理

  • 在監控平台中,通過查找死信,獲取死信的 messageId,然後通過 id 對死信進行精準消費。

消息冪等

消息重複消費

消息重複消費原因

  • 生產者發送了重複的消息
    • 網絡閃斷
    • 生產者宕機
  • 消息服務器投遞了重複的消息
    • 網絡閃斷
  • 動態的負載均衡過程
    • 網絡閃斷/抖動
    • broker重啟
    • 訂閱方應用重啟(消費者)
    • 客戶端擴容
    • 客戶端縮容

image

消息冪等

對同一條消息,無論消費多少次,結果保持一致,稱為消息冪等性

解決方案

  1. 使用業務 id 作為消息的 key 。

  2. 在消費消息時,客戶端對 key 做判定,未使用過放行,使用過拋棄。

  • 注意:messageId 由 RocketMQ 產生,messageId 並不具有唯一性,不能作用冪等判定條件。

常見的冪等方法示例

  • 新增(不冪等):insert into order values(……)
  • 查詢(冪等)
  • 刪除(冪等):delete from 表 where id=1
  • 修改(不冪等):update account set balance = balance+100 where no=1
  • 修改(冪等):update account set balance = 100 where no=1