RxJS Subject

  • 2019 年 11 月 6 日
  • 筆記

觀察者模式

觀察者模式,它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知所有的觀察者對象,使得它們能夠自動更新自己。

我們可以使用日常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係如下:

  • 期刊出版方 —— 負責期刊的出版和發行工作。
  • 訂閱者 —— 只需執行訂閱操作,新版的期刊發佈後,就會主動收到通知,如果取消訂閱,以後就不會再收到通知。

在觀察者模式中也有兩個主要角色:Subject(主題)和 Observer (觀察者),它們分別對應例子中的期刊出版方和訂閱者。

訂閱 Observable

在介紹 RxJS Subject 之前,我們先來看個示例:

import { interval } from "rxjs";  import { take } from "rxjs/operators";    const interval$ = interval(1000).pipe(take(3));    interval$.subscribe(value => console.log("Observer A get value: " + value));    setTimeout(() => {    interval$.subscribe(value => console.log("Observer B get value: " + value));  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 0  Observer A get value: 1  Observer B get value: 0  Observer A get value: 2  Observer B get value: 1  Observer B get value: 2

通過以上示例,我們可以得出以下結論:

  • Observable 對象可以被重複訂閱。
  • Observable 對象每次被訂閱後,都會重新執行。

上面的示例,我們可以簡單地認為兩次調用普通的函數,具體參考以下代碼:

function interval() {    setInterval(() => console.log('..'), 1000);  }    interval();    setTimeout(() => {    interval();  }, 1000);

Observable 對象的默認行為,適用於大部分場景。但有些時候,我們會希望在第二次訂閱的時候,不會從頭開始接收 Observable 發出的值,而是從第一次訂閱當前正在處理的值開始發送,我們把這種處理方式成為組播。

上述的需求要如何實現呢?我們已經知道了觀察者模式定義了一對多的關係,我們可以讓多個觀察者對象同時監聽同一個主題,這裡就是我們的時間序列流。當數據源發出新值的時,所有的觀察者就能接收到新的值。

自定義 Subject

  1. Subject 類定義
class Subject {    observers = [];    addObserver(observer) {      this.observers.push(observer);    }      next(value) {      this.observers.forEach(o => o.next(value));    }      error(error) {      this.observers.forEach(o => o.error(error));    }      complete() {      this.observers.forEach(o => o.complete());    }  }
  1. 使用示例
const interval$ = interval(1000).pipe(take(3));  const subject = new Subject();    let observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    var observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.addObserver(observerA); // 添加觀察者A  interval$.subscribe(subject); // 訂閱interval$對象  setTimeout(() => {    subject.addObserver(observerB); // 添加觀察者B  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 0  Observer A get value: 1  Observer B get value: 1  Observer A get value: 2  Observer B get value: 2  Observer A complete!  Observer B complete!

RxJS Subject

其實 RxJS 也為我們提供了 Subject 類,接下我們來利用 RxJS 的 Suject 重寫一下上面的示例:

import { interval, Subject } from "rxjs";  import { take } from "rxjs/operators";    const interval$ = interval(1000).pipe(take(3));  const subject = new Subject();    const observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    const observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.subscribe(observerA); // 添加觀察者A  interval$.subscribe(subject); // 訂閱interval$對象  setTimeout(() => {    subject.subscribe(observerB); // 添加觀察者B  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 0  Observer A get value: 1  Observer B get value: 1  Observer A get value: 2  Observer B get value: 2  Observer A complete!  Observer B complete!

根據上述的示例代碼和控制台的輸出結果,我們來總結一下 Subject 的特點:

  • Subject 既是 Observable 對象,又是 Observer 對象。
  • 當有新消息時,Subject 會通知內部的所有觀察者。

RxJS Subject & Observable

Subject 其實是觀察者模式的實現,所以當觀察者訂閱 Subject 對象時,Subject 對象會把訂閱者添加到觀察者列表中,每當有 subject 對象接收到新值時,它就會遍歷觀察者列表,依次調用觀察者內部的 next() 方法,把值一一送出。

Subject 之所以具有 Observable 中的所有方法,是因為 Subject 類繼承了 Observable 類,在 Subject 類中有五個重要的方法:

  • next —— 每當 Subject 對象接收到新值的時候,next 方法會被調用。
  • error —— 運行中出現異常,error 方法會被調用。
  • complete —— Subject 訂閱的 Observable 對象結束後,complete 方法會被調用。
  • subscribe —— 添加觀察者。
  • unsubscribe —— 取消訂閱(設置終止標識符、清空觀察者列表)。

除了 Subject 之外,RxJS 還為我們提供了 Subject 的幾種變體,如 BehaviorSubject、ReplaySubject 和 AsyncSubject。下面我們來分別介紹一下它們。

BehaviorSubject

有些時候我們會希望 Subject 能保存當前的最新狀態,而不是單純的進行事件發送,也就是說每當新增一個觀察者的時候,我們希望 Subject 能夠立即發出當前最新的值,而不是沒有任何響應。

為了說明上述的情景,我們先來分析一下以下示例:

import { Subject } from "rxjs";    const subject = new Subject();    const observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    const observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒後訂閱  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 1  Observer A get value: 2  Observer A get value: 3

通過輸出結果,我們發現在 observerB 訂閱 Subject 對象後,它再也沒有收到任何值了。因為 Subject 對象沒有再調用 next() 方法。但很多時候我們會希望 Subject 對象能夠保存當前的狀態,當新增訂閱者的時候,自動把當前最新的值發送給訂閱者。要實現這個功能,我們就需要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用來保存當前最新的值,而不是單純的發送事件。BehaviorSubject 會記住最近一次發送的值,並把該值作為當前值保存在內部的屬性中。

下面我們來使用 BehaviorSubject 重寫上面的示例:

import { BehaviorSubject } from "rxjs";  const subject = new BehaviorSubject(0);    const observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    const observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒後訂閱  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 0  Observer A get value: 1  Observer A get value: 2  Observer A get value: 3  Observer B get value: 3

通過以上示例,我們知道 BehaviorSubject 會記住最近一次發送的值,當新的觀察者進行訂閱時,就會接收到最新的值。然後有些時候,我們新增的訂閱者,可以接收到數據源最近發送的幾個值,針對這種場景,我們就需要使用 ReplaySubject。

ReplaySubject

import { ReplaySubject } from "rxjs";  const subject = new ReplaySubject(2);    const observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    const observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    setTimeout(() => {    subject.subscribe(observerB); // 1秒後訂閱  }, 1000);

以上代碼運行後,控制台的輸出結果:

Observer A get value: 1  Observer A get value: 2  Observer A get value: 3  Observer B get value: 2  Observer B get value: 3

可能會有人認為 ReplaySubject(1) 是不是等同於 BehaviorSubject,其實它們是不一樣的。在創建BehaviorSubject 對象時,是設置初始值,它用於表示 Subject 對象當前的狀態,而 ReplaySubject 只是事件的重放

AsyncSubject

AsyncSubject 類似於 last 操作符,它會在 Subject 結束後發出最後一個值,具體示例如下:

import { AsyncSubject } from "rxjs";  const subject = new AsyncSubject();    const observerA = {    next: value => console.log("Observer A get value: " + value),    error: error => console.log("Observer A error: " + error),    complete: () => console.log("Observer A complete!")  };    const observerB = {    next: value => console.log("Observer B get value: " + value),    error: error => console.log("Observer B error: " + error),    complete: () => console.log("Observer B complete!")  };    subject.subscribe(observerA);    subject.next(1);  subject.next(2);  subject.next(3);    subject.complete();    setTimeout(() => {    subject.subscribe(observerB); // 1秒後訂閱  }, 1000);

最後我們來介紹一下在 Angular 項目中,RxJS Subject 的應用。

Angular RxJS Subject 應用

在 Angular 中,我們可以利用 RxJS Subject 來實現組件間通信,具體示例如下:

  1. message.service.ts
import { Injectable } from '@angular/core';  import { Observable, Subject } from 'rxjs';    @Injectable({    providedIn: 'root'  })  export class MessageService {    private subject = new Subject<any>();      sendMessage(message: string) {      this.subject.next({ text: message });    }      clearMessage() {      this.subject.next();    }      getMessage(): Observable<any> {      return this.subject.asObservable();    }  }
  1. home.component.ts
import { Component } from '@angular/core';    import { MessageService } from '../message.service';    @Component({    selector: 'app-home',    template: `      <div>        <h1>Home</h1>        <button (click)="sendMessage()">Send Message</button>        <button (click)="clearMessage()">Clear Message</button>      </div>      `  })    export class HomeComponent {    constructor(private messageService: MessageService) { }      sendMessage(): void { // 發送消息      this.messageService.sendMessage('Message from Home Component to App Component!');    }      clearMessage(): void { // 清除消息      this.messageService.clearMessage();    }  }
  1. app.component.ts
import { Component, OnDestroy } from '@angular/core';  import { Subscription } from 'rxjs';    import { MessageService } from './message.service';    @Component({    selector: 'my-app',    template: `        <div *ngIf="message">{{message.text}}</div>        <app-home></app-home>      `  })    export class AppComponent implements OnDestroy {    message: any;    subscription: Subscription;      constructor(private messageService: MessageService) {      this.subscription = this.messageService.getMessage().subscribe(message => {        this.message = message;      });    }      ngOnDestroy() {      this.subscription.unsubscribe();    }  }

感興趣的同學可以查看 Stackblitz 完整示例。

參考資源