RxSwift-Scheduler

Scheduler簡介

Scheduler,又稱調(diào)度者遂唧,有一下分類:

  • CurrentThreadScheduler - 當(dāng)前線程
  • MainScheduler - 表示主線程,當(dāng)需要執(zhí)行一些UI相關(guān)任務(wù)時欧穴,需要切換到該Scheduler下執(zhí)行
  • SerialDispatchQueueScheduler - 封裝了GCD的串行隊列洼畅,需要執(zhí)行一些串行任務(wù)可以切換到這個Scheduler下運行
  • ConcurrentDispatchQueueScheduler - 封裝了GCD的并行隊列佳鳖,需要執(zhí)行一些并行任務(wù)時霍殴,可以切換到這個Scheduler下執(zhí)行
  • OperationQueueScheduler - 封裝了NSOperationQueue

源碼解析observeOn-ConcurrentDispatchQueueScheduler

使用

let ob = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ob.observeOn(ConcurrentDispatchQueueScheduler.init(qos: .background))
    .subscribe{
        print("observeOn-ConcurrentDispatchQueueScheduler:\($0)-",Thread.current)
    }.disposed(by: disposeBag)

打印

observeOn-ConcurrentDispatchQueueScheduler:next(1)- <NSThread: 0x6000022cd9c0>{number = 3, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(2)- <NSThread: 0x6000022cd9c0>{number = 3, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(3)- <NSThread: 0x6000022cd9c0>{number = 3, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(4)- <NSThread: 0x6000022cd9c0>{number = 3, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(5)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(6)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(7)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(8)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(9)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:next(10)- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
observeOn-ConcurrentDispatchQueueScheduler:completed- <NSThread: 0x6000022ecdc0>{number = 4, name = (null)}
####源碼
//ObserveOn.swift
extension ObservableType {
    public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }
}

創(chuàng)建ObserveOn內(nèi)部類

final private class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source

#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

ObserveOn這個類繼承Producer,所以當(dāng)序列被訂閱時必定執(zhí)行其run方法

final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    typealias Element = Observer.Element 

    let _scheduler: ImmediateSchedulerType

    var _lock = SpinLock()
    let _observer: Observer

    // state
    var _state = ObserveOnState.stopped
    var _queue = Queue<Event<Element>>(capacity: 10)

    let _scheduleDisposable = SerialDisposable()
    let _cancel: Cancelable

    init(scheduler: ImmediateSchedulerType, observer: Observer, cancel: Cancelable) {
        self._scheduler = scheduler
        self._observer = observer
        self._cancel = cancel
    }

    override func onCore(_ event: Event<Element>) {
        let shouldStart = self._lock.calculateLocked { () -> Bool in
            self._queue.enqueue(event)

            switch self._state {
            case .stopped:
                self._state = .running
                return true
            case .running:
                return false
            }
        }

        if shouldStart {
            self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
        }
    }

    func run(_ state: (), _ recurse: (()) -> Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
            if !self._queue.isEmpty {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .stopped
                return (nil, self._observer)
            }
        }

        if let nextEvent = nextEvent, !self._cancel.isDisposed {
            observer.on(nextEvent)
            if nextEvent.isStopEvent {
                self.dispose()
            }
        }
        else {
            return
        }

        let shouldContinue = self._shouldContinue_synchronized()

        if shouldContinue {
            recurse(())
        }
    }

    func _shouldContinue_synchronized() -> Bool {
        self._lock.lock(); defer { self._lock.unlock() } // {
            if !self._queue.isEmpty {
                return true
            }
            else {
                self._state = .stopped
                return false
            }
        // }
    }

    override func dispose() {
        super.dispose()

        self._cancel.dispose()
        self._scheduleDisposable.dispose()
    }
}
extension ImmediateSchedulerType {
    /**
    Schedules an action to be executed recursively.
    
    - parameter state: State passed to the action to be executed.
    - parameter action: Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state.
    - returns: The disposable object used to cancel the scheduled action (best effort).
    */
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }
}

思維導(dǎo)圖腋颠,這個分析到一半感覺有點不對勁繁成,后續(xù)再研究


observeOn
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市淑玫,隨后出現(xiàn)的幾起案子巾腕,更是在濱河造成了極大的恐慌,老刑警劉巖絮蒿,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尊搬,死亡現(xiàn)場離奇詭異,居然都是意外死亡土涝,警方通過查閱死者的電腦和手機佛寿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來但壮,“玉大人冀泻,你說我怎么就攤上這事±” “怎么了弹渔?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長溯祸。 經(jīng)常有香客問我肢专,道長舞肆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任博杖,我火速辦了婚禮椿胯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘剃根。我一直安慰自己哩盲,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布狈醉。 她就那樣靜靜地躺著种冬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪舔糖。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天莺匠,我揣著相機與錄音金吗,去河邊找鬼。 笑死趣竣,一個胖子當(dāng)著我的面吹牛摇庙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播遥缕,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼卫袒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了单匣?” 一聲冷哼從身側(cè)響起夕凝,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎户秤,沒想到半個月后码秉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡鸡号,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年转砖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鲸伴。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡府蔗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出汞窗,到底是詐尸還是另有隱情姓赤,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布杉辙,位于F島的核電站模捂,受9級特大地震影響捶朵,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜狂男,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一综看、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧岖食,春花似錦红碑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蔑穴,卻和暖如春忠寻,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背存和。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工奕剃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人捐腿。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓纵朋,卻偏偏與公主長得像,于是被迫代替她去往敵國和親茄袖。 傳聞我的和親對象是個殘疾皇子操软,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容