Combine 框架,從0到1 —— 2.通過 ConnectablePublisher 控制何時發布

 

本文首發於 Ficow Shen’s Blog,原文地址: Combine 框架,從0到1 —— 2.通過 ConnectablePublisher 控制何時發布

 

內容概覽

  • 前言
  • 使用 makeConnectable() 和 connect() 手動控制發布
  • 使用 autoconnect() 操作符進行自動連接
  • 總結

 

前言

 

使用 Connectable Publisher, 你可以決定發布者何時開始發送訂閱元素給訂閱者。那麼,為什麼我們需要這麼做?

使用 sink(receiveValue:) 可以立刻開始接收訂閱元素,但是這可能不是你想要的結果。當多個訂閱者訂閱了同一個發布者時,有可能會出現其中一個訂閱者收到訂閱內容,而另外一個訂閱者收不到的情況。

比如,當你發起一個網路請求,並為這個請求創建了一個發布者以及連接了這個發布者的訂閱者。

圖片alt

然後,這個訂閱者的訂閱操作觸發了實際的網路請求。在某個時間點,你將第二個訂閱者連接到了這個發布者。如果在連接第二個訂閱者之前,網路請求已經完成,那麼第二個訂閱者將只會收到完成事件,收不到網路請求的響應結果。這時候,這個結果將不是你所期望。

在使用 Combine 的過程中,我們往往需要面對這些問題。現在就來弄清楚如何處理這一類問題吧~

 

使用 makeConnectable() 和 connect() 控制發布

 

ConnectablePublisher 是一個協議類型,它可以在你準備好之前阻止發布者發布元素。

/// 可連接的發布者,它提供了顯式的連接、取消訂閱的方式
///
/// 使用 `makeConnectable()` 來從任何一個失敗類型是 `Never` 的發布者創建一個 `ConnectablePublisher`
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol ConnectablePublisher : Publisher {

    /// 連接到發布者並返回一個用於取消發布的 `Cancellable` 實例
    ///
    /// - 返回值: 一個用於取消發布的 `Cancellable` 實例
    func connect() -> Cancellable
}

在你顯式地調用 connect() 方法之前,一個 ConnectablePublisher 不會發送任何元素。

現在,就讓我們用 ConnectablePublisher 來解決上面提到的網路請求示例中的問題吧!

ConnectablePublisher

在兩個訂閱者都連接到發布者之後,調用 connect(),然後網路請求才被觸發。這樣就可以避免競爭(race condition),保證兩個訂閱者都收到數據。

為了在你的 Combine 程式碼中使用 ConnectablePublisher,你可以使用 makeConnectable() 操作符將當前的發布者包裝到一個 Publishers.MakeConnectable 結構體實例中。

如下方的程式碼所示:

class ConnectablePublisherDemo {
    
    private var cancellable1: AnyCancellable?
    private var cancellable2: AnyCancellable?
    private var connection: Cancellable?
    
    func run() {
        let url = URL(string: "//ficow.cn")!
        let connectable = URLSession.shared
            .dataTaskPublisher(for: url)
            .map(\.data)
            .catch() { _ in Just(Data()) }
            .share()
            .makeConnectable() // 阻止發布者發布內容
        
        cancellable1 = connectable
            .sink(receiveCompletion: { print("Received completion 1: \($0).") },
                  receiveValue: { print("Received data 1: \($0.count) bytes.") })
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            self.cancellable2 = connectable.sink(receiveCompletion: { log("Received completion 2: \($0).") },
                                                 receiveValue: { log("Received data 2: \($0.count) bytes.") })
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
			// 顯式地啟動發布。返回值需要被強引用,可用於取消發布(主動調用cancel方法或返回值被析構)
            self.connection = connectable.connect() 
        }
    }

}

請注意,在 makeConnectable() 操作符前面有一個 share() 操作符!請問,這個操作符有什麼作用呢?

 

使用 autoconnect() 操作符進行自動連接

 

某些 Combine 發布者已經實現了 ConnectablePublisher 協議,如:Publishers.MulticastTimer.TimerPublisher。使用這些發布者時,如果你不需要配置發布者或者不需要連接多個訂閱者,你就需要顯式地調用 connect() 方法。

對於這種情況,ConnectablePublisher 提供了 autoconnect() 操作符。當一個訂閱者通過 subscribe(_:) 方法連接到發布者時,connect() 方法會被馬上調用。

let cancellable = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .sink() { date in
        print ("Date now: \(date)")
     }

上面的程式碼示例中使用了 autoconnect(),所以訂閱者可以馬上接收到定時器發送的元素。如果沒有 autoconnect(),我們就需要在某個時刻手動地調用 connect() 方法。

 

總結

 

Combine 為我們提供了很強大的非同步編程功能,不過這也是有代價的,我們需要深知使用 Combine 過程中可能會遭遇的問題。如果不了解這些「坑」就開始上路,犯錯的概率會非常高,犯錯的成本也會非常高。

 

本文內容來源: Controlling Publishing with Connectable Publishers,轉載請註明出處

 

Tags: