­

深入學習 Node.js EventEmitter

  • 2019 年 11 月 6 日
  • 筆記

預備知識

觀察者模式

觀察者模式軟體設計模式的一種。在此種模式中,一個目標對象管理所有相依於它的觀察者對象,並且在它本身的狀態改變時主動發出通知。這通常透過呼叫各觀察者所提供的方法來實現。此種模式通常被用來實時事件處理系統。 —— 維基百科

觀察者模式,它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知所有的觀察者對象,使得它們能夠自動更新自己。

我們可以使用日常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,它們之間的關係如下:

  • 期刊出版方 – 負責期刊的出版和發行工作。
  • 訂閱者 – 只需執行訂閱操作,新版的期刊發布後,就會主動收到通知,如果取消訂閱,以後就不會再收到通知。

在觀察者模式中也有兩個主要角色:主題和觀察者,分別對應期刊訂閱例子中的期刊出版方和訂閱者,它們之間的關係圖如下:

觀察者模式的優缺點、應用和實現,這裡就不詳細展開,有興趣的小夥伴可以閱讀本人之前整理的文章Observable詳解 – Observer Pattern

發布/訂閱模式

軟體架構中,發布-訂閱是一種消息範式,消息的發送者(稱為發布者)不會將消息直接發送給特定的接收者(稱為訂閱者)。而是將發布的消息分為不同的類別,無需了解哪些訂閱者(如果有的話)可能存在。同樣的,訂閱者可以表達對一個或多個類別的興趣,只接收感興趣的消息,無需了解哪些發布者(如果有的話)存在。—— 維基百科

發布/訂閱模式與觀察者模式非常類似,它們最大的區別是:發布者和訂閱者不知道對方的存在。它們之間需要一個第三方組件,叫做資訊中介,它將訂閱者和發布者串聯起來,它過濾和分配所有輸入的消息。換句話說,發布/訂閱模式用來處理不同系統組件的資訊交流,即使這些組件不知道對方的存在。

那麼資訊中介是如何過濾消息呢?在發布/訂閱模型中,訂閱者通常接收所有發布的消息的一個子集。選擇接受和處理的消息的過程被稱作過濾。有兩種常用的過濾形式:基於主題的和基於內容的。

  • 基於主題的系統中,消息被發布到主題或命名通道上。訂閱者將收到其訂閱的主題上的所有消息,並且所有訂閱同一主題的訂閱者將接收到同樣的消息。發布者負責定義訂閱者所訂閱的消息類別。
  • 基於內容的系統中,訂閱者定義其感興趣的消息的條件,只有當消息的屬性或內容滿足訂閱者定義的條件時,消息才會被投遞到該訂閱者。訂閱者需要負責對消息進行分類。

一些系統支援兩者的混合:發布者發布消息到主題上,而訂閱者將基於內容的訂閱註冊到一個或多個主題上。基於主題的通訊基礎結構圖如下:

最後我們再來總結一下觀察者模式與發布/訂閱模式之間的區別。

觀察者模式 vs 發布/訂閱模式

