幹掉Switch-Case、If-Else—-訂閱發佈模式+事件驅動模式
在上位機和下位機或者服務端和客戶端通信的時候,很多時候可能為了趕項目進度或者寫代碼方便直接使用Socket通信,傳輸string類型的關鍵字驅動對應的事件,這就有可能導致程序中存在大量的Switch-Case、If-Else判斷。當通信的邏輯越來越複雜,增加的關鍵字就越來越多,導致一個事件處理類中不斷的累加成千上萬的Switch-Case、If-Else代碼,導致後期的代碼極其難以維護。
當大家在看到大量的Switch-Case、If-Else代碼堆積在一起肯定會感覺非常的頭痛,其中的業務邏輯就如同線團一樣錯綜複雜。我曾經在一家規模不小的大廠中就看到一個客戶端的工程中,有一個上萬行代碼的消息處理類,其中的消息關鍵字嵌套關鍵字,Switch-Case代碼佔據大壁江山,而項目中的其他人員沒有願意接這個工程的,久而久之成了代代相傳的祖傳代碼。
那麼,如何去優化幹掉Switch-Case、If-Else?
其實這個話題可以寫成一個系列,根據不同的情況使用不同的設計模式去進行重構和優化。今天推薦的訂閱發佈模式+事件驅動模式就是針對以上在網絡通信過程中的代碼優化,使用消息中間件RabbitMQ替代Socket。
如果您接觸過、使用過RabbitMQ,本文中的代碼也許更能理解,以下代碼僅代表個人的開發經驗與大家一起分享學習,如有異議歡迎溝通討論。
訂閱發佈模式+事件驅動模式概念
訂閱發佈模式,將消息分為不同的類型,發佈者無需關心有哪些訂閱者存在,而訂閱者只關心自己感興趣的消息,無需關心有哪些發佈者存在。
事件驅動模式,程序的行為是通過事件驅動的,程序能夠實時感知自己感興趣的事件,並對不同的事件作出不同的反應。
實現思路
消息中間件RabbitMQ
本文的訂閱發佈模式+事件驅動模式是藉助RabbitMQ來實現,需要確保本地電腦已經安裝RabbitMQ的相關環境,然後在VS中創建一個解決方案MyRBQServer,並添加兩個控制台程序MyRBQClient、MyRBQServer,一個類庫MyRBQPolisher,並安裝Nuget包 RabbitMQ.Client。
MyRBQClient:模擬客戶端,其實在本文的設計中沒有明顯的客戶端和服務端的概念,客戶端和服務端都可以發佈和訂閱事件。
MyRBQPolisher:消息的接口實現庫,其中包含用於發佈和訂閱事件的接口 IPublisher和各種消息的定義,這個庫是本文代碼中的重點。
MyRBQServer:模擬服務端。
實現MyRBQPolisher
在MyRBQPolisher中添加一個消息基類EventBase和一個消息處理類的泛型接口IMessageEvent,IMessageEvent的類型需要約束為EventBase的子類。
EventBase是所有消息事件的基類,而IMessageEvent則是對應消息事件的處理接口,後面會使用反射並動態創建對象調用Invoke方法。
public abstract class EventBase { }
public interface IMessageEvent<EventType> where EventType: EventBase { Task Invoke(EventType eventBase); }
然後再添加一個發佈訂閱的接口IPublisher,並添加訂閱接口Subscribe和發佈接口Publish, 此處我把訂閱的接口和發佈的接口寫在同一個類中,實際應用的時候也可以分開。
public interface IPublisher { void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC>; void Publish<TC>(TC @event) where TC:EventBase; }
再添加一個接口IEventManager和對應的接口實現類EventManager,這是一個用來管理註冊事件的處理類,在這個實現類中使用Dictionary來保存消息事件和消息處理類的對應關係。
public interface IEventManager { void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC>; Type GetEventHandleType(string eventKey); Type GetEventType(string eventKey); }
public class EventManager : IEventManager { private Dictionary<string, Type> _messageEvents = new Dictionary<string, Type>();//保存消息事件和對應的消息處理類型 private List<Type> _eventTypes = new List<Type>(); /// <summary> /// 獲取消息對應的處理類型 /// </summary> /// <param name="eventKey"></param> /// <returns></returns> public Type GetEventHandleType(string eventKey) { if (_messageEvents.ContainsKey(eventKey)) { return _messageEvents[eventKey]; } return null; } /// <summary> /// 獲取消息類型 /// </summary> /// <param name="eventKey"></param> /// <returns></returns> public Type GetEventType(string eventKey) { return _eventTypes.FirstOrDefault(s=>s.Name == eventKey); } /// <summary> /// 註冊消息事件類型和消息處理類型 /// </summary> /// <typeparam name="TC"></typeparam> /// <typeparam name="TH"></typeparam> public void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC> { string eventKey = typeof(TC).Name; if (_messageEvents.ContainsKey(eventKey)) { throw new Exception("The same event has been subscribe"); } _messageEvents.Add(eventKey, typeof(TH)); _eventTypes.Add(typeof(TC)); } }
添加一個ServiceProcesser類,這是RabbitMQ的業務邏輯實現類,用來發送消息,註冊消息接收的回調事件。
public class ServiceProcesser { private const string EXCHANGE_NAME = "ServiceProcesser"; private const string QUEUE_NAME= "domain.event"; private static object _sync = new object(); private IConnectionFactory _connectionFactory; private IConnection _connection; private IEventManager _eventManager; private IModel _consumeChannel; public bool IsConnected => (_connection?.IsOpen).GetValueOrDefault(false); public ServiceProcesser(IEventManager eventManager, IConnectionFactory connectionFactory) { _connectionFactory = connectionFactory; _eventManager = eventManager; CreateConsumer(); } /// <summary> /// 將eventBaseJson 序列化並作為消息發送 /// </summary> /// <param name="eventBase"></param> public void Send(EventBase eventBase) { using (var channel = _connection.CreateModel()) { string evenKey = eventBase.GetType().Name; channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); //Json序列化類來發送消息 string message = JsonConvert.SerializeObject(eventBase); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(EXCHANGE_NAME, evenKey, null, body); } } public bool TryConnection() { lock (_sync) { try { if (!IsConnected) { _connection = _connectionFactory.CreateConnection(); } return true; } catch (Exception) { //寫日誌 return false; } } } /// <summary> /// 使用消息事件的名字綁定路由和隊列 /// </summary> /// <param name="eventName"></param> public void BindEvent(string eventName) { if (!IsConnected) { TryConnection(); } using (var channel = _connection.CreateModel()) { channel.QueueDeclare(QUEUE_NAME, true, false, false, null); channel.QueueBind(QUEUE_NAME,EXCHANGE_NAME,eventName); } } /// <summary> /// 註冊RabbitMQ消費者的回調接口 /// </summary> private void CreateConsumer() { if (!IsConnected) { TryConnection(); } _consumeChannel = _connection.CreateModel(); _consumeChannel.ExchangeDeclare(EXCHANGE_NAME, "direct"); _consumeChannel.QueueDeclare(QUEUE_NAME, true, false, false, null); var consumer = new EventingBasicConsumer(_consumeChannel); consumer.Received += async (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); await ProcessEvent(ea.RoutingKey, message); }; _consumeChannel.BasicConsume(QUEUE_NAME, true, consumer); } /// <summary> /// 通過反射動態消息處理類來動態調用消息處理接口Invoke /// </summary> /// <param name="routeKey"></param> /// <param name="message"></param> /// <returns></returns> private async Task ProcessEvent(string routeKey, string message) { Type eventType = _eventManager.GetEventType(routeKey); if (eventType != null) { object @event = JsonConvert.DeserializeObject(message, eventType); if(@event != null && @event is EventBase) { var handleType = _eventManager.GetEventHandleType(eventType.Name); object handler = Activator.CreateInstance(handleType); await (Task)(typeof(IMessageEvent<>)).MakeGenericType(eventType).GetMethod("Invoke").Invoke(handler,new object[1] { @event }); } } } }
發送消息的Send方法使用參數EventBase類型的事件對象,並將該對象進行Json的序列化作為消息發送。
/// <summary> /// 將eventBaseJson 序列化並作為消息發送 /// </summary> /// <param name="eventBase"></param> public void Send(EventBase eventBase) { using (var channel = _connection.CreateModel()) { string evenKey = eventBase.GetType().Name; channel.ExchangeDeclare(EXCHANGE_NAME, "direct"); //Json序列化類來發送消息 string message = JsonConvert.SerializeObject(eventBase); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(EXCHANGE_NAME, evenKey, null, body); } }
ProcessEvent是消息的接收回調處理方法,在這個方法中根據獲取到的事件名稱調用IEventManager的接口查找對應的事件類型和消息處理類型,並通過反射動態創建消息處理類並調用處理的接口Invoke。
/// <summary> /// 通過反射動態消息處理類來動態調用消息處理接口Invoke /// </summary> /// <param name="routeKey"></param> /// <param name="message"></param> /// <returns></returns> private async Task ProcessEvent(string routeKey, string message) { Type eventType = _eventManager.GetEventType(routeKey); if (eventType != null) { object @event = JsonConvert.DeserializeObject(message, eventType); if(@event != null && @event is EventBase) { var handleType = _eventManager.GetEventHandleType(eventType.Name); object handler = Activator.CreateInstance(handleType); await (Task)(typeof(IMessageEvent<>)).MakeGenericType(eventType).GetMethod("Invoke").Invoke(handler,new object[1] { @event }); } } }
最後添加 IPublisher的實現類 Publisher
public class Publisher : IPublisher { private const string HOST_NAME = "localhost"; private const string USER_NAME = "admin"; private const string PASSWORD = "admin"; private ServiceProcesser _serviceProcesser; private IEventManager _eventManager; public Publisher() { var connectionFactory = new ConnectionFactory(); connectionFactory.HostName = HOST_NAME; connectionFactory.UserName = USER_NAME; connectionFactory.Password = PASSWORD; _eventManager = new EventManager(); _serviceProcesser = new ServiceProcesser(_eventManager, connectionFactory); } public void Publish<TC>(TC @event) where TC : EventBase { if (!_serviceProcesser.IsConnected) { _serviceProcesser.TryConnection(); } _serviceProcesser.Send(@event); } public void Subscribe<TC, TH>() where TC : EventBase where TH : IMessageEvent<TC> { _eventManager.Subscribe<TC,TH>(); _serviceProcesser.BindEvent(typeof(TC).Name); } }
客戶端和服務端訂閱發佈事件
在MyRBQPolisher類庫中添加一個消息的定義類HelloWorldEvent並繼承消息基類EventBase
public class HelloWorldEvent:EventBase { public string MyName { get; set; } = "Joiun"; }
MyRBQClient和MyRBQServer分別引用類庫MyRBQPolisher。
在MyRBQClient中添加一個HelloWorldEvent的消息處理類HelloWroldHandler,
public class HelloWroldHandler : IMessageEvent<HelloWorldEvent> { public Task Invoke(HelloWorldEvent eventBase) { Console.WriteLine(eventBase?.MyName); return Task.FromResult(0); } }
在Main方法中創建消息接口IPublisher和對應的實現類,並訂閱HelloWorldEvent,註冊對應的消息處理類,這樣一來,就實現了對消息HelloWorldEvent的處理,處理的邏輯包含在HelloWroldHandler的Invoke方法中。
static void Main(string[] args) { IPublisher publisher = new Publisher(); publisher.Subscribe<HelloWorldEvent, HelloWroldHandler>(); Console.Read(); }
而MyRBQServer的Main方法就更簡單了,只要創建IPunlisher的接口並發送一個HelloWorldEvent的對象既可。
static void Main(string[] args) { IPublisher publisher = new Publisher(); publisher.Publish(new HelloWorldEvent()); Console.WriteLine("Send Success"); Console.ReadLine(); }
運行程序,結果:
而MyRBQServer如果需要訂閱自己發送的消息,也可以創建一個自己的HelloWroldHandler處理類並註冊訂閱即可。
在上述代碼中,HelloWorldEvent就相當於是一個WCF中的消息契約,需要通信的雙方規定好消息的格式,否則會有序列化方面的問題,而這樣做的好處就是可以將原本的一個個消息關鍵字和對應的行為分散到了不同的消息類和處理類中,並訂閱感興趣的事件來驅動對應的行為,不同事件行為之間互不影響,松耦合,將一個上萬行代碼的消息處理類化整為零,沒有大量的String類型的關鍵字,沒有Switch Case,If Else判斷。
需要注意的是,Inovke方法的每一次動態調用都是在不同的子線程中調用,如果需要在Invoke中處理UI相關的代碼,則可以藉助主線程的上下文來更新。