MassTransit | .NET 分散式應用框架
引言
A free, open-source distributed application framework for .NET.
一個免費、開源的.NET 分散式應用框架。 — MassTransit 官網
MassTransit,直譯公共交通, 是由Chris Patterson
開發的基於消息驅動的.NET 分散式應用框架,其核心思想是藉助消息來實現服務之間的松耦合非同步通訊,進而確保應用更高的可用性、可靠性和可擴展性。通過對消息模型的高度抽象,以及對主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大簡化了基於消息驅動的開發門檻,同時內置了連接管理、消息序列化和消費者生命周期管理,以及諸如重試、限流、斷路器等異常處理機制,讓開發者更好的專註於業務實現。
簡而言之,MassTransit實現了消息代理透明化。無需面向消息代理編程進行諸如連接管理、隊列的申明和綁定等操作,即可輕鬆實現應用間消息的傳遞和消費。
快速體驗
空口無憑,創建一個項目快速體驗一下。
- 基於
worker
模板創建一個基礎項目:dotnet new worker -n MassTransit.Demo
- 打開項目,添加NuGet包:
MassTransit
- 定義訂單創建事件消息契約:
using System;
namespace MassTransit.Demo
{
public record OrderCreatedEvent
{
public Guid OrderId { get; set; }
}
}
- 修改
Worker
類,發送訂單創建事件:
namespace MassTransit.Demo;
public class Worker : BackgroundService
{
readonly IBus _bus;//註冊匯流排
public Worker(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
//模擬並發送訂單創建事件
await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
}
- 僅需實現
IConsumer<OrderCreatedEvent>
泛型介面,即可實現消息的訂閱:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedEventConsumer> _logger;
public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
_logger.LogInformation($"Received Order:{context.Message.OrderId}");
return Task.CompletedTask;
}
}
- 註冊服務:
using MassTransit;
using MassTransit.Demo;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Worker>();
services.AddMassTransit(configurator =>
{
//註冊消費者
configurator.AddConsumer<OrderCreatedEventConsumer>();
//使用基於記憶體的消息路由傳輸
configurator.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
})
.Build();
await host.RunAsync();
- 運行項目,一個簡單的進程內事件發布訂閱的應用就完成了。
如果需要使用RabbitMQ 消息代理進行消息傳輸,則僅需安裝MassTransit.RabbitMQ
NuGet包,然後指定使用RabbitMQ 傳輸消息即可。
using MassTransit;
using MassTransit.Demo;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<Worker>();
services.AddMassTransit(configurator =>
{
configurator.AddConsumer<OrderCreatedEventConsumer>();
// configurator.UsingInMemory((context, cfg) =>
// {
// cfg.ConfigureEndpoints(context);
// });
configurator.UsingRabbitMq((context, cfg) =>
{
cfg.Host(
host: "localhost",
port: 5672,
virtualHost: "/",
configure: hostConfig =>
{
hostConfig.Username("guest");
hostConfig.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
})
.Build();
await host.RunAsync();
運行項目,MassTransit會自動在指定的RabbitMQ上創建一個類型為fanout
的MassTransit.Demo.OrderCreatedEvent
Exchange和一個與OrderCreatedEvent
同名的隊列進行消息傳輸,如下圖所示。
核心概念
MassTranist 為了實現消息代理的透明化和應用間消息的高效傳輸,抽象了以下概念,其中消息流轉流程如下圖所示:
- Message:消息契約,定義了消息生產者和消息消費者之間的契約。
- Producer:生產者,發送消息的一方都可以稱為生產者。
- SendEndpoint:發送端點,用於將消息內容序列化,並發送到傳輸模組。
- Transport:傳輸模組,消息代理透明化的核心,用於和消息代理通訊,負責發送和接收消息。
- ReceiveEndpoint:接收端點,用於從傳輸模組接收消息,反序列化消息內容,並將消息路由到消費者。
- Consumer:消費者,用於消息消費。
從上圖可知,本質上還是發布訂閱模式的實現,接下來就核心概念進行詳解。
Message
Message:消息,可以使用class、interface、struct和record來創建,消息作為一個契約,需確保創建後不能篡改,因此應只保留只讀屬性且不應包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName
來表示特定的消息類型。因此若在另外的項目中消費同名的消息類型,需確保消息的命名空間相同。另外需注意消息不應繼承,以避免發送基類消息類型造成的不可預期的結果。為避免此類情況,官方建議使用介面來定義消息。在MassTransit中,消息主要分為兩種類型:
- Command:命令,用於告訴服務做什麼,命令被發送到指定端點,僅被一個服務接收並執行。一般以動名詞結構命名,如:UpdateAddress、CancelOrder。
- Event:事件,用於告訴服務什麼發生了,事件被發布到多個端點,可以被多個服務消費。 一般以過去式結構命名,如:AddressUpdated,OrderCanceled。
經過MassTransit發送的消息,會使用信封包裝,包含一些附加資訊,數據結構舉例如下:
{
"messageId": "6c600000-873b-00ff-9a8f-08da8da85542",
"requestId": null,
"correlationId": null,
"conversationId": "6c600000-873b-00ff-9526-08da8da85544",
"initiatorId": null,
"sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",
"destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",
"responseAddress": null,
"faultAddress": null,
"messageType": [
"urn:message:MassTransit.Demo:OrderCreatedEvent"
],
"message": {
"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
},
"expirationTime": null,
"sentTime": "2022-09-03T12:32:15.0796943Z",
"headers": {},
"host": {
"machineName": "THINKPAD",
"processName": "MassTransit.Demo",
"processId": 24684,
"assembly": "MassTransit.Demo",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "6.0.5",
"massTransitVersion": "8.0.6.0",
"operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"
}
}
從以上消息實例中可以看出一個包裝後的消息包含以下核心屬性:
- messageId:全局唯一的消息ID
- messageType:消息類型
- message:消息體,也就是具體的消息實例
- sourceAddress:消息來源地址
- destinationAddress:消息目標地址
- responseAddress:響應地址,在請求響應模式中使用
- faultAddress:消息異常發送地址,用於存儲異常消費消息
- headers:消息頭,允許應用自定義擴展資訊
- correlationId:關聯Id,在Saga狀態機中會用到,用來關聯繫列事件
- host:宿主,消息來源應用的宿主資訊
Producer
Producer,生產者,即用於生產消息。在MassTransit主要藉助以下對象進行命令的發送和事件的發布。
從以上類圖可以看出,消息的發送主要核心依賴於兩個介面:
ISendEndpoint
:提供了Send
方法,用於發送命令。IPublishEndpoint
:提供了Publish
方法,用於發布事件。
但基於上圖的繼承體系,可以看出通過IBus
、ISendEndpointProvider
和ConsumeContext
進行命令的發送;通過IBus
和IPublishEndpointProvider
進行事件的發布。具體舉例如下:
發送命令
- 通過
IBus
發送:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
//通過以下方式配置對應消息類型的目標地址
EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));
await _bus.Send(request);
}
- 通過
ISendEndpointProvider
發送:
private readonly ISendEndpointProvider _sendEndpointProvider;
public async Task Post(CreateOrderRequest request)
{
var serviceAddress = new Uri("queue:create-order");
var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);
await endpoint.Send(request);
}
- 通過
ConsumeContext
發送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>
{
public async Task Consume(ConsumeContext<CreateOrderRequest> context)
{
//do something else
var destinationAddress = new Uri("queue:lock-stock");
var command = new LockStockRequest(context.Message.OrderId);
await context.Send<LockStockRequest>(destinationAddress, command);
// 也可以通過獲取`SendEndpoint`發送命令
// var endpoint = await context.GetSendEndpoint(destinationAddress);
// await endpoint.Send<LockStockRequest>(command);
}
}
發布事件
- 通過
IBus
發布:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
//do something
await _bus.Send(request);
}
- 通過
IPublishEndpoint
發布:
private readonly IPublishEndpoint _publishEndpoint;
public async Task Post(CreateOrderRequest request)
{
//do something
var order = CreateOrder(request);
await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
}
- 通過
ConsumeContext
發布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>
{
public async Task Consume(ConsumeContext<CreateOrderRequest> context)
{
、 var order = CreateOrder(conext.Message);
await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
}
}
Consumer
Consumer,消費者,即用於消費消息。MassTransit 包括多種消費者類型,主要分為無狀態和有狀態兩種消費者類型。
無狀態消費者
無狀態消費者,即消費者無狀態,消息消費完畢,消費者就釋放。主要的消費者類型有:IConsumer<TMessage>
、JobConsumer
、IActivity
和RoutingSlip
等。其中IConsumer<TMessage>
已經在上面的快速體驗
部分舉例說明。而JobConsumer<TMessage>
主要是對IConsumer<TMessage>
的補充,其主要應用場景在於執行耗時任務。
而對於IActivity
和RoutingSlip
則是MassTransit Courier
的核心對象,主要用於實現Saga模式的分散式事務。MassTransit Courier 實現了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivity
和IExecuteActivity
。二者的差別在於IActivity
定義了Execute
和Compensate
兩個方法,而IExecuteActivitiy
僅定義了Execute
方法。其中Execute
代表正向操作,Compensate
代表反向補償操作。用一個簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對於具體實現,可參閱文章:AspNetCore&MassTransit Courier實現分散式事務
有狀態消費者
有狀態消費者,即消費者有狀態,其狀態會持久化,代表的消費者類型為MassTransitStateMachine
。MassTransitStateMachine
是MassTransit Automatonymous
庫定義的,Automatonymous
是一個.NET 狀態機庫,用於定義狀態機,包括狀態、事件和行為。MassTransitStateMachine
就是狀態機的具體抽象,可以用其編排一系列事件來實現狀態的流轉,也可以用來實現Saga模式的分散式事務。並支援與EF Core和Dapper集成將狀態持久化到關係型資料庫,也支援將狀態持久化到MongoDB、Redis等資料庫。MassTransitStateMachine
對於Saga模式分散式事務的實現方式與RoutingSlip
不同,還是以簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。基於MassTransitStateMachine
實現分散式事務詳參後續文章。
從上圖可知,通過MassTransitStateMachine
可以將事件的執行順序邏輯編排在一個集中的狀態機中,通過發送命令和訂閱事件來推動狀態流轉,而這也正是Saga編排模式的實現。
應用場景
了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:
- 基於消息的請求響應模式:可用於同步通訊
- Mediator模式:中間者模式的實現,類似MediatR,但功能更完善
- 計劃任務:可用於執行定時任務
- Routing Slip 模式:可用於實現Saga模式的分散式事務
- Saga 狀態機:可用於實現Saga模式的分散式事務
- 本地消息表:類似DotNetCore.Cap,用於實現最終一致性
總體而言,MassTransit是一款優秀的分散式應用框架,可作為分散式應用的消息匯流排,也可以用作單體應用的事件匯流排。感興趣的朋友不妨一觀。