(圖片來源 – developers-club

觀察者模式與發布/訂閱模式之間的區別:

  • 在觀察者模式中,觀察者知道 Subject 的存在,Subject 一直保持對觀察者進行記錄。然而,在發布/訂閱模式中,發布者和訂閱者不知道對方的存在,它們只有通過資訊中介進行通訊。
  • 在發布訂閱模式中,組件是鬆散耦合的,正好和觀察者模式相反。
  • 觀察者模式大多數時候是同步的,比如當事件觸發,Subject 就會去調用觀察者的方法。而發布/訂閱模式大多數時候是非同步的(使用消息隊列)。

Node.js EventEmitter

大多數 Node.js 核心 API 都採用慣用的非同步事件驅動架構,其中某些類型的對象(觸發器)會周期性地觸發命名事件來調用函數對象(監聽器)。

例如,net.Server 對象會在每次有新連接時觸發事件;fs.ReadStream 會在文件被打開時觸發事件;流對象 會在數據可讀時觸發事件。

所有能觸發事件的對象都是 EventEmitter 類的實例。 這些對象開放了一個 eventEmitter.on() 函數,允許將一個或多個函數綁定到會被對象觸發的命名事件上。 事件名稱通常是駝峰式的字元串,但也可以使用任何有效的 JavaScript 屬性名。

EventEmitter 對象觸發一個事件時,所有綁定在該事件上的函數都被同步地調用。 監聽器的返回值會被丟棄。

EventEmitter 基本使用

const EventEmitter = require('events');    class MyEmitter extends EventEmitter {}    const myEmitter = new MyEmitter();  myEmitter.on('event', () => {    console.log('觸發了一個事件!');  });    myEmitter.emit('event');

以上示例,我們自定義 MyEmitter 類,該類繼承於 EventEmitter 類,接著我們通過使用 new 關鍵字創建了 myEmitter 實例,然後使用 on() 方法監聽 event 事件,最後利用 emit() 方法觸發 event 事件。

小夥伴們,是不是覺得示例很簡單。覺得簡單就對了,我們就從簡單的入手,慢慢深入學習 EventEmitter 類。

EventEmitter 構造函數

function EventEmitter() {    EventEmitter.init.call(this);  }    EventEmitter.usingDomains = false;    EventEmitter.prototype._events = undefined;  EventEmitter.prototype._eventsCount = 0; // 事件數  EventEmitter.prototype._maxListeners = undefined; // 最大的監聽器數

在 EventEmitter 構造函數內部,會調用 EventEmitter.init 方法執行初始化操作,EventEmitter.init 的具體實現如下:

EventEmitter.init = function() {    if (this._events === undefined ||        this._events === Object.getPrototypeOf(this)._events) {      this._events = Object.create(null);      this._eventsCount = 0;    }    this._maxListeners = this._maxListeners || undefined;  };

在 EventEmitter.init 內部,會根據條件執行初始化操作,比較重要的這行程式碼 this._events = Object.create(null),實現過簡單發布/訂閱模式的小夥伴,估計已經猜到 _events 屬性的作用了,這裡我們就先不繼續討論,我們先來看一下 on() 方法。

EventEmitter on() 方法

EventEmitter.prototype.on = EventEmitter.prototype.addListener;    EventEmitter.prototype.addListener = function addListener(type, listener) {    return _addListener(this, type, listener, false);  };

通過程式碼我們可以發現 EventEmitter 實例上 addListeneron 的實現是一樣的,執行時都是調用 events.js 文件內的 _addListener() 函數,它的具體實現如下(程式碼片段):

/**  * 添加事件監聽器  * target:EventEmitter 實例  * type:事件類型  * listener:事件監聽器  * prepend:是否添加在前面  */  function _addListener(target, type, listener, prepend) {    var m;    var events;    var existing;      // 若監聽器不是函數對象,則拋出異常    if (typeof listener !== 'function') {      const errors = lazyErrors();      throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function');    }      events = target._events;    // 若target._events對象未定義,則使用Object.create創建一個新的對象    if (events === undefined) {      events = target._events = Object.create(null);      target._eventsCount = 0;    } else {      // To avoid recursion in the case that type === "newListener"! Before      // adding it to the listeners, first emit "newListener".      if (events.newListener !== undefined) {        target.emit('newListener', type,                    listener.listener ? listener.listener : listener);          // Re-assign `events` because a newListener handler could have caused the        // this._events to be assigned to a new object        events = target._events;      }      existing = events[type]; // 獲取type類型保存的對象    }      if (existing === undefined) {      // Optimize the case of one listener. Don't need the extra array object.      // 優化單個監聽器的場景,不需使用額外的數組對象。      existing = events[type] = listener;      ++target._eventsCount;    } else {      if (typeof existing === 'function') { // 添加type前已有綁定監聽器        // Adding the second element, need to change to array.        existing = events[type] =          prepend ? [listener, existing] : [existing, listener];        // If we've already got an array, just append.      } else if (prepend) { // 添加到前面        existing.unshift(listener);      } else { // 添加到後面        existing.push(listener);      }    }    return target;  }

現在我們來簡單總結一下 _addListener() 方法內部的主要流程:

  • 驗證監聽器是否為函數對象。
  • 避免類型為 newListener 的事件類型,造成遞歸調用。
  • 優化單個監聽器的場景,不需使用額外的數組對象。
  • 基於 prepend 參數的值,控制監聽器的添加順序。

這時,相信你已經知道 EventEmitter 實例中 _events 屬性的作用了,即用來以 Key-Value 的形式來保存指定的事件類型與對應的監聽器。具體可以參考下圖(myEmitter.on(『event』, ()=>{} 內部執行情況):

綁定完事件,如果要派發事件,就可以調用 EventEmitter 實例的 emit() 方法,該方法的實現如下(程式碼片段):

EventEmitter.prototype.emit = function emit(type, ...args) {    let doError = (type === 'error');      const events = this._events;      const handler = events[type]; // 獲取type類型對應的處理器      if (handler === undefined)      return false;      // 若事件處理器為函數對象,則使用Reflect.apply進行調用    if (typeof handler === 'function') {      Reflect.apply(handler, this, args);    } else {      const len = handler.length;      const listeners = arrayClone(handler, len);      for (var i = 0; i < len; ++i)        Reflect.apply(listeners[i], this, args);    }      return true;  };    // 數組淺拷貝  function arrayClone(arr, n) {    var copy = new Array(n);    for (var i = 0; i < n; ++i)      copy[i] = arr[i];    return copy;  }

emit() 方法內部實現還是挺簡單的,先根據事件類型獲取對應的處理器,然後根據事件處理器的類型,進行進一步處理。需要注意的是,調用處理器是通過 Reflect 對象提供的 apply() 方法來實現。

Reflect.apply() 方法的簽名如下:

Reflect.apply(target, thisArgument, argumentsList)

  • target —— 目標函數。
  • thisArgument —— target 函數調用時綁定的 this 對象。
  • argumentsList —— target 函數調用時傳入的實參列表,該參數應該是一個類數組的對象。

如果對 Reflect 對象感興趣的小夥伴,可以參考MDN – Reflect 對象

到這裡前面的簡單的示例,我們已經分析完了。我們已經知道通過 EventEmitter 實例的 on() 方法可以用來添加事件監聽,但有些時候,我們也需要在某些情況下移除對應的監聽。針對這種需求,我們就需要利用 EventEmitter 實例的 removeListener() 方法了。

EventEmitter removeListener() 方法

removeListener() 方法最多只會從監聽器數組裡移除一個監聽器實例。 如果任何單一的監聽器被多次添加到指定 type 的監聽器數組中,則必須多次調用 removeListener() 方法才能移除每個實例。為了方便一次性移除 type 對應的監聽器,EventEmitter 為我們提供了 removeAllListeners() 方法。

下面我們來看一下 removeListener() 方法的具體實現(程式碼片段):

// Emits a 'removeListener' event if and only if the listener was removed.  EventEmitter.prototype.removeListener =      function removeListener(type, listener) {        var list, events, position, i, originalListener;          // 若監聽器不是函數對象,則拋出異常        if (typeof listener !== 'function') {          const errors = lazyErrors();          throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener',            'Function');        }          events = this._events;        if (events === undefined)          return this;          list = events[type]; // 獲取type對應的綁定對象        if (list === undefined)          return this;          if (list === listener || list.listener === listener) {          if (--this._eventsCount === 0) // 只綁定一個監聽器            this._events = Object.create(null);          else {            delete events[type];            // 若已設置removeListener監聽器,則觸發removeListener事件            if (events.removeListener)              this.emit('removeListener', type, list.listener || listener);          }        } else if (typeof list !== 'function') { // 包含多個監聽器          position = -1;            for (i = list.length - 1; i >= 0; i--) { // 獲取需移除listener對應的索引值            if (list[i] === listener || list[i].listener === listener) {              originalListener = list[i].listener;              position = i;              break;            }          }            if (position < 0)            return this;            if (position === 0)            list.shift();          else {            if (spliceOne === undefined)              spliceOne = require('internal/util').spliceOne;            // 調用內置的spliceOne移除position對應的值            spliceOne(list, position);          }            if (list.length === 1)            events[type] = list[0];            if (events.removeListener !== undefined)            this.emit('removeListener', type, originalListener || listener);        }          return this;      };

通過程式碼我們發現在調用 removeListener() 方法時,若 type 事件類型上綁定多個事件處理器,那麼內部處理程式會先根據 listener 事件處理器,查找該事件處理器對應的索引值,若該索引值大於 0,則會調用 Node.js 內部工具庫提供的 spliceOne() 方法,移除對應的事件處理器。為什麼不直接利用 Array#splice() 方法呢?官方的回答是 spliceOne() 方法的執行速度比 Array#splice() 快大約 1.5 倍。

spliceOne() 方法具體實現如下:

// About 1.5x faster than the two-arg version of Array#splice().  function spliceOne(list, index) {    for (var i = index, k = i + 1, n = list.length; k < n; i += 1, k += 1)      list[i] = list[k];    list.pop(); // 把最後面的空位移除  }

感興趣的小夥伴,可以實際對比一下 Array#splice() 與 spliceOne() 的性能哈。最後我們來介紹一下 EventEmitter 另一個常用的方法 once()。

EventEmitter once() 方法

有些時候,對於一些特殊的事件類型,我們只需執行一次事件處理器,這時我們就可以使用 once() 方法:

const myEmitter = new MyEmitter();  let m = 0;  myEmitter.once('event', () => {    console.log(++m);  });  myEmitter.emit('event'); // 列印: 1  myEmitter.emit('event'); // 無輸出

以上程式碼很簡單,廢話不多說,我們直接看一下 once 函數的具體實現:

EventEmitter.prototype.once = function once(type, listener) {    if (typeof listener !== 'function') {      const errors = lazyErrors();      throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'Function');    }    this.on(type, _onceWrap(this, type, listener));    return this;  };

通過源碼可以發現,once() 函數內部也是通過調用 on() 方法來綁定事件監聽器。特別之處是,內部使用 _onceWrap 函數對 listener 函數進行進一步封裝。那我們只能繼續發掘 _onceWrap 函數,該函數的實現如下:

function _onceWrap(target, type, listener) {    var state = { fired: false, wrapFn: undefined, target, type, listener };    var wrapped = onceWrapper.bind(state); // 綁定this上下文    wrapped.listener = listener;    state.wrapFn = wrapped;    return wrapped;  }

在 _onceWrap 函數內部,我們創建了一個 state 對象,該對象有一個 fired 屬性,用來標識是否已觸發,其默認值是 false。一開始還以為內部實現都包含在 _onceWrap 函數內,沒想到竟然又來了個 onceWrapper 函數對象。為了能夠揭開 once() 的神秘面紗,只能繼續前進了。onceWrapper 函數的實現如下:

function onceWrapper(...args) {    if (!this.fired) {      this.target.removeListener(this.type, this.wrapFn);      this.fired = true;      Reflect.apply(this.listener, this.target, args);    }  }

守得雲開見月明,終於見到 onceWrapper 函數的廬山真面目。在函數體中,若發現事件處理器未被調用,則先移除事件監聽器並設置 fired 欄位值為 true,然後利用之前介紹的 Reflect.apply() 方法調用 type 事件類型,對應的事件處理器。至此,EventEmitter 的探索之旅,就落下的帷幕,想繼續了解 EventEmitter 的小夥伴,可以查閱官方文檔或 EventEmitter 對應的源碼。

總結

為了能夠更好地理解 EventEmitter 的設計思想,首先我們介紹了觀察者模式與發布/訂閱模式,然後對比了它們之間的區別。接著我們以一個簡單的示例為切入點,介紹了 EventEmitter 的 on()、emit()、removeListener() 和 once() 方法的使用及內部實現。

如果小夥伴們也對 EventEmitter 源碼感興趣,建議採用閱讀和調試相結合的方式,進行源碼學習。詳細的調試方式,請參考 Debugging Node.js Apps 文章。

參考資源