RxSwift源碼分析(一)-核心邏輯解析

前言:這幾篇關于RxSwift源碼分析的文章主要是對源碼進行解析业踏,不涉及到RxSwift的具體使用鞍陨。具體使用可以查看RxSwift中文文檔或者github

在RxSwift的使用中,我們需要先創(chuàng)建序列乳乌,然后訂閱信號、發(fā)送信號來實現(xiàn)一個完整的流程市咆。先舉一個簡單的例子:

        // 1:創(chuàng)建序列
        let observable = Observable<Any>.create { (obserber) -> Disposable in
            // 3:發(fā)送信號
            obserber.onNext("發(fā)送信號")
            obserber.onCompleted()
//            obserber.onError(NSError.init(domain: "fail", code: 10087, userInfo: nil))
            return Disposables.create()
        }
        
        // 2:訂閱信號
        let _ = observable.subscribe(onNext: { (text) in
            print("訂閱到信號:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷毀")
        }

我們需要重點關注的是:
1汉操、第一行代碼中的create的閉包什么時候執(zhí)行
2、訂閱信號subscribe的閉包什么時候執(zhí)行
3蒙兰、信號從發(fā)送出去到接收流程是怎樣的
下面針對這幾個問題來一一探索

第一步create函數(shù)

首先來看看create函數(shù)磷瘤,它返回一個AnonymousObservable對象,AnonymousObservable是匿名可觀察者,用來存儲產生事件的閉包(self._subscribeHandler = subscribeHandler)和激活處理事件閉包的入口(run方法)搜变。

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    return AnonymousObservable(subscribe)
}
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

AnonymousObservableObservable的子類采缚,它們的繼承關系是:AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

第二步subscribe函數(shù)

源碼:

    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

從上面的代碼可以看出,首先創(chuàng)建了一個AnonymousObserver對象挠他,在初始化時傳遞了一個閉包作為參數(shù)并且保存下來self._eventHandler = eventHandler扳抽。
AnonymousObserver是匿名觀察者,用于存儲和處理事件的閉包。

然后在最后有self.asObservable().subscribe(observer)這樣一行代碼殖侵,asObservable返回的是對象本身贸呢,然后調用subscribe這個函數(shù)并且把創(chuàng)建的AnonymousObserver對象傳遞過去,會來到AnonymousObservable這個類里面拢军,但是發(fā)現(xiàn)這個類里面沒有subscribe方法楞陷,我們往父類Producer里面找到這個方法,源碼如下:

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

在這里調用了run方法茉唉,也就是AnonymousObservable這個類里面的run方法固蛾,把observer作為參數(shù)傳過來结执。

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

run方法中,先創(chuàng)建一個AnonymousObservableSink對象并持有observer艾凯,然后調用這個對象的run方法把self傳遞過去昌犹,也就是把observable作為參數(shù)。
AnonymousObservableSink這個類將可觀察者Observable和觀察者Observer鏈接起來,實現(xiàn)事件的傳遞览芳,起到一個橋梁的作用斜姥。

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }

在這里觸發(fā)了_subscribeHandler的調用,這里的_subscribeHandler就是之前create函數(shù)的閉包沧竟。到這里就解決了第一個問題“第一行代碼中的create的閉包什么時候執(zhí)行”铸敏。
在這個方法中把self轉換成AnyObserver對象,也就是把AnonymousObservableSink對象轉換成AnyObserver對象悟泵。那么杈笔,具體是怎么轉換的呢?我們來看看源碼

public struct AnyObserver<Element> : ObserverType {
    /// The type of elements in sequence that observer can observe.
    public typealias E = Element
    
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}

可以看到在構造函數(shù)中有一行代碼self.observer = observer.on糕非,就是把AnonymousObservableSink類的on函數(shù)賦值給AnyObserver類的observer變量蒙具。從這里就可以明白為什么這行代碼observer.onNext("發(fā)送信號") 最終會觸發(fā)AnonymousObservableSink.on事件。

// AnonymousObservableSink類on方法
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

然后進入到Sink類的forwardOn方法

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

這里的_observer就是第二步調用subscribe函數(shù)里面創(chuàng)建的observer對象朽肥。
會先進入到父類的ObserverBaseon方法

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

這里的_eventHandler閉包就是第二步調用subscribe函數(shù)傳入的閉包禁筏。
到這里就可以理解第二個和第三個問題了。

總結

從整個流程來看衡招,我們可以體會到函數(shù)響應式編程的魅力篱昔,代碼簡介,思路非常的清晰始腾。

梳理一下整個流程:
RxSwift主要核心流程.png

tips:由于在RxSwift中廣泛的使用了協(xié)議和泛型州刽,還有很多抽象方法和同名方法,導致在剛開始閱讀源碼的時候可能有些不是很習慣浪箭,再加上Xcode跟蹤代碼不是很強大穗椅,閱讀起來有一些障礙。所以在閱讀源碼的時候需要耐心一點奶栖,不要找錯了地方匹表,可以找一些重點代碼閱讀,不要盲目的從頭看到尾驼抹。

有問題或者建議和意見桑孩,歡迎大家評論或者私信。
喜歡的朋友可以點下關注和喜歡框冀,后續(xù)會持續(xù)更新文章流椒。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市明也,隨后出現(xiàn)的幾起案子宣虾,更是在濱河造成了極大的恐慌惯裕,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绣硝,死亡現(xiàn)場離奇詭異蜻势,居然都是意外死亡,警方通過查閱死者的電腦和手機鹉胖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進店門握玛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人甫菠,你說我怎么就攤上這事挠铲。” “怎么了寂诱?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵拂苹,是天一觀的道長。 經常有香客問我痰洒,道長瓢棒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任丘喻,我火速辦了婚禮脯宿,結果婚禮上,老公的妹妹穿的比我還像新娘仓犬。我一直安慰自己嗅绰,他們只是感情好,可當我...
    茶點故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布搀继。 她就那樣靜靜地躺著,像睡著了一般翠语。 火紅的嫁衣襯著肌膚如雪叽躯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天肌括,我揣著相機與錄音点骑,去河邊找鬼。 笑死谍夭,一個胖子當著我的面吹牛黑滴,可吹牛的內容都是我干的。 我是一名探鬼主播紧索,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼袁辈,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了珠漂?” 一聲冷哼從身側響起晚缩,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤尾膊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后荞彼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冈敛,經...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年鸣皂,在試婚紗的時候發(fā)現(xiàn)自己被綠了抓谴。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡寞缝,死狀恐怖癌压,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情第租,我是刑警寧澤措拇,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站慎宾,受9級特大地震影響丐吓,放射性物質發(fā)生泄漏。R本人自食惡果不足惜趟据,卻給世界環(huán)境...
    茶點故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一券犁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧汹碱,春花似錦粘衬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至跪腹,卻和暖如春褂删,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背冲茸。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工屯阀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人轴术。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓难衰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親逗栽。 傳聞我的和親對象是個殘疾皇子盖袭,可洞房花燭夜當晚...
    茶點故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內容