Combine 框架,從0到1 —— 3.使用 Subscriber 控制發佈速度

 

本文首發於 Ficow Shen’s Blog,原文地址: Combine 框架,從0到1 —— 3.使用 Subscriber 控制發佈速度

 

內容概覽

  • 前言
  • 在發佈者生產元素時消耗它們
  • 使用自定義的訂閱者施加背壓(back pressure)
  • 使用背壓操作符管理無限需求(Unlimited Demand)
  • 總結

 

前言

 

對於大多數響應式編程場景而言,訂閱者不需要對發佈過程進行過多的控制。當發佈者發佈元素時,訂閱者只需要無條件地接收即可。但是,如果發佈者發佈的速度過快,而訂閱者接收的速度又太慢,我們該怎麼解決這個問題呢?Combine 已經為我們制定了穩健的解決方案!現在,讓我們來了解如何施加背壓(back pressure,也可以叫反壓)以精確控制發佈者何時生成元素

 

Combine 中,發佈者生成元素,而訂閱者對其接收的元素進行操作。不過,發佈者會在訂閱者連接和獲取元素時才發送元素。訂閱者通過 Subscribers.Demand 類型來表明自己可以接收多少個元素,以此來控制發佈者發送元素的速率。

訂閱者可以通過兩種方式來表明需求(Demand):

  • 調用 Subscription 實例(由發佈者在訂閱者進行第一次訂閱時提供)的 request(_:) 方法;
  • 在發佈者調用訂閱者的 receive(_:) 方法來發送元素時,返回一個新的 Subscribers.Demand 實例;

Demand 是可以累加的。如果訂閱者已經請求了兩個元素,然後請求 Subscribers.Demand(.max(3)),則現在發佈者不滿足的需求是五個元素。如果發佈者隨後發送元素,則未滿足的需求將減少到四個。

 

發佈元素是減少未滿足需求的數量的唯一方法,訂閱者不能請求負需求。

 

很多應用會使用 sink(receiveValue:)assign(to:on:) 來創建便捷的訂閱者類型,分別為:Subscribers.SinkSubscribers.Assign。這兩種訂閱者在第一次連接到發佈者時,會發送一個 unlimitedDemand,這時候訂閱者會一直不停地接收發佈者發來的內容。

 

在發佈者生產元素時消耗它們

 

當發佈者的需求很高或不受限制時,它發送元素的速度可能比訂閱者處理元素的速度快很多。這種情況可能導致元素丟失,或者在元素等待被緩存時迅速增加內存的壓力。

如果您使用便捷的訂閱者,則會發生這種情況,因為它們的需求(Demand) 是無限數量 (unlimited) 的元素。確保您提供給 sink(receiveValue:) 的閉包和 assign(to:on:) 的副作用(執行效果)遵循以下特徵:

  • 不會阻塞發佈者;
  • 不會因為緩存元素而消耗過多的內存;
  • 不會不知所措並且不能處理元素;

慶幸的是,許多常用的發佈者(例如與用戶界面元素相關聯的發佈者)都會以可控的速度進行發佈。其他常見的發佈者僅僅生成一個元素,例如:URL 加載系統的 URLSession.DataTaskPublisher。配合這些發佈者,使用 sink(receiveValue:)assign(to:on:) 訂閱者是絕對安全的。

 

使用自定義的訂閱者施加背壓(back pressure)

 

想要控制發佈者向訂閱者發送元素的速率,可以創建訂閱者協議的自定義實現。使用你的自定義實現來指定你的訂閱者可以適應的需求。當訂閱者接收元素時,它可以通過返回新的需求值給 receive(_:) 方法,或通過在訂閱上調用 request(_:) 來請求更多內容。無論使用哪種方法,你自定義的訂閱者都可以在任何給定時間微調發佈者可以發送的元素數量。

 

通過發信號來表明訂閱者已準備好接收元素來控制流量的概念稱為背壓

 

