NServiceBus+RabbitMQ開發分散式應用

  • 2019 年 10 月 25 日
  • 筆記

前言

     NServiceBus提供了8種傳輸管道組件,分別是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前兩篇我們主要用的是Learnning,這篇使用RabbitMQ,也可以直接用於生產。

安裝RabbitMQ組件

       RabbitMQ不在NServiceBus下,是一個單獨的組件,需要單獨安裝。
QQ截圖20191025114539.jpg

設置RabbitMQ連接串

var transport = config.UseTransport<RabbitMQTransport>();  transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");  transport.UseDirectRoutingTopology();

消息生產者(ClientUI)

 class Program      {          static ILog log = LogManager.GetLogger<Program>();          static void Main(string[] args)          {              MainAsync().GetAwaiter().GetResult();          }            static async Task MainAsync()          {              Console.Title = "Sample.ClientUI";              var config = new EndpointConfiguration("Sample.ClientUI");              config.UseSerialization<NewtonsoftSerializer>();              config.UsePersistence<InMemoryPersistence>();              config.EnableInstallers();                var transport = config.UseTransport<RabbitMQTransport>();              transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");              transport.UseDirectRoutingTopology();                var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);                await RunAsync(endpointInstance).ConfigureAwait(false);                await endpointInstance.Stop().ConfigureAwait(false);          }              static async Task RunAsync(IEndpointInstance endpointInstance)          {              log.Info("Press 'P' to send an PlaceOrder Command");              while (true)              {                  var key = Console.ReadKey();                  Console.WriteLine();                    switch (key.Key)                  {                      case ConsoleKey.P:                          {                              var command = new PlaceOrder                              {                                  OrderId = Guid.NewGuid().ToString()                              };                                log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");                              await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false);                              break;                          }                        case ConsoleKey.Q:                          return;                      default:                          log.Info("Please try again");                          break;                  }              }          }      }

    先啟動ClientUI,啟動後先發送幾條數據,發送數據觀察程式控制台數據是否發送,然後在RabbitMQ控制台里觀察Queue的情況,沒創建Queue時,這裡會自動創建Queue,隊列名稱和端點名稱相同。這裡發送消息數據,數據會在Sample.Server隊列里。
QQ截圖20191025114228.jpg

QQ截圖20191025114115.jpg

QQ截圖20191025114135.jpg
     這裡能看到在隊列Sample.Server里有三條未消費的數據,因為我還沒有啟動Sample.Server控制台程式。

消息消費者(Server)

public class Program  {      static ILog log = LogManager.GetLogger<Program>();      static void Main(string[] args)      {          MainAsync().GetAwaiter().GetResult();      }          static async Task MainAsync()      {          Console.Title = "Sample.Server";          var config = new EndpointConfiguration("Sample.Server");          config.UseSerialization<NewtonsoftSerializer>();          config.UsePersistence<InMemoryPersistence>();          config.EnableInstallers();            var transport = config.UseTransport<RabbitMQTransport>();          transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");          transport.UseDirectRoutingTopology();            var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);            log.Info("Press any key to quit");          Console.ReadKey();            await endpointInstance.Stop().ConfigureAwait(false);      }  }
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>  {      static ILog log = LogManager.GetLogger<PlaceOrderHandler>();      public Task Handle(PlaceOrder message, IMessageHandlerContext context)      {          log.Info($"Received PlaceOrder with OrderId:{message.OrderId}");          return Task.CompletedTask;      }  }

      啟動Sample.Server控制台後,就會消費Queue里的數據,消費完成後觀察程式控制台的提示和RabbitMQ控制台的提示。
QQ截圖20191025114221.jpg

QQ截圖20191025114245.jpg
QQ截圖20191025114259.jpg
    這裡能看到剛才堆積的三條數據已經被消費了。

總結

    寫完這個demo我就在想在程式里通過使用NServiceBus的RabbitMQ組件和直接在程式里使用RabbitMQ直接調用的區別,到底有沒有必要通過NServiceBus調用,還需要進一步考量。