分散式事務最終一致性-CAP框架輕鬆搞定
前言
對於分散式事務,常用的解決方案根據一致性的程度可以進行如下劃分:
- 強一致性(2PC、3PC):資料庫層面的實現,通過鎖定資源,犧牲可用性,保證數據的強一致性,效率相對比較低。
- 弱一致性(TCC):業務層面的實現,通過預留或鎖定部分資源,最後通過確認或取消操作完成事務的處理。比如A向B轉款500元,A帳號會凍結500元,其他操作正常,B接收轉款時,也不能直接入賬,而是將500元放到預留空間,只有經過確認之後,A才正式扣錢,B才正式入賬; 如果取消把A的500塊解凍,B也不會入賬。
- 最終一致性(本地消息表):不管經過多少個服務節點,最終數據一致就行。比如下單成功之後,需要庫存服務扣減庫存,如果庫存扣減失敗,不管是重試,還是最後人工處理,最後確保訂單和庫存數據能對上就行;為保證用戶體驗,及時通過中間狀態的形式回饋給用戶,比如常見的出票中、數據處理中等。
對於強一致性和弱一致性的解決方案一般針對數據一致性和時效性要求特別高的業務場景,通常會犧牲暫時的可用性來滿足一致性的要求;由於為保證一致性,會鎖定資源,在高並發的業務場景不是最佳選擇,所以很多系統在業務需求允許的情況下,基本上都會採用最終一致性方案。
正文
1.1 最終一致性簡述
顧名思義就是保證數據最後的一致性就行了。如果中間節點發生失敗,系統為了減少代價,一般不會自動回滾,而是通過重試機制和人工參與的方式對失敗數據進行處理,從而保證系統高並發場景下高可用的數據一致性需求。
1.2 解決方案
目前用得最多的方案是結合本地消息表進行實現,再加上後台任務、消息隊列中間件就可以更好的實現分散式事務的處理。

本地消息表:就是在對應業務資料庫中增加的一張消息表;這張表存儲業務產生的消息,通過本地事務保證業務數據和消息數據的一致性。在消息表中通過一個狀態來標識業務是否執行成功,如果失敗,後台任務就進行重試。
1.2.2 CAP框架簡介
CAP 是一個EventBus(事件匯流排),同時也是一個在微服務或者SOA系統中解決分散式事務問題的一個框架,基於CAP理論思想進行封裝的。採用模組化設計,具有高度的可擴展性,可靠並且易於更改。
對於分散式事務的處理,CAP 框架採用的是「非同步確保」這種方案,即本地消息表。官方支援的數據存儲方式有SQL Server、MySQL、PostgreSql、MongoDB、In-Memory(記憶體),由於是開源項目,社區大佬也提供了其他數據存儲支援,如:Oracle、SQLite、SmartSql等。
在分散式系統,各節點需要進行消息傳輸,CAP框架提供以下幾種方式RabbitMQ、Kafka、Redis Streams(Redis 5.0支援)、Azure Service Bus、Amazon SQS、In-Memory Queue,使用方式都差不多。
CAP的架構圖如下:

