Spring 事件監聽機制及原理分析

簡介

在JAVA體系中,有支持實現事件監聽機制,在Spring 中也專門提供了一套事件機制的接口,方便我們實現。比如我們可以實現當用戶註冊後,給他發送一封郵件告訴他註冊成功的一些信息,比如用戶訂閱的主題更新了,通知用戶注意及時查看等。

觀察者模式

觀察者模式還有很多其他的稱謂,如發佈-訂閱(Publish/Subscribe)模式、模型-視圖(Model/View)模式、源-監聽器(Source/Listener)模式或從屬者(Dependents)模式。觀察者模式定義了一種一對多的依賴關係,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態上發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。

觀察者模式一般包含以下幾個對象:

Subject:

被觀察的對象。它提供一系列方法來增加和刪除觀察者對象,同時它定義了通知方法notify()。目標類可以是接口,也可以是抽象類或具體類。

ConcreteSubject:

具體的觀察對象。Subject的具體實現類,在這裡實現通知事件。

Observer:

觀察者。這裡是抽象的觀察者,觀察者有一個或者多個。

ConcreteObserver:

具體的觀察者。在這裡維護觀察對象的具體操作。

Java 中的事件機制

Java中提供了基本的事件處理基類:

  1. EventObject:所有事件狀態對象都將從其派生的根類;
  2. EventListener:所有事件偵聽器接口必須擴展的標記接口;

非常經典的開門案例:

一、創建事件對象

@Getter
@Setter
public class DoorEvent extends EventObject{

    int state;

    public DoorEvent(Object source){
        super(source);
    }
    public DoorEvent(Object source,int state){
        super(source);
        this.state = state;
    }
}

二、事件監聽器

public interface DoorListener extends EventListener{
    void doorEvent(DoorEvent doorEvent);
}
public class CloseDoorEvent implements DoorListener{
    @Override
    public void doorEvent(DoorEvent doorEvent){
        if(doorEvent.getState() == -1){
            System.out.println("門關上了");
        }
    }
}
public class OpenDoorListener implements DoorListener{
    @Override
    public void doorEvent(DoorEvent doorEvent){
        if(doorEvent.getState() == 1){
            System.out.println("門打開了");
        }
    }
}

三、測試

public static void main(String[] args){
    List<DoorListener> list = new ArrayList<>();
    list.add(new OpenDoorListener());
    list.add(new CloseDoorEvent());
    for(DoorListener listener : list){
        listener.doorEvent(new DoorEvent(-1,-1));
        listener.doorEvent(new DoorEvent(1,1));
    }
}

四、輸出結果

門打開了
門關上了

Spring 中的事件機制

在 Spring 容器中通過ApplicationEven類和 ApplicationListener接口來實現事件監聽機制,每次Event 被發佈到Spring容器中時都會通知該Listener。需要注意的是,Spring 的事件默認是同步的,調用 publishEvent 方法發佈事件後,它會處於阻塞狀態,直到Listener接收到事件並處理返回之後才繼續執行下去。

代碼示例:

一、定義事件對象

@Getter
@Setter
@ToString
public class UserDTO extends ApplicationEvent{
    private Integer userId;
    private String name;
    private Integer age;

    public UserDTO(Object source){
        super(source);
    }
}

二、定義事件監聽器,可以通過註解或者實現接口來實現。

@Component
public class UserRegisterSmsListener{

		// 通過註解實現監聽器
    @EventListener
    public void handleUserEvent(UserDTO userDTO){
        System.out.println("監聽到用戶註冊,準備發送短訊,user:"+userDTO.toString());
    }
}

// 通過實現接口實現監聽器
@Component
public class UserRegisterEmailListener implements ApplicationListener<UserDTO>{
    @Override
    public void onApplicationEvent(UserDTO userDTO){
        System.out.println("監聽到用戶註冊,準備發送郵件,user:" + userDTO.toString());
    }
}
@Component
public class UserRegisterMessageListener implements ApplicationListener<UserDTO>{
    @Override
    public void onApplicationEvent(UserDTO userDTO){
        System.out.println("監聽到用戶註冊,給新用戶發送首條站內短消息,user:" + userDTO.toString());
    }
}

