rxjs里的Observable對象subscribe方法的執行原理

  • 2020 年 12 月 8 日
  • AI

看個例子:

const myObservable = of(1, 2, 3);

        // 創建一個觀察者對象-Observer(處理next、error、complete回調)
        const myObserver = {
         next: x => console.log('Observer got a next value: ' + x),
        error: err => console.error('Observer got an error: ' + err),
        complete: () => console.log('Observer got a complete notification'),
    };

        // 通過Observable的subscribe函數,觀察者去訂閱可觀察者的消息
        myObservable.subscribe(myObserver);

調用Observable的subscribe方法,傳入一個包含回調函數的observer對象:

後兩個參數都是undefined:

在toSubscriber函數里,因為nextOrObserver是我手動傳入的對象,所以前兩個IF條件均不滿足:

進入默認實現,新建一個Subscriber對象:

Subscriber是Subscription的子類:

我們現在的Subscriber的構造函數里,創建一個SafeSubscruber實例:this作為parent subscriber傳入

EmptyObserver是從./Observer導入進來的:

從SafeSubscriber的實現能看出,傳入的Observer對象的next,error和complete這些函數名稱都是硬編碼的,必須符合這個命名規範:

Object.create()方法創建一個新對象,使用現有的對象來提供新創建的對象的proto

執行subscribe:

sink的destination包含了應用程式傳入的complete, next, error邏輯:

這裡能看到,subscribe的邏輯就是,遍歷所有Observable參數,依次調用observer的next方法,最後再調用一次complete方法:

next調用私有的_next方法:

this._next調用this.destination.next:

最終調用到應用程式設計師傳入的next方法:

最後的輸出: