基於ASP.NET Core 5.0使用RabbitMQ消息隊列實現事件匯流排(EventBus)
文章閱讀請前先參考看一下 //www.cnblogs.com/hudean/p/13858285.html 安裝RabbitMQ消息隊列軟體與了解C#中如何使用RabbitMQ 和 //www.cnblogs.com/Keep-Ambition/p/8038885.html 添加一個用戶並可以遠程訪問,
消息隊列的作用:跨服務通訊、服務之間解耦,削峰、非同步,其實還有一個作用是提高接收者性能
RabbitMQ 官方網站://www.rabbitmq.com/
RabbitMQ 中文文檔網址://rabbitmq.mr-ping.com/
本文程式碼GitHub 地址是: //github.com/hudean/MQDemo
一、初衷
- 開發難度大,各系統間分別隔離,需要關注消息中間件的各種複雜繁瑣的配置,關注不同的消息則需要對接不同的消息隊列
- 維護成本高,各系統或團隊需要分別管理消息中間件、處理各種服務異常、(消息中間件的高可用、業務的高可用等)
- 管理難度大,沒法對消息的生產和消費進行業務管理,也不方便對消息中的敏感數據進行許可權管理
- 擴展成本高,無法統一消息系統擴展功能,如路由、延時、重試、消費確認等 總結消息隊列是一個面向技術的接入,重點關注消息隊列的配置、介面對接;而消息匯流排則是通過屏蔽部署、分組和通訊等技術細節,實現一個面向業務的接入,重點關注要接收什麼消息。
二、定義
事件匯流排是實現基於事件驅動模式的方式之一,事件發送者將事件消息發送到一個事件匯流排上,事件訂閱者向事件匯流排訂閱和接收事件,而後再處理接收到的事件。固然,訂閱者不只能夠接收和消費事件,它們自己也能夠建立事件,並將它們發送到事件匯流排上。
事件匯流排是對發布-訂閱模式的一種實現。它是一種集中式事件處理機制,容許不一樣的組件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。
如前所述,使用基於事件的通訊時,當值得注意的事件發生時,微服務會發布事件,例如更新業務實體時。 其他微服務訂閱這些事件。 微服務收到事件時,可以更新其自己的業務實體,這可能會導致發布更多事件。 這是最終一致性概念的本質。 通常通過使用事件匯流排實現來執行此發布/訂閱系統。 事件匯流排可以設計為包含 API 的介面,該 API 是訂閱和取消訂閱事件和發布事件所需的。 它還可以包含一個或多個基於跨進程或消息通訊的實現,例如支援非同步通訊和發布/訂閱模型的消息隊列或服務匯流排。
可以使用事件來實現跨多個服務的業務事務,這可提供這些服務間的最終一致性。 最終一致事務由一系列分散式操作組成。 在每個操作中,微服務會更新業務實體,並發布可觸發下一個操作的事件。 下面的圖 6-18 顯示了通過事件匯流排發布了 PriceUpdated 事件,因此價格更新傳播到購物籃和其他微服務。
圖 6-18。 基於事件匯流排的事件驅動的通訊
本部分介紹如何使用通用事件匯流排介面(如圖 6-18 所示)實現這種與 .NET 的通訊。 存在多種可能的實現,每種實現使用不同的技術或基礎結構,例如 RabbitMQ、Azure 服務匯流排或任何其他第三方開源或商用服務匯流排。
三、集成事件
集成事件用於跨多個微服務或外部系統保持域狀態同步。 此功能可通過在微服務外發布集成事件來實現。 將事件發布到多個接收方微服務(訂閱到集成事件的儘可能多個微服務)時,每個接收方微服務中的相應事件處理程式會處理該事件。
集成事件基本上是數據保持類,如以下示例所示:


