前言
通過上一篇內(nèi)容RxSwift學(xué)習(xí)--核心邏輯初探,對(duì)RxSwift
有了些初步的認(rèn)知础废,下面通過源碼來看一下RxSwift
到底有多騷
RxSwift核心邏輯再探
先把上篇中的例子代碼搬過來:
//第一步:創(chuàng)建序列
//在create()函數(shù)中傳入一個(gè)閉包史汗,任務(wù)是對(duì)每一個(gè)過來的訂閱進(jìn)行處理
let ob = Observable<Any>.create { (observer) -> Disposable in
// 第三步:發(fā)送信號(hào)(onCompleted和onError只能發(fā)送一個(gè))
observer.onNext("你好騷啊")
observer.onCompleted()
// observer.onError(NSError.init(domain: "loser", code: 10010, userInfo: nil))
return Disposables.create()
//第二步:訂閱信息
//當(dāng)我們訂閱了Observable的消息后浸剩,只要Observable的事件觸發(fā)肝匆,都會(huì)通過onNext這個(gè)閉包告訴我們味悄。
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)") //這里會(huì)監(jiān)聽到訂閱的Observable事件
}, onError: { (error) in
print("error: \(error)") //當(dāng)發(fā)生錯(cuò)誤時(shí)刻获,會(huì)回調(diào)這里
}, onCompleted: { // 當(dāng)序列執(zhí)行完畢時(shí)郎仆,會(huì)回調(diào)這里只祠。
print("完成")
}) {
print("銷毀")
}
下面我們根據(jù)代碼進(jìn)行具體分析:
1.創(chuàng)建序列Observable<Any>.create()
在執(zhí)行let ob = Observable<Any>.create { (obserber) -> Disposable in }
這句代碼時(shí),是創(chuàng)建了一個(gè)可觀察序列扰肌,點(diǎn)進(jìn)create()
方法的源碼
extension ObservableType {
/*
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>
}
根據(jù)注釋路由可知create()
方法是在Create.swift
文件中實(shí)現(xiàn)的
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
在create()
方法中返回了一個(gè)匿名內(nèi)部類---AnonymousObservable
(匿名序列)抛寝,用來存儲(chǔ)產(chǎn)生事件的閉包(self._subscribeHandler = subscribeHandler
)和激活處理事件閉包的入口(run
方法)在跟進(jìn)去這個(gè)類,
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
可以看到在匿名序列的里面是把傳給create()
方法的閉包(也就是序列訂閱的回調(diào),self._subscribeHandler = subscribeHandler
)保存下來了
2.序列訂閱ob.subscribe(onNext:)
同樣的,跟進(jìn)去訂閱信號(hào)的這個(gè)方法的源碼盗舰,
在這里跟源碼晶府,跟進(jìn)去的是ObserveableType.subscribe
這個(gè)方法
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
可以看到在調(diào)用ob.subscribe()
這個(gè)方法的時(shí)候,onNext
, onError
,onComplete
, onDisposed
這四個(gè)閉包都是作為參數(shù)傳遞到這個(gè)訂閱函數(shù)中來钻趋,
在這段代碼里同樣的也創(chuàng)建了一個(gè)匿名觀察者let observer = AnonymousObserver<E>
用于存儲(chǔ)和處理事件的閉包川陆,而對(duì)于這個(gè)觀察者會(huì)傳入一個(gè)帶有event
的閉包,在前面已經(jīng)知道event
是一個(gè)枚舉類型爷绘,有三種事件類型.next书劝,.error,.completed
,所以只要這個(gè)觀察者調(diào)用了這三種事件土至,那么相應(yīng)的就會(huì)調(diào)用onNext购对,onError,onCompleted
這三個(gè)方法的具體實(shí)現(xiàn)陶因,可是這里的observer
觀察者只是一個(gè)局部變量骡苞,它怎么和外界的觀察者聯(lián)系起來呢?
騷就騷在這個(gè)訂閱方法的return
這句代碼:
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
其實(shí)這個(gè)self.asObservable()
就是我們?cè)谧铋_始定義的序列let ob = Observable<Any>
,而subscribe()
就是回調(diào)了我們剛剛創(chuàng)建的observer
閉包楷扬,而observer
就會(huì)調(diào)用傳入的監(jiān)聽序列消息閉包onNext, onError,onCompleted
,
那么這個(gè)subscribe()
是怎么回調(diào)了observer
呢解幽?
可知self.asObservable()
是AnonymousObservable
類型的,但是通過前面的AnonymousObservable
類的源碼可以發(fā)現(xiàn)烘苹,它并沒有subscribe()
這個(gè)方法躲株,但是AnonymousObservable
是繼承于Producer
(生產(chǎn)者)的,下面來看一下Producer
這個(gè)類的源碼:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
在這里面subscribe()
方法會(huì)把傳入的observer
镣衡,調(diào)用self.run(observer, cancel: disposer)
;那么再次跟進(jìn)去看源碼霜定;至于這個(gè)run()
方法具體怎么調(diào)用,應(yīng)該還是交給Producer
的子類AnonymousObservable
去實(shí)現(xiàn)的廊鸥,那么跟進(jìn)去望浩,代碼再次回到了AnonymousObservable
類里面:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
在這里會(huì)創(chuàng)建一個(gè)AnonymousObservableSink
對(duì)象并持有observer
,然后這個(gè)AnonymousObservableSink
類會(huì)調(diào)用它自己的run()
方法,并傳入self
,再次跟進(jìn)去sink.run(self)
,發(fā)現(xiàn)來到了AnonymousObservableSink
類的源碼;
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
可以看到AnonymousObservableSink
的run()
方法惰说,返回parent._subscribeHandler()
,而這個(gè)Parent
,在這個(gè)AnonymousObservableSink
類中已經(jīng)取了別名磨德,那就是typealias Parent = AnonymousObservable<E>
;在前面我們已經(jīng)知道,AnonymousObservable
匿名序列的里面是把傳給create()
函數(shù)的閉包保存了下來吆视,那么return parent._subscribeHandler(AnyObserver(self))
這個(gè)_subscribeHandler
就是之前create()
函數(shù)的閉包,在這個(gè)_subscribeHandler(AnyObserver(self))
方法中把self
轉(zhuǎn)換成AnyObserver
對(duì)象典挑,也就是把AnonymousObservableSink
對(duì)象轉(zhuǎn)換成AnyObserver
對(duì)象.
現(xiàn)在接著來看AnyObserver
的源碼:
public struct AnyObserver<Element> : ObserverType {
/// The type of elements in sequence that observer can observe.
public typealias E = Element
/// Anonymous event handler type.
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
/// Construct an instance whose `on(event)` calls `eventHandler(event)`
///
/// - parameter eventHandler: Event handler that observes sequences events.
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
/// Construct an instance whose `on(event)` calls `observer.on(event)`
///
/// - parameter observer: Observer that receives sequence events.
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
/// Send `event` to this observer.
///
/// - parameter event: Event instance.
public func on(_ event: Event<Element>) {
return self.observer(event)
}
/// Erases type of observer and returns canonical observer.
///
/// - returns: type erased observer.
public func asObserver() -> AnyObserver<E> {
return self
}
}
通過剛剛的AnyObserver(self)
我們應(yīng)該知道這里的self
指的是AnonymousObservableSink
,所以在AnyObserver
的源碼中,可以看到在構(gòu)造函數(shù)中有一行代碼self.observer = observer.on
,就是把AnonymousObservableSink
類的on
函數(shù)賦值給AnyObserver
的observer
變量.注意這里保存的是AnonymousObservableSink.on
.
看到這里是否能明白啦吧,在調(diào)用ob.subscribe
方法的時(shí)候搔弄,創(chuàng)建了一個(gè)AnonymousObserver
對(duì)象,并把閉包傳入丰滑,但是這個(gè)對(duì)象最終走向的卻是繼承于Producer
的AnonymousObservable
類的subscribe()
方法,經(jīng)過調(diào)用self.run()
之后,最后返回AnonymousObservable._subscribeHandler()
方法,也就是ob.create()
方法褒墨。這部分代碼先分析到這里炫刷,下面來分析下發(fā)送信號(hào)的代碼。
3.發(fā)送信號(hào)observer.onNext("你好騷啊")
在剛開始分析create()
方法的時(shí)候郁妈,就已經(jīng)知道浑玛,observer.onNext("你好騷啊")
中的observer
是AnyObserver
類型的,可是在上面的AnyObserver
源碼中我們可以清楚地看到它并沒有onNext()
方法噩咪,那就去它的父類ObserverType
看一下顾彰,
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
在這里可以看到調(diào)用ObserverType
的onNext
方法,返回ObserverType
的on(.next(element))
,所以在observer.onNext("你好騷啊")
調(diào)用onNext()
方法胃碾,其實(shí)調(diào)用的就是observer.on(.next())
,在前面的AnyObserver
的源碼中涨享,已經(jīng)知道AnyObserver
的observer
變量保存的是AnonymousObservableSink.on
,同樣的看一下在AnyObserver
的源碼中的on()
方法
public func on(_ event: Event<Element>) {
return self.observer(event)
}
這里返回的是self.observer(event)
,也就是說observer.on(.next())
可以是self.observer(.next())
仆百,然而self.observer = AnonymousObservableSink.on
,所以self.observer(.next())
可以是AnonymousObservableSink.on(.next())
,這里又回到了AnonymousObservableSink
這個(gè)類了厕隧,(AnonymousObservableSink
的源碼在前面已經(jīng)出現(xiàn)了)這是饒了一圈又回來了,下面把AnonymousObservableSink
的on(event)
方法單獨(dú)拿出來:
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
在on(event)
方法中有句重要的代碼self.forwardOn(event)
,再跟進(jìn)去self.forwardOn(event)
這個(gè)方法俄周,可以看到進(jìn)入到Sink
類的方法中吁讨,這里AnonymousObservableSink
繼承于Sink
:
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
在這個(gè)方法中self._observer.on(event)
代碼中,這個(gè)self._observer
就是在初始化AnonymousObservableSink
時(shí)候傳入的observer
(
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
)峦朗,然而這個(gè)observer
是在subscribe()
訂閱方法中由let observer = AnonymousObserver
創(chuàng)建的observer
,那么等于是這里又來到了AnonymousObserver
中,那么下面就來看一下調(diào)用AnonymousObserver.on(event)
會(huì)發(fā)生什么建丧,現(xiàn)在來看一下AnonymousObserver
的源碼:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
通過這里的源碼可以看到AnonymousObserver
類中并沒有on
方法,但是AnonymousObserver
是繼承于ObserverBase
的波势,在ObserverBase
的源碼中我們可以看到:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped, 1)
}
}
調(diào)用ObserverBase
的on()
方法會(huì)返回self.onCore(event)
;這里回到子類AnonymousObserver
去執(zhí)行這個(gè)方法翎朱;
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
可以看到這里返回的是self._eventHandler(event)
;也就是AnonymousObserver._eventHandler(event)
通過源碼可知這里的AnonymousObserver._eventHandler(event)
就是傳入的eventHandler(event)
,也就是序列訂閱方法ob.subscribe()
傳入的閉包,那么就來看下在subscribe()
訂閱方法中AnonymousObserver
創(chuàng)建的observer
到底對(duì)這個(gè)eventHandler(event)
做了什么事情:
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
因?yàn)?code>Event是枚舉值這里會(huì)根據(jù)observer
傳入的event
來判斷到底該響應(yīng)onNext()
,onError()
,onCompleted()
的具體方法艰亮,這樣就是把create()
方法中的observer.onNext("你好騷啊")
與subscribe()
訂閱方法中onNext()
聯(lián)系起來了闭翩。這樣就說明了為什么觀察者調(diào)用了onNext()
方法,序列能夠訂閱到這個(gè)方法里的內(nèi)容迄埃。
總結(jié)
啰啰嗦嗦寫了這么多疗韵,算是對(duì)RxSwift
的核心邏輯有了個(gè)初步的認(rèn)識(shí),不等不說RxSwift
很強(qiáng)大侄非,足夠的"騷氣"蕉汪。目前寫到這里,對(duì)RxSwift
僅僅只是略知皮毛逞怨,冰山一角罷了者疤,對(duì)RxSwift
的學(xué)習(xí)還需要繼續(xù)努力。下面通過一張圖來總結(jié)一下: