Spring Cloud非同步場景分散式事務怎樣做?試試RocketMQ
- 2019 年 10 月 3 日
- 筆記
一、背景
在微服務架構中,我們常常使用非同步化的手段來提升系統的 吞吐量 和 解耦 上下游,而構建非同步架構最常用的手段就是使用 消息隊列(MQ)
,那非同步架構怎樣才能實現數據一致性呢?本文主要介紹如何使用RocketMQ
的事務消息
來解決一致性問題。
RocketMQ 是阿里巴巴開源的分散式消息中間件,目前已成為 Apache 的頂級項目。歷經多次天貓雙十一海量消息考驗,具有高性能、低延時和高可靠等特性
PS:同步場景怎樣保證一致性?請看文章《Spring Cloud同步場景分散式事務怎樣做?試試Seata》
二、MQ選型
可以看到在 業務處理 方面來說 RocketMQ
優於其他對手,而且原生支援 事務消息
PS:業務系統用的是其他 MQ
產品但是又需要 事務消息 怎麼辦?學習原理自己開發實現!
三、什麼是事務消息
例如下圖的場景:生成訂單記錄 -> MQ -> 增加積分
我們是應該先 創建訂單記錄,還是先 發送MQ消息 呢?
-
先發送MQ消息:這個明顯是不行的,因為如果消息發送成功,而訂單創建失敗的話是沒辦法把消息收回來的
-
先創建訂單記錄:如果訂單創建成功後MQ消息發送失敗 拋出異常,因為兩個操作都在本地事務中所以訂單數據是可以 回滾 的
上面的 方式二 看似沒問題,但是 網路是不可靠的!如果 MQ
的響應因為網路原因沒有收到,所以在面對不確定的結果只好進行回滾;但是 MQ
端又確實是收到了這條消息的,只是回給客戶端的 響應丟失 了!
所以 事務消息
就是用來保證 本地事務 與 MQ消息發送 的原子性!
四、RocketMQ事務消息原理
主要的邏輯分為兩個流程:
- 事務消息發送及提交:
- 發送
half消息
MQ服務端
響應消息寫入結果- 根據發送結果執行
本地事務
(如果寫入失敗,此時half消息對業務 不可見,本地邏輯不執行) - 根據本地事務狀態執行
Commit
或者Rollback
(Commit操作生成消息索引,消息對消費者 可見)
- 發送
- 回查流程:
- 對於長時間沒有
Commit/Rollback
的事務消息(pending
狀態的消息),從服務端發起一次 回查 Producer
收到回查消息,檢查回查消息對應的本地事務狀態
- 根據本地事務狀態,重新
Commit
或者Rollback
- 對於長時間沒有
邏輯時序圖
五、非同步架構一致性實現思路
從上面的原理可以發現 事務消息
僅僅只是保證本地事務和MQ消息發送形成整體的 原子性
,而投遞到MQ伺服器後,並無法保證消費者一定能消費成功!
如果 消費端消費失敗 後的處理方式,建議是記錄異常資訊然後 人工處理,並不建議回滾上游服務的數據(因為兩者是 解耦 的,而且 複雜度 太高)
我們可以利用 MQ
的兩個特性 重試
和 死信隊列
來協助消費端處理:
- 消費失敗後進行一定次數的
重試
- 重試後也失敗的話該消息丟進
死信隊列
里 - 另外起一個執行緒監聽消費
死信隊列
里的消息,記錄日誌並且預警!
因為有 重試
所以消費者需要實現 冪等性
六、分散式事務場景樣例
下面就用剛剛提到的場景:生成訂單記錄 -> MQ -> 增加積分;來簡單講一下 Spring Cloud
中應該怎麼做,詳細程式碼請 下載demo 查看。
PS:怎樣安裝部署RocketMQ可以參考《Apache RocketMQ 消息隊列部署與可視化介面安裝》
6.1. 引入依賴
使用 spring-cloud-stream
框架來訪問 RocketMQ
Spring Cloud Stream 是一個構建消息驅動的框架,通過抽象的定義實現應用與MQ消息隊列之間的解耦,目前支援
RabbitMQ
、kafka
和RocketMQ
6.2. 開啟事務消息
消息生產者需要添加 transactional: true
開啟 事務消息
6.3. 訂單服務發送half消息
因為開啟了
事務消息
所以這裡發送的是half消息
對於消費端是不可見
的
6.4. 訂單服務監聽half消息
使用 @RocketMQTransactionListener
註解監聽 半消息,並實現 RocketMQLocalTransactionListener
介面,該介面有兩個方法
- executeLocalTransaction:用於提交本地事務
- checkLocalTransaction:用於事務回查
如果提交事務消息失敗,需等待約1分鐘左右 事務回查 方法才會被調用
6.5. 積分服務消費消息
注意:因為有
重試
,這裡如果是真實的業務需要自行實現冪等性
6.6. 消費死信隊列預警
監聽並消費死信隊列中的消息,用於記錄錯誤日誌,並且預警通知運維人員等
6.7. 測試用例
demo中提供了3個介面分別測試不同的場景:
- 事務成功
http://localhost:11002/success
流程如下:- 訂單創建 成功
- 提交事務消息 成功
- 消費消息增加積分 成功
- 訂單創建成功但提交事務消息失敗
http://localhost:11002/produceError
流程如下:- 訂單創建 成功
- 提交事務消息 失敗
- 事務回查(等待1分鐘左右) 成功
- 提交事務消息 成功
- 消費消息增加積分 成功
- 消費消息失敗
http://localhost:11002/consumeError
流程如下:- 訂單創建 成功
- 提交事務消息 成功
- 消費消息增加積分 失敗
- 重試消費消息 失敗
- 進入死信隊列 成功
- 消費死信隊列的消息 成功
- 記錄日誌並發出預警 成功
七、demo下載地址
推薦閱讀
- 日誌排查問題困難?分散式日誌鏈路跟蹤來幫你
- zuul集成Sentinel最新的網關流控組件
- 阿里註冊中心Nacos生產部署方案
- Spring Boot自定義配置項在IDE裡面實現自動提示
- Spring Cloud Zuul的動態路由怎樣做?集成Nacos實現很簡單
- Spring Cloud開發人員如何解決服務衝突和實例亂竄?
- Spring Cloud同步場景分散式事務怎樣做?試試Seata
掃碼關注有驚喜!