今天帶大家解讀下訂閱發(fā)布流程的內(nèi)部源碼庭惜。
本系列文章RxSwift使用的版本是:6.2.0
let observable = Observable<Int>.create { (anyObserver) -> Disposable in
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create ()
}
let dispose = observable.subscribe(onNext: { ele in
print(ele)
}, onDisposed: {
print("dispose")
})
dispose.dispose()
上面代碼創(chuàng)建一個(gè)被觀察者这揣,并訂閱它,打印序列元素怀吻,最后dispose瞬浓,最后打印:
1
dispose
RxSwift的基本訂閱流程涉及到許多類和協(xié)議蓬坡,需要先弄清楚各個(gè)類和協(xié)議的作用以及它們之間的關(guān)系猿棉,否則直接看代碼容易繞暈。
先上個(gè)類圖把所有相關(guān)類和協(xié)議關(guān)系理清楚屑咳。
其中藍(lán)色的是類萨赁,橙色的協(xié)議,黃色的是枚舉兆龙, 綠色的是結(jié)構(gòu)體杖爽。我們一個(gè)一個(gè)說:
- 協(xié)議:
ObservableConvertibleType
這是個(gè)被觀察者的協(xié)議,唯一的一個(gè)協(xié)議方法是asObservable(), 表示可以轉(zhuǎn)換成被觀察者紫皇。
ObservableType
: 繼承自O(shè)bservableConvertibleType慰安,可以創(chuàng)建被觀察者,任何被觀察者類需要遵循此協(xié)議坝橡。在extension中定義了一個(gè)create函數(shù)用來創(chuàng)建被觀察者泻帮,兩個(gè)subscribe函數(shù)用來訂閱觀察者。
Disposable
訂閱取消的接口计寇,只有一個(gè)disposes協(xié)議方法锣杂,用來釋放相關(guān)資源脂倦。
Cancelable
繼承自Disposable,有一個(gè)isDisposed協(xié)議方法元莫,表示是否已釋放資源赖阻。
ObserverType
表示序列發(fā)布者,可以發(fā)送序列踱蠢。其中有幾個(gè)我們比較熟悉的方法:onNext onError onCompleted 這三個(gè)會(huì)調(diào)用on方法火欧,on只是個(gè)協(xié)議方法,看下代碼就明白了:
/// Supports push-style iteration over an observable sequence.
public protocol ObserverType {
/// The type of elements in sequence that observer can observe.
associatedtype Element
@available(*, deprecated, message: "Use `Element` instead.")
typealias E = Element
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<Element>)
}
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
onNext onError onCompleted都調(diào)用了on方法茎截,它還包含一個(gè)關(guān)聯(lián)類型苇侵,Element可以認(rèn)為是個(gè)范型,表示元素的類型企锌。
- 類
Observable
Producer
AnonymousObservable
這三個(gè)都是被觀察者榆浓,依次繼承的關(guān)系,Observable遵循ObservableType撕攒, Producer 繼承Observable陡鹃,AnonymousObservable繼承了Producer。Producer抖坪,實(shí)現(xiàn)subscribe萍鲸,run方法, AnonymousObservable實(shí)現(xiàn)了run方法擦俐。
Sink
和 AnonymousObservableSink
這兩個(gè)算是是整個(gè)流程的核心類脊阴,對(duì)消息訂閱發(fā)送進(jìn)行管理。其中Sink
遵循Dispsable捌肴,包含兩個(gè)屬性O(shè)bserverType和Cancelable蹬叭,這是整個(gè)類圖唯一的兩個(gè)組合關(guān)系,其他類的屬性都是閉包状知,AnonymousObservableSink
繼承了Sink,同時(shí)遵循了ObserverType孽查,也就是說AnonymousObservableSink
包括一個(gè)ObserverType類型的屬性(由父類繼承而來)同時(shí)又遵循了ObserverType協(xié)議饥悴,這讓我想起的設(shè)計(jì)模式中的裝飾模式,其實(shí)整個(gè)Rx框架還有很多類型的Sink盲再。
ObserverBase
AnonymousObserver
, 遵循了Dispose和 ObserverType西设,這兩個(gè)是觀察者,繼承關(guān)系答朋。
結(jié)構(gòu)體 唯一的一個(gè)結(jié)構(gòu)體AnyObserver贷揽,遵循了ObserverType 這個(gè)好像也是觀察者,實(shí)際上是序列的發(fā)送者梦碗,用戶通過它來調(diào)用onNext 等方法發(fā)送序列禽绪,被觀察者如Observable通過AnyObserver發(fā)送序列蓖救,而AnonymousObserver對(duì)象負(fù)責(zé)接收序列。
枚舉 Event 表示序列事件印屁,包含next(Element) error(Swift.Error) onCompleted三個(gè)case循捺。
終于介紹完了所有的類和協(xié)議。
接下來需要說到三個(gè)重要的閉包:
- 創(chuàng)建被觀察者的閉包即我們一開始的代碼中的
{ (anyObserver) -> Disposable in
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create ()
}
它的類型是(AnyObserver)-> Disposeable 在訂閱時(shí)會(huì)執(zhí)行這個(gè)閉包雄人,并且用anyObserver發(fā)送序列从橘。
- 訂閱時(shí)的onNext的閉包:
(onNext: { ele in
print(ele)
}, onDisposed: {
print("dispose")
})
這樣發(fā)送next序列時(shí),會(huì)調(diào)用這個(gè)閉包础钠。
- 第三個(gè)閉包在subscribe方法的內(nèi)部恰力,我們進(jìn)去看一下:
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
這里創(chuàng)建了一個(gè)AnonymousObserver, 也就是觀察者旗吁,創(chuàng)建時(shí)將閉包作為初始化參數(shù)踩萎,閉包中調(diào)用了第二個(gè)閉包onNext。所以第一個(gè)閉包和第三個(gè)閉包是如果關(guān)聯(lián)的呢阵漏?也就是訂閱的時(shí)候如何調(diào)用第一個(gè)閉包驻民,第一個(gè)閉包再調(diào)用到第三個(gè)閉包?這是整個(gè)流程的關(guān)鍵履怯?
我們?cè)偕蟼€(gè)流程圖看看整個(gè)訂閱發(fā)布的流程回还。
根據(jù)這個(gè)圖再結(jié)合源碼一步一步分析下流程。
- 首先通過Observable的 create方法創(chuàng)建序列叹洲,ObservableType extension 提供了實(shí)現(xiàn)柠硕,而Observable遵循了ObservableType協(xié)議,可以看到代碼創(chuàng)建了AnonymousObservable(subscribe), 并把閉包傳進(jìn)入运提,而AnonymousObservable類內(nèi)部持有了這個(gè)閉包蝗柔。
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
}
- 用返回的AnonymousObservable對(duì)象,調(diào)用subscribe方法民泵,這個(gè)方法在ObservableType 擴(kuò)展中癣丧,看一看subscribe主要代碼:
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
先包裝一個(gè)AnonymousObserver,即觀察者栈妆,把閉包作為初始化方法的參數(shù)傳進(jìn)去胁编,并賦值給它的_eventHandler屬性,跟AnonymousObservable比較類似鳞尔,不要與AnonymousObservable弄混了嬉橙。一個(gè)是觀察者,一個(gè)是被觀察者寥假。
閉包的代碼中根據(jù)事件類型調(diào)用 onNext 或 onError 或 onCompleted閉包市框,這個(gè)是最后一步的調(diào)用, 順便提一下處理.error 和 .completed 事件時(shí)會(huì)調(diào)用disposable.dispose()糕韧,說明這兩個(gè)事件發(fā)生后會(huì)取消訂閱枫振,回收資源喻圃,之后無法再發(fā)送序列了。
- 接著看
return Disposables.create( self.asObservable().subscribe(observer), disposable )
這里會(huì)創(chuàng)建Disposables蒋得,它傳入兩個(gè)disposeable级及,這里先不講dispose,看第一個(gè)參數(shù), 通過asObservable 調(diào)用subscribe额衙,并把剛剛創(chuàng)建的observer傳入饮焦,asObservable我們可以通過上面的類圖看到它是ObservableConvertibleType的協(xié)議,Observable 必然實(shí)現(xiàn)了這個(gè)協(xié)議窍侧,實(shí)際上是返回自身县踢。自身的類型其實(shí)是AnonymousObservable,所以就是通過self調(diào)用subscribe伟件,subscribe在父類Producer實(shí)現(xiàn)了硼啤。 - 接著看Producer 的subscribe代碼:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
CurrentThreadScheduler是線程派發(fā),以后再說線程派發(fā)斧账,調(diào)用schedule谴返,在schedule會(huì)執(zhí)行傳入的閉包,所以最后會(huì)走到閉包中咧织,關(guān)注下閉包的代碼:調(diào)用了 self.run, 把參數(shù)observer 傳進(jìn)去嗓袱, observer是外面創(chuàng)建的AnonymousObserver對(duì)象,我們整個(gè)流程只會(huì)有一個(gè)AnonymousObserver和一個(gè)AnonymousObservable其他地方看到的observer都是傳進(jìn)去的习绢,所以看到observer簡(jiǎn)單想到是最開始創(chuàng)建的AnonymousObserver就行了渠抹。
- 現(xiàn)在到run方法了,AnonymousObservable實(shí)現(xiàn)了run方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- 這里比較關(guān)鍵:創(chuàng)建了AnonymousObservableSink對(duì)象闪萄,Sink翻譯做“業(yè)務(wù)下沉”梧却,表示這個(gè)類是管理者或者專門處理業(yè)務(wù)的。將observer和cancel傳進(jìn)去败去。observer還是通過調(diào)用鏈傳來的放航。接著調(diào)用AnonymousObservableSink的run方法:sink.run(self)
- 看一下AnonymousObservableSink的run方法:
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
Parant 是 sink.run(self) 的self, self實(shí)際上是AnonymousObservable對(duì)象圆裕,就是唯一的被觀察者三椿,是用戶通過create創(chuàng)建的。
- 所以接下來調(diào)用了AnonymousObservable對(duì)象的_subscribeHandler葫辐,同時(shí)把AnyObserver(self)最為參數(shù)傳入,_subscribeHandler是個(gè)閉包伴郁,我們可以回到第1步看看耿战。_subscribeHandler是最開始 create 被觀察者傳入的閉包,所以到這一步才開始執(zhí)行第1步創(chuàng)建的閉包焊傅,這里還有個(gè)關(guān)鍵點(diǎn):
AnyObserver(self)
, 創(chuàng)建了一個(gè)AnyObserver并把self作為參數(shù)傳入剂陡,self是 AnonymousObservableSink對(duì)象啊狈涮,記住了后面會(huì)比較繞。 打開AnyObserver看看這個(gè)初始化方法:
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
是這個(gè)沒問題鸭栖,通過類圖可以看到AnonymousObservableSink遵循了ObserverType歌馍,把on方法賦值給observer。
-
self.observer = observer.on
, 把AnonymousObservableSink的on 方法復(fù)制給observer 這里的observer可不是AnonymousObserver對(duì)象晕鹊,它是個(gè)閉包松却,類型是(Event<Element>) -> Void
。誒溅话,我是誰晓锻!我在哪里!????飞几,我們回到執(zhí)行_subscribeHandler的地方砚哆。 - 第8步說到執(zhí)行_subscribeHandler,也就是我們最開始create Observable的閉包:
let observable = Observable<Int>.create { (anyObserver) -> Disposable in
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create()
}
- 當(dāng)代碼執(zhí)行
anyObserver.onNext(1)
屑墨, anyObserver是剛剛第8步創(chuàng)建的哦躁锁, AnyObserver沒有找到方法,它其實(shí)在ObserverType extension中(AnyObserver 遵循了ObserverType):
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
執(zhí)行了on方法
- AnyObserver有實(shí)現(xiàn)了 on 方法協(xié)議:
public func on(_ event: Event<Element>) {
return self.observer(event)
}
調(diào)用
self.observer(event)
, 這個(gè)observer 在第9步(self.observer = observer.on
)由AnonymousObservableSink.on 賦值的閉包卵史。所以解下來調(diào)用AnonymousObservableSink.on方法战转。我們看看on方法實(shí)現(xiàn):
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
我們看到AnonymousObservableSink 類的 run 方法和on 方法在整個(gè)流程中至關(guān)重要,run方法調(diào)用了訂閱的handler程腹,而on 方法處理了事件匣吊。這說明AnonymousObservableSink類處理主要業(yè)務(wù)邏輯,是整個(gè)流程的核心寸潦。
- 最終會(huì)調(diào)用
self.forwardOn(event)
, 這個(gè)forwardOn是在父類Sink定義的色鸳,然后跳到forwardOn
看看:
final func forwardOn(_ event: Event<Observer.Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
- 然后調(diào)用
self._observer.on(event)
, 這個(gè)_observer 是在第6步創(chuàng)建AnonymousObservableSink對(duì)象時(shí)作為初始化參數(shù)賦值的。這個(gè)observer就是唯一的AnonymousObserver對(duì)象见转,還知道它是什么時(shí)候創(chuàng)建的嗎命雀?所以我們?nèi)nonymousObserver找下on方法,沒找到斩箫,去父類ObserverBase找到了:
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
- 接著調(diào)用onCore吏砂,在AnonymousObserver里:
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
終于繞出來了,調(diào)用self._eventHandler(event)
, _eventHandler 現(xiàn)在還記得是啥嗎乘客?啥時(shí)候被賦值的狐血?_eventHandler是在第2步創(chuàng)建AnonymousObserver時(shí)被賦值的。所以接著調(diào)用第2步的代碼:case .next(let value): onNext?(value)
調(diào)用onNext就是我們訂閱時(shí)傳入的閉包易核。最后打印1匈织, 希望大家能看明白,下一篇文章會(huì)解析dispose流程。