MASA Framework — EventBus入門與設計
- 2022 年 11 月 14 日
- 筆記
- .NET, MASA Framework
概述
事件總線是一種事件發佈/訂閱結構,通過發佈訂閱模式可以解耦不同架構層級,同樣它也可以來解決業務之間的耦合,它有以下優點
- 松耦合
- 橫切關注點
- 可測試性
- 事件驅動
發佈訂閱模式
通過下圖我們可以快速了解發佈訂閱模式的本質
- 訂閱者將自己關心的事件在調度中心進行註冊
- 事件的發佈者通過調度中心把事件發佈出去
- 訂閱者收到自己關心的事件變更並執行相對應業務
其中發佈者無需知道訂閱者是誰,訂閱者彼此之間也互不認識,彼此之間互不干擾
事件總線類型
在Masa Framework中,將事件劃分為
- 進程內事件 (Event)
本地事件,它的發佈與訂閱需要在同一個進程中,訂閱方與發佈方需要在同一個項目中
- 跨進程事件 (IntegrationEvent)
集成事件,它的發佈與訂閱一定不在同一個進程中,訂閱方與發佈方可以在同一個項目中,也可以在不同的項目中
下面我們會用一個註冊用戶的例子來說明如何使用本地事件
入門
- 安裝.NET 6.0
- 新建ASP.NET Core 空項目
Assignment.InProcessEventBus
,並安裝Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
- 註冊EventBus (用於發佈本地事件), 修改
Program.cs
builder.Services.AddEventBus();
- 新增
RegisterUserEvent
類並繼承Event
,用於發佈註冊用戶事件
public record RegisterEvent : Event
{
public string Account { get; set; }
public string Email { get; set; }
public string Password { get; set; }
}
- 新增
註冊用戶
處理程序
在指定事件處理程序方法上增加特性 EventHandler,並在方法中增加參數 RegisterUserEvent
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{
//todo: 根據需要可在構造函數中注入其它服務 (需支持從DI獲取)
_logger = logger;
}
[EventHandler]
public void RegisterUser(RegisterUserEvent @event)
{
//todo: 1. 編寫註冊用戶業務
_logger?.LogDebug("-----------{Message}-----------", "檢測用戶是否存在並註冊用戶");
//todo: 2. 編寫發送註冊通知等
_logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送郵件提示註冊成功");
}
}
註冊用戶的處理程序可以放到任意一個類中,但其構造函數參數必須支持從DI獲取,且處理程序的方法僅支持
Task
或Void
兩種, 不支持其它類型
- 發送註冊用戶事件,修改
Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{
await eventBus.PublishAsync(@event);
});
進階
處理流程
EventBus的 請求管道包含一系列請求委託,依次調用。 它們與ASP.NET Core中間件有異曲同工之妙,區別點在於中間件的執行順序與註冊順序相反,最先註冊的最後執行
每個委託均可在下一個委託前後執行操作,其中TransactionMiddleware
是EventBus發佈後第一個要進入的中間件 (默認提供),並且它是不支持多次嵌套的。
EventBus 支持嵌套,這意味着我們可以在Handler中重新發佈一個新的
Event
,但TransactionMiddleware
僅會在最外層進入時被觸發一次
自定義中間件
根據需要我們可以自定義中間件,並註冊到EventBus的請求管道中,比如通過增加FluentValidation
, 將參數驗證從業務代碼中剝離開來,從而使得處理程序更專註於業務
- 註冊
FluentValidation
, 修改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
- 自定義驗證中間件
ValidatorMiddleware.cs
,用於驗證參數
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
where TEvent : IEvent
{
private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
private readonly IEnumerable<IValidator<TEvent>> _validators;
public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
{
_validators = validators;
_logger = logger;
}
public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
var typeName = @event.GetType().FullName;
_logger?.LogDebug("----- Validating command {CommandType}", typeName);
var failures = _validators
.Select(v => v.Validate(@event))
.SelectMany(result => result.Errors)
.Where(error => error != null)
.ToList();
if (failures.Any())
{
_logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
typeName,
@event,
failures);
throw new ValidationException("Validation exception", failures);
}
await next();
}
}
- 註冊EventBus並使用驗證中間件
ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
- 添加註冊用戶驗證類
RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{
public RegisterUserEventValidator()
{
RuleFor(e => e.Account).NotNull().WithMessage("用戶名不能為空");
RuleFor(e => e.Email).NotNull().WithMessage("郵箱不能為空");
RuleFor(e => e.Password)
.NotNull().WithMessage("密碼不能為空")
.MinimumLength(6)
.WithMessage("密碼必須大於6位")
.MaximumLength(20)
.WithMessage("密碼必須小於20位");
}
}
編排
EventBus 支持事件編排,它們可以用來處理一些對執行順序有要求的業務,比如: 註冊用戶必須成功之後才可以發送註冊郵件通知,發送獎勵等等,那我們可以這樣做
將註冊用戶業務拆分為三個Handler,並通過指定Order的值來對執行事件排序
public class UserHandler
{
private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null)
{
_logger = logger;
}
[EventHandler(1)]
public void RegisterUser(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Message}-----------", "檢測用戶是否存在並註冊用戶");
//todo: 編寫註冊用戶業務
}
[EventHandler(2)]
public void SendAwardByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送註冊獎勵");
//todo: 編寫發送獎勵等
}
[EventHandler(3)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送註冊成功郵件");
//todo: 編寫發送註冊通知等
}
}
Saga
EventBus支持Saga模式
具體是怎麼做呢?
[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 註冊成功,發放獎勵失敗 {Message}-----------", @event.Account, "發放獎勵補償");
}
當發送獎勵出現異常時,則執行補償機制,執行順序為 (2 – 1) > 0,由於目前僅存在一個Order為1的Handler,則執行獎勵補償後退出
但對於部分不需要執行失敗但不需要執行回退的方法,我們可以修改 FailureLevels
確保不會因為當前方法的異常而導致執行補償機制
[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
_logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "發送郵件提示註冊成功");
//todo: 編寫發送註冊通知等
}
源碼解讀
EventHandler
- FailureLevels: 失敗級別, 默認: Throw
- Throw:發生異常後,依次執行Order小於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 2、1
- ThrowAndCancel:發生異常後,依次執行Order小於等於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 3、2、1
- Ignore:發生異常後,忽略當前異常(不執行取消動作),繼續執行其他Handler
- Order: 執行順序,默認: int.MaxValue,用於控制當前方法的執行順序
- EnableRetry: 當Handler異常後是否啟用重試, 默認: false
- RetryTimes: 重試次數,當出現異常後執行多少次重試, 需開啟重試配置
- IsCancel: 是否是補償機制,默認: false
Middleware
- SupportRecursive: 是否支持遞歸 (嵌套), 默認: true
- 部分中間件僅在最外層被觸發一次,像
TransactionMiddleware
就是如此,但也有很多中間件是需要被多次執行的,比如ValidatorMiddleware
,每次發佈事件時都需要驗證參數是否正確
- 部分中間件僅在最外層被觸發一次,像
- HandleAsync(TEvent @event, EventHandlerDelegate next): 處理程序,通過調用
next()
使得請求進入下一個Handler
IEventHandler 與 ISagaEventHandler
- HandleAsync(TEvent @event): 提供事件的Handler
- CancelAsync(TEvent @event): 提供事件的補償Handler
與
EventHandler
功能類似,提供基本的Handler以及補償Handler,推薦使用EventHandler
的方式使用
TransactionMiddleware
提供事務中間件,當EventBus
與UoW
以及Masa提供的Repository
來使用時,當存在待提交的數據時,會自動執行保存並提交,當出現異常後,會執行事務回滾,無需擔心臟數據入庫
性能測試
與市面上使用較多的MeidatR
作了對比,結果如下圖所示:
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart
Method | Mean | Error | StdDev | Median | Min | Max |
---|---|---|---|---|---|---|
AddShoppingCartByEventBusAsync | 124.80 us | 346.93 us | 1,022.94 us | 8.650 us | 6.500 us | 10,202.4 us |
AddShoppingCartByMediatRAsync | 110.57 us | 306.47 us | 903.64 us | 7.500 us | 5.300 us | 9,000.1 us |
根據性能測試我們發現,EventBus與MediatR性能差距很小,但EventBus提供的功能卻要強大的多
常見問題
- 按照文檔操作,通過
EventBus
發佈事件後,對應的Handler並沒有執行,也沒有發現錯誤?
①. EventBus.PublishAsync(@event) 是異步方法,確保等待方法調用成功,檢查是否出現同步方法調用異步方法的情況
②. 註冊EventBus
時指定程序集集合, Assembly被用於註冊時獲取並保存事件與Handler的對應關係
var assemblies = new[]
{
typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);
程序集: 手動指定Assembly集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()
但由於NetCore按需加載,未使用的程序集在當前域中不存在,因此可能會導致部分事件以及Handler的對應關係未正確保存,因此可通過手動指定Assembly集合或者修改全局配置中的Assembly集合來修復這個問題
- 通過EventBus發佈事件,Handler出錯,但數據依然保存到數據庫中
①. 檢查是否禁用事務
- DisableRollbackOnFailure是否為true (是否失敗時禁止回滾)
- UseTransaction是否為false (禁止使用事務)
②. 檢查當前數據庫是否支持回滾。例如: 使用的是Mysql數據庫,但回滾數據失敗,請查看
本章源碼
Assignment11
//github.com/zhenlei520/MasaFramework.Practice
開源地址
MASA.Framework://github.com/masastack/MASA.Framework
MASA.EShop://github.com/masalabs/MASA.EShop
MASA.Blazor://github.com/BlazorComponent/MASA.Blazor
如果你對我們的 MASA Framework 感興趣,無論是代碼貢獻、使用、提 Issue,歡迎聯繫我們