Publisher 根據(jù) Subscriber
的請求提供數(shù)據(jù)。如果沒有任何訂閱請求霎匈,Publisher 不會提供任何數(shù)據(jù)。所以可以這樣說,Subscriber
負(fù)責(zé)向 Publisher 請求數(shù)據(jù)并接收數(shù)據(jù)(或失斂涿恕)。
Subscriber定義
public protocol Subscriber: CustomCombineIdentifierConvertible {
/// 可以接收的數(shù)據(jù)的類型
associatedtype Input
/// 可以接收的錯誤類型像捶;如果不接收錯誤上陕,則使用 `Never`
associatedtype Failure: Error
/// Publisher調(diào)用此方法以提供訂閱
func receive(subscription: Subscription)
/// Publisher調(diào)用此方法發(fā)布新的數(shù)據(jù)
func receive(_ input: Self.Input) -> Subscribers.Demand
/// Publisher調(diào)用此方法發(fā)送錯誤或完成事件
func receive(completion: Subscribers.Completion<Self.Failure>)
}
其中 Input
和Failure
分別表示了 Subscriber 能夠接受的數(shù)據(jù)類型和錯誤類型桩砰,如果不接收錯誤,則使用Never
释簿。
Subscription
連接 Publisher 和 Subscriber 是 Subscription亚隅,其定義如下:
public protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
/// 告訴 Publisher 可以發(fā)送多少個數(shù)據(jù)到 Subscriber
func request(_ demand: Subscribers.Demand)
}
Back pressure
Combine 約定 Subscriber 控制數(shù)據(jù)流,因此它可以同時控制整個流程中發(fā)生的所有操作庶溶,這個特性稱之為Back pressure煮纵。Subscription 中的request
方法就體現(xiàn)了這種特性,它返回值是一個Subscribers.Demand
偏螺,設(shè)置接受數(shù)據(jù)的最大值行疏,但是在每次收到新的數(shù)據(jù)以后都可以調(diào)整這個值,且這個值是累加的套像。
發(fā)布與訂閱流程
- Subscriber 通過調(diào)用
Publisher.subscribe
來告訴 Publisher 開始訂閱酿联。 - Publisher 通過調(diào)用
Subscriber.receive(subscription:)
發(fā)送確認(rèn)信息給 Subscriber。這個方法接收一個 Subscription凉夯。 - Subscriber 通過調(diào)用 2 中創(chuàng)建的 Subscription 上的
request(_: Demand)
方法來首次告訴 Publisher 需要事件的事件的最大值货葬。 - Publisher 通過調(diào)用
Subscriber.·receive(_: Input)
發(fā)送 1 個數(shù)據(jù)或者事件給 Subscriber 。 - 同4
- Publisher 通過調(diào)用
Subscriber.receive(completion :)
向 Subscriber 發(fā)送 completion 完成事件劲够。這里的 completion 可以是正常.finished
震桶,也可以是.failure
的,如果是.failure
的會攜帶一個錯誤信息征绎。注意:如果中途取消了訂閱蹲姐,Publisher 將不發(fā)送完成事件。
自定義
自己實(shí)現(xiàn)一個 Subscriber人柿,寫完以后對 Publisher 和 Subscriber 之間的關(guān)系會更加明晰柴墩。
// 1 通過數(shù)組創(chuàng)建一個Publisher
let publisher = [1,2,3,4,5,6].publisher
// 2 自定義一個Subscriber
class CustomSubscriber: Subscriber {
// 3 指定接收值的類型和錯誤類型
typealias Input = Int
typealias Failure = Never
// 4 Publisher首先會調(diào)用該方法
func receive(subscription: Subscription) {
// 接收訂閱的值不做限制,也可以通過.max()設(shè)置最大值
subscription.request(.unlimited)
}
// 5 接受到值時的方法凫岖,返回接收值的最大個數(shù)變化
func receive(_ input: Int) -> Subscribers.Demand {
// 打印出接收到的值
print("Received value", input)
// 返回.none,意思就是不改變最大接收數(shù)量江咳,也可以通過.max()設(shè)置增大多少
return .none
}
// 6 實(shí)現(xiàn)接收到完成事件的方法
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
// 訂閱Publisher
publisher.subscribe(CustomSubscriber())
/*輸出
Received value 1
Received value 2
Received value 3
Received value 4
Received value 5
Received value 6
Received completion finished
*/
內(nèi)置Subscriber
Sink
Assign
Sink
在閉包中處理數(shù)據(jù)或 completion 事件。
// 1 Just發(fā)送單個數(shù)據(jù)
let publisher = Just(1)
// 2 sink訂閱
publisher.sink(receiveCompletion: { _ in
print("receiveCompletion")
}, receiveValue: { value in
print(value)
})
/* 輸出
1
receiveCompletion
*/
Assign
- 為屬性寫入數(shù)據(jù)哥放。
- 它接受一個class對象以及對象類型上的某個KeyPath歼指。會將 Publisher 的 Output 數(shù)據(jù)設(shè)置到對應(yīng)的屬性上去。
// 1 創(chuàng)建對象
class Student {
var name: String = ""
}
let stu = Student()
// 2 Just發(fā)送單個數(shù)據(jù)
let publisher = Just("Hello Combine")
// 3 assign訂閱甥雕,設(shè)置到foo的bar屬性上
publisher.assign(to: \.name, on: stu)
print(stu.name)
/* 輸出
Hello Combine
*/
Cancellable
Combine 中提供了Cancellable
這個協(xié)議踩身,里面只定義了一個cancel
方法,用于提前結(jié)束訂閱流程社露。Sink
和Assign
都實(shí)現(xiàn)了Cancellable 協(xié)議挟阻,所以可以調(diào)用cancel
方法來取消訂閱。另外 Combine 中還定義了AnyCancellable
類,它也實(shí)現(xiàn)了 Cancellable 協(xié)議附鸽,這個類會在deinit
時自動執(zhí)行cancel
方法脱拼。
protocol Cancellable {
func cancel()
}
應(yīng)用
場景一:模擬用戶取消上傳數(shù)據(jù)。
let request = URLRequest(url: URL(string: "https://xxxxx")!)
let image = UIImage(named: "largeImage")
let imgFile: Data = image!.pngData()!
// 上傳Publisher
let downloadPublisher = Future<Data?, Never> { promise in
URLSession.shared.uploadTask(with: request, from: imgFile) { (data, _, _) in
promise(.success(data))
}.resume()
}
// 訂閱
let subscription = downloadPublisher.sink { data in
print("Received data: \(data)")
}
// 可以在完成之前調(diào)用cancel取消任務(wù)
subscription.cancel()
場景二:模擬網(wǎng)絡(luò)原因?qū)е碌木W(wǎng)絡(luò)請求中斷拒炎。
let dataPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.baidu.com")!)
let cancellableSink = dataPublisher
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("received finished")
break
case .failure(let error):
print("received error: ", error)
}}, receiveValue: { someValue in
print(".sink() received \(someValue)")
})
// 可以取消
cancellableSink.cancel()