?
本文首發(fā)于 Ficow Shen's Blog,原文地址: Combine 框架桶现,從0到1 —— 3.使用 Subscriber 控制發(fā)布速度躲雅。
?
內(nèi)容概覽
- 前言
- 在發(fā)布者生產(chǎn)元素時(shí)消耗它們
- 使用自定義的訂閱者施加背壓(back pressure)
- 使用背壓操作符管理無限需求(Unlimited Demand)
- 總結(jié)
?
前言
?
對(duì)于大多數(shù)響應(yīng)式編程場(chǎng)景而言,訂閱者不需要對(duì)發(fā)布過程進(jìn)行過多的控制巩那。當(dāng)發(fā)布者發(fā)布元素時(shí)吏夯,訂閱者只需要無條件地接收即可。但是即横,如果發(fā)布者發(fā)布的速度過快噪生,而訂閱者接收的速度又太慢
,我們?cè)撛趺唇鉀Q這個(gè)問題呢东囚?Combine
已經(jīng)為我們制定了穩(wěn)健的解決方案跺嗽!現(xiàn)在,讓我們來了解如何施加背壓(back pressure页藻,也可以叫反壓)以精確控制發(fā)布者何時(shí)生成元素
桨嫁。
?
在 Combine
中,發(fā)布者生成元素份帐,而訂閱者對(duì)其接收的元素進(jìn)行操作璃吧。不過,發(fā)布者會(huì)在訂閱者連接和獲取元素時(shí)才發(fā)送元素废境。訂閱者通過 Subscribers.Demand
類型來表明自己可以接收多少個(gè)元素畜挨,以此來控制發(fā)布者發(fā)送元素的速率。
訂閱者可以通過兩種方式來表明需求(Demand
):
- 調(diào)用
Subscription
實(shí)例(由發(fā)布者在訂閱者進(jìn)行第一次訂閱時(shí)提供)的request(_:)
方法噩凹; - 在發(fā)布者調(diào)用訂閱者的
receive(_:)
方法來發(fā)送元素時(shí)巴元,返回一個(gè)新的Subscribers.Demand
實(shí)例;
Demand
是可以累加的驮宴。如果訂閱者已經(jīng)請(qǐng)求了兩個(gè)元素逮刨,然后請(qǐng)求 Subscribers.Demand(.max(3))
,則現(xiàn)在發(fā)布者不滿足的需求是五個(gè)元素堵泽。如果發(fā)布者隨后發(fā)送元素修己,則未滿足的需求將減少到四個(gè)。
?
發(fā)布元素是減少未滿足需求的數(shù)量的唯一方法迎罗,訂閱者不能請(qǐng)求負(fù)需求箩退。
?
很多應(yīng)用會(huì)使用 sink(receiveValue:)
和 assign(to:on:)
來創(chuàng)建便捷的訂閱者類型,分別為:Subscribers.Sink
和 Subscribers.Assign
佳谦。這兩種訂閱者在第一次連接到發(fā)布者時(shí)戴涝,會(huì)發(fā)送一個(gè) unlimited
的 Demand
,這時(shí)候訂閱者會(huì)一直不停地接收發(fā)布者發(fā)來的內(nèi)容。
?
在發(fā)布者生產(chǎn)元素時(shí)消耗它們
?
當(dāng)發(fā)布者的需求很高或不受限制時(shí)啥刻,它發(fā)送元素的速度可能比訂閱者處理元素的速度快很多奸鸯。這種情況可能導(dǎo)致元素丟失,或者在元素等待被緩存時(shí)迅速增加內(nèi)存的壓力可帽。
如果您使用便捷的訂閱者娄涩,則會(huì)發(fā)生這種情況,因?yàn)樗鼈兊男枨?Demand
) 是無限數(shù)量 (unlimited
) 的元素映跟。確保您提供給 sink(receiveValue:)
的閉包和 assign(to:on:)
的副作用(執(zhí)行效果)遵循以下特征:
- 不會(huì)阻塞發(fā)布者蓄拣;
- 不會(huì)因?yàn)榫彺嬖囟倪^多的內(nèi)存;
- 不會(huì)不知所措并且不能處理元素;
慶幸的是努隙,許多常用的發(fā)布者(例如與用戶界面元素相關(guān)聯(lián)的發(fā)布者)都會(huì)以可控的速度進(jìn)行發(fā)布球恤。其他常見的發(fā)布者僅僅生成一個(gè)元素,例如:URL
加載系統(tǒng)的 URLSession.DataTaskPublisher
荸镊。配合這些發(fā)布者咽斧,使用 sink(receiveValue:)
和 assign(to:on:)
訂閱者是絕對(duì)安全的。
?
使用自定義的訂閱者施加背壓(back pressure)
?
想要控制發(fā)布者向訂閱者發(fā)送元素的速率躬存,可以創(chuàng)建訂閱者協(xié)議的自定義實(shí)現(xiàn)张惹。使用你的自定義實(shí)現(xiàn)來指定你的訂閱者可以適應(yīng)的需求。當(dāng)訂閱者接收元素時(shí)岭洲,它可以通過返回新的需求值給 receive(_:)
方法宛逗,或通過在訂閱上調(diào)用 request(_:)
來請(qǐng)求更多內(nèi)容。無論使用哪種方法盾剩,你自定義的訂閱者都可以在任何給定時(shí)間微調(diào)發(fā)布者可以發(fā)送的元素?cái)?shù)量雷激。
?
通過發(fā)信號(hào)來表明訂閱者已準(zhǔn)備好接收元素來控制流量的概念稱為
背壓
。
?
每個(gè)發(fā)布者都跟蹤其當(dāng)前未滿足的需求彪腔,也就是:訂閱者已請(qǐng)求多少個(gè)元素侥锦。甚至进栽,像 Foundation
框架中的 Timer.TimerPublisher
這樣的自動(dòng)化資源德挣,也只會(huì)在有未滿足的需求時(shí)才產(chǎn)生元素。
下面的示例代碼說明了這個(gè)行為:
// 發(fā)布者: 使用一個(gè)定時(shí)器來每秒發(fā)送一個(gè)日期對(duì)象
let timerPub = Timer.publish(every: 1, on: .main, in: .default)
.autoconnect()
// 訂閱者: 在訂閱以后快毛,等待5秒格嗅,然后請(qǐng)求最多3個(gè)值
class MySubscriber: Subscriber {
typealias Input = Date
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
print("published received")
self.subscription = subscription
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
subscription.request(.max(3))
}
}
func receive(_ input: Date) -> Subscribers.Demand {
print("\(input) \(Date())")
return Subscribers.Demand.none
}
func receive(completion: Subscribers.Completion<Never>) {
print ("--done--")
}
}
// 訂閱 timerPub
let mySub = MySubscriber()
print ("Subscribing at \(Date())")
timerPub.subscribe(mySub)
訂閱者的 receive(subscription:)
實(shí)現(xiàn)在請(qǐng)求發(fā)布者的任何元素之前執(zhí)行了五秒鐘的延遲。在此期間唠帝,發(fā)布者存在并具有有效的訂閱者屯掖,但需求為零,因此不會(huì)產(chǎn)生任何元素襟衰。它僅在延遲到期且訂閱者給它一個(gè)非零需求 subscription.request(.max(3))
之后才開始發(fā)布元素贴铜,如以下輸出所示:
Subscribing at 2019-12-09 18:57:06 +0000
published received
2019-12-09 18:57:11 +0000 2019-12-09 18:57:11 +0000
2019-12-09 18:57:12 +0000 2019-12-09 18:57:12 +0000
2019-12-09 18:57:13 +0000 2019-12-09 18:57:13 +0000
這個(gè)示例只請(qǐng)求了三個(gè)元素,在五秒鐘的延遲到期后發(fā)出需求。最后绍坝,發(fā)布者在第三個(gè)元素之后不再發(fā)送其他元素徘意,但是也不會(huì)通過發(fā)送完成(.finished
) 的值來完成發(fā)布,因?yàn)榘l(fā)布者只是在等待更多需求轩褐。為了繼續(xù)接收元素椎咧,訂閱者可以存儲(chǔ)訂閱并定期請(qǐng)求更多元素。它還可以在 receive(_:)
方法中返回新需求的值把介。
?
使用背壓操作符管理無限需求(Unlimited Demand)
?
即使沒有自定義的訂閱者勤讽,你也可以通過一些操作符來實(shí)施背壓:
-
buffer(size:prefetch:whenFull:)
,保留來自上游發(fā)布者的固定數(shù)量的項(xiàng)目拗踢。緩沖滿了之后脚牍,緩沖區(qū)會(huì)丟棄元素或拋出錯(cuò)誤; -
debounce(for:scheduler:options:)
秒拔,只在上游發(fā)布者在指定的時(shí)間間隔內(nèi)停止發(fā)布時(shí)才發(fā)布莫矗; -
throttle(for:scheduler:latest:)
,以給定的最大速率生成元素砂缩。如果在一個(gè)間隔內(nèi)接收到多個(gè)元素作谚,則僅發(fā)送最新的或最早的元素; -
collect(_:)
和collect(_:options:)
聚集元素,直到它們超過給定的數(shù)量或時(shí)間間隔庵芭,然后向訂閱者發(fā)送元素?cái)?shù)組妹懒。如果訂閱者可以同時(shí)處理多個(gè)元素,這個(gè)操作符將是很好的選擇双吆。
由于這些操作符可以控制訂閱者接收的元素?cái)?shù)量眨唬,因此可以放心地連接無限需求的訂閱者,例如:sink(receiveValue:)
和 assign(to:on:)
好乐。
?
總結(jié)
?
通過實(shí)施背壓匾竿,我們可以靈活地調(diào)控發(fā)布過程。背壓操作符可以幫助我們應(yīng)對(duì)大多數(shù)場(chǎng)景蔚万,這些操作符可以大幅提升我們的開發(fā)效率岭妖。
比如這種常見的場(chǎng)景:當(dāng)搜索輸入框的內(nèi)容發(fā)生變動(dòng)時(shí),應(yīng)用需要去查找用戶輸入內(nèi)容對(duì)應(yīng)的結(jié)果反璃,但是這個(gè)查找操作的頻率需要有一定的控制昵慌。如果用戶按住一個(gè)鍵不放開,輸入框的內(nèi)容就會(huì)一直變化淮蜈,此時(shí)就會(huì)觸發(fā)多次查找操作斋攀。這時(shí)候,我們可以從容地使用背壓操作符解決這種問題梧田。
如果你需要處理的場(chǎng)景非常復(fù)雜淳蔼,通過自定義訂閱者來實(shí)施精確的背壓
將會(huì)是一個(gè)更好的選擇侧蘸。
?
推薦繼續(xù)閱讀:Combine 框架,從0到1 —— 4.在 Combine 中使用通知
?
本文內(nèi)容來源: Processing Published Elements with Subscribers鹉梨,轉(zhuǎn)載請(qǐng)注明出處闺魏。
?