事件中心 – 解耦核心業務與輔助功能
- 2019 年 12 月 20 日
- 筆記
隨着業務的不斷發展,原來融入在業務系統主流程中的輔助功能越來越多,每次增加新的邏輯,都要修改主幹流程,比如發送微信服務號消息,發送郵件提醒等。 這種做法,讓主幹流程和輔助功能耦合太緊密,很容易在修改輔助功能的時候,導致主幹流程的bug。本文介紹利用事件中心,讓主幹流程專註於業務核心,其他輔助功能會通過監聽事件中心來實現,大大解耦了核心業務和輔助邏輯。
實現
- 事件中心
EventHub
事件中心,一般用來監聽收集各種事件並分發給監聽者列表.
/** * 事件中心,一般用來監聽收集各種事件並分發給監聽者列表. * * @author tenmao * @since 2019/12/12 */ @Slf4j public class EventHub<Event, Entity> { private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100)); /** * 同步監聽(一般可以用在事務一致性場景). */ private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>(); /** * 異步監聽. */ private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>(); /** * 註冊同步監聽器. * * @param event 事件 * @param syncListener 同步監聽器 */ public void subscribeSync(Event event, Consumer<Entity> syncListener) { Preconditions.checkNotNull(event); Preconditions.checkNotNull(syncListener); log.info("subscribeSync: event[{}], listener[{}]", event, syncListener); addListener(syncListenerMap, event, syncListener); } /** * 註冊異步監聽器. * * @param event 事件 * @param asyncListener 異步監聽器 */ public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) { Preconditions.checkNotNull(event); Preconditions.checkNotNull(asyncListener); log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener); addListener(asyncListenerMap, event, asyncListener); } private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) { Set<Consumer<Entity>> consumers = map.get(event); if (consumers == null) { consumers = new HashSet<>(); map.putIfAbsent(event, consumers); } consumers.add(listener); } /** * 發佈事件. * * @param event 事件 * @param entity 事件實體 */ public void publishEvent(Event event, Entity entity) { //訂單狀態變更 log.info("publishEvent: event[{}], entity[{}]", event, entity); for (Consumer<Entity> consumer : syncListenerMap.get(event)) { consumer.accept(entity); } for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) { mdcThreadPoolExecutor.execute(() -> { try { asyncObserver.accept(entity); } catch (Exception e) { log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity); } }); } } }
- 為不同類型事件配置事件中心
/** * @author tenmao * @since 2019/12/12 */ @Configuration public class EventHubConfiguration { /** * 訂單狀態事件中心. */ @Bean public EventHub<OrderStatus, Order> orderEventHub() { return new EventHub<>(); } /** * 退款狀態事件中心. */ @Bean public EventHub<RefundStatus, Refund> refundEventHub() { return new EventHub<>(); } }
事件監聽者
/** * 發送提醒消息. * * @author tenmao * @since 2019/12/12 */ @Slf4j @Component public class EmailListener { @Resource private EmailRemoteManager EmailRemoteManager; @Resource private EventHub<OrderStatus, Order> orderEventHub; @PostConstruct private void init() { //監聽多個不同的事件,監聽到後會發送提醒消息 orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed); orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival); } private void handleOrderPayed(Order order) { //推送 Email EmailRequest EmailRequest = EmailRequest.build(order); EmailRemoteManager.sendEmail(EmailRequest); } private void handleOrderArrival(Order order) { //推送 Email EmailRequest EmailRequest = EmailRequest.build(order); EmailRemoteManager.sendEmail(EmailRequest); } }
事件源
業務系統,比如訂單系統中,會在不同的訂單狀態時,發送相應的訂單事件.
@Slf4j @Component public class OrderManager { @Resource private EventHub<OrderStatus, Order> orderEventHub; public void payOrder(String orderNo) { //TODO 訂單付款 Order order = getOne(orderNo); orderEventHub.publishEvent(OrderStatus.PAYED, order); } public void orderArrival(String orderNo) { //TODO 訂單到貨 Order order = getOne(orderNo); orderEventHub.publishEvent(OrderStatus.ARRIVAL, order); } //TODO 還有其他事件 }