上圖簡要說明:
- 有兩個微服務,服務A和服務B;
- 服務A中通過本地事務的方式,將事件消息和業務邏輯進行事務保存(事件消息保存在本地消息表中),保證業務邏輯和消息的一致性和可靠性;關於消息的處理和保存CAP已經封裝在內部;
- CAP內部定時調度任務將消息發布到消息隊列中;
- 服務B訂閱到消息,將其保存到服務B的本地消息表中,CAP已經封裝好,只需按照說明使用即可;
- 如果業務處理失敗,服務B中集成的CAP會根據配置的定時任務策略進行重試,直到處理成功為止;
主要的理論就說那麼多,更多詳細內容,請進下方傳送門:
接下來就到擼碼時刻,CAP由於封裝比較好,所以使用起來比較簡單。
1.3 擼碼實踐
以下的業務場景是為了案例演示,目的是體現CAP的實踐,所以業務邏輯都只是模擬,切勿當真。
1.3.1 環境準備
演示中要用到RabbitMQ,為了安裝方便,這裡使用Docker的方式,直接通過鏡像運行,簡單,快速方便。關於Docker的實踐,後續會專門出系列文章。這裡就先總結一下Docker的安裝和RabbitMQ在Docker中的運行步驟,採用的主機環境是我之前買的阿里雲伺服器(CentOS 7);演示用的資料庫是SqlServer。
-
Docker安裝
1、移除移動舊版本
sudo yum remove docker \ docker-client \ docker-client-latest \ docker-common \ docker-latest \ docker-latest-logrotate \ docker-logrotate \ docker-engine2、安裝需要的依賴包
sudo yum install -y yum-utils3、設置鏡像倉庫
sudo yum-config-manager \ --add-repo \ //download.docker.com/linux/centos/docker-ce.repo4、更新Yum軟體包索引
sudo yum makecache fast # 提高安裝速度5、開始安裝Docker
sudo yum install docker-ce docker-ce-cli containerd.io6、啟動Docker
sudo systemctl start docker7、測試Docker
sudo docker run hello-world # 運行Hello-world
-
RabbitMQ在Docker中安裝和運行
1、一行命令直接指定鏡像運行,如果本地找不到鏡像,會去遠程倉儲里去找。
docker run -d --hostname my-rabbit --name cap-rabbit -p 8888:15672 -p 5672:5672 -p 5671:5671 -p 1883:1883 rabbitmq:3-management這裡先不細說命令了,後續聊Docker的時候好好說說。命令需要注意的是主機埠和容器埠的映射。
2、運行成功後就可以訪問啦,默認用戶名和密碼:guest/guest;
這裡訪問的地址埠是8888,那是在啟動容器的時候將主機埠8888和容器埠15672進行了映射。

這就是選擇Dokcer安裝的原因,超級快;如果用傳統的方式,還得安裝語言環境,還得配置,最後才能安裝;Docker通過鏡像的方式直接運行即可。
如果小夥伴新增用戶之後不能訪問,或者程式連接報錯,可以排查是否有許可權訪問,如下:

註:如果小夥伴用的是雲伺服器,需要配置安全組,允許埠訪問;另外如果程式和RabbitMq所在的主機不是同一台機器,主機防火牆也需要放開對應的埠。
1.3.2 開始擼碼
-
項目準備
這裡模擬兩個服務,一個是訂單服務,一個是庫存服務,兩都用到EF(Code First),如果小夥伴對EF入門還不熟,<<跟我一起學.NetCore之EF Core 實戰入門,一看就會>>這篇文章超詳細,肯定能幫到你; 所以接下來就上幾張關鍵的圖就行啦。
項目結構:

OrderDbContext:

Startup中註冊服務:

庫存服務的程式碼和這個類似。
通過遷移並更新到資料庫時,會生成如下資料庫和表:

-
集成CAP
這裡因為用的是RabbitMQ、SqlServer,所以需要引入以下幾個包;如果用其他消息隊列或資料庫,可以引入對應的包。
因為訂單服務是在Respository層使用CAP,所以對應的包就在這層引用;

庫存服務是直接在Controller那層引用,這裡就不重複截圖啦。
訂單服務和庫存服務都是在各自項目的Startup文件中註冊CAP相關服務,並配置相關資訊,如下圖:

集成完畢之後,啟動項目(不需要手動自己遷移),在各自業務資料庫中就自動生成兩個消息表,用於後續消息的存儲,如下:

-
編寫業務程式碼
訂單服務,在訂單生成成功之後,向庫存服務發送消息,業務邏輯如下:

圖中用到的_capPublisher是通過構造函數注入的。訂單服務其他層的程式碼就不用截圖了,就是簡單調用,源碼地址在文末。
庫存服務直接訂閱就行,演示案例中是直接在StockController中進行訂閱,如下:
// 標記為不實Action [NonAction] // 訂閱消息,參數和發布時指定的參數一致 [CapSubscribe("Order.Create.Success")] public void UpdateStock(OrderEntity order) { //throw new Exception("扣減庫存異常了~~~"); // 為了測試,庫存裡面沒有數據的話,先模擬一條數據 bool bHaveData = _stockDbContext.Stock.Any(); if(!bHaveData) { StockEntity stock = new StockEntity { Id = Guid.NewGuid(), ProductNo = "Product001", StockCount = 100, UpdateDate = DateTime.Now }; _stockDbContext.Stock.Add(stock); _stockDbContext.SaveChanges(); } // 模擬扣減庫存 using var trans = _stockDbContext.Database.BeginTransaction(_capPublisher, autoCommit: false); try { // 根據產品編號找到產品 var product = _stockDbContext.Stock.Where(s => s.ProductNo == order.ProductNo).FirstOrDefault(); // 扣減庫存之後保存 product.StockCount = product.StockCount - order.Count; _stockDbContext.Update(product); _stockDbContext.SaveChanges(); // 可以繼續向下發布流程,比如庫存扣減成功,下一步到物流服務進行相關處理,可以繼續發布消息 // _capPublisher.Publish(); trans.Commit(); Console.WriteLine(order.OrderNo); } catch (Exception ex) { trans.Rollback(); } }可以看到,訂閱很簡單,直接標上[CapSubscribe(“Order.Create.Success”)]這個Attribute就行了,如果消息狀態為失敗,後續CAP的定時任務會根據定時策略調用此方法。
1.3.3 運行看效果
-
正常流程,下單成功,扣減庫存成功
將訂單服務(埠5000)和庫存服務(埠6000)都啟動起來。
訂單服務中增加了OrderController,裡面有一個GenerateOrder的介面,直接調用即可:

這裡使用Postman工具進行測試,如下:

庫存服務就會訂閱到資訊,如下:

業務流程完成之後,訂單和庫存數據整體一致了,回過頭來看看消息表,看看裡面有什麼消息,如下:

-
異常流程模擬,下單成功,扣減庫存失敗
在扣減服務邏輯方法中手動拋出異常,程式碼如下:

然後啟動項目重新測試,再下一個訂單試試; 操作後,先來看看消息表,如下:

註:CAP在默認情況下,發送和消費消息的過程中失敗會立即重試 3 次,在 3 次以後將進入重試輪詢;重試將在發送和消費消息失敗的 4分鐘後 開始,這是為了避免設置消息狀態延遲導致可能出現的問題;後續就會每隔1分鐘之後重試一次,默認的最高重試次數為50次,當達到50次時,就不會重試了。
現在知道問題了,優化程式碼,重新啟動,即把拋異常的程式碼注釋掉,看看會不會自動處理,如下:

如上圖,稍等一會,消息就自動處理了,業務數據符合預期,保證一致性。 這個是CAP內部定時讀取消息表,根據狀態不斷重試業務邏輯,直到成功為止。 CAP的全自動是不是感覺比較便捷,寫最少的程式碼,解決了最難搞的分散式事務。
-
修改默認的配置
在實際業務場景中,默認配置可能不太實用,可以在註冊服務時進行默認配置更改,如下:

配置修改之後的測試這裡就不截圖了,留給小夥伴們動手試試吧。
案例程式碼地址://gitee.com/CodeZoe/microservies-demo/tree/main/CapDemo
總結
關於分散式事務的實操,把最常用的最終一致性方案簡單分享了一下,小夥伴可以根據自己的業務場景,趕緊動手試試吧;
其他方案會在後續的文章中加上,主要還是以實用為主,已經不咋用的就沒必要再說啦。
文章中提及到Docker和RabbitMQ,我已經在著手準備這塊的文章了,關注「Code綜藝圈」,和我一起學習吧;




















