RxJava源碼解析第二篇库车。
我們知道哥放,在使用RxJava的時(shí)候智嚷,線程的調(diào)度是其內(nèi)部幫我們實(shí)現(xiàn)的,這讓我們可以便捷的實(shí)現(xiàn)函數(shù)式編程糠悯。
本文主要從源碼的角度來分析RxJava的線程調(diào)度機(jī)制
= =最近被項(xiàng)目搞瘋都沒什么時(shí)間寫筆記了帮坚。
引入
我們知道妻往,線程調(diào)度主要通過observeOn
和subscribeOn
這兩個(gè)方法,以及Schedular來指定使用的線程试和。
還是以上一次的代碼為例:
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //調(diào)用登錄接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//處理登錄結(jié)果讯泣,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登錄結(jié)果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //調(diào)度線程
.observeOn(AndroidSchedulers.mainThread()) //調(diào)度線程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整個(gè)請求成功,根據(jù)獲取的UserInfo更新對應(yīng)的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//請求失敗灰署,顯示對應(yīng)的View
showFailView();
}
});
我們知道判帮,通過:
.subscribeOn(Schedulers.io()) //調(diào)度線程
.observeOn(AndroidSchedulers.mainThread()) //調(diào)度線程
這兩句代碼,就使我們上半部分的請求和保存數(shù)據(jù)都執(zhí)行在io線程中溉箕,而下半部的ui更新則執(zhí)行在主線程晦墙。
通過這段代碼,我們引入幾個(gè)問題:
- observeOn和subscribeOn是如何實(shí)現(xiàn)線程調(diào)度的肴茄?
- observeOn和subscribeOn之間是否存在沖突晌畅?
observeOn源碼
首先解決第一個(gè)問題,我們先了解一下ObserveOn的實(shí)現(xiàn)原理:
首先看一下調(diào)用:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
我們可以看到寡痰,ObserveOn
最終是返回了一個(gè)ObservableObserveOn
對象抗楔,并將scheduler
傳入。
根據(jù)上一篇文的思路:
ObservableObserveOn
會(huì)被我們最后subscribe
的時(shí)候傳入的Observer
訂閱拦坠。
讓我們跟進(jìn)看一下ObservableObserveOn
被訂閱時(shí)會(huì)執(zhí)行什么邏輯:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//TrampolineScheduler 表示當(dāng)前線程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//根據(jù)scheduler創(chuàng)建worker
Scheduler.Worker w = scheduler.createWorker();
//通過ObservableObserveOnObserver代理
source.subscribe(new ObservableObserveOn.ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
這里的邏輯并不難理解连躏,(如果看了上一篇文章),
首先是判斷了scheduler
是不是表示當(dāng)前線程的TrampolineScheduler
贞滨,如果是就直接讓observer
訂閱上一級的Observable
入热,也就是跳過當(dāng)前這一層,即圖中的Observer
直接訂閱ObservableSubscribeOn
晓铆。
然后根據(jù)schedular
生成對應(yīng)的worker
勺良,交由ObservableObserveOnObserver
代理,訂閱上一級的Observable
骄噪。
根據(jù)我們引入的案例尚困,我們以observeOn(AndroidSchedulers.mainThread())
為例,當(dāng)完成逆向訂閱链蕊,執(zhí)行任務(wù)鏈到ObservableObserveOnObserver
時(shí):
@Override
public void onNext(T t) {
// 上一級的模式如果不是異步的事甜,加入隊(duì)列
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//進(jìn)行線程調(diào)度
schedule();
}
void schedule() {
// 判斷當(dāng)前正在執(zhí)行的任務(wù)數(shù)目
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這里首先是判斷了sourceMode
,這里先不跟蹤這個(gè)變量滔韵,只需要知道大多數(shù)情況下逻谦,這個(gè)判斷是成立,所以會(huì)把數(shù)據(jù)加入隊(duì)列奏属。
然后轉(zhuǎn)而讓worker
執(zhí)行接下去的步驟。
我們跟蹤看看潮峦,可以發(fā)現(xiàn)囱皿,這是個(gè)抽象方法勇婴,可以找到他在不同類中有不同實(shí)現(xiàn),分別對應(yīng)了幾種不同的線程調(diào)度機(jī)制嘱腥,我們挑選案例中的AndroidSchedulers.mainThread()
來跟蹤耕渴。
首先我們跟蹤mainThread
方法,可以發(fā)現(xiàn)內(nèi)部轉(zhuǎn)到了這里:
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
我們再跟進(jìn)HandlerScheduler
齿兔,我們知道worker
是通過createWorker
方法產(chǎn)生的:
public Worker createWorker() {
return new HandlerWorker(handler);
}
可以看到直接生成了HandlerWorker
橱脸,并傳入了一開始創(chuàng)建的綁定了MainLooper
的Handler
》治看到這里也能大致猜出添诉,后續(xù)會(huì)把任務(wù)傳給這個(gè)handler
執(zhí)行:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略部分代碼
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
可以看到,這里將傳進(jìn)來的runnable
包裝成ScheduledRunnable
医寿,然后提交給綁定的handler
栏赴。
我們知道,后續(xù)Handler
會(huì)調(diào)用ScheduledRunnable
的run方法:
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
//……
}
}
可以看到靖秩,只是簡單的調(diào)用了我們傳入的runnable
的run
方法须眷,也就是剛才我們在ObservableObserveOnObserver
中通過schedule
方法傳入的runnable
,我們回去看看:
void schedule() {
// 判斷當(dāng)前正在執(zhí)行的任務(wù)數(shù)目
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
可以看到其實(shí)本身就是個(gè)runnable
:
@Override
public void run() {
//輸出結(jié)果是否融合
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
可以看到沟突,根據(jù)outputFused
來跳轉(zhuǎn)方法花颗,這里先不跟蹤這個(gè)變量,后面會(huì)再提到惠拭。
現(xiàn)在只需要知道當(dāng)連續(xù)兩個(gè)observable都需要線程調(diào)度時(shí)(比如從observeOn
到observeOn
)扩劝,這個(gè)outputFused才會(huì)發(fā)生變化,默認(rèn)為false求橄。
那么這里今野,我們先進(jìn)入drainNormal
方法:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//第一層循環(huán)
for (;;) {
// 檢查異常處理
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//第二層循環(huán)
for (;;) {
boolean d = done;
T v;
//從隊(duì)列中獲取數(shù)據(jù)
v = q.poll();
boolean empty = v == null;
// 檢查異常
if (checkTerminated(d, empty, a)) {
return;
}
//如果沒有數(shù)據(jù)了,跳出
if (empty) {
break;
}
//執(zhí)行下一次操作罐农。
a.onNext(v);
}
//減掉執(zhí)行的次數(shù)条霜,并獲取剩于任務(wù)數(shù)量,然后再次循環(huán)
//直到獲取剩余任務(wù)量為0涵亏,跳出循環(huán)
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
這里的邏輯其實(shí)也不難宰睡,具體可以看注釋。
到這里其實(shí)已經(jīng)切換了線程气筋,然后就是分發(fā)數(shù)據(jù)拆内,逐個(gè)調(diào)用onNext
操作了。直到?jīng)]有數(shù)據(jù)就跳出循環(huán)宠默。(總覺得這里missed
的設(shè)計(jì)很奇怪- -為什么是初始化1而不是missed=get()
呢麸恍。望有大神解答~)
看到這里也就大致明白了ObserveOn
的流程呢。
總結(jié)一下:
ObserveOn
會(huì)用一個(gè)queue
保存上一級傳下來的數(shù)據(jù),然后通過scheduler創(chuàng)建一個(gè)worker
抹沪,提交數(shù)據(jù)刻肄,并將任務(wù)執(zhí)行在worker
設(shè)置的線程中。
subscribeOn源碼
看完ObserveOn
融欧,我們看一下subscribeOn
,
首先看一下當(dāng)他被訂閱時(shí)會(huì)執(zhí)行什么操作:
@Override
public void subscribeActual(final Observer<? super T> s) {
//創(chuàng)建對應(yīng)的Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//執(zhí)行線程調(diào)度敏弃,內(nèi)部會(huì)訂閱上一級的Observable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,這里直接進(jìn)行了線程調(diào)度噪馏,創(chuàng)建了SubscribeTask
任務(wù)麦到,然后交由Scheduler
執(zhí)行。
我們先看看scheduleDirect
會(huì)執(zhí)行什么操作:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Scheduler.Worker w = createWorker();
Scheduler.DisposeTask task = new Scheduler.DisposeTask(run, w);
w.schedule(task, delay, unit);
return task;
}
可以看到欠肾,這里和我們剛才追蹤ObserveOn
時(shí)的邏輯一樣瓶颠。都是將任務(wù)交給了Worker
處理。我們剛才已經(jīng)分析了董济,Worker
會(huì)將任務(wù)提交給對應(yīng)的線程執(zhí)行步清。
所以我們回過頭看一下我們提交了什么任務(wù):
@Override
public void run() {
source.subscribe(parent);
}
可以看出,這里將訂閱的操作提交給了Worker
執(zhí)行虏肾。
總結(jié)一下:
subscribeOn
會(huì)將訂閱上一級的操作調(diào)交給worker
中對應(yīng)的線程執(zhí)行廓啊。
ObserveOn和subscribeOn
我們還是以上述引入的例子為例,可以看出封豪,整個(gè)過程進(jìn)行了兩次線程調(diào)度谴轮,首先是subscribeOn
,然后是ObserveOn
吹埠,這個(gè)過程比較簡單第步,先解析這個(gè)過程。
根據(jù)上一篇文章的分析缘琅,RxJava的整個(gè)流程分為三個(gè)步驟:
創(chuàng)建任務(wù)鏈粘都,這里沒有涉及線程調(diào)度。默認(rèn)執(zhí)行在當(dāng)前線程刷袍,在這里也就是主線程翩隧。
逆向訂閱,這里當(dāng)遇到
ObserveOn
的時(shí)候呻纹,ObserveOn
直接進(jìn)行了訂閱操作堆生,所以沒有影響。
但是但我們訂閱ObservableSubscribeOn
的時(shí)候雷酪,其便將訂閱操作提交到了對應(yīng)線程淑仆,所以后續(xù)的訂閱操作都執(zhí)行在對應(yīng)線程,在這里便是IO線程哥力。執(zhí)行任務(wù)鏈蔗怠,受到
ObservableSubscribeOn
的影響,這里也會(huì)繼續(xù)執(zhí)行在IO線程。
但是當(dāng)我們執(zhí)行到ObserveOnObserver
的時(shí)候寞射,onNext
操作會(huì)執(zhí)行在對應(yīng)的線程中最住,在這里也就是切換到主線程。
圖中怠惶,紫色的箭頭表示執(zhí)行在默認(rèn)線程(主線程),紅色的箭頭表示執(zhí)行在IO線程轧粟,繩藍(lán)色的線表示執(zhí)行在切換后的主線程策治。
observeOn和subscribeOn之間是否存在沖突
其實(shí)從上述的例子我們可以看出并不存在沖突的問題,一個(gè)影響的subscribe之后的操作兰吟,一個(gè)影響的是doNext之后的操作通惫。
從圖中可以看出,不管subscribe
和ObserveOn
怎么變化混蔼,都不會(huì)發(fā)生沖突的情況履腋。