RxSwift 源碼之 CurrentThreadScheduler

先來看看核心代碼subscribe


    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }


其實(shí)看代碼是有技巧的暮蹂,我一看到這個(gè)代碼我就知道這個(gè)if..else是跟線程安全有關(guān)的,如果你重點(diǎn)關(guān)注線程安全的話牧抽,那么你就要仔細(xì)看線程安全的邏輯鞠评,如果你關(guān)注的是實(shí)現(xiàn)原理那么你看的核心實(shí)現(xiàn)邏輯,看代碼的時(shí)候要選擇性的忽略掉一些細(xì)節(jié)胸懈,集中精力放在你關(guān)注的問題上面。我現(xiàn)在重點(diǎn)關(guān)注的是schedule 邏輯恰响。

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer {
            key.deallocate(capacity: 1)
        }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

    private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
        return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
    }()

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }

    /**
    Schedules an action to be executed as soon as possible on current thread.

    If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
    automatically installed and uninstalled after all work is performed.

    - parameter state: State passed to the action to be executed.
    - parameter action: Action to be executed.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)

        return scheduledItem
    }
}

代碼有點(diǎn)長(zhǎng)趣钱,拆開一點(diǎn)點(diǎn)看


typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

Box:你可以理解為黑盒,里面裝著實(shí)物胚宦,做這個(gè)封裝是為了忽視細(xì)節(jié)首有。打個(gè)比方對(duì)于貨車司機(jī)來說他才不管你具體裝的什么貨呢?他只關(guān)心目的地在哪枢劝,只有發(fā)貨方和收貨方才需要關(guān)注具體貨物是什么井联,這樣做一層抽象有利于中間層的運(yùn)輸。類似的就要系統(tǒng)的Optional類型
Queue: 是一個(gè)可擴(kuò)展循環(huán)隊(duì)列您旁,遵循先進(jìn)先出原則烙常,add和remove算法復(fù)雜度都為O(1),可根據(jù)元素?cái)?shù)量自適應(yīng)大小,通過設(shè)置游標(biāo)達(dá)到訪問元素的目的鹤盒,具體參見Queue.Swift


    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer {
            key.deallocate(capacity: 1)
        }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()


這是一個(gè)底層api蚕脏,為線程開辟一個(gè)私有空間存儲(chǔ)線程數(shù)據(jù)
相關(guān)文章: pthread_key_create


    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

與上面的功能類似,不同的是使用的NSThread的API,相對(duì)上面的api要高級(jí)一點(diǎn)侦锯,為當(dāng)前線程開辟一片空間用來存儲(chǔ)queue類型數(shù)據(jù)

  public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }

這個(gè)比較簡(jiǎn)單設(shè)置isScheduleRequired {get , set}屬性注意這里的數(shù)據(jù)都是針對(duì)線程的不同于以往的數(shù)據(jù)屬于對(duì)象驼鞭。

    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)

        return scheduledItem
    }


這個(gè)是核心代碼schedule, 通過設(shè)置flag判斷當(dāng)前是否有task在執(zhí)行尺碰,如果有則將task加入到待處理的隊(duì)列中去挣棕。當(dāng)線程處理完當(dāng)前task后汇竭,會(huì)檢測(cè)是否還有為執(zhí)行的task,然后依次執(zhí)行為執(zhí)行的task

官方測(cè)試代碼

    func testCurrentThreadScheduler_basicScenario() {

        XCTAssertTrue(CurrentThreadScheduler.isScheduleRequired)

        var messages = [Int]()
        _ = CurrentThreadScheduler.instance.schedule(()) { s in
            messages.append(1)
            _ = CurrentThreadScheduler.instance.schedule(()) { s in
                messages.append(3)
                _ = CurrentThreadScheduler.instance.schedule(()) {
                    messages.append(5)
                    return Disposables.create()
                }
                messages.append(4)
                return Disposables.create()
            }
            messages.append(2)
            return Disposables.create()
        }

        XCTAssertEqual(messages, [1, 2, 3, 4, 5])
    }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末穴张,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子两曼,更是在濱河造成了極大的恐慌皂甘,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件悼凑,死亡現(xiàn)場(chǎng)離奇詭異偿枕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)户辫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門渐夸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人渔欢,你說我怎么就攤上這事墓塌。” “怎么了奥额?”我有些...
    開封第一講書人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵苫幢,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我垫挨,道長(zhǎng)韩肝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任九榔,我火速辦了婚禮哀峻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘哲泊。我一直安慰自己剩蟀,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開白布攻旦。 她就那樣靜靜地躺著喻旷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪牢屋。 梳的紋絲不亂的頭發(fā)上且预,一...
    開封第一講書人閱讀 51,155評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音烙无,去河邊找鬼锋谐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛截酷,可吹牛的內(nèi)容都是我干的涮拗。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼三热!你這毒婦竟也來了鼓择?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤就漾,失蹤者是張志新(化名)和其女友劉穎呐能,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抑堡,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡摆出,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了首妖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片偎漫。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖有缆,靈堂內(nèi)的尸體忽然破棺而出象踊,到底是詐尸還是另有隱情,我是刑警寧澤妒貌,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布通危,位于F島的核電站,受9級(jí)特大地震影響灌曙,放射性物質(zhì)發(fā)生泄漏菊碟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一在刺、第九天 我趴在偏房一處隱蔽的房頂上張望逆害。 院中可真熱鬧,春花似錦蚣驼、人聲如沸魄幕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纯陨。三九已至,卻和暖如春留储,著一層夾襖步出監(jiān)牢的瞬間翼抠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工获讳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留阴颖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓丐膝,卻偏偏與公主長(zhǎng)得像量愧,于是被迫代替她去往敵國(guó)和親钾菊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容