摘要
使用 subscribeOn
我們用 subscribeOn 來決定數(shù)據(jù)序列的構(gòu)建函數(shù)在哪個(gè) Scheduler 上運(yùn)行。以上例子中批狐,由于獲取 Data 需要花很長的時(shí)間,所以用 subscribeOn 切換到 后臺(tái) Scheduler 來獲取 Data。這樣可以避免主線程被阻塞。
使用 observeOn
我們用 observeOn 來決定在哪個(gè) Scheduler 監(jiān)聽這個(gè)數(shù)據(jù)序列晃虫。以上例子中,通過使用 observeOn 方法切換到主線程來監(jiān)聽并且處理結(jié)果扣墩。
一個(gè)比較典型的例子就是哲银,在后臺(tái)發(fā)起網(wǎng)絡(luò)請(qǐng)求,然后解析數(shù)據(jù)沮榜,最后在主線程刷新頁面盘榨。你就可以先用 subscribeOn 切到后臺(tái)去發(fā)送請(qǐng)求并解析數(shù)據(jù)喻粹,最后用 observeOn 切換到主線程更新頁面
結(jié)構(gòu)
在討論 SubscribeOn
與 ObserverOn
的區(qū)別之前蟆融,先來討論一下 Observable
從產(chǎn)生事件到響應(yīng)事件的結(jié)構(gòu),可以分為三部分:
- 產(chǎn)生事件:
Observable<Int>.create(...)
- 操作事件:
map()
,filter()
,zip()
等操作符的功能 - 響應(yīng)事件:
subscribe(onNext: ..., onCompleted: ..., onError: ...)
Observable<Int>.create { observer in
1 ..... 產(chǎn)生事件
}
.map { element -> String in
2 ...... 操作事件
}
.subscribe(onNext: { element in
3 ...... 響應(yīng)事件
}).disposed(by: disposeBag)
1. 默認(rèn)情況
在沒有使用 SubscribeOn
和 ObserverOn
明確指定在哪個(gè)隊(duì)列中調(diào)度時(shí)守呜,subscribe()
在哪個(gè)隊(duì)列被調(diào)用型酥,「產(chǎn)生事件」和「響應(yīng)事件」的方法就在哪個(gè)隊(duì)列中執(zhí)行
在「子線程」中調(diào)用 subscribe()
func invokeSubscribeInGlobalQueue() {
let observableInt = Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
DispatchQueue.global().async {
print("調(diào)用 subscribe() 的隊(duì)列: \(self.currentQueueName() ?? "queue")")
observableInt.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element)\n\n")
}).disposed(by: self.disposeBag)
}
}
輸出結(jié)果:
調(diào)用 subscribe() 的隊(duì)列: com.apple.root.default-qos
產(chǎn)生事件 -> com.apple.root.default-qos
操作事件1 -> com.apple.root.default-qos
操作事件2 -> com.apple.root.default-qos
響應(yīng)事件 -> com.apple.root.default-qos, element -> 3
在「主線程」中調(diào)用 subscribe()
func invokeSubscribeInMainQueue() {
let observableInt = Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
print("調(diào)用 subscribe() 的隊(duì)列: \(self.currentQueueName() ?? "queue")")
observableInt.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element)\n\n")
}).disposed(by: self.disposeBag)
}
輸出結(jié)果:
調(diào)用 subscribe() 的隊(duì)列: com.apple.main-thread
產(chǎn)生事件 -> com.apple.main-thread
操作事件1 -> com.apple.main-thread
操作事件2 -> com.apple.main-thread
響應(yīng)事件 -> com.apple.main-thread, element -> 3
2. 只使用 SubscribeOn 指定「產(chǎn)生事件」執(zhí)行的隊(duì)列
SubscribeOn
是向上和向下作用的,只使用subscribeOn
指定執(zhí)行的隊(duì)列之后查乒,「產(chǎn)生事件」弥喉、「操作事件」、「響應(yīng)事件」都將在指定的隊(duì)列中執(zhí)行玛迄。
func subscribeOn() {
Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element)\n\n")
}).disposed(by: self.disposeBag)
}
輸出結(jié)果:
產(chǎn)生事件 -> rx.global_dispatch_queue.serial
操作事件1 -> rx.global_dispatch_queue.serial
操作事件2 -> rx.global_dispatch_queue.serial
響應(yīng)事件 -> rx.global_dispatch_queue.serial, element -> 3
當(dāng)出現(xiàn)多個(gè) subscribeOn
的時(shí)候由境,只有第一個(gè)能起到作用。
func subscribeOn() {
Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.subscribeOn(MainScheduler.instance)
.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element)\n\n")
}).disposed(by: self.disposeBag)
}
輸出結(jié)果:
產(chǎn)生事件 -> rx.global_dispatch_queue.serial
操作事件1 -> rx.global_dispatch_queue.serial
操作事件2 -> rx.global_dispatch_queue.serial
響應(yīng)事件 -> rx.global_dispatch_queue.serial, element -> 3
3. 只使用 ObserverOn 指定「響應(yīng)事件」執(zhí)行的隊(duì)列
observarOn
是向下作用的, observarOn
可以指定其后面的「操作事件」和「響應(yīng)事件」執(zhí)行的隊(duì)列,可以使用多個(gè)observarOn
來改變不同的「操作事件」執(zhí)行的隊(duì)列
func observerOn() {
Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.observeOn(MainScheduler.instance)
.map { element -> Int in
print("操作事件3 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.map { element -> Int in
print("操作事件4 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.observeOn(MainScheduler.instance)
.observeOn(SerialDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element) \n\n")
}).disposed(by: disposeBag)
}
輸出結(jié)果:
產(chǎn)生事件 -> com.apple.main-thread
操作事件1 -> com.apple.main-thread
操作事件2 -> rx.global_dispatch_queue.serial
操作事件3 -> com.apple.main-thread
操作事件4 -> rx.global_dispatch_queue.serial
響應(yīng)事件 -> rx.global_dispatch_queue.serial, element -> 5
4. 同時(shí)使用 SubscribeOn 和 ObserverOn
同時(shí)使用 SubscribeOn
和 ObserverOn
時(shí)虏杰〖ン。「產(chǎn)生事件」和 ObserveOn
之前的 「操作事件」將會(huì)在 SubscribeOn
指定的隊(duì)列中執(zhí)行。ObserveOn
之后的 「操作事件」和 「響應(yīng)事件」將會(huì)在 ObserveOn
指定的隊(duì)列中執(zhí)行纺阔。
SubscribeOn
和ObserveOn
在代碼中出現(xiàn)的順序不會(huì)影響上述的邏輯
func subscribeOnAndObserveOn() {
Observable<Int>.create { observer in
print("產(chǎn)生事件 -> \(self.currentQueueName() ?? "queue")")
observer.onNext(1)
return Disposables.create()
}
.map { element -> Int in
print("操作事件1 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.observeOn(MainScheduler.instance)
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
.map { element -> Int in
print("操作事件2 -> \(self.currentQueueName() ?? "queue")")
return element + 1
}
.subscribe(onNext: { element in
print("響應(yīng)事件 -> \(self.currentQueueName() ?? "queue"), element -> \(element)\n\n")
}).disposed(by: self.disposeBag)
}
輸出結(jié)果:
產(chǎn)生事件 -> rx.global_dispatch_queue.serial
操作事件1 -> rx.global_dispatch_queue.serial
操作事件2 -> com.apple.main-thread
響應(yīng)事件 -> com.apple.main-thread, element -> 3