RxJS 中的觀察者和迭代器模式

目錄

  • 前言
  • 觀察者模式
  • 迭代器模式
  • RxJS 中兩種模式的結合和實現
  • 小結
  • 參考

1. 前言

RxJS 是一個庫,它通過使用observable(可觀察對象)序列來編寫非同步和基於事件的程式。其結合了觀察者模式迭代器模式使用集合的函數式編程,以一種理想方式來管理事件序列所需要的一切。

本文將主要探討觀察者模式、迭代器模式以及它們如何在 RxJS 中被應用。

2. 觀察者模式

實現了生產者(事件的創建者)和消費者(事件的監聽者)的邏輯分離關係。

瀏覽器 DOM 事件的監聽和觸發應該是 Web 前端最典型的觀察者模式的實現。

document.body.addEventListener('click', function listener(e) {
    console.log(e);
});

document.body.click(); // 模擬用戶點擊

監聽:通過addEventListenerdocument.body節點綁定一個click事件的事件處理函數。

觸發:當用戶點擊頁面(body)時,body節點將會觸發綁定的事件處理函數。

關係圖如下:

Web事件的觀察者模式

3. 迭代器模式

可以讓用戶通過特定的介面訪問集合中的每一個元素而不用了解底層的實現。

從 ES 6 開始,引入的一種新的遍歷機制——迭代器,其就是迭代器模式在 JavaScript 中的一種實現。在 JavaScript 中,迭代器是一個對象,它定義一個序列,並在終止時可能返回一個返回值。 更具體地說,迭代器是通過使用 next() 方法實現 Iterator protocol (迭代器協議)的任何一個對象,該方法返回具有兩個屬性的對象: valuedone ,其中value代表具體返回值,done表示是否已經迭代完畢。

StringArrayMapSet 等都是內置可迭代對象,它們的原型對象都擁有一個 Symbol.iterator 方法。

const arr = ['a', 'b'];
const iterator = arr[Symbol.iterator](); // 獲取迭代器對象

iterator.next();  // { value: 'a', done: false }
iterator.next();  // { value: 'b', done: false }
iterator.next();  // { value: undefined, done: true }

我們常常用for-of 循環來遍歷可迭代對象:

const arr = ['a', 'b'];

for (let value of arr) {
    console.log(value); // a b
}

for-of語法是為了方便遍歷可迭代對象,其內部實現調用的是Symbol.iterator方法,類似下面的程式碼:

const arr = ['a', 'b'];
const iterator = arr[Symbol.iterator]();

let result = iterator.next();
while (!result.done) {
  console.log(result.value); // a b
  result = iterator.next();
}

迭代器的特點:

  1. 訪問集合中的內容而不用了解底層的實現。
  2. 提供了一個統一的介面遍歷不同的集合結構,從而支援同樣的演算法在不同的集合結構上進行操作。

4. RxJS 中兩種模式的結合和實現

RxJS 中包含兩個基本概念:**Observable **和 Observer

Observable 作為可觀察對象(被觀察者),是一個可調用的未來值或事件的集合(非同步或同步數據流)。

Observer 作為觀察者,是一個回調函數的集合,它知道如何去監聽由Observable提供的值。

ObservableObserver之間的訂閱發布關係(觀察者模式)如下:

訂閱:Observer 通過 Observable 提供的 subscribe() 方法訂閱 Observable

發布:Observable 通過 Observer 提供的 next 方法向 Observer 發布事件。

兩者關係的偽程式碼如下:

// Observer
const observer = {
    next(value) {
        console.log(value);
    }
};

// Observable
function Observable (observer) {
    setTimeout(()=>{
        observer.next('A');
    }, 1000);
}

// subscribe
Observable(observer);

從上可知,所謂訂閱,就是將觀察者Observer注入到可觀察對象Observable中。

在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還提供了另外的兩個方法:error()complete(),來處理異常和完成狀態。

const observer = {
    next(value) { /* 處理值 */ },
    error(err) { /* 處理異常 */ },
    complete() { /* 處理已完成態 */ }
};

結合迭代器 Iterator 來理解Observer的三個方法:

  • next()Observer通過提供 next 方法來接受Observable流(集合),是一種 push 形式(推送)。
    • 對比 Iterator,則是通過調用 iterator.next() 拿值,是一種 pull 的形式(拉取)。
  • complete():當不再有新的值發出時,將觸發 Observercomplete 方法。
    • 對比 Iterator ,則是在 next() 的返回結果中的 donetrue 時,則表示 complete
  • error():當處理事件中出現異常時,通過try-catch捕獲異常,Observer 提供 error 方法來接收錯誤進行統一處理。

一個簡單的 RxJS 訂閱-發布實例:

import { Observable } from 'rxjs';

const observable = new Observable(function (observer) {
    // 通知觀察者
    observer.next('a');
    observer.next('b');
    observer.complete(); // 將取消該觀察者的訂閱
  
    // observer.error(new Error('err'));
  
    observer.next('c'); // 由於已經 complete,所以不會再發送
});

// 定義觀察者,next、complete、error 方法處理流的不同狀態
const observer = {
    next: (value) => console.log(value),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
}

// 訂閱 Observable 並執行
const subscription = observable.subscribe(observer); // 將返回一個可取消的訂閱對象 subscription

執行結果:

a
b

5. 小結

一句話概述 RxJS 中實現的觀察者+迭代器模式:就是將觀察者Observer注入到可觀察對象Observable中,然後在可觀察對象Observable中通過調用Observer提供的 nextcompleteerror 方法處理流的不同狀態,以實現對數據流的一種順序訪問處理。

6. 參考

RxJS 中文文檔

Rx.js實現原理淺析