今天記錄下在使用RxJava時(shí)關(guān)于線程調(diào)度方面的知識(shí)疆虚。
以下是今天的學(xué)習(xí)目錄
- 基本使用
- 線程調(diào)度的整體流程
- 關(guān)鍵類及方法說明
- 總結(jié)
基本使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "被觀察者---發(fā)送事件'Hello World!'-- Thread:" + Thread.currentThread().getName());
e.onNext("Hello World!");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String value) {
Log.e(TAG, "觀察者---onNext---Value:" + value + "---Thread:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.e(TAG, "觀察者---onComplete---Thread:" + Thread.currentThread().getName());
}
});
}
log:
被觀察者---發(fā)送事件'Hello World!'-- Thread:RxNewThreadScheduler-1
觀察者---onNext---Value:Hello World!---Thread:RxSingleScheduler-1
觀察者---onComplete---Thread:RxSingleScheduler-1
通過log我們發(fā)現(xiàn):
- 通過“消息發(fā)射器-Emitter”發(fā)送消息時(shí)序芦,當(dāng)前所處的線程是以“RxNewThreadScheduler”開頭的工作線程雳殊,而非 UI線程捣作。
- 在“Observer”接受消息時(shí)费变,當(dāng)前所處的線程是以“RxSingleScheduler”開頭的工作線程析恢,也是非 UI線程。
那么問題來了
1:RxJava是如何實(shí)現(xiàn)線程調(diào)度的振亮?
2:這樣做有什么好處?
帶著以上問題鞭莽,我們來分析下“RxJava中線程調(diào)度的內(nèi)部實(shí)現(xiàn)”吧坊秸。
線程調(diào)度的整體流程
創(chuàng)建“Observable”的過程
1:通過Observable.create()創(chuàng)建類型為“ObservableCreate”的“被觀察者”并把“ObservableOnSubscribe(事件容器)”保存至變量“source”中。
2:通過Observabl.subscribeOn()創(chuàng)建類型為“ObservableSubscribeOn”的“被觀察者”澎怒。把“ObservableCreate”保存至變量“source”中褒搔。把“Scheduler”保存至“scheduler”變量中。
3:通過Observabl.observeOn()創(chuàng)建類型為“ObservableObserveOn”的“被觀察者”。把“ObservableSubscribeOn”保存至變量“source”中星瘾。把“Scheduler”保存至“scheduler”變量中走孽。
通過上述3步之后,到此“Observable”的創(chuàng)建過程完畢琳状,此時(shí)可能有人會(huì)問“Scheduler”是什么磕瓷?它的作用又是什么呢?先不要著急念逞,接來下我們通過分析“Observable的事件傳遞過程”來繼續(xù)分析吧困食。
“Observable”的事件傳遞過程
說明:關(guān)于Observable的subscribeActual()方法在上一節(jié)已經(jīng)介紹過了。該方法是Obaservable實(shí)際訂閱動(dòng)作的發(fā)生地翎承,并且此方法為Obaservable的抽象方法每個(gè)子類必須實(shí)現(xiàn)陷舅,再者調(diào)用Observable的subscribe()其內(nèi)部實(shí)際就是調(diào)用了subscribeActual()去完成“事件”發(fā)送之前的種種事情,例如:調(diào)用Observer的onSubscribe()詢問是否需要通過事件控制開關(guān)關(guān)閉事件的傳遞审洞,然后再回調(diào)ObservableOnSubscribe的subscribe()回調(diào)至事件產(chǎn)生處莱睁。所以我們必須要了解該方法究竟做了什么。
通過上面的分析芒澜,我們知道最后創(chuàng)建的“Observable”是“ObservableObserveOn”仰剿,我們先看看它的“subscribeActual()”方法都做了什么吧。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
通過查看此方法我們發(fā)現(xiàn)痴晦,映入眼簾的是一個(gè)對“scheduler”的if--else判斷南吮。此時(shí)我們就有必要分析下:Scheduler是什么?它的作用是什么誊酌?
Scheduler:線程調(diào)度者
以上是關(guān)于Scheduler的部分類結(jié)構(gòu)圖部凑。結(jié)合“第一節(jié)-基本使用”我們了解到,要是使用Scheduler基本都是通過“Schedulers”來完成碧浊。
Schedulers:創(chuàng)建線程調(diào)度者實(shí)例的靜態(tài)工廠
從“Schedulers”的類說明“Static factory methods for returning standard Scheduler instances.”以及其中相應(yīng)方法的實(shí)現(xiàn)涂邀,可以知道,“Schedulers是專門生產(chǎn)Scheduler實(shí)例的”箱锐。例如:通過 io()可以返回IoScheduler實(shí)例比勉,通過newThread()可以獲得NewThreadScheduler實(shí)例等等。
接下來我們就debug下代碼來看看驹止,Scheduler到底是什么浩聋?它的運(yùn)行機(jī)制又是怎樣的吧。
1:進(jìn)入ObservableSubscribeOn.subscribeActual()查看運(yùn)行時(shí)信息
此時(shí)臊恋,會(huì)調(diào)用Scheduler的抽象方法createWorker()返回一個(gè)Worker衣洁,因?yàn)槲覀円呀?jīng)通過observeOn()指定的Scheduler是“SingleScheduler”,接下來看看“SingleScheduler”的createWorker()的實(shí)現(xiàn)吧抖仅。
2:SingleScheduler.createWorker()
通過createWorker()返回了SingleScheduler中實(shí)現(xiàn)了Scheduler.Worker該抽象類的ScheduledWorker的實(shí)例坊夫。
3:創(chuàng)建完Worker毙替,執(zhí)行“source.subscribe()”
此時(shí)會(huì)執(zhí)行“ObservableSubscribeOn.subscribe()”(線程調(diào)度的整體流程 介紹了為什么會(huì)執(zhí)行ObservableSubscribeOn.subscribe()),并把“ObserveOnObserver”該observer傳遞至ObservableSubscribeOn中。
4:執(zhí)行ObservableSubscribeOn.subscribeActual()
4.1:首先践樱,回調(diào)第3步中創(chuàng)建的ObserveOnObserver.onSubscribe(),在該方法中會(huì)回調(diào)外部創(chuàng)建的Observer(實(shí)際接受事件的觀察者)的onSubscribe()
4.2:再次厂画,繼續(xù)執(zhí)行ObservableSubscribeOn.subscribeActual()剩下的代碼,此時(shí)就真正開始 在為被觀察者設(shè)置的線程中(此處是在通過NewThreadWorker創(chuàng)建相應(yīng)線程執(zhí)行)執(zhí)行其相應(yīng)的方法了
5:執(zhí)行Scheduler.scheduleDirect()
5.1:首先拷邢,執(zhí)行Scheduler的抽象方法createWorker()
因?yàn)槲覀優(yōu)樵揝cheduler設(shè)置的是NewThreadScheduler,我們先去看看NewThreadScheduler的createWorker()做了什么袱院。
從圖中我們可以知道,NewThreadScheduler.createWorker()返回了一個(gè)NewThreadWorker實(shí)例瞭稼,并且在執(zhí)行該類實(shí)例的時(shí)候忽洛,初始化了一個(gè)相應(yīng)的 線程池(該線程池是為了后續(xù)要在其中執(zhí)行異步任務(wù)準(zhǔn)備的)。
5.2:其次环肘,在上一步創(chuàng)建的Worker中欲虚,執(zhí)行給定的任務(wù)(也就是在上一步創(chuàng)建的Worker中的線程池中執(zhí)行相應(yīng)的任務(wù))
5.3:再次,回到ObservableSubscribeOn.subscribeActual() 5.2執(zhí)行的Runnable(任務(wù))就是 調(diào)用source的subscribe()悔雹。該source是誰复哆?它就是我們在 創(chuàng)建“Observable”的過程 中通過調(diào)用Observable.create()創(chuàng)建的 ObservableCreate實(shí)例。
5.4:最后腌零,調(diào)用ObservableOnSubscribe.subscribe()回調(diào)至 消息準(zhǔn)備處梯找。 此時(shí)我們發(fā)現(xiàn) 當(dāng)前線程是以“RxNewThreadScheduler”開頭的工作線程,這也就解釋了益涧,在 基本使用中我們發(fā)現(xiàn)為什么通過 subscribeOn(Schedulers.newThread())設(shè)置后锈锤,執(zhí)行的事件發(fā)送所在的線程不是UI線程而是以“RxNewThreadScheduler”開頭的工作線程了。
6:發(fā)送事件闲询,此時(shí)會(huì)通過CreateEmitter(消息發(fā)射器)的OnNext(),onComplete(),onError()來向“觀察者”發(fā)送事件
7:調(diào)用ObservableObserveOn中ObserveOnObserver的onNext(),onComplete(),onError()
8:通過閱讀源碼發(fā)現(xiàn)久免,ObserveOnObserver實(shí)現(xiàn)了Runnable接口,所以再執(zhí)行Scheduler的schedule()方法時(shí)執(zhí)行的是該類型的run()方法扭弧。
當(dāng)執(zhí)行drainNormal()阎姥,先通過checkTerminated()判斷線程是否為終止了,如果沒終止,則調(diào)用 Observer.onNext()接受事件寄狼。此時(shí)我們發(fā)現(xiàn)當(dāng)前線程是通過observeOn(Schedulers.single())指定的線程名為“RxSingleScheduler”的線程丁寄。
到此我們分析了通過Observable.subscribeOn()給 被觀察者調(diào)用方法設(shè)置所在的線程 以及通過Observable.observeOn()給 觀察者調(diào)用方法設(shè)置所在的線程 內(nèi)部實(shí)現(xiàn)氨淌。