Rxjava(總結(jié))

為什么要學(xué) RxJava?

提升開(kāi)發(fā)效率缠局,降低維護(hù)成本一直是開(kāi)發(fā)團(tuán)隊(duì)永恒不變的宗旨壕吹。近兩年來(lái)國(guó)內(nèi)的技術(shù)圈子中越來(lái)越多的開(kāi)始提及 RxJava 浊闪,越來(lái)越多的應(yīng)用和面試中都會(huì)有 RxJava ,而就目前的情況概荷,Android 的網(wǎng)絡(luò)庫(kù)基本被 Retrofit + OkHttp 一統(tǒng)天下了秕岛,而配合上響應(yīng)式編程 RxJava 可謂如魚(yú)得水。想必大家肯定被近期的 Kotlin 炸開(kāi)了鍋误证,筆者也在閑暇之時(shí)去了解了一番(作為一個(gè)與時(shí)俱進(jìn)的有理想的青年怎么可能不與時(shí)俱進(jìn)继薛?),發(fā)現(xiàn)其中有個(gè)非常好的優(yōu)點(diǎn)就是簡(jiǎn)潔愈捅,支持函數(shù)式編程遏考。是的, RxJava 最大的優(yōu)點(diǎn)也是簡(jiǎn)潔蓝谨,但它不止是簡(jiǎn)潔灌具,而且是 隨著程序邏輯變得越來(lái)越復(fù)雜青团,它依然能夠保持簡(jiǎn)潔 (這貨潔身自好呀有木有)。

咳咳咖楣,要例子督笆,猛戳這里:給 Android 開(kāi)發(fā)者的 RxJava 詳解

什么是響應(yīng)式編程

上面我們提及了響應(yīng)式編程,不少新司機(jī)對(duì)它可謂一臉懵逼诱贿,那什么是響應(yīng)式編程呢娃肿?響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè)珠十,被過(guò)濾料扰,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流焙蹭。
響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件晒杈。事件可以被等待,可以觸發(fā)過(guò)程壳嚎,也可以觸發(fā)其它事件桐智。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開(kāi)一扇窗戶(hù)。同樣的烟馅,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI然磷;汽車(chē)上的車(chē)道偏移系統(tǒng)探測(cè)到車(chē)輛偏移了正常路線就會(huì)提醒駕駛者糾正郑趁,就是是響應(yīng)事件。
今天姿搜,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用寡润、用戶(hù)觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上舅柜,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此梭纹。
為什么出了一個(gè)系列后還有完結(jié)版?
RxJava 這些年可謂越來(lái)越流行致份,而在去年的晚些時(shí)候發(fā)布了2.0正式版变抽。大半年已過(guò),雖然網(wǎng)上已經(jīng)出現(xiàn)了大部分的 RxJava 教程(其實(shí)細(xì)心的你還是會(huì)發(fā)現(xiàn) 1.x 的超級(jí)多)氮块,前些日子绍载,筆者花了大約兩周的閑暇之時(shí)寫(xiě)了 RxJava 2.x 系列教程,也得到了不少反饋滔蝉,其中就有不少讀者覺(jué)得每一篇的教程太短击儡,抑或是希望更多的側(cè)重適用場(chǎng)景的介紹,在這樣的大前提下蝠引,這篇完結(jié)版教程就此誕生阳谍,僅供各位新司機(jī)采納蛀柴。

開(kāi)始

RxJava 2.x 已經(jīng)按照 Reactive-Streams specification 規(guī)范完全的重寫(xiě)了,maven也被放在了 io.reactivex.rxjava2:rxjava:2.x.y 下矫夯,所以 RxJava 2.x 獨(dú)立于 RxJava 1.x 而存在名扛,而隨后官方宣布的將在一段時(shí)間后終止對(duì) RxJava 1.x 的維護(hù),所以對(duì)于熟悉 RxJava 1.x 的老司機(jī)自然可以直接看一下 2.x 的文檔和異同就能輕松上手了茧痒,而對(duì)于不熟悉的年輕司機(jī)肮韧,不要慌,本醬帶你裝逼帶你飛旺订,馬上就發(fā)車(chē)弄企,坐穩(wěn)了:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'(2.1.1為寫(xiě)此文章時(shí)的最新版本)

