深入學習 Node.js EventEmitter
- 2019 年 11 月 6 日
- 筆記
預備知識
觀察者模式
觀察者模式是軟體設計模式的一種。在此種模式中,一個目標對象管理所有相依於它的觀察者對象,並且在它本身的狀態改變時主動發出通知。這通常透過呼叫各觀察者所提供的方法來實現。此種模式通常被用來實時事件處理系統。 —— 維基百科
觀察者模式,它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知所有的觀察者對象,使得它們能夠自動更新自己。
我們可以使用日常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,它們之間的關係如下:
- 期刊出版方 – 負責期刊的出版和發行工作。
- 訂閱者 – 只需執行訂閱操作,新版的期刊發布後,就會主動收到通知,如果取消訂閱,以後就不會再收到通知。
在觀察者模式中也有兩個主要角色:主題和觀察者,分別對應期刊訂閱例子中的期刊出版方和訂閱者,它們之間的關係圖如下:

觀察者模式的優缺點、應用和實現,這裡就不詳細展開,有興趣的小夥伴可以閱讀本人之前整理的文章Observable詳解 – Observer Pattern。
發布/訂閱模式
在軟體架構中,發布-訂閱是一種消息範式,消息的發送者(稱為發布者)不會將消息直接發送給特定的接收者(稱為訂閱者)。而是將發布的消息分為不同的類別,無需了解哪些訂閱者(如果有的話)可能存在。同樣的,訂閱者可以表達對一個或多個類別的興趣,只接收感興趣的消息,無需了解哪些發布者(如果有的話)存在。—— 維基百科
發布/訂閱模式與觀察者模式非常類似,它們最大的區別是:發布者和訂閱者不知道對方的存在。它們之間需要一個第三方組件,叫做資訊中介,它將訂閱者和發布者串聯起來,它過濾和分配所有輸入的消息。換句話說,發布/訂閱模式用來處理不同系統組件的資訊交流,即使這些組件不知道對方的存在。
那麼資訊中介是如何過濾消息呢?在發布/訂閱模型中,訂閱者通常接收所有發布的消息的一個子集。選擇接受和處理的消息的過程被稱作過濾。有兩種常用的過濾形式:基於主題的和基於內容的。
- 在基於主題的系統中,消息被發布到主題或命名通道上。訂閱者將收到其訂閱的主題上的所有消息,並且所有訂閱同一主題的訂閱者將接收到同樣的消息。發布者負責定義訂閱者所訂閱的消息類別。
- 在基於內容的系統中,訂閱者定義其感興趣的消息的條件,只有當消息的屬性或內容滿足訂閱者定義的條件時,消息才會被投遞到該訂閱者。訂閱者需要負責對消息進行分類。
一些系統支援兩者的混合:發布者發布消息到主題上,而訂閱者將基於內容的訂閱註冊到一個或多個主題上。基於主題的通訊基礎結構圖如下:

最後我們再來總結一下觀察者模式與發布/訂閱模式之間的區別。
觀察者模式 vs 發布/訂閱模式

