使用EventBus + Redis发布订阅模式提升业务执行性能

前言

最近一直奔波于面试,面了几家公司的研发。有让我受益颇多的面试经验,也有让我感觉浪费时间的面试经历~
因为疫情原因,最近宅在家里也没事,就想着使用Redis配合事件总线去实现下具体的业务。  

  • 需求

    一个简单的电商,有几个重要的需求点

    商品下单后TODO

    • 存储订单信息
    • 锁定商品库存
    • 消息推送商家端

    订单支付后TODO

    • 存储订单支付信息
    • 商品库存减少
    • 消息推送商家端
    • 会员积分调整

技术思路

这里用控制台实现上面的业务功能外,自行编写一个基于C#反射特性的事件总线,方便具体业务事件的后续扩展,比如订单支付后后续还要加会员消息推送啥的。使用Redis的发布订阅模式对事件处理进行异步化,提升执行性能。
所以最终技术架构就是 事件总线+Redis发布订阅。

完成事件总线

这里先不急着将上面的订单、支付、会员 等进行建模。先将事件总线的架子搭好。首先需要理解事件总线在业务系统的目的是什么。
事件总线存在目的最重要的就是解耦 。我们需要实现的效果就是针对指定事件源对象触发事件后,但凡注册了该事件参数的事件处理类则开始执行相关代码。

下图可以看出我们的事件处理类均需要引用事件参数,所有事件处理类都是基于对事件参数处理的需求上来的。

9d8bb75ee6ad2747e082d625ce99549f.png

但是!并不是意味创建了事件处理类就一定会去执行!能否执行除了取决于事件源的触发外就是必须有一层注册(也可称映射)。
在WinForm程序里处处可见事件的绑定,如 this.button1.OnClick+=button1OnClick;
那么在这里我将绑定事件放置到一个字典里。C#的字典Dictionary是个key value的键值对数据集合,键和值都可以是任意数据类型。
我们可以将事件处理类EventHandle和事件参数EventData作为键和值存储到字典里。在事件源触发时根据EventData反向找出所有的EventHandle

思路就是这样,开始编码了。
定义事件参数接口,后续具体业务的事件参数接口均要继承它。

    /// <summary>      /// 事件参数接口      /// </summary>      public interface IEventData      {          /// <summary>          /// 事件源对象          /// </summary>          object Source { get; set; }            ///// <summary>          ///// 事件发生的数据          ///// </summary>          //TDataModel Data { get; set; }            /// <summary>          /// 事件发生时间          /// </summary>          DateTime Time { get; set; }      }  

  

需要一个事件处理接口,后续具体业务的事件处理接口均需要继承它

    /// <summary>      /// 事件实现接口      /// </summary>      public interface IEventHandle<T> where T : IEventData      {          /// <summary>          /// 处理等级          /// 方便事件总线触发时候可以有序的执行相应          /// </summary>          /// <returns></returns>          int ExecuteLevel { get; }            /// <summary>          /// 事件执行          /// </summary>          /// <param name="eventData">事件参数</param>          void Execute(T eventData);      }  

  

现在已经将事件参数和事件处理都抽象出来了,接下来是要实现上面说的注册容器的实现了。

   /// <summary>      /// 事件仓库      /// </summary>      public interface IEventStore      {          /// <summary>          /// 事件注册          /// </summary>          /// <param name="handle">事件实现对象</param>          /// <param name="data">事件参数</param>          void EventRegister(Type handle, Type data);            /// <summary>          /// 事件取消注册          /// </summary>          /// <param name="handle">事件实现对象</param>          void EventUnRegister(Type handle);            /// <summary>          /// 获取事件处理对象          /// </summary>          /// <param name="data"></param>          /// <returns></returns>          Type GetEventHandle(Type data);            /// <summary>          /// 根据事件参数获取事件处理集合          /// </summary>          /// <typeparam name="TEventData">事件参数类型</typeparam>          /// <param name="data">事件参数</param>          /// <returns></returns>          IEnumerable<Type> GetEventHandleList<TEventData>(TEventData data);      }  

  

实现上面的接口

    /// <summary>      /// 基于反射实现的事件仓储      /// 存储事件处理对象和事件参数      /// </summary>      public class ReflectEventStore : IEventStore      {          private static Dictionary<Type, Type> StoreLst;            public ReflectEventStore()          {              StoreLst = new Dictionary<Type, Type>();          }            public void EventRegister(Type handle, Type data)          {              if (handle == null || data == null) throw new NullReferenceException();              if (StoreLst.Keys.Contains(handle))                  throw new ArgumentException($"事件总线已注册类型为{nameof(handle)} !");                if (!StoreLst.TryAdd(handle, data))                  throw new Exception($"注册{nameof(handle)}类型到事件总线失败!");          }              public void EventUnRegister(Type handle)          {              if (handle == null) throw new NullReferenceException();              StoreLst.Remove(handle);          }            public Type GetEventHandle(Type data)          {              if (data == null) throw new NullReferenceException();              Type handle = StoreLst.FirstOrDefault(p => p.Value == data).Key;              return handle;          }            public IEnumerable<Type> GetEventHandleList<TEventData>(TEventData data)          {              if (data == null) throw new NullReferenceException();              var items = StoreLst.Where(p => p.Value == data.GetType())                                    .Select(k => k.Key);              return items;          }      }  

  

根据上面代码可以看出来,我们存储到Dictionary内的是Type类型,GetEventHandleList方法最终获取的是一个List<Type>的集合。
我们需要在下面创建的EventBus类里循环List<Type>并且执行这个事件处理类的Execute方法。

