.Net Core with 微服務 – 使用 AgileDT 快速實現基於可靠消息的分散式事務

前面對於分散式事務也講了好幾篇了(可靠消息最終一致性 分散式事務 – TCC 分散式事務 – 2PC、3PC),但是還沒有實戰過。那麼本篇我們就來演示下如何在 .NET 環境下實現一個基於可靠消息的分散式事務。基於可靠消息的分散式事務流程上還是比較清晰明了的,但是要用程式碼去一個個實現還是比較費事的。通過分析可以發現這個事務的關鍵點就是要在真正的業務邏輯的前面、後面插入對應的流程。很明顯這種流程是可以通過 AOP 技術來簡化操作的。於是就有了 AgileDT 。AgileDT 使用 Natasha 在啟動的時候動態生成代理類,來為你完成跟消息部分的操作,使用者只需關心核心業務邏輯就可以了。
//github.com/kklldog/AgileDT 開源不易,大家多多 ✨✨✨

回顧

前面一篇文章(可靠消息最終一致性 )我們詳細介紹了基於可靠消息的分散式事務。為了更好的理解 AgileDT 的程式碼,我們還是有必要簡單的來回顧下。


該方案總體流程上可分為以下步驟:

  1. 主動方在真正的業務開始前先向可靠消息服務發送一個「待確認」的消息
  2. 可靠消息服務收到待確認消息後持久化消息到資料庫
  3. 如果以上操作成功則主動方開始真正的業務,如果失敗則直接放棄執行業務
  4. 如果業務執行成功則發送「確認」消息給可靠消息服務,如果執行失敗則發送「取消」給可靠消息服務。
  5. 如果可靠消息服務收到「確認」消息則更新資料庫里的消息記錄的狀態為「待發送」,如果收到的消息為「取消」則更新消息狀態為「已取消」
  6. 如果上一步更新的資料庫為「待發送」,那麼會開始往MQ投遞消息,並且更改資料庫里的消息記錄的狀態為「已發送」
  7. 上一步往MQ投遞消息成功後,MQ會給被動方推送消息。
  8. 被動方收到消息後開始處理業務
  9. 如果業務處理成功,則被動方對MQ進行ACK回復,則這條消息會從MQ內移除掉
  10. 如果業務處理成功,則發送「已完成」消息給可靠消息服務
  11. 可靠消息服務收到「已完成」消息後更新資料庫消息記錄未「已完成」

廢話不多說了,下面讓我們演示下如何使用 AgileDT 來快速實現一個基於可靠消息的分散式事務。
以下我們還是以經典的訂單下單完成給會員贈送積分的場景來演示。

使用 AgileDT

依賴組件

  • mysql
  • rabbitmq

目前支援 mysql 資料庫,但是數據訪問組件使用的是 freesql 所以後續要實現支援別的資料庫也很簡單。目前框架使用的可靠消息服務為 rabbitmq 。

運行服務端

在服務新建一個資料庫並且新建一張表

// crate event_message table on mysql
create table if not exists event_message
(
	event_id varchar(36) not null
		primary key,
	biz_msg varchar(4000) null,
	status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
	create_time datetime(3) null,
	event_name varchar(255) null
);

使用docker-compose運行服務端

version: "3"  # optional since v1.27.0
services:
  agile_dt:
    image: "kklldog/agile_dt"
    ports:
      - "5000:5000"
    environment:
      - db:provider=mysql
      - db:conn= Database=agile_dt;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306
      - mq:userName=admin
      - mq:password=123456
      - mq:host=192.168.0.115
      - mq:port=5672

安裝客戶端

在主動方跟被動方都需要安裝AgileDT的客戶端庫

Install-Package AgileDT.Client

主動方使用方法

  1. 在業務資料庫添加事務消息表
// crate event_message table on mysql
create table if not exists event_message
(
	event_id varchar(36) not null
		primary key,
	biz_msg varchar(4000) null,
	status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
	create_time datetime(3) null,
	event_name varchar(255) null
);

  1. 修改配置文件
在appsettings.json文件添加以下節點:
  "agiledt": {
    "server": "//localhost:5000",
    "db": {
      "provider": "mysql",
      "conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
      //"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
    },
    "mq": {
      "host": "192.168.0.125",
      //"host": "192.168.0.115",
      "userName": "admin",
      "password": "123456",
      "port": 5672
    }
  }
  1. 注入 AgileDT 客戶端服務
       public void ConfigureServices(IServiceCollection services)
        {
            services.AddAgileDT();
            ...
        }
  1. 實現IEventService方法
    處理主動方業務邏輯的類需要實現IEventService介面,並且標記那個方法是真正的業務方法。AgileDT在啟動的時候會掃描這些類型,並且使用AOP技術生成代理類,在業務方法前後插入對應的邏輯來跟可靠消息服務通訊。
    這裡要注意的幾個地方:
  • 實現IEventService介面
  • 使用DtEventBizMethod註解標記業務入口方法
  • 使用DtEventName註解來標記事務的方法名稱,如果不標記則使用類名