接口變化

RxJava 2.x 擁有了新的特性,其依賴(lài)于4個(gè)基礎(chǔ)接口区拳,它們分別是
Publisher
Subscriber
Subscription
Processor
其中最核心的莫過(guò)于 Publisher 和 Subscriber拘领。Publisher 可以發(fā)出一系列的事件,而 Subscriber 負(fù)責(zé)和處理這些事件樱调。
其中用的比較多的自然是 Publisher 的 Flowable约素,它支持背壓,有興趣的可以看一下官方對(duì)于背壓的講解
可以明顯地發(fā)現(xiàn)笆凌,RxJava 2.x 最大的改動(dòng)就是對(duì)于 backpressure 的處理圣猎,為此將原來(lái)的 Observable 拆分成了新的Observable 和Flowable,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分乞而,但令人慶幸的是送悔,是它,是它爪模,還是它欠啤,還是我們最熟悉和最喜歡的 RxJava。

觀察者模式

大家可能都知道屋灌, RxJava 以觀察者模式為骨架洁段,在 2.0 中依舊如此。
不過(guò)此次更新中共郭,出現(xiàn)了兩種觀察者模式:
Observable ( 被觀察者 ) / Observer ( 觀察者 )
Flowable (被觀察者)/ Subscriber (觀察者)

在 RxJava 2.x 中祠丝,Observable 用于訂閱 Observer,不再支持背壓(1.x 中可以使用背壓策略)落塑,而 Flowable 用于訂閱 Subscriber 纽疟, 是支持背壓(Backpressure)的。

Observable

在 RxJava 1.x 中憾赁,我們最熟悉的莫過(guò)于 Observable 這個(gè)類(lèi)了污朽,筆者在剛剛使用 RxJava 2.x 的時(shí)候,創(chuàng)建了 一個(gè) Observable龙考,瞬間一臉懵逼有木有蟆肆,居然連我們最最熟悉的 Subscriber 都沒(méi)了矾睦,取而代之的是 ObservableEmmiter,俗稱(chēng)發(fā)射器炎功。此外枚冗,由于沒(méi)有了Subscriber的蹤影,我們創(chuàng)建觀察者時(shí)需使用 Observer蛇损。而 Observer 也不是我們熟悉的那個(gè) Observer赁温,又出現(xiàn)了一個(gè) Disposable 參數(shù)帶你裝逼帶你飛。
廢話(huà)不多說(shuō)淤齐,從會(huì)用開(kāi)始股囊,還記得 RxJava 的三部曲嗎?

第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立訂閱關(guān)系

Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() { // 第三步:訂閱

            // 第二步:初始化Observer
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {      
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中更啄,新增的Disposable可以做到切斷的操作稚疹,讓Observer觀察者不再接收上游事件
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

不難看出,RxJava 2.x 與 1.x 還是存在著一些區(qū)別的祭务。首先内狗,創(chuàng)建 Observable 時(shí),回調(diào)的是 ObservableEmitter 义锥,字面意思即發(fā)射器柳沙,并且直接 throws Exception。其次缨该,在創(chuàng)建的 Observer 中偎行,也多了一個(gè)回調(diào)方法:onSubscribe,傳遞參數(shù)為Disposable贰拿,Disposable 相當(dāng)于 RxJava 1.x 中的 Subscription , 用于解除訂閱熄云∨蚋可以看到示例代碼中,在 i 自增到 2 的時(shí)候缴允,訂閱關(guān)系被切斷荚守。

07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4

當(dāng)然,我們的 RxJava 2.x 也為我們保留了簡(jiǎn)化訂閱方法练般,我們可以根據(jù)需求矗漾,進(jìn)行相應(yīng)的簡(jiǎn)化訂閱,只不過(guò)傳入對(duì)象改為了 Consumer薄料。
Consumer 即消費(fèi)者敞贡,用于接收單個(gè)值,BiConsumer 則是接收兩個(gè)值摄职,F(xiàn)unction 用于變換對(duì)象誊役,Predicate 用于判斷获列。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應(yīng)該都知道意思蛔垢,這里也不再贅述击孩。

線程調(diào)度

關(guān)于線程切換這點(diǎn),RxJava 1.x 和 RxJava 2.x 的實(shí)現(xiàn)思路是一樣的鹏漆。這里簡(jiǎn)單的說(shuō)一下巩梢,以便于我們的新司機(jī)入手。
subScribeOn同 RxJava 1.x 一樣艺玲,subscribeOn 用于指定subscribe()時(shí)所發(fā)生的線程括蝠,從源碼角度可以看出,內(nèi)部線程調(diào)度是通過(guò) ObservableSubscribeOn來(lái)實(shí)現(xiàn)的板驳。

    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

ObservableSubscribeOn 的核心源碼在 subscribeActual 方法中又跛,通過(guò)代理的方式使用 SubscribeOnObserver 包裝 Observer 后,設(shè)置 Disposable 來(lái)將 subscribe 切換到 Scheduler 線程中若治。

observeOn

observeOn 方法用于指定下游 Observer 回調(diào)發(fā)生的線程慨蓝。

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

線程切換需要注意的

RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方端幼。
簡(jiǎn)單地說(shuō)礼烈,subscribeOn() 指定的就是發(fā)射事件的線程,observerOn 指定的就是訂閱者接收事件的線程婆跑。
多次指定發(fā)射事件的線程只有第一次指定的有效此熬,也就是說(shuō)多次調(diào)用 subscribeOn() 只有第一次的有效,其余的會(huì)被忽略滑进。
但多次指定訂閱者接收線程是可以的犀忱,也就是說(shuō)每調(diào)用一次 observerOn(),下游的線程就會(huì)切換一次扶关。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread)阴汇,Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

輸出:

07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread)节槐,Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io)搀庶,Current thread is RxCachedThreadScheduler-2

實(shí)例代碼中,分別用 Schedulers.newThread() 和 Schedulers.io() 對(duì)發(fā)射線程進(jìn)行切換铜异,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進(jìn)行了接收線程的切換哥倔。可以看到輸出中發(fā)射線程僅僅響應(yīng)了第一個(gè) newThread揍庄,但每調(diào)用一次 observeOn() 咆蒿,線程便會(huì)切換一次,因此如果我們有類(lèi)似的需求時(shí),便知道如何處理了蜡秽。
RxJava 中府阀,已經(jīng)內(nèi)置了很多線程選項(xiàng)供我們選擇,例如有:
Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫(xiě)文件等io密集型的操作芽突;
Schedulers.computation() 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作试浙;
Schedulers.newThread() 代表一個(gè)常規(guī)的新線程;
AndroidSchedulers.mainThread() 代表Android的主線程
這些內(nèi)置的 Scheduler 已經(jīng)足夠滿(mǎn)足我們開(kāi)發(fā)的需求寞蚌,因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng)田巴,而 RxJava 內(nèi)部使用的是線程池來(lái)維護(hù)這些線程,所以效率也比較高挟秤。

操作符

關(guān)于操作符壹哺,在官方文檔中已經(jīng)做了非常完善的講解,并且筆者前面的系列教程中也著重講解了絕大多數(shù)的操作符作用艘刚,這里受于篇幅限制管宵,就不多做贅述,只挑選幾個(gè)進(jìn)行實(shí)際情景的講解攀甚。

map

map 操作符可以將一個(gè) Observable 對(duì)象通過(guò)某種關(guān)系轉(zhuǎn)換為另一個(gè)Observable 對(duì)象箩朴。在 2.x 中和 1.x 中作用幾乎一致,不同點(diǎn)在于:2.x 將 1.x 中的 Func1 和 Func2 改為了 Function 和 BiFunction秋度。

采用 map 操作符進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)解析

想必大家都知道炸庞,很多時(shí)候我們?cè)谑褂?RxJava 的時(shí)候總是和 Retrofit 進(jìn)行結(jié)合使用,而為了方便演示荚斯,這里我們就暫且采用 OkHttp3 進(jìn)行演示埠居,配合 map,doOnNext 事期,線程切換進(jìn)行簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求:
1)通過(guò) Observable.create() 方法滥壕,調(diào)用 OkHttp 網(wǎng)絡(luò)請(qǐng)求;
2)通過(guò) map 操作符集合 gson兽泣,將 Response 轉(zhuǎn)換為 bean 類(lèi)捏浊;
3)通過(guò) doOnNext() 方法,解析 bean 中的數(shù)據(jù)撞叨,并進(jìn)行數(shù)據(jù)庫(kù)存儲(chǔ)等操作;
4)調(diào)度線程浊洞,在子線程中進(jìn)行耗時(shí)操作任務(wù)牵敷,在主線程中更新 UI ;
5)通過(guò) subscribe()法希,根據(jù)請(qǐng)求成功或者失敗來(lái)更新 UI 枷餐。

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                    }
                });

concat

concat 可以做到不交錯(cuò)的發(fā)射兩個(gè)甚至多個(gè) Observable 的發(fā)射事件苫亦,并且只有前一個(gè) Observable 終止(onComplete) 后才會(huì)訂閱下一個(gè) Observable毛肋。
采用 concat 操作符先讀取緩存再通過(guò)網(wǎng)絡(luò)請(qǐng)求獲取數(shù)據(jù)
想必在實(shí)際應(yīng)用中怨咪,很多時(shí)候(對(duì)數(shù)據(jù)操作不敏感時(shí))都需要我們先讀取緩存的數(shù)據(jù),如果緩存沒(méi)有數(shù)據(jù)润匙,再通過(guò)網(wǎng)絡(luò)請(qǐng)求獲取诗眨,隨后在主線程更新我們的UI。
concat 操作符簡(jiǎn)直就是為我們這種需求量身定做孕讳。
利用 concat 的必須調(diào)用 onComplete 后才能訂閱下一個(gè) Observable 的特性匠楚,我們就可以先讀取緩存數(shù)據(jù),倘若獲取到的緩存數(shù)據(jù)不是我們想要的厂财,再調(diào)用 onComplete() 以執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)的Observable芋簿,如果緩存數(shù)據(jù)能應(yīng)我們所需,則直接調(diào)用 onNext()璃饱,防止過(guò)度的網(wǎng)絡(luò)請(qǐng)求与斤,浪費(fèi)用戶(hù)的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create當(dāng)前線程:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操作符 concat 中荚恶,只有調(diào)用 onComplete 之后才會(huì)執(zhí)行下一個(gè) Observable
                if (data != null){ // 如果緩存數(shù)據(jù)不為空撩穿,則直接讀取緩存數(shù)據(jù)窟蓝,而不讀取網(wǎng)絡(luò)數(shù)據(jù)
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取緩存數(shù)據(jù):\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 兩個(gè) Observable 的泛型應(yīng)當(dāng)保持一致

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n");
                            Log.e(TAG, "accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)失斎硖洹:"+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 讀取數(shù)據(jù)失敗:"+throwable.getMessage()+"\n");
                    }
                });

有時(shí)候我們的緩存可能還會(huì)分為 memory 和 disk 律秃,實(shí)際上都差不多嗤栓,無(wú)非是多寫(xiě)點(diǎn) Observable 冻河,然后通過(guò) concat 合并即可。

flatMap 實(shí)現(xiàn)多個(gè)網(wǎng)絡(luò)請(qǐng)求依次依賴(lài)

想必這種情況也在實(shí)際情況中比比皆是茉帅,例如用戶(hù)注冊(cè)成功后需要自動(dòng)登錄叨叙,我們只需要先通過(guò)注冊(cè)接口注冊(cè)用戶(hù)信息,注冊(cè)成功后馬上調(diào)用登錄接口進(jìn)行自動(dòng)登錄即可堪澎。
我們的 flatMap 恰好解決了這種應(yīng)用場(chǎng)景擂错,flatMap 操作符可以將一個(gè)發(fā)射數(shù)據(jù)的 Observable 變換為多個(gè) Observables ,然后將它們發(fā)射的數(shù)據(jù)合并后放到一個(gè)單獨(dú)的 Observable樱蛤,利用這個(gè)特性钮呀,我們很輕松地達(dá)到了我們的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請(qǐng)求昨凡,并解析到FootList
                .subscribeOn(Schedulers.io())        // 在io線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請(qǐng)求結(jié)果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據(jù)獲取食品列表的響應(yīng)結(jié)果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請(qǐng)求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });

善用 zip 操作符爽醋,實(shí)現(xiàn)多個(gè)接口數(shù)據(jù)共同更新 UI

在實(shí)際應(yīng)用中,我們極有可能會(huì)在一個(gè)頁(yè)面顯示的數(shù)據(jù)來(lái)源于多個(gè)接口便脊,這時(shí)候我們的 zip 操作符為我們排憂(yōu)解難蚂四。
zip 操作符可以將多個(gè) Observable 的數(shù)據(jù)結(jié)合為一個(gè)數(shù)據(jù)源再發(fā)射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合并后的數(shù)據(jù)為:手機(jī)歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失敗:" + throwable+"\n");
                    }
                });

采用 interval 操作符實(shí)現(xiàn)心跳間隔任務(wù)

想必即時(shí)通訊等需要輪訓(xùn)的任務(wù)在如今的 APP 中已是很常見(jiàn)遂赠,而 RxJava 2.x 的 interval 操作符可謂完美地解決了我們的疑惑久妆。
這里就簡(jiǎn)單的意思一下輪訓(xùn)。

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設(shè)置文本 :"+aLong );
                        mRxOperatorsText.append("accept: 設(shè)置文本 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 銷(xiāo)毀時(shí)停止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }

RxJava 1.x 如何平滑升級(jí)到 RxJava 2.x跷睦?

由于 RxJava 2.x 變化較大無(wú)法直接升級(jí)筷弦,幸運(yùn)的是,官方為我們提供了 RxJava2Interrop 這個(gè)庫(kù)送讲,可以方便地把 RxJava 1.x 升級(jí)到 RxJava 2.x奸笤,或者將 RxJava 2.x 轉(zhuǎn)回到 RxJava 1.x。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末哼鬓,一起剝皮案震驚了整個(gè)濱河市监右,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌异希,老刑警劉巖健盒,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異称簿,居然都是意外死亡扣癣,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)憨降,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)父虑,“玉大人,你說(shuō)我怎么就攤上這事授药∈亢浚” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵悔叽,是天一觀的道長(zhǎng)莱衩。 經(jīng)常有香客問(wèn)我,道長(zhǎng)娇澎,這世上最難降的妖魔是什么笨蚁? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮趟庄,結(jié)果婚禮上括细,老公的妹妹穿的比我還像新娘。我一直安慰自己戚啥,他們只是感情好勒极,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著虑鼎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上炫彩,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天匾七,我揣著相機(jī)與錄音,去河邊找鬼江兢。 笑死昨忆,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的杉允。 我是一名探鬼主播邑贴,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼叔磷!你這毒婦竟也來(lái)了拢驾?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤改基,失蹤者是張志新(化名)和其女友劉穎繁疤,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體秕狰,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稠腊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鸣哀。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片架忌。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖我衬,靈堂內(nèi)的尸體忽然破棺而出叹放,到底是詐尸還是另有隱情,我是刑警寧澤低飒,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布许昨,位于F島的核電站,受9級(jí)特大地震影響褥赊,放射性物質(zhì)發(fā)生泄漏糕档。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一拌喉、第九天 我趴在偏房一處隱蔽的房頂上張望速那。 院中可真熱鬧,春花似錦尿背、人聲如沸端仰。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)荔烧。三九已至吱七,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鹤竭,已是汗流浹背踊餐。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留臀稚,地道東北人吝岭。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像吧寺,于是被迫代替她去往敵國(guó)和親窜管。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位稚机,與響應(yīng)式編程作為結(jié)合使用的幕帆,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,851評(píng)論 0 10
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,846評(píng)論 25 707
  • 我從去年開(kāi)始使用 RxJava 抒钱,到現(xiàn)在一年多了蜓肆。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,462評(píng)論 7 62
  • 摘錄自查理于1986年6月13日在哈佛學(xué)校畢業(yè)典禮上的演講: “聽(tīng)不進(jìn)勸告的人得不到幫助谋币≌萄铮” “經(jīng)驗(yàn)是一所好學(xué)校,...
    壹顆大橙子閱讀 161評(píng)論 0 0
  • 網(wǎng)上查了大量的資料蕾额,都是說(shuō)去修改my.cnf文件早芭,但是在IOS系統(tǒng)中根本沒(méi)有這個(gè)文件 解決方法1: 其中有個(gè)人說(shuō)修...
    古佛青燈度流年閱讀 642評(píng)論 0 0