聊一聊如何用C#輕鬆完成一個SAGA分散式事務

背景

銀行跨行轉賬業務是一個典型分散式事務場景,假設 A 需要跨行轉賬給 B,那麼就涉及兩個銀行的數據,無法通過一個資料庫的本地事務保證轉賬的 ACID ,只能夠通過分散式事務來解決。

市面上使用比較多的分散式事務框架,支援 SAGA 的,大部分都是 JAVA 為主的,沒有提供 C# 的對接方式,或者是對接難度大,一定程度上讓人望而卻步。

這裡推薦一下葉東富大佬的分散式事務框架 dtm,一款跨語言的開源分散式事務管理器,優雅的解決了冪等、空補償、懸掛等分散式事務難題。提供了簡單易用、高性能、易水平擴展的分散式事務解決方案。

老黃在搜索相關分散式事務資料的時候,他寫的文章都是相對比較好理解的,也就是這樣關注到了 dtm 這個項目。

下面就基於這個框架來實踐一下銀行轉賬的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先來看一下一個成功完成的 SAGA 時序圖。

上圖的微服務1,對應我們示例的 OutApi,也就是轉錢出去的那個服務。

微服務2,對應我們示例的 InApi,也就是轉錢進來的那個服務。

下面是兩個服務的正向操作和補償操作的處理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => 
{
    // 進行 資料庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 進行 資料庫操作
    Console.WriteLine($"用戶【{req.UserId}】轉出【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

註:示例為了簡單,沒有進行實際的資料庫操作。

到此各個子事務的處理已經 OK 了,然後是開啟 SAGA 事務,進行分支調用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交結果 = {flag}");

到這裡,一個完整的 SAGA 分散式事務就編寫完成了。

搭建好 dtm 的環境後,運行上面的例子,會看到下面的輸出。

當然,上面的情況太理想了,轉出轉入都是一次性就成功了。

但是實際上我們會遇到許許多多的問題,最常見的應該就是網路故障了。

下面來看一個異常的 SAGA 示例

異常的 SAGA

做一個假設,用戶1的轉出是正常的,但是用戶2在轉入的時候出現了問題。

由於事務已經提交給 dtm 了,按照 SAGA 事務的協議,dtm 會重試未完成的操作。

這個時候用戶2 這邊會出現什麼樣的情況呢?

  1. 轉入其實成功了,但是 dtm 收到錯誤 (網路故障等)
  2. 轉入沒有成功,直接告訴 dtm 失敗了 (應用異常等)

無論是那一種,dtm 都會進行重試操作。這個時候會發生什麼呢?我們繼續往下看。

先看一下事務失敗交互的時序圖

再通過調整上面成功的例子,來比較直觀的看看出現的情況。

在 InApi 加多一個轉入失敗的處理介面

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作--失敗,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失敗的返回有兩種,一種是狀態碼大於 400,一種是狀態碼是 200 並且響應體包含 FAILURE,上面的例子是第二種

調整一下調用方,把轉入正向操作替換成上面這個返回錯誤的介面。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

運行結果如下:

在這個例子中,只考慮補償/重試成功的情況下。

用戶1 轉出的 30 塊錢最終是回到了他的帳號上,他沒有出現損失。

用戶2 就有點苦逼了,轉入沒有成功,返回了失敗,還觸發了轉入的補償機制,結果就是把用戶2 還沒進帳的 30 塊錢給多扣了,這個就是上面的情況2,常見的空補償問題。

這個時候就要在進行轉入補償的時候做一系列的判斷,轉入有沒有成功,轉出有沒有失敗等等,把業務變的十分複雜。

如果出現了上述的情況1,會發生什麼呢?

用戶2 第一次已經成功轉入 30 塊錢,返回的也是成功,但是網路出了點問題,導致 dtm 認為失敗了,它就會進行重試,相當於用戶2 還會收到第二個轉入 30 塊錢的請求!也就是說這次轉帳,用戶2 會進賬 60 塊錢,翻倍了,也就是說這個請求不是冪等。

同樣的,要處理這個問題,在進行轉入的正向操作中也要進行一系列的判斷,同樣會把複雜度上升一個級別。

前面有提到 dtm 提供了子事務屏障的功能,保證了冪等、空補償等常見問題。

再來看看這個子事務屏障的功能有沒有幫我們簡化上面異常處理。

子事務屏障

子事務屏障,需要根據 trans_typegidbranch_idop 四個內容進行創建。

這4個內容 dtm 在回調時會放在 querysting 上面。

客戶端裡面提供了 IBranchBarrierFactory 來供我們使用。

空補償

針對上面的異常情況(用戶2 憑空消失 30 塊錢),對轉入的補償進行子事務屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 轉入失敗的情況下,不應該輸出下面這個
        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】補償操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 參數是事務,可和本地事務一起提交回滾
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事務屏障-補償操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是關鍵所在了,需要傳入一個 DbConnection 和真正的業務操作,這裡的業務操作就是在控制台輸出補償操作的資訊。

同樣的,我們再調整一下調用方,把轉入補償操作替換成上面帶子事務屏障的介面。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

會發現轉入的補償操作並沒執行,控制台沒有輸出補償資訊,而是輸出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

這個就表明了,這個請求是個空補償,是不應該執行業務方法的,既空操作。

再來看一下,轉入成功的,但是 dtm 收到了失敗的訊號,不斷重試造成重複請求的情況。

冪等

針對用戶2 轉入兩次 30 塊錢的異常情況,對轉入的正向操作進行子事務屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】請求來了!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模擬一個超時執行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用戶【{req.UserId}】轉入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

這裡通過一個超時執行來讓 dtm 進行轉入正向操作的重試。

同樣的,我們再調整一下調用方,把轉入的正向操作也替換成上面帶子事務屏障的介面。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再來運行這個例子。

可以看到轉入的正向操作確實是觸發了多次,第一次實際上是成功,只是響應比較慢,導致 dtm 認為是失敗了,觸發了第二次請求,但是第二次請求並沒有執行業務操作,而是輸出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

這個就表明了,這個請求是個重複請求,是不應該執行業務方法的,保證了冪等。

到這裡,可以看出,子事務屏障確實解決了冪等和空補償的問題,大大降低了業務判斷的複雜度和出錯的可能性

寫在最後

在這篇文章里,也通過幾個例子,完整給出了編寫一個 SAGA 事務的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。希望對研究分散式事務的您有所幫助。

本文示例程式碼: DtmSagaSample

參考資料