三、註冊服務

public interface UserService{
    void register();
}
@Service
public class UserServiceImpl implements UserService{
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Override
    public void register(){
        UserDTO userDTO = new UserDTO(this);
        userDTO.setAge(18);
        userDTO.setName("admol");
        userDTO.setUserId(1001);
        System.out.println("register user");
        eventPublisher.publishEvent(userDTO);
    }
}

四、測試

@Autowired
private UserService userService;

@Test
public void testUserEvent(){
    userService.register();
}

五、輸出結果

register user
監聽到用戶註冊,準備發送短訊,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,準備發送郵件,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=admol, age=18)

指定監聽器的順序

監聽器的發佈順序是按照 bean 自然裝載的順序執行的,Spring 支持兩種方式來實現有序

一、實現SmartApplicationListener接口指定順序。

把上面三個Listener都改成實現SmartApplicationListener接口,並指定getOrder的返回值,返回值越小,優先級越高。

@Component
public class UserRegisterMessageListener implements SmartApplicationListener{

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType){
        return eventType == UserDTO.class;
    }

    @Override
    public boolean supportsSourceType(Class<?> sourceType){
        return true;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event){
        System.out.println("監聽到用戶註冊,給新用戶發送首條站內短消息,user:" + event.toString());
    }

    @Override
    public int getOrder(){
        return -1;
    }
}

另外兩個監聽器的改造省略,指定改造後的UserRegisterSmsListener返回order為0,UserRegisterEmailListener的getOrder返回1,測試輸出結果如下:

register user
監聽到用戶註冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,準備發送短訊,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,準備發送郵件,user:UserDTO(userId=1001, name=admol, age=18)

二、使用註解@Order()

@Component
public class UserRegisterSmsListener{

    @Order(-2)
    @EventListener
    public void handleUserEvent(UserDTO userDTO){
        System.out.println("監聽到用戶註冊,準備發送短訊,user:"+userDTO.toString());
    }
}

測試輸出結果如下:

register user
監聽到用戶註冊,準備發送短訊,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,準備發送郵件,user:UserDTO(userId=1001, name=admol, age=18)

可以發現,短訊監聽器最先執行。

異步支持

Spring 事件機制默認是同步阻塞的,如果 ApplicationEventPublisher 發佈事件之後他會一直阻塞等待listener 響應,多個 listener 的情況下前面的沒有執行完後面的會一直被阻塞。這時候我們可以利用 Spring 提供的線程池註解 @Async 來實現異步線程

一、使用 @Async 之前需要先開啟線程池,在 啟動類上添加 @EnableAsync 註解即可。

@EnableAsync
@SpringBootApplication
public class DemoApplication {
		public static void main(String[] args) {
				SpringApplication.run(DemoApplication.class, args);
		}
}

二、監聽器使用異步線程

自定義異步線程池

@Configuration
public class AsyncConfig{

    @Bean("asyncThreadPool")
    public Executor getAsyncExecutor(){
        System.out.println("asyncThreadPool init");
        Executor executor = new ThreadPoolExecutor(
                10,20,60L,TimeUnit.SECONDS
                ,new ArrayBlockingQueue<>(100),new MyThreadFactory());
        return executor;
    }

    class MyThreadFactory implements ThreadFactory{
        final AtomicInteger threadNumber = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r){
            Thread t = new Thread(r);
            t.setName("async-thread-"+threadNumber.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }
}

指定監聽器的線程池

@Component
public class UserRegisterSmsListener{