注意:業務方法最終一定要使用事務來同步修改消息表的status欄位為done狀態,這個操作框架沒辦法幫你實現
注意:業務方法如果失敗請拋出Exception,如果不拋異常框架一律認為執行成功

 public interface IAddOrderService:IEventService
    {
        bool AddOrder(Order order);
    }

    [DtEventName("orderservice.order_added")]
    public class AddOrderService : IAddOrderService
    {
        private readonly ILogger<AddOrderService> _logger;

        public AddOrderService(ILogger<AddOrderService> logger)
        {
            _logger = logger;
        }

        public string EventId { 
            get;
            set;
        }
        
        [DtEventBizMethod]
        public virtual bool AddOrder(Order order)
        {
            var ret = false;

            //3. 寫 Order 跟 修改 event 的狀態必選寫在同一個事務內
            FreeSQL.Instance.Ado.Transaction(() =>
            {
                order.EventId = EventId;//在訂單表新增一個eventid欄位,使order跟event_message表關聯起來
                var ret0 = FreeSQL.Instance.Insert(order).ExecuteAffrows();
                var ret1 = FreeSQL.Instance.Update<OrderService.Data.entities.EventMessage>()
                .Set(x => x.Status, MessageStatus.Done)
                .Where(x => x.EventId == EventId)
                .ExecuteAffrows();

                ret = ret0 > 0 && ret1 > 0;
            });

            return ret;

        }

        /// <summary>
        /// 構造後續業務處理需要的消息內容
        /// </summary>
        /// <returns></returns>
        public string GetBizMsg()
        {
            //這裡可以構造傳遞到MQ的業務消息的內容,比如傳遞訂單編號啊 ,以便後續的被動方處理業務時候使用
            var order = FreeSQL.Instance.Select<Order>().Where(x => x.EventId == EventId).First();
            return order?.Id;
        }
      
    }

在實現好 IAddOrderService 介面後,你可以像平常一樣使用 IAddOrderService 來注入實現類。比如在 Controller 的構造函數注入進去。因為 AgileDT 在啟動的時候會自動幫你註冊。

注意:IAddOrderService 跟實現類的生命周期是 Scoped 。

被動方使用方法

  1. 在業務方資料庫建表或者在業務表上加欄位
    對於被動方來說這裡不是必須要建一個表。但是至少要有個地方來存儲event_id的資訊,最簡單的是直接在業務主表上加event_id欄位。
  2. 修改配置文件
在appsettings.json文件添加以下節點:
  "agiledt": {
    "server": "//localhost:5000",
    "db": {
      "provider": "mysql",
      "conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
      //"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
    },
    "mq": {
      "host": "192.168.0.125",
      //"host": "192.168.0.115",
      "userName": "admin",
      "password": "123456",
      "port": 5672
    }
  }
  1. 注入AgileDT服務
       public void ConfigureServices(IServiceCollection services)
        {
            services.AddAgileDT();
            ...
        }
  1. 實現IEventMessageHandler介面
    被動方需要接收MQ投遞過來的消息,這些處理類需要實現IEventMessageHandler介面。AgileDT啟動的時候會去掃描這些類,然後跟MQ建立綁定關係。
  • 這裡必須使用DtEventName註解標記需要處理的事件名稱
  • Reveive 方法必須是冥等的
    public interface IOrderAddedMessageHandler: IEventMessageHandler
    {
    }
    
    [DtEventName("orderservice.order_added")]
    public class OrderAddedMessageHandler: IOrderAddedMessageHandler
    {
        static object _lock = new object();

        public bool Receive(EventMessage message)
        {
            var bizMsg = message.BizMsg;
            var eventId = message.EventId;
            string orderId = bizMsg;

            lock (_lock)
            {
                var entity = FreeSQL.Instance.Select<PointHistory>().Where(x => x.EventId == eventId).First();
                if (entity == null)
                {
                    var ret = FreeSQL.Instance.Insert(new PointHistory
                    {
                        Id = Guid.NewGuid().ToString(),
                        EventId = message.EventId,
                        OrderId = orderId,
                        Points = 20,
                        CreateTime = DateTime.Now
                    }).ExecuteAffrows();

                    return ret > 0;
                }
                else
                {
                    return true;
                }
            }
        }
    }

總結

通過以上演示,我們快速的實現了一個訂單下單會員贈送積分的服務。可以看到使用 AgileDT 可以很快速的實現一個分散式事務。特別是在實現過一個分散式事務後,後面實現起來就特別簡單,只要實現幾個介面就可以了。AgileDT 才剛剛起步,希望大家多多支援,多多✨✨✨ ,多多 PR 。

//github.com/kklldog/AgileDT