每個發佈者都跟蹤其當前未滿足的需求,也就是:訂閱者已請求多少個元素。甚至,像 Foundation 框架中的 Timer.TimerPublisher 這樣的自動化資源,也只會在有未滿足的需求時才產生元素。

下面的示例代碼說明了這個行為:

// 發佈者: 使用一個定時器來每秒發送一個日期對象
let timerPub = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()


// 訂閱者: 在訂閱以後,等待5秒,然後請求最多3個值
class MySubscriber: Subscriber {
    typealias Input = Date
    typealias Failure = Never
    var subscription: Subscription?
    
    func receive(subscription: Subscription) {
        print("published                             received")
        self.subscription = subscription
        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
            subscription.request(.max(3))
        }
    }
    
    func receive(_ input: Date) -> Subscribers.Demand {
        print("\(input)             \(Date())")
        return Subscribers.Demand.none
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
        print ("--done--")
    }
}

// 訂閱 timerPub
let mySub = MySubscriber()
print ("Subscribing at \(Date())")
timerPub.subscribe(mySub)

訂閱者的 receive(subscription:) 實現在請求發佈者的任何元素之前執行了五秒鐘的延遲。在此期間,發佈者存在並具有有效的訂閱者,但需求為零,因此不會產生任何元素。它僅在延遲到期且訂閱者給它一個非零需求 subscription.request(.max(3)) 之後才開始發佈元素,如以下輸出所示:

Subscribing at 2019-12-09 18:57:06 +0000
published                             received
2019-12-09 18:57:11 +0000             2019-12-09 18:57:11 +0000
2019-12-09 18:57:12 +0000             2019-12-09 18:57:12 +0000
2019-12-09 18:57:13 +0000             2019-12-09 18:57:13 +0000

這個示例只請求了三個元素,在五秒鐘的延遲到期後發出需求。最後,發佈者在第三個元素之後不再發送其他元素,但是也不會通過發送完成(.finished) 的值來完成發佈,因為發佈者只是在等待更多需求。為了繼續接收元素,訂閱者可以存儲訂閱並定期請求更多元素。它還可以在 receive(_:) 方法中返回新需求的值。

 

使用背壓操作符管理無限需求(Unlimited Demand)

 

即使沒有自定義的訂閱者,你也可以通過一些操作符來實施背壓:

  • buffer(size:prefetch:whenFull:) ,保留來自上游發佈者的固定數量的項目。緩衝滿了之後,緩衝區會丟棄元素或拋出錯誤;
  • debounce(for:scheduler:options:),只在上游發佈者在指定的時間間隔內停止發佈時才發佈;
  • throttle(for:scheduler:latest:),以給定的最大速率生成元素。如果在一個間隔內接收到多個元素,則僅發送最新的或最早的元素;
  • collect(_:)collect(_:options:) 聚集元素,直到它們超過給定的數量或時間間隔,然後向訂閱者發送元素數組。如果訂閱者可以同時處理多個元素,這個操作符將是很好的選擇。

由於這些操作符可以控制訂閱者接收的元素數量,因此可以放心地連接無限需求的訂閱者,例如:sink(receiveValue:)assign(to:on:)

 

總結

 

通過實施背壓,我們可以靈活地調控發佈過程。背壓操作符可以幫助我們應對大多數場景,這些操作符可以大幅提升我們的開發效率。

比如這種常見的場景:當搜索輸入框的內容發生變動時,應用需要去查找用戶輸入內容對應的結果,但是這個查找操作的頻率需要有一定的控制。如果用戶按住一個鍵不放開,輸入框的內容就會一直變化,此時就會觸發多次查找操作。這時候,我們可以從容地使用背壓操作符解決這種問題。

如果你需要處理的場景非常複雜,通過自定義訂閱者來實施精確的背壓將會是一個更好的選擇。

 

本文內容來源: Processing Published Elements with Subscribers,轉載請註明出處

 

Tags: