NServiceBus+Saga開發分散式應用

  • 2019 年 10 月 23 日
  • 筆記

前言

      當你在處理非同步消息時,每個單獨的消息處理程式都是一個單獨的handler,每個handler之間互不影響。這時如果一個消息依賴另一個消息的狀態呢? 這時業務邏輯怎麼處理?

image.png

     借用我們上篇文章的業務場景,如果在Ship項目里需要發送一個ShipOrder Command。這個ShipOrder需要依賴Sales.OrderPlaced和Bill.OrderBilled Command的狀態,目前我們的兩個單獨的Message Handler都沒有保持任何的狀態欄位,所以這時如果我們需要完成這個業務模型,就需要跟蹤他們的狀態。

什麼是Saga

    這個就是本篇文章要提的saga,定義在NServiceBus框架里,他的本質是一個消息驅動模型里的狀態機,或者也可以理解為一系列消息處理程式用來共享狀態的業務模型。我理解在消息隊列里如果我們要保證消息一致性通常會自己創建一張Event表,這裡saga維持狀態的角色有點像我們這裡的Event表。
   
螢幕快照 2019-10-23 20.46.15.png
   好的,回到正題上,如果我們需要在Shipping Service里發送一個ShipOrder,發送他之前需要確定OrderPlaced和OrderBilled的狀態,確保這兩個消息都收到以後才能發送ShipOrder。

如何使用Saga

   當然,我暫且理解Saga的目的是為了處理在長時間運行的任務里保證數據一致性這樣的一個角色。

Saga狀態

  saga狀態主要是告訴NServiceBus在處理數據一致性的判斷邏輯,這裡需要繼承抽象類ContainSagaData,在我們這個業務場景中則主要是判斷OrderPlaced和OrderBilled消息是否已經接收到並處理。

public class ShippingPolicyData:ContainSagaData  {     public string OrderId { get; set; }     public bool IsOrderPlaced { get; set; }     public bool IsOrderBilled { get; set; }  }

Saga如何工作

   有了狀態以後,我們還需要一個「handler」來告訴NServiceBus,在這個handler里主要用來處理消息數據一致性,我看了官方文檔後,他們建議我們這裡的handler角色使用Policy後綴命名,當然我覺的也可以用Saga後綴命名,比如ShippingPolicy或者ShippingSaga。
   同時這裡我們這個handler覺色還要繼承Saga類,Saga類主要重寫方法ConfigureHowToFindSaga,這個方法的作用主要是在接受的消息和我們的Saga實體之間建立映射關係。

 public class ShipPolicy:Saga<ShippingPolicyData>,          IAmStartedByMessages<OrderPlaced>,          IAmStartedByMessages<OrderBilled> //都可以創建Saga實例      {          private static ILog log = LogManager.GetLogger<ShipPolicy>();            protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)          {              mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);              mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);            }            public Task Handle(OrderPlaced message, IMessageHandlerContext context)          {              log.Info("OrderPlaced message received ");              this.Data.IsOrderPlaced = true;              return ProcessOrder(context);          }            public Task Handle(OrderBilled message, IMessageHandlerContext context)          {                log.Info("OrderBilled message received");              this.Data.IsOrderBilled = true;              return ProcessOrder(context);          }            private async Task ProcessOrder(IMessageHandlerContext context)          {              if (Data.IsOrderBilled && Data.IsOrderPlaced)              {                  await context.SendLocal(new ShipOrder()                  {                      OrderId = Data.OrderId                  });                    MarkAsComplete();              }          }      }

    
    這個類里你會發現還實現了介面IAmStartedByMessages, 這個介面主要是告訴Saga,不論是那種消息類型先進來,都可以創建一個Saga實例,就比如是Event表,不管那個消息進來,都需要先插入一條數據,後續消息再進來時要更新數據狀態,當然,這裡的Saga實例也好,Event表也好,關鍵問題就是有效標識,或者叫主鍵,我們這個業務模型里,OrderPlaced和OrderBilled都包含一個屬性OrderId, 這裡Saga實例則使用這個OrderId做關鍵屬性。

發送ShipOrder Command

    到這裡也就是我們的OrderPlaced和OrderBIlled消息都收到了,業務邏輯符合要求,可以發送ShipOrder消息了,也就是用戶創建了訂單,付了款,可以發貨了。
image.png

新建ShipOrder類

public class ShipOrder:ICommand  {      public string OrderId { get; set; }  }

新建ShipOrderHandler

public class ShipOrderHandler:IHandleMessages<ShipOrder>  {     private static ILog log = LogManager.GetLogger<ShipOrderHandler>();     public Task Handle(ShipOrder message, IMessageHandlerContext context)     {         log.Info($"Order [{message.OrderId}] - Successfully shipped");         return Task.CompletedTask;     }  }

運行Shipping項目,看到下圖,則說明程式運行成功,我們這個業務場景里OrderPlaced消息肯定先接受到,OrderBilled消息後接受到。
螢幕快照 2019-10-23 20.29.05.png

參考鏈接

https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/