Spring Cloud非同步場景分散式事務怎樣做?試試RocketMQ

  • 2019 年 10 月 3 日
  • 筆記

mark

一、背景

在微服務架構中,我們常常使用非同步化的手段來提升系統的 吞吐量解耦 上下游,而構建非同步架構最常用的手段就是使用 消息隊列(MQ),那非同步架構怎樣才能實現數據一致性呢?本文主要介紹如何使用RocketMQ事務消息來解決一致性問題。

RocketMQ 是阿里巴巴開源的分散式消息中間件,目前已成為 Apache 的頂級項目。歷經多次天貓雙十一海量消息考驗,具有高性能、低延時和高可靠等特性

PS:同步場景怎樣保證一致性?請看文章《Spring Cloud同步場景分散式事務怎樣做?試試Seata

 

二、MQ選型

可以看到在 業務處理 方面來說 RocketMQ 優於其他對手,而且原生支援 事務消息

mark

PS:業務系統用的是其他 MQ 產品但是又需要 事務消息 怎麼辦?學習原理自己開發實現!

 

三、什麼是事務消息

例如下圖的場景:生成訂單記錄 -> MQ -> 增加積分

mark

我們是應該先 創建訂單記錄,還是先 發送MQ消息 呢?

  1. 先發送MQ消息:這個明顯是不行的,因為如果消息發送成功,而訂單創建失敗的話是沒辦法把消息收回來的

  2. 先創建訂單記錄:如果訂單創建成功後MQ消息發送失敗 拋出異常,因為兩個操作都在本地事務中所以訂單數據是可以 回滾

上面的 方式二 看似沒問題,但是 網路是不可靠的!如果 MQ 的響應因為網路原因沒有收到,所以在面對不確定的結果只好進行回滾;但是 MQ 端又確實是收到了這條消息的,只是回給客戶端的 響應丟失 了!
 
所以 事務消息 就是用來保證 本地事務MQ消息發送 的原子性!

 

四、RocketMQ事務消息原理

mark

主要的邏輯分為兩個流程:

  • 事務消息發送及提交
    1. 發送 half消息
    2. MQ服務端 響應消息寫入結果
    3. 根據發送結果執行 本地事務(如果寫入失敗,此時half消息對業務 不可見,本地邏輯不執行)
    4. 根據本地事務狀態執行 Commit 或者 Rollback(Commit操作生成消息索引,消息對消費者 可見
       
  • 回查流程
    1. 對於長時間沒有 Commit/Rollback 的事務消息(pending 狀態的消息),從服務端發起一次 回查
    2. Producer 收到回查消息,檢查回查消息對應的 本地事務狀態
    3. 根據本地事務狀態,重新 Commit 或者 Rollback

 
邏輯時序圖

mark

 

五、非同步架構一致性實現思路

從上面的原理可以發現 事務消息 僅僅只是保證本地事務和MQ消息發送形成整體的 原子性,而投遞到MQ伺服器後,並無法保證消費者一定能消費成功!
 
如果 消費端消費失敗 後的處理方式,建議是記錄異常資訊然後 人工處理,並不建議回滾上游服務的數據(因為兩者是 解耦 的,而且 複雜度 太高)
 
我們可以利用 MQ 的兩個特性 重試死信隊列 來協助消費端處理:

  1. 消費失敗後進行一定次數的 重試
  2. 重試後也失敗的話該消息丟進 死信隊列
  3. 另外起一個執行緒監聽消費 死信隊列 里的消息,記錄日誌並且預警!

因為有 重試 所以消費者需要實現 冪等性

 

六、分散式事務場景樣例

下面就用剛剛提到的場景:生成訂單記錄 -> MQ -> 增加積分;來簡單講一下 Spring Cloud 中應該怎麼做,詳細程式碼請 下載demo 查看。
PS:怎樣安裝部署RocketMQ可以參考《Apache RocketMQ 消息隊列部署與可視化介面安裝

6.1. 引入依賴

使用 spring-cloud-stream 框架來訪問 RocketMQ

mark

Spring Cloud Stream 是一個構建消息驅動的框架,通過抽象的定義實現應用與MQ消息隊列之間的解耦,目前支援 RabbitMQkafkaRocketMQ
mark

 

6.2. 開啟事務消息

消息生產者需要添加 transactional: true 開啟 事務消息

mark

 

6.3. 訂單服務發送half消息

mark

因為開啟了 事務消息 所以這裡發送的是 half消息 對於消費端是 不可見

 

6.4. 訂單服務監聽half消息

使用 @RocketMQTransactionListener 註解監聽 半消息,並實現 RocketMQLocalTransactionListener 介面,該介面有兩個方法

  • executeLocalTransaction:用於提交本地事務
  • checkLocalTransaction:用於事務回查

mark

如果提交事務消息失敗,需等待約1分鐘左右 事務回查 方法才會被調用

 

6.5. 積分服務消費消息

mark

注意:因為有 重試,這裡如果是真實的業務需要自行實現 冪等性

 

6.6. 消費死信隊列預警

mark

監聽並消費死信隊列中的消息,用於記錄錯誤日誌,並且預警通知運維人員等

 

6.7. 測試用例

demo中提供了3個介面分別測試不同的場景:

  • 事務成功
    http://localhost:11002/success
    流程如下:
    1. 訂單創建 成功
    2. 提交事務消息 成功
    3. 消費消息增加積分 成功
  • 訂單創建成功但提交事務消息失敗
    http://localhost:11002/produceError
    流程如下:
    1. 訂單創建 成功
    2. 提交事務消息 失敗
    3. 事務回查(等待1分鐘左右) 成功
    4. 提交事務消息 成功
    5. 消費消息增加積分 成功
  • 消費消息失敗
    http://localhost:11002/consumeError
    流程如下:
    1. 訂單創建 成功
    2. 提交事務消息 成功
    3. 消費消息增加積分 失敗
    4. 重試消費消息 失敗
    5. 進入死信隊列 成功
    6. 消費死信隊列的消息 成功
    7. 記錄日誌並發出預警 成功

 

七、demo下載地址

https://gitee.com/zlt2000/microservices-platform/tree/master/zlt-demo/rocketmq-demo/rocketmq-transactional

 

推薦閱讀

 
掃碼關注有驚喜!

file