使用NServiceBus开发分布式应用

  • 2019 年 10 月 21 日
  • 筆記

前言

NServiceBus是.NET平台下的开源消息服务框架,已经支持.NET Core,目前的稳定版本为7.1。企业开发需要购买License,开发者可在线下载开发者License。
官方网站:https://particular.net/
官方示例:https://docs.particular.net/get-started/

NServiceBus入门

屏幕快照 2019-10-21 21.17.54.png
如图所示,项目一共包括4个端点(Endpoint),也就是四个单独的项目,端点是NServiceBus中的核心概念,发送消息和事件发布订阅的基础都是Endpoint。这个项目中包括发送消息和事件的发布订阅。

完整的项目结构如图所示:
屏幕快照 2019-10-21 21.22.11.png

ClientUI

/// <summary>
/// Console host for the "ClientUI" endpoint. Starts the endpoint, then reads
/// key presses and sends a <c>PlaceOrder</c> command to the "Sales" endpoint
/// each time 'P' is pressed; 'Q' quits.
/// </summary>
class Program
{
    private static readonly ILog log = LogManager.GetLogger<Program>();

    /// <summary>
    /// Process entry point. Declared as async (like the other endpoints) so
    /// the start-up task is awaited instead of being blocked on.
    /// </summary>
    static async Task Main(string[] args)
    {
        await MainAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Interactive loop: 'P' sends a new PlaceOrder command, 'Q' returns,
    /// any other key logs a retry hint.
    /// </summary>
    /// <param name="endpointInstance">The started endpoint used to send commands.</param>
    static async Task RunAsync(IEndpointInstance endpointInstance)
    {
        log.Info("Press 'P' to place an order,press 'Q' to quit");

        while (true)
        {
            var key = Console.ReadKey();
            Console.WriteLine();

            switch (key.Key)
            {
                case ConsoleKey.P:
                {
                    var command = new PlaceOrder
                    {
                        OrderId = Guid.NewGuid().ToString()
                    };

                    log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                    // Send the command to the "Sales" endpoint.
                    await endpointInstance.Send("Sales", command).ConfigureAwait(false);
                    break;
                }

                case ConsoleKey.Q:
                    return;

                default:
                    log.Info("Please try again");
                    break;
            }
        }
    }

    /// <summary>
    /// Configures and starts the endpoint, runs the interactive loop, and
    /// stops the endpoint when the loop ends.
    /// </summary>
    static async Task MainAsync()
    {
        Console.Title = "Client-UI";

        var config = new EndpointConfiguration("ClientUI"); // endpoint name
        // LearningTransport is intended for learning only; be careful about using it in production.
        config.UseTransport<LearningTransport>();
        config.UsePersistence<LearningPersistence>(); // persistence

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        try
        {
            // ConfigureAwait(false): the continuation does not need to resume
            // on a captured synchronization context.
            await RunAsync(endpointInstance).ConfigureAwait(false);
        }
        finally
        {
            // Stop the endpoint even when the loop exits with an exception.
            await endpointInstance.Stop().ConfigureAwait(false);
        }
    }
}

Sales

/// <summary>
/// Console host for the "Sales" endpoint: starts the endpoint and keeps it
/// running until Enter is pressed.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        Console.Title = "Sales";

        var endpointConfiguration = new EndpointConfiguration("Sales");
        endpointConfiguration.UseTransport<LearningTransport>();
        endpointConfiguration.UsePersistence<LearningPersistence>();

        var salesEndpoint = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

        Console.WriteLine("Press Enter to quit...");
        Console.ReadLine();

        await salesEndpoint.Stop().ConfigureAwait(false);
    }
}

/// <summary>
/// Handles <c>PlaceOrder</c> messages: logs the order id and publishes an
/// <c>OrderPlaced</c> event carrying the same id.
/// </summary>
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    private static ILog logger = LogManager.GetLogger<PlaceOrderHandler>();

    /// <summary>Logs the received command and publishes the follow-up event.</summary>
    /// <param name="message">The received PlaceOrder message.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by the publish call.</returns>
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        // Message received by this endpoint.
        logger.Info($"Received PlaceOrder ,OrderId:{message.OrderId}");

        // Publish the OrderPlaced event.
        var orderPlaced = new OrderPlaced
        {
            OrderId = message.OrderId
        };

        return context.Publish(orderPlaced);
    }
}

Billing

// Entry point of the "Billing" endpoint (the enclosing class declaration is
// not shown in this excerpt). Starts the endpoint and keeps it running until
// Enter is pressed.
static async Task Main(string[] args)
{
    // The window title names this endpoint so the console windows can be told apart.
    Console.Title = "Billing";

    var config = new EndpointConfiguration("Billing");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

/// <summary>
/// Handles <c>OrderPlaced</c> events: logs the order id and publishes an
/// <c>OrderBilled</c> event carrying the same id.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static ILog log = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>Logs the received event and publishes the follow-up event.</summary>
    /// <param name="message">The received OrderPlaced event.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by the publish call.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        // Subscribed OrderPlaced event.
        log.Info($"Received OrderPlaced,OrderId {message.OrderId} - Charging credit card");

        // Publish the OrderBilled event.
        var order = new OrderBilled();
        order.OrderId = message.OrderId;
        return context.Publish(order);
    }
}

Shipping

// Entry point of the "Shipping" endpoint (the enclosing class declaration is
// not shown in this excerpt). Starts the endpoint and keeps it running until
// Enter is pressed.
static async Task Main(string[] args)
{
    // The window title names this endpoint so the console windows can be told apart.
    Console.Title = "Shipping";

    var config = new EndpointConfiguration("Shipping");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

/// <summary>
/// Handles <c>OrderBilled</c> events by logging the order id.
/// </summary>
public class OrderBilledHandler : IHandleMessages<OrderBilled>
{
    private static ILog log = LogManager.GetLogger<OrderBilledHandler>();

    /// <summary>Logs the received OrderBilled event; no further message is sent.</summary>
    /// <param name="message">The received OrderBilled event.</param>
    /// <param name="context">Handler context (unused here).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderBilled message, IMessageHandlerContext context)
    {
        log.Info($"Received OrderBilled,OrderId={message.OrderId} Should we ship now?");
        return Task.CompletedTask;
    }
}

/// <summary>
/// Handles <c>OrderPlaced</c> events by logging the order id.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static ILog log = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>Logs the received OrderPlaced event; no further message is sent.</summary>
    /// <param name="message">The received OrderPlaced event.</param>
    /// <param name="context">Handler context (unused here).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        log.Info($"Received OrderPlaced,OrderId={message.OrderId} Should we ship now?");
        return Task.CompletedTask;
    }
}

运行结果

屏幕快照 2019-10-21 21.07.43.png

屏幕快照 2019-10-21 21.07.51.png

屏幕快照 2019-10-21 21.07.57.png

屏幕快照 2019-10-21 21.08.03.png

总结

      NServiceBus的核心是端点之间的通信:作为命令发送的消息类需要实现ICommand接口,作为事件发布的消息类需要实现IEvent接口,NServiceBus会扫描实现这两个接口的类。每个端点的关键配置就是EndpointConfiguration。