MASA Framework — EventBus入門與設計

概述

事件總線是一種事件發佈/訂閱結構,通過發佈訂閱模式可以解耦不同架構層級,同樣它也可以來解決業務之間的耦合,它有以下優點

  • 松耦合
  • 橫切關注點
  • 可測試性
  • 事件驅動

發佈訂閱模式

通過下圖我們可以快速了解發佈訂閱模式的本質

  1. 訂閱者將自己關心的事件在調度中心進行註冊
  2. 事件的發佈者通過調度中心把事件發佈出去
  3. 訂閱者收到自己關心的事件變更並執行相對應業務

EventBus.png

其中發佈者無需知道訂閱者是誰,訂閱者彼此之間也互不認識,彼此之間互不干擾

事件總線類型

在Masa Framework中,將事件劃分為

  • 進程內事件 (Event)

本地事件,它的發佈與訂閱需要在同一個進程中,訂閱方與發佈方需要在同一個項目中

集成事件,它的發佈與訂閱一定不在同一個進程中,訂閱方與發佈方可以在同一個項目中,也可以在不同的項目中

下面我們會用一個註冊用戶的例子來說明如何使用本地事件

入門

  1. 新建ASP.NET Core 空項目Assignment.InProcessEventBus,並安裝Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 註冊EventBus (用於發佈本地事件), 修改Program.cs
builder.Services.AddEventBus();
  1. 新增RegisterUserEvent類並繼承Event,用於發佈註冊用戶事件
public record RegisterEvent : Event
{
    public string Account { get; set; }

    public string Email { get; set; }

    public string Password { get; set; }
}
  1. 新增註冊用戶處理程序

在指定事件處理程序方法上增加特性 EventHandler,並在方法中增加參數 RegisterUserEvent

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        //todo: 根據需要可在構造函數中注入其它服務 (需支持從DI獲取)
        _logger = logger;
    }

    [EventHandler]
    public void RegisterUser(RegisterUserEvent @event)
    {
        //todo: 1. 編寫註冊用戶業務
        _logger?.LogDebug("-----------{Message}-----------", "檢測用戶是否存在並註冊用戶");
        
        //todo: 2. 編寫發送註冊通知等
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送郵件提示註冊成功");
    }
}

註冊用戶的處理程序可以放到任意一個類中,但其構造函數參數必須支持從DI獲取,且處理程序的方法僅支持 TaskVoid 兩種, 不支持其它類型

  1. 發送註冊用戶事件,修改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{
    await eventBus.PublishAsync(@event);
});

進階

處理流程

EventBus的 請求管道包含一系列請求委託,依次調用。 它們與ASP.NET Core中間件有異曲同工之妙,區別點在於中間件的執行順序與註冊順序相反,最先註冊的最後執行

EventBus.png

每個委託均可在下一個委託前後執行操作,其中TransactionMiddleware是EventBus發佈後第一個要進入的中間件 (默認提供),並且它是不支持多次嵌套的。

EventBus 支持嵌套,這意味着我們可以在Handler中重新發佈一個新的Event,但TransactionMiddleware僅會在最外層進入時被觸發一次

自定義中間件

根據需要我們可以自定義中間件,並註冊到EventBus的請求管道中,比如通過增加FluentValidation, 將參數驗證從業務代碼中剝離開來,從而使得處理程序更專註於業務

  1. 註冊FluentValidation, 修改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定義驗證中間件ValidatorMiddleware.cs,用於驗證參數
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
    where TEvent : IEvent
{
    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
    private readonly IEnumerable<IValidator<TEvent>> _validators;

    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
    {
        _validators = validators;
        _logger = logger;
    }

    public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
    {
        var typeName = @event.GetType().FullName;

        _logger?.LogDebug("----- Validating command {CommandType}", typeName);

        var failures = _validators
            .Select(v => v.Validate(@event))
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (failures.Any())
        {
            _logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
                typeName,
                @event,
                failures);

            throw new ValidationException("Validation exception", failures);
        }

        await next();
    }
}
  1. 註冊EventBus並使用驗證中間件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 添加註冊用戶驗證類RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{
    public RegisterUserEventValidator()
    {
        RuleFor(e => e.Account).NotNull().WithMessage("用戶名不能為空");
        RuleFor(e => e.Email).NotNull().WithMessage("郵箱不能為空");
        RuleFor(e => e.Password)
            .NotNull().WithMessage("密碼不能為空")
            .MinimumLength(6)
            .WithMessage("密碼必須大於6位")
            .MaximumLength(20)
            .WithMessage("密碼必須小於20位");
    }
}

編排

EventBus 支持事件編排,它們可以用來處理一些對執行順序有要求的業務,比如: 註冊用戶必須成功之後才可以發送註冊郵件通知,發送獎勵等等,那我們可以這樣做

將註冊用戶業務拆分為三個Handler,並通過指定Order的值來對執行事件排序

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        _logger = logger;
    }

    [EventHandler(1)]
    public void RegisterUser(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Message}-----------", "檢測用戶是否存在並註冊用戶");
        //todo: 編寫註冊用戶業務
    }

    [EventHandler(2)]
    public void SendAwardByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送註冊獎勵");
        //todo: 編寫發送獎勵等
    }

    [EventHandler(3)]
    public void SendNoticeByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送註冊成功郵件");
        //todo: 編寫發送註冊通知等
    }
}

Saga

EventBus支持Saga模式

Saga

具體是怎麼做呢?

[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 註冊成功,發放獎勵失敗 {Message}-----------", @event.Account, "發放獎勵補償");
}

當發送獎勵出現異常時,則執行補償機制,執行順序為 (2 – 1) > 0,由於目前僅存在一個Order為1的Handler,則執行獎勵補償後退出

但對於部分不需要執行失敗但不需要執行回退的方法,我們可以修改 FailureLevels 確保不會因為當前方法的異常而導致執行補償機制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送郵件提示註冊成功");
    //todo: 編寫發送註冊通知等
}

源碼解讀

EventHandler

  • FailureLevels: 失敗級別, 默認: Throw
    • Throw:發生異常後,依次執行Order小於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 2、1
    • ThrowAndCancel:發生異常後,依次執行Order小於等於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 3、2、1
    • Ignore:發生異常後,忽略當前異常(不執行取消動作),繼續執行其他Handler
  • Order: 執行順序,默認: int.MaxValue,用於控制當前方法的執行順序
  • EnableRetry: 當Handler異常後是否啟用重試, 默認: false
  • RetryTimes: 重試次數,當出現異常後執行多少次重試, 需開啟重試配置
  • IsCancel: 是否是補償機制,默認: false

Middleware

  • SupportRecursive: 是否支持遞歸 (嵌套), 默認: true
    • 部分中間件僅在最外層被觸發一次,像TransactionMiddleware 就是如此,但也有很多中間件是需要被多次執行的,比如ValidatorMiddleware,每次發佈事件時都需要驗證參數是否正確
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 處理程序,通過調用 next() 使得請求進入下一個Handler

IEventHandler 與 ISagaEventHandler

  • HandleAsync(TEvent @event): 提供事件的Handler
  • CancelAsync(TEvent @event): 提供事件的補償Handler

EventHandler功能類似,提供基本的Handler以及補償Handler,推薦使用EventHandler的方式使用

TransactionMiddleware

提供事務中間件,當EventBusUoW以及Masa提供的Repository來使用時,當存在待提交的數據時,會自動執行保存並提交,當出現異常後,會執行事務回滾,無需擔心臟數據入庫

性能測試

與市面上使用較多的MeidatR作了對比,結果如下圖所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

Method Mean Error StdDev Median Min Max
AddShoppingCartByEventBusAsync 124.80 us 346.93 us 1,022.94 us 8.650 us 6.500 us 10,202.4 us
AddShoppingCartByMediatRAsync 110.57 us 306.47 us 903.64 us 7.500 us 5.300 us 9,000.1 us

根據性能測試我們發現,EventBus與MediatR性能差距很小,但EventBus提供的功能卻要強大的多

常見問題

  1. 按照文檔操作,通過EventBus發佈事件後,對應的Handler並沒有執行,也沒有發現錯誤?

①. EventBus.PublishAsync(@event) 是異步方法,確保等待方法調用成功,檢查是否出現同步方法調用異步方法的情況
②. 註冊EventBus時指定程序集集合, Assembly被用於註冊時獲取並保存事件與Handler的對應關係

var assemblies = new[]
{
    typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);

程序集: 手動指定Assembly集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但由於NetCore按需加載,未使用的程序集在當前域中不存在,因此可能會導致部分事件以及Handler的對應關係未正確保存,因此可通過手動指定Assembly集合或者修改全局配置中的Assembly集合來修復這個問題

  1. 通過EventBus發佈事件,Handler出錯,但數據依然保存到數據庫中

①. 檢查是否禁用事務

  1. DisableRollbackOnFailure是否為true (是否失敗時禁止回滾)
  2. UseTransaction是否為false (禁止使用事務)

②. 檢查當前數據庫是否支持回滾。例如: 使用的是Mysql數據庫,但回滾數據失敗,請查看

本章源碼

Assignment11

//github.com/zhenlei520/MasaFramework.Practice

開源地址

MASA.Framework://github.com/masastack/MASA.Framework

MASA.EShop://github.com/masalabs/MASA.EShop

MASA.Blazor://github.com/BlazorComponent/MASA.Blazor

如果你對我們的 MASA Framework 感興趣,無論是代碼貢獻、使用、提 Issue,歡迎聯繫我們

16373211753064.png