Halo 開源項目學習(六):事件監聽機制
Halo 項目中,當用戶或部落客執行某些操作時,伺服器會發布相應的事件,例如部落客登錄管理員後台時發布 “日誌記錄” 事件,用戶瀏覽文章時發布 “訪問文章” 事件。事件發布後,負責監聽的 Bean 會做出相應的處理,這種設計稱為事件監聽機制,其作用是可以實現業務邏輯之間的解耦,提高程式的擴展性和可維護性。
ApplicationEvent 和 Listener
Halo 使用 ApplicationEvent 和 Listener 來實現事件的發布與監聽,二者由 Spring 提供,其中 ApplicationEvent 是需要發布的事件,Listener 則是監聽器。用戶可在監聽器中自定義事件的處理邏輯,當事件發生時,只需要將事件發布,監聽器會根據用戶定義的邏輯自動處理該事件。
事件需要繼承 ApplicationEvent 類,且需要重載構造方法,以 LogEvent 為例:
public class LogEvent extends ApplicationEvent {
private final LogParam logParam;
* Create a new ApplicationEvent.
* @param source the object on which the event initially occurred (never {@code null})
* @param logParam login param
public LogEvent(Object source, LogParam logParam) {
// Validate the log param
// Set ip address
this.logParam = logParam;
public LogEvent(Object source, String logKey, LogType logType, String content) {
this(source, new LogParam(logKey, logType, content));
public LogParam getLogParam() {
return logParam;
構造方法中的 source 指的是觸發事件的 Bean,也稱為事件源,通常用 this 關鍵字代替,其它參數可由用戶任意指定。
ApplicationContext 介面的 publishEvent 方法可用於發布事件,例如部落格初始化完成後發布 LogEvent 事件(InstallConroller 中的 installBlog 方法):
public BaseResponse<String> installBlog(@RequestBody InstallParam installParam) {
// 省略部分程式碼
new LogEvent(this, user.getId().toString(), LogType.BLOG_INITIALIZED, "部落格已成功初始化")
return BaseResponse.ok("安裝完成!");
監聽器的創建方式有多種,例如實現 ApplicationListener 介面、SmartApplicationListener 介面,或者添加 @EventListener 註解。項目中使用註解來定義監聽器,如 LogEventListener:
public class LogEventListener {
private final LogService logService;
public LogEventListener(LogService logService) {
this.logService = logService;
public void onApplicationEvent(LogEvent event) {
// Convert to log
Log logToCreate = event.getLogParam().convertTo();
// Create log
用戶可在 @EventListener 註解修飾的方法中定義事件的處理邏輯,方法接收的參數為監聽的事件類型。@Async 註解的作用是實現非同步監聽,以上文中的 installBlog 方法為例,如果不添加該註解,那麼程式需要等待 onApplicationEvent 方法執行結束後才能返回 “安裝完成!”。加上 @Async 註解後,onApplicationEvent 方法會在新的執行緒中執行,installBlog 方法可以立即返回。若要使 @Async 註解生效,還需要在啟動類或配置類上添加 @EnableAsync 註解。
接下來我們分析一下 Halo 項目中不同事件的處理過程:
日誌記錄事件 LogEvent 由 LogEventListener 中的 onApplicationEvent 方法處理,該方法的處理邏輯非常簡單,就是在 logs 表中插入一條系統日誌,插入的記錄用於在管理員介面展示:
需要注意的是,不同類型日誌的 logKey、logType 以及 content 會有所區別,例如用戶登錄時,logKey 為用戶的 userName,logType 為 LogType.LOGGED_IN,content 為用戶的 nickName:
new LogEvent(this, user.getUsername(), LogType.LOGGED_IN, user.getNickname()));
發布文章時,logKey 為文章的 id,logType 為 LogType.POST_PUBLISHED,content 為文章的 title:
LogEvent logEvent = new LogEvent(this, createdPost.getId().toString(),
LogType.POST_PUBLISHED, createdPost.getTitle());
文章訪問事件 PostVisitEvent 由 AbstractVisitEventListener 中的 handleVisitEvent 方法處理,該方法的處理的邏輯是將當前文章的訪問量加一:
protected void handleVisitEvent(@NonNull AbstractVisitEvent event) throws InterruptedException {
Assert.notNull(event, "Visit event must not be null");
// 獲取文章 id
// Get post id
Integer id = event.getId();
log.debug("Received a visit event, post id: [{}]", id);
// 如果當前 postId 具有對應的 BlockingQueue, 那麼直接返回該 BlockingQueue, 否則為當前 postId 創建一個新的 BlockingQueue
// Get post visit queue
BlockingQueue<Integer> postVisitQueue =
visitQueueMap.computeIfAbsent(id, this::createEmptyQueue);
// 如果當前 postId 具有對應的 PostVisitTask, 不做任何處理, 否則為當前 postId 創建一個新的 PostVisitTask 任務
visitTaskMap.computeIfAbsent(id, this::createPostVisitTask);
// 將當前 postId 存入到對應的 BlockingQueue
// Put a visit for the post
上述方法首先獲取當前被訪問文章的 postId,然後查詢 visitQueueMap 中是否存在 postId 對應的阻塞隊列(實際類型為 LinkedBlockingQueue),如果存在那麼直接返回該隊列, 否則為當前 postId 創建一個新的阻塞隊列並存入到 visitQueueMap。接著查詢 visitTaskMap 中是否存在 postId 對應的 PostVisitTask 任務(任務的作用是將文章的訪問量加一),如果沒有,那麼就為 postId 創建一個新的 PostVisitTask 任務,並將該任務交給執行緒池 ThreadPoolExecutor(Executors.newCachedThreadPool())執行。之後將 postId 添加到對應的阻塞隊列,這一步的目的是管理 PostVisitTask 任務的執行次數。
visitQueueMap 和 visitTaskMap 都是 ConcurrentHashMap 類型的對象,使用 ConcurrentHashMap 是為了保證執行緒安全,因為監聽器的事件處理方法被 @Async 註解修飾。默認情況下,@Async 註解修飾的方法會由 Spring 創建的執行緒池 ThreadPoolTaskExecutor 中的執行緒執行,因此當某一篇文章被多個用戶同時瀏覽時,ThreadPoolTaskExecutor 中的多個執行緒可能會同時在 visitQueueMap 中創建阻塞隊列,或在 visitTaskMap 中創建 PostVisitTask 任務。
下面看一下 PostVisitTask 任務中 run 方法的處理邏輯:
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
BlockingQueue<Integer> postVisitQueue = visitQueueMap.get(id);
Integer postId = postVisitQueue.take();
log.debug("Took a new visit for post id: [{}]", postId);
// Increase the visit
log.debug("Increased visits for post id: [{}]", postId);
} catch (InterruptedException e) {
"Post visit task: " + Thread.currentThread().getName() + " was interrupted",
// Ignore this exception
log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
執行緒池 ThreadPoolExecutor 中的一個執行緒處理該任務:
從 visitQueueMap 獲取 postId 對應的阻塞隊列(這裡的 id 其實就是 postId),並取出隊首元素。
將 postId 對應的文章的點贊量加一。
只要執行緒不被中斷,就一直重複步驟 1 和步驟 2,如果隊列為空,那麼執行緒進入阻塞。
當 id 為 postId 的文章被訪問時,系統會為其創建一個 LinkedBlockingQueue 類型的阻塞隊列和一個負責將文章點贊量加一的 PostVisitTask 任務。然後 postId 入隊,執行緒池 ThreadPoolExecutor 分配一個執行緒執行 PostVisitTask 任務,阻塞隊列有多少個 postId 該任務就執行多少次。
事件監聽機制是一個非常重要的知識點,實際開發中,如果某些業務處理起來比較耗時,且與主要業務的關聯性並不是很強,那麼可以考慮做任務拆分,利用事件監聽機制將串列執行非同步化,改為並行執行(當然也可以使用消息隊列)。Halo 中還有新增評論、主題更新等事件,這些事件的的處理思路與文章訪問事件相似,所以本文就不再過多陳述了 ( ⊙‿⊙)。