前言
RxSwift 由于在日常工作中會(huì)經(jīng)常使用阎毅,所以下面進(jìn)行核心源碼分析與探究楣颠,學(xué)習(xí)優(yōu)秀開(kāi)源框架之路,先進(jìn)行序列的源碼分析它褪。
RxSwift 核心流程
- 創(chuàng)建序列
- 訂閱序列
- 發(fā)送信號(hào)
第一步 創(chuàng)建序列
let ob = Observable<String>.create { (observer) -> Disposable in
return Disposables.create()
}
第二步 訂閱信號(hào)
let _ = ob.subscribe(onNext: { (text) in
print("訂閱信息: \(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("訂閱結(jié)束")
}) {
print("已銷(xiāo)毀")
}
第三步 發(fā)送信號(hào)
let ob = Observable<String>.create { (observer) -> Disposable in
// 第三步:發(fā)送信號(hào)
obserber.onNext("你好明天")
return Disposables.create()
}
代碼合并:
//1鬼癣、創(chuàng)建序列
let obs = Observable<Any>.create { (observer) -> Disposable in
//3陶贼、發(fā)送信號(hào)
observer.onNext("我是一條消息")
return Disposables.create()
}
//2、訂閱序列
obs.subscribe(onNext: { (val) in
//4待秃、序列監(jiān)聽(tīng)
print("onNext:\(val)")
}).disposed(by: disposeBag)//5拜秧、打包待銷(xiāo)毀
從代碼合并角度來(lái)看:
- 通過(guò)Observable的create創(chuàng)建序列,在create閉包內(nèi)調(diào)用onNext方法實(shí)現(xiàn)信號(hào)發(fā)送
- 調(diào)用subscribe方法訂閱序列章郁,并實(shí)現(xiàn)subscribe的參數(shù)閉包onNext枉氮,在閉包內(nèi)監(jiān)聽(tīng)信號(hào)
- 最后通過(guò)disposed對(duì)序列打包等待銷(xiāo)毀
大致流程如下圖所示:
RxSwift 核心序列代碼分析
1 創(chuàng)建序列
1.使用Observable的create方法創(chuàng)建可觀察序列
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
可觀察序列 的創(chuàng)建是利用協(xié)議拓展功能的create方法實(shí)現(xiàn)的,里面創(chuàng)建了一個(gè) AnonymousObservable(匿名可觀察序列)
//匿名序列
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
//保存外部訂閱的代碼塊
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- AnonymousObservable繼承了 Producer 具有非常重要的方法 subscribe
- 可以看到AnonymousObservable這里一共兩個(gè)方法:init方法是用來(lái)保存這個(gè)subscribe閉包暖庄,run方法執(zhí)行閉包操作
繼承關(guān)系如下圖所示:
2 訂閱序列
使用ObservableType的subscribe訂閱信號(hào)
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
//此處省略若干行代碼
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
該方法是對(duì)ObservableType的拓展聊替。在方法內(nèi)部已經(jīng)出現(xiàn)對(duì)觀察者的定義,AnonymousObserver類(lèi)型的閉包observer培廓。
observer內(nèi)部調(diào)用的外部(應(yīng)用層)實(shí)現(xiàn)的閉包惹悄,由此看出所有信號(hào)是由此發(fā)出,event是observer的參數(shù)医舆,不難看出俘侠,observer閉包也是在其他地方調(diào)用象缀,傳入帶有信號(hào)值的event參數(shù)
-
observer被當(dāng)做參數(shù)傳入到subscribe中蔬将,而observer的調(diào)用必然是在subscribe中實(shí)現(xiàn)的
self.asObservable().subscribe(observer)
-
self.asObservable()該方法返回本身,保證協(xié)議的一致性央星,方法如下:
public class Observable<Element> : ObservableType { // 省去代碼若干 public func asObservable() -> Observable<E> { return self } }
-
繼續(xù)斷點(diǎn)執(zhí)行找到subscribe方法霞怀,正是上面所提到的Producer中的方法,方法如下:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element { // 省去代碼若干 return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } }
-
observer觀察者被傳入到run中莉给,上面說(shuō)到該觀察者一定會(huì)被調(diào)用毙石,繼續(xù)深入
let sinkAndSubscription = self.run(observer, cancel: disposer)
-
發(fā)現(xiàn)self.run的調(diào)用,調(diào)用的是AnonymousObservable中的run方法颓遏,代碼如下:
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } }
-
此處就是創(chuàng)建序列時(shí)的AnonymousObservable類(lèi)徐矩。在run方法類(lèi)創(chuàng)建了sink對(duì)象,在初始化時(shí)傳入了我們上面所說(shuō)的觀察者叁幢,記住sink保存了觀察者observer閉包滤灯,并且調(diào)用了sink.run(self)方法,傳入的是創(chuàng)建時(shí)產(chǎn)生的可觀察序列observable閉包對(duì)象,深入run:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType { typealias E = O.E typealias Parent = AnonymousObservable<E> // 省去代碼若干 // 此處向父類(lèi)Sink初始化了observer對(duì)象 override init(observer: O, cancel: Cancelable) { super.init(observer: observer, cancel: cancel) } func run(_ parent: Parent) -> Disposable { return parent._subscribeHandler(AnyObserver(self)) } }
-
此處parent由let subscription = sink.run(self)傳入鳞骤,self即為創(chuàng)建序列create方法返回的observable對(duì)象窒百,而_subscribeHandler是創(chuàng)建序列所保存的閉包,此時(shí)我們的閉包就被調(diào)用了豫尽,被調(diào)用閉包如下:
let obs = Observable<Any>.create { (observer) -> Disposable in //3篙梢、發(fā)送消息 observer.onNext("我是一條消息") return Disposables.create() }
3 發(fā)送信號(hào)
在信號(hào)發(fā)送閉包中通常調(diào)用一下三種方法,用來(lái)發(fā)送信號(hào)美旧。如下:
- observer.onNext("我是一條消息") 信號(hào)發(fā)送
- observer.onCompleted() 序列完成渤滞,完成后序列將被釋放
- observer.onError(error) 序列出錯(cuò)中斷,序列不可繼續(xù)使用榴嗅,被釋放
以上三個(gè)方法為ObserverType的拓展方法
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
E表示了一個(gè)泛型信號(hào)量蔼水,可表示任意類(lèi)型的信號(hào)
-
.next(element)是一個(gè)帶泛型參數(shù)的枚舉,管理了三種類(lèi)型事件的消息傳遞录肯。如下:
public enum Event<Element> { /// Next element is produced. case next(Element) /// Sequence terminated with an error. case error(Swift.Error) /// Sequence completed successfully. case completed }
on這是AnonymousObservableSink中的方法趴腋,代碼如下:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 代碼省略若干行
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(&self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
內(nèi)部根據(jù)Event枚舉不同成員變量做不同的信號(hào)發(fā)送,信號(hào)發(fā)送調(diào)用了forwardOn方法论咏。方法實(shí)現(xiàn)如下:
class Sink<O : ObserverType> : Disposable {
init(observer: O, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(&self._disposed, 1) {
return
}
self._observer.on(event)
}
}
_observer即是訂閱中在內(nèi)部產(chǎn)生的AnonymousObserver對(duì)象优炬,而該對(duì)象調(diào)用了on方法并傳遞了信號(hào)。on方法所在位置如下:
AnonymousObserver -> ObserverBase -> on()
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
func on(_ event: Event<E>) {
switch event {
case .next:
if load(&self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
在方法內(nèi)部又掉用了self.onCore(event)厅贪,此時(shí)該方法在AnonymousObserver中實(shí)現(xiàn)蠢护,代碼如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}