Subscriber(這篇文章主要講assign
和sink
)
-
Publisher
中末尾提到了sink
和assign
的調(diào)用嫂沉,現(xiàn)在具體看一下具體做了什么
sink操作符的代碼調(diào)用
```
let arr: [Int] = [1, 2, 100]
/// `Sink` 將數(shù)組轉(zhuǎn)換成一個(gè)數(shù)據(jù)流
arr.publisher
/// 過濾數(shù)據(jù)流中大于2的元素
.filter{$0 > 2}
/// 進(jìn)行一次轉(zhuǎn)換稽寒,轉(zhuǎn)成String類型
.compactMap{"\($0)"}
/// 訂閱數(shù)據(jù)源
.sink { value in
debugPrint("數(shù)據(jù)流: \(value)")
}.store(in: &cancel)
```
Sink
之前講解了
Publisher
的數(shù)據(jù)流發(fā)布的過程,sink
相當(dāng)于接收publisher
發(fā)送的數(shù)據(jù)趟章,然后執(zhí)行receiveValue
閉包杏糙,調(diào)用者可以進(jìn)行自己的業(yè)務(wù)處理-
先看一下
sink
操作符public func sink( receiveValue: @escaping (Output) -> Void ) -> AnyCancellable { /// 包裝類`Sink`處理數(shù)據(jù)流,然后回調(diào)給receiveValue閉包 let subscriber = Subscribers.Sink<Output, Failure>( receiveCompletion: { _ in }, receiveValue: receiveValue ) /// 訂閱數(shù)據(jù)流 subscribe(subscriber) /// GC 包裝類 return AnyCancellable(subscriber) }
Sink包裝類實(shí)現(xiàn)
/// A simple subscriber that requests an unlimited number of values upon subscription. public final class Sink<Input, Failure: Error> : Subscriber, Cancellable { /// 持有外部的閉包 public var receiveValue: (Input) -> Void /// 完成閉包 public var receiveCompletion: (Subscribers.Completion<Failure>) -> Void /// 狀態(tài) private var status = SubscriptionStatus.awaitingSubscription /// 初始化持有完成回調(diào)閉包和數(shù)據(jù)流回調(diào)閉包 public init( receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void, receiveValue: @escaping ((Input) -> Void) ) { self.receiveCompletion = receiveCompletion self.receiveValue = receiveValue } /// `Subscription`協(xié)議方法實(shí)現(xiàn) public func receive(subscription: Subscription) { guard case .awaitingSubscription = status else { subscription.cancel() return } subscription.request(.unlimited) } /// `Subscribers` 協(xié)議實(shí)現(xiàn)蚓土,執(zhí)行數(shù)據(jù)流回調(diào) public func receive(_ value: Input) -> Subscribers.Demand { /// 記錄當(dāng)前的數(shù)據(jù)流回調(diào) let receiveValue = self.receiveValue /// 觸發(fā)數(shù)據(jù)流回調(diào) receiveValue(value) return .none } /// `Subscribers` 完成回調(diào) public func receive(completion: Subscribers.Completion<Failure>) { let receiveCompletion = self.receiveCompletion self.receiveCompletion = { _ in } withExtendedLifetime(receiveValue) { receiveValue = { _ in } } /// 執(zhí)行完成回調(diào) receiveCompletion(completion) } /// 數(shù)據(jù)流取消(`cancel`) public func cancel() { guard case let .subscribed(subscription) = status else { return } withExtendedLifetime((receiveValue, receiveCompletion)) { receiveCompletion = { _ in } receiveValue = { _ in } } subscription.cancel() } }
assign操作符代碼調(diào)用
```
let arr: [Int] = [1, 2, 100]
/// `Assign`使用keypath進(jìn)行賦值
arr.publisher
.filter{$0 > 2}
.compactMap{"\($0)"}
/// keypath 賦值
.assign(to: \.name, on: root).store(in: &cancel)
debugPrint("root name: \(root.name)")
```
Assign
使用
KeyPath
來實(shí)現(xiàn)具體類的賦值操作宏侍,僅支持Class
類型的具體類-
assign
操作符public func assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Output>, on object: Root) -> AnyCancellable { /// 訂閱類,`Assign`進(jìn)行了包裝蜀漆,數(shù)據(jù)流流轉(zhuǎn)到這里進(jìn)行keypath賦值 let subscriber = Subscribers.Assign(object: object, keyPath: keyPath) /// 訂閱數(shù)據(jù)流 subscribe(subscriber) /// GC 處理 return AnyCancellable(subscriber) }
Assign
包裝類public final class Assign<Root, Input>: Subscriber, Cancellable { public typealias Failure = Never public private(set) var object: Root? /// The key path that indicates the property to assign. public let keyPath: ReferenceWritableKeyPath<Root, Input> private var status = SubscriptionStatus.awaitingSubscription /// 持有keypath的關(guān)聯(lián)類和keypath的鍵 public init(object: Root, keyPath: ReferenceWritableKeyPath<Root, Input>) { self.object = object self.keyPath = keyPath } public func receive(subscription: Subscription) { guard case .awaitingSubscription = status else { subscription.cancel() return } status = .subscribed(subscription) subscription.request(.unlimited) } /// 數(shù)據(jù)流流轉(zhuǎn)到這里谅河,進(jìn)行keypath賦值 public func receive(_ value: Input) -> Subscribers.Demand { guard case .subscribed = status, let object = self.object else { return .none } object[keyPath: keyPath] = value return .none } /// 完成回調(diào) public func receive(completion: Subscribers.Completion<Never>) { guard case .subscribed = status else { return } terminateAndConsumeLock() } public func cancel() { guard case let .subscribed(subscription) = status else { return } terminateAndConsumeLock() subscription.cancel() } private func terminateAndConsumeLock() { withExtendedLifetime(object) { object = nil } } }