RxJava——目前最熱門的響應(yīng)式函數(shù)編程框架杂抽。
筆者也是初涉Rx,所以打算通過這篇文章來理解Rx的操作流程,加深自己對Rx的理解艾少。
本文不涉及RxJava的入門使用,如有需有:
關(guān)于RxJava的入門推薦:拋物線大佬的精品——給 Android 開發(fā)者的 RxJava 詳解
[筆者仍為Android初學(xué)者翼悴。如有解釋錯誤的地方缚够,歡迎評論區(qū)指正探討]
本文主要根據(jù)RxJava2的源碼解析整個(gè)流程痢士。
引入
首先簡單的看一下關(guān)于RxJava的一般使用:
前提:定義了一個(gè)login接口垛孔,返回值為 { isSuccess, UserInfo}
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //調(diào)用登錄接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//處理登錄結(jié)果保檐,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登錄結(jié)果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //調(diào)度線程
.observeOn(AndroidSchedulers.mainThread()) //調(diào)度線程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整個(gè)請求成功速挑,根據(jù)獲取的UserInfo更新對應(yīng)的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//請求失敗豆茫,顯示對應(yīng)的View
showFailView();
}
});
為了便于理解空民,上述邏輯沒有對應(yīng)的進(jìn)行封裝狡耻,簡單的展示 RxJava 的幾個(gè)重要流程淮悼。
按著代碼的順序我們理一下步驟:
首先是通過create方法陪踩,生成一個(gè)Observable對象杖们,并傳入一個(gè)ObservableOnSubscribe對象,在其回調(diào)方法中調(diào)用login接口返回LoginApiResult肩狂,并執(zhí)行onNext
然后通過map方法將LoginApiResult轉(zhuǎn)換成UserInfoBean
緊接著是通過doOnNext方法進(jìn)行保存saveUserInfo操作
然后是線程的調(diào)度摘完,分別通過subscribeOn和observeOn將上面提到的步驟都執(zhí)行在IO線程,下面的步驟都執(zhí)行在主(UI)線程中
最后是通過Consumer根據(jù)執(zhí)行結(jié)果完成(成功或拋異常)傻谁,執(zhí)行對應(yīng)的UI更新方法孝治。
按著順序,我們一步一步的跟進(jìn)看看RxJava到底是如何實(shí)現(xiàn)這些操作的。
任務(wù)鏈的構(gòu)建
入口類——Observable
既然要了解RxJava荆秦,那么比不可少的我們應(yīng)該先來看看他的入口類篱竭,也就是Observable:
public abstract class Observable<T> implements ObservableSource<T> {
@Override //交由子類實(shí)現(xiàn)的出現(xiàn)方法
protected abstract void subscribeActual(Observer observer) ;
@Override //實(shí)現(xiàn)了ObservableSource的方法
public final void subscribe(Observer<? super T> observer) {
//省略一堆判空等處理
subscribeActual(observer);
}
省略了一堆靜態(tài)方法之后,我們可以看到步绸,Observable是一個(gè)抽象類掺逼,實(shí)現(xiàn)了ObservableSource接口,并留了subscribeActual這個(gè)抽象方法瓤介。
ObservableSource接口只定義了subscribe一個(gè)方法吕喘,可以看到這個(gè)方法做了一些基礎(chǔ)判斷之后直接跳轉(zhuǎn)到子類的subscribeActual方法。
所以一個(gè)被觀察者被subscribe的邏輯其實(shí)是交由Observable子類來實(shí)現(xiàn)的刑桑,每個(gè)不同的被觀察者可以根據(jù)自己的需求實(shí)現(xiàn) "被訂閱" 后的操作
(賊拗口- -md氯质,總覺得這里用subscribe這個(gè)命名很奇怪,還是setSubscriber好懂)
(換而言之祠斧,每個(gè)子類可以實(shí)現(xiàn)各自被setSubscriber后的動作)
Create
接下來是如何生成一個(gè)Obserable對象闻察,我們看到create方法。
create方法便是Obserable其中一個(gè)關(guān)鍵的靜態(tài)方法琢锋。
我們跟進(jìn)看一下源碼:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
首先第一句代碼是對傳入的對象進(jìn)行判空辕漂,內(nèi)部內(nèi)部實(shí)現(xiàn)是如果傳入null,會拋異常吴超。
接著是生成一個(gè)ObservableCreate對象钉嘹,然后將這個(gè)對象傳入RxJavaPlugins進(jìn)行組裝。
RxJavaPlugins提供了一系列的Hook function鲸阻,通過鉤子函數(shù)這種方法對RxJava的標(biāo)準(zhǔn)操作進(jìn)行加工跋涣,當(dāng)我們沒有進(jìn)行配置時(shí),默認(rèn)是直接返回原來的對象鸟悴,也就是返回ObservableCreate對象陈辱。
(為了方便講解,后續(xù)將直接忽視判空和RxJavaPlugins的代碼)
分析后可以看到遣臼,這里其實(shí)直接返回一個(gè)ObservableCreate對象性置。
我們跟進(jìn)去看一下這個(gè)對象的一些基本信息:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
可以簡單的看到,這個(gè)類繼承了Observalbe類揍堰,并存儲了我們剛才傳進(jìn)去的ObservableOnSubscribe對象鹏浅。當(dāng)然這個(gè)類也實(shí)現(xiàn)了剛才說的subscribeActual方法,我們待會再看屏歹。
map
往下隐砸,我們調(diào)用了Obserable的map方法:
我們跟進(jìn):
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
可以看到其實(shí)是返回了一個(gè)ObservableMap對象,接受了兩個(gè)參數(shù)蝙眶,一個(gè)是this季希,在這里指的也就是剛才的ObservableCreate 褪那,還有一個(gè)Function對象,
我們再跟進(jìn)去看一下ObservableMap的基礎(chǔ)信息:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
可以看到其實(shí)構(gòu)造方法和剛才的ObservableCreate一樣式塌,將傳入的對象進(jìn)行了存儲博敬。
不過可以發(fā)現(xiàn)- -這個(gè)類并不是繼承自O(shè)bservable,而是AbstractObservableWithUpstream峰尝,我們再跟進(jìn)看看:
// Base class for operators with a source consumable.
// 帶有source的operator的基類
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
可以看到這個(gè)父類其實(shí)繼承了Observable偏窝,看到官方的注釋可以知道,這個(gè)類是所有接受上一級輸入的操作符(operator 如map)的基類武学,這里的邏輯并復(fù)雜祭往,其實(shí)只是簡單的封裝了一下上一級的輸入source和輸出先下一級的數(shù)據(jù)。
分析之后可以看到火窒,調(diào)用了map方法其實(shí)也是返回了一個(gè)Observable對象硼补。
doOnNext
接著往下是doOnNext,- -看到這里可以猜測也是簡單的返回一個(gè)Observable對象吧熏矿。已骇。
不管怎么說,先進(jìn)入源碼看一看:
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
}
可以看到跳轉(zhuǎn)到doOnEach方法曲掰,傳入的參數(shù)除了我們傳進(jìn)來的Consumer之外疾捍,其實(shí)都是傳了空實(shí)現(xiàn)的Consumer對象。
可以看到- -真的是簡單的返回一個(gè)Observable對象栏妖。
老規(guī)矩,先看一下ObservableDoOnEach的基礎(chǔ)信息:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
同樣的對所有信息進(jìn)行了保存奖恰〉踔海可以看到這個(gè)類也是繼承了AbstractObservableWithUpstream,可以接受上一層的輸入瑟啃,并向下一層輸出數(shù)據(jù)论泛。
subscribeOn & observeOn
= =接著是線程調(diào)度,其實(shí)不看也猜得出蛹屿。屁奏。。這里也是直接返回對應(yīng)的Observable對象错负。
首先看一下subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
再看一下ObserveOn:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
可以看到坟瓢,這里分別返回了0ObservableSubscribeOn和ObservableObserveOn對象。照舊我們先看看這兩個(gè)個(gè)類的基礎(chǔ)信息:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
怎么樣- -一路看到這里犹撒,也能知道他這里的基礎(chǔ)信息是什么了吧折联。
再看看另外一個(gè):
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
同樣的保存了傳進(jìn)去的基礎(chǔ)信息,我們發(fā)現(xiàn)其中共有的都保存了Scheduler對象识颊,我們先稍微看一下Scheduler:
public abstract class Scheduler {
@NonNull
public abstract Worker createWorker();
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
w.schedule(new Runnable() {
@Override
public void run() {
try {
run.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
可以看到诚镰,Scheduler對外暴露了scheduleDirect方法,這個(gè)方法通過調(diào)用抽象方法createWorker得到worker對象,然后調(diào)用worker對象的schedule方法清笨,執(zhí)行runnable月杉。
看到這里大致就能猜出Scheduler對應(yīng)的邏輯啦,內(nèi)部的worker對象維護(hù)自己的線程池抠艾,然后每次執(zhí)行schedule方法時(shí)把runnable對象提交到線程池中沙合。先這樣理解,最后我們再深入一下跌帐。
subscribe
終于來到最后這個(gè)方法了- -md首懈。。谨敛。前面全都是直接返回對象究履,難道所有邏輯都在最后實(shí)現(xiàn)嗎?- -進(jìn)去看一下先脸狸。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
因?yàn)?strong>subscribe的重載方法很多- -這里只挑最終的兩個(gè)最仑,其中LambdaObserver其實(shí)就是把傳進(jìn)來的Consumer包裝成一個(gè)Observer(看清不是Observable!Observer是訂閱者)炊甲,內(nèi)部就是簡單的在各個(gè)階段調(diào)用我們傳進(jìn)去的Consumer的accpet方法泥彤。
Observer其實(shí)只是個(gè)接口,里面定義了接收到被觀察者(Observable)發(fā)出的事件時(shí)卿啡,訂閱者(Observer)應(yīng)該執(zhí)行的方法:
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
接著就是直接調(diào)用了subscribeActual方法吟吝。剛才我們在上述的步驟也說了,這個(gè)方法是Observable的抽象方法颈娜。
其實(shí)到這里我們可以看出剑逃,整個(gè)步驟通過對象的嵌套,形成了一條完整的鏈官辽。
逆向逐級訂閱
跟蹤subscribe
按照我們剛才的案例蛹磺,到最后subscribe
方法的調(diào)用關(guān)系應(yīng)該是這樣的:
ObservableObserveOn.subscribe(LambdaObserver)。
所以我們跟進(jìn)看一下ObservableObserveOn.subscribe方法的實(shí)現(xiàn):
@Override
protected void subscribeActual(Observer<? super T> observer) {
//省略部分代碼
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
可以看到同仆,這里通過Scheduler創(chuàng)建了一個(gè)wroker對象萤捆,然后調(diào)用了source(上一級)的subscribe方法,并通過已有的observer對象生成一個(gè)ObserveOnObserver(注意是Observer)對象作為傳參俗批。
看到這里也大概知道套路了= =和剛才一樣俗或,會一直沿著整條鏈返回,一個(gè)一個(gè)訂閱對應(yīng)Observable并生成新的嵌套的Observer扶镀。
我們依舊跟著看看蕴侣,ObservableObserveOn.subscribe之后是ObservableSubscribeOn.subscribe:
@Override
public void subscribeActual(final Observer<? super T> s) {
//將上一級傳進(jìn)來的訂閱者包裝為線程安全的原子變量
//SubscribeOnObserver只是簡單的包裝,這里不展開
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//先在當(dāng)前線程執(zhí)行訂閱者的onSubscribe方法
s.onSubscribe(parent);
//然后在指定的線程中執(zhí)行source(上一級)的subscribe
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
根據(jù)我們最開始的業(yè)務(wù)邏輯臭觉,我們這里的scheduler應(yīng)該對應(yīng)IO線程昆雀,也就是說往下執(zhí)行的subscribe操作都是執(zhí)行再IO線程中的辱志。(現(xiàn)在是逆向遍歷剛才建立的observable鏈。)
緊接著ObservableDoOnEach.subscribe:
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new ObservableDoOnEach.DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}
可以看到狞膘,這里也是封裝了我們傳進(jìn)去的Consumer參數(shù)揩懒,直接調(diào)用了上一級的source.subscribe方法。
= =那么就接著往下看挽封。應(yīng)該來到了ObservableMap.subscribe方法了已球。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new ObservableMap.MapObserver<T, U>(t, function));
}
可以看到也是封裝了我們傳進(jìn)去的Function參數(shù),然后調(diào)用上一級source.subscribe辅愿,也就是ObservableCreate.subscribe智亮,也就到了鏈的一開始。
我們跟進(jìn)看看ObservableCreate.subscribe:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//首先是創(chuàng)建了CreateEmitter對象点待,這個(gè)類有沒有覺得特別眼熟- -
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter<T>(observer);
//然后調(diào)用了訂閱者observer的onSubscribe方法
//這里的訂閱者來自剛才的map操作
observer.onSubscribe(parent);
try {
//調(diào)用上一級source的subscribe方法
//顯然- -沒有上一級了阔蛉,這里的source就是我們一開始創(chuàng)建的observer對象,調(diào)用的subscribe方法也就是我們調(diào)用的login()方法的地方
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//捕獲異常
parent.onError(ex);
}
}
終于回到了第一級癞埠,可以看到状原,一樣的封裝了observer訂閱者,(這里的訂閱者來自map操作)苗踪,然后調(diào)用了source.subscribe方法颠区,(- -看到這里不知道你們還記不記得source來自哪- -看下面代碼)這個(gè)source來自我們一開始調(diào)用Observable.create時(shí)傳進(jìn)來的參數(shù),而subscribe方法就是我們一開始執(zhí)行l(wèi)ogin()方法的地方通铲。
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //調(diào)用登錄接口
……省略
也就是說毕莱,在剛才所有的逆序遍歷過程中,下一級的Observable會生成的對應(yīng)的Observer訂閱上一級的source测暗。
執(zhí)行任務(wù)鏈
接下來就是激動人心的執(zhí)行我們定義的任務(wù)了央串。(md終于- -)
= =在分析前,先重新看一下我們剛才的業(yè)務(wù)邏輯:
Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
@Override
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
}) //調(diào)用登錄接口
.map(new Function<LoginApiBean, UserInfoBean>() {
@Override
protected UserInfoBean decode(LoginApiBean loginApiBean) {
//處理登錄結(jié)果碗啄,返回UserInfo
if (loginApiBean.isSuccess()) {
return loginApiBean.getUserInfoBean();
} else {
throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
}
}
})
.doOnNext(new Consumer<UserInfoBean>() { //保存登錄結(jié)果UserInfo
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {
saveUserInfo(bean);
}
})
.subscribeOn(Schedulers.io()) //調(diào)度線程
.observeOn(AndroidSchedulers.mainThread()) //調(diào)度線程
.subscribe(new Consumer<UserInfoBean>() {
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整個(gè)請求成功,根據(jù)獲取的UserInfo更新對應(yīng)的View
showSuccessView(bean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//請求失敗稳摄,顯示對應(yīng)的View
showFailView();
}
});
一趟創(chuàng)建稚字,一趟逆向訂閱,我們又回到了最開始的地方厦酬。我們剛才分析到胆描,ObservableCreate會執(zhí)行我們定義的方法。
所以就來到了這段代碼:
public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
e.onNext(login());
}
行- -就是login仗阅,就是調(diào)用ObservableEmitter.onNext方法昌讲。我們跟進(jìn):
public final class ObservableCreate<T> extends Observable<T> {
protected void subscribeActual(Observer<? super T> observer) {
//可以看到,這里傳入的Observer參數(shù)是來自下一級的訂閱者
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//省略一堆- -
}
//省略繼承關(guān)系
static final class CreateEmitter<T> {
//保存訂閱者
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//省略判空
if (!isDisposed()) {
//調(diào)用訂閱者的onNext方法
observer.onNext(t);
}
}
}
}
可以看到吧减噪,簡單的執(zhí)行一些判斷后短绸,就調(diào)用了訂閱者的onNext方法车吹,而通過上面的代碼,我們可以看到observer來自于subscribe時(shí)調(diào)用構(gòu)造函數(shù)的傳參醋闭,而通過上述的分析窄驹,我們知道,這里的訂閱者來自下一級证逻,也就是map操作生成的訂閱者乐埠。這里很自然的進(jìn)入了map操作。
(后面不再貼出observer的來源)
我們再往下看到MapObserver:
@Override
public void onNext(T t) {
//省略一些細(xì)節(jié)上的判斷
U v;
//mapper就是我們new 的function對象
v = mapper.apply(t)
actual.onNext(v);
}
可以看到囚企,這里調(diào)用了我們定義的apply方法丈咐,獲得了新的對象,然后調(diào)用了下一級訂閱者的onNext方法龙宏。
嘿嘿嘿看到這里大概就知道執(zhí)行任務(wù)鏈的套路了棵逊。嵌套的調(diào)用下一級的onNext方法。
我們先繼續(xù)往下看烦衣,來到了DoOnEachObserver中:
@Override
public void onNext(T t) {
onNext.accept(t);
actual.onNext(t);
}
bingo歹河!基本上和我們猜想的一樣~accept方法就是我們定義的doOnNext的操作啊~
再接著往下來到SubscribeOnObserver:
@Override
public void onNext(T t) {
actual.onNext(t);
}
這貨更直接。花吟。直接就調(diào)過去了- -(這里涉及到Scheduler的線程調(diào)度秸歧,后面再補(bǔ)充)
快到重點(diǎn)了,再看一下ObserveOnObserver:
@Override
public void onNext(T t) {
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
擦衅澈,這貨邏輯賊復(fù)雜键菱。。畢竟在這里進(jìn)行了線程調(diào)度今布。暫時(shí)不深入经备。
只需要知道:這貨把任務(wù)提交給了Scheduler中的worker。等到任務(wù)結(jié)束獲取到結(jié)果后會調(diào)用下一級的onNext方法部默。
強(qiáng)行來到最后一層了~
這里的Observer就是我們調(diào)用subscribe時(shí)傳入的Observer啦~
那就是調(diào)用:
@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
//整個(gè)請求成功侵蒙,根據(jù)獲取的UserInfo更新對應(yīng)的View
showSuccessView(bean);
}
行了- -走完整個(gè)流程了。傅蹂。相信看到這里就能大致理解Rx的流程怎么走了(也沒有想象的那么復(fù)雜嘛~)
在剛才的遍歷訂閱后纷闺,每一步操作都會通知對應(yīng)的Observer,從而完成整調(diào)任務(wù)鏈份蝴。
執(zhí)行任務(wù)鏈.png
總結(jié)
總結(jié)一下:
- 創(chuàng)建任務(wù)鏈犁功,每一步都會返回對應(yīng)的Observable對象。
- 逆向逐級訂閱婚夫。每一步都會生成對應(yīng)的Observer對上一步生成的Observable進(jìn)行訂閱
- 執(zhí)行任務(wù)鏈浸卦。執(zhí)行任務(wù)鏈之后,每一步都會通知對應(yīng)的Observer案糙,從而完成整調(diào)任務(wù)鏈限嫌。
嘿嘿嘿靴庆,感覺整個(gè)流程也沒有想想中的難~對Rx的理解又更上一層了。
= =呃萤皂。寫到這發(fā)現(xiàn)篇幅略長撒穷。Scheduler的解析還是另起一篇文章吧,挖個(gè)坑先裆熙。
[筆者仍為Android初學(xué)者端礼。如有解釋錯誤的地方,歡迎評論區(qū)指正探討]