    @Order(-2)
    @Async("asyncThreadPool")
    @EventListener
    public void handleUserEvent(UserDTO userDTO){
        System.out.println(Thread.currentThread().getName() + " 監聽到用戶註冊,準備發送短訊,user:"+userDTO.toString());
    }
}

三、測試輸出結果

register user
監聽到用戶註冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶註冊,準備發送郵件,user:UserDTO(userId=1001, name=admol, age=18)
async-thread-0 監聽到用戶註冊,準備發送短訊,user:UserDTO(userId=1001, name=admol, age=18)

Spring事件機制原理分析

Spring事件機制涉及的重要類主要有以下四個:

ApplicationEvent:
事件對象,繼承至JDK的類EventObject ,可以攜帶事件的時間戳

ApplicationListener:
事件監聽器,繼承至JDK的接口EventListener,該接口被所有的事件監聽器實現,比如支持指定順序的SmartApplicationListener

ApplicationEventMulticaster:
事件管理者,管理監聽器和發佈事件,ApplicationContext通過委託ApplicationEventMulticaster來 發佈事件

ApplicationEventPublisher:
事件發佈者,該接口封裝了事件有關的公共方法,作為ApplicationContext的超級街廓,也是委託 ApplicationEventMulticaster完成事件發佈。

源碼展示

ApplicationEvent

事件對象ApplicationEvent的主要源代碼如下,繼承了JAVA的 EventObject 對象:

public abstract class ApplicationEvent extends EventObject {
	private static final long serialVersionUID = 7099057708183571937L;
	private final long timestamp; // 多了一個時間戳屬性
	public ApplicationEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis(); // 初始當前化時間戳
	}
	public final long getTimestamp() {
		return this.timestamp;
	}
}

從上面ApplicationEvent的子類關係圖種可以發現,ApplicationEvent有一個重要的子類ApplicationContextEvent,而ApplicationContextEvent又有4個重要的子類ContextStartedEventContextRefreshedEventContextClosedEventContextStoppedEvent

從名字就可以看出,這4個事件都和Spring容器有關係的:

  • ContextRefreshedEvent:當spring容器context刷新時觸發
  • ContextStartedEvent:當spring容器context啟動後觸發
  • ContextStoppedEvent:當spring容器context停止時觸發
  • ContextClosedEvent:當spring容器context關閉時觸發,容器被關閉時,其管理的所有單例Bean都被銷毀。

當每個事件觸發時,相關的監聽器就會監聽到相應事件,然後觸發onApplicationEvent方法。

ApplicationListener

事件監聽器,繼承DK的接口EventListener

/* ...
 * @author Rod Johnson
 * @author Juergen Hoeller
 * @param <E> the specific ApplicationEvent subclass to listen to
 * @see org.springframework.context.event.ApplicationEventMulticaster
 */
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

	/**
	 * Handle an application event. by jinglingwang.cn
	 * @param event the event to respond to
	 */
	void onApplicationEvent(E event);

}

注釋@param <E> the specific ApplicationEvent subclass to listen [email protected] ApplicationEventMulticaster 裏面說明了事件的廣播在ApplicationEventMulticaster類。

ApplicationEventMulticaster

ApplicationEventMulticaster是一個接口,負責管理監聽器和發佈事件,定義了如下方法:

  1. addApplicationListener(ApplicationListener<?> listener) :新增一個listener;
  2. addApplicationListenerBean(String listenerBeanName):新增一個listener,參數為bean name;
  3. removeApplicationListener(ApplicationListener<?> listener):刪除listener;
  4. void removeAllListeners():刪除所有的Listener
  5. removeApplicationListenerBean(String listenerBeanName):根據bean name 刪除listener;
  6. multicastEvent(ApplicationEvent event):廣播事件;
  7. multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType):廣播事件,指定事件的source類型。

AbstractApplicationEventMulticaster 實現了 ApplicationEventMulticaster接口,SimpleApplicationEventMulticaster 繼承了AbstractApplicationEventMulticaster ;

  1. AbstractApplicationEventMulticaster 主要實現了管理監聽器的方法(上面接口的前5個方法)

  2. SimpleApplicationEventMulticaster 主要實現了事件廣播相關的方法(上面接口的最後2個方法)

    兩個類分別繼承了部分上面的方法。

一、先看新增Listener方法實現邏輯:

public abstract class AbstractApplicationEventMulticaster
		implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

		private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);

...
		@Override
		public void addApplicationListener(ApplicationListener<?> listener) {
			synchronized (this.retrievalMutex) { // 加排他鎖
				// Explicitly remove target for a proxy, if registered already,
				// in order to avoid double invocations of the same listener.
				Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
				if (singletonTarget instanceof ApplicationListener) {
          // 刪除,避免重複調用
					this.defaultRetriever.applicationListeners.remove(singletonTarget);
				}
        // 加入到Set LinkedHashSet 集合中
				this.defaultRetriever.applicationListeners.add(listener);
				this.retrieverCache.clear(); // 緩存
			}
		}
...
}

最核心的一句代碼:this.defaultRetriever.applicationListeners.add(listener);

ListenerRetriever類是AbstractApplicationEventMulticaster類的內部類,裏面有兩個集合,用來記錄維護事件監聽器。

private class ListenerRetriever {

		public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
		public final Set<String> applicationListenerBeans = new LinkedHashSet<>();
		...
}

這就和設計模式中的發佈訂閱模式一樣了,維護一個List,用來管理所有的訂閱者,當發佈者發佈消息時,遍歷對應的訂閱者列表,執行各自的回調handler。

二、看SimpleApplicationEventMulticaster類實現的廣播事件邏輯:

@Override
public void multicastEvent(ApplicationEvent event) {
	multicastEvent(event, resolveDefaultEventType(event)); // 繼續調用下面的廣播方法
}

@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
	ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  // 遍歷監聽器列表
	for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
		Executor executor = getTaskExecutor();
		if (executor != null) { // 是否指定了線程池
			executor.execute(new Runnable() {
				@Override
				public void run() { // 線程池執行
					invokeListener(listener, event);
				}
			});
		}
		else { // 普通執行
			invokeListener(listener, event);
		}
	}
}

代碼分析:

  1. 首先根據事件類型,獲取事件監聽器列表:getApplicationListeners(event, type)
  2. 遍歷監聽器列表,for循環
  3. 判斷是否有線程池,如果有,在線程池執行
  4. 否則直接執行

我們再看看 invokeListener方法的邏輯:

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
		ErrorHandler errorHandler = getErrorHandler();
		if (errorHandler != null) { // 是否有錯誤處理
			try {
				doInvokeListener(listener, event);
			} catch (Throwable err) {
				errorHandler.handleError(err);
			}
		} else {
			doInvokeListener(listener, event); // 直接執行
		}
	}

核心邏輯就是繼續調用doInvokeListener方法:

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
		try {
			listener.onApplicationEvent(event);// 執行監聽器事件
		}
		catch (ClassCastException ex) {
			String msg = ex.getMessage();
			if (msg == null || msg.startsWith(event.getClass().getName())) {
				// Possibly a lambda-defined listener which we could not resolve the generic event type for
				Log logger = LogFactory.getLog(getClass());
				if (logger.isDebugEnabled()) {
					logger.debug("Non-matching event type for listener: " + listener, ex);
				}
			}
			else {
				throw ex;
			}
		}
	}

發現最後實際就是調用的 listener.onApplicationEvent(event); 也就是我們通過實現接口ApplicationListener的方式來實現監聽器的onApplicationEvent實現邏輯。

ApplicationEventPublisher類

在我們的發佈事件邏輯代碼的地方,通過查看 eventPublisher.publishEvent(userDTO);方法可以發現ApplicationEventPublisher是一個接口,publishEvent方法的邏輯實現主要在類AbstractApplicationContext中:

