基於ASP.NET Core 5.0使用RabbitMQ消息隊列實現事件匯流排(EventBus)

文章閱讀請前先參考看一下 //www.cnblogs.com/hudean/p/13858285.html 安裝RabbitMQ消息隊列軟體與了解C#中如何使用RabbitMQ 和 //www.cnblogs.com/Keep-Ambition/p/8038885.html 添加一個用戶並可以遠程訪問,

 消息隊列的作用:跨服務通訊、服務之間解耦,削峰、非同步,其實還有一個作用是提高接收者性能

RabbitMQ 官方網站://www.rabbitmq.com/

RabbitMQ 中文文檔網址://rabbitmq.mr-ping.com/

本文程式碼GitHub 地址是: //github.com/hudean/MQDemo

一、初衷

為什麼要設計消息匯流排(對消息隊列進行二次封裝),而不是讓各業務系統直接使用RabbitMQ、Kafka、RocketMQ這樣的成熟的消息隊列呢? 如果業務系統比較簡單,確實不需要考慮這樣的問題,直接拿最成熟的開源方案是最好的方式,但是在複雜的多系統下、多人分工合作的場景下,直接使用成熟的消息隊列一般都會面臨以下問題

  • 開發難度大,各系統間分別隔離,需要關注消息中間件的各種複雜繁瑣的配置,關注不同的消息則需要對接不同的消息隊列
  • 維護成本高,各系統或團隊需要分別管理消息中間件、處理各種服務異常、(消息中間件的高可用、業務的高可用等)
  • 管理難度大,沒法對消息的生產和消費進行業務管理,也不方便對消息中的敏感數據進行許可權管理
  • 擴展成本高,無法統一消息系統擴展功能,如路由、延時、重試、消費確認等 總結消息隊列是一個面向技術的接入,重點關注消息隊列的配置、介面對接;而消息匯流排則是通過屏蔽部署、分組和通訊等技術細節,實現一個面向業務的接入,重點關注要接收什麼消息。

定義

事件匯流排是實現基於事件驅動模式的方式之一,事件發送者將事件消息發送到一個事件匯流排上,事件訂閱者向事件匯流排訂閱和接收事件,而後再處理接收到的事件。固然,訂閱者不只能夠接收和消費事件,它們自己也能夠建立事件,並將它們發送到事件匯流排上。

事件匯流排是對發布-訂閱模式的一種實現。它是一種集中式事件處理機制,容許不一樣的組件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。

如前所述,使用基於事件的通訊時,當值得注意的事件發生時,微服務會發布事件,例如更新業務實體時。 其他微服務訂閱這些事件。 微服務收到事件時,可以更新其自己的業務實體,這可能會導致發布更多事件。 這是最終一致性概念的本質。 通常通過使用事件匯流排實現來執行此發布/訂閱系統。 事件匯流排可以設計為包含 API 的介面,該 API 是訂閱和取消訂閱事件和發布事件所需的。 它還可以包含一個或多個基於跨進程或消息通訊的實現,例如支援非同步通訊和發布/訂閱模型的消息隊列或服務匯流排。

可以使用事件來實現跨多個服務的業務事務,這可提供這些服務間的最終一致性。 最終一致事務由一系列分散式操作組成。 在每個操作中,微服務會更新業務實體,並發布可觸發下一個操作的事件。 下面的圖 6-18 顯示了通過事件匯流排發布了 PriceUpdated 事件,因此價格更新傳播到購物籃和其他微服務。

Diagram of asynchronous event-driven communication with an event bus.

圖 6-18。 基於事件匯流排的事件驅動的通訊

本部分介紹如何使用通用事件匯流排介面(如圖 6-18 所示)實現這種與 .NET 的通訊。 存在多種可能的實現,每種實現使用不同的技術或基礎結構,例如 RabbitMQ、Azure 服務匯流排或任何其他第三方開源或商用服務匯流排。

三、集成事件

集成事件用於跨多個微服務或外部系統保持域狀態同步。 此功能可通過在微服務外發布集成事件來實現。 將事件發布到多個接收方微服務(訂閱到集成事件的儘可能多個微服務)時,每個接收方微服務中的相應事件處理程式會處理該事件。

集成事件基本上是數據保持類,如以下示例所示:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Text.Json.Serialization;
 6 using System.Threading.Tasks;
 7 
 8 namespace EventBus.Events
 9 {
10     /// <summary>
11     /// 集成事件
12     /// </summary>
13     public record IntegrationEvent
14     {
15         public IntegrationEvent()
16         {
17             Id = Guid.NewGuid();
18             CreationDate = DateTime.UtcNow;
19         }
20         [JsonConstructor]
21         public IntegrationEvent(Guid id, DateTime createDate)
22         {
23             Id = id;
24             CreationDate = createDate;
25         }
26 
27         [JsonInclude]
28         public Guid Id { get; private init; }
29 
30         [JsonInclude]
31         public DateTime CreationDate { get; private init; }
32     }
33 }

View Code

 1 public class ProductPriceChangedIntegrationEvent : IntegrationEvent
 2 {
 3     public int ProductId { get; private set; }
 4     public decimal NewPrice { get; private set; }
 5     public decimal OldPrice { get; private set; }
 6 
 7     public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice,
 8         decimal oldPrice)
 9     {
10         ProductId = productId;
11         NewPrice = newPrice;
12         OldPrice = oldPrice;
13     }
14 }

View Code

 

 

事件匯流排

事件匯流排可實現發布/訂閱式通訊,無需組件之間相互顯式識別,如圖 6-19 所示。

A diagram showing the basic publish/subscribe pattern.

圖 6-19。 事件匯流排的發布/訂閱基礎知識

上圖顯示了微服務 A 發布到事件匯流排,這會分發到訂閱微服務 B 和 C,發布伺服器無需知道訂閱伺服器。 事件匯流排與觀察者模式和發布-訂閱模式相關。

觀察者模式

觀察者模式中,主對象(稱為可觀察對象)將相關資訊(事件)告知其他感興趣的對象(稱為觀察者)。

發布-訂閱(發布/訂閱)模式

發布/訂閱模式的用途與觀察者模式相同:某些事件發生時,需要告知其他服務。 但觀察者模式與發布/訂閱模式之間存在重要區別。 在觀察者模式中,直接從可觀察對象廣播到觀察者,因此它們「知道」彼此。 但在發布/訂閱模式中,存在稱為中轉站、消息中轉站或事件匯流排的第三個組件,發布伺服器和訂閱伺服器都知道第三個組件。 因此,使用發布/訂閱模式時,發布伺服器和訂閱伺服器通過所述的事件匯流排或消息中轉站精確分離。

中轉站或事件匯流排

如何實現發布伺服器和訂閱伺服器之間的匿名? 一個簡單方法是讓中轉站處理所有通訊。 事件匯流排是一個這樣的中轉站。

事件匯流排通常由兩部分組成:

  • 抽象或介面。

  • 一個或多個實現。

在圖 6-19 中,從應用程式角度看,會發現事件匯流排實際上是一個發布/訂閱通道。 實現此非同步通訊的方式可能會有差異。 它可以具有多個實現,以便你進行交換,具體取決於環境要求(例如,生產和開發環境)。

在圖 6-20 中,可看到事件匯流排的抽象,包含基於 RabbitMQ、Azure 服務匯流排或其他事件/消息中轉站等基礎結構消息技術的多個實現。

Diagram showing the addition of an event bus abstraction layer.

圖 6- 20。 事件匯流排的多個實現

最好通過介面定義事件匯流排,以便它可使用多種技術(例如 RabbitMQ、Azure 服務匯流排等)來實現。 但是,如前所述,僅當需要由你的抽象支援的基本事件匯流排功能時,才適合使用你自己的抽象(事件匯流排介面)。 如果需要更豐富的服務匯流排功能,應使用你喜歡的商用服務匯流排提供的 API 和抽象,而不是你自己的抽象。

定義事件匯流排介面

首先,讓我們了解一下事件匯流排介面的一些實現程式碼和可能的實現。 介面應是通用和簡單的,如下所示介面。

 1 using EventBus.Events;
 2 
 3 namespace EventBus.Abstractions
 4 {
 5     /// <summary>
 6     /// 事件匯流排介面
 7     /// </summary>
 8     public interface IEventBus
 9     {
10         /// <summary>
11         /// 發布
12         /// </summary>
13         /// <param name="event"></param>
14         void Publish(IntegrationEvent @event);
15 
16         /// <summary>
17         /// 訂閱
18         /// </summary>
19         /// <typeparam name="T"></typeparam>
20         /// <typeparam name="TH"></typeparam>
21         void Subscribe<T, TH>()
22             where T : IntegrationEvent
23             where TH : IIntegrationEventHandler<T>;
24 
25         /// <summary>
26         /// 動態訂閱
27         /// </summary>
28         /// <typeparam name="TH"></typeparam>
29         /// <param name="eventName"></param>
30         void SubscribeDynamic<TH>(string eventName)
31             where TH : IDynamicIntegrationEventHandler;
32 
33         /// <summary>
34         /// 取消動態訂閱
35         /// </summary>
36         /// <typeparam name="TH"></typeparam>
37         /// <param name="eventName"></param>
38         void UnsubscribeDynamic<TH>(string eventName)
39             where TH : IDynamicIntegrationEventHandler;
40 
41         /// <summary>
42         /// 取消訂閱
43         /// </summary>
44         /// <typeparam name="T"></typeparam>
45         /// <typeparam name="TH"></typeparam>
46         void Unsubscribe<T, TH>()
47             where TH : IIntegrationEventHandler<T>
48             where T : IntegrationEvent;
49     }
50 }

View Code

藉助 RabbitMQ 的事件匯流排實現,微服務可訂閱事件、發布事件和接收事件,如圖 6-21 所示。

顯示消息發送方和消息接收方之間的 RabbitMQ 的關係圖。

圖 6-21。 事件匯流排的 RabbitMQ 實現

RabbitMQ 充當消息發布伺服器和訂閱者之間的中介,處理分發。 在程式碼中,EventBusRabbitMQ 類實現了泛型 IEventBus 介面。 此實現基於依賴項注入,以便可以從此開發/測試版本交換到生產版本。

1 public class EventBusRabbitMQ : IEventBus, IDisposable
2 {
3     // Implementation using RabbitMQ API
4     //...
5 }

View Code

示例開發/測試事件匯流排的 RabbitMQ 實現是樣板程式碼。 它必須處理與 RabbitMQ 伺服器的連接,並提供用於將消息事件發布到隊列的程式碼。 它還必須為每個事件類型實現收集集成事件處理程式的字典;這些事件類型可以對每個接收器微服務具有不同的實例化和不同的訂閱,如圖 6-21 所示。

