前言
Rxjava是NetFlix出品的Java框架荔烧, 官方描述為 a library for composing asynchronous and event-based programs using observable sequences for the Java VM吮蛹,翻譯過來就是“使用可觀察序列組成的一個異步地痘绎、基于事件的響應(yīng)式編程框架”奶躯。一個典型的使用示范如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "1234";
//執(zhí)行耗時任務(wù)
emitter.onNext(s);
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
本文要講的主要內(nèi)容是Rxjava的核心思路,利用一張圖并結(jié)合源碼分析Rxjava的實現(xiàn)原理,至于使用以及其比較深入的內(nèi)容督笆,比如不常用的操作符,背壓等诱贿,讀者可以自行學(xué)習(xí)娃肿。另外提一句咕缎,本文采用的Rxjava版本是2.2.3,Rxjava最新版本是3.x.x料扰,感興趣的可以自行閱讀凭豪,但相信其最核心的原理是不會變化的。另外晒杈,本文篇幅較長嫂伞,最好的效果是邊看本文邊到源碼中體會,如果讀者沒有耐心讀完拯钻,可以只看圖片和頭尾帖努。
正題
先放出本文最重要的圖:
Rxjava的核心思路被總結(jié)在了圖中,本文分為兩部分粪般,第一部分講圖中的三條流和事件傳遞拼余,第二部分講線程切換的原理,下面進(jìn)入正題亩歹。
流式構(gòu)建和事件傳遞
在講之前匙监,先提一點,在Rxjava中小作,有Observable和Observer這兩個核心的概念舅柜,但是它們在發(fā)生訂閱時,跟普通的觀察者模式寫法不太一樣躲惰,因為常識來講致份,應(yīng)該是觀察者去訂閱(subscribe)被觀察者,但是Rxjava為了其基于事件的流式編程础拨,只能反著來氮块,observable去訂閱observer,所以在rxjava中诡宗,subscribe可以理解“注入”觀察者滔蝉。
首先我們看上面的圖片,先簡單解釋一下:圖中方形的框代表的是Observable塔沃,因為它代表節(jié)點蝠引,所以用Ni表示,圓形框代表的是觀察者Observer蛀柴,用Oi標(biāo)識螃概,后面加括號的意思是Oi持有其下游Observer的引用,左側(cè)代表上游鸽疾,右側(cè)代表下游吊洼。圖片里有三條有方向的彩色粗線,代表三個不同的流制肮,這三個流是我們?yōu)榱朔治鰡栴}而抽象出來的的冒窍,代表從構(gòu)建到訂閱整個事件的流向递沪,按照時間順序從上到下依次流過,它們的含義分別是:
- 從左往右的構(gòu)建流:用來構(gòu)建整個事件序列综液,這個流表征了整個鏈路的構(gòu)建過程款慨,相當(dāng)于構(gòu)造方法。
- 從右往左的訂閱流:當(dāng)最終訂閱(subscribe方法)這個行為發(fā)生的時候谬莹,每個節(jié)點從右向左依次執(zhí)行訂閱行為檩奠。
- 從左往右的觀察者回調(diào)流:當(dāng)事件發(fā)生以后,會通過這個流依次通知給各個觀察者届良。
我們依次分析這三條流:
構(gòu)建流
在使用Rxjava時笆凌,其流式構(gòu)建流程是很大的特色,避免了傳統(tǒng)回調(diào)的繁瑣士葫。怎么實現(xiàn)的呢乞而?使用過Rxjava的讀者應(yīng)該都知道,Rxjava的每一步構(gòu)建過程api都是相同的慢显,這是因為每一步的函數(shù)返回結(jié)果都是一個Observable爪模,Observable提供了Rxjava所有的功能。那么Obsevable在Rxjava中到底扮演一個什么角色呢荚藻?事實上屋灌,其官方定義就已經(jīng)告訴我們答案了,前言里官方定義中有這樣一段:“using Observable sequences”应狱,所以說共郭,Obsevable就是構(gòu)建流的組件,我們可以看成一個個節(jié)點疾呻,這些節(jié)點串起來組成整個鏈路除嘹。Observable這個類實現(xiàn)了一個接口:ObservableSource,這個接口只有一個方法:subscribe(observer),也就是說岸蜗,所有的Obsevable節(jié)點都具有訂閱這個功能尉咕,這個功能很重要,是訂閱流的關(guān)鍵璃岳,待會會講年缎。總結(jié)一下:
在我們編寫Rxjava代碼時铃慷,每一步操作都會生成一個新的Observable節(jié)點(沒錯单芜,包括ObserveOn和SubscribeOn線程變換操作),并將新生成的Observable返回枚冗,直到最后一步執(zhí)行subscribe方法
無論是構(gòu)建的第一步 create方法缓溅,還是observeOn,subscribeOn變換線程方法,還是各種操作符比如map,flatMap等赁温,都會生成對應(yīng)的Observable坛怪,每個Observble中要實現(xiàn)一個最重要的方法就是subscribe,我們看其實現(xiàn):
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
throw npe;
}
}
這里提一點股囊,大家看源碼時遇到RxJavaPlugins時直接略過看里面的代碼就好了袜匿,它是hook用的,不影響主要流程稚疹。所以上面代碼其實只有一行有用:
subscribeActual(observer);
也就是說居灯,每個節(jié)點在執(zhí)行subscribe時,其實就是在調(diào)用該節(jié)點的subscribeActual方法内狗,這個方法是抽象的怪嫌,每個節(jié)點的實現(xiàn)都不一樣。我們舉個栗子柳沙,拿ObseverOn這個操作生成的ObservableSubscribeOn瞧瞧:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//xxx省略
}
其中其父類繼承Observable岩灭,所以它是一個Observble。
整個過程有點像builder模式赂鲤,不同之處是它是生成了新的節(jié)點噪径,而builder模式返回的自身。如果你讀過okHttp的源碼数初,okHttp中攔截器跟這里有些相似找爱,okHttp中會構(gòu)建多個Chain節(jié)點,然后用相應(yīng)的Intercepter去處理Chain泡孩。
我們理解了編寫Rxjava代碼的過程其實就是構(gòu)建一個一個Observable節(jié)點的過程车摄,接下來我們看第二條流。
訂閱流
構(gòu)建過程只是通過構(gòu)造函數(shù)將一些配置傳給了各個節(jié)點仑鸥,實際還沒有執(zhí)行任何代碼吮播,只有最后一步才真正的執(zhí)行訂閱行為。當(dāng)最后一個節(jié)點調(diào)用subscribe方法時锈候,是構(gòu)建流向訂閱流變化的轉(zhuǎn)折點薄料,我們以圖中為例:最后一個節(jié)點是N5,N5節(jié)點是最后一個flatmap操作符方法產(chǎn)生的泵琳,也就是說摄职,最后是調(diào)用這個節(jié)點的subscribe方法,這個方法最終也是會調(diào)用到subscribeActual方法中去获列,我們看其源碼:
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
}
剛才我們分析了谷市,N5節(jié)點是Observable節(jié)點,其subscribe方法最后調(diào)用的是subscribeActual方法击孩,我們看上面代碼中它的這個方法:前面的判斷語句跳過迫悠,第二行:
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
這行代碼需要注意兩點:
- 生成了一個新的Observer,請注意其構(gòu)造函數(shù)中第一個參數(shù)t巩梢,保存到了downstream這個“下游”變量中创泄,這個t從哪兒傳進(jìn)來的呢艺玲?對于N5節(jié)點來說,這個t就是我們代碼中最后一步編寫的Observer鞠抑,比如我們常用的網(wǎng)絡(luò)請求返回后的回調(diào)饭聚。也就是說,這個新生成的Observer包含了它的“下游”觀察者的引用搁拙,在圖片中對應(yīng)最右邊的圓形框O1(observer)秒梳。
- 執(zhí)行訂閱行為,這里的source是該節(jié)點構(gòu)造函數(shù)傳入的source箕速,通過源碼得知其實就是N5節(jié)點的上一個節(jié)點N4酪碘,因此,這里的訂閱行為本質(zhì)上是讓當(dāng)前節(jié)點的上一個節(jié)點訂閱當(dāng)前節(jié)點新生成的Observer盐茎。
到這里兴垦,我們分析了最后一個節(jié)點執(zhí)行subscribe方法的過程,事實上庭呜,每個節(jié)點的執(zhí)行流程都是類似的(subscribeOn節(jié)點有些特殊滑进,等會線程調(diào)度會將),也就是說募谎,N5會調(diào)用N4的subscribe方法扶关,而在N4的subscribe方法中,又去調(diào)用了N3的subscribe....一直到N0會調(diào)用source的subscribe方法数冬〗诨保總結(jié)下來就是:
從最后一個N5節(jié)點的訂閱行為開始,依次執(zhí)行前面各個節(jié)點真正的訂閱方法拐纱。在每個節(jié)點的訂閱方法中铜异,都會生成一個新的Observer,這個Observer會包含“下游”的Observer秸架,這樣當(dāng)每個節(jié)點都執(zhí)行完訂閱(subscribeActual)后揍庄,也就生成了一串Observer,它們通過downstream,upstream引用連接东抹。
以上就是訂閱流的發(fā)生過程蚂子,簡單講就是下游節(jié)點調(diào)用上游節(jié)點的subscribeActual方法,從而形成了一個調(diào)用鏈缭黔。
觀察者回調(diào)流
當(dāng)訂閱流執(zhí)行到最后食茎,也就是第一個節(jié)點N0時,我們看發(fā)生了什么馏谨,首先看看N0節(jié)點怎么建立的:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
生成了ObservableCreate實例别渔,我們看這個類(簡化):
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}
所以訂閱流的最終會掉到上面的subscrbeActual方法,它其實還是和其他節(jié)點一樣,最主要的還是執(zhí)行了
source.subscribe(parent)
這行代碼哎媚,那么這個節(jié)點的source是什么呢喇伯?它就是我們事件的源頭啊抄伍!
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "1234";
//執(zhí)行耗時任務(wù)
emitter.onNext(s);
}
})
上面代碼直接拿的開頭的例子艘刚,這個source是一個ObservableOnSubscribe管宵,看它的subscribe方法里截珍,這里很重要,這個函數(shù)里面其實是訂閱流和觀察者流的轉(zhuǎn)折點箩朴,也就是流在這兒“轉(zhuǎn)向了”岗喉。這里,這個事件源沒有像節(jié)點那樣炸庞,調(diào)用上一個節(jié)點的訂閱方法钱床,而是調(diào)用了其參數(shù)的emitter的onNext方法,這個emitter對應(yīng)N0節(jié)點的什么呢埠居?看代碼知道查牌,時CreateEmitter這個類,我們看這個類里面
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
//省略
}
看它的onNext方法滥壕,執(zhí)行的是
observer.onNext(t)
observer是誰纸颜?構(gòu)造函數(shù)傳進(jìn)來的,也就是N0節(jié)點subscribeActual方法中的observer绎橘,這個observer是誰呢胁孙?仔細(xì)回想一下,前面訂閱流的時候不就是一次訂閱上一個節(jié)點生成的Observer嗎称鳞,所以這個observer就是前一個節(jié)點N1生成的Observer涮较,我們看N1節(jié)點,是一個Map冈止,對應(yīng)的Observable節(jié)點里的Observer源碼如下:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
//省略后續(xù)
名為MapObserver狂票,看它的onNext方法,忽略前面兩個判斷語句熙暴,核心就兩句闺属,一個是mapper.apply(t),另一個就是downstream.onNext(v)怨咪。也就是說屋剑,這個mapObserver干了兩件事,一個是把上個節(jié)點返回的數(shù)據(jù)進(jìn)行一次map變換诗眨,另一個就是將map后的結(jié)果傳遞給下游唉匾,下游是什么呢?看了訂閱流的讀者自然知道,就是N2節(jié)點的Observer巍膘,對應(yīng)圖中O4,依次類推厂财,我們知道了,事件發(fā)生以后峡懈,通過各個節(jié)點的Observer事件源被層層處理并傳遞給下游璃饱,一直到最后一個觀察者執(zhí)行完畢,整個事件處理完成肪康。
至此荚恶,我們?nèi)齻€流分析完畢,接下來磷支,我們開始分析線程調(diào)度是怎么實現(xiàn)的谒撼。
線程調(diào)度
觀察仔細(xì)的讀者可能已經(jīng)看到了,圖中N2節(jié)點左側(cè)的所有節(jié)點和右側(cè)的節(jié)點顏色不同雾狈,我為什么要這樣畫呢廓潜?其實里面的玄機(jī)就是線程調(diào)度,接下來我們分別看subscribeOn和observeOn的線程切換玄機(jī)吧善榛。
SubscribeOn
在訂閱流發(fā)生的的時候辩蛋,大多數(shù)節(jié)點都是直接調(diào)用上一個節(jié)點的subscribe方法,實現(xiàn)雖有差別移盆,但大同小異悼院。唯一有個最大的不同就是subscribeOn這個節(jié)點,我們看源碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
普通的節(jié)點執(zhí)行時味滞,大多只是簡單的執(zhí)行source.subscribe(observer)樱蛤,但是這個不一樣。先看第二行剑鞍,它調(diào)用了觀察者的onSubscribe方法昨凡,熟悉Rxjava的人知道,我們在自定義Observer的時候蚁署,里面有這個回調(diào)便脊,其發(fā)生時機(jī)就在此刻。我們接著看最后一行,忽略parent.setDisposable這個邏輯光戈,我們直接看參數(shù)里面的東西哪痰。
scheduler.scheduleDirect(new SubscribeTask(parent))
看看干了什么:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
繼續(xù):
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
創(chuàng)建了一個worker,一個runnable久妆,然后將二者封裝到一個DisposeTask中晌杰,最后用worker執(zhí)行這個task,那么這個worker是什么呢筷弦?
@NonNull
public abstract Worker createWorker();
createworker是一個抽象方法肋演,所以需要去找Scheduler的子類抑诸,我們回想一下rxjava的使用,如果在子線程中執(zhí)行爹殊,我們一般設(shè)置調(diào)度器為Schedulers.io(),我們看這個子類的實現(xiàn):
在IOSchedluer類中:
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
繼續(xù):
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
這里的executor就是一個ExecutorService蜕乡,熟悉線程池的讀者應(yīng)該知道,這里的submit方法梗夸,就是將callable丟到線程池中去執(zhí)行任務(wù)了层玲。
我們回到主線
scheduler.scheduleDirect(new SubscribeTask(parent))
對于io線程的調(diào)度器來說,上面的代碼就是將new SubscribeTask(parent)丟到線程池中執(zhí)行反症,我們看參數(shù)里面的SubscribeTask:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
看run方法:source.subscribe(parent)辛块,這里的parent跟普通節(jié)點一樣,仍然是本節(jié)點生成的新的Observer惰帽,對于本節(jié)點來說憨降,是一個SubscribeOnObserver。因此该酗,我們就知道了,對于subscribeOn這個節(jié)點士嚎,它跟普通的節(jié)點不同之處在于:
SubscribeOn節(jié)點在訂閱的時候呜魄,將它的上游節(jié)點的訂閱行為,以runnable的形式扔給了一個線程池(對于IO調(diào)度器來說)莱衩,也就是說爵嗅,當(dāng)訂閱流流到SubscribeOn節(jié)點時,線程發(fā)生了切換笨蚁,之后流向的節(jié)點都在切換后的線程中執(zhí)行睹晒。
分析到這里,我們就知道了subscribeOn的線程切換原理了括细,原來是在訂閱流中塞了一個線程變化操作伪很。我們再看圖中的顏色問題,為什么這個節(jié)點上游的節(jié)點都是紅色的呢奋单?因為當(dāng)訂閱流流過這個節(jié)點后锉试,后面的節(jié)點只是單純的傳遞給上游節(jié)點而已,無論是普通的操作符览濒,還是ObserveOn節(jié)點呆盖,都是簡單的傳遞給上游,沒有做線程切換(注意贷笛,ObserveOn是在觀察者流中做的線程切換应又,待會會講)。
我們再思考一個問題乏苦,如果上游還有別的subscribeOn株扛,會發(fā)生什么?
我們假設(shè)N1節(jié)點的map修改程subscribeOn(AndroidScheduler.Main),也就是說席里,切換到主線程叔磷。我們還是從N2節(jié)點開始分析,剛才說到最后會執(zhí)行到SubscribeTask里的Run方法奖磁,注意此時source.subscribe(parent)發(fā)生在子線程中改基,接下來,回調(diào)用N1節(jié)點的subscribe咖为,N1節(jié)點回調(diào)用scheduler.scheduleDirect(new SubscribeTask(parent))策彤,方法哥纫,此時,因為線程調(diào)度器是主線程的,我們看它的代碼:
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
看看這個HandlerScheduler的方法:
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
熟悉Android Handler機(jī)制的讀者應(yīng)該很清楚哈蝇,這里會把N1節(jié)點上游的操作,通過Handler機(jī)制蔬咬,扔給主線程操作曹动,雖然這一步是在N2節(jié)點的子線程中執(zhí)行的,但是它之前的事件仍然會在主線程中執(zhí)行饰恕。因此我們有以下結(jié)論:
subscribeOn節(jié)點影響它前面的節(jié)點的線程挠羔,如果前面還有多個subscribeOn節(jié)點,最終只有第一個埋嵌,也就是最上游的那個節(jié)點生效
接下來我們分析observeOn
ObserveOn
前面的subscribeOn線程切換是在訂閱流中發(fā)生的破加,接下來的ObserveOn比較簡單,它發(fā)生在第三條流-觀察者回調(diào)流中雹嗦,我們看ObserveOn節(jié)點的源碼:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//簡化
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
}
在前面的觀察者流分析時范舀,我們知道,觀察者流是通過onNext()方法傳遞的了罪,我們看最后一行锭环,schedule(),線程切換捶惜,所以這個ObserveOn節(jié)點其實沒干啥事田藐,就是切換線程了,而且是在onNext回調(diào)中切換的吱七。我們進(jìn)到這個方法:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker是這個節(jié)點訂閱時指定的 scheduler.createWorker()汽久, 以主線程觀察為例:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
同樣,通過Handler機(jī)制踊餐,將runnable扔給主線程執(zhí)行,runnable是誰呢景醇,是this,this就是這個ObserveOnObserver,我們看它的run方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
繼續(xù)看drainNormal
void drainNorml() {
//簡化
final Observer<? super T> a = downstream;
T v;
v = q.poll();
a.onNext(v);
}
抓重點吝岭,還是把上游的處理結(jié)果扔給下游三痰。也就是說observeOn會將它下游的onNext操作扔給它切換的線程中吧寺,因此ObserveOn影響的是它的下游,所以我們途中observeOn后面的顏色都是藍(lán)的散劫。
同樣我們思考稚机,如果有多個observeOn會發(fā)生什么?很簡單获搏,思路同subscribeOn赖条,每個ObserveOn只會影響它下游一直到下一個obseveOn節(jié)點的線程,也就是分段的常熙。
總結(jié)
到此為止我們就講完了全部內(nèi)容纬乍,包括三條流的原理和線程切換的原理,至于Rxjava的其他功能和原理裸卫,限于篇幅仿贬,本文不會講解,感興趣的讀者自行閱讀源碼墓贿。本文主要為讀者提供了理解Rxjava的思路茧泪,真正要去理解它,還是要多看源碼募壕。
在我看來调炬,Rxjava有點像觀察者模式和責(zé)任鏈模式的結(jié)合,普通的觀察者模式一般是被觀察者通知多個觀察者舱馅,而Rxjava則是被觀察者通知第一個Obsever,接下來Observer依次通知其他節(jié)點的Observer,形成一個“觀察鏈”刀荒,將觀察者模式進(jìn)行了一種類似鏈?zhǔn)降淖儞Q代嗤,每個節(jié)點又會執(zhí)行它不同的“職責(zé)”,非常巧妙缠借,總結(jié)以下就是:
最原始的訂閱事件從最后一個節(jié)點開始干毅,沿著Obsevable節(jié)點往上游傳遞,事件源頭處理完任務(wù)后泼返,通知給最上游的觀察者硝逢,然后通知沿著Observer鏈條往下游傳遞,直到最后一個觀察者結(jié)束绅喉。
后記渠鸽,關(guān)于flatmap
關(guān)于flatmap這個操作符,讀者可能會用到柴罐,但理解起來又比較難徽缚,我們通過本文,其實就很容易從源碼中理解這個操作符的含義革屠。這里我順便給大家解釋一下凿试,還是看圖:
上圖簡化了整個事件流向排宰,我們對事件源進(jìn)行了flatmap操作,flatmap在訂閱流的時候跟其他的操作符基本一致那婉,但是在觀察者回調(diào)流中卻很不一樣板甘,它在回調(diào)流中做了以下內(nèi)容:
flatmap將上游傳過來的數(shù)據(jù)進(jìn)行了一次變換,變成了一個Observable详炬,如何變的是由開發(fā)者自定義的盐类,比如圖中下面三個豎著的三個Observable節(jié)點流,這條流跟上面的四個Observable節(jié)點本質(zhì)上是一樣的痕寓。flatmap這個節(jié)點的Obsever將上游的數(shù)據(jù)轉(zhuǎn)化成了一個新的Observable流傲醉,然后執(zhí)行這條新的流,當(dāng)這條新的流走完時呻率,會接著原來的觀察者流繼續(xù)走下去硬毕。也就是說,flatMap這個操作符將一條新的Observable節(jié)點流“插入”到原始的觀察者回調(diào)流上去了礼仗。
那圖中的橘黃色和紫色的虛線是什么意思呢吐咳?
實際上它是flatmap的一種特殊情況,當(dāng)新插入的流的事件源有多個的時候元践,這是會產(chǎn)生分流韭脊,每個流都會執(zhí)行一遍下游的原始節(jié)點。我們拿下面這個例子來看:
String[] mainArmy = {"第一大隊", "第二大隊", "第三大隊"};
Observable.fromArray(mainArmy)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
String[] littleArmy = {s + "的第一小隊", s + "的第二小隊", s + "的第三小隊"};
return Observable.fromArray(littleArmy);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String little) throws Exception {
System.out.println(little);
}
});
這個代碼運行結(jié)果是
第一大隊的第一小隊
第一大隊的第二小隊
第一大隊的第三小隊
第二大隊的第一小隊
第二大隊的第二小隊
第二大隊的第三小隊
第三大隊的第一小隊
第三大隊的第二小隊
第三大隊的第三小隊
這個例子也許很好的體現(xiàn)了flatmap這個操作符的意義单旁,把list鋪平展開沪羔,而且防止了繁瑣的嵌套循環(huán)。但是象浑,雖然flatmap很擅長處理這種問題(我不知道這個操作符是不是為了解決這個問題而設(shè)計出來的)蔫饰,但flatmap的功能卻遠(yuǎn)不僅如此,它的本質(zhì)是在合并Obsevable流愉豺,可以做很多事情篓吁,比如我們網(wǎng)絡(luò)請求的“連環(huán)請求”,舉個例子蚪拦,首先通過書本的Id獲取出版商名字杖剪,然后拿到出版商名字后獲取出版社信息。
api.getBookPublisherNameById("01102").flatmap(new Function<String, ObservableSource<PublisherInfo>>() {
@Override
public ObservableSource<PublisherInfo> apply(String s) throws Exception {
return api.getPublisherInfoByName(s);
}
}).subscribe(new COnsume<PublisherInfo>() {
@Override
public void accept(PublisherInfo little) throws Exception {
//獲取到出版社信息
}
})
看完這里驰贷,flatmap是不是也蠻好理解的~