RXSwift中Scheduler調(diào)度者核心原理解析(二)

先來看一個例子:

print("當前測試方法中的線程:\(Thread.current)")

Observable.of(1,2,3,4,5,6,7,8,9,10)
           .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
           .subscribe{print("observeOn",$0,Thread.current)}
           .disposed(by: self.bag)

直接看打印結果:

當前測試方法中的線程:<NSThread: 0x600001232c40>{number = 1, name = main}
observeOn next(1) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn next(2) <NSThread: 0x600001250900>{number = 3, name = (null)}
... /*中間因篇幅原因就省略了*/
observeOn next(10) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn completed <NSThread: 0x600001250900>{number = 3, name = (null)}

上面寫法中比較奇特的 observeOn 以及上篇文章中也有提到 subscribeOn 到底做了什么, Scheduler 又是如何調(diào)度的呢?

Scheduler 流程解析

步驟1: cmd + 點擊 of 進去 , 返回了一個 ObservableSequence 的序列, 這個序列繼承自 Producer .

public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
   return ObservableSequence(elements: elements, scheduler: scheduler)
}

注意:

我們調(diào)用 .of 只傳了 1,2,3,4,5,6,7,8.. 也就是 elements 參數(shù),那么第二個 scheduler 參數(shù)則是上面 of方法中所賦的默認參數(shù) CurrentThreadScheduler.instance ( swift 語言特性 不多闡述)幔妨。 而這個 CurrentThreadScheduler.instance 上篇文章RXSwift中Scheduler調(diào)度者本質(zhì)核心原理解析(一)中有講述误堡,就是主隊列锁施。也可以自己點進去看下。

那么意味著 原序列 ObservableSequenceScheduler肩狂,是主隊列傻谁。

步驟2: cmd + 點擊 observeOn 進去, 注意 進入到父類 ObservableTypeobserveOn 方法中.

public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
     if let scheduler = scheduler as? SerialDispatchQueueScheduler {
         return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
     }else {
         return ObserveOn(source: self.asObservable(), scheduler: scheduler)
     }
}

由于外界我們指定的 observeOn 的參數(shù), 顯然 我們走上面, 也就是返回了一個 ObserveOnSerialDispatchQueue 新序列, 并且這個序列同樣繼承與 Producer .并且這個新產(chǎn)生的中間序列的 Scheduler审磁,為用戶指定的岂座。

步驟3: 外界開始執(zhí)行 subscribe, 而這里很明顯是針對ObserveOnSerialDispatchQueue 新序列進行訂閱. 上面提到這個新序列繼承自 Producer 那么我們直接來到子類的 run 方法中.
(關于這里的執(zhí)行順序如果還有問題,請仔細研究本系列 核心邏輯1 以及 核心邏輯2
)

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
    let subscription = self.source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}

在這里, 我們看到了一步 和 .map 原理分析時同樣的操作. 也就是針對源序列進行訂閱. self.source.subscribe(sink) 钾恢,那么接下來 --基本操作鸳址,創(chuàng)建一個 sink 管道氯质,并將用戶指定的隊列傳遞過去。

步驟4: 當針對原序列訂閱之后拱礁,原序列的觀察者發(fā)送響應時 來到 ObservableSequence 的父類 Producersubscribe 方法中呢灶。

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {   // 后續(xù)走這里
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }
    else {    // 第一次走這里
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}

第一次進來鸯乃,標識為 true 跋涣,走 CurrentThreadScheduler.instance.schedule(())

public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    if CurrentThreadScheduler.isScheduleRequired {
        CurrentThreadScheduler.isScheduleRequired = false

        let disposable = action(state)

        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }

        guard let queue = CurrentThreadScheduler.queue else {
            return disposable
        }
    ///省略
        return disposable
    }
    ///省略
    return scheduledItem
}

我們看到這一步 將標識置為 false , 并調(diào)用傳遞進來的尾隨閉包 action(state) 奖年,那么回到上個方法 就會執(zhí)行 self.run(observer, cancel: disposer) 也就是會來到源序列的 run 方法沛贪。

步驟5: ObservableSequence -> run

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
    let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}

再過渡到 ObservableSequenceSink -> run

func run() -> Disposable {
    return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
        var mutableIterator = iterator
        if let next = mutableIterator.next() {
            self.forwardOn(.next(next))
            recurse(mutableIterator)
        }
        else {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

cmd + 點擊 scheduleRecursive,之前提到原序列的 _scheduler 是默認主隊列猩系,進入到對應方法中寇甸,ImmediateSchedulerType -> scheduleRecursive

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
    
    recursiveScheduler.schedule(state)
    
    return Disposables.create(with: recursiveScheduler.dispose)
}

我們看到 創(chuàng)建了一個 RecursiveImmediateScheduler 的類幽纷, 保存了外部 self._parent._scheduler.scheduleRecursive 傳過來的閉包博敬, 并且調(diào)用了自己的 schedule 方法偏窝。

步驟6: RecursiveImmediateScheduler -> schedule方法

func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial

    let d = self._scheduler.schedule(state) { state -> Disposable in
        //無關代碼已省略
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
        
        if let action = action {
            action(state, self.schedule)
        }
        //...
        return Disposables.create()
    }

}