四、使用 RabbitMQ 實現一個簡單的發布方法

下面的程式碼是 RabbitMQ 的事件匯流排實現的簡化版,用以展示整個方案。 你真的不必以這種方式處理連接。 要查看完整的實現,在後面

 1 public class EventBusRabbitMQ : IEventBus, IDisposable
 2 {
 3     // Member objects and other methods ...
 4     // ...
 5 
 6     public void Publish(IntegrationEvent @event)
 7     {
 8         var eventName = @event.GetType().Name;
 9         var factory = new ConnectionFactory() { HostName = _connectionString };
10         using (var connection = factory.CreateConnection())
11         using (var channel = connection.CreateModel())
12         {
13             channel.ExchangeDeclare(exchange: _brokerName,
14                 type: "direct");
15             string message = JsonConvert.SerializeObject(@event);
16             var body = Encoding.UTF8.GetBytes(message);
17             channel.BasicPublish(exchange: _brokerName,
18                 routingKey: eventName,
19                 basicProperties: null,
20                 body: body);
21        }
22     }
23 }

View Code

五、使用 RabbitMQ API 實現訂閱程式碼

與發布程式碼一樣,下面的程式碼是 RabbitMQ 事件匯流排實現的簡化部分。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }
}

View Code

每個事件類型都有一個相關的通道,以獲取 RabbitMQ 中的事件。 然後,可以根據需要在每個通道和事件類型中擁有儘可能多的事件處理程式。

訂閱方法接受一個 IIntegrationEventHandler 對象,該對象相當於當前微服務中的回調方法,以及其相關的 IntegrationEvent 對象。 然後,程式碼將該事件處理程式添加到事件處理程式列表,每個客戶端微服務的每個集成事件類型都可具有事件處理程式。 如果客戶端程式碼尚未訂閱事件,該程式碼將為事件類型創建一個通道,以便在從任何其他服務中發布事件時,它可以從 RabbitMQ 以推送方式接收事件。

六、使用 RabbitMQ 完整實現事件匯流排程式碼

結構圖如下:

 

 

動態集成事件處理器介面

 1 using System.Threading.Tasks;
 2 
 3 namespace EventBus.Abstractions
 4 {
 5     /// <summary>
 6     /// 動態集成事件處理器介面
 7     /// </summary>
 8     public interface IDynamicIntegrationEventHandler
 9     {
10         Task Handle(dynamic eventData);
11     }
12 }

View Code

事件匯流排介面

 1 using EventBus.Events;
 2 
 3 namespace EventBus.Abstractions
 4 {
 5     /// <summary>
 6     /// 事件匯流排介面
 7     /// </summary>
 8     public interface IEventBus
 9     {
10         /// <summary>
11         /// 發布
12         /// </summary>
13         /// <param name="event"></param>
14         void Publish(IntegrationEvent @event);
15 
16         /// <summary>
17         /// 訂閱
18         /// </summary>
19         /// <typeparam name="T"></typeparam>
20         /// <typeparam name="TH"></typeparam>
21         void Subscribe<T, TH>()
22             where T : IntegrationEvent
23             where TH : IIntegrationEventHandler<T>;
24 
25         /// <summary>
26         /// 動態訂閱
27         /// </summary>
28         /// <typeparam name="TH"></typeparam>
29         /// <param name="eventName"></param>
30         void SubscribeDynamic<TH>(string eventName)
31             where TH : IDynamicIntegrationEventHandler;
32 
33         /// <summary>
34         /// 取消動態訂閱
35         /// </summary>
36         /// <typeparam name="TH"></typeparam>
37         /// <param name="eventName"></param>
38         void UnsubscribeDynamic<TH>(string eventName)
39             where TH : IDynamicIntegrationEventHandler;
40 
41         /// <summary>
42         /// 取消訂閱
43         /// </summary>
44         /// <typeparam name="T"></typeparam>
45         /// <typeparam name="TH"></typeparam>
46         void Unsubscribe<T, TH>()
47             where TH : IIntegrationEventHandler<T>
48             where T : IntegrationEvent;
49     }
50 }

View Code

集成事件處理器介面

 1 using EventBus.Events;
 2 using System;
 3 using System.Collections.Generic;
 4 using System.Linq;
 5 using System.Text;
 6 using System.Threading.Tasks;
 7 
 8 namespace EventBus.Abstractions
 9 {
10     /// <summary>
11     /// 集成事件處理器介面
12     /// </summary>
13     /// <typeparam name="TIntegrationEvent">TIntegrationEvent泛型</typeparam>
14     public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
15        where TIntegrationEvent : IntegrationEvent
16     {
17         Task Handle(TIntegrationEvent @event);
18     }
19 
20     /// <summary>
21     /// 集成事件處理器
22     /// </summary>
23     public interface IIntegrationEventHandler
24     {
25     }
26 }

View Code

集成事件

 1 using System.Text;
 2 using System.Text.Json.Serialization;
 3 using System.Threading.Tasks;
 4 
 5 namespace EventBus.Events
 6 {
 7     /// <summary>
 8     /// 集成事件
 9     /// </summary>
10     public record IntegrationEvent
11     {
12         public IntegrationEvent()
13         {
14             Id = Guid.NewGuid();
15             CreationDate = DateTime.UtcNow;
16         }
17         [JsonConstructor]
18         public IntegrationEvent(Guid id, DateTime createDate)
19         {
20             Id = id;
21             CreationDate = createDate;
22         }
23 
24         [JsonInclude]
25         public Guid Id { get; private init; }
26 
27         [JsonInclude]
28         public DateTime CreationDate { get; private init; }
29     }
30 }

View Code

GenericTypeExtensions

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace EventBus.Extensions
 8 {
 9     public static class GenericTypeExtensions
10     {
11         public static string GetGenericTypeName(this Type type)
12         {
13             var typeName = string.Empty;
14             if (type.IsGenericType)
15             {
16                 var genericTypes = string.Join(",", type.GetGenericArguments().Select(t => t.Name).ToArray());
17                 typeName = $"{type.Name.Remove(type.Name.IndexOf('`'))}<{genericTypes}>";
18             }
19             else
20             {
21                 typeName = type.Name;
22             }
23 
24             return typeName;
25         }
26 
27         /// <summary>
28         /// 獲取通用類型名稱
29         /// </summary>
30         /// <param name="object"></param>
31         /// <returns></returns>
32         public static string GetGenericTypeName(this object @object)
33         {
34             return @object.GetType().GetGenericTypeName();
35         }
36     }
37 }

View Code

事件匯流排訂閱管理器介面

  1 using EventBus.Abstractions;
  2 using EventBus.Events;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.Linq;
  6 using System.Text;
  7 using System.Threading.Tasks;
  8 using static EventBus.InMemoryEventBusSubscriptionsManager;
  9 
 10 namespace EventBus
 11 {
 12     /// <summary>
 13     /// 事件匯流排訂閱管理器介面
 14     /// </summary>
 15     public interface IEventBusSubscriptionsManager
 16     {
 17         bool IsEmpty { get; }
 18         event EventHandler<string> OnEventRemoved;
 19 
 20         /// <summary>
 21         /// 添加動態訂閱
 22         /// </summary>
 23         /// <typeparam name="TH"></typeparam>
 24         /// <param name="eventName"></param>
 25         void AddDynamicSubscription<TH>(string eventName)
 26            where TH : IDynamicIntegrationEventHandler;
 27         
 28         /// <summary>
 29         /// 添加訂閱
 30         /// </summary>
 31         /// <typeparam name="T"></typeparam>
 32         /// <typeparam name="TH"></typeparam>
 33         void AddSubscription<T, TH>()
 34            where T : IntegrationEvent
 35            where TH : IIntegrationEventHandler<T>;
 36 
 37         /// <summary>
 38         /// 刪除訂閱
 39         /// </summary>
 40         /// <typeparam name="T"></typeparam>
 41         /// <typeparam name="TH"></typeparam>
 42         void RemoveSubscription<T, TH>()
 43              where TH : IIntegrationEventHandler<T>
 44              where T : IntegrationEvent;
 45 
 46         /// <summary>
 47         /// 移除動態訂閱
 48         /// </summary>
 49         /// <typeparam name="TH"></typeparam>
 50         /// <param name="eventName"></param>
 51         void RemoveDynamicSubscription<TH>(string eventName)
 52             where TH : IDynamicIntegrationEventHandler;
 53 
 54         /// <summary>
 55         /// 已訂閱事件
 56         /// </summary>
 57         /// <typeparam name="T"></typeparam>
 58         /// <returns></returns>
 59         bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
 60 
 61         /// <summary>
 62         /// 已訂閱事件
 63         /// </summary>
 64         /// <param name="eventName"></param>
 65         /// <returns></returns>
 66         bool HasSubscriptionsForEvent(string eventName);
 67 
 68         /// <summary>
 69         /// 按名稱獲取事件類型
 70         /// </summary>
 71         /// <param name="eventName"></param>
 72         /// <returns></returns>
 73         Type GetEventTypeByName(string eventName);
 74 
 75         /// <summary>
 76         /// 清空
 77         /// </summary>
 78         void Clear();
 79 
 80         /// <summary>
 81         /// 獲取事件處理程式
 82         /// </summary>
 83         /// <typeparam name="T"></typeparam>
 84         /// <returns></returns>
 85         IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
 86 
 87         /// <summary>
 88         /// 獲取事件處理程式
 89         /// </summary>
 90         /// <param name="eventName"></param>
 91         /// <returns></returns>
 92         IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
 93 
 94         /// <summary>
 95         /// 獲取事件密鑰
 96         /// </summary>
 97         /// <typeparam name="T"></typeparam>
 98         /// <returns></returns>
 99         string GetEventKey<T>();
100     }
101 }

View Code

記憶體中事件匯流排訂閱管理器

  1 using EventBus.Abstractions;
  2 using EventBus.Events;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.Linq;
  6 using System.Text;
  7 using System.Threading.Tasks;
  8 
  9 namespace EventBus
 10 {
 11     /// <summary>
 12     /// 記憶體中事件匯流排訂閱管理器
 13     /// </summary>
 14     public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
 15     {
 16         private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
 17         private readonly List<Type> _eventTypes;
 18 
 19         public event EventHandler<string> OnEventRemoved;
 20 
 21         public InMemoryEventBusSubscriptionsManager()
 22         {
 23             _handlers = new Dictionary<string, List<SubscriptionInfo>>();
 24             _eventTypes = new List<Type>();
 25         }
 26 
 27         public bool IsEmpty => !_handlers.Keys.Any();
 28         public void Clear() => _handlers.Clear();
 29 
 30         public void AddDynamicSubscription<TH>(string eventName)
 31             where TH : IDynamicIntegrationEventHandler
 32         {
 33             DoAddSubscription(typeof(TH), eventName, isDynamic: true);
 34         }
 35 
 36         public void AddSubscription<T, TH>()
 37             where T : IntegrationEvent
 38             where TH : IIntegrationEventHandler<T>
 39         {
 40             var eventName = GetEventKey<T>();
 41 
 42             DoAddSubscription(typeof(TH), eventName, isDynamic: false);
 43 
 44             if (!_eventTypes.Contains(typeof(T)))
 45             {
 46                 _eventTypes.Add(typeof(T));
 47             }
 48         }
 49 
 50         /// <summary>
 51         /// 添加訂閱
 52         /// </summary>
 53         /// <param name="handlerType"></param>
 54         /// <param name="eventName"></param>
 55         /// <param name="isDynamic"></param>
 56         private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
 57         {
 58             if (!HasSubscriptionsForEvent(eventName))
 59             {
 60                 _handlers.Add(eventName, new List<SubscriptionInfo>());
 61             }
 62 
 63             if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
 64             {
 65                 throw new ArgumentException(
 66                     $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
 67             }
 68 
 69             if (isDynamic)
 70             {
 71                 _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
 72             }
 73             else
 74             {
 75                 _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
 76             }
 77         }
 78 
 79 
 80         public void RemoveDynamicSubscription<TH>(string eventName)
 81             where TH : IDynamicIntegrationEventHandler
 82         {
 83             var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
 84             DoRemoveHandler(eventName, handlerToRemove);
 85         }
 86 
 87 
 88         public void RemoveSubscription<T, TH>()
 89             where TH : IIntegrationEventHandler<T>
 90             where T : IntegrationEvent
 91         {
 92             var handlerToRemove = FindSubscriptionToRemove<T, TH>();
 93             var eventName = GetEventKey<T>();
 94             DoRemoveHandler(eventName, handlerToRemove);
 95         }
 96 
 97         /// <summary>
 98         /// 刪除處理程式
 99         /// </summary>
100         /// <param name="eventName"></param>
101         /// <param name="subsToRemove"></param>
102         private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
103         {
104             if (subsToRemove != null)
105             {
106                 _handlers[eventName].Remove(subsToRemove);
107                 if (!_handlers[eventName].Any())
108                 {
109                     _handlers.Remove(eventName);
110                     var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
111                     if (eventType != null)
112                     {
113                         _eventTypes.Remove(eventType);
114                     }
115                     RaiseOnEventRemoved(eventName);
116                 }
117 
118             }
119         }
120 
121         public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
122         {
123             var key = GetEventKey<T>();
124             return GetHandlersForEvent(key);
125         }
126         public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];
127 
128         /// <summary>
129         /// 刪除事件引發
130         /// </summary>
131         /// <param name="eventName"></param>
132         private void RaiseOnEventRemoved(string eventName)
133         {
134             var handler = OnEventRemoved;
135             handler?.Invoke(this, eventName);
136         }
137 
138 
139         private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
140             where TH : IDynamicIntegrationEventHandler
141         {
142             return DoFindSubscriptionToRemove(eventName, typeof(TH));
143         }
144 
145 
146         private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
147              where T : IntegrationEvent
148              where TH : IIntegrationEventHandler<T>
149         {
150             var eventName = GetEventKey<T>();
151             return DoFindSubscriptionToRemove(eventName, typeof(TH));
152         }
153 
154         /// <summary>
155         /// 找到要刪除的訂閱
156         /// </summary>
157         /// <param name="eventName"></param>
158         /// <param name="handlerType"></param>
159         /// <returns></returns>
160         private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType)
161         {
162             if (!HasSubscriptionsForEvent(eventName))
163             {
164                 return null;
165             }
166 
167             return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
168 
169         }
170 
171         public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
172         {
173             var key = GetEventKey<T>();
174             return HasSubscriptionsForEvent(key);
175         }
176         public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
177 
178         public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);
179 
180         public string GetEventKey<T>()
181         {
182             return typeof(T).Name;
183         }
184     }
185 }

View Code

SubscriptionInfo

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace EventBus
 8 {
 9     public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
10     {
11         /// <summary>
12         /// 訂閱資訊
13         /// </summary>
14         public class SubscriptionInfo
15         {
16             public bool IsDynamic { get; }
17             public Type HandlerType { get; }
18 
19             private SubscriptionInfo(bool isDynamic, Type handlerType)
20             {
21                 IsDynamic = isDynamic;
22                 HandlerType = handlerType;
23             }
24 
25             //public static SubscriptionInfo Dynamic(Type handlerType)
26             //{
27             //    return new SubscriptionInfo(true, handlerType);
28             //}
29             //public static SubscriptionInfo Typed(Type handlerType)
30             //{
31             //    return new SubscriptionInfo(false, handlerType);
32             //}
33             public static SubscriptionInfo Dynamic(Type handlerType) =>
34               new SubscriptionInfo(true, handlerType);
35 
36             public static SubscriptionInfo Typed(Type handlerType) =>
37                 new SubscriptionInfo(false, handlerType);
38         }
39     }
40 }

View Code

EventBusRabbitMQ里的程式碼

默認 RabbitMQ 持久連接

  1 using Microsoft.Extensions.Logging;
  2 using Polly;
  3 using Polly.Retry;
  4 using RabbitMQ.Client;
  5 using RabbitMQ.Client.Events;
  6 using RabbitMQ.Client.Exceptions;
  7 using System;
  8 using System.IO;
  9 using System.Net.Sockets;
 10 
 11 namespace EventBus.EventBusRabbitMQ
 12 {
 13     /// <summary>
 14     /// 默認 RabbitMQ 持久連接
 15     /// </summary>
 16     public class DefaultRabbitMQPersistentConnection
 17       : IRabbitMQPersistentConnection
 18     {
 19         private readonly IConnectionFactory _connectionFactory;
 20         private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
 21         private readonly int _retryCount;
 22         IConnection _connection;
 23         bool _disposed;
 24 
 25         object sync_root = new object();
 26 
 27         public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
 28         {
 29             _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
 30             _logger = logger ?? throw new ArgumentNullException(nameof(logger));
 31             _retryCount = retryCount;
 32         }
 33 
 34         public bool IsConnected
 35         {
 36             get
 37             {
 38                 return _connection != null && _connection.IsOpen && !_disposed;
 39             }
 40         }
 41 
 42         /// <summary>
 43         /// 創建rabbitmq模型
 44         /// </summary>
 45         /// <returns></returns>
 46         public IModel CreateModel()
 47         {
 48             if (!IsConnected)
 49             {
 50                 throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
 51             }
 52 
 53             return _connection.CreateModel();
 54         }
 55 
 56         public void Dispose()
 57         {
 58             if (_disposed) return;
 59 
 60             _disposed = true;
 61 
 62             try
 63             {
 64                 _connection?.Dispose();
 65             }
 66             catch (IOException ex)
 67             {
 68                 _logger.LogCritical(ex.ToString());
 69             }
 70         }
 71 
 72         /// <summary>
 73         /// 嘗試連接
 74         /// </summary>
 75         /// <returns></returns>
 76         public bool TryConnect()
 77         {
 78             _logger.LogInformation("RabbitMQ Client is trying to connect");
 79 
 80             lock (sync_root)
 81             {
 82                 var policy = RetryPolicy.Handle<SocketException>()
 83                     .Or<BrokerUnreachableException>()
 84                     .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
 85                     {
 86                         _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
 87                     }
 88                 );
 89 
 90                 policy.Execute(() =>
 91                 {
 92                     _connection = _connectionFactory
 93                           .CreateConnection();
 94                 });
 95 
 96                 if (IsConnected)
 97                 {
 98                     _connection.ConnectionShutdown += OnConnectionShutdown;
 99                     _connection.CallbackException += OnCallbackException;
100                     _connection.ConnectionBlocked += OnConnectionBlocked;
101 
102                     _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
103 
104                     return true;
105                 }
106                 else
107                 {
108                     _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
109 
110                     return false;
111                 }
112             }
113         }
114 
115         /// <summary>
116         /// 連接阻塞
117         /// </summary>
118         /// <param name="sender"></param>
119         /// <param name="e"></param>
120         private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
121         {
122             if (_disposed) return;
123 
124             _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
125 
126             TryConnect();
127         }
128 
129         /// <summary>
130         /// 回調異常
131         /// </summary>
132         /// <param name="sender"></param>
133         /// <param name="e"></param>
134         void OnCallbackException(object sender, CallbackExceptionEventArgs e)
135         {
136             if (_disposed) return;
137 
138             _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
139 
140             TryConnect();
141         }
142 
143         /// <summary>
144         /// 連接關閉
145         /// </summary>
146         /// <param name="sender"></param>
147         /// <param name="reason"></param>
148         void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
149         {
150             if (_disposed) return;
151 
152             _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
153 
154             TryConnect();
155         }
156     }
157 }

View Code

使用RabbitMQ的事件匯流排

  1 using Autofac;
  2 using EventBus;
  3 using EventBus.Abstractions;
  4 using EventBus.Events;
  5 using EventBus.Extensions;
  6 using Microsoft.Extensions.Logging;
  7 using Polly;
  8 using Polly.Retry;
  9 using RabbitMQ.Client;
 10 using RabbitMQ.Client.Events;
 11 using RabbitMQ.Client.Exceptions;
 12 using System;
 13 using System.Collections.Generic;
 14 using System.Linq;
 15 using System.Net.Sockets;
 16 using System.Text;
 17 using System.Text.Json;
 18 using System.Threading.Tasks;
 19 using Microsoft.Extensions.DependencyInjection;
 20 
 21 namespace EventBus.EventBusRabbitMQ
 22 {
 23     /// <summary>
 24     /// 使用RabbitMQ的事件匯流排
 25     /// </summary>
 26     public class EventBusRabbitMQ : IEventBus, IDisposable
 27     {
 28         const string BROKER_NAME = "hudean_event_bus";
 29         const string AUTOFAC_SCOPE_NAME = "hudean_event_bus";
 30 
 31         private readonly IRabbitMQPersistentConnection _persistentConnection;
 32         private readonly ILogger<EventBusRabbitMQ> _logger;
 33         private readonly IEventBusSubscriptionsManager _subsManager;
 34         private readonly ILifetimeScope _autofac;
 35         private readonly int _retryCount;
 36         
 37         private IModel _consumerChannel;
 38         private string _queueName;
 39         //後面把AutoFac的改成.net core 自帶的生命周期
 40         private readonly IServiceProvider _serviceProvider;
 41 
 42         public EventBusRabbitMQ(IServiceProvider serviceProvider, IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
 43             ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
 44         {
 45             _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
 46             _logger = logger ?? throw new ArgumentNullException(nameof(logger));
 47             _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
 48             _queueName = queueName;
 49             _consumerChannel = CreateConsumerChannel();
 50             _autofac = autofac;
 51             _retryCount = retryCount;
 52             _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
 53 
 54             _serviceProvider = serviceProvider;
 55         }
 56 
 57 
 58         /// <summary>
 59         /// 訂閱管理器刪除事件
 60         /// </summary>
 61         /// <param name="sender"></param>
 62         /// <param name="eventName"></param>
 63         private void SubsManager_OnEventRemoved(object sender, string eventName)
 64         {
 65             if (!_persistentConnection.IsConnected)
 66             {
 67                 _persistentConnection.TryConnect();
 68             }
 69 
 70             using (var channel = _persistentConnection.CreateModel())
 71             {
 72                 channel.QueueUnbind(queue: _queueName,
 73                     exchange: BROKER_NAME,
 74                     routingKey: eventName);
 75 
 76                 if (_subsManager.IsEmpty)
 77                 {
 78                     _queueName = string.Empty;
 79                     _consumerChannel.Close();
 80                 }
 81             }
 82         }
 83 
 84         /// <summary>
 85         /// 發布
 86         /// </summary>
 87         /// <param name="event"></param>
 88         public void Publish(IntegrationEvent @event)
 89         {
 90             if (!_persistentConnection.IsConnected)
 91             {
 92                 _persistentConnection.TryConnect();
 93             }
 94 
 95             var policy = RetryPolicy.Handle<BrokerUnreachableException>()
 96                 .Or<SocketException>()
 97                 .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
 98                 {
 99                     _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
100                 });
101 
102             var eventName = @event.GetType().Name;
103 
104             _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);
105 
106             using (var channel = _persistentConnection.CreateModel())
107             {
108                 _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
109 
110                 channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
111 
112                 var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions
113                 {
114                     WriteIndented = true
115                 });
116 
117                 policy.Execute(() =>
118                 {
119                     var properties = channel.CreateBasicProperties();
120                     properties.DeliveryMode = 2; // persistent
121 
122                     _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);
123 
124                     channel.BasicPublish(
125                         exchange: BROKER_NAME,
126                         routingKey: eventName,
127                         mandatory: true,
128                         basicProperties: properties,
129                         body: body);
130                 });
131             }
132         }
133 
134         /// <summary>
135         /// 動態訂閱
136         /// </summary>
137         /// <typeparam name="TH"></typeparam>
138         /// <param name="eventName"></param>
139         public void SubscribeDynamic<TH>(string eventName)
140             where TH : IDynamicIntegrationEventHandler
141         {
142             _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
143 
144             DoInternalSubscription(eventName);
145             _subsManager.AddDynamicSubscription<TH>(eventName);
146             StartBasicConsume();
147         }
148 
149         public void Subscribe<T, TH>()
150             where T : IntegrationEvent
151             where TH : IIntegrationEventHandler<T>
152         {
153             var eventName = _subsManager.GetEventKey<T>();
154             DoInternalSubscription(eventName);
155 
156             _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
157 
158             _subsManager.AddSubscription<T, TH>();
159             StartBasicConsume();
160         }
161 
162         /// <summary>
163         /// 做內部訂閱
164         /// </summary>
165         /// <param name="eventName"></param>
166         private void DoInternalSubscription(string eventName)
167         {
168             var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
169             if (!containsKey)
170             {
171                 if (!_persistentConnection.IsConnected)
172                 {
173                     _persistentConnection.TryConnect();
174                 }
175 
176                 _consumerChannel.QueueBind(queue: _queueName,
177                                     exchange: BROKER_NAME,
178                                     routingKey: eventName);
179             }
180         }
181 
182         public void Unsubscribe<T, TH>()
183             where T : IntegrationEvent
184             where TH : IIntegrationEventHandler<T>
185         {
186             var eventName = _subsManager.GetEventKey<T>();
187 
188             _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
189 
190             _subsManager.RemoveSubscription<T, TH>();
191         }
192 
193         public void UnsubscribeDynamic<TH>(string eventName)
194             where TH : IDynamicIntegrationEventHandler
195         {
196             _subsManager.RemoveDynamicSubscription<TH>(eventName);
197         }
198 
199         /// <summary>
200         /// 釋放
201         /// </summary>
202         public void Dispose()
203         {
204             if (_consumerChannel != null)
205             {
206                 _consumerChannel.Dispose();
207             }
208 
209             _subsManager.Clear();
210         }
211 
212         /// <summary>
213         /// 開始基本消費
214         /// </summary>
215         private void StartBasicConsume()
216         {
217             _logger.LogTrace("Starting RabbitMQ basic consume");
218 
219             if (_consumerChannel != null)
220             {
221                 var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
222 
223                 consumer.Received += Consumer_Received;
224 
225                 _consumerChannel.BasicConsume(
226                     queue: _queueName,
227                     autoAck: false,
228                     consumer: consumer);
229             }
230             else
231             {
232                 _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
233             }
234         }
235 
236         /// <summary>
237         /// 消費者收到消息
238         /// </summary>
239         /// <param name="sender"></param>
240         /// <param name="eventArgs"></param>
241         /// <returns></returns>
242         private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
243         {
244             var eventName = eventArgs.RoutingKey;
245             var message = Encoding.UTF8.GetString(eventArgs.Body.Span);
246 
247             try
248             {
249                 if (message.ToLowerInvariant().Contains("throw-fake-exception"))
250                 {
251                     throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
252                 }
253 
254                 await ProcessEvent(eventName, message);
255                 //await ProcessEventByNetCore(eventName, message);
256             }
257             catch (Exception ex)
258             {
259                 _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
260             }
261 
262             // Even on exception we take the message off the queue.
263             // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). 
264             // For more information see: //www.rabbitmq.com/dlx.html
265             _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
266         }
267 
268         private IModel CreateConsumerChannel()
269         {
270             if (!_persistentConnection.IsConnected)
271             {
272                 _persistentConnection.TryConnect();
273             }
274 
275             _logger.LogTrace("Creating RabbitMQ consumer channel");
276 
277             var channel = _persistentConnection.CreateModel();
278 
279             channel.ExchangeDeclare(exchange: BROKER_NAME,
280                                     type: "direct");
281 
282             channel.QueueDeclare(queue: _queueName,
283                                  durable: true,
284                                  exclusive: false,
285                                  autoDelete: false,
286                                  arguments: null);
287 
288             channel.CallbackException += (sender, ea) =>
289             {
290                 _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");
291 
292                 _consumerChannel.Dispose();
293                 _consumerChannel = CreateConsumerChannel();
294                 StartBasicConsume();
295             };
296 
297             return channel;
298         }
299 
300         /// <summary>
301         /// 進程事件(使用autofac)推薦
302         /// </summary>
303         /// <param name="eventName"></param>
304         /// <param name="message"></param>
305         /// <returns></returns>
306         private async Task ProcessEvent(string eventName, string message)
307         {
308             _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);
309 
310             if (_subsManager.HasSubscriptionsForEvent(eventName))
311             {
312                 using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
313                 {
314                     var subscriptions = _subsManager.GetHandlersForEvent(eventName);
315                     foreach (var subscription in subscriptions)
316                     {
317                         if (subscription.IsDynamic)
318                         {
319                             var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
320                             if (handler == null) continue;
321                             using dynamic eventData = JsonDocument.Parse(message);
322                             await Task.Yield();
323                             await handler.Handle(eventData);
324                         }
325                         else
326                         {
327                             var handler = scope.ResolveOptional(subscription.HandlerType);
328                             if (handler == null) continue;
329                             var eventType = _subsManager.GetEventTypeByName(eventName);
330                             var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
331                             var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
332 
333                             await Task.Yield();
334                             await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
335                         }
336                     }
337                 }
338             }
339             else
340             {
341                 _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
342             }
343         }
344 
345         /// <summary>
346         /// 進程事件(使用自帶的)
347         /// </summary>
348         /// <param name="eventName"></param>
349         /// <param name="message"></param>
350         /// <returns></returns>
351         private async Task ProcessEventByNetCore(string eventName, string message)
352         {
353             _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);
354            
355             if (_subsManager.HasSubscriptionsForEvent(eventName))
356             {
357                 //安裝 Microsoft.Extensions.DependencyInjection擴展包
358                 
359                 using (var scope = _serviceProvider.CreateScope())
360                 {
361                     var subscriptions = _subsManager.GetHandlersForEvent(eventName);
362                     foreach (var subscription in subscriptions)
363                     {
364                         if (subscription.IsDynamic)
365                         {
366                             var handler = scope.ServiceProvider.GetRequiredService(subscription.HandlerType) as IDynamicIntegrationEventHandler;
367                             if (handler == null) continue;
368                             using dynamic eventData = JsonDocument.Parse(message);
369                             await Task.Yield();
370                             await handler.Handle(eventData);
371                         }
372                         else
373                         {
374                             var handler = scope.ServiceProvider.GetRequiredService(subscription.HandlerType);
375                             if (handler == null) continue;
376                             var eventType = _subsManager.GetEventTypeByName(eventName);
377                             var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
378                             var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
379 
380                             await Task.Yield();
381                             await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
382                         }
383                     }
384                 }
385 
386                
387                 
388                
389             }
390             else
391             {
392                 _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
393             }
394         }
395 
396     }
397 }

View Code

RabbitMQ持續連接介面

using System;
using RabbitMQ.Client;

namespace EventBus.EventBusRabbitMQ
{
    /// <summary>
    /// RabbitMQ持續連接
    /// </summary>
    public interface IRabbitMQPersistentConnection
         : IDisposable
    {
        /// <summary>
        /// 已連接
        /// </summary>
        bool IsConnected { get; }

        /// <summary>
        /// 嘗試連接
        /// </summary>
        /// <returns></returns>
        bool TryConnect();

        IModel CreateModel();
    }
}

View Code

完整程式碼 地址 : //github.com/hudean/EventBusTest.git

運行訂閱服務和發布服務結果如圖

 

 

 

 

Tags: