01. RxSwift源碼解讀:基本訂閱流程

今天帶大家解讀下訂閱發(fā)布流程的內(nèi)部源碼庭惜。

本系列文章RxSwift使用的版本是:6.2.0

let observable = Observable<Int>.create { (anyObserver) -> Disposable in
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create ()
        }
        let dispose = observable.subscribe(onNext: { ele in
            print(ele)
        }, onDisposed:  {
            print("dispose")
        })
        dispose.dispose()

上面代碼創(chuàng)建一個(gè)被觀察者这揣,并訂閱它,打印序列元素怀吻,最后dispose瞬浓,最后打印:

1
dispose

RxSwift的基本訂閱流程涉及到許多類和協(xié)議蓬坡,需要先弄清楚各個(gè)類和協(xié)議的作用以及它們之間的關(guān)系猿棉,否則直接看代碼容易繞暈。
先上個(gè)類圖把所有相關(guān)類和協(xié)議關(guān)系理清楚屑咳。


類圖.jpg

其中藍(lán)色的是類萨赁,橙色的協(xié)議,黃色的是枚舉兆龙, 綠色的是結(jié)構(gòu)體杖爽。我們一個(gè)一個(gè)說:

  • 協(xié)議:
    ObservableConvertibleType 這是個(gè)被觀察者的協(xié)議,唯一的一個(gè)協(xié)議方法是asObservable(), 表示可以轉(zhuǎn)換成被觀察者紫皇。
    ObservableType: 繼承自O(shè)bservableConvertibleType慰安,可以創(chuàng)建被觀察者,任何被觀察者類需要遵循此協(xié)議坝橡。在extension中定義了一個(gè)create函數(shù)用來創(chuàng)建被觀察者泻帮,兩個(gè)subscribe函數(shù)用來訂閱觀察者。
    Disposable 訂閱取消的接口计寇,只有一個(gè)disposes協(xié)議方法锣杂,用來釋放相關(guān)資源脂倦。
    Cancelable 繼承自Disposable,有一個(gè)isDisposed協(xié)議方法元莫,表示是否已釋放資源赖阻。
    ObserverType 表示序列發(fā)布者,可以發(fā)送序列踱蠢。其中有幾個(gè)我們比較熟悉的方法:onNext onError onCompleted 這三個(gè)會(huì)調(diào)用on方法火欧,on只是個(gè)協(xié)議方法,看下代碼就明白了:
/// Supports push-style iteration over an observable sequence.
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype Element

    @available(*, deprecated, message: "Use `Element` instead.")
    typealias E = Element

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<Element>)
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

onNext onError onCompleted都調(diào)用了on方法茎截,它還包含一個(gè)關(guān)聯(lián)類型苇侵,Element可以認(rèn)為是個(gè)范型,表示元素的類型企锌。


  • Observable Producer AnonymousObservable 這三個(gè)都是被觀察者榆浓,依次繼承的關(guān)系,Observable遵循ObservableType撕攒, Producer 繼承Observable陡鹃,AnonymousObservable繼承了Producer。Producer抖坪,實(shí)現(xiàn)subscribe萍鲸,run方法, AnonymousObservable實(shí)現(xiàn)了run方法擦俐。

SinkAnonymousObservableSink 這兩個(gè)算是是整個(gè)流程的核心類脊阴,對(duì)消息訂閱發(fā)送進(jìn)行管理。其中Sink 遵循Dispsable捌肴,包含兩個(gè)屬性O(shè)bserverType和Cancelable蹬叭,這是整個(gè)類圖唯一的兩個(gè)組合關(guān)系,其他類的屬性都是閉包状知,AnonymousObservableSink繼承了Sink,同時(shí)遵循了ObserverType孽查,也就是說AnonymousObservableSink 包括一個(gè)ObserverType類型的屬性(由父類繼承而來)同時(shí)又遵循了ObserverType協(xié)議饥悴,這讓我想起的設(shè)計(jì)模式中的裝飾模式,其實(shí)整個(gè)Rx框架還有很多類型的Sink盲再。

