事件中心 – 解耦核心業務與輔助功能

  • 2019 年 12 月 20 日
  • 筆記

隨着業務的不斷發展,原來融入在業務系統主流程中的輔助功能越來越多,每次增加新的邏輯,都要修改主幹流程,比如發送微信服務號消息,發送郵件提醒等。 這種做法,讓主幹流程和輔助功能耦合太緊密,很容易在修改輔助功能的時候,導致主幹流程的bug。本文介紹利用事件中心,讓主幹流程專註於業務核心,其他輔助功能會通過監聽事件中心來實現,大大解耦了核心業務和輔助邏輯。

實現

  • 事件中心EventHub

事件中心,一般用來監聽收集各種事件並分發給監聽者列表.

/**   * 事件中心,一般用來監聽收集各種事件並分發給監聽者列表.   *   * @author tenmao   * @since 2019/12/12   */  @Slf4j  public class EventHub<Event, Entity> {      private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));        /**       * 同步監聽(一般可以用在事務一致性場景).       */      private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>();        /**       * 異步監聽.       */      private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>();          /**       * 註冊同步監聽器.       *       * @param event        事件       * @param syncListener 同步監聽器       */      public void subscribeSync(Event event, Consumer<Entity> syncListener) {          Preconditions.checkNotNull(event);          Preconditions.checkNotNull(syncListener);          log.info("subscribeSync: event[{}], listener[{}]", event, syncListener);          addListener(syncListenerMap, event, syncListener);      }        /**       * 註冊異步監聽器.       *       * @param event         事件       * @param asyncListener 異步監聽器       */      public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) {          Preconditions.checkNotNull(event);          Preconditions.checkNotNull(asyncListener);          log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener);          addListener(asyncListenerMap, event, asyncListener);      }        private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) {          Set<Consumer<Entity>> consumers = map.get(event);          if (consumers == null) {              consumers = new HashSet<>();              map.putIfAbsent(event, consumers);          }          consumers.add(listener);      }          /**       * 發佈事件.       *       * @param event  事件       * @param entity 事件實體       */      public void publishEvent(Event event, Entity entity) {          //訂單狀態變更          log.info("publishEvent: event[{}], entity[{}]", event, entity);          for (Consumer<Entity> consumer : syncListenerMap.get(event)) {              consumer.accept(entity);          }            for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) {              mdcThreadPoolExecutor.execute(() -> {                  try {                      asyncObserver.accept(entity);                  } catch (Exception e) {                      log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity);                  }              });          }      }  }
  • 為不同類型事件配置事件中心
/**   * @author tenmao   * @since 2019/12/12   */  @Configuration  public class EventHubConfiguration {      /**       * 訂單狀態事件中心.       */      @Bean      public EventHub<OrderStatus, Order> orderEventHub() {          return new EventHub<>();      }        /**       * 退款狀態事件中心.       */      @Bean      public EventHub<RefundStatus, Refund> refundEventHub() {          return new EventHub<>();      }  }

事件監聽者

/**   * 發送提醒消息.   *   * @author tenmao   * @since 2019/12/12   */  @Slf4j  @Component  public class EmailListener {      @Resource      private EmailRemoteManager EmailRemoteManager;        @Resource      private EventHub<OrderStatus, Order> orderEventHub;        @PostConstruct      private void init() {          //監聽多個不同的事件,監聽到後會發送提醒消息          orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed);          orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival);      }        private void handleOrderPayed(Order order) {          //推送 Email          EmailRequest EmailRequest = EmailRequest.build(order);          EmailRemoteManager.sendEmail(EmailRequest);      }        private void handleOrderArrival(Order order) {          //推送 Email          EmailRequest EmailRequest = EmailRequest.build(order);          EmailRemoteManager.sendEmail(EmailRequest);      }  }

事件源

業務系統,比如訂單系統中,會在不同的訂單狀態時,發送相應的訂單事件.

@Slf4j  @Component  public class OrderManager {      @Resource      private EventHub<OrderStatus, Order> orderEventHub;        public void payOrder(String orderNo) {          //TODO 訂單付款          Order order = getOne(orderNo);          orderEventHub.publishEvent(OrderStatus.PAYED, order);      }        public void orderArrival(String orderNo) {          //TODO 訂單到貨          Order order = getOne(orderNo);          orderEventHub.publishEvent(OrderStatus.ARRIVAL, order);      }      //TODO 還有其他事件  }