@TOC
RxSwift序列核心邏輯
上一篇博客:Rxswift學(xué)習(xí)之(一)函數(shù)響應(yīng)式編程思想只是簡(jiǎn)單的分析了序列的核心邏輯由缆。本篇博客主要針對(duì)上一篇做一下更加深入的探討,如果有那些地方分析有誤晕换,還請(qǐng)留言:QQ:282889543,讓我們彼此提高铛铁,彼此成就匀归。
總的來說分析Rxswift的核心邏輯還是按照三部曲:創(chuàng)建序列择懂,訂閱序列,銷毀序列邑遏。核心思想是萬物皆序列佣赖。
1. 序列的創(chuàng)建
Observable可觀察者序列
我們先來看下創(chuàng)建Observable所涉及的類的繼承關(guān)系:
如下圖:
針對(duì)上面的類圖,簡(jiǎn)單分析下類的關(guān)系和設(shè)計(jì)思想:
首先分層實(shí)施的很徹底记盒,每一層都只解決一件事情憎蛤,一層層疊起來結(jié)構(gòu)非常清晰:
AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
其次我們簡(jiǎn)單分解一下每個(gè)類都做了些什么:
-
ObservableConvertibleType:顧名思義即可轉(zhuǎn)換為
Observable
類型協(xié)議,方法只有一個(gè)asObservable
纪吮,這有什么好處呢俩檬?
- 用戶不需要關(guān)注其具體是哪個(gè)類型對(duì)象
- 讓用戶更多的關(guān)注其核心功能
-
ObservableType:也是個(gè)協(xié)議,繼承了ObservableConvertibleType協(xié)議的
asObservable
碾盟,它提供抽象方法subscribe豆胸,即我們常說的訂閱,只有外部訂閱了該對(duì)象巷疼,才能真正實(shí)現(xiàn)對(duì)該對(duì)象進(jìn)行觀察。 -
Observable:真正的類灵奖,可以稱之為元類嚼沿,對(duì)于用戶來說Observable 的功能是完整的,因?yàn)樗呀?jīng)具備了所有的用戶所需要的功能瓷患,盡管有些方法并沒有得到實(shí)現(xiàn)仍是抽象方法骡尽。
Producer: 它繼承了Observable
的所有方法,并實(shí)現(xiàn)subscribe 方法 -
AnonymousObservable:它繼承了
Producer
的所有方法擅编,并且增加了屬性let _subscribeHandler: SubscribeHandler
用來保存創(chuàng)建序列時(shí)傳入的閉包攀细,也就相對(duì)于擁有了調(diào)用這個(gè)序列的能力箫踩,此外它還實(shí)現(xiàn)run方法,這也是創(chuàng)建序列最核心關(guān)鍵的方法谭贪。在run()方法中它創(chuàng)建一個(gè)AnonymousObservableSink
final private類的對(duì)象境钟,而這個(gè)對(duì)象sink可以稱之為管子,它類似于manager的角色俭识,擁有序列和訂閱慨削,銷毀能力。這里有兩個(gè)疑惑:
問題1.
AnonymousObservableSink
為什么要定義成final private類套媚,不能被繼承缚态,也不能被外部訪問?
問題2. 創(chuàng)建的Observable
是如何關(guān)聯(lián)到訂閱的堤瘤?
這兩個(gè)問題我們后面再分析玫芦。
最后,我們總結(jié)一下設(shè)計(jì)思想:
事實(shí)上用戶所使用的
Observable
本辐,都是Producer
的子類和AnonymousObservable
平行的子類桥帆,只不過用戶不需要關(guān)心其具體實(shí)現(xiàn)罷了
每一個(gè)類似AnonymousObservable
的類,還有一個(gè)與之相關(guān)的類AnonymousObservableSink
师郑,Sink即管道环葵,所有這些組合起來才能讓其真正運(yùn)行起來,AnonymousObservableSink
同時(shí)擁有序列宝冕,訂閱功能张遭,類似于我們項(xiàng)目中經(jīng)常用的manager角色。
整個(gè)設(shè)計(jì)向上通過組合協(xié)議的方式描述其特性地梨,向下通過子類化的方式屏蔽其實(shí)現(xiàn)細(xì)節(jié)菊卷,類似于工廠模式,這樣的類也可以叫類簇宝剖。
序列創(chuàng)建的流程
通過上面類繼承關(guān)系洁闰,其實(shí)我們不難理解序列的創(chuàng)建流程,它確實(shí)也是只有比較簡(jiǎn)單的幾部万细,寥寥幾行代碼就搞定了扑眉,難點(diǎn)是上面拋出的幾個(gè)問題:
下面我們將通過一個(gè)簡(jiǎn)單的Rxswift的實(shí)例來分析一下序列的創(chuàng)建,訂閱赖钞,銷毀直接的流程和關(guān)系腰素。
實(shí)例1:
//1. 創(chuàng)建序列
let ob = Observable<Any>.create { (obserber) -> Disposable in
// 3:發(fā)送信號(hào)
obserber.onNext("kyl發(fā)送了信號(hào)")
obserber.onCompleted()
return Disposables.create()
}
// 2:訂閱信號(hào)
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
上面實(shí)例1 的這段代碼可以用酷C老師的一個(gè)圖來清晰的表達(dá):
從上面的代碼和關(guān)系圖,我們可能會(huì)產(chǎn)生這樣一個(gè)疑惑:
問題3: 創(chuàng)建的ob序列雪营,僅僅只是通過
ob.subscribe()
訂閱了一下弓千,為什么我們?cè)趏b創(chuàng)建時(shí)的尾隨閉包(我們這里給個(gè)名字叫閉包A
)里面調(diào)用了obserber.onNext("kyl發(fā)送了信號(hào)")
這個(gè)代碼,我們就可以訂閱到let _ = ob.subscribe(onNext: { (text) in print("訂閱到:\(text)") }
這里會(huì)打酉灼稹:”訂閱到:kyl發(fā)送了信號(hào)“洋访。我們沒有看見他們之間有任何關(guān)聯(lián)镣陕,怎么ob發(fā)送消息,subcribe()的onNext閉包就可以觸發(fā)呢姻政,這是為什么呢呆抑?
我們可以這里可以簡(jiǎn)單推理下:ob.subscribe()這個(gè)訂閱方法肯定做了一些事情,在某個(gè)地方調(diào)用了閉包A
,才能實(shí)現(xiàn)這個(gè)功能扶歪。具體是怎么樣實(shí)現(xiàn)的呢理肺?下面我們將通過分析源碼來解答這個(gè)疑惑。
從上面的代碼我們可以知道善镰,創(chuàng)建序列就一行代碼:let ob = Observable<Any>.create { (obserber) ->
而這一行代碼其實(shí)是做了好多事情的妹萨。
首先我們通過一個(gè)流程圖來初略的了解一下序列創(chuàng)建流程:
創(chuàng)建序列的Rxswift原碼很簡(jiǎn)單,從上圖可以看出,直接一行代碼return AnonymousObservable(subscribe)
就結(jié)束了炫欺,這里我們們并沒有找到我們需要的答案乎完,甚至我們有點(diǎn)越來越暈感覺。
- AnonymousObservable類源碼
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
我們先做個(gè)深呼吸品洛,放輕松树姨,此路不通那我們來嘗試分析其他方向,不能在一棵樹上吊死桥状。下面我們來分析一下訂閱的流程帽揪。
2.訂閱
回顧上面實(shí)例1中的訂閱代碼:let _ = ob.subscribe(onNext: { (text) in
這行代碼又做了些什么事情呢?下面我們通過源碼來深入分析一下:
- Rxswift訂閱
subscribe()
的源碼如下:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
... 上面代碼不是我們要分析的重點(diǎn)辅斟,...表示忽略了此次的一段源碼
/*注意转晰,此次定義了一個(gè)AnonymousObserver()對(duì)象,以參數(shù)的形式士飒,
構(gòu)造方法里面?zhèn)魅肓艘粋€(gè)尾隨閉包eventHandler查邢,
在這個(gè)閉包里面,當(dāng)收到event的不同事件酵幕,
會(huì)觸發(fā)并調(diào)用扰藕,我們 `let _ = ob.subscribe(onNext: { (text) in` 這個(gè)方法傳入閉包
*/
let observer = AnonymousObserver<E> { event in
...
switch event {
case .next(let value):
onNext?(value) //調(diào)用訂閱時(shí)傳入的ob.subscribe(onNext:閉包
case .error(let error):
if let onError = onError {
onError(error)//調(diào)用訂閱時(shí)傳入的ob.subscribe(onError:閉包
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()//調(diào)用訂閱時(shí)傳入的ob.subscribe(onCompleted:閉包
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)/*這里直接返回了Disposables對(duì)象,用來釋放資源芳撒,
在它的構(gòu)造函數(shù)里面直接調(diào)用了self.asObservable().subscribe(observer)邓深,
而asObservable()就是我們創(chuàng)建的序列ob,也就是ob.subscribe(),
并傳入了,在這段代碼里面創(chuàng)建的局部變量let observer = AnonymousObserver<E>笔刹,*/
}
通過上面源碼我們可以知道:subscribe()這個(gè)方法庐完,以參數(shù)的形式傳入了onNext()閉包,onError()閉包徘熔,onComplete()閉包,在函數(shù)里面創(chuàng)建了一個(gè)AnonymousObserver對(duì)象observer淆党,這個(gè)對(duì)象創(chuàng)建的時(shí)候傳入了一個(gè)閉包酷师,當(dāng)收到不同event事件時(shí)讶凉,會(huì)分別調(diào)用我們subscribe()傳入的onNext,onError山孔,onComplete這三個(gè)閉包懂讯。最重要一點(diǎn)是return Disposables.create( self.asObservable().subscribe(observer), disposable )
這句代碼調(diào)用了我們真正的subscribe()函數(shù),并以參數(shù)的形式傳入了AnonymousObserver對(duì)象台颠,self.asObservable()就是我們create()函數(shù)創(chuàng)建的序列ob, 而到此處我們可以清晰的看到褐望,我們訂閱時(shí)傳入?yún)?shù)閉包和我們創(chuàng)建的ob建立了一個(gè)鏈條。
這里我們又有一個(gè)疑問:self.asObservable()為什么就是我們create()函數(shù)返回的ob呢串前?
要解答這個(gè)問題瘫里,我需要回顧一下上面分析的Observable類的繼承關(guān)系:Observable
-> ObservableType
-> ObservableConvertibleType
即Observable繼承ObservableType協(xié)議,ObservableType又繼承ObservableConvertibleType協(xié)議荡碾,而我們的ObservableConvertibleType提供了抽象方法asObservable()谨读,我們Observable類中實(shí)現(xiàn)了asObservable()這個(gè)方法,它直接返回self就它自己坛吁。
下面通過源碼來證實(shí):
///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {
...
public func asObservable() -> Observable<E> {
return self
}
...
}
分析了Rxswift訂閱subscribe()
的源碼感覺非常nice, 我們找到了我們ob 創(chuàng)建時(shí)傳入的閉包和我們訂閱時(shí)的閉包存在了一條鏈條關(guān)系劳殖,也就是只要ob發(fā)送了消息,那我們的訂閱者一定可以按照這個(gè)鏈條收到消息拨脉。但是我們還是不知道到底是怎么調(diào)用的哆姻,怎么觸發(fā)的。
而且我們注意到self.asObservable().subscribe(observer)
也就是AnonymousObservable
調(diào)用了subscribe()
方法玫膀,但是在AnonymousObservable
類中我們并沒有找到subscribe()的定義矛缨,所以我們需要來看他的父類Producer
- Producer的源碼如下:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
/*重點(diǎn)在這里了,這里調(diào)用了run()方法匆骗,一切疑惑都清晰了劳景,我們知道了run()調(diào)用時(shí)傳入了observer,并且創(chuàng)建了sink管子,而這個(gè)管子具備了序列的功能碉就,可以調(diào)用on()方法盟广。
*/
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
果然不出我們所料,在Producer
中我們找到了subscribe()的方法定義瓮钥,到此我們可以總結(jié)出很清晰的幾條線索了:
- (1)通過前面的類繼承關(guān)系可以知道是
Producer
實(shí)現(xiàn)了ObservableType
協(xié)議的subscribe()方法筋量。在這個(gè)方法里面調(diào)用了self.run(observer, cancel: disposer)
- (2) self.run()實(shí)際上就是AnonymousObservable.run(), 這這個(gè)方法里面做了三件事情:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//1.創(chuàng)建了一個(gè)sink管道對(duì)象,并將observer也就create()創(chuàng)建
//序列時(shí)傳入的閉包傳給了sink
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
//2. sink調(diào)用自己的run()方法碉熄,并把AnonymousObservable作為參數(shù)傳入桨武。
let subscription = sink.run(self)
//返回一個(gè)元組,包含sink管道信息锈津。
return (sink: sink, subscription: subscription)
}
- (3)AnonymousObservableSink類中run()方法中調(diào)用
parent._subscribeHandler(AnyObserver(self))
其中parent就是我們(2)中sink.run(self)
傳入的self呀酸,也就是AnonymousObservable對(duì)象;并且我們前面已經(jīng)知道_subscribeHandler就是創(chuàng)建序列時(shí)保存的那個(gè)通過create()函數(shù)參數(shù)傳入的 閉包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:發(fā)送信號(hào) obserber.onNext("kyl發(fā)送了信號(hào)") obserber.onCompleted() return Disposables.create() }
琼梆。 現(xiàn)在已經(jīng)很清晰了parent._subscribeHandler(AnyObserver(self))
執(zhí)行閉包性誉,這行代碼就會(huì)調(diào)用obserber.onNext("kyl發(fā)送了信號(hào)")
這個(gè)行代碼窿吩。
- 現(xiàn)在我們可以通過一個(gè)流程圖來總結(jié)我們代碼執(zhí)行的流程:
上面的訂閱序列流程分析:我們弄明白了從訂閱序列到調(diào)用create()函數(shù)時(shí)傳入的參數(shù)閉包調(diào)用的邏輯,但是這個(gè)閉包發(fā)送onNext()信號(hào)后错览,怎么到訂閱消息的onNext()閉包我們還不是很清晰纫雁。因此我們需要分析AnonymousObserver
我們先來看下AnonymousObserver
類
-
AnonymousObserver
源碼定義如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
/*構(gòu)造函數(shù),保存了EventHandler尾隨閉包*/
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
//覆寫了onCore方法倾哺,調(diào)用了EventHandler閉包
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
從AnonymousObserver
源碼中我們并沒有找到onNext()方法轧邪,那我們只能沿著它的繼承鏈往上查找,這里需要了解一下類的繼承關(guān)系:
- AnonymousObserver的繼承關(guān)系:
通過分析類的繼承關(guān)系羞海,我們得知:這樣一個(gè)關(guān)系鏈:
AnonymousObserver對(duì)象的on()方法會(huì)調(diào)用onCore()方法忌愚,ObserverType里面有onNext,onError,onComplete方法。但是on()是如何調(diào)用的扣猫,何時(shí)調(diào)用的呢菜循?
要解決這個(gè)疑問,我們需要再次回到我們創(chuàng)建序列的代碼:
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
創(chuàng)建序列的create()方法傳入了一個(gè)subscribe閉包申尤,并返回了AnonymousObservable對(duì)象癌幕。其中subscribe閉包就是我們序列創(chuàng)建時(shí)參數(shù)形式傳入 閉包。并且AnonymousObservable初始化時(shí)將這個(gè)閉包保存起來了self._subscribeHandler = subscribeHandler
AnonymousObservable 有一個(gè)run()方法昧穿,run方法里面創(chuàng)建了一個(gè)AnonymousObservableSink
對(duì)象sink勺远,具體源碼如下:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
分析了這么久,繞了一圈时鸵,終于發(fā)現(xiàn)關(guān)鍵就在AnonymousObservableSink
管子這個(gè)對(duì)象里面了胶逢。sink這是個(gè)神奇的管子。它就保存了序列饰潜,也保存了訂閱初坠,還保存了用于銷毀的disposed 也就是同時(shí)擁有了創(chuàng)建序列,訂閱序列彭雾,銷毀序列功能碟刺。
我們來分析下AnonymousObservableSink
的源碼:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
//這里的Parent就是我們上面分析的AnonymousObservable,非常重要
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
// 構(gòu)造方法薯酝,傳入了observer序列半沽,和Cancelable
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
//這里實(shí)現(xiàn) 了ObserverType協(xié)議的on()方法
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
//調(diào)用了父類的發(fā)布,self.forwardOn()會(huì)調(diào)用自己的on()方法
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
/*調(diào)用了_subscribeHandler閉包吴菠,這個(gè)閉包就是我們之前創(chuàng)建序列時(shí)傳入閉包者填。
parent就是傳入進(jìn)來的序列,這里序列的閉包里傳入了self并且強(qiáng)轉(zhuǎn)為AnyObserver
這里將self傳給了閉包_subscribeHandler做葵,這樣_subscribeHandler也就具備了subcribe的能力占哟。
*/
return parent._subscribeHandler(AnyObserver(self))
}
}
其中Sink類的源碼如下:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
//這里調(diào)用了傳入observer.on()方法,
self._observer.on(event)
}
final func forwarder() -> SinkForward<O> {
return SinkForward(forward: self)
}
final var disposed: Bool {
return isFlagSet(self._disposed, 1)
}
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
從源碼分析我們得知:
我們的sink保存了我們的序列,當(dāng)我們調(diào)用ob.onNext()發(fā)送信號(hào)時(shí)榨乎,由于我們的sink已經(jīng)持有了ob, 這樣sink會(huì)調(diào)用on()方法嗓化,在on()方法里面會(huì)調(diào)用self.forwardOn(event),而在fowardOn()里面會(huì)調(diào)用self._observer.on(event)谬哀。這樣我的疑問就解決了,答案就是sink調(diào)用了on()方法严肪。
這里我們?cè)賮砜偨Y(jié)一下總的流程:
- 創(chuàng)建序列時(shí)
create()
返回了一個(gè)ob
, 這個(gè)ob就是序列史煎,創(chuàng)建的時(shí)候傳遞了一個(gè)閉包A
。在閉包A中調(diào)用了ob.onNext()
發(fā)送了信號(hào)驳糯。 - 訂閱序列時(shí)調(diào)用
ob.subscribe()
方法篇梭,這個(gè)方法會(huì)創(chuàng)建一個(gè)AnonymousObserver
對(duì)象,并調(diào)用了self.asObservable().subscribe(observer)
酝枢。 -
self.asObservable()
實(shí)際就是我們的ob
, 也就是ob調(diào)用了subscribe().而AnonymousObserver中沒有找到subscribe()恬偷。 - 我們?cè)?code>AnonymousObserver的父類中找到了subscribe(),發(fā)現(xiàn)subscribe()調(diào)用了
AnonymousObserver的run()
方法帘睦。 - 在AnonymousObserver的run()方法中袍患,創(chuàng)建了一個(gè)管子sink,并調(diào)用了
sink.run(self)
,sink是AnonymousObservableSink的對(duì)象竣付,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))
調(diào)用了創(chuàng)建序列時(shí)保存的閉包A (parent就是AnonymousObserver)诡延,這樣就解釋了訂閱時(shí),回調(diào)了A閉包的原因古胆。 - 至于怎么調(diào)用onNext()方法也是通過sink來實(shí)現(xiàn)的肆良。
- sink已經(jīng)持有了ob ,當(dāng)我們?cè)贏閉包里面調(diào)用ob.onNext()發(fā)送信號(hào)時(shí),實(shí)際會(huì)通過sink.on()來調(diào)用逸绎。首先sink.on()會(huì)調(diào)用forwardOn().
- 在forwardOn()中會(huì)調(diào)用self._observer.on(event)惹恃。
- _observer.on()會(huì)調(diào)用_observer.onCore()
- _observer.onCore(event)會(huì)根據(jù)event的類型判斷是調(diào)用onNext(),onError(),onComplete()中間一個(gè),由于我們傳遞的是onNext事件棺牧,所以會(huì)調(diào)用onNext() 巫糙,而這個(gè)_observer.onNext()會(huì)調(diào)用我們訂閱時(shí)傳入閉包subscribe(onNext:).
- 為什么回調(diào)的原因是:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
... 上面代碼不是我們要分析的重點(diǎn),...表示忽略了此次的一段源碼
/*注意陨帆,此次定義了一個(gè)AnonymousObserver()對(duì)象曲秉,以參數(shù)的形式,
構(gòu)造方法里面?zhèn)魅肓艘粋€(gè)尾隨閉包eventHandler疲牵,
在這個(gè)閉包里面承二,當(dāng)收到event的不同事件,
會(huì)觸發(fā)并調(diào)用纲爸,我們 `let _ = ob.subscribe(onNext: { (text) in` 這個(gè)方法傳入閉包
*/
let observer = AnonymousObserver<E> { event in
...
switch event {
case .next(let value):
onNext?(value) //調(diào)用訂閱時(shí)傳入的
這里調(diào)用ob.subscribe()的時(shí)候亥鸠,我們創(chuàng)建了AnonymousObserver和我們subscribe()傳入的onNext()閉包做了一個(gè)綁定,當(dāng)AnonymousObserver.onNext()
調(diào)用的時(shí)候必定會(huì)回調(diào)subscribe()傳入的onNext()閉包。而10中的_observer對(duì)象指的就是let observer = AnonymousObserver
- 還是通過這張圖來解釋最簡(jiǎn)潔: