先看RxJava的簡單使用及解析:
//產(chǎn)生事件并返回Single對象,Single和Observable是一樣的作用,不一樣的地方是Single只回調(diào)onSuccess(),不會回調(diào)onError()
Single<Integer> just = Single.just(1);
//訂閱
just.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
});
以上是Rxjava的一個簡單示例疗我,第一步通過Single.just()發(fā)送一個事件,第二部調(diào)用subscribe()訂閱事件赠堵。
先看第一步Single.just():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "value is null");
//構建一個SingleJust返回
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
//鉤子方法残家,如果設置了onSingleAssembly兽间,那么可以通過apply()對數(shù)據(jù)進行再加工,默認沒設置onSingleAssembly价匠,不必關注此方法
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看到只是構造了一個SingleJust()對象当纱,傳入事件并返回。
那么第一步簡單總結一下為:構造新的被觀察者SingleJust踩窖,并傳遞事件
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
}
可以看到SingleJust在構造里記錄了數(shù)據(jù)坡氯,并且它還有一個subscribeActual(),這個方法里回調(diào)了onSubscribe()及onSuccess();
那么簡單總結一下,第一步Single.just(1)執(zhí)行后毙石,構造里一個SingleJust對象廉沮,并存儲了數(shù)據(jù)。所以返回的Single其實已經(jīng)是SingleJust對象了徐矩,那么被觀察者對象已經(jīng)切換為SingleJust
再看第二步just.subscribe():
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> observer) {
//鉤子方法滞时,和上面一樣,不必關注滤灯。
observer = RxJavaPlugins.onSubscribe(this, observer);
try {
//調(diào)用subscribeActual坪稽,傳入observer
subscribeActual(observer);
} catch (Throwable ex) {
throw npe;
}
}
第二部是訂閱,它實際執(zhí)行了subscribeActual(),并傳入了下游的觀察者鳞骤,由于這里已經(jīng)是SingleJust窒百,那么執(zhí)行的就是它的subscrieActual():
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
//此方法非核心,忽略
observer.onSubscribe(Disposables.disposed());
//執(zhí)行觀察者的onSuccess()
observer.onSuccess(value);
}
可以看到豫尽,最后實際執(zhí)行了下游觀察者的onSuccess()
總結:
以上就是RxJava的一個簡單模型過程篙梢,當調(diào)用一個操作符后,被觀察者對象就會改變美旧,同時事件從上往下傳遞渤滞。當產(chǎn)生訂閱關系時,下游觀察者在上游被觀察者的subscribeActual()中獲取結果**
為了更好理解復雜的情況榴嗅,這里再明確一個概念妄呕,下游觀察者是當前被觀察者調(diào)用subscribe()時傳入的參數(shù)。
Rxjava中其他復雜的操作嗽测,其實就是操作符的改變及累加绪励,只是在此模型上增加中間過程,如處理數(shù)據(jù)等。
稍復雜的RxJava使用及解析
Single.just("1")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
}).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
});
Single.just("1")這一行在上面分析過了疏魏,主要是把Single轉換為SingleJust停做,并傳遞數(shù)據(jù)。那么再看第二行map():
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//創(chuàng)建了SingleMap對象蠢护,并傳入了this,和mapper參數(shù)雅宾,this就是SingleJust對象,mapper處理數(shù)據(jù)用葵硕,用來回調(diào)apply()
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
可以看到眉抬,這一步又把SingleJust轉換為了一個SingleMap,那么現(xiàn)在被觀察者就轉換為了SingleMap懈凹,那么現(xiàn)在可以把被觀察者SingleJust稱為上游被觀察者蜀变,同時把上游被觀察者和處理數(shù)據(jù)的mapper,存入了SingleMap中。
第三行開始訂閱介评,在這里被觀察者已經(jīng)變成了SingleMap库北,那么會執(zhí)行它的subscribeActual():
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//這里的source其實是SingleJust,然后訂閱了觀察者MapSingleObserver们陆,由于觀察者是由下方的訂閱產(chǎn)生的寒瓦,這里稱觀察者t為下游觀察者
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
如果這里回調(diào)觀察者的onSuccess(),那么就和之前總結的沒有區(qū)別坪仇,但是這里調(diào)用了source.subscribe(),由上游被觀察者再次訂閱杂腰。也就是說,產(chǎn)生訂閱關系后椅文,事件開始從下往上傳遞喂很,對數(shù)據(jù)進行處理。
根據(jù)之前的代碼source.subscribe()皆刺,最終會執(zhí)行source的subscribeActual()少辣,再執(zhí)行到觀察者的onSuccess(),也就是MapSingleObserver的onSuccess():
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
//觀察者
final SingleObserver<? super R> t;
//處理數(shù)據(jù)對象
final Function<? super T, ? extends R> mapper;
@Override
public void onSuccess(T value) {
try {
//這里調(diào)用apply()處理數(shù)據(jù)。
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
//如果出錯調(diào)用onError()
onError(e);
return;
}
//處理完成羡蛾,下游觀察者調(diào)用onSuccess()
t.onSuccess(v);
}
}
可以看到在onSuccess()中漓帅,先對數(shù)據(jù)進行處理,如果數(shù)據(jù)沒出錯痴怨,那么再把數(shù)據(jù)交給最終的觀察者煎殷。
總結:根據(jù)上面的代碼再完善一下簡單模型,首先當發(fā)送一個事件后腿箩,事件開始從上游往下傳遞,傳遞過程中會由當前調(diào)用的操作符暫代被觀察者功能劣摇,當傳遞完成后代表設置完成珠移。然后開始訂閱事件,發(fā)起訂閱后那么事件開始從下往上傳遞,對之前的設置進行處理钧惧,最后處理完后成暇韧,事件再從上往下傳遞給最終的觀察者。那么最終模型就是:
從上往下(開始傳遞事件浓瞪,初始化操作符)—>從下往下(連接操作符懈玻,并進行部分設置)->從上往下(根據(jù)操作符對數(shù)據(jù)進行處理并返回)
根據(jù)模型,分析完整示例
Single.just("1")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
})
//切換到io線程
.subscribeOn(Schedulers.io())
//切換到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
});
這里只為分析過程乾颁,忽律當前代碼并不需要切線程的操作涂乌。
第一步從上往下,初始化操作符:
map()分析過了英岭,那么直接看subscribeOn():
public final Single<T> subscribeOn(final Scheduler scheduler) {
//構建一個SingleSubscribeOn返回湾盒,scheduler就是要被設置的參數(shù)
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
}
根據(jù)之前的分析,那么就是subscribeOn暫時接管了被觀察者的職能诅妹,subscribeOn完成了設置罚勾,繼續(xù)看observeOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> observeOn(final Scheduler scheduler) {
//構建一個SingleObserveOn返回,scheduler就是要被設置的參數(shù)
return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
}
根據(jù)之前的分析吭狡,那么就是observeOn暫時接管了被觀察者的職能尖殃,此時完成了第一步,事件從上到下傳遞的設置功能划煮。
第二步從下往上送丰,開始訂閱,連接操作符
調(diào)用subscribe()般此,由于最后一個暫時接管被觀察者功能的是observeOn蚪战,所以是由它進行的訂閱,那么根據(jù)之前的代碼可知铐懊,會執(zhí)行SingleObserveOn的subscribeActual():
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//調(diào)用上游訂閱方法邀桑,這里的source是subscribeOn
source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
}
可以看到,調(diào)用了上游的subscribe()科乎,那么會再執(zhí)行上游的subscribeActual(),那么會來到SingleSubscribeOn的subscribeActual():
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//構造一個SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
這里先看一下這個SubscribeOnObserver是什么:
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable
可以看到它時一個Runnable對象壁畸,那么接上面的第二行:
//切換線程,這里具體實現(xiàn)不必深究茅茂,只需知道是完成了切換線程操作就行
Disposable f = scheduler.scheduleDirect(parent);
可以看到捏萍,這是把上面的Runnable對象parent,傳了進去空闲。這里面其實使用了Executors進行了線程切換操作
既然是Runnable令杈,那么任務開始執(zhí)行就會觸發(fā)它的run():
@Override
public void run() {
//這里的source是SingleMap
source.subscribe(this);
}
subscribeOn()的實際作用就是切線程, 那么它的設置就在這里生效碴倾。另外由于不管下游有什么設置逗噩,都會在這里進行切線程操作掉丽,然后再進行訂閱,那么也就可以得出一個結論:無論subscribeOn()設置多少次异雁,只有在第一次有效捶障,因為從下往上傳播最終都會回到第一次設置的地方進行切線程操作。
可以看到纲刀,繼續(xù)往上傳遞项炼,那么會來到SingleMap的subscribeActual():
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//這里的source是SingleJust
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
還是繼續(xù)往上傳遞,來到SingleJust的subscribeActual():
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
//這里的observer是MapSingleObserver
observer.onSuccess(value);
}
最終調(diào)用了觀察者的onSuccess()示绊,那么這里完成了第二部锭部,從下往上,對數(shù)據(jù)進行相應的操作處理
第三步耻台,從上往下空免,處理數(shù)據(jù):
根據(jù)之前的總結:下游觀察者是當前被觀察者調(diào)用subscribe()時傳入的數(shù)(之后的推論同理,不再強調(diào)),那么最后傳入的參數(shù)的map()操作符中的MapSingleObserver,那么就會執(zhí)行它的onSuccess():
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
//這里的t是MapSingleObserver
t.onSuccess(v);
}
可以看到盆耽,這里首先調(diào)用了apply(),讓用戶對數(shù)據(jù)進行處理蹋砚,那么操作符map,已經(jīng)完全發(fā)揮了作用。繼續(xù)往下摄杂,根據(jù)之前的總結可得知t是subscribeOn()操作符中的SubscribeOnObserver坝咐,再看它的onSuccess:
@Override
public void onSuccess(T value) {
//這里的downstream是ObserveOnSingleObserver
downstream.onSuccess(value);
}
這里沒有操作,直接向下傳遞是因為之前已經(jīng)分析過析恢,它的作用是切線程墨坚,并且已經(jīng)切過了。上面分析過映挂。那么繼續(xù)往下泽篮,downstream是observeOn()操作符的ObserveOnSingleObserver(),再看它的onSuccess::
public void onSuccess(T value) {
this.value = value;
//切換線程
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
observeOn的作用也是切換線程,所以這里也進行了切線程操作柑船,并把this作為參數(shù)帽撑,那么就是看它自身的run():
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
//這里的downstream是最終的觀察者
downstream.onSuccess(value);
}
}
這里可以看到,切換線程后將結果交給了最終的觀察者鞍时。完成了整個流程亏拉。observeOn()的線程是多次有效的,從上面的代碼可以看出逆巍,切完線程后繼續(xù)將結果傳遞給下游觀察者及塘,假如繼續(xù)調(diào)用observeOn(),那么就會繼續(xù)切線程锐极,結果也是在下游的run()中執(zhí)行笙僚,所以切線程有效。
總結:
RxJava總體流程可以概括為以下三步灵再,復雜的操作只是增加中間環(huán)節(jié)味咳,以及中間環(huán)節(jié)的各種細化處理
- 第一步流程-從上往下庇勃,初始化操作符,對所有操作符進行初始化
- 第二步流程-從下往上槽驶,開始訂閱,并連接操作符鸳兽,對連接的操作符進行設置掂铐,此例中主要是異步請求時切換線程,
- subscribeOn()的設置只有第一次生效揍异,因為在subscribeOn中會切換線程全陨,然后進行訂閱。這時的流程時從下往上衷掷,最終都會回到第一次的設置辱姨。所以之前不管切到哪個線程,最終又會由第一次的設置切回它所設置的線程戚嗅。
- 第三步流程-從上往下雨涛,處理數(shù)據(jù),獲得數(shù)據(jù)后由各個操作符對數(shù)據(jù)進行加工處理懦胞,最終傳遞給最終的觀察者替久。
- observeOn()每次設置都會起效果,因為observeOn()發(fā)揮作用是最后一步從上往下的過程中躏尉,所以它每一次切完下次蚯根,然后再傳遞到下一層,下一次同樣可以切線程操作胀糜。所以如果要對結果再次進行切線程操作颅拦,可多次使用observeOn()