聊一聊如何用C#輕鬆完成一個SAGA分散式事務
背景
銀行跨行轉賬業務是一個典型分散式事務場景,假設 A 需要跨行轉賬給 B,那麼就涉及兩個銀行的數據,無法通過一個資料庫的本地事務保證轉賬的 ACID ,只能夠通過分散式事務來解決。
市面上使用比較多的分散式事務框架,支援 SAGA 的,大部分都是 JAVA 為主的,沒有提供 C# 的對接方式,或者是對接難度大,一定程度上讓人望而卻步。
這裡推薦一下葉東富大佬的分散式事務框架 dtm,一款跨語言的開源分散式事務管理器,優雅的解決了冪等、空補償、懸掛等分散式事務難題。提供了簡單易用、高性能、易水平擴展的分散式事務解決方案。
老黃在搜索相關分散式事務資料的時候,他寫的文章都是相對比較好理解的,也就是這樣關注到了 dtm 這個項目。
下面就基於這個框架來實踐一下銀行轉賬的例子。
前置工作
dotnet add package Dtmcli --version 0.3.0
成功的 SAGA
先來看一下一個成功完成的 SAGA 時序圖。
上圖的微服務1,對應我們示例的 OutApi,也就是轉錢出去的那個服務。
微服務2,對應我們示例的 InApi,也就是轉錢進來的那個服務。
下面是兩個服務的正向操作和補償操作的處理。
OutApi
app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) =>
{
// 進行 資料庫操作
Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
return Results.Ok(TransResponse.BuildSucceedResponse());
});
app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
// 進行 資料庫操作
Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
return Results.Ok(TransResponse.BuildSucceedResponse());
});
InApi
app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
return Results.Ok(TransResponse.BuildSucceedResponse());
});
app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
return Results.Ok(TransResponse.BuildSucceedResponse());
});
註:示例為了簡單,沒有進行實際的資料庫操作。
到此各個子事務的處理已經 OK 了,然後是開啟 SAGA 事務,進行分支調用
var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };
var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
.Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
.Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
;
var flag = await saga.Submit(ct);
Console.WriteLine($"case1, {gid} saga 提交結果 = {flag}");
到這裡,一個完整的 SAGA 分散式事務就編寫完成了。
搭建好 dtm 的環境後,運行上面的例子,會看到下面的輸出。
當然,上面的情況太理想了,轉出轉入都是一次性就成功了。
但是實際上我們會遇到許許多多的問題,最常見的應該就是網路故障了。
下面來看一個異常的 SAGA 示例
異常的 SAGA
做一個假設,用戶1的轉出是正常的,但是用戶2在轉入的時候出現了問題。
由於事務已經提交給 dtm 了,按照 SAGA 事務的協議,dtm 會重試未完成的操作。
這個時候用戶2 這邊會出現什麼樣的情況呢?
- 轉入其實成功了,但是 dtm 收到錯誤 (網路故障等)
- 轉入沒有成功,直接告訴 dtm 失敗了 (應用異常等)
無論是那一種,dtm 都會進行重試操作。這個時候會發生什麼呢?我們繼續往下看。
先看一下事務失敗交互的時序圖
再通過調整上面成功的例子,來比較直觀的看看出現的情況。
在 InApi 加多一個轉入失敗的處理介面
app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作--失敗,gid={gid}, branch_id={branch_id}, op={op}");
//return Results.BadRequest();
return Results.Ok(TransResponse.BuildFailureResponse());
});
失敗的返回有兩種,一種是狀態碼大於 400,一種是狀態碼是 200 並且響應體包含 FAILURE,上面的例子是第二種
調整一下調用方,把轉入正向操作替換成上面這個返回錯誤的介面。
var saga = new Saga(dtmClient, gid)
.Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
.Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);
運行結果如下:
在這個例子中,只考慮補償/重試成功的情況下。
用戶1 轉出的 30 塊錢最終是回到了他的帳號上,他沒有出現損失。
用戶2 就有點苦逼了,轉入沒有成功,返回了失敗,還觸發了轉入的補償機制,結果就是把用戶2 還沒進帳的 30 塊錢給多扣了,這個就是上面的情況2,常見的空補償問題。
這個時候就要在進行轉入補償的時候做一系列的判斷,轉入有沒有成功,轉出有沒有失敗等等,把業務變的十分複雜。
如果出現了上述的情況1,會發生什麼呢?
用戶2 第一次已經成功轉入 30 塊錢,返回的也是成功,但是網路出了點問題,導致 dtm 認為失敗了,它就會進行重試,相當於用戶2 還會收到第二個轉入 30 塊錢的請求!也就是說這次轉帳,用戶2 會進賬 60 塊錢,翻倍了,也就是說這個請求不是冪等。
同樣的,要處理這個問題,在進行轉入的正向操作中也要進行一系列的判斷,同樣會把複雜度上升一個級別。
前面有提到 dtm 提供了子事務屏障的功能,保證了冪等、空補償等常見問題。
再來看看這個子事務屏障的功能有沒有幫我們簡化上面異常處理。
子事務屏障
子事務屏障,需要根據 trans_type,gid,branch_id 和 op 四個內容進行創建。
這4個內容 dtm 在回調時會放在 querysting 上面。
客戶端裡面提供了 IBranchBarrierFactory 來供我們使用。
空補償
針對上面的異常情況(用戶2 憑空消失 30 塊錢),對轉入的補償進行子事務屏障的改造。
app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);
using var db = Db.GeConn();
await barrier.Call(db, async (tx) =>
{
// 轉入失敗的情況下,不應該輸出下面這個
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
// tx 參數是事務,可和本地事務一起提交回滾
await Task.CompletedTask;
});
Console.WriteLine($"子事務屏障-補償操作,gid={gid}, branch_id={branch_id}, op={op}");
return Results.Ok(TransResponse.BuildSucceedResponse());
});
Call 方法就是關鍵所在了,需要傳入一個 DbConnection 和真正的業務操作,這裡的業務操作就是在控制台輸出補償操作的資訊。
同樣的,我們再調整一下調用方,把轉入補償操作替換成上面帶子事務屏障的介面。
var saga = new Saga(dtmClient, gid)
.Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
.Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
;
再來運行這個例子。
會發現轉入的補償操作並沒執行,控制台沒有輸出補償資訊,而是輸出了
Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False
這個就表明了,這個請求是個空補償,是不應該執行業務方法的,既空操作。
再來看一下,轉入成功的,但是 dtm 收到了失敗的訊號,不斷重試造成重複請求的情況。
冪等
針對用戶2 轉入兩次 30 塊錢的異常情況,對轉入的正向操作進行子事務屏障的改造。
app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】請求來了!!! gid={gid}, branch_id={branch_id}, op={op}");
var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);
using var db = Db.GeConn();
await barrier.Call(db, async (tx) =>
{
var c = Interlocked.Increment(ref _errCount);
// 模擬一個超時執行
if (c > 0 && c < 2) await Task.Delay(10000);
Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
await Task.CompletedTask;
});
return Results.Ok(TransResponse.BuildSucceedResponse());
});
這裡通過一個超時執行來讓 dtm 進行轉入正向操作的重試。
同樣的,我們再調整一下調用方,把轉入的正向操作也替換成上面帶子事務屏障的介面。
var saga = new Saga(dtmClient, gid)
.Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
.Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
;
再來運行這個例子。
可以看到轉入的正向操作確實是觸發了多次,第一次實際上是成功,只是響應比較慢,導致 dtm 認為是失敗了,觸發了第二次請求,但是第二次請求並沒有執行業務操作,而是輸出了
Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True
這個就表明了,這個請求是個重複請求,是不應該執行業務方法的,保證了冪等。
到這裡,可以看出,子事務屏障確實解決了冪等和空補償的問題,大大降低了業務判斷的複雜度和出錯的可能性。
寫在最後
在這篇文章里,也通過幾個例子,完整給出了編寫一個 SAGA 事務的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。希望對研究分散式事務的您有所幫助。
本文示例程式碼: DtmSagaSample
參考資料