public abstract class AbstractApplicationContext extends DefaultResourceLoader
		implements ConfigurableApplicationContext, DisposableBean {
...
		private Set<ApplicationEvent> earlyApplicationEvents;
...
		@Override
		public void publishEvent(ApplicationEvent event) {
			publishEvent(event, null); // 調用下面的方法
		}
    // 發佈事件主要邏輯
		protected void publishEvent(Object event, ResolvableType eventType) {
				Assert.notNull(event, "Event must not be null");
				if (logger.isTraceEnabled()) {
					logger.trace("Publishing event in " + getDisplayName() + ": " + event);
				}
		
				// 事件裝飾為 ApplicationEvent
				ApplicationEvent applicationEvent;
				if (event instanceof ApplicationEvent) {
					applicationEvent = (ApplicationEvent) event;
				} else {
					applicationEvent = new PayloadApplicationEvent<Object>(this, event);
					if (eventType == null) {
						eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
					}
				}
		
				// 容器啟動的時候 earlyApplicationEvents 可能還沒有初始化
				if (this.earlyApplicationEvents != null) {
					this.earlyApplicationEvents.add(applicationEvent); // 加入到集合,同一廣播
				} else {
          // 還沒初始化,直接廣播事件
					getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
				}
		
				// 通過父上下文發佈事件.
				if (this.parent != null) {
					if (this.parent instanceof AbstractApplicationContext) {
						((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
					}
					else {
						this.parent.publishEvent(event);
					}
				}
			}
...
}

這段代碼的主要邏輯在這:

if (this.earlyApplicationEvents != null) {
	this.earlyApplicationEvents.add(applicationEvent);
}
else {
	getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}

可以發現earlyApplicationEvents也是一個Set集合,如果這個集合已經初始化了,就把事件加入到集合中,否則直接調用multicastEvent執行事件監聽邏輯。

我們跟蹤找到初始化這個集合的地方,發現在方法protected void prepareRefresh()中:

protected void prepareRefresh() {
		this.startupDate = System.currentTimeMillis();
		this.closed.set(false);
		this.active.set(true);

		if (logger.isInfoEnabled()) {
			logger.info("Refreshing " + this);
		}

		initPropertySources();

		getEnvironment().validateRequiredProperties();

		**this.earlyApplicationEvents = new LinkedHashSet<ApplicationEvent>();**
	}

繼續跟蹤調用這個方法的地方,發現在AbstractApplicationContext.refresh()方法中,而這個方法是Spring容器初始化必須要調用的過程,非常的重要。

那在什麼地方使用到了這個集合呢?我們繼續跟蹤發現在 protected void registerListeners() 方法中,代碼如下:

protected void registerListeners() {
		// Register statically specified listeners first.
		for (ApplicationListener<?> listener : getApplicationListeners()) {
			getApplicationEventMulticaster().addApplicationListener(listener);
		}

		// Do not initialize FactoryBeans here: We need to leave all regular beans
		// uninitialized to let post-processors apply to them! jinglingwang.cn
		String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
		for (String listenerBeanName : listenerBeanNames) {
			getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
		}

		// 拿到集合引用
		Set<ApplicationEvent> ****earlyEventsToProcess = this.earlyApplicationEvents;
		this.earlyApplicationEvents = null; // 把之前的集合置為null
		if (earlyEventsToProcess != null) { // 如果集合不為空,則廣播裏面的事件
			for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
				getApplicationEventMulticaster().multicastEvent(earlyEvent);
			}
		}
	}

邏輯是先獲得該集合的引用,然後置空之前的集合,然後遍歷集合,進行廣播事件multicastEvent,這個方法的邏輯上面已經說過了。

而registerListeners這個方法是在什麼時候調用的呢?通過跟蹤發現也是在AbstractApplicationContext.refresh()方法中。

只不過基本是在方法邏輯的最後,也就是Spring已經容器初始化完成了。

@Override
	public void refresh() throws BeansException, IllegalStateException {
		synchronized (this.startupShutdownMonitor) {
			// Prepare this context for refreshing.
			**prepareRefresh**();

			....
			try {
				onRefresh();

				// Check for listener beans and register them.
				**registerListeners**();

				// Instantiate all remaining (non-lazy-init) singletons.
				finishBeanFactoryInitialization(beanFactory);

				// Last step: publish corresponding event.
				**finishRefresh**();
			}

			catch (BeansException ex) {
			...
			}

			finally {
				...
			}
		}
	}

容器初始化之前和之後都有可能進行廣播事件。

總結

  1. 事件監聽機制和觀察者模式非常相似
  2. JDK 也有實現提供事件監聽機制
  3. Spring 的事件機制也是基於JDK 來擴展的
  4. Spring 的事件機制默認是同步阻塞的
  5. Spring 容器初始化前後都可能進行廣播事件