ObserverBase AnonymousObserver, 遵循了Dispose和 ObserverType西设,這兩個(gè)是觀察者,繼承關(guān)系答朋。

  • 結(jié)構(gòu)體 唯一的一個(gè)結(jié)構(gòu)體AnyObserver贷揽,遵循了ObserverType 這個(gè)好像也是觀察者,實(shí)際上是序列的發(fā)送者梦碗,用戶通過它來調(diào)用onNext 等方法發(fā)送序列禽绪,被觀察者如Observable通過AnyObserver發(fā)送序列蓖救,而AnonymousObserver對(duì)象負(fù)責(zé)接收序列。

  • 枚舉 Event 表示序列事件印屁,包含next(Element) error(Swift.Error) onCompleted三個(gè)case循捺。
    終于介紹完了所有的類和協(xié)議。
    接下來需要說到三個(gè)重要的閉包:

  1. 創(chuàng)建被觀察者的閉包即我們一開始的代碼中的
{ (anyObserver) -> Disposable in
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create ()
        }

它的類型是(AnyObserver)-> Disposeable 在訂閱時(shí)會(huì)執(zhí)行這個(gè)閉包雄人,并且用anyObserver發(fā)送序列从橘。

  1. 訂閱時(shí)的onNext的閉包:
  (onNext: { ele in
            print(ele)
        }, onDisposed:  {
            print("dispose")
        })

這樣發(fā)送next序列時(shí),會(huì)調(diào)用這個(gè)閉包础钠。

  1. 第三個(gè)閉包在subscribe方法的內(nèi)部恰力,我們進(jìn)去看一下:
let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }

這里創(chuàng)建了一個(gè)AnonymousObserver, 也就是觀察者旗吁,創(chuàng)建時(shí)將閉包作為初始化參數(shù)踩萎,閉包中調(diào)用了第二個(gè)閉包onNext。所以第一個(gè)閉包和第三個(gè)閉包是如果關(guān)聯(lián)的呢阵漏?也就是訂閱的時(shí)候如何調(diào)用第一個(gè)閉包驻民,第一個(gè)閉包再調(diào)用到第三個(gè)閉包?這是整個(gè)流程的關(guān)鍵履怯?
我們?cè)偕蟼€(gè)流程圖看看整個(gè)訂閱發(fā)布的流程回还。


訂閱流程圖.jpg

根據(jù)這個(gè)圖再結(jié)合源碼一步一步分析下流程。

  1. 首先通過Observable的 create方法創(chuàng)建序列叹洲,ObservableType extension 提供了實(shí)現(xiàn)柠硕,而Observable遵循了ObservableType協(xié)議,可以看到代碼創(chuàng)建了AnonymousObservable(subscribe), 并把閉包傳進(jìn)入运提,而AnonymousObservable類內(nèi)部持有了這個(gè)閉包蝗柔。
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
}
  1. 用返回的AnonymousObservable對(duì)象,調(diào)用subscribe方法民泵,這個(gè)方法在ObservableType 擴(kuò)展中癣丧,看一看subscribe主要代碼:
        let observer = AnonymousObserver<Element> { event in
     
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )

先包裝一個(gè)AnonymousObserver,即觀察者栈妆,把閉包作為初始化方法的參數(shù)傳進(jìn)去胁编,并賦值給它的_eventHandler屬性,跟AnonymousObservable比較類似鳞尔,不要與AnonymousObservable弄混了嬉橙。一個(gè)是觀察者,一個(gè)是被觀察者寥假。
閉包的代碼中根據(jù)事件類型調(diào)用 onNext 或 onError 或 onCompleted閉包市框,這個(gè)是最后一步的調(diào)用, 順便提一下處理.error 和 .completed 事件時(shí)會(huì)調(diào)用disposable.dispose()糕韧,說明這兩個(gè)事件發(fā)生后會(huì)取消訂閱枫振,回收資源喻圃,之后無法再發(fā)送序列了。

  1. 接著看return Disposables.create( self.asObservable().subscribe(observer), disposable )
    這里會(huì)創(chuàng)建Disposables蒋得,它傳入兩個(gè)disposeable级及,這里先不講dispose,看第一個(gè)參數(shù), 通過asObservable 調(diào)用subscribe额衙,并把剛剛創(chuàng)建的observer傳入饮焦,asObservable我們可以通過上面的類圖看到它是ObservableConvertibleType的協(xié)議,Observable 必然實(shí)現(xiàn)了這個(gè)協(xié)議窍侧,實(shí)際上是返回自身县踢。自身的類型其實(shí)是AnonymousObservable,所以就是通過self調(diào)用subscribe伟件,subscribe在父類Producer實(shí)現(xiàn)了硼啤。
  2. 接著看Producer 的subscribe代碼:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

CurrentThreadScheduler是線程派發(fā),以后再說線程派發(fā)斧账,調(diào)用schedule谴返,在schedule會(huì)執(zhí)行傳入的閉包,
image.png

所以最后會(huì)走到閉包中咧织,關(guān)注下閉包的代碼:調(diào)用了 self.run, 把參數(shù)observer 傳進(jìn)去嗓袱, observer是外面創(chuàng)建的AnonymousObserver對(duì)象,我們整個(gè)流程只會(huì)有一個(gè)AnonymousObserver和一個(gè)AnonymousObservable其他地方看到的observer都是傳進(jìn)去的习绢,所以看到observer簡(jiǎn)單想到是最開始創(chuàng)建的AnonymousObserver就行了渠抹。

  1. 現(xiàn)在到run方法了,AnonymousObservable實(shí)現(xiàn)了run方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
  1. 這里比較關(guān)鍵:創(chuàng)建了AnonymousObservableSink對(duì)象闪萄,Sink翻譯做“業(yè)務(wù)下沉”梧却,表示這個(gè)類是管理者或者專門處理業(yè)務(wù)的。將observer和cancel傳進(jìn)去败去。observer還是通過調(diào)用鏈傳來的放航。接著調(diào)用AnonymousObservableSink的run方法:sink.run(self)
  2. 看一下AnonymousObservableSink的run方法:
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }

Parant 是 sink.run(self) 的self, self實(shí)際上是AnonymousObservable對(duì)象圆裕,就是唯一的被觀察者三椿,是用戶通過create創(chuàng)建的。

  1. 所以接下來調(diào)用了AnonymousObservable對(duì)象的_subscribeHandler葫辐,同時(shí)把AnyObserver(self)最為參數(shù)傳入,_subscribeHandler是個(gè)閉包伴郁,我們可以回到第1步看看耿战。_subscribeHandler是最開始 create 被觀察者傳入的閉包,所以到這一步才開始執(zhí)行第1步創(chuàng)建的閉包焊傅,這里還有個(gè)關(guān)鍵點(diǎn):AnyObserver(self), 創(chuàng)建了一個(gè)AnyObserver并把self作為參數(shù)傳入剂陡,self是 AnonymousObservableSink對(duì)象啊狈涮,記住了后面會(huì)比較繞。 打開AnyObserver看看這個(gè)初始化方法:
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

是這個(gè)沒問題鸭栖,通過類圖可以看到AnonymousObservableSink遵循了ObserverType歌馍,把on方法賦值給observer。

  1. self.observer = observer.on, 把AnonymousObservableSink的on 方法復(fù)制給observer 這里的observer可不是AnonymousObserver對(duì)象晕鹊,它是個(gè)閉包松却,類型是(Event<Element>) -> Void。誒溅话,我是誰晓锻!我在哪里!????飞几,我們回到執(zhí)行_subscribeHandler的地方砚哆。
  2. 第8步說到執(zhí)行_subscribeHandler,也就是我們最開始create Observable的閉包:
      let observable = Observable<Int>.create { (anyObserver) -> Disposable in
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create()
       }
  1. 當(dāng)代碼執(zhí)行anyObserver.onNext(1)屑墨, anyObserver是剛剛第8步創(chuàng)建的哦躁锁, AnyObserver沒有找到方法,它其實(shí)在ObserverType extension中(AnyObserver 遵循了ObserverType):
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: Element))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }

執(zhí)行了on方法

  1. AnyObserver有實(shí)現(xiàn)了 on 方法協(xié)議:
public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
  1. 調(diào)用self.observer(event), 這個(gè)observer 在第9步(self.observer = observer.on)由AnonymousObservableSink.on 賦值的閉包卵史。所以解下來調(diào)用AnonymousObservableSink.on方法战转。

  2. 我們看看on方法實(shí)現(xiàn):

func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

我們看到AnonymousObservableSink 類的 run 方法和on 方法在整個(gè)流程中至關(guān)重要,run方法調(diào)用了訂閱的handler程腹,而on 方法處理了事件匣吊。這說明AnonymousObservableSink類處理主要業(yè)務(wù)邏輯,是整個(gè)流程的核心寸潦。

  1. 最終會(huì)調(diào)用self.forwardOn(event), 這個(gè)forwardOn是在父類Sink定義的色鸳,然后跳到forwardOn看看:
    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
  1. 然后調(diào)用 self._observer.on(event), 這個(gè)_observer 是在第6步創(chuàng)建AnonymousObservableSink對(duì)象時(shí)作為初始化參數(shù)賦值的。這個(gè)observer就是唯一的AnonymousObserver對(duì)象见转,還知道它是什么時(shí)候創(chuàng)建的嗎命雀?所以我們?nèi)nonymousObserver找下on方法,沒找到斩箫,去父類ObserverBase找到了:
    case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
  1. 接著調(diào)用onCore吏砂,在AnonymousObserver里:
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

終于繞出來了,調(diào)用self._eventHandler(event), _eventHandler 現(xiàn)在還記得是啥嗎乘客?啥時(shí)候被賦值的狐血?_eventHandler是在第2步創(chuàng)建AnonymousObserver時(shí)被賦值的。所以接著調(diào)用第2步的代碼:case .next(let value): onNext?(value) 調(diào)用onNext就是我們訂閱時(shí)傳入的閉包易核。最后打印1匈织, 希望大家能看明白,下一篇文章會(huì)解析dispose流程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缀匕,一起剝皮案震驚了整個(gè)濱河市纳决,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌乡小,老刑警劉巖阔加,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異满钟,居然都是意外死亡胜榔,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門零远,熙熙樓的掌柜王于貴愁眉苦臉地迎上來苗分,“玉大人,你說我怎么就攤上這事牵辣∷ぱⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵纬向,是天一觀的道長(zhǎng)择浊。 經(jīng)常有香客問我,道長(zhǎng)逾条,這世上最難降的妖魔是什么琢岩? 我笑而不...
    開封第一講書人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮师脂,結(jié)果婚禮上担孔,老公的妹妹穿的比我還像新娘。我一直安慰自己吃警,他們只是感情好糕篇,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著酌心,像睡著了一般拌消。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上安券,一...
    開封第一講書人閱讀 52,255評(píng)論 1 308
  • 那天墩崩,我揣著相機(jī)與錄音,去河邊找鬼侯勉。 笑死鹦筹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的址貌。 我是一名探鬼主播盛龄,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了余舶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤锹淌,失蹤者是張志新(化名)和其女友劉穎匿值,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赂摆,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡挟憔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了烟号。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绊谭。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖汪拥,靈堂內(nèi)的尸體忽然破棺而出达传,到底是詐尸還是另有隱情,我是刑警寧澤迫筑,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布宪赶,位于F島的核電站,受9級(jí)特大地震影響脯燃,放射性物質(zhì)發(fā)生泄漏搂妻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一辕棚、第九天 我趴在偏房一處隱蔽的房頂上張望欲主。 院中可真熱鬧,春花似錦逝嚎、人聲如沸扁瓢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涤妒。三九已至,卻和暖如春赚哗,著一層夾襖步出監(jiān)牢的瞬間她紫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工屿储, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贿讹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓够掠,卻偏偏與公主長(zhǎng)得像民褂,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容