RxJava2 源碼解析——流程

RxJava——目前最熱門的響應(yīng)式函數(shù)編程框架杂抽。
筆者也是初涉Rx,所以打算通過這篇文章來理解Rx的操作流程,加深自己對Rx的理解艾少。
本文不涉及RxJava的入門使用,如有需有:
關(guān)于RxJava的入門推薦:拋物線大佬的精品——給 Android 開發(fā)者的 RxJava 詳解

[筆者仍為Android初學(xué)者翼悴。如有解釋錯誤的地方缚够,歡迎評論區(qū)指正探討]

本文主要根據(jù)RxJava2的源碼解析整個(gè)流程痢士。


引入

首先簡單的看一下關(guān)于RxJava的一般使用:

前提:定義了一個(gè)login接口垛孔,返回值為 { isSuccess, UserInfo}

Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
            @Override
            public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
                e.onNext(login());
            }
        }) //調(diào)用登錄接口
        .map(new Function<LoginApiBean, UserInfoBean>() {
            @Override
            protected UserInfoBean decode(LoginApiBean loginApiBean) {
                //處理登錄結(jié)果保檐,返回UserInfo
                if (loginApiBean.isSuccess()) {
                    return loginApiBean.getUserInfoBean();
                } else {
                    throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
                }
            }
        })
        .doOnNext(new Consumer<UserInfoBean>() {    //保存登錄結(jié)果UserInfo
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {
                saveUserInfo(bean);
            }
        })
        .subscribeOn(Schedulers.io())   //調(diào)度線程
        .observeOn(AndroidSchedulers.mainThread())  //調(diào)度線程
        .subscribe(new Consumer<UserInfoBean>() {
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
                //整個(gè)請求成功速挑,根據(jù)獲取的UserInfo更新對應(yīng)的View
                showSuccessView(bean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                //請求失敗豆茫,顯示對應(yīng)的View
                showFailView();
            }
        });

為了便于理解空民,上述邏輯沒有對應(yīng)的進(jìn)行封裝狡耻,簡單的展示 RxJava 的幾個(gè)重要流程淮悼。

按著代碼的順序我們理一下步驟:

  1. 首先是通過create方法陪踩,生成一個(gè)Observable對象杖们,并傳入一個(gè)ObservableOnSubscribe對象,在其回調(diào)方法中調(diào)用login接口返回LoginApiResult肩狂,并執(zhí)行onNext

  2. 然后通過map方法將LoginApiResult轉(zhuǎn)換成UserInfoBean

  3. 緊接著是通過doOnNext方法進(jìn)行保存saveUserInfo操作

  4. 然后是線程的調(diào)度摘完,分別通過subscribeOnobserveOn將上面提到的步驟都執(zhí)行在IO線程,下面的步驟都執(zhí)行在主(UI)線程中

  5. 最后是通過Consumer根據(jù)執(zhí)行結(jié)果完成(成功或拋異常)傻谁,執(zhí)行對應(yīng)的UI更新方法孝治。

按著順序,我們一步一步的跟進(jìn)看看RxJava到底是如何實(shí)現(xiàn)這些操作的。


任務(wù)鏈的構(gòu)建

入口類——Observable

既然要了解RxJava荆秦,那么比不可少的我們應(yīng)該先來看看他的入口類篱竭,也就是Observable

public abstract class Observable<T> implements ObservableSource<T> {
    @Override  //交由子類實(shí)現(xiàn)的出現(xiàn)方法
    protected abstract void subscribeActual(Observer observer) ;

    @Override  //實(shí)現(xiàn)了ObservableSource的方法
    public final void subscribe(Observer<? super T> observer) {
        //省略一堆判空等處理 
        subscribeActual(observer);
    }

省略了一堆靜態(tài)方法之后,我們可以看到步绸,Observable是一個(gè)抽象類掺逼,實(shí)現(xiàn)了ObservableSource接口,并留了subscribeActual這個(gè)抽象方法瓤介。
ObservableSource接口只定義了subscribe一個(gè)方法吕喘,可以看到這個(gè)方法做了一些基礎(chǔ)判斷之后直接跳轉(zhuǎn)到子類的subscribeActual方法。

所以一個(gè)被觀察者被subscribe的邏輯其實(shí)是交由Observable子類來實(shí)現(xiàn)的刑桑,每個(gè)不同的被觀察者可以根據(jù)自己的需求實(shí)現(xiàn) "被訂閱" 后的操作
(賊拗口- -md氯质,總覺得這里用subscribe這個(gè)命名很奇怪,還是setSubscriber好懂)
(換而言之祠斧,每個(gè)子類可以實(shí)現(xiàn)各自被setSubscriber后的動作)

Create

接下來是如何生成一個(gè)Obserable對象闻察,我們看到create方法。
create方法便是Obserable其中一個(gè)關(guān)鍵的靜態(tài)方法琢锋。
我們跟進(jìn)看一下源碼:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

首先第一句代碼是對傳入的對象進(jìn)行判空辕漂,內(nèi)部內(nèi)部實(shí)現(xiàn)是如果傳入null,會拋異常吴超。
接著是生成一個(gè)ObservableCreate對象钉嘹,然后將這個(gè)對象傳入RxJavaPlugins進(jìn)行組裝。
RxJavaPlugins提供了一系列的Hook function鲸阻,通過鉤子函數(shù)這種方法對RxJava的標(biāo)準(zhǔn)操作進(jìn)行加工跋涣,當(dāng)我們沒有進(jìn)行配置時(shí),默認(rèn)是直接返回原來的對象鸟悴,也就是返回ObservableCreate對象陈辱。
(為了方便講解,后續(xù)將直接忽視判空和RxJavaPlugins的代碼)

分析后可以看到遣臼,這里其實(shí)直接返回一個(gè)ObservableCreate對象性置。
我們跟進(jìn)去看一下這個(gè)對象的一些基本信息:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

可以簡單的看到,這個(gè)類繼承了Observalbe類揍堰,并存儲了我們剛才傳進(jìn)去的ObservableOnSubscribe對象鹏浅。當(dāng)然這個(gè)類也實(shí)現(xiàn)了剛才說的subscribeActual方法,我們待會再看屏歹。

map

往下隐砸,我們調(diào)用了Obserable的map方法:
我們跟進(jìn):

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    return new ObservableMap<T, R>(this, mapper);
}

可以看到其實(shí)是返回了一個(gè)ObservableMap對象,接受了兩個(gè)參數(shù)蝙眶,一個(gè)是this季希,在這里指的也就是剛才的ObservableCreate 褪那,還有一個(gè)Function對象,
我們再跟進(jìn)去看一下ObservableMap的基礎(chǔ)信息:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

可以看到其實(shí)構(gòu)造方法和剛才的ObservableCreate一樣式塌,將傳入的對象進(jìn)行了存儲博敬。

不過可以發(fā)現(xiàn)- -這個(gè)類并不是繼承自O(shè)bservable,而是AbstractObservableWithUpstream峰尝,我們再跟進(jìn)看看:

// Base class for operators with a source consumable.
// 帶有source的operator的基類
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

可以看到這個(gè)父類其實(shí)繼承了Observable偏窝,看到官方的注釋可以知道,這個(gè)類是所有接受上一級輸入的操作符(operator 如map)的基類武学,這里的邏輯并復(fù)雜祭往,其實(shí)只是簡單的封裝了一下上一級的輸入source和輸出先下一級的數(shù)據(jù)。

分析之后可以看到火窒,調(diào)用了map方法其實(shí)也是返回了一個(gè)Observable對象硼补。

doOnNext

接著往下是doOnNext,- -看到這里可以猜測也是簡單的返回一個(gè)Observable對象吧熏矿。已骇。
不管怎么說,先進(jìn)入源碼看一看:

public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }

private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate);
    }

可以看到跳轉(zhuǎn)到doOnEach方法曲掰,傳入的參數(shù)除了我們傳進(jìn)來的Consumer之外疾捍,其實(shí)都是傳了空實(shí)現(xiàn)的Consumer對象。
可以看到- -真的是簡單的返回一個(gè)Observable對象栏妖。
老規(guī)矩,先看一下ObservableDoOnEach的基礎(chǔ)信息:

public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }

同樣的對所有信息進(jìn)行了保存奖恰〉踔海可以看到這個(gè)類也是繼承了AbstractObservableWithUpstream,可以接受上一層的輸入瑟啃,并向下一層輸出數(shù)據(jù)论泛。

subscribeOn & observeOn

= =接著是線程調(diào)度,其實(shí)不看也猜得出蛹屿。屁奏。。這里也是直接返回對應(yīng)的Observable對象错负。

首先看一下subscribeOn:

public final Observable<T> subscribeOn(Scheduler scheduler) {
     return new ObservableSubscribeOn<T>(this, scheduler);
}

再看一下ObserveOn:

public final Observable<T> observeOn(Scheduler scheduler) {
     return observeOn(scheduler, false, bufferSize());
}
 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }

可以看到坟瓢,這里分別返回了0ObservableSubscribeOnObservableObserveOn對象。照舊我們先看看這兩個(gè)個(gè)類的基礎(chǔ)信息:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

怎么樣- -一路看到這里犹撒,也能知道他這里的基礎(chǔ)信息是什么了吧折联。

再看看另外一個(gè):

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

同樣的保存了傳進(jìn)去的基礎(chǔ)信息,我們發(fā)現(xiàn)其中共有的都保存了Scheduler對象识颊,我們先稍微看一下Scheduler

public abstract class Scheduler {

    @NonNull
    public abstract Worker createWorker();

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    run.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

    public abstract static class Worker implements Disposable {

        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

}

可以看到诚镰,Scheduler對外暴露了scheduleDirect方法,這個(gè)方法通過調(diào)用抽象方法createWorker得到worker對象,然后調(diào)用worker對象的schedule方法清笨,執(zhí)行runnable月杉。

看到這里大致就能猜出Scheduler對應(yīng)的邏輯啦,內(nèi)部的worker對象維護(hù)自己的線程池抠艾,然后每次執(zhí)行schedule方法時(shí)把runnable對象提交到線程池中沙合。先這樣理解,最后我們再深入一下跌帐。

subscribe

終于來到最后這個(gè)方法了- -md首懈。。谨敛。前面全都是直接返回對象究履,難道所有邏輯都在最后實(shí)現(xiàn)嗎?- -進(jìn)去看一下先脸狸。

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);

因?yàn)?strong>subscribe的重載方法很多- -這里只挑最終的兩個(gè)最仑,其中LambdaObserver其實(shí)就是把傳進(jìn)來的Consumer包裝成一個(gè)Observer(看清不是Observable!Observer是訂閱者)炊甲,內(nèi)部就是簡單的在各個(gè)階段調(diào)用我們傳進(jìn)去的Consumeraccpet方法泥彤。

Observer其實(shí)只是個(gè)接口,里面定義了接收到被觀察者(Observable)發(fā)出的事件時(shí)卿啡,訂閱者(Observer)應(yīng)該執(zhí)行的方法:

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

接著就是直接調(diào)用了subscribeActual方法吟吝。剛才我們在上述的步驟也說了,這個(gè)方法是Observable的抽象方法颈娜。

其實(shí)到這里我們可以看出剑逃,整個(gè)步驟通過對象的嵌套,形成了一條完整的鏈官辽。

創(chuàng)建任務(wù)鏈.png

逆向逐級訂閱

跟蹤subscribe

按照我們剛才的案例蛹磺,到最后subscribe方法的調(diào)用關(guān)系應(yīng)該是這樣的:

ObservableObserveOn.subscribe(LambdaObserver)。

所以我們跟進(jìn)看一下ObservableObserveOn.subscribe方法的實(shí)現(xiàn):

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //省略部分代碼
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}

可以看到同仆,這里通過Scheduler創(chuàng)建了一個(gè)wroker對象萤捆,然后調(diào)用了source(上一級)的subscribe方法,并通過已有的observer對象生成一個(gè)ObserveOnObserver(注意是Observer)對象作為傳參俗批。

看到這里也大概知道套路了= =和剛才一樣俗或,會一直沿著整條鏈返回,一個(gè)一個(gè)訂閱對應(yīng)Observable并生成新的嵌套的Observer扶镀。

我們依舊跟著看看蕴侣,ObservableObserveOn.subscribe之后是ObservableSubscribeOn.subscribe

@Override
public void subscribeActual(final Observer<? super T> s) {
    //將上一級傳進(jìn)來的訂閱者包裝為線程安全的原子變量
    //SubscribeOnObserver只是簡單的包裝,這里不展開
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //先在當(dāng)前線程執(zhí)行訂閱者的onSubscribe方法
    s.onSubscribe(parent);
    //然后在指定的線程中執(zhí)行source(上一級)的subscribe
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}

根據(jù)我們最開始的業(yè)務(wù)邏輯臭觉,我們這里的scheduler應(yīng)該對應(yīng)IO線程昆雀,也就是說往下執(zhí)行的subscribe操作都是執(zhí)行再IO線程中的辱志。(現(xiàn)在是逆向遍歷剛才建立的observable鏈。)

緊接著ObservableDoOnEach.subscribe

@Override
public void subscribeActual(Observer<? super T> t) {
    source.subscribe(new ObservableDoOnEach.DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}

可以看到狞膘,這里也是封裝了我們傳進(jìn)去的Consumer參數(shù)揩懒,直接調(diào)用了上一級的source.subscribe方法。

= =那么就接著往下看挽封。應(yīng)該來到了ObservableMap.subscribe方法了已球。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new ObservableMap.MapObserver<T, U>(t, function));
}

可以看到也是封裝了我們傳進(jìn)去的Function參數(shù),然后調(diào)用上一級source.subscribe辅愿,也就是ObservableCreate.subscribe智亮,也就到了鏈的一開始。

我們跟進(jìn)看看ObservableCreate.subscribe

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //首先是創(chuàng)建了CreateEmitter對象点待,這個(gè)類有沒有覺得特別眼熟- -
    ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter<T>(observer);
    //然后調(diào)用了訂閱者observer的onSubscribe方法
    //這里的訂閱者來自剛才的map操作
    observer.onSubscribe(parent);

    try {
        //調(diào)用上一級source的subscribe方法
        //顯然- -沒有上一級了阔蛉,這里的source就是我們一開始創(chuàng)建的observer對象,調(diào)用的subscribe方法也就是我們調(diào)用的login()方法的地方
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        //捕獲異常
        parent.onError(ex);
    }
}

終于回到了第一級癞埠,可以看到状原,一樣的封裝了observer訂閱者,(這里的訂閱者來自map操作)苗踪,然后調(diào)用了source.subscribe方法颠区,(- -看到這里不知道你們還記不記得source來自哪- -看下面代碼)這個(gè)source來自我們一開始調(diào)用Observable.create時(shí)傳進(jìn)來的參數(shù),而subscribe方法就是我們一開始執(zhí)行l(wèi)ogin()方法的地方通铲。

Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
            @Override
            public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
                e.onNext(login());
            }
        }) //調(diào)用登錄接口
        ……省略

也就是說毕莱,在剛才所有的逆序遍歷過程中,下一級的Observable會生成的對應(yīng)的Observer訂閱上一級的source测暗。

逆向逐級訂閱.png

執(zhí)行任務(wù)鏈

接下來就是激動人心的執(zhí)行我們定義的任務(wù)了央串。(md終于- -)

= =在分析前,先重新看一下我們剛才的業(yè)務(wù)邏輯:

Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
            @Override
            public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
                e.onNext(login());
            }
        }) //調(diào)用登錄接口
        .map(new Function<LoginApiBean, UserInfoBean>() {
            @Override
            protected UserInfoBean decode(LoginApiBean loginApiBean) {
                //處理登錄結(jié)果碗啄,返回UserInfo
                if (loginApiBean.isSuccess()) {
                    return loginApiBean.getUserInfoBean();
                } else {
                    throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
                }
            }
        })
        .doOnNext(new Consumer<UserInfoBean>() {    //保存登錄結(jié)果UserInfo
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {
                saveUserInfo(bean);
            }
        })
        .subscribeOn(Schedulers.io())   //調(diào)度線程
        .observeOn(AndroidSchedulers.mainThread())  //調(diào)度線程
        .subscribe(new Consumer<UserInfoBean>() {
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
                //整個(gè)請求成功,根據(jù)獲取的UserInfo更新對應(yīng)的View
                showSuccessView(bean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                //請求失敗稳摄,顯示對應(yīng)的View
                showFailView();
            }
        });

一趟創(chuàng)建稚字,一趟逆向訂閱,我們又回到了最開始的地方厦酬。我們剛才分析到胆描,ObservableCreate會執(zhí)行我們定義的方法

所以就來到了這段代碼:

public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
    e.onNext(login());
}

行- -就是login仗阅,就是調(diào)用ObservableEmitter.onNext方法昌讲。我們跟進(jìn):

public final class ObservableCreate<T> extends Observable<T> {
    
    protected void subscribeActual(Observer<? super T> observer) {
        //可以看到,這里傳入的Observer參數(shù)是來自下一級的訂閱者
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //省略一堆- -
    }


    //省略繼承關(guān)系
    static final class CreateEmitter<T> {
        //保存訂閱者
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //省略判空
            if (!isDisposed()) {
                //調(diào)用訂閱者的onNext方法
                observer.onNext(t);
            }
         }

    }
}

可以看到吧减噪,簡單的執(zhí)行一些判斷后短绸,就調(diào)用了訂閱者的onNext方法车吹,而通過上面的代碼,我們可以看到observer來自于subscribe時(shí)調(diào)用構(gòu)造函數(shù)的傳參醋闭,而通過上述的分析窄驹,我們知道,這里的訂閱者來自下一級证逻,也就是map操作生成的訂閱者乐埠。這里很自然的進(jìn)入了map操作。
(后面不再貼出observer的來源)

我們再往下看到MapObserver

@Override
public void onNext(T t) {
    //省略一些細(xì)節(jié)上的判斷
    U v;
    //mapper就是我們new 的function對象
    v = mapper.apply(t)
    actual.onNext(v);
}

可以看到囚企,這里調(diào)用了我們定義的apply方法丈咐,獲得了新的對象,然后調(diào)用了下一級訂閱者的onNext方法龙宏。

嘿嘿嘿看到這里大概就知道執(zhí)行任務(wù)鏈的套路了棵逊。嵌套的調(diào)用下一級的onNext方法。
我們先繼續(xù)往下看烦衣,來到了DoOnEachObserver中:

@Override
public void onNext(T t) {
    onNext.accept(t);
    actual.onNext(t);
}

bingo歹河!基本上和我們猜想的一樣~accept方法就是我們定義的doOnNext的操作啊~

再接著往下來到SubscribeOnObserver

@Override
public void onNext(T t) {
  actual.onNext(t);
}

這貨更直接。花吟。直接就調(diào)過去了- -(這里涉及到Scheduler的線程調(diào)度秸歧,后面再補(bǔ)充)

快到重點(diǎn)了,再看一下ObserveOnObserver

@Override
public void onNext(T t) {
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

擦衅澈,這貨邏輯賊復(fù)雜键菱。。畢竟在這里進(jìn)行了線程調(diào)度今布。暫時(shí)不深入经备。
只需要知道:這貨把任務(wù)提交給了Scheduler中的worker。等到任務(wù)結(jié)束獲取到結(jié)果后會調(diào)用下一級的onNext方法部默。

強(qiáng)行來到最后一層了~
這里的Observer就是我們調(diào)用subscribe時(shí)傳入的Observer啦~
那就是調(diào)用:

@Override
public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
   //整個(gè)請求成功侵蒙,根據(jù)獲取的UserInfo更新對應(yīng)的View
   showSuccessView(bean);
}

行了- -走完整個(gè)流程了。傅蹂。相信看到這里就能大致理解Rx的流程怎么走了(也沒有想象的那么復(fù)雜嘛~)

在剛才的遍歷訂閱后纷闺,每一步操作都會通知對應(yīng)的Observer,從而完成整調(diào)任務(wù)鏈份蝴。

執(zhí)行任務(wù)鏈.png

總結(jié)

總結(jié)一下:

  1. 創(chuàng)建任務(wù)鏈犁功,每一步都會返回對應(yīng)的Observable對象。
  2. 逆向逐級訂閱婚夫。每一步都會生成對應(yīng)的Observer對上一步生成的Observable進(jìn)行訂閱
  3. 執(zhí)行任務(wù)鏈浸卦。執(zhí)行任務(wù)鏈之后,每一步都會通知對應(yīng)的Observer案糙,從而完成整調(diào)任務(wù)鏈限嫌。

嘿嘿嘿靴庆,感覺整個(gè)流程也沒有想想中的難~對Rx的理解又更上一層了。
= =呃萤皂。寫到這發(fā)現(xiàn)篇幅略長撒穷。Scheduler的解析還是另起一篇文章吧,挖個(gè)坑先裆熙。

[筆者仍為Android初學(xué)者端礼。如有解釋錯誤的地方,歡迎評論區(qū)指正探討]


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末入录,一起剝皮案震驚了整個(gè)濱河市蛤奥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌僚稿,老刑警劉巖凡桥,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蚀同,居然都是意外死亡缅刽,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門蠢络,熙熙樓的掌柜王于貴愁眉苦臉地迎上來衰猛,“玉大人,你說我怎么就攤上這事刹孔》仁。” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵髓霞,是天一觀的道長卦睹。 經(jīng)常有香客問我,道長方库,這世上最難降的妖魔是什么结序? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮纵潦,結(jié)果婚禮上笼痹,老公的妹妹穿的比我還像新娘。我一直安慰自己酪穿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布晴裹。 她就那樣靜靜地躺著被济,像睡著了一般。 火紅的嫁衣襯著肌膚如雪涧团。 梳的紋絲不亂的頭發(fā)上只磷,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天经磅,我揣著相機(jī)與錄音,去河邊找鬼钮追。 笑死预厌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的元媚。 我是一名探鬼主播轧叽,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼刊棕!你這毒婦竟也來了炭晒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤甥角,失蹤者是張志新(化名)和其女友劉穎网严,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嗤无,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡震束,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了当犯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片垢村。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖灶壶,靈堂內(nèi)的尸體忽然破棺而出肝断,到底是詐尸還是另有隱情,我是刑警寧澤驰凛,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布胸懈,位于F島的核電站,受9級特大地震影響恰响,放射性物質(zhì)發(fā)生泄漏趣钱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一胚宦、第九天 我趴在偏房一處隱蔽的房頂上張望首有。 院中可真熱鬧,春花似錦枢劝、人聲如沸井联。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烙常。三九已至,卻和暖如春鹤盒,著一層夾襖步出監(jiān)牢的瞬間蚕脏,已是汗流浹背侦副。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留驼鞭,地道東北人秦驯。 一個(gè)月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像挣棕,于是被迫代替她去往敵國和親译隘。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容