基本概念
要想充分理解RXSwift核心邏輯逛绵,那么首先必須要知道RXSwift里包含哪幾個角色,以及它們的職責(zé)。
被觀察者(Observable)
它主要負(fù)責(zé)產(chǎn)生事件揽乱,實(shí)質(zhì)上就是一個可被監(jiān)聽的序列(Sequence
)雷逆。
Observable<T>
這個類就是Rx框架的基礎(chǔ)弦讽,我們稱它為可觀察序列。
Observable<T>
==異步產(chǎn)生==>
event(element : T)
觀察者(Observer)
它主要負(fù)責(zé)監(jiān)聽事件然后對這個事件做出響應(yīng),或者說任何響應(yīng)事件的行為都是觀察者
往产。
訂閱者(Subscriber)
事件的最終處理者
管道(Sink)
Observer 和 Observable 溝通的橋梁:Sink相當(dāng)與一個加工者被碗,可以將源事件流轉(zhuǎn)換成一個新的事件流,如果將事件流比作水流仿村,事件的傳遞過程比作水管锐朴,那么Sink就相當(dāng)于水管中的一個轉(zhuǎn)換頭。
代碼解析
接下來我們結(jié)合以下很簡單的代碼來分析蔼囊,逐步揭開RXSwift的神秘面紗焚志。
//1:創(chuàng)建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
// 3:發(fā)送信號
obserber.onNext("測試OnNext")
obserber.onCompleted()
obserber.onError(NSError.init(domain: "error!", code: 000, userInfo: nil))
return Disposables.create()
//2:訂閱信息
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)") //text從哪里來的畏鼓?
}, onError: { (error) in
print("error: \(error)") //error從哪里來的酱酬?
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
在這里我們主要關(guān)注下
create 閉包什么時候執(zhí)行,
subscribe 閉包又是什么時候執(zhí)行的
也就是3->2這條線
Create方法
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
我們看到create 函數(shù)云矫, 返回了一個AnonymousObservable
實(shí)例膳沽,接著我們進(jìn)入AnonymousObservable
看它里面具體內(nèi)容
// AnonymousObservable Class
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
_subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
在這里我們再次回顧下Observable集成體系
(父類)
ObservableConvertibleType
(完全的抽象)
|
ObservableType
( 處理subscribe)
|
Observable
(處理 asObservable)
|
Producer
(重載subscribe)
|
AnonymousObservable
(處理run)
(子類)
PS:由上面我們能看出,如果想自定義Observable
通常只需要繼承Producer让禀, 并實(shí)現(xiàn)run方法就可以了挑社。
在這里我們看到Run方法里面涉及了類AnonymousObservableSink
,它作為Observer 和 Observable的銜接的橋梁我們看到它本身遵守ObseverType
協(xié)議,與此同時實(shí)現(xiàn)了run
方法巡揍。
那也就是說痛阻,
sink
從某種程度來說也是Observable
通過sink就可以完成從Observable到Obsever的轉(zhuǎn)變。
總結(jié)下create
方法主要工作:
- 創(chuàng)建
AnonymousObservable
對象腮敌, - 用
_subscribeHandler
保存了閉包 - 寫了
run
方法在內(nèi)部創(chuàng)建了AnonymousObservableSink
Subscribe方法
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
上述代碼只是入口阱当,下面的才是核心(Producer里面)
// Producer Class
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
這里面我們看到了Producer
調(diào)用了自己的run
方法,而AnonymousObservableSink
作為其子類重寫了該方法缀皱,我們先去看下子類是如何寫的斗这。
// AnonymousObservable Class
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
之前提到過AnonymousObservableSink
,注意Sink
是持有Observer
的啤斗,從這也可以看出來
Observerable
的run
方法觸發(fā)Sink
的run
方法
接下來就要關(guān)注AnonymousObservableSink
方法表箭。
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
在這里我們再一次見到了subscribeHandler
,這個subscribeHandler
就是之前最開始的閉包钮莲!
Observable<String>.create { observer -> Disposable in
observer.onNext("測試")
return Disposables.create()
}
至此我們知道了create閉包是什么執(zhí)行的了免钻,接下來我們自然的把目光鎖定到實(shí)體類AnyObserver
,看看它里面究竟是如何實(shí)現(xiàn)的崔拥。
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
在這里我們看到其內(nèi)部的Observer
其實(shí)是一個EventHandler
极舔,并且在初始化的時候把外部傳過來的AnonymousObservableSink.on
賦值給了這個Observer
,也就是說observer.onNext("測試")
最終會觸發(fā)AnonymousObservableSink.on
事件
接著我們看下AnonymousObservableSink
的on
的具體實(shí)現(xiàn)
// AnonymousObservableSink.on
func on(_ event: Event<E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if _isStopped == 1 {
return
}
forwardOn(event)
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
forwardOn(event)
dispose()
}
}
}
AnonymousObservableSink
的on
會調(diào)用其內(nèi)部的forwardOn
// Sink.forwardOn
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
if _disposed {
return
}
_observer.on(event)
}
在這里還記得這個_observer
的實(shí)體嗎链瓦,看到下面代碼你一定會回憶起來拆魏,
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
return self.asObservable().subscribe(observer)
}
對盯桦,沒錯,這個_observer
就是我們最初傳進(jìn)來的subscribe
閉包的實(shí)體類AnonymousObserver
~!
接著我們繼續(xù)看下這個AnonymousObserver
的on
方法又是如何實(shí)現(xiàn)的
繼承關(guān)系:
AnonymousObserver
->ObserverBase
->ObserverType
// ObserverBase.on
func on(_ event: Event<E>) {
switch event {
case .next:
if _isStopped == 0 {
onCore(event)
}
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
onCore(event)
}
}
}
ObserverBase.on
會觸發(fā)onCore
方法渤刃,看下子類的實(shí)現(xiàn)
// AnonymousObserver.onCore
override func onCore(_ event: Event<Element>) {
return _eventHandler(event)
}
這里的_eventHandler
還記得嗎拥峦?它就是最初傳進(jìn)來的訂閱閉包即:
.subscribe { event in
print(event.element)
}
總結(jié)一下
Observable-Create階段:
- 創(chuàng)建
AnonymousObservable
- 保存閉包(
subscribeHandler
)
Observable-Subscribe階段:
- 創(chuàng)建
AnonymousObserver
- 調(diào)用自己(
AnonymousObservable
)的run
方法:(AnonymousObserver
作為參數(shù))
AnonymousObservable
重寫了run
,它在方法里面創(chuàng)建了AnonymousObservableSink
并在sink
里保存了這個剛創(chuàng)建的AnonymousObserver
- 調(diào)用
AnonymousObservableSink
的run
:run
方法里用到AnonymousObservable
的_subscribeHandler
并傳入AnyObserver
卖子,這里AnonymousObservableSink.on
賦值給了AnyObserver
內(nèi)部的EventHandler
成員observer
執(zhí)行階段:
AnyObserver.on
----> AnonymousObservableSink.on
----> AnonymousObservableSink.forwardOn
----> AnonymousObserver.on
---> AnonymousObserver.onCore
----> _eventHandler(event)
PS:
可以看出Sink
在不同的階段有著不同的身份
- Sink充當(dāng)Observable
Obsevable.subscribe
--->Obsevable.run
---->Sink.run
這個過程通過Sink建立Obsevable和 Observer的聯(lián)系
- Sink充當(dāng)Observer
AnyObserver.on
---->Sink.on
---->Observer.on
--->Observer.OnCore