RxSwift 核心原理解析
角色定位
- 觀察者(Observer)
- 被觀察者(Observable)
- 訂閱者(Subscriber) 事件的最終處理者
- 管道(Sink) Observer 和 Observable 溝通的橋梁
Simple Example
Observable<String>.create { observer -> Disposable in
observer.onNext("hello")
return Disposables.create()
}
.subscribe { event in
print(event.element)
}
我們重點(diǎn)關(guān)注:
- create 閉包什么時(shí)候執(zhí)行型宝,
- subscribe 閉包又是什么時(shí)候執(zhí)行的
Observable 繼承體系
Observable
的核心函數(shù):
-
subscribe
訂閱操作,Observable
和Observer
通過(guò)訂閱建立聯(lián)系,
Observable 好比水源, Observer 好比水龍頭(永遠(yuǎn)開(kāi)著的水龍頭)絮爷, 訂閱的過(guò)程就是在Observable 和 Observer 之間建立管道趴酣, 一旦建立管道即是永久性的,只要水源有誰(shuí)坑夯, 水龍頭就會(huì)有水岖寞。 -
run
對(duì)用戶(hù)不可見(jiàn),隱藏了大量的實(shí)現(xiàn)細(xì)節(jié)柜蜈, 這個(gè)函數(shù)就是建立水管的過(guò)程 -
asObservable
: 這個(gè)協(xié)議的存在使得Observable 的定義變得更加廣泛慎璧。
示例:
/// GroupedObservable Class
/// Converts `self` to `Observable` sequence.
public func asObservable() -> Observable<Element> {
return source
}
ObservableType
協(xié)議擴(kuò)展
extension ObservableType {
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
}
- 這里的
GroupedObservable
, 本身無(wú)法被直接訂閱,但是它的屬性source
可以被訂閱跨释, 于是只需要在asObservable
函數(shù)中返回source
, 即可被訂閱胸私, 這樣的設(shè)計(jì)可以更自由使用組合方式,產(chǎn)生新的類(lèi)型的Observable
-
ObservableType
: 擴(kuò)展subscribe
方法鳖谈, 確保所有的Observable
行為一致岁疼, 都是經(jīng)由self.asObservable()
獲取Observable
Observer 繼承體系
Observer 核心函數(shù)
-
on
: 事件發(fā)生器 -
onCore
: 對(duì)用戶(hù)不可見(jiàn),隱藏具體實(shí)現(xiàn)細(xì)節(jié)
Sink 核心函數(shù)
- forwardOn
- on
- run
代碼分析
先看看Observable.create 函數(shù)
/// static Observable create method
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
該函數(shù)返回一個(gè)AnonymousObservable
實(shí)例缆娃,這是因?yàn)?code>Observable是一個(gè)抽象類(lèi)捷绒,需要具體的實(shí)類(lèi)實(shí)現(xiàn)功能,而AnonymousObservable
是最簡(jiǎn)單的Observable
類(lèi)
// AnonymousObservable Class
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
_subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
首先說(shuō)一下繼承體系: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType
每一層都只處理一點(diǎn)點(diǎn)事情贯要,剩下的交給下一層處理
ObservableConvertibleType: 完全的抽象
ObservableType: 處理subscribe
Observable: 處理 asObservable
Producer: 重載subscribe
AnonymousObservable: 處理run
也就是說(shuō)如果說(shuō)如果我們要自定義一個(gè)Observable的話(huà)暖侨,通常只需要繼承Producer, 并實(shí)現(xiàn)run方法崇渗。AnonymousObservable
做的事情也不多字逗,實(shí)現(xiàn)run方法,作為create閉包的持有者宅广。
run方法涉及另外一個(gè)類(lèi)AnonymousObservableSink
,Sink作為Observer 和 Observable的銜接的橋梁葫掉,之前還在想為什么叫做ObservableSink,現(xiàn)在想明白了跟狱。Sink本身遵守ObseverType協(xié)議俭厚,與此同時(shí)實(shí)現(xiàn)了run方法,雖然沒(méi)有實(shí)現(xiàn)subscribe方法驶臊,但是已經(jīng)足夠了挪挤,這樣sink從某種程度來(lái)說(shuō)也是Observable叼丑,通過(guò)sink就可以完成從Observable到Obsever的轉(zhuǎn)變。
subscribe:
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
這里不太明白為什么一定要寫(xiě)成這樣:
let observer = AnonymousObserver { e in
on(e)
}
感覺(jué)直接寫(xiě)成let observer = AnonymousObserver(on)
會(huì)更加清晰扛门。AnonymousObserver
比較簡(jiǎn)單持有subscribe閉包鸠信,并且實(shí)現(xiàn)onCore
method.
剛才的只是個(gè)入口,這個(gè)才是核心
// Producer Class
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
這個(gè)if 和 else 不用管這個(gè)是線(xiàn)程安全相關(guān)的尖飞,不是我關(guān)注的重點(diǎn)症副。這里返回的是 Disposable
, Disposable
跟對(duì)象的生命周期密切相關(guān)店雅。暫且在這打個(gè)tag吧政基。
- branch_1: 線(xiàn)程安全
- branch_2: 生命周期管理
- branch_3: 核心邏輯實(shí)現(xiàn)細(xì)節(jié)
好了我們現(xiàn)在關(guān)注的是核心邏輯實(shí)現(xiàn)細(xì)節(jié),其他有時(shí)間再補(bǔ)上闹啦。核心語(yǔ)句
let sinkAndSubscription = self.run(observer, cancel: disposer)
我們來(lái)看看run的邏輯
// AnonymousObservable Class
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
之前提到過(guò)AnonymousObservableSink
,注意Sink是持有Observer的沮明,從這也可以看出來(lái)Observerable的 run方法觸發(fā)Sink
的run方法,接下來(lái)就要關(guān)注AnonymousObservableSink
方法窍奋。
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
哇荐健,終于觸發(fā)subscribeHandler了,這里的subscribeHandler就是之前最開(kāi)始的閉包琳袄。
Observable<String>.create { observer -> Disposable in
observer.onNext("hello")
return Disposables.create()
}
也就是create閉包江场,真的是千辛萬(wàn)苦才跟蹤到這了,那么現(xiàn)在解決了第一個(gè)問(wèn)題: create閉包是什么執(zhí)行的
現(xiàn)在就要關(guān)注這個(gè)閉包執(zhí)行后會(huì)帶來(lái)什么后果窖逗?
目光轉(zhuǎn)到實(shí)體類(lèi)AnyObserver
,
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
構(gòu)造函數(shù)的入?yún)⑹荅ventHandle,結(jié)合上下文 AnyObserver.observer = AnonymousObservableSink.on
,那么這下就明白了 observer.onNext("hello")
最終會(huì)觸發(fā)AnonymousObservableSink.on
事件
// AnonymousObservableSink.on
func on(_ event: Event<E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if _isStopped == 1 {
return
}
forwardOn(event)
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
forwardOn(event)
dispose()
}
}
}
老規(guī)矩忽略線(xiàn)程安全問(wèn)題址否,關(guān)注核心問(wèn)題
// Sink.forwardOn
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
if _disposed {
return
}
_observer.on(event)
}
還記得_observer 的實(shí)體是啥,沒(méi)關(guān)系幫你回憶一下:
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
return self.asObservable().subscribe(observer)
}
我們最初傳進(jìn)來(lái)的subscribe閉包的實(shí)體類(lèi)碎紊,理解了這個(gè)接下來(lái)就簡(jiǎn)單了
// ObserverBase.on
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
onCore(event)
}
}
}
// AnonymousObserver.onCore
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
這里的_eventHandler 就是最初傳進(jìn)來(lái)的訂閱閉包即:
.subscribe { event in
print(event.element)
}
好了到此為止了佑附,總結(jié)一下:
來(lái)個(gè)堆棧看的清晰些:
再來(lái)一個(gè)類(lèi)圖
稍微解釋一下整個(gè)過(guò)程分為兩個(gè)階段:
- Obsevable 構(gòu)建階段仗考,這里使用create構(gòu)造方法構(gòu)造Obsevable音同,還有其他各種各樣的構(gòu)造方法,這里不一一贅述秃嗜。
- subscribe 階段权均, Obsevable.subscribe ---> Obsevable.run ----> Sink.run ----> AnyObserver.On ----> Sink.on ----> Observer.On ---> Observer.OnCore ----> eventHandler
這個(gè)過(guò)程也可以分為多個(gè)階段:
Obsevable.subscribe ---> Obsevable.run ----> Sink.run 這個(gè)過(guò)程我理解為build階段,通過(guò)Sink建立Obsevable和 Observer的聯(lián)系锅锨,在這個(gè)階段Sink扮演還是Obsevable角色螺句。
AnyObserver.On ----> Sink.on ----> Observer.On ---> Observer.OnCore 這個(gè)階段的話(huà)Sink扮演的就是 Observer 角色