前言
前兩篇文章中事示,分別對(duì)RxJava2的基本路程與鏈?zhǔn)秸{(diào)用分別做了闡述蜡歹,如有遺忘牺丙,此為傳送門(mén)
其實(shí)箩溃,從Rx調(diào)用鏈上拉庶,線(xiàn)程調(diào)度僅是遵守運(yùn)轉(zhuǎn)機(jī)制的一環(huán)癌佩,但因其便捷木缝、高頻的特點(diǎn)便锨,并在項(xiàng)目中很可能需要切換到自己的線(xiàn)程里,故將其選出我碟,理解如何實(shí)現(xiàn)放案。
正文
案例
ob.observeOn(Schedulers.newThread())
由于流程、鏈?zhǔn)骄隽岁U述矫俺,因此案例直接定位到線(xiàn)程調(diào)度線(xiàn)程吱殉。上面的案例中,下游被切換到了新的線(xiàn)程里進(jìn)行響應(yīng)厘托。
調(diào)度器與工作線(xiàn)程
Rx中考婴,由調(diào)度器負(fù)責(zé)提供工作線(xiàn)程,而工作線(xiàn)程則負(fù)責(zé)具體的運(yùn)轉(zhuǎn)催烘。以下為兩者的簡(jiǎn)要信息沥阱。
public abstract class Scheduler {
......
public abstract Worker createWorker();
}
public abstract static class Worker implements Disposable {
......
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
- Scheduler.createWorker(): 獲取用來(lái)執(zhí)行任務(wù)的線(xiàn)程
- Worker.schedule() : 具體的調(diào)度邏輯
當(dāng)前案例中,ob.observeOn()拿到的Observable為ObservableObserveOn伊群。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 一般會(huì)執(zhí)行到這里
// 通過(guò)Scheduler拿到執(zhí)行線(xiàn)程任務(wù)的Worker
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
在Rx構(gòu)建調(diào)用鏈構(gòu)建時(shí)考杉,將會(huì)來(lái)到subscribeActual(),且對(duì)于當(dāng)前案例來(lái)說(shuō)舰始,事件推送到對(duì)應(yīng)的下游節(jié)點(diǎn)才會(huì)進(jìn)行線(xiàn)程調(diào)度崇棠,因此,調(diào)度信息由ObserveOnObserver保存丸卷,具體信息如下:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
當(dāng)事件推送到當(dāng)前節(jié)點(diǎn)枕稀,即ObserveOnObserver將進(jìn)行處理,取onNext()來(lái)看
@Override
public void onNext(T t) {
......
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
ObserveOnObserver將作為Runnable的交給工作線(xiàn)程Worker處理谜嫉,當(dāng)ObserveOnObserver作為線(xiàn)程任務(wù)獲得運(yùn)轉(zhuǎn)時(shí)機(jī)后萎坷,drainFused()或drainNormal()根據(jù)具體情況向調(diào)用鏈上的下一節(jié)點(diǎn)推送相應(yīng)事件,代碼不貼沐兰。而線(xiàn)程調(diào)度也就完成了哆档。
為何這么短
當(dāng)前節(jié)點(diǎn)下,關(guān)注的事情是Rx如何進(jìn)行線(xiàn)程調(diào)度住闯,而核心為瓜浸,將當(dāng)前節(jié)點(diǎn)作為Runnable運(yùn)行在相應(yīng)的線(xiàn)程里,步驟為:
- 由Scheduler提供工作線(xiàn)程Worker
- 在當(dāng)前節(jié)點(diǎn)的響應(yīng)時(shí)機(jī)比原,將此節(jié)點(diǎn)作為Runnable交給Worker處理插佛,在run()時(shí)機(jī)將事件推送知下一節(jié)點(diǎn)
至于Worker是如何處理Runnable,何時(shí)獲得執(zhí)行時(shí)機(jī)量窘,這就是Rx機(jī)制之外的問(wèn)題了雇寇,因?yàn)橐涯鼙WC接下來(lái)的響應(yīng),運(yùn)行在Worker線(xiàn)程里,即完成了線(xiàn)程調(diào)度谢床,如下圖:
下一篇:響應(yīng)式拉刃忠弧(未發(fā)布,待續(xù))