NServiceBus+RabbitMQ開發分散式應用
- 2019 年 10 月 25 日
- 筆記
前言
NServiceBus提供了8種傳輸管道組件,分別是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前兩篇我們主要用的是Learnning,這篇使用RabbitMQ,也可以直接用於生產。
安裝RabbitMQ組件
RabbitMQ不在NServiceBus下,是一個單獨的組件,需要單獨安裝。
設置RabbitMQ連接串
var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology();
消息生產者(ClientUI)
class Program { static ILog log = LogManager.GetLogger<Program>(); static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { Console.Title = "Sample.ClientUI"; var config = new EndpointConfiguration("Sample.ClientUI"); config.UseSerialization<NewtonsoftSerializer>(); config.UsePersistence<InMemoryPersistence>(); config.EnableInstallers(); var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology(); var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false); await RunAsync(endpointInstance).ConfigureAwait(false); await endpointInstance.Stop().ConfigureAwait(false); } static async Task RunAsync(IEndpointInstance endpointInstance) { log.Info("Press 'P' to send an PlaceOrder Command"); while (true) { var key = Console.ReadKey(); Console.WriteLine(); switch (key.Key) { case ConsoleKey.P: { var command = new PlaceOrder { OrderId = Guid.NewGuid().ToString() }; log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}"); await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false); break; } case ConsoleKey.Q: return; default: log.Info("Please try again"); break; } } } }
先啟動ClientUI,啟動後先發送幾條數據,發送數據觀察程式控制台數據是否發送,然後在RabbitMQ控制台里觀察Queue的情況,沒創建Queue時,這裡會自動創建Queue,隊列名稱和端點名稱相同。這裡發送消息數據,數據會在Sample.Server隊列里。
這裡能看到在隊列Sample.Server里有三條未消費的數據,因為我還沒有啟動Sample.Server控制台程式。
消息消費者(Server)
public class Program { static ILog log = LogManager.GetLogger<Program>(); static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { Console.Title = "Sample.Server"; var config = new EndpointConfiguration("Sample.Server"); config.UseSerialization<NewtonsoftSerializer>(); config.UsePersistence<InMemoryPersistence>(); config.EnableInstallers(); var transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology(); var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false); log.Info("Press any key to quit"); Console.ReadKey(); await endpointInstance.Stop().ConfigureAwait(false); } }
public class PlaceOrderHandler : IHandleMessages<PlaceOrder> { static ILog log = LogManager.GetLogger<PlaceOrderHandler>(); public Task Handle(PlaceOrder message, IMessageHandlerContext context) { log.Info($"Received PlaceOrder with OrderId:{message.OrderId}"); return Task.CompletedTask; } }
啟動Sample.Server控制台後,就會消費Queue里的數據,消費完成後觀察程式控制台的提示和RabbitMQ控制台的提示。
這裡能看到剛才堆積的三條數據已經被消費了。
總結
寫完這個demo我就在想在程式里通過使用NServiceBus的RabbitMQ組件和直接在程式里使用RabbitMQ直接調用的區別,到底有沒有必要通過NServiceBus調用,還需要進一步考量。