注:代碼塊中的...是一些不需要顯示的代碼家浇,直接用...替代了躯畴,rx的思維導圖我后面重新畫了一張刨晴,老漂亮了,畫出經(jīng)驗了茫死。
一.創(chuàng)建序列:
let ob = Observable<Any>.create { (obserber) -> Disposable in
obserber.onNext("onNext1")
return Disposables.create()
}
Observable
遵守了ObservableType
協(xié)議跪但,create
方法是協(xié)議的一個擴展方法。
create
的具體實現(xiàn)可以在工程目錄下搜索create.swift
查看峦萎。
create
方法返回了一個AnonymousObservable(subscribe)
類屡久,AnonymousObservable
初始化保存了閉包。保存在了self._subscribeHandler
屬性中爱榔,self
就是AnonymousObservable
被环,也就是 AnonymousObservable. _subscribeHandler,這個很重要,后面會用到(函數(shù)式編程思想)详幽。
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
AnonymousObservable
,繼承于父類Producer
筛欢,父類Producer
繼承于Observable
,重寫了subscribe
方法。
下面是父類Observable
:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
解釋:
func asObservable()
悴能,返回一個self
揣钦,實現(xiàn)了非序列向序列的轉(zhuǎn)化,這個是萬物皆序列的第二種解釋漠酿,第一種解釋前面的博客介紹過了冯凹。例如UISwitch().rx.value
并不是一個序列,我們可以通過UISwitch().rx.value.asObservable()
,轉(zhuǎn)成一個序列使用炒嘲。
func subscribe
實現(xiàn)的是一個抽象方法宇姚,里面沒有具體的實現(xiàn)。
這里畫了一張序列繼承鏈的思維導圖夫凸,供大家參考浑劳,如有不對的地方望不吝賜教:
二.序列執(zhí)行訂閱操作
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
ob.subscribe
方法中就下面的這段代碼需要我們?nèi)シ治觥?/p>
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
首先通過let observer = AnonymousObserver<E>{...}
創(chuàng)建了一個觀察者。AnonymousObserver<E>
中的E
來自于public func subscribe(onNext: ((E) -> Void)? = nil, ....)
中的E
夭拌,而這個E
(序列類型)魔熏,要從序列的subscribe
方法中尋找,序列的subscribe
方法是來自于:class AnonymousObservable<Element>
---->父類Producer
----> Observable<Element>
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
...
}
接著說let observer = AnonymousObserver<E>
的AnonymousObserver
類鸽扁,let observer = AnonymousObserver<E>{...}
這個其實是進行了一個初始化的操作蒜绽, self._eventHandler = eventHandler
保存了尾隨閉包:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
...
}
這里的AnonymousObserver
繼承的ObserverBase
,其實是一個基類訂閱者桶现,包含核心方法on
躲雅, onCore
發(fā)送消息,和dispose
方法執(zhí)行銷毀操作骡和。
這里我們能夠看到相赁,如果AnonymousObserver<E>
后面的閉包能夠被調(diào)用,就能實現(xiàn)序列執(zhí)行訂閱操作中尾隨閉包的調(diào)動慰于,也就完成了序列的響應,也就是說后面我們要搞清楚序列和訂閱是如何關(guān)聯(lián)上的钮科。
而observer
的調(diào)用是在:
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
self.asObservable().subscribe(observer)
中 self.asObservable()
代表的就是序列,也就變成 ob.subscribe(observer)
(ob是剛開始創(chuàng)建的序列), ob
的subscribe
婆赠,需要到其父類(AnonymousObservable
)的父類(Producer
)中尋找跺嗽。
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
...
}
可以看到其實就是執(zhí)行了一個 let sinkAndSubscription = self.run(observer, cancel: disposer)
的run
方法,同時返回了一個銷毀者,run
的調(diào)用者不是AnonymousObservable
就是Producer
页藻,所以查找run
方法的實現(xiàn),如下植兰。
以下是run方法的實現(xiàn):
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
我們看到run
方法中創(chuàng)建了一個let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
份帐,保存了observer
(訂閱者)和cancel
(銷毀者)。然后調(diào)用了sink.run(self)
楣导,這里的self
就是對應的類AnonymousObservable
废境,也就是sink.run(AnonymousObservable)
,也就是sink.run
傳入了一個序列,這一點非常重要噩凹,說明在sink
中獲得了序列巴元,后面sink
會把序列和訂閱者關(guān)聯(lián)。這個run
方法的執(zhí)行如下:
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
parent._subscribeHandler(AnyObserver(self))
其實就是 AnonymousObservable._subscribeHandler(AnyObserver(self))
,這個方法是不是很熟悉驮宴,是的逮刨,就是create
初始化保存的代碼塊。這里我把run
方法的整個代碼塊貼出來堵泽,因為非常重要:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
AnonymousObservable._subscribeHandler(AnyObserver(self))
,這個方法是怎么實現(xiàn)訂閱者和序列的關(guān)聯(lián)修己,主要是通過AnyObserver(self)
,也就是 AnyObserver(AnonymousObservableSink)
迎罗,在AnyObserver
這個結(jié)構(gòu)體中,自定義屬性self.observer = observer.on
(注意啦尤辱,這里的self.observer
是一個結(jié)構(gòu)體的一個function
)厢岂,保存了 observer
的on
方法的代碼塊光督,observer
是sink
傳過來的,也就是
on
方法(AnonymousObservableSink類內(nèi)的方法)
的內(nèi)部就是如下(核心邏輯):
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
上面的意思總結(jié)起來就是 結(jié)構(gòu)體AnyObserver的一個函數(shù)AnyObserver. observer保存了一個on函數(shù)塊
回歸到 AnonymousObservable._subscribeHandler(AnyObserver(self))
可帽,外界的obserber.onNext("onNext--1")
窗怒,其實就是是調(diào)用的AnyObserver.onNext
.而onNext
其實就是調(diào)用了一個on
方法(AnyObserver
遵從了ObserverType
協(xié)議映跟,這個onNext
是協(xié)議的擴展方法)
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
這個on
方法,其實就是前面提到的AnonymousObservableSink
中的on
方法扬虚,再上一次代碼塊:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
...
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
...
}
走到這里,在執(zhí)行switch
荸镊,---> 里面的self.forwardOn(event)
堪置,調(diào)用的而其實是父類的forwardOn
方法:
class Sink<O : ObserverType> : Disposable {
...
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
...
}
最后通過self._observer.on(event)
調(diào)用 我們之前創(chuàng)建訂閱者時的閉包(let observer = AnonymousObserver<E> {閉包} )
:
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
然后就可以快樂的調(diào)用 onNext
舀锨,onError
岭洲,onCompleted
輸出print(訂閱到....)
了盾剩,終于完了,好累告私。以上就是RXSwift底層實現(xiàn)的核心邏輯了驻粟。如有不對的地方,煩請不吝賜教格嗅,本人不勝感激~~~