实现EventBus

    /// <summary>      /// 事件总线服务      /// </summary>      public class EventBus : ReflectEventStore      {            public void Trigger<TEventData>(TEventData data, SortType sort = SortType.Asc) where TEventData : IEventData          {              // 这里如需保证顺序执行则必须循环两次 - -....              var items = GetEventHandleList(data).ToList();              Dictionary<object, Tuple<Type, int>> ds = new Dictionary<object, Tuple<Type, int>>();                foreach (var item in items)              {                  var instance = Activator.CreateInstance(item);                  MethodInfo method = item.GetMethod("get_ExecuteLevel");                  int value = (int)method.Invoke(instance, null);                  ds.Add(instance, new Tuple<Type, int>(item, value));              }                var lst = sort == SortType.Asc ? ds.OrderBy(p => p.Value.Item2).ToList() : ds.OrderByDescending(p => p.Value.Item2).ToList();                foreach (var k in lst)              {                  MethodInfo method = k.Value.Item1.GetMethod("Execute");                  method.Invoke(k.Key, new object[] { data });              }          }      }  

  

上面可以看到,我们的事件总线是支持对绑定的事件处理对象进行有序处理,需要依赖下面这个枚举

    /// <summary>      /// 排序类型      /// </summary>      public enum SortType      {          /// <summary>          /// 升序          /// </summary>          Asc = 1,          /// <summary>          /// 降序          /// </summary>          Desc = 2      }  

  

好了,至此,我们的简易版的事件总线就出来了~ 接下来就是去建模、实现相应的事件参数和事件处理类了。
创建订单模型:

   /// <summary>      /// 订单模型      /// </summary>      public class OrderModel      {          /// <summary>          /// 订单ID          /// </summary>          public Guid Id { get; set; }            /// <summary>          /// 用户ID          /// </summary>          public Guid UserId { get; set; }            /// <summary>          /// 订单创建时间          /// </summary>          public DateTime CreateTime { get; set; }            /// <summary>          /// 商品名称          /// </summary>          public string ProductName { get; set; }            /// <summary>          /// 购买数量          /// </summary>          public int Number { get; set; }            /// <summary>          /// 订单金额          /// </summary>          public decimal Money { get; set; }      }  

  

创建订单下单事件参数

    public interface IOrderCreateEventData : IEventData      {          /// <summary>          /// 订单信息          /// </summary>          OrderModel Order { get; set; }      }        /// <summary>      /// 订单创建事件参数      /// </summary>      public class OrderCreateEventData : IOrderCreateEventData      {          public OrderModel Order { get; set; }          public object Source { get; set; }          public DateTime Time { get; set; }      }  

OK~接下来就是实现我们上面需求上的那些功能了。

  • 存储订单信息
  • 锁定商品库存
  • 消息推送商家端
    这里我不实现存储订单信息的事件处理对象,我默认此业务必须同步处理,至于后面两个则可以采取异步处理。通过下面代码创建相应的事件处理类。
    订单创建事件之消息推送商家端处理类。
    /// <summary>      /// 订单创建事件之消息处理类      /// </summary>      public class OrderCreateEventNotifyHandle : IEventHandle<IOrderCreateEventData>      {          public int ExecuteLevel { get; private set; }            public OrderCreateEventNotifyHandle()          {              Console.WriteLine($"创建OrderCreateEventNotifyHandle对象");              this.ExecuteLevel = 2;          }            public void Execute(IOrderCreateEventData eventData)          {              Thread.Sleep(1000);              Console.WriteLine($"执行订单创建事件之消息推送!订单ID:{eventData.Order.Id.ToString()},商品名称:{eventData.Order.ProductName}");          }        }  

  

订单创建消息之锁定库存处理类

   /// <summary>      /// 订单创建事件 锁定库存 处理类      /// </summary>      public class OrderCreateEventStockLockHandle : IEventHandle<IOrderCreateEventData>      {          public int ExecuteLevel { get; private set; }            public OrderCreateEventStockLockHandle()          {              Console.WriteLine($"创建OrderCreateEventStockLockHandle对象");              this.ExecuteLevel = 1;          }              public void Execute(IOrderCreateEventData eventData)          {              Thread.Sleep(1000);              Console.WriteLine($"执行订单创建事件之库存锁定!订单ID:{eventData.Order.Id.ToString()},商品名称:{eventData.Order.ProductName}");          }      }  

  

OK~ 到main方法下开始执行订单创建相关代码。

        static void Main(string[] args)          {                Guid userId = Guid.NewGuid();              Stopwatch stopwatch = new Stopwatch();              stopwatch.Start();              EventBus eventBus = new EventBus();              eventBus.EventRegister(typeof(OrderCreateEventNotifyHandle), typeof(OrderCreateEventData));              eventBus.EventRegister(typeof(OrderCreateEventStockLockHandle), typeof(OrderCreateEventData));              var order = new Order.OrderModel()              {                  CreateTime = DateTime.Now,                  Id = Guid.NewGuid(),                  Money = (decimal)300.00,                  Number = 1,                  ProductName = "鲜花一束",                  UserId = userId              };              Console.WriteLine($"模拟存储订单");              Thread.Sleep(1000);              eventBus.Trigger(new OrderCreateEventData()              {                  Order = order              });              stopwatch.Stop();              Console.WriteLine($"下单总耗时:{stopwatch.ElapsedMilliseconds}毫秒");              Console.ReadLine();          }  

  至此,我们采取事件总线的方式成功将需求实现了,执行后结果如下:

 

 

可以看到我们的下单总耗时是3038毫秒,如您所见,我们解决了代码的耦合性但是没有解决代码的执行效率。
下一章,将我们的Redis的发布订阅模式再加入进来,看是否能改善我们的代码执行效率~~

源码在下一篇博客上提供下载地址(毕竟现在才完成一半~)