調(diào)用: self._scheduler.schedule , 這里的 self._scheduler 為主隊列祭往, 那么調(diào)用主隊列的 schedule 方法硼补,也就是 SerialDispatchQueueSchedulerschedule 方法


過度方法省略已骇,直接來到

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }


        cancel.setDisposable(action(state))
    }

    return cancel
}

走到這里票编,異步執(zhí)行了 RecursiveImmediateScheduler 的閉包慧域,也就是下圖中選中的地方。


走到這里,我們已經(jīng)找到了 action(state, self.schedule) 方法調(diào)用互订,那么就繼續(xù)回調(diào)屁奏。

步驟7: ObservableSequenceSink 的回調(diào)


步驟6中 action 調(diào)用,即上圖選中部分犹撒,那么意味著調(diào)用了 self.forwardOn(.next(next))
點進去可以看到其實就是 self._observer.on(event) 粒褒,給原序列發(fā)送響應奕坟。而在中間序列訂閱原序列時, 響應者是 ObserveOnSerialDispatchQueueSink
回顧一下:

final private class ObserveOnSerialDispatchQueue<E>: Producer<E> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<E>

    init(source: Observable<E>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
    }

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)   
        return (sink: sink, subscription: subscription)
    }

}

步驟8: ObserveOnSerialDispatchQueueSink -> onCore

self.source.subscribe(sink) 這兒就很靈性了,訂閱原序列苛萎,但訂閱者是新產(chǎn)生的中間序列的sink腌歉,并且中間序列的調(diào)度環(huán)境 scheduler 就是用戶指定的隊列翘盖。
那么由于原序列訂閱者是 ObserveOnSerialDispatchQueueSink 馍驯, 接收到事件就來到了其 onCore 方法。

override func onCore(_ event: Event<E>) {
    _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}

繼續(xù)調(diào)度:
中間過渡方法列一下:

public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}

最終來到: 在 self.queue 中異步執(zhí)行任務, 調(diào)用 action(state)

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }


        cancel.setDisposable(action(state))
    }

    return cancel
}

加個斷點驗證一下:


打印結果證明菱父,這次 on(event) 是在我們規(guī)定的隊列執(zhí)行的浙宜。從而實現(xiàn)了 我們所寫代碼 observeOn 的目的

總結:

.observeOn / .subscribeOn 函數(shù)時產(chǎn)生了新的中間序列蛹磺,而對新序列 ObserveOnSerialDispatchQueue 進行訂閱時萤捆,ObserveOnSerialDispatchQueuerun 方法中俗批,對源序列同樣進行訂閱岁忘,訂閱者為內(nèi)部類ObserveOnSerialDispatchQueueSink 其調(diào)度環(huán)境為用戶指定的隊列区匠。那么源序列發(fā)送 event 時驰弄, ObserveOnSerialDispatchQueueSink 響應,在用戶指定的隊列中異步執(zhí)行任務。

這個流程走下來和 .map 函數(shù)很類似五鲫,其實這也是 RX 中很多操作函數(shù)的流程臣镣。 也就是針對原序列做相應操作時智亮,會產(chǎn)生一個新序列阔蛉,并對原序列進行訂閱癞埠,而訂閱者是具體中間類的sink苗踪,或者是自定義的 實現(xiàn)了 onCore方法的類,這個類保存了原序列的事件毕莱, 從而實現(xiàn) 類似 映射朋截、篩選吧黄、改變調(diào)度環(huán)境等操作拗慨。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市剧蹂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌减噪,老刑警劉巖车吹,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朝卒,死亡現(xiàn)場離奇詭異乐埠,居然都是意外死亡,警方通過查閱死者的電腦和手機瑞眼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門伤疙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來徒像,“玉大人蛙讥,你說我怎么就攤上這事次慢。” “怎么了拭抬?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵造虎,是天一觀的道長算凿。 經(jīng)常有香客問我份蝴,道長,這世上最難降的妖魔是什么氓轰? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任婚夫,我火速辦了婚禮,結果婚禮上署鸡,老公的妹妹穿的比我還像新娘案糙。我一直安慰自己,他們只是感情好靴庆,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布时捌。 她就那樣靜靜地躺著,像睡著了一般炉抒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上焰薄,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天拿诸,我揣著相機與錄音,去河邊找鬼塞茅。 笑死亩码,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的野瘦。 我是一名探鬼主播描沟,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼缅刽!你這毒婦竟也來了?” 一聲冷哼從身側響起蠢络,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤衰猛,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后刹孔,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體啡省,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年髓霞,在試婚紗的時候發(fā)現(xiàn)自己被綠了卦睹。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡方库,死狀恐怖结序,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情纵潦,我是刑警寧澤徐鹤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布垃环,位于F島的核電站,受9級特大地震影響返敬,放射性物質(zhì)發(fā)生泄漏遂庄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一劲赠、第九天 我趴在偏房一處隱蔽的房頂上張望涛目。 院中可真熱鬧,春花似錦凛澎、人聲如沸霹肝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阿迈。三九已至,卻和暖如春轧叽,著一層夾襖步出監(jiān)牢的瞬間苗沧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工炭晒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留待逞,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓网严,卻偏偏與公主長得像识樱,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子震束,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容