(圖片來源 – developers-club)
觀察者模式與發布/訂閱模式之間的區別:
- 在觀察者模式中,觀察者知道 Subject 的存在,Subject 一直保持對觀察者進行記錄。然而,在發布/訂閱模式中,發布者和訂閱者不知道對方的存在,它們只有通過資訊中介進行通訊。
- 在發布訂閱模式中,組件是鬆散耦合的,正好和觀察者模式相反。
- 觀察者模式大多數時候是同步的,比如當事件觸發,Subject 就會去調用觀察者的方法。而發布/訂閱模式大多數時候是非同步的(使用消息隊列)。
Node.js EventEmitter
大多數 Node.js 核心 API 都採用慣用的非同步事件驅動架構,其中某些類型的對象(觸發器)會周期性地觸發命名事件來調用函數對象(監聽器)。
例如,net.Server
對象會在每次有新連接時觸發事件;fs.ReadStream
會在文件被打開時觸發事件;流對象 會在數據可讀時觸發事件。
所有能觸發事件的對象都是 EventEmitter
類的實例。 這些對象開放了一個 eventEmitter.on()
函數,允許將一個或多個函數綁定到會被對象觸發的命名事件上。 事件名稱通常是駝峰式的字元串,但也可以使用任何有效的 JavaScript 屬性名。
當 EventEmitter
對象觸發一個事件時,所有綁定在該事件上的函數都被同步地調用。 監聽器的返回值會被丟棄。
EventEmitter 基本使用
const EventEmitter = require('events'); class MyEmitter extends EventEmitter {} const myEmitter = new MyEmitter(); myEmitter.on('event', () => { console.log('觸發了一個事件!'); }); myEmitter.emit('event');
以上示例,我們自定義 MyEmitter 類,該類繼承於 EventEmitter 類,接著我們通過使用 new
關鍵字創建了 myEmitter
實例,然後使用 on()
方法監聽 event 事件,最後利用 emit()
方法觸發 event 事件。
小夥伴們,是不是覺得示例很簡單。覺得簡單就對了,我們就從簡單的入手,慢慢深入學習 EventEmitter 類。
EventEmitter 構造函數
function EventEmitter() { EventEmitter.init.call(this); } EventEmitter.usingDomains = false; EventEmitter.prototype._events = undefined; EventEmitter.prototype._eventsCount = 0; // 事件數 EventEmitter.prototype._maxListeners = undefined; // 最大的監聽器數
在 EventEmitter 構造函數內部,會調用 EventEmitter.init
方法執行初始化操作,EventEmitter.init
的具體實現如下:
EventEmitter.init = function() { if (this._events === undefined || this._events === Object.getPrototypeOf(this)._events) { this._events = Object.create(null); this._eventsCount = 0; } this._maxListeners = this._maxListeners || undefined; };
在 EventEmitter.init 內部,會根據條件執行初始化操作,比較重要的這行程式碼 this._events = Object.create(null)
,實現過簡單發布/訂閱模式的小夥伴,估計已經猜到 _events
屬性的作用了,這裡我們就先不繼續討論,我們先來看一下 on()
方法。
EventEmitter on() 方法
EventEmitter.prototype.on = EventEmitter.prototype.addListener; EventEmitter.prototype.addListener = function addListener(type, listener) { return _addListener(this, type, listener, false); };
通過程式碼我們可以發現 EventEmitter 實例上 addListener
和 on
的實現是一樣的,執行時都是調用 events.js
文件內的 _addListener()
函數,它的具體實現如下(程式碼片段):
/** * 添加事件監聽器 * target:EventEmitter 實例 * type:事件類型 * listener:事件監聽器 * prepend:是否添加在前面 */ function _addListener(target, type, listener, prepend) { var m; var events; var existing; // 若監聽器不是函數對象,則拋出異常 if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } events = target._events; // 若target._events對象未定義,則使用Object.create創建一個新的對象 if (events === undefined) { events = target._events = Object.create(null); target._eventsCount = 0; } else { // To avoid recursion in the case that type === "newListener"! Before // adding it to the listeners, first emit "newListener". if (events.newListener !== undefined) { target.emit('newListener', type, listener.listener ? listener.listener : listener); // Re-assign `events` because a newListener handler could have caused the // this._events to be assigned to a new object events = target._events; } existing = events[type]; // 獲取type類型保存的對象 } if (existing === undefined) { // Optimize the case of one listener. Don't need the extra array object. // 優化單個監聽器的場景,不需使用額外的數組對象。 existing = events[type] = listener; ++target._eventsCount; } else { if (typeof existing === 'function') { // 添加type前已有綁定監聽器 // Adding the second element, need to change to array. existing = events[type] = prepend ? [listener, existing] : [existing, listener]; // If we've already got an array, just append. } else if (prepend) { // 添加到前面 existing.unshift(listener); } else { // 添加到後面 existing.push(listener); } } return target; }
現在我們來簡單總結一下 _addListener() 方法內部的主要流程:
- 驗證監聽器是否為函數對象。
- 避免類型為 newListener 的事件類型,造成遞歸調用。
- 優化單個監聽器的場景,不需使用額外的數組對象。
- 基於 prepend 參數的值,控制監聽器的添加順序。
這時,相信你已經知道 EventEmitter 實例中 _events
屬性的作用了,即用來以 Key-Value 的形式來保存指定的事件類型與對應的監聽器。具體可以參考下圖(myEmitter.on(『event』, ()=>{} 內部執行情況):

綁定完事件,如果要派發事件,就可以調用 EventEmitter 實例的 emit() 方法,該方法的實現如下(程式碼片段):
EventEmitter.prototype.emit = function emit(type, ...args) { let doError = (type === 'error'); const events = this._events; const handler = events[type]; // 獲取type類型對應的處理器 if (handler === undefined) return false; // 若事件處理器為函數對象,則使用Reflect.apply進行調用 if (typeof handler === 'function') { Reflect.apply(handler, this, args); } else { const len = handler.length; const listeners = arrayClone(handler, len); for (var i = 0; i < len; ++i) Reflect.apply(listeners[i], this, args); } return true; }; // 數組淺拷貝 function arrayClone(arr, n) { var copy = new Array(n); for (var i = 0; i < n; ++i) copy[i] = arr[i]; return copy; }
emit() 方法內部實現還是挺簡單的,先根據事件類型獲取對應的處理器,然後根據事件處理器的類型,進行進一步處理。需要注意的是,調用處理器是通過 Reflect 對象提供的 apply()
方法來實現。
Reflect.apply() 方法的簽名如下:
Reflect.apply(target, thisArgument, argumentsList)
- target —— 目標函數。
- thisArgument —— target 函數調用時綁定的 this 對象。
- argumentsList —— target 函數調用時傳入的實參列表,該參數應該是一個類數組的對象。
如果對 Reflect 對象感興趣的小夥伴,可以參考MDN – Reflect 對象。
到這裡前面的簡單的示例,我們已經分析完了。我們已經知道通過 EventEmitter 實例的 on()
方法可以用來添加事件監聽,但有些時候,我們也需要在某些情況下移除對應的監聽。針對這種需求,我們就需要利用 EventEmitter 實例的 removeListener()
方法了。
EventEmitter removeListener() 方法
removeListener()
方法最多只會從監聽器數組裡移除一個監聽器實例。 如果任何單一的監聽器被多次添加到指定 type
的監聽器數組中,則必須多次調用 removeListener()
方法才能移除每個實例。為了方便一次性移除 type
對應的監聽器,EventEmitter 為我們提供了 removeAllListeners()
方法。
下面我們來看一下 removeListener() 方法的具體實現(程式碼片段):
// Emits a 'removeListener' event if and only if the listener was removed. EventEmitter.prototype.removeListener = function removeListener(type, listener) { var list, events, position, i, originalListener; // 若監聽器不是函數對象,則拋出異常 if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } events = this._events; if (events === undefined) return this; list = events[type]; // 獲取type對應的綁定對象 if (list === undefined) return this; if (list === listener || list.listener === listener) { if (--this._eventsCount === 0) // 只綁定一個監聽器 this._events = Object.create(null); else { delete events[type]; // 若已設置removeListener監聽器,則觸發removeListener事件 if (events.removeListener) this.emit('removeListener', type, list.listener || listener); } } else if (typeof list !== 'function') { // 包含多個監聽器 position = -1; for (i = list.length - 1; i >= 0; i--) { // 獲取需移除listener對應的索引值 if (list[i] === listener || list[i].listener === listener) { originalListener = list[i].listener; position = i; break; } } if (position < 0) return this; if (position === 0) list.shift(); else { if (spliceOne === undefined) spliceOne = require('internal/util').spliceOne; // 調用內置的spliceOne移除position對應的值 spliceOne(list, position); } if (list.length === 1) events[type] = list[0]; if (events.removeListener !== undefined) this.emit('removeListener', type, originalListener || listener); } return this; };
通過程式碼我們發現在調用 removeListener()
方法時,若 type 事件類型上綁定多個事件處理器,那麼內部處理程式會先根據 listener
事件處理器,查找該事件處理器對應的索引值,若該索引值大於 0,則會調用 Node.js 內部工具庫提供的 spliceOne() 方法,移除對應的事件處理器。為什麼不直接利用 Array#splice() 方法呢?官方的回答是 spliceOne() 方法的執行速度比 Array#splice() 快大約 1.5 倍。
spliceOne() 方法具體實現如下:
// About 1.5x faster than the two-arg version of Array#splice(). function spliceOne(list, index) { for (var i = index, k = i + 1, n = list.length; k < n; i += 1, k += 1) list[i] = list[k]; list.pop(); // 把最後面的空位移除 }
感興趣的小夥伴,可以實際對比一下 Array#splice() 與 spliceOne() 的性能哈。最後我們來介紹一下 EventEmitter 另一個常用的方法 once()。
EventEmitter once() 方法
有些時候,對於一些特殊的事件類型,我們只需執行一次事件處理器,這時我們就可以使用 once() 方法:
const myEmitter = new MyEmitter(); let m = 0; myEmitter.once('event', () => { console.log(++m); }); myEmitter.emit('event'); // 列印: 1 myEmitter.emit('event'); // 無輸出
以上程式碼很簡單,廢話不多說,我們直接看一下 once 函數的具體實現:
EventEmitter.prototype.once = function once(type, listener) { if (typeof listener !== 'function') { const errors = lazyErrors(); throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function'); } this.on(type, _onceWrap(this, type, listener)); return this; };
通過源碼可以發現,once() 函數內部也是通過調用 on()
方法來綁定事件監聽器。特別之處是,內部使用 _onceWrap
函數對 listener 函數進行進一步封裝。那我們只能繼續發掘 _onceWrap
函數,該函數的實現如下:
function _onceWrap(target, type, listener) { var state = { fired: false, wrapFn: undefined, target, type, listener }; var wrapped = onceWrapper.bind(state); // 綁定this上下文 wrapped.listener = listener; state.wrapFn = wrapped; return wrapped; }
在 _onceWrap 函數內部,我們創建了一個 state 對象,該對象有一個 fired
屬性,用來標識是否已觸發,其默認值是 false。一開始還以為內部實現都包含在 _onceWrap 函數內,沒想到竟然又來了個 onceWrapper 函數對象。為了能夠揭開 once() 的神秘面紗,只能繼續前進了。onceWrapper 函數的實現如下:
function onceWrapper(...args) { if (!this.fired) { this.target.removeListener(this.type, this.wrapFn); this.fired = true; Reflect.apply(this.listener, this.target, args); } }
守得雲開見月明,終於見到 onceWrapper 函數的廬山真面目。在函數體中,若發現事件處理器未被調用,則先移除事件監聽器並設置 fired 欄位值為 true,然後利用之前介紹的 Reflect.apply() 方法調用 type 事件類型,對應的事件處理器。至此,EventEmitter 的探索之旅,就落下的帷幕,想繼續了解 EventEmitter 的小夥伴,可以查閱官方文檔或 EventEmitter 對應的源碼。
總結
為了能夠更好地理解 EventEmitter 的設計思想,首先我們介紹了觀察者模式與發布/訂閱模式,然後對比了它們之間的區別。接著我們以一個簡單的示例為切入點,介紹了 EventEmitter 的 on()、emit()、removeListener() 和 once() 方法的使用及內部實現。
如果小夥伴們也對 EventEmitter 源碼感興趣,建議採用閱讀和調試相結合的方式,進行源碼學習。詳細的調試方式,請參考 Debugging Node.js Apps 文章。