事件中心 – 解耦核心业务与辅助功能
- 2019 年 12 月 20 日
- 筆記
随着业务的不断发展,原来融入在业务系统主流程中的辅助功能越来越多,每次增加新的逻辑,都要修改主干流程,比如发送微信服务号消息,发送邮件提醒等。 这种做法,让主干流程和辅助功能耦合太紧密,很容易在修改辅助功能的时候,导致主干流程的bug。本文介绍利用事件中心,让主干流程专注于业务核心,其他辅助功能会通过监听事件中心来实现,大大解耦了核心业务和辅助逻辑。
实现
- 事件中心
EventHub
事件中心,一般用来监听收集各种事件并分发给监听者列表.
/** * 事件中心,一般用来监听收集各种事件并分发给监听者列表. * * @author tenmao * @since 2019/12/12 */ @Slf4j public class EventHub<Event, Entity> { private final MdcThreadPoolExecutor mdcThreadPoolExecutor = MdcThreadPoolExecutor.newWithInheritedMdc(4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100)); /** * 同步监听(一般可以用在事务一致性场景). */ private Map<Event, Set<Consumer<Entity>>> syncListenerMap = new HashMap<>(); /** * 异步监听. */ private Map<Event, Set<Consumer<Entity>>> asyncListenerMap = new HashMap<>(); /** * 注册同步监听器. * * @param event 事件 * @param syncListener 同步监听器 */ public void subscribeSync(Event event, Consumer<Entity> syncListener) { Preconditions.checkNotNull(event); Preconditions.checkNotNull(syncListener); log.info("subscribeSync: event[{}], listener[{}]", event, syncListener); addListener(syncListenerMap, event, syncListener); } /** * 注册异步监听器. * * @param event 事件 * @param asyncListener 异步监听器 */ public void subscribeAsyncListener(Event event, Consumer<Entity> asyncListener) { Preconditions.checkNotNull(event); Preconditions.checkNotNull(asyncListener); log.info("subscribeAsyncListener: event[{}], listener[{}]", event, asyncListener); addListener(asyncListenerMap, event, asyncListener); } private synchronized void addListener(Map<Event, Set<Consumer<Entity>>> map, Event event, Consumer<Entity> listener) { Set<Consumer<Entity>> consumers = map.get(event); if (consumers == null) { consumers = new HashSet<>(); map.putIfAbsent(event, consumers); } consumers.add(listener); } /** * 发布事件. * * @param event 事件 * @param entity 事件实体 */ public void publishEvent(Event event, Entity entity) { //订单状态变更 log.info("publishEvent: event[{}], entity[{}]", event, entity); for (Consumer<Entity> consumer : syncListenerMap.get(event)) { consumer.accept(entity); } for (Consumer<Entity> asyncObserver : asyncListenerMap.get(event)) { mdcThreadPoolExecutor.execute(() -> { try { asyncObserver.accept(entity); } catch (Exception e) { log.warn("fail to finish listener for event: event[{}], entity[{}]", event, entity); } }); } } }
- 为不同类型事件配置事件中心
/** * @author tenmao * @since 2019/12/12 */ @Configuration public class EventHubConfiguration { /** * 订单状态事件中心. */ @Bean public EventHub<OrderStatus, Order> orderEventHub() { return new EventHub<>(); } /** * 退款状态事件中心. */ @Bean public EventHub<RefundStatus, Refund> refundEventHub() { return new EventHub<>(); } }
事件监听者
/** * 发送提醒消息. * * @author tenmao * @since 2019/12/12 */ @Slf4j @Component public class EmailListener { @Resource private EmailRemoteManager EmailRemoteManager; @Resource private EventHub<OrderStatus, Order> orderEventHub; @PostConstruct private void init() { //监听多个不同的事件,监听到后会发送提醒消息 orderEventHub.subscribeAsyncListener(OrderStatus.PAYED, this::handleOrderPayed); orderEventHub.subscribeAsyncListener(OrderStatus.ARRIVAL, this::handleOrderArrival); } private void handleOrderPayed(Order order) { //推送 Email EmailRequest EmailRequest = EmailRequest.build(order); EmailRemoteManager.sendEmail(EmailRequest); } private void handleOrderArrival(Order order) { //推送 Email EmailRequest EmailRequest = EmailRequest.build(order); EmailRemoteManager.sendEmail(EmailRequest); } }
事件源
业务系统,比如订单系统中,会在不同的订单状态时,发送相应的订单事件.
@Slf4j @Component public class OrderManager { @Resource private EventHub<OrderStatus, Order> orderEventHub; public void payOrder(String orderNo) { //TODO 订单付款 Order order = getOne(orderNo); orderEventHub.publishEvent(OrderStatus.PAYED, order); } public void orderArrival(String orderNo) { //TODO 订单到货 Order order = getOne(orderNo); orderEventHub.publishEvent(OrderStatus.ARRIVAL, order); } //TODO 还有其他事件 }