先來看一個例子:
print("當前測試方法中的線程:\(Thread.current)")
Observable.of(1,2,3,4,5,6,7,8,9,10)
.observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
.subscribe{print("observeOn",$0,Thread.current)}
.disposed(by: self.bag)
直接看打印結果:
當前測試方法中的線程:<NSThread: 0x600001232c40>{number = 1, name = main}
observeOn next(1) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn next(2) <NSThread: 0x600001250900>{number = 3, name = (null)}
... /*中間因篇幅原因就省略了*/
observeOn next(10) <NSThread: 0x600001250900>{number = 3, name = (null)}
observeOn completed <NSThread: 0x600001250900>{number = 3, name = (null)}
上面寫法中比較奇特的 observeOn
以及上篇文章中也有提到 subscribeOn
到底做了什么, Scheduler
又是如何調(diào)度的呢?
Scheduler 流程解析
步驟1: cmd
+ 點擊 of
進去 , 返回了一個 ObservableSequence
的序列, 這個序列繼承自 Producer
.
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
注意:
我們調(diào)用
.of
只傳了 1,2,3,4,5,6,7,8.. 也就是elements
參數(shù),那么第二個scheduler
參數(shù)則是上面of
方法中所賦的默認參數(shù)CurrentThreadScheduler.instance
(swift
語言特性 不多闡述)幔妨。 而這個CurrentThreadScheduler.instance
上篇文章RXSwift中Scheduler調(diào)度者本質(zhì)核心原理解析(一)中有講述误堡,就是主隊列锁施。也可以自己點進去看下。
那么意味著 原序列 ObservableSequence
的 Scheduler
肩狂,是主隊列傻谁。
步驟2: cmd
+ 點擊 observeOn
進去, 注意 進入到父類 ObservableType
的 observeOn
方法中.
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
由于外界我們指定的 observeOn
的參數(shù), 顯然 我們走上面, 也就是返回了一個 ObserveOnSerialDispatchQueue
新序列, 并且這個序列同樣繼承與 Producer
.并且這個新產(chǎn)生的中間序列的 Scheduler
审磁,為用戶指定的岂座。
步驟3: 外界開始執(zhí)行 subscribe
, 而這里很明顯是針對ObserveOnSerialDispatchQueue
新序列進行訂閱. 上面提到這個新序列繼承自 Producer
那么我們直接來到子類的 run
方法中.
(關于這里的執(zhí)行順序如果還有問題,請仔細研究本系列 核心邏輯1 以及 核心邏輯2
)
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
在這里, 我們看到了一步 和 .map
原理分析時同樣的操作. 也就是針對源序列進行訂閱. self.source.subscribe(sink)
钾恢,那么接下來 --基本操作鸳址,創(chuàng)建一個 sink
管道氯质,并將用戶指定的隊列傳遞過去。
步驟4: 當針對原序列訂閱之后拱礁,原序列的觀察者發(fā)送響應時 來到 ObservableSequence
的父類 Producer
的 subscribe
方法中呢灶。
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired { // 后續(xù)走這里
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else { // 第一次走這里
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
第一次進來鸯乃,標識為 true
跋涣,走 CurrentThreadScheduler.instance.schedule(())
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
///省略
return disposable
}
///省略
return scheduledItem
}
我們看到這一步 將標識置為 false
, 并調(diào)用傳遞進來的尾隨閉包 action(state)
奖年,那么回到上個方法 就會執(zhí)行 self.run(observer, cancel: disposer)
也就是會來到源序列的 run
方法沛贪。
步驟5: ObservableSequence
-> run
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
再過渡到 ObservableSequenceSink
-> run
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
cmd
+ 點擊 scheduleRecursive
,之前提到原序列的 _scheduler
是默認主隊列猩系,進入到對應方法中寇甸,ImmediateSchedulerType
-> scheduleRecursive
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
我們看到 創(chuàng)建了一個 RecursiveImmediateScheduler
的類幽纷, 保存了外部 self._parent._scheduler.scheduleRecursive
傳過來的閉包博敬, 并且調(diào)用了自己的 schedule
方法偏窝。
步驟6: RecursiveImmediateScheduler
-> schedule
方法
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
//無關代碼已省略
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
//...
return Disposables.create()
}
}
調(diào)用: self._scheduler.schedule
, 這里的 self._scheduler
為主隊列祭往, 那么調(diào)用主隊列的 schedule
方法硼补,也就是 SerialDispatchQueueScheduler
的 schedule
方法
過度方法省略已骇,直接來到
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
走到這里票编,異步執(zhí)行了 RecursiveImmediateScheduler
的閉包慧域,也就是下圖中選中的地方。
走到這里,我們已經(jīng)找到了
action(state, self.schedule)
方法調(diào)用互订,那么就繼續(xù)回調(diào)屁奏。
步驟7: ObservableSequenceSink 的回調(diào)
步驟6中
action
調(diào)用,即上圖選中部分犹撒,那么意味著調(diào)用了 self.forwardOn(.next(next))
點進去可以看到其實就是
self._observer.on(event)
粒褒,給原序列發(fā)送響應奕坟。而在中間序列訂閱原序列時, 響應者是 ObserveOnSerialDispatchQueueSink
回顧一下:
final private class ObserveOnSerialDispatchQueue<E>: Producer<E> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<E>
init(source: Observable<E>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
步驟8: ObserveOnSerialDispatchQueueSink
-> onCore
self.source.subscribe(sink)
這兒就很靈性了,訂閱原序列苛萎,但訂閱者是新產(chǎn)生的中間序列的sink腌歉,并且中間序列的調(diào)度環(huán)境 scheduler
就是用戶指定的隊列翘盖。
那么由于原序列訂閱者是 ObserveOnSerialDispatchQueueSink
馍驯, 接收到事件就來到了其 onCore
方法。
override func onCore(_ event: Event<E>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
繼續(xù)調(diào)度:
中間過渡方法列一下:
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
最終來到: 在 self.queue
中異步執(zhí)行任務, 調(diào)用 action(state)
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
加個斷點驗證一下:
打印結果證明菱父,這次
on(event)
是在我們規(guī)定的隊列執(zhí)行的浙宜。從而實現(xiàn)了 我們所寫代碼 observeOn
的目的
總結:
.observeOn
/.subscribeOn
函數(shù)時產(chǎn)生了新的中間序列蛹磺,而對新序列ObserveOnSerialDispatchQueue
進行訂閱時萤捆,ObserveOnSerialDispatchQueue
的run
方法中俗批,對源序列同樣進行訂閱岁忘,訂閱者為內(nèi)部類ObserveOnSerialDispatchQueueSink
其調(diào)度環(huán)境為用戶指定的隊列区匠。那么源序列發(fā)送event
時驰弄,ObserveOnSerialDispatchQueueSink
響應,在用戶指定的隊列中異步執(zhí)行任務。
這個流程走下來和 .map
函數(shù)很類似五鲫,其實這也是 RX
中很多操作函數(shù)的流程臣镣。 也就是針對原序列做相應操作時智亮,會產(chǎn)生一個新序列阔蛉,并對原序列進行訂閱癞埠,而訂閱者是具體中間類的sink苗踪,或者是自定義的 實現(xiàn)了 onCore方法的類,這個類保存了原序列的事件毕莱, 從而實現(xiàn) 類似 映射朋截、篩選吧黄、改變調(diào)度環(huán)境等操作拗慨。