RxSwift 核心原理解析

RxSwift 核心原理解析

角色定位

  • 觀察者(Observer)
  • 被觀察者(Observable)
  • 訂閱者(Subscriber) 事件的最終處理者
  • 管道(Sink) Observer 和 Observable 溝通的橋梁

Simple Example

Observable<String>.create { observer -> Disposable in
  observer.onNext("hello")
  return Disposables.create()
}
.subscribe { event in
  print(event.element)
}

我們重點(diǎn)關(guān)注:

  • create 閉包什么時(shí)候執(zhí)行型宝,
  • subscribe 閉包又是什么時(shí)候執(zhí)行的

Observable 繼承體系

Observable.png

Observable的核心函數(shù):

  1. subscribe 訂閱操作, ObservableObserver 通過(guò)訂閱建立聯(lián)系,
    Observable 好比水源, Observer 好比水龍頭(永遠(yuǎn)開(kāi)著的水龍頭)絮爷, 訂閱的過(guò)程就是在Observable 和 Observer 之間建立管道趴酣, 一旦建立管道即是永久性的,只要水源有誰(shuí)坑夯, 水龍頭就會(huì)有水岖寞。
  2. run 對(duì)用戶(hù)不可見(jiàn),隱藏了大量的實(shí)現(xiàn)細(xì)節(jié)柜蜈, 這個(gè)函數(shù)就是建立水管的過(guò)程
  3. asObservable: 這個(gè)協(xié)議的存在使得Observable 的定義變得更加廣泛慎璧。
    示例:
    /// GroupedObservable Class

    /// Converts `self` to `Observable` sequence. 
    public func asObservable() -> Observable<Element> {
        return source
    }

ObservableType 協(xié)議擴(kuò)展

extension ObservableType {
    /**
     Subscribes an event handler to an observable sequence.
     
     - parameter on: Action to invoke for each event in the observable sequence.
     - returns: Subscription object used to unsubscribe from the observable sequence.
     */
    public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }
}

  • 這里的GroupedObservable, 本身無(wú)法被直接訂閱,但是它的屬性source可以被訂閱跨释, 于是只需要在asObservable 函數(shù)中返回source, 即可被訂閱胸私, 這樣的設(shè)計(jì)可以更自由使用組合方式,產(chǎn)生新的類(lèi)型的Observable
  • ObservableType: 擴(kuò)展 subscribe 方法鳖谈, 確保所有的Observable行為一致岁疼, 都是經(jīng)由self.asObservable() 獲取Observable

Observer 繼承體系

ObserverType.png

Observer 核心函數(shù)

  1. on: 事件發(fā)生器
  2. onCore: 對(duì)用戶(hù)不可見(jiàn),隱藏具體實(shí)現(xiàn)細(xì)節(jié)
Sink.png

Sink 核心函數(shù)

  1. forwardOn
  2. on
  3. run

代碼分析

先看看Observable.create 函數(shù)

/// static Observable create method
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

該函數(shù)返回一個(gè)AnonymousObservable實(shí)例缆娃,這是因?yàn)?code>Observable是一個(gè)抽象類(lèi)捷绒,需要具體的實(shí)類(lèi)實(shí)現(xiàn)功能,而AnonymousObservable是最簡(jiǎn)單的Observable類(lèi)

// AnonymousObservable  Class
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        _subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

首先說(shuō)一下繼承體系: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType
每一層都只處理一點(diǎn)點(diǎn)事情贯要,剩下的交給下一層處理

ObservableConvertibleType: 完全的抽象

ObservableType: 處理subscribe

Observable: 處理 asObservable

Producer: 重載subscribe

AnonymousObservable: 處理run
也就是說(shuō)如果說(shuō)如果我們要自定義一個(gè)Observable的話(huà)暖侨,通常只需要繼承Producer, 并實(shí)現(xiàn)run方法崇渗。AnonymousObservable做的事情也不多字逗,實(shí)現(xiàn)run方法,作為create閉包的持有者宅广。

run方法涉及另外一個(gè)類(lèi)AnonymousObservableSink,Sink作為Observer 和 Observable的銜接的橋梁葫掉,之前還在想為什么叫做ObservableSink,現(xiàn)在想明白了跟狱。Sink本身遵守ObseverType協(xié)議俭厚,與此同時(shí)實(shí)現(xiàn)了run方法,雖然沒(méi)有實(shí)現(xiàn)subscribe方法驶臊,但是已經(jīng)足夠了挪挤,這樣sink從某種程度來(lái)說(shuō)也是Observable叼丑,通過(guò)sink就可以完成從Observable到Obsever的轉(zhuǎn)變。

subscribe:

   public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }

這里不太明白為什么一定要寫(xiě)成這樣:

 let observer = AnonymousObserver { e in
                on(e)
            }

感覺(jué)直接寫(xiě)成let observer = AnonymousObserver(on) 會(huì)更加清晰扛门。AnonymousObserver 比較簡(jiǎn)單持有subscribe閉包鸠信,并且實(shí)現(xiàn)onCore method.

剛才的只是個(gè)入口,這個(gè)才是核心

// Producer Class
  override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }


這個(gè)if 和 else 不用管這個(gè)是線(xiàn)程安全相關(guān)的尖飞,不是我關(guān)注的重點(diǎn)症副。這里返回的是 Disposable , Disposable 跟對(duì)象的生命周期密切相關(guān)店雅。暫且在這打個(gè)tag吧政基。

  1. branch_1: 線(xiàn)程安全
  2. branch_2: 生命周期管理
  3. branch_3: 核心邏輯實(shí)現(xiàn)細(xì)節(jié)

好了我們現(xiàn)在關(guān)注的是核心邏輯實(shí)現(xiàn)細(xì)節(jié),其他有時(shí)間再補(bǔ)上闹啦。核心語(yǔ)句
let sinkAndSubscription = self.run(observer, cancel: disposer)
我們來(lái)看看run的邏輯

// AnonymousObservable Class
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

之前提到過(guò)AnonymousObservableSink,注意Sink是持有Observer的沮明,從這也可以看出來(lái)Observerable的 run方法觸發(fā)Sink的run方法,接下來(lái)就要關(guān)注AnonymousObservableSink方法窍奋。


    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }


哇荐健,終于觸發(fā)subscribeHandler了,這里的subscribeHandler就是之前最開(kāi)始的閉包琳袄。

 Observable<String>.create { observer -> Disposable in
            observer.onNext("hello")
            return Disposables.create()
        }

也就是create閉包江场,真的是千辛萬(wàn)苦才跟蹤到這了,那么現(xiàn)在解決了第一個(gè)問(wèn)題: create閉包是什么執(zhí)行的
現(xiàn)在就要關(guān)注這個(gè)閉包執(zhí)行后會(huì)帶來(lái)什么后果窖逗?
目光轉(zhuǎn)到實(shí)體類(lèi)AnyObserver,


public struct AnyObserver<Element> : ObserverType {
    /// The type of elements in sequence that observer can observe.
    public typealias E = Element
    
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}


構(gòu)造函數(shù)的入?yún)⑹荅ventHandle,結(jié)合上下文 AnyObserver.observer = AnonymousObservableSink.on,那么這下就明白了 observer.onNext("hello") 最終會(huì)觸發(fā)AnonymousObservableSink.on事件

// AnonymousObservableSink.on

    func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if _isStopped == 1 {
                return
            }
            forwardOn(event)
        case .error, .completed:
            if AtomicCompareAndSwap(0, 1, &_isStopped) {
                forwardOn(event)
                dispose()
            }
        }
    }

老規(guī)矩忽略線(xiàn)程安全問(wèn)題址否,關(guān)注核心問(wèn)題

// Sink.forwardOn
    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        if _disposed {
            return
        }
        _observer.on(event)
    }


還記得_observer 的實(shí)體是啥,沒(méi)關(guān)系幫你回憶一下:

   public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            return self.asObservable().subscribe(observer)
    }

我們最初傳進(jìn)來(lái)的subscribe閉包的實(shí)體類(lèi)碎紊,理解了這個(gè)接下來(lái)就簡(jiǎn)單了

// ObserverBase.on
    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if _isStopped == 0 {
                onCore(event)
            }
        case .error, .completed:
            if AtomicCompareAndSwap(0, 1, &_isStopped) {
                onCore(event)
            }
        }
    }

// AnonymousObserver.onCore
    override func onCore(_ event: Event<Element>) {
        return _eventHandler(event)
    }

這里的_eventHandler 就是最初傳進(jìn)來(lái)的訂閱閉包即:

            .subscribe { event in
               print(event.element)
        }

好了到此為止了佑附,總結(jié)一下:
來(lái)個(gè)堆棧看的清晰些:


Rx subscribe源碼分析.png

再來(lái)一個(gè)類(lèi)圖


Compose Diagram.png

稍微解釋一下整個(gè)過(guò)程分為兩個(gè)階段:

  1. Obsevable 構(gòu)建階段仗考,這里使用create構(gòu)造方法構(gòu)造Obsevable音同,還有其他各種各樣的構(gòu)造方法,這里不一一贅述秃嗜。
  2. subscribe 階段权均, Obsevable.subscribe ---> Obsevable.run ----> Sink.run ----> AnyObserver.On ----> Sink.on ----> Observer.On ---> Observer.OnCore ----> eventHandler

這個(gè)過(guò)程也可以分為多個(gè)階段:

Obsevable.subscribe ---> Obsevable.run ----> Sink.run 這個(gè)過(guò)程我理解為build階段,通過(guò)Sink建立Obsevable和 Observer的聯(lián)系锅锨,在這個(gè)階段Sink扮演還是Obsevable角色螺句。

AnyObserver.On ----> Sink.on ----> Observer.On ---> Observer.OnCore 這個(gè)階段的話(huà)Sink扮演的就是 Observer 角色

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市橡类,隨后出現(xiàn)的幾起案子蛇尚,更是在濱河造成了極大的恐慌,老刑警劉巖顾画,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件取劫,死亡現(xiàn)場(chǎng)離奇詭異匆笤,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)谱邪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)炮捧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人惦银,你說(shuō)我怎么就攤上這事咆课。” “怎么了扯俱?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵书蚪,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我迅栅,道長(zhǎng)殊校,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任读存,我火速辦了婚禮为流,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘让簿。我一直安慰自己敬察,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布尔当。 她就那樣靜靜地躺著莲祸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪居凶。 梳的紋絲不亂的頭發(fā)上虫给,一...
    開(kāi)封第一講書(shū)人閱讀 49,031評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音侠碧,去河邊找鬼抹估。 笑死,一個(gè)胖子當(dāng)著我的面吹牛弄兜,可吹牛的內(nèi)容都是我干的药蜻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼替饿,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼语泽!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起视卢,我...
    開(kāi)封第一講書(shū)人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤踱卵,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體惋砂,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡妒挎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了西饵。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片酝掩。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖眷柔,靈堂內(nèi)的尸體忽然破棺而出期虾,到底是詐尸還是另有隱情,我是刑警寧澤驯嘱,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布镶苞,位于F島的核電站,受9級(jí)特大地震影響宙拉,放射性物質(zhì)發(fā)生泄漏宾尚。R本人自食惡果不足惜丙笋,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一谢澈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧御板,春花似錦锥忿、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至笙各,卻和暖如春钉答,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背杈抢。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工数尿, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惶楼。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓右蹦,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親歼捐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子何陆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 905評(píng)論 0 2
  • 我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了豹储。今年加入了 Flipboard 后贷盲,看到 Flipboard 的...
    Jason_andy閱讀 5,456評(píng)論 7 62
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)剥扣,斷路器巩剖,智...
    卡卡羅2017閱讀 134,599評(píng)論 18 139
  • 《金谷園圖》軸,清吃引,華喦繪筹陵,紙本設(shè)色,縱178.7厘米镊尺,橫94.4厘米朦佩,上海博物館藏。 華嵒《金谷園圖》取材于西晉...
    陽(yáng)陽(yáng)說(shuō)畫(huà)閱讀 398評(píng)論 0 3
  • 愁庐氮,那女孩兒怎么越來(lái)越帥了o(>﹏<)o
    呆毛默默閱讀 89評(píng)論 0 0