Rxjava原理解析

先看RxJava的簡單使用及解析:

        //產(chǎn)生事件并返回Single對象,Single和Observable是一樣的作用,不一樣的地方是Single只回調(diào)onSuccess(),不會回調(diào)onError()
        Single<Integer> just = Single.just(1);
                //訂閱
        just.subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onSuccess(Integer integer) {
                        
            }

            @Override
            public void onError(Throwable e) {

            }
        });

以上是Rxjava的一個簡單示例疗我,第一步通過Single.just()發(fā)送一個事件,第二部調(diào)用subscribe()訂閱事件赠堵。

先看第一步Single.just():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        //構建一個SingleJust返回
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }

        //鉤子方法残家,如果設置了onSingleAssembly兽间,那么可以通過apply()對數(shù)據(jù)進行再加工,默認沒設置onSingleAssembly价匠,不必關注此方法
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

可以看到只是構造了一個SingleJust()對象当纱,傳入事件并返回。

那么第一步簡單總結一下為:構造新的被觀察者SingleJust踩窖,并傳遞事件

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }



}

可以看到SingleJust在構造里記錄了數(shù)據(jù)坡氯,并且它還有一個subscribeActual(),這個方法里回調(diào)了onSubscribe()及onSuccess();

那么簡單總結一下,第一步Single.just(1)執(zhí)行后毙石,構造里一個SingleJust對象廉沮,并存儲了數(shù)據(jù)。所以返回的Single其實已經(jīng)是SingleJust對象了徐矩,那么被觀察者對象已經(jīng)切換為SingleJust

再看第二步just.subscribe():

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(SingleObserver<? super T> observer) {
                //鉤子方法滞时,和上面一樣,不必關注滤灯。
        observer = RxJavaPlugins.onSubscribe(this, observer);
        try {
            //調(diào)用subscribeActual坪稽,傳入observer
            subscribeActual(observer);
        } catch (Throwable ex) {
            throw npe;
        }
    }

第二部是訂閱,它實際執(zhí)行了subscribeActual(),并傳入了下游的觀察者鳞骤,由于這里已經(jīng)是SingleJust窒百,那么執(zhí)行的就是它的subscrieActual():

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        //此方法非核心,忽略
        observer.onSubscribe(Disposables.disposed());
        //執(zhí)行觀察者的onSuccess()
        observer.onSuccess(value);
    }

可以看到豫尽,最后實際執(zhí)行了下游觀察者的onSuccess()

總結:

以上就是RxJava的一個簡單模型過程篙梢,當調(diào)用一個操作符后,被觀察者對象就會改變美旧,同時事件從上往下傳遞渤滞。當產(chǎn)生訂閱關系時,下游觀察者在上游被觀察者的subscribeActual()中獲取結果**

為了更好理解復雜的情況榴嗅,這里再明確一個概念妄呕,下游觀察者是當前被觀察者調(diào)用subscribe()時傳入的參數(shù)。

Rxjava中其他復雜的操作嗽测,其實就是操作符的改變及累加绪励,只是在此模型上增加中間過程,如處理數(shù)據(jù)等。

稍復雜的RxJava使用及解析

        Single.just("1")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.valueOf(s);
                    }
                }).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

Single.just("1")這一行在上面分析過了疏魏,主要是把Single轉換為SingleJust停做,并傳遞數(shù)據(jù)。那么再看第二行map():

    public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //創(chuàng)建了SingleMap對象蠢护,并傳入了this,和mapper參數(shù)雅宾,this就是SingleJust對象,mapper處理數(shù)據(jù)用葵硕,用來回調(diào)apply()
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }

可以看到眉抬,這一步又把SingleJust轉換為了一個SingleMap,那么現(xiàn)在被觀察者就轉換為了SingleMap懈凹,那么現(xiàn)在可以把被觀察者SingleJust稱為上游被觀察者蜀变,同時把上游被觀察者和處理數(shù)據(jù)的mapper,存入了SingleMap中。

第三行開始訂閱介评,在這里被觀察者已經(jīng)變成了SingleMap库北,那么會執(zhí)行它的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //這里的source其實是SingleJust,然后訂閱了觀察者MapSingleObserver们陆,由于觀察者是由下方的訂閱產(chǎn)生的寒瓦,這里稱觀察者t為下游觀察者
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

如果這里回調(diào)觀察者的onSuccess(),那么就和之前總結的沒有區(qū)別坪仇,但是這里調(diào)用了source.subscribe(),由上游被觀察者再次訂閱杂腰。也就是說,產(chǎn)生訂閱關系后椅文,事件開始從下往上傳遞喂很,對數(shù)據(jù)進行處理。

根據(jù)之前的代碼source.subscribe()皆刺,最終會執(zhí)行source的subscribeActual()少辣,再執(zhí)行到觀察者的onSuccess(),也就是MapSingleObserver的onSuccess():

static final class MapSingleObserver<T, R> implements SingleObserver<T> {
                //觀察者
        final SingleObserver<? super R> t;
                //處理數(shù)據(jù)對象
        final Function<? super T, ? extends R> mapper;
        @Override
        public void onSuccess(T value) {
            try {
                //這里調(diào)用apply()處理數(shù)據(jù)。
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
              //如果出錯調(diào)用onError()
                onError(e);
                return;
            }
                    //處理完成羡蛾,下游觀察者調(diào)用onSuccess()
            t.onSuccess(v);
        }
 
}

可以看到在onSuccess()中漓帅,先對數(shù)據(jù)進行處理,如果數(shù)據(jù)沒出錯痴怨,那么再把數(shù)據(jù)交給最終的觀察者煎殷。

總結:根據(jù)上面的代碼再完善一下簡單模型,首先當發(fā)送一個事件后腿箩,事件開始從上游往下傳遞,傳遞過程中會由當前調(diào)用的操作符暫代被觀察者功能劣摇,當傳遞完成后代表設置完成珠移。然后開始訂閱事件,發(fā)起訂閱后那么事件開始從下往上傳遞,對之前的設置進行處理钧惧,最后處理完后成暇韧,事件再從上往下傳遞給最終的觀察者。那么最終模型就是:

從上往下(開始傳遞事件浓瞪,初始化操作符)—>從下往下(連接操作符懈玻,并進行部分設置)->從上往下(根據(jù)操作符對數(shù)據(jù)進行處理并返回)

根據(jù)模型,分析完整示例

        Single.just("1")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.valueOf(s);
                    }
                })
                //切換到io線程
                .subscribeOn(Schedulers.io())
                    //切換到主線程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

這里只為分析過程乾颁,忽律當前代碼并不需要切線程的操作涂乌。

第一步從上往下,初始化操作符:

map()分析過了英岭,那么直接看subscribeOn():

    public final Single<T> subscribeOn(final Scheduler scheduler) {
        //構建一個SingleSubscribeOn返回湾盒,scheduler就是要被設置的參數(shù)
        return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
    }

根據(jù)之前的分析,那么就是subscribeOn暫時接管了被觀察者的職能诅妹,subscribeOn完成了設置罚勾,繼續(xù)看observeOn():

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Single<T> observeOn(final Scheduler scheduler) {
        //構建一個SingleObserveOn返回,scheduler就是要被設置的參數(shù)
        return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
    }

根據(jù)之前的分析吭狡,那么就是observeOn暫時接管了被觀察者的職能尖殃,此時完成了第一步,事件從上到下傳遞的設置功能划煮。

第二步從下往上送丰,開始訂閱,連接操作符

調(diào)用subscribe()般此,由于最后一個暫時接管被觀察者功能的是observeOn蚪战,所以是由它進行的訂閱,那么根據(jù)之前的代碼可知铐懊,會執(zhí)行SingleObserveOn的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //調(diào)用上游訂閱方法邀桑,這里的source是subscribeOn
        source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
    }

可以看到,調(diào)用了上游的subscribe()科乎,那么會再執(zhí)行上游的subscribeActual(),那么會來到SingleSubscribeOn的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //構造一個SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer, source);
        observer.onSubscribe(parent);
                
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

這里先看一下這個SubscribeOnObserver是什么:

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable

可以看到它時一個Runnable對象壁畸,那么接上面的第二行:

//切換線程,這里具體實現(xiàn)不必深究茅茂,只需知道是完成了切換線程操作就行
Disposable f = scheduler.scheduleDirect(parent);

可以看到捏萍,這是把上面的Runnable對象parent,傳了進去空闲。這里面其實使用了Executors進行了線程切換操作

既然是Runnable令杈,那么任務開始執(zhí)行就會觸發(fā)它的run():

        @Override
        public void run() {
          //這里的source是SingleMap
            source.subscribe(this);
        }

subscribeOn()的實際作用就是切線程, 那么它的設置就在這里生效碴倾。另外由于不管下游有什么設置逗噩,都會在這里進行切線程操作掉丽,然后再進行訂閱,那么也就可以得出一個結論:無論subscribeOn()設置多少次异雁,只有在第一次有效捶障,因為從下往上傳播最終都會回到第一次設置的地方進行切線程操作

可以看到纲刀,繼續(xù)往上傳遞项炼,那么會來到SingleMap的subscribeActual():

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //這里的source是SingleJust
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

還是繼續(xù)往上傳遞,來到SingleJust的subscribeActual():

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        //這里的observer是MapSingleObserver
        observer.onSuccess(value);
    }

最終調(diào)用了觀察者的onSuccess()示绊,那么這里完成了第二部锭部,從下往上,對數(shù)據(jù)進行相應的操作處理

第三步耻台,從上往下空免,處理數(shù)據(jù)

根據(jù)之前的總結:下游觀察者是當前被觀察者調(diào)用subscribe()時傳入的數(shù)(之后的推論同理,不再強調(diào)),那么最后傳入的參數(shù)的map()操作符中的MapSingleObserver,那么就會執(zhí)行它的onSuccess():

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }
                //這里的t是MapSingleObserver
            t.onSuccess(v);
        }

可以看到盆耽,這里首先調(diào)用了apply(),讓用戶對數(shù)據(jù)進行處理蹋砚,那么操作符map,已經(jīng)完全發(fā)揮了作用。繼續(xù)往下摄杂,根據(jù)之前的總結可得知t是subscribeOn()操作符中的SubscribeOnObserver坝咐,再看它的onSuccess:

        @Override
        public void onSuccess(T value) {
          //這里的downstream是ObserveOnSingleObserver
            downstream.onSuccess(value);
        }

這里沒有操作,直接向下傳遞是因為之前已經(jīng)分析過析恢,它的作用是切線程墨坚,并且已經(jīng)切過了。上面分析過映挂。那么繼續(xù)往下泽篮,downstream是observeOn()操作符的ObserveOnSingleObserver(),再看它的onSuccess::

        public void onSuccess(T value) {
            this.value = value;
            //切換線程
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

observeOn的作用也是切換線程,所以這里也進行了切線程操作柑船,并把this作為參數(shù)帽撑,那么就是看它自身的run():

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                 //這里的downstream是最終的觀察者
                downstream.onSuccess(value);
            }
        }

這里可以看到,切換線程后將結果交給了最終的觀察者鞍时。完成了整個流程亏拉。observeOn()的線程是多次有效的,從上面的代碼可以看出逆巍,切完線程后繼續(xù)將結果傳遞給下游觀察者及塘,假如繼續(xù)調(diào)用observeOn(),那么就會繼續(xù)切線程锐极,結果也是在下游的run()中執(zhí)行笙僚,所以切線程有效。

總結:

RxJava總體流程可以概括為以下三步灵再,復雜的操作只是增加中間環(huán)節(jié)味咳,以及中間環(huán)節(jié)的各種細化處理

  • 第一步流程-從上往下庇勃,初始化操作符,對所有操作符進行初始化
  • 第二步流程-從下往上槽驶,開始訂閱,并連接操作符鸳兽,對連接的操作符進行設置掂铐,此例中主要是異步請求時切換線程,
    • subscribeOn()的設置只有第一次生效揍异,因為在subscribeOn中會切換線程全陨,然后進行訂閱。這時的流程時從下往上衷掷,最終都會回到第一次的設置辱姨。所以之前不管切到哪個線程,最終又會由第一次的設置切回它所設置的線程戚嗅。
  • 第三步流程-從上往下雨涛,處理數(shù)據(jù),獲得數(shù)據(jù)后由各個操作符對數(shù)據(jù)進行加工處理懦胞,最終傳遞給最終的觀察者替久。
    • observeOn()每次設置都會起效果,因為observeOn()發(fā)揮作用是最后一步從上往下的過程中躏尉,所以它每一次切完下次蚯根,然后再傳遞到下一層,下一次同樣可以切線程操作胀糜。所以如果要對結果再次進行切線程操作颅拦,可多次使用observeOn()
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市教藻,隨后出現(xiàn)的幾起案子距帅,更是在濱河造成了極大的恐慌,老刑警劉巖怖竭,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锥债,死亡現(xiàn)場離奇詭異,居然都是意外死亡痊臭,警方通過查閱死者的電腦和手機哮肚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來广匙,“玉大人允趟,你說我怎么就攤上這事⊙恢拢” “怎么了潮剪?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵涣楷,是天一觀的道長。 經(jīng)常有香客問我抗碰,道長狮斗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任弧蝇,我火速辦了婚禮碳褒,結果婚禮上,老公的妹妹穿的比我還像新娘看疗。我一直安慰自己沙峻,他們只是感情好,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布两芳。 她就那樣靜靜地躺著摔寨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪怖辆。 梳的紋絲不亂的頭發(fā)上是复,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天,我揣著相機與錄音疗隶,去河邊找鬼佑笋。 笑死,一個胖子當著我的面吹牛斑鼻,可吹牛的內(nèi)容都是我干的蒋纬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼坚弱,長吁一口氣:“原來是場噩夢啊……” “哼蜀备!你這毒婦竟也來了?” 一聲冷哼從身側響起荒叶,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤碾阁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后些楣,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體脂凶,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年愁茁,在試婚紗的時候發(fā)現(xiàn)自己被綠了蚕钦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡鹅很,死狀恐怖嘶居,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情促煮,我是刑警寧澤邮屁,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布整袁,位于F島的核電站,受9級特大地震影響佑吝,放射性物質(zhì)發(fā)生泄漏坐昙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一迹蛤、第九天 我趴在偏房一處隱蔽的房頂上張望民珍。 院中可真熱鬧,春花似錦盗飒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至嗜历,卻和暖如春宣渗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背梨州。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工痕囱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人暴匠。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓鞍恢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親每窖。 傳聞我的和親對象是個殘疾皇子帮掉,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容

  • 基本使用 添加依賴 定義Api請求接口倉庫 發(fā)起網(wǎng)絡請求 框架結構 RxJava的整體結構是一條鏈 鏈的最上游:生...
    Hsicen閱讀 1,416評論 0 1
  • 初學RxJava,對其許多的API頗感神奇窒典,所以RxJava的原理充滿了興趣蟆炊。正好最近教父大頭鬼也出了一篇RxJa...
    alighters閱讀 28,132評論 2 28
  • 前言 Rxjava是NetFlix出品的Java框架, 官方描述為 a library for composing...
    kamisamer閱讀 924評論 0 1
  • 創(chuàng)建操作符 操作符使用 基本創(chuàng)建create() 完整創(chuàng)建1個被觀察者對象(Observable) 快速創(chuàng)建瀑志,發(fā)送...
    帝王鯊kingcp閱讀 1,510評論 0 1
  • 今天下午在學校把兒子老師發(fā)的試題整理一下涩搓,沒找到部編一,就先弄了二三四和數(shù)學老師發(fā)的一份劈猪,一共17頁昧甘,感覺有點多,...
    冰園哲月閱讀 132評論 0 3