序言
RxJava是現(xiàn)在最流行的響應(yīng)式函數(shù)編程框架艘包,之前的項(xiàng)目中一直使用RxJava统倒,結(jié)合Retrofit+OkHttp搭建網(wǎng)絡(luò)請(qǐng)求框架寨典,很是好用。
后來(lái)RxJava2出來(lái)了房匆,官網(wǎng)表明一段時(shí)間之后不再維護(hù)RxJava耸成,所以在新項(xiàng)目中,決定使用RxJava2浴鸿。
對(duì)于新手來(lái)說(shuō)井氢,即使沒(méi)用過(guò)RxJava,也可以直接學(xué)習(xí)RxJava2岳链。而對(duì)于從RxJava過(guò)渡到RxJava2的同學(xué)花竞,自然更容易上手。
響應(yīng)式編程(Reactive Programming)這個(gè)詞很多人都知道掸哑,但具體是什么含義可能沒(méi)多少人能解釋清楚约急。我簡(jiǎn)單說(shuō)一下自己的理解:響應(yīng)式編程可基于任何事物(數(shù)據(jù)寇仓、用戶(hù)行為、時(shí)間烤宙、對(duì)象)創(chuàng)建事件流,并且框架提供一個(gè)強(qiáng)大的函數(shù)庫(kù)來(lái)操作事件流俭嘁,包括合并躺枕、過(guò)濾、轉(zhuǎn)換供填、切換線(xiàn)程拐云、監(jiān)聽(tīng)...,流是響應(yīng)式的核心近她。
RxJava就是這樣一個(gè)響應(yīng)式編程框架叉瘩,今天我們主要來(lái)介紹RxJava2的事件處理流程和線(xiàn)程切換原理。本文并不是一篇新手指引教程粘捎,而是一篇進(jìn)階教程薇缅。如果想入門(mén)RxJava2,可以看一下這篇文章攒磨。
用戶(hù)登錄場(chǎng)景
下面以用戶(hù)登錄場(chǎng)景為例泳桦,介紹怎樣通過(guò)RxJava2進(jìn)行事件流處理,登錄代碼如下:
/**
* 用戶(hù)登錄操作
*/
public void login(final String userName, final String password) {
Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
@Override
public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
e.onNext(loginApi(userName, password));
}
})
.map(new Function<CommonApiBean<UserInfo>, UserInfo>() {
@Override
public UserInfo apply(CommonApiBean<UserInfo> bean) throws Exception {
if (bean != null && bean.body != null) {
return bean.body;
}
return new UserInfo();
}
})
.doOnNext(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
saveUserInfo(userInfo);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//登錄成功娩缰,跳轉(zhuǎn)頁(yè)面
loginSuccess(userInfo);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//登錄失敗提示用戶(hù)
loginFailed();
}
});
}
上面一段代碼是RxJava2常規(guī)的使用方式灸撰,能夠滿(mǎn)足多數(shù)網(wǎng)絡(luò)請(qǐng)求場(chǎng)景,下面我們就針對(duì)這段代碼進(jìn)行分析拼坎。
分析事件流
RxJava2處理事件流分為3個(gè)步驟:
- 構(gòu)建操作符對(duì)應(yīng)的Observable
- 逐級(jí)生成Observer浮毯,逆向訂閱Observable
- 逐級(jí)調(diào)用Observer的onNext方法
下面我們就分別來(lái)介紹這三個(gè)流程。
1. 構(gòu)建操作符對(duì)應(yīng)的Observable
上面一段代碼中泰鸡,使用的操作符包括create债蓝、map、doOnNext鸟顺、subscribeOn惦蚊、observeOn
,我們依次看這些操作符做了什么事情讯嫂。
Observable
要了解RxJava2原理蹦锋,必須先了解Observable,Observable是RxJava2事件流的入口類(lèi)欧芽,也可以叫做事件源莉掂。
public abstract class Observalbe<T> implements ObservableSource<T> {
//交由子類(lèi)實(shí)現(xiàn)的抽象方法
@Override
protected abstract void subscribeActual(Observer observer) ;
//實(shí)現(xiàn)了ObservableSource的方法
@Override
public final void subscribe(Observer<? super T> observer) {
//省略一堆判空等處理
subscribeActual(observer);
}
//省略一堆靜態(tài)操作符方法
}
從Observable的定義可知,它實(shí)現(xiàn)了ObservableSource接口千扔,并定義了一個(gè)subscribeActual抽象方法憎妙,調(diào)用Observable的subscribe方法實(shí)際上是做了一些基礎(chǔ)判斷后库正,調(diào)用subscribeActual方法。Observable的每個(gè)子類(lèi)需要需要實(shí)現(xiàn)自己的subscribeActual方法厘唾。
create
跟蹤到Observable的create方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
第一句代碼對(duì)source進(jìn)行判空褥符,如果為空,會(huì)拋出異常抚垃。接著生成一個(gè)ObservableCreate對(duì)象喷楣,把這個(gè)對(duì)象傳入RxJavaPlugin進(jìn)行組裝。
RxJavaPlugin提供了一系列的Hook Function鹤树,通過(guò)這種函數(shù)對(duì)RxJava的標(biāo)準(zhǔn)進(jìn)行加工铣焊,如果我們不配置這些方法,默認(rèn)直接返回原對(duì)象罕伯,即ObservableCreate曲伊。
注:下面介紹其他操作符,就不再解釋判空操作和RxJavaPlugin追他。
接著坟募,我們看一下ObservableCretae的定義:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//省略其他方法
}
很簡(jiǎn)單,ObservableCreate繼承Observable邑狸,并且在構(gòu)造方法中保存了傳入的ObservableOnSubscribe對(duì)象婿屹。
總結(jié):create()構(gòu)建了一個(gè)ObservableCreate對(duì)象,該對(duì)象繼承Observable推溃。
map
同樣昂利,跟蹤到Observable的map方法中:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
這里創(chuàng)建并返回一個(gè)ObservalbeMap對(duì)象:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
//省略其他方法
}
ObservableMap繼承AbstractObservableWithUpstream類(lèi):
/**
* Base class for operators with a source consumable.
*/
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
AbstractObservableWithUpstream是所有接收上一級(jí)輸入操作符的基類(lèi)。
總結(jié):map()構(gòu)建了一個(gè)ObservableMap對(duì)象铁坎。
doOnNext
根據(jù)上面兩個(gè)操作符的源碼蜂奸,我們猜測(cè)這里也會(huì)返回一個(gè)Observable子類(lèi)對(duì)象,進(jìn)入源碼驗(yàn)證一下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
//省略判空操作
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}
果不其然硬萍,doOnNext最后返回了一個(gè)ObservableDoOnEach對(duì)象:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
//省略其他代碼
}
doOnNext也需要接收上流傳來(lái)的Observable作為source扩所,所以也繼承了AbstractObservableWithUpstream。
總結(jié):doOnNext()構(gòu)建了一個(gè)ObservableDoOnEach對(duì)象朴乖。
subscribeOn
用過(guò)RxJava的同學(xué)都知道祖屏,subscribeOn是用來(lái)切換線(xiàn)程的,用于指定被觀察者執(zhí)行的線(xiàn)程买羞。
不著急袁勺,怎樣切換線(xiàn)程我們后面會(huì)分析,先看一下它的源碼:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
這里返回了一個(gè)ObservableSubscribeOn對(duì)象畜普,繼續(xù)跟蹤:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
//省略其他代碼
}
ObservableSubscribeOn中保存了事件源source和線(xiàn)程調(diào)度器scheduler期丰,而這個(gè)scheduler是我們傳入的Schedulers.io()
。
總結(jié):subscribeOn()構(gòu)建了一個(gè)ObservableSubscribeOn對(duì)象。
observeOn
各位同學(xué)肯定也知道钝荡,observeOn用于指定觀察者執(zhí)行的線(xiàn)程街立,至于怎樣實(shí)現(xiàn)線(xiàn)程切換等到后面分析。
老套路埠通,我們跟蹤到Observable.observeOn方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略判空驗(yàn)證操作
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn最后返回了一個(gè)ObservableObserveOn對(duì)象:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//省略其他代碼
}
ObservableObserveOn對(duì)象也保存了事件源source和線(xiàn)程調(diào)度器scheduler赎离,這里的scheduler是我們傳入的AndroidScheduler.mainThread()
。
總結(jié):observeOn構(gòu)建了一個(gè)ObservableObserveOn對(duì)象端辱。
到這里蟹瘾,操作符對(duì)應(yīng)的Observable構(gòu)建完成,總結(jié)一下掠手,按照操作符順序,構(gòu)建了ObservableCreate -> ObservableMap -> ObservableDoOnEach -> ObservableSubscribeOn -> ObservableObserveOn
幾個(gè)Observable對(duì)象狸捕。
2. 逐級(jí)生成Observer喷鸽,逆向訂閱Observable
LambdaObserver
在登錄場(chǎng)景的最后,調(diào)用了subscribe方法灸拍,傳入了兩個(gè)Comsumer對(duì)象,我們看一下subscribe方法的實(shí)現(xiàn):
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//省略判空操作
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
從代碼看出,subscribe方法拿我們傳入的兩個(gè)Consumer構(gòu)建了一個(gè)LambdaObserver對(duì)象慨亲,而兩個(gè)Consumer分別對(duì)應(yīng)onNext和onError戈锻,并且用LambdaObserver來(lái)訂閱ObservableObserveOn。
總結(jié):用LambdaObserver訂閱ObservableObserveOn對(duì)象轩性。
ObserveOnObserver
繼續(xù)分析ObservableObserveOn.subscribe方法声登,上面提到過(guò),Observable的subscribe方法實(shí)際上會(huì)調(diào)用具體子類(lèi)的subscribeActual方法揣苏,所以我們跟蹤ObservableObserveOn的subscribeActual方法:
//ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
TrampolineScheduler表示是否當(dāng)前線(xiàn)程悯嗓,而我們傳入的schedule是AndroidScheduler.mainThread()
,并不是TrampolineScheduler對(duì)象卸察。
所以會(huì)走else邏輯脯厨,創(chuàng)建一個(gè)Scheduler.Worker,并把它作為參數(shù)構(gòu)建一個(gè)ObserveOnObserver對(duì)象坑质。
用ObserveOnObserver對(duì)象訂閱source(source是我們構(gòu)建ObservableObserveOn對(duì)象傳入的ObservableSubscribeOn對(duì)象)合武。
總結(jié):用ObserveOnObserver訂閱ObservableSubscribeOn對(duì)象
SubscribeOnObserver
接著,看一下ObservableSubscribeOn的訂閱邏輯:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
首先創(chuàng)建一個(gè)SubscribeOnObserver對(duì)象涡扼,scheduler.scheduleDirect
從名字上看稼跳,大概用來(lái)切換線(xiàn)程的。在指定線(xiàn)程中執(zhí)行SubscribeTask任務(wù):
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
這個(gè)任務(wù)很簡(jiǎn)單吃沪,就是用parent(SubscribeOnObserver對(duì)象)來(lái)訂閱source(此處的source是ObservableDoOnEach)岂贩。
總結(jié):SubscribeOnObserver訂閱ObservableDoOnEach對(duì)象。
DoOnEachObserver
老套路,我們繼續(xù)看ObservableDoOnEach的subscribeActual方法:
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}
這個(gè)方法很干脆萎津,直接用DoOnEachObserver訂閱source(此處的source是ObservableMap對(duì)象)
總結(jié):用DoOnEachObserver訂閱ObservableMap對(duì)象卸伞。
MapObserver
到這里,相信大家也可以猜到锉屈,ObservableMap的subscribeActual中荤傲,肯定也是構(gòu)建一個(gè)MapObserver來(lái)訂閱source,本著實(shí)事求是的精神颈渊,源碼還是要看的:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
果不其然_K焓颉!
總結(jié):用MapObserver訂閱ObservableCreate俊嗽。
最后雾家,我們要看一下ObservableCreate中的subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這里也出現(xiàn)了source.subscribe(parent)
,parent是CreateEmitter對(duì)象绍豁,那么source是什么呢芯咧?
有哪位同學(xué)能回答一下這個(gè)問(wèn)題嗎?
哈哈竹揍,不賣(mài)關(guān)子了敬飒,這里的source就是我們最開(kāi)始創(chuàng)建Observable事件流傳入的ObservableOnSubscribe對(duì)象,還有印象嗎芬位,沒(méi)有也沒(méi)關(guān)系:
public void login(final String userName, final String password) {
Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
@Override
public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
e.onNext(loginApi(userName, password));
}
}).//省略后續(xù)操作代碼
}
沒(méi)錯(cuò)无拗,就是調(diào)用此處我們實(shí)現(xiàn)的subscribe方法。
到這里昧碉,逆向訂閱Observable的過(guò)程分析完畢了英染。
總結(jié):
LambdaObserver -> ObservableObserveOn
ObserveOnObserver -> ObservableSubscribeOn
SubscribeOnObserver -> ObservableDoOnEach
DoOnEachObserver -> ObservableMap
MapObserver -> ObservableCreate
最后調(diào)用ObservableOnSubscribe的subscribe方法
有木有感覺(jué)思緒逐漸明朗起來(lái)!不急被饿,后面還有呢税迷!
3. 逐級(jí)調(diào)用Observer的onNext方法
MapObserver.onNext
接著上面最后一步進(jìn)行分析,調(diào)用ObservableOnSubscribe的subscribe方法锹漱,而我們?cè)趕ubscribe中調(diào)用了e.onNext
箭养,此處的e是CreateEmmiter對(duì)象,進(jìn)入它的onNext方法:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
邏輯很簡(jiǎn)單哥牍,對(duì)發(fā)射的數(shù)據(jù)判空毕泌,如果數(shù)據(jù)為空則拋出異常。如果沒(méi)有中斷事件流嗅辣,則調(diào)用observer.onNext
撼泛,此處的observer是創(chuàng)建CreateEmitter時(shí)傳入的MapObserver。
總結(jié):調(diào)用MapObserver的onNext方法澡谭。
DoOnEachObserver
跟蹤到MapObserver的onNext方法:
@Override
public void onNext(T t) {
//省略一些判斷
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
先通過(guò)mapper.appply
對(duì)數(shù)據(jù)t做變換愿题,變換之后繼續(xù)調(diào)用DoOnEachObserver的onNext方法。
總結(jié):調(diào)用DoOnEachObserver的onNext方法。
SubscribeOnObserver.onNext
我們跟到DoOnEachObserver.onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
actual.onNext(t);
}
跟我們預(yù)料一樣潘酗,繼續(xù)調(diào)用下一級(jí)observer(SubscribeOnObserver)的onNext方法杆兵。
總結(jié):調(diào)用SubscribeOnObserver的onNext方法。
ObserveOnObserver.onNext
長(zhǎng)征的路一步一步走仔夺,接著看SubscribeOnObserver.onNext方法:
@Override
public void onNext(T t) {
actual.onNext(t);
}
這里也很干脆直接琐脏,調(diào)用上一級(jí)observer(ObserveOnObserver)的onNext方法。
總結(jié):調(diào)用ObserveOnObserver的onNext方法缸兔。
LambdaObserver.onNext
進(jìn)入ObserveOnObserver.onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先把數(shù)據(jù)加入queue日裙,然后代用schedule方法,這里涉及到線(xiàn)程調(diào)度惰蜜,我們稍后分析昂拂,總之最后會(huì)調(diào)用到LambdaObserver.onNext方法。
到這里抛猖,逐級(jí)調(diào)用Observer的onNext方法也分析完畢了格侯。
總結(jié):
MapObserver.onNext -> DoOnEachObserver.onNext -> SubscribeOnObserver.onNext -> ObserveOnObserver.onNext -> LambdaObserver.onNext
到這里,RxJava整個(gè)事件流的原理分析完畢了樟结。回顧一下精算,包括三個(gè)步驟:
- 構(gòu)建操作符對(duì)應(yīng)的Observable
- 逐級(jí)生成Observer瓢宦,逆向訂閱Observable
- 逐級(jí)調(diào)用Observer的onNext方法
相信各位同學(xué)很容易理解這個(gè)圖片描述的流程灰羽。
線(xiàn)程切換原理
在理解了RxJava2操作符工作原理之后驮履,我們需要分析subscribeOn和observeOn切換線(xiàn)程的原理。
1. subscribeOn
上面提到過(guò)廉嚼,subscribeOn指定被觀察者處理事件流所在線(xiàn)程玫镐,它作用在subscribe階段(即圖中逆向訂閱過(guò)程),我們重新看一下ObservableSubscribeOn的訂閱過(guò)程:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
此處的scheduler怠噪,是我們出傳入的Schedulers.io()
恐似,這是個(gè)什么東西呢?
public final class Schedulers {
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
//省略其他幾個(gè)Scheduler初始化過(guò)程
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//省略其他代碼
}
看到這里傍念,我們知道了矫夷,Scheduler.io()
具體是指IoScheduler對(duì)象,IoScheduler繼承Scheduler憋槐,Scheduler是所有線(xiàn)程調(diào)度器的父類(lèi)双藕,看一下Scheduler的實(shí)現(xiàn):
public abstract class Scheduler {
@NonNull
public abstract Worker createWorker();
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
//省略其代碼
}
Scheduler是個(gè)抽象類(lèi),包含一個(gè)抽象方法createWorker阳仔,返回一個(gè)Worker對(duì)象忧陪。
而它的scheduleDirect方法實(shí)際上就是調(diào)用這個(gè)Worker的schedule方法。
繼續(xù)分析線(xiàn)程切換邏輯,代碼中調(diào)用了IoScheduler.scheduleDirect方法嘶摊,實(shí)際就是把SubscribeTask交給IoScheduler.createWorker構(gòu)建的worker去執(zhí)行延蟹。
跟到IoScheduler.createWorker方法:
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
返回了一個(gè)EventLoopWorker對(duì)象,進(jìn)入它的schedule方法:
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
這里也不是真正執(zhí)行任務(wù)的地方更卒,那就繼續(xù)跟進(jìn)到ThreadWorker.scheduleActual方法:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
到這里等孵,總算看到了,通過(guò)線(xiàn)程池分配線(xiàn)程來(lái)執(zhí)行任務(wù)蹂空。
總結(jié):subscribeOn(Schedulers.io())
會(huì)在逆向訂閱步驟中俯萌,通過(guò)線(xiàn)程池分配一個(gè)子線(xiàn)程來(lái)執(zhí)行任務(wù)。
圖中通過(guò)粉色和紅色的箭頭區(qū)分了UI線(xiàn)程和子線(xiàn)程上枕,走到訂閱ObservableSubscribeOn時(shí)咐熙,從UI線(xiàn)程切換到子線(xiàn)程,箭頭從粉色變?yōu)榧t色辨萍,之后的逆向訂閱操作都在子線(xiàn)程中進(jìn)行棋恼。
2. observeOn
接下來(lái)分析observeOn,它是指定觀察者(訂閱者)處理事件所在線(xiàn)程锈玉。我們傳入的是AndroidScheduler.mainThread()
,這又是個(gè)什么東西呢爪飘?
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
看到這段代碼,我們就知道了拉背,AndroidScheduler.mainThread
指的是HandlerThread师崎,在它構(gòu)造方法中會(huì)傳入一個(gè)主線(xiàn)程Handler,相信不用解釋?zhuān)魑煌瑢W(xué)明白這個(gè)Handler的作用吧椅棺。對(duì)犁罩,,两疚,就是用來(lái)把觀察者執(zhí)行邏輯切換到主線(xiàn)程床估。
那么,具體是在哪個(gè)過(guò)程切換的呢诱渤?執(zhí)行ObserveOnObserver的onNext階段丐巫!
代碼如下:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
先把事件流中的數(shù)據(jù)t加入隊(duì)列queue,然后執(zhí)行schedule方法:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這個(gè)worker相信大家猜到了勺美,就是HandlerScheduler的createWorker得到的對(duì)象鞋吉,把this作為任務(wù)(ObserveOnObserver實(shí)現(xiàn)了Runnable),交給worker執(zhí)行:
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
這里的worker是HandlerWorker對(duì)象励烦,繼續(xù)看它的schedule方法:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略一些判斷
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
到這里谓着,總算守得云開(kāi)見(jiàn)日出。通過(guò)Handler發(fā)送消息把任務(wù)切換到主線(xiàn)程執(zhí)行坛掠。
這個(gè)任務(wù)就是剛才提到的ObserveOnObserver赊锚,我們看一下它的run方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
這里會(huì)根據(jù)outputFused走不通的邏輯治筒,正常情況下都會(huì)走else邏輯,我們就只分析這條分支舷蒲。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
這段代碼比較長(zhǎng)耸袜,但仔細(xì)看下來(lái),邏輯還是比較簡(jiǎn)單牲平,就是從queue中獲取數(shù)據(jù)堤框,然后把數(shù)據(jù)交給actual(LambdaObserver)的onNext方法。到這里纵柿,經(jīng)過(guò)轉(zhuǎn)換的數(shù)據(jù)交給我們傳入的Comsumer蜈抓,在主線(xiàn)程中處理,observeOn切換線(xiàn)程的邏輯分析完畢昂儒。
還是可以看回那張圖沟使,當(dāng)執(zhí)行ObserveOnObserver.onNext時(shí),就從子線(xiàn)程切換回UI線(xiàn)程渊跋,箭頭變成粉色腊嗡。
總結(jié):observeOn作用于onNext階段。
總結(jié)
這篇文章很長(zhǎng)拾酝,相信看完的同學(xué)肯定會(huì)有所收獲燕少,對(duì)RxJava2有更好的認(rèn)識(shí)。
當(dāng)然蒿囤,除了本文舉例場(chǎng)景中的操作符客们,RxJava2還提供了很有強(qiáng)的而好用的操作符,各位同學(xué)可以學(xué)習(xí)學(xué)習(xí)蟋软。