前言
RxSwift 的創(chuàng)建序列的流程調(diào)用比較復(fù)雜,我看了好久才真正的理解留拾,希望我這篇文章能保住對這個流程的源碼還沒有理解的同學(xué)书斜。我還不知道如何用一種直觀的方式將流程清晰的表達(dá)出來,不過關(guān)鍵的代碼本文已經(jīng)都放了出來颓哮,比較關(guān)鍵的方法也都有注釋說明。
創(chuàng)建自定義隊列
Rxswift 創(chuàng)建序列的代碼如下:
///創(chuàng)建序列
let seq = Observable<String>.create {
(observer) -> Disposable in
///發(fā)送元素
observer.onNext("RxSwift Hello World")
return Disposables.create()
}
///訂閱
dispose = seq.subscribe(onNext: {
e in
debugPrint(e)
})
///銷毀
dispose.dispose()
從代碼可以看到大體上可以分為三步走: 創(chuàng)建序列珊豹,訂閱序列簸呈,銷毀。
創(chuàng)建序列
///創(chuàng)建序列
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
RxSwift創(chuàng)建的自定義序列實際上是 AnonymousObservable店茶,來看看AnonymousObservable 的實現(xiàn)
AnonymousObservable
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
///調(diào)用創(chuàng)建時傳遞的 create 閉包
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
AnonymousObservable 的結(jié)構(gòu)比較簡單 蜕便,繼承了Producer<Element>,內(nèi)部持有了初始化創(chuàng)建時傳遞的閉包贩幻。
訂閱序列
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
///這個Disposeable是為調(diào)用 onDisposed回調(diào)的
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
///創(chuàng)建的觀察者
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
///這里創(chuàng)建的是BinaryDisposable
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
如果沒有理解代碼中的注釋沒有關(guān)系轿腺,源碼的調(diào)用過程也比較復(fù)雜,請繼續(xù)向下看丛楚。
訂閱序列的流程看起來比較長族壳,不過總結(jié)起來做了三件事:
- 根據(jù)是否傳遞了onDisposed, 創(chuàng)建了一個Disposeable(銷毀器)
- 創(chuàng)建了一個AnonymousObserver
- 調(diào)用 Disposables.creat 又創(chuàng)建了 Disposeable(銷毀器) 然后返回趣些,并且調(diào)用了另一個 subscribe方法仿荆。
Producer 的 subscribe 方法
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
///會調(diào)用子類的run方法
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
上面比較關(guān)鍵的是run方法,看上面AnonymousObservable的注釋坏平。run方法的作用是調(diào)用 創(chuàng)建 調(diào)用create方法時傳遞的閉包拢操。
run方法
/// AnonymousObservableSink<O: ObserverType>
func run(_ parent: Parent) -> Disposable {
///這里的 parent 就是AnonymousObservable
///AnyObserver是對外界隱藏 觀察者具體類型的包裝
return parent._subscribeHandler(AnyObserver(self))
}
訂閱過程總的來說就創(chuàng)建觀察者,調(diào)用<創(chuàng)建序列時傳遞的閉包>將觀察者傳遞給調(diào)用方舶替,調(diào)用方拿到觀察者就可發(fā)送事件了令境。然后返回銷毀器,給調(diào)用者用來銷毀資源顾瞪。
相關(guān)對象的引用關(guān)系
下圖是整個過程涉及到的類型的引用圖舔庶,箭頭指向持有者:
結(jié)語
如果我的文章對你有幫助,那就請點個贊吧陈醒,讓我知道本篇文章確實起到了作用栖茉。Thanks?(?ω?)?