1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Text.Json.Serialization; 6 using System.Threading.Tasks; 7 8 namespace EventBus.Events 9 { 10 /// <summary> 11 /// 集成事件 12 /// </summary> 13 public record IntegrationEvent 14 { 15 public IntegrationEvent() 16 { 17 Id = Guid.NewGuid(); 18 CreationDate = DateTime.UtcNow; 19 } 20 [JsonConstructor] 21 public IntegrationEvent(Guid id, DateTime createDate) 22 { 23 Id = id; 24 CreationDate = createDate; 25 } 26 27 [JsonInclude] 28 public Guid Id { get; private init; } 29 30 [JsonInclude] 31 public DateTime CreationDate { get; private init; } 32 } 33 }
View Code


1 public class ProductPriceChangedIntegrationEvent : IntegrationEvent 2 { 3 public int ProductId { get; private set; } 4 public decimal NewPrice { get; private set; } 5 public decimal OldPrice { get; private set; } 6 7 public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, 8 decimal oldPrice) 9 { 10 ProductId = productId; 11 NewPrice = newPrice; 12 OldPrice = oldPrice; 13 } 14 }
View Code
事件匯流排
事件匯流排可實現發布/訂閱式通訊,無需組件之間相互顯式識別,如圖 6-19 所示。
圖 6-19。 事件匯流排的發布/訂閱基礎知識
上圖顯示了微服務 A 發布到事件匯流排,這會分發到訂閱微服務 B 和 C,發布伺服器無需知道訂閱伺服器。 事件匯流排與觀察者模式和發布-訂閱模式相關。
觀察者模式
在觀察者模式中,主對象(稱為可觀察對象)將相關資訊(事件)告知其他感興趣的對象(稱為觀察者)。
發布-訂閱(發布/訂閱)模式
發布/訂閱模式的用途與觀察者模式相同:某些事件發生時,需要告知其他服務。 但觀察者模式與發布/訂閱模式之間存在重要區別。 在觀察者模式中,直接從可觀察對象廣播到觀察者,因此它們「知道」彼此。 但在發布/訂閱模式中,存在稱為中轉站、消息中轉站或事件匯流排的第三個組件,發布伺服器和訂閱伺服器都知道第三個組件。 因此,使用發布/訂閱模式時,發布伺服器和訂閱伺服器通過所述的事件匯流排或消息中轉站精確分離。
中轉站或事件匯流排
如何實現發布伺服器和訂閱伺服器之間的匿名? 一個簡單方法是讓中轉站處理所有通訊。 事件匯流排是一個這樣的中轉站。
事件匯流排通常由兩部分組成:
-
抽象或介面。
-
一個或多個實現。
在圖 6-19 中,從應用程式角度看,會發現事件匯流排實際上是一個發布/訂閱通道。 實現此非同步通訊的方式可能會有差異。 它可以具有多個實現,以便你進行交換,具體取決於環境要求(例如,生產和開發環境)。
在圖 6-20 中,可看到事件匯流排的抽象,包含基於 RabbitMQ、Azure 服務匯流排或其他事件/消息中轉站等基礎結構消息技術的多個實現。
圖 6- 20。 事件匯流排的多個實現
最好通過介面定義事件匯流排,以便它可使用多種技術(例如 RabbitMQ、Azure 服務匯流排等)來實現。 但是,如前所述,僅當需要由你的抽象支援的基本事件匯流排功能時,才適合使用你自己的抽象(事件匯流排介面)。 如果需要更豐富的服務匯流排功能,應使用你喜歡的商用服務匯流排提供的 API 和抽象,而不是你自己的抽象。
定義事件匯流排介面
首先,讓我們了解一下事件匯流排介面的一些實現程式碼和可能的實現。 介面應是通用和簡單的,如下所示介面。


1 using EventBus.Events; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 事件匯流排介面 7 /// </summary> 8 public interface IEventBus 9 { 10 /// <summary> 11 /// 發布 12 /// </summary> 13 /// <param name="event"></param> 14 void Publish(IntegrationEvent @event); 15 16 /// <summary> 17 /// 訂閱 18 /// </summary> 19 /// <typeparam name="T"></typeparam> 20 /// <typeparam name="TH"></typeparam> 21 void Subscribe<T, TH>() 22 where T : IntegrationEvent 23 where TH : IIntegrationEventHandler<T>; 24 25 /// <summary> 26 /// 動態訂閱 27 /// </summary> 28 /// <typeparam name="TH"></typeparam> 29 /// <param name="eventName"></param> 30 void SubscribeDynamic<TH>(string eventName) 31 where TH : IDynamicIntegrationEventHandler; 32 33 /// <summary> 34 /// 取消動態訂閱 35 /// </summary> 36 /// <typeparam name="TH"></typeparam> 37 /// <param name="eventName"></param> 38 void UnsubscribeDynamic<TH>(string eventName) 39 where TH : IDynamicIntegrationEventHandler; 40 41 /// <summary> 42 /// 取消訂閱 43 /// </summary> 44 /// <typeparam name="T"></typeparam> 45 /// <typeparam name="TH"></typeparam> 46 void Unsubscribe<T, TH>() 47 where TH : IIntegrationEventHandler<T> 48 where T : IntegrationEvent; 49 } 50 }
View Code
藉助 RabbitMQ 的事件匯流排實現,微服務可訂閱事件、發布事件和接收事件,如圖 6-21 所示。
圖 6-21。 事件匯流排的 RabbitMQ 實現
RabbitMQ 充當消息發布伺服器和訂閱者之間的中介,處理分發。 在程式碼中,EventBusRabbitMQ 類實現了泛型 IEventBus 介面。 此實現基於依賴項注入,以便可以從此開發/測試版本交換到生產版本。


1 public class EventBusRabbitMQ : IEventBus, IDisposable 2 { 3 // Implementation using RabbitMQ API 4 //... 5 }
View Code
示例開發/測試事件匯流排的 RabbitMQ 實現是樣板程式碼。 它必須處理與 RabbitMQ 伺服器的連接,並提供用於將消息事件發布到隊列的程式碼。 它還必須為每個事件類型實現收集集成事件處理程式的字典;這些事件類型可以對每個接收器微服務具有不同的實例化和不同的訂閱,如圖 6-21 所示。
四、使用 RabbitMQ 實現一個簡單的發布方法
下面的程式碼是 RabbitMQ 的事件匯流排實現的簡化版,用以展示整個方案。 你真的不必以這種方式處理連接。 要查看完整的實現,在後面


1 public class EventBusRabbitMQ : IEventBus, IDisposable 2 { 3 // Member objects and other methods ... 4 // ... 5 6 public void Publish(IntegrationEvent @event) 7 { 8 var eventName = @event.GetType().Name; 9 var factory = new ConnectionFactory() { HostName = _connectionString }; 10 using (var connection = factory.CreateConnection()) 11 using (var channel = connection.CreateModel()) 12 { 13 channel.ExchangeDeclare(exchange: _brokerName, 14 type: "direct"); 15 string message = JsonConvert.SerializeObject(@event); 16 var body = Encoding.UTF8.GetBytes(message); 17 channel.BasicPublish(exchange: _brokerName, 18 routingKey: eventName, 19 basicProperties: null, 20 body: body); 21 } 22 } 23 }
View Code
五、使用 RabbitMQ API 實現訂閱程式碼
與發布程式碼一樣,下面的程式碼是 RabbitMQ 事件匯流排實現的簡化部分。


public class EventBusRabbitMQ : IEventBus, IDisposable { // Member objects and other methods ... // ... public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = _subsManager.GetEventKey<T>(); var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } _subsManager.AddSubscription<T, TH>(); } }
View Code
每個事件類型都有一個相關的通道,以獲取 RabbitMQ 中的事件。 然後,可以根據需要在每個通道和事件類型中擁有儘可能多的事件處理程式。
訂閱方法接受一個 IIntegrationEventHandler 對象,該對象相當於當前微服務中的回調方法,以及其相關的 IntegrationEvent 對象。 然後,程式碼將該事件處理程式添加到事件處理程式列表,每個客戶端微服務的每個集成事件類型都可具有事件處理程式。 如果客戶端程式碼尚未訂閱事件,該程式碼將為事件類型創建一個通道,以便在從任何其他服務中發布事件時,它可以從 RabbitMQ 以推送方式接收事件。
六、使用 RabbitMQ 完整實現事件匯流排程式碼
結構圖如下:
動態集成事件處理器介面


1 using System.Threading.Tasks; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 動態集成事件處理器介面 7 /// </summary> 8 public interface IDynamicIntegrationEventHandler 9 { 10 Task Handle(dynamic eventData); 11 } 12 }
View Code
事件匯流排介面


1 using EventBus.Events; 2 3 namespace EventBus.Abstractions 4 { 5 /// <summary> 6 /// 事件匯流排介面 7 /// </summary> 8 public interface IEventBus 9 { 10 /// <summary> 11 /// 發布 12 /// </summary> 13 /// <param name="event"></param> 14 void Publish(IntegrationEvent @event); 15 16 /// <summary> 17 /// 訂閱 18 /// </summary> 19 /// <typeparam name="T"></typeparam> 20 /// <typeparam name="TH"></typeparam> 21 void Subscribe<T, TH>() 22 where T : IntegrationEvent 23 where TH : IIntegrationEventHandler<T>; 24 25 /// <summary> 26 /// 動態訂閱 27 /// </summary> 28 /// <typeparam name="TH"></typeparam> 29 /// <param name="eventName"></param> 30 void SubscribeDynamic<TH>(string eventName) 31 where TH : IDynamicIntegrationEventHandler; 32 33 /// <summary> 34 /// 取消動態訂閱 35 /// </summary> 36 /// <typeparam name="TH"></typeparam> 37 /// <param name="eventName"></param> 38 void UnsubscribeDynamic<TH>(string eventName) 39 where TH : IDynamicIntegrationEventHandler; 40 41 /// <summary> 42 /// 取消訂閱 43 /// </summary> 44 /// <typeparam name="T"></typeparam> 45 /// <typeparam name="TH"></typeparam> 46 void Unsubscribe<T, TH>() 47 where TH : IIntegrationEventHandler<T> 48 where T : IntegrationEvent; 49 } 50 }
View Code
集成事件處理器介面


1 using EventBus.Events; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 namespace EventBus.Abstractions 9 { 10 /// <summary> 11 /// 集成事件處理器介面 12 /// </summary> 13 /// <typeparam name="TIntegrationEvent">TIntegrationEvent泛型</typeparam> 14 public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler 15 where TIntegrationEvent : IntegrationEvent 16 { 17 Task Handle(TIntegrationEvent @event); 18 } 19 20 /// <summary> 21 /// 集成事件處理器 22 /// </summary> 23 public interface IIntegrationEventHandler 24 { 25 } 26 }
View Code
集成事件


1 using System.Text; 2 using System.Text.Json.Serialization; 3 using System.Threading.Tasks; 4 5 namespace EventBus.Events 6 { 7 /// <summary> 8 /// 集成事件 9 /// </summary> 10 public record IntegrationEvent 11 { 12 public IntegrationEvent() 13 { 14 Id = Guid.NewGuid(); 15 CreationDate = DateTime.UtcNow; 16 } 17 [JsonConstructor] 18 public IntegrationEvent(Guid id, DateTime createDate) 19 { 20 Id = id; 21 CreationDate = createDate; 22 } 23 24 [JsonInclude] 25 public Guid Id { get; private init; } 26 27 [JsonInclude] 28 public DateTime CreationDate { get; private init; } 29 } 30 }
View Code
GenericTypeExtensions


1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace EventBus.Extensions 8 { 9 public static class GenericTypeExtensions 10 { 11 public static string GetGenericTypeName(this Type type) 12 { 13 var typeName = string.Empty; 14 if (type.IsGenericType) 15 { 16 var genericTypes = string.Join(",", type.GetGenericArguments().Select(t => t.Name).ToArray()); 17 typeName = $"{type.Name.Remove(type.Name.IndexOf('`'))}<{genericTypes}>"; 18 } 19 else 20 { 21 typeName = type.Name; 22 } 23 24 return typeName; 25 } 26 27 /// <summary> 28 /// 獲取通用類型名稱 29 /// </summary> 30 /// <param name="object"></param> 31 /// <returns></returns> 32 public static string GetGenericTypeName(this object @object) 33 { 34 return @object.GetType().GetGenericTypeName(); 35 } 36 } 37 }
View Code
事件匯流排訂閱管理器介面


1 using EventBus.Abstractions; 2 using EventBus.Events; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 using static EventBus.InMemoryEventBusSubscriptionsManager; 9 10 namespace EventBus 11 { 12 /// <summary> 13 /// 事件匯流排訂閱管理器介面 14 /// </summary> 15 public interface IEventBusSubscriptionsManager 16 { 17 bool IsEmpty { get; } 18 event EventHandler<string> OnEventRemoved; 19 20 /// <summary> 21 /// 添加動態訂閱 22 /// </summary> 23 /// <typeparam name="TH"></typeparam> 24 /// <param name="eventName"></param> 25 void AddDynamicSubscription<TH>(string eventName) 26 where TH : IDynamicIntegrationEventHandler; 27 28 /// <summary> 29 /// 添加訂閱 30 /// </summary> 31 /// <typeparam name="T"></typeparam> 32 /// <typeparam name="TH"></typeparam> 33 void AddSubscription<T, TH>() 34 where T : IntegrationEvent 35 where TH : IIntegrationEventHandler<T>; 36 37 /// <summary> 38 /// 刪除訂閱 39 /// </summary> 40 /// <typeparam name="T"></typeparam> 41 /// <typeparam name="TH"></typeparam> 42 void RemoveSubscription<T, TH>() 43 where TH : IIntegrationEventHandler<T> 44 where T : IntegrationEvent; 45 46 /// <summary> 47 /// 移除動態訂閱 48 /// </summary> 49 /// <typeparam name="TH"></typeparam> 50 /// <param name="eventName"></param> 51 void RemoveDynamicSubscription<TH>(string eventName) 52 where TH : IDynamicIntegrationEventHandler; 53 54 /// <summary> 55 /// 已訂閱事件 56 /// </summary> 57 /// <typeparam name="T"></typeparam> 58 /// <returns></returns> 59 bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent; 60 61 /// <summary> 62 /// 已訂閱事件 63 /// </summary> 64 /// <param name="eventName"></param> 65 /// <returns></returns> 66 bool HasSubscriptionsForEvent(string eventName); 67 68 /// <summary> 69 /// 按名稱獲取事件類型 70 /// </summary> 71 /// <param name="eventName"></param> 72 /// <returns></returns> 73 Type GetEventTypeByName(string eventName); 74 75 /// <summary> 76 /// 清空 77 /// </summary> 78 void Clear(); 79 80 /// <summary> 81 /// 獲取事件處理程式 82 /// </summary> 83 /// <typeparam name="T"></typeparam> 84 /// <returns></returns> 85 IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent; 86 87 /// <summary> 88 /// 獲取事件處理程式 89 /// </summary> 90 /// <param name="eventName"></param> 91 /// <returns></returns> 92 IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); 93 94 /// <summary> 95 /// 獲取事件密鑰 96 /// </summary> 97 /// <typeparam name="T"></typeparam> 98 /// <returns></returns> 99 string GetEventKey<T>(); 100 } 101 }
View Code
記憶體中事件匯流排訂閱管理器


1 using EventBus.Abstractions; 2 using EventBus.Events; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 9 namespace EventBus 10 { 11 /// <summary> 12 /// 記憶體中事件匯流排訂閱管理器 13 /// </summary> 14 public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager 15 { 16 private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; 17 private readonly List<Type> _eventTypes; 18 19 public event EventHandler<string> OnEventRemoved; 20 21 public InMemoryEventBusSubscriptionsManager() 22 { 23 _handlers = new Dictionary<string, List<SubscriptionInfo>>(); 24 _eventTypes = new List<Type>(); 25 } 26 27 public bool IsEmpty => !_handlers.Keys.Any(); 28 public void Clear() => _handlers.Clear(); 29 30 public void AddDynamicSubscription<TH>(string eventName) 31 where TH : IDynamicIntegrationEventHandler 32 { 33 DoAddSubscription(typeof(TH), eventName, isDynamic: true); 34 } 35 36 public void AddSubscription<T, TH>() 37 where T : IntegrationEvent 38 where TH : IIntegrationEventHandler<T> 39 { 40 var eventName = GetEventKey<T>(); 41 42 DoAddSubscription(typeof(TH), eventName, isDynamic: false); 43 44 if (!_eventTypes.Contains(typeof(T))) 45 { 46 _eventTypes.Add(typeof(T)); 47 } 48 } 49 50 /// <summary> 51 /// 添加訂閱 52 /// </summary> 53 /// <param name="handlerType"></param> 54 /// <param name="eventName"></param> 55 /// <param name="isDynamic"></param> 56 private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic) 57 { 58 if (!HasSubscriptionsForEvent(eventName)) 59 { 60 _handlers.Add(eventName, new List<SubscriptionInfo>()); 61 } 62 63 if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) 64 { 65 throw new ArgumentException( 66 $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); 67 } 68 69 if (isDynamic) 70 { 71 _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType)); 72 } 73 else 74 { 75 _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType)); 76 } 77 } 78 79 80 public void RemoveDynamicSubscription<TH>(string eventName) 81 where TH : IDynamicIntegrationEventHandler 82 { 83 var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName); 84 DoRemoveHandler(eventName, handlerToRemove); 85 } 86 87 88 public void RemoveSubscription<T, TH>() 89 where TH : IIntegrationEventHandler<T> 90 where T : IntegrationEvent 91 { 92 var handlerToRemove = FindSubscriptionToRemove<T, TH>(); 93 var eventName = GetEventKey<T>(); 94 DoRemoveHandler(eventName, handlerToRemove); 95 } 96 97 /// <summary> 98 /// 刪除處理程式 99 /// </summary> 100 /// <param name="eventName"></param> 101 /// <param name="subsToRemove"></param> 102 private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove) 103 { 104 if (subsToRemove != null) 105 { 106 _handlers[eventName].Remove(subsToRemove); 107 if (!_handlers[eventName].Any()) 108 { 109 _handlers.Remove(eventName); 110 var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName); 111 if (eventType != null) 112 { 113 _eventTypes.Remove(eventType); 114 } 115 RaiseOnEventRemoved(eventName); 116 } 117 118 } 119 } 120 121 public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent 122 { 123 var key = GetEventKey<T>(); 124 return GetHandlersForEvent(key); 125 } 126 public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName]; 127 128 /// <summary> 129 /// 刪除事件引發 130 /// </summary> 131 /// <param name="eventName"></param> 132 private void RaiseOnEventRemoved(string eventName) 133 { 134 var handler = OnEventRemoved; 135 handler?.Invoke(this, eventName); 136 } 137 138 139 private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName) 140 where TH : IDynamicIntegrationEventHandler 141 { 142 return DoFindSubscriptionToRemove(eventName, typeof(TH)); 143 } 144 145 146 private SubscriptionInfo FindSubscriptionToRemove<T, TH>() 147 where T : IntegrationEvent 148 where TH : IIntegrationEventHandler<T> 149 { 150 var eventName = GetEventKey<T>(); 151 return DoFindSubscriptionToRemove(eventName, typeof(TH)); 152 } 153 154 /// <summary> 155 /// 找到要刪除的訂閱 156 /// </summary> 157 /// <param name="eventName"></param> 158 /// <param name="handlerType"></param> 159 /// <returns></returns> 160 private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType) 161 { 162 if (!HasSubscriptionsForEvent(eventName)) 163 { 164 return null; 165 } 166 167 return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType); 168 169 } 170 171 public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent 172 { 173 var key = GetEventKey<T>(); 174 return HasSubscriptionsForEvent(key); 175 } 176 public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); 177 178 public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName); 179 180 public string GetEventKey<T>() 181 { 182 return typeof(T).Name; 183 } 184 } 185 }
View Code
SubscriptionInfo


1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace EventBus 8 { 9 public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager 10 { 11 /// <summary> 12 /// 訂閱資訊 13 /// </summary> 14 public class SubscriptionInfo 15 { 16 public bool IsDynamic { get; } 17 public Type HandlerType { get; } 18 19 private SubscriptionInfo(bool isDynamic, Type handlerType) 20 { 21 IsDynamic = isDynamic; 22 HandlerType = handlerType; 23 } 24 25 //public static SubscriptionInfo Dynamic(Type handlerType) 26 //{ 27 // return new SubscriptionInfo(true, handlerType); 28 //} 29 //public static SubscriptionInfo Typed(Type handlerType) 30 //{ 31 // return new SubscriptionInfo(false, handlerType); 32 //} 33 public static SubscriptionInfo Dynamic(Type handlerType) => 34 new SubscriptionInfo(true, handlerType); 35 36 public static SubscriptionInfo Typed(Type handlerType) => 37 new SubscriptionInfo(false, handlerType); 38 } 39 } 40 }
View Code
EventBusRabbitMQ里的程式碼
默認 RabbitMQ 持久連接


1 using Microsoft.Extensions.Logging; 2 using Polly; 3 using Polly.Retry; 4 using RabbitMQ.Client; 5 using RabbitMQ.Client.Events; 6 using RabbitMQ.Client.Exceptions; 7 using System; 8 using System.IO; 9 using System.Net.Sockets; 10 11 namespace EventBus.EventBusRabbitMQ 12 { 13 /// <summary> 14 /// 默認 RabbitMQ 持久連接 15 /// </summary> 16 public class DefaultRabbitMQPersistentConnection 17 : IRabbitMQPersistentConnection 18 { 19 private readonly IConnectionFactory _connectionFactory; 20 private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; 21 private readonly int _retryCount; 22 IConnection _connection; 23 bool _disposed; 24 25 object sync_root = new object(); 26 27 public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) 28 { 29 _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); 30 _logger = logger ?? throw new ArgumentNullException(nameof(logger)); 31 _retryCount = retryCount; 32 } 33 34 public bool IsConnected 35 { 36 get 37 { 38 return _connection != null && _connection.IsOpen && !_disposed; 39 } 40 } 41 42 /// <summary> 43 /// 創建rabbitmq模型 44 /// </summary> 45 /// <returns></returns> 46 public IModel CreateModel() 47 { 48 if (!IsConnected) 49 { 50 throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); 51 } 52 53 return _connection.CreateModel(); 54 } 55 56 public void Dispose() 57 { 58 if (_disposed) return; 59 60 _disposed = true; 61 62 try 63 { 64 _connection?.Dispose(); 65 } 66 catch (IOException ex) 67 { 68 _logger.LogCritical(ex.ToString()); 69 } 70 } 71 72 /// <summary> 73 /// 嘗試連接 74 /// </summary> 75 /// <returns></returns> 76 public bool TryConnect() 77 { 78 _logger.LogInformation("RabbitMQ Client is trying to connect"); 79 80 lock (sync_root) 81 { 82 var policy = RetryPolicy.Handle<SocketException>() 83 .Or<BrokerUnreachableException>() 84 .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => 85 { 86 _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); 87 } 88 ); 89 90 policy.Execute(() => 91 { 92 _connection = _connectionFactory 93 .CreateConnection(); 94 }); 95 96 if (IsConnected) 97 { 98 _connection.ConnectionShutdown += OnConnectionShutdown; 99 _connection.CallbackException += OnCallbackException; 100 _connection.ConnectionBlocked += OnConnectionBlocked; 101 102 _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName); 103 104 return true; 105 } 106 else 107 { 108 _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); 109 110 return false; 111 } 112 } 113 } 114 115 /// <summary> 116 /// 連接阻塞 117 /// </summary> 118 /// <param name="sender"></param> 119 /// <param name="e"></param> 120 private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) 121 { 122 if (_disposed) return; 123 124 _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); 125 126 TryConnect(); 127 } 128 129 /// <summary> 130 /// 回調異常 131 /// </summary> 132 /// <param name="sender"></param> 133 /// <param name="e"></param> 134 void OnCallbackException(object sender, CallbackExceptionEventArgs e) 135 { 136 if (_disposed) return; 137 138 _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); 139 140 TryConnect(); 141 } 142 143 /// <summary> 144 /// 連接關閉 145 /// </summary> 146 /// <param name="sender"></param> 147 /// <param name="reason"></param> 148 void OnConnectionShutdown(object sender, ShutdownEventArgs reason) 149 { 150 if (_disposed) return; 151 152 _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); 153 154 TryConnect(); 155 } 156 } 157 }
View Code
使用RabbitMQ的事件匯流排


1 using Autofac; 2 using EventBus; 3 using EventBus.Abstractions; 4 using EventBus.Events; 5 using EventBus.Extensions; 6 using Microsoft.Extensions.Logging; 7 using Polly; 8 using Polly.Retry; 9 using RabbitMQ.Client; 10 using RabbitMQ.Client.Events; 11 using RabbitMQ.Client.Exceptions; 12 using System; 13 using System.Collections.Generic; 14 using System.Linq; 15 using System.Net.Sockets; 16 using System.Text; 17 using System.Text.Json; 18 using System.Threading.Tasks; 19 using Microsoft.Extensions.DependencyInjection; 20 21 namespace EventBus.EventBusRabbitMQ 22 { 23 /// <summary> 24 /// 使用RabbitMQ的事件匯流排 25 /// </summary> 26 public class EventBusRabbitMQ : IEventBus, IDisposable 27 { 28 const string BROKER_NAME = "hudean_event_bus"; 29 const string AUTOFAC_SCOPE_NAME = "hudean_event_bus"; 30 31 private readonly IRabbitMQPersistentConnection _persistentConnection; 32 private readonly ILogger<EventBusRabbitMQ> _logger; 33 private readonly IEventBusSubscriptionsManager _subsManager; 34 private readonly ILifetimeScope _autofac; 35 private readonly int _retryCount; 36 37 private IModel _consumerChannel; 38 private string _queueName; 39 //後面把AutoFac的改成.net core 自帶的生命周期 40 private readonly IServiceProvider _serviceProvider; 41 42 public EventBusRabbitMQ(IServiceProvider serviceProvider, IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, 43 ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) 44 { 45 _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); 46 _logger = logger ?? throw new ArgumentNullException(nameof(logger)); 47 _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); 48 _queueName = queueName; 49 _consumerChannel = CreateConsumerChannel(); 50 _autofac = autofac; 51 _retryCount = retryCount; 52 _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; 53 54 _serviceProvider = serviceProvider; 55 } 56 57 58 /// <summary> 59 /// 訂閱管理器刪除事件 60 /// </summary> 61 /// <param name="sender"></param> 62 /// <param name="eventName"></param> 63 private void SubsManager_OnEventRemoved(object sender, string eventName) 64 { 65 if (!_persistentConnection.IsConnected) 66 { 67 _persistentConnection.TryConnect(); 68 } 69 70 using (var channel = _persistentConnection.CreateModel()) 71 { 72 channel.QueueUnbind(queue: _queueName, 73 exchange: BROKER_NAME, 74 routingKey: eventName); 75 76 if (_subsManager.IsEmpty) 77 { 78 _queueName = string.Empty; 79 _consumerChannel.Close(); 80 } 81 } 82 } 83 84 /// <summary> 85 /// 發布 86 /// </summary> 87 /// <param name="event"></param> 88 public void Publish(IntegrationEvent @event) 89 { 90 if (!_persistentConnection.IsConnected) 91 { 92 _persistentConnection.TryConnect(); 93 } 94 95 var policy = RetryPolicy.Handle<BrokerUnreachableException>() 96 .Or<SocketException>() 97 .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => 98 { 99 _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); 100 }); 101 102 var eventName = @event.GetType().Name; 103 104 _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); 105 106 using (var channel = _persistentConnection.CreateModel()) 107 { 108 _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); 109 110 channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); 111 112 var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions 113 { 114 WriteIndented = true 115 }); 116 117 policy.Execute(() => 118 { 119 var properties = channel.CreateBasicProperties(); 120 properties.DeliveryMode = 2; // persistent 121 122 _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); 123 124 channel.BasicPublish( 125 exchange: BROKER_NAME, 126 routingKey: eventName, 127 mandatory: true, 128 basicProperties: properties, 129 body: body); 130 }); 131 } 132 } 133 134 /// <summary> 135 /// 動態訂閱 136 /// </summary> 137 /// <typeparam name="TH"></typeparam> 138 /// <param name="eventName"></param> 139 public void SubscribeDynamic<TH>(string eventName) 140 where TH : IDynamicIntegrationEventHandler 141 { 142 _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); 143 144 DoInternalSubscription(eventName); 145 _subsManager.AddDynamicSubscription<TH>(eventName); 146 StartBasicConsume(); 147 } 148 149 public void Subscribe<T, TH>() 150 where T : IntegrationEvent 151 where TH : IIntegrationEventHandler<T> 152 { 153 var eventName = _subsManager.GetEventKey<T>(); 154 DoInternalSubscription(eventName); 155 156 _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); 157 158 _subsManager.AddSubscription<T, TH>(); 159 StartBasicConsume(); 160 } 161 162 /// <summary> 163 /// 做內部訂閱 164 /// </summary> 165 /// <param name="eventName"></param> 166 private void DoInternalSubscription(string eventName) 167 { 168 var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); 169 if (!containsKey) 170 { 171 if (!_persistentConnection.IsConnected) 172 { 173 _persistentConnection.TryConnect(); 174 } 175 176 _consumerChannel.QueueBind(queue: _queueName, 177 exchange: BROKER_NAME, 178 routingKey: eventName); 179 } 180 } 181 182 public void Unsubscribe<T, TH>() 183 where T : IntegrationEvent 184 where TH : IIntegrationEventHandler<T> 185 { 186 var eventName = _subsManager.GetEventKey<T>(); 187 188 _logger.LogInformation("Unsubscribing from event {EventName}", eventName); 189 190 _subsManager.RemoveSubscription<T, TH>(); 191 } 192 193 public void UnsubscribeDynamic<TH>(string eventName) 194 where TH : IDynamicIntegrationEventHandler 195 { 196 _subsManager.RemoveDynamicSubscription<TH>(eventName); 197 } 198 199 /// <summary> 200 /// 釋放 201 /// </summary> 202 public void Dispose() 203 { 204 if (_consumerChannel != null) 205 { 206 _consumerChannel.Dispose(); 207 } 208 209 _subsManager.Clear(); 210 } 211 212 /// <summary> 213 /// 開始基本消費 214 /// </summary> 215 private void StartBasicConsume() 216 { 217 _logger.LogTrace("Starting RabbitMQ basic consume"); 218 219 if (_consumerChannel != null) 220 { 221 var consumer = new AsyncEventingBasicConsumer(_consumerChannel); 222 223 consumer.Received += Consumer_Received; 224 225 _consumerChannel.BasicConsume( 226 queue: _queueName, 227 autoAck: false, 228 consumer: consumer); 229 } 230 else 231 { 232 _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); 233 } 234 } 235 236 /// <summary> 237 /// 消費者收到消息 238 /// </summary> 239 /// <param name="sender"></param> 240 /// <param name="eventArgs"></param> 241 /// <returns></returns> 242 private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) 243 { 244 var eventName = eventArgs.RoutingKey; 245 var message = Encoding.UTF8.GetString(eventArgs.Body.Span); 246 247 try 248 { 249 if (message.ToLowerInvariant().Contains("throw-fake-exception")) 250 { 251 throw new InvalidOperationException($"Fake exception requested: \"{message}\""); 252 } 253 254 await ProcessEvent(eventName, message); 255 //await ProcessEventByNetCore(eventName, message); 256 } 257 catch (Exception ex) 258 { 259 _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); 260 } 261 262 // Even on exception we take the message off the queue. 263 // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). 264 // For more information see: //www.rabbitmq.com/dlx.html 265 _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); 266 } 267 268 private IModel CreateConsumerChannel() 269 { 270 if (!_persistentConnection.IsConnected) 271 { 272 _persistentConnection.TryConnect(); 273 } 274 275 _logger.LogTrace("Creating RabbitMQ consumer channel"); 276 277 var channel = _persistentConnection.CreateModel(); 278 279 channel.ExchangeDeclare(exchange: BROKER_NAME, 280 type: "direct"); 281 282 channel.QueueDeclare(queue: _queueName, 283 durable: true, 284 exclusive: false, 285 autoDelete: false, 286 arguments: null); 287 288 channel.CallbackException += (sender, ea) => 289 { 290 _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); 291 292 _consumerChannel.Dispose(); 293 _consumerChannel = CreateConsumerChannel(); 294 StartBasicConsume(); 295 }; 296 297 return channel; 298 } 299 300 /// <summary> 301 /// 進程事件(使用autofac)推薦 302 /// </summary> 303 /// <param name="eventName"></param> 304 /// <param name="message"></param> 305 /// <returns></returns> 306 private async Task ProcessEvent(string eventName, string message) 307 { 308 _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); 309 310 if (_subsManager.HasSubscriptionsForEvent(eventName)) 311 { 312 using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) 313 { 314 var subscriptions = _subsManager.GetHandlersForEvent(eventName); 315 foreach (var subscription in subscriptions) 316 { 317 if (subscription.IsDynamic) 318 { 319 var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; 320 if (handler == null) continue; 321 using dynamic eventData = JsonDocument.Parse(message); 322 await Task.Yield(); 323 await handler.Handle(eventData); 324 } 325 else 326 { 327 var handler = scope.ResolveOptional(subscription.HandlerType); 328 if (handler == null) continue; 329 var eventType = _subsManager.GetEventTypeByName(eventName); 330 var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }); 331 var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); 332 333 await Task.Yield(); 334 await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); 335 } 336 } 337 } 338 } 339 else 340 { 341 _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); 342 } 343 } 344 345 /// <summary> 346 /// 進程事件(使用自帶的) 347 /// </summary> 348 /// <param name="eventName"></param> 349 /// <param name="message"></param> 350 /// <returns></returns> 351 private async Task ProcessEventByNetCore(string eventName, string message) 352 { 353 _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); 354 355 if (_subsManager.HasSubscriptionsForEvent(eventName)) 356 { 357 //安裝 Microsoft.Extensions.DependencyInjection擴展包 358 359 using (var scope = _serviceProvider.CreateScope()) 360 { 361 var subscriptions = _subsManager.GetHandlersForEvent(eventName); 362 foreach (var subscription in subscriptions) 363 { 364 if (subscription.IsDynamic) 365 { 366 var handler = scope.ServiceProvider.GetRequiredService(subscription.HandlerType) as IDynamicIntegrationEventHandler; 367 if (handler == null) continue; 368 using dynamic eventData = JsonDocument.Parse(message); 369 await Task.Yield(); 370 await handler.Handle(eventData); 371 } 372 else 373 { 374 var handler = scope.ServiceProvider.GetRequiredService(subscription.HandlerType); 375 if (handler == null) continue; 376 var eventType = _subsManager.GetEventTypeByName(eventName); 377 var integrationEvent = JsonSerializer.Deserialize(message, eventType, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true }); 378 var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); 379 380 await Task.Yield(); 381 await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); 382 } 383 } 384 } 385 386 387 388 389 } 390 else 391 { 392 _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); 393 } 394 } 395 396 } 397 }
View Code
RabbitMQ持續連接介面


using System; using RabbitMQ.Client; namespace EventBus.EventBusRabbitMQ { /// <summary> /// RabbitMQ持續連接 /// </summary> public interface IRabbitMQPersistentConnection : IDisposable { /// <summary> /// 已連接 /// </summary> bool IsConnected { get; } /// <summary> /// 嘗試連接 /// </summary> /// <returns></returns> bool TryConnect(); IModel CreateModel(); } }
View Code
完整程式碼 地址 : //github.com/hudean/EventBusTest.git
運行訂閱服務和發布服務結果如圖