Rxjava---操作符篇---組合 / 合并操作符

原文鏈接:
Android RxJava操作符詳解 系列:組合 / 合并操作符
Android RxJava 實戰(zhàn)系列:從磁盤 / 內(nèi)存緩存中 獲取緩存數(shù)據(jù)
Android RxJava 實戰(zhàn)講解:合并數(shù)據(jù)源 & 同時展示數(shù)據(jù)
Android RxJava 實戰(zhàn)系列:聯(lián)合判斷

image.png

目錄

image

作用

組合多個被觀察者&合并需要發(fā)送的事件

類型

image

應用場景&對應操作符 介紹

3.1 組合多個被觀察者

concat()/concatArray()

  • 作用:組合多個被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行

二者區(qū)別:組合被觀察者的數(shù)量糙及,即concat()組合被觀察者數(shù)量≤4個,而concatArray()則可>4個

  • 具體使用
// concat():組合多個被觀察者(≤4個)一起發(fā)送數(shù)據(jù)
        // 注:串行執(zhí)行
        Observable.concat(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

// concatArray():組合多個被觀察者一起發(fā)送數(shù)據(jù)(可>4個)
        // 注:串行執(zhí)行
        Observable.concatArray(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12),
                           Observable.just(13, 14, 15))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
image.png

merge()/mergeArray()

  • 作用:組合多個被觀察者一起發(fā)送事件,合并后 按時間線并行執(zhí)行
  Observable.merge(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 從0開始發(fā)送景醇、共發(fā)送3個數(shù)據(jù)铛只、第1次事件延遲發(fā)送時間 = 1s、間隔時間 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 從2開始發(fā)送、共發(fā)送3個數(shù)據(jù)厚棵、第1次事件延遲發(fā)送時間 = 1s尤仍、間隔時間 = 1s
                  .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

兩個被觀察者發(fā)送事件并行執(zhí)行箫津,輸出結(jié)果 = 0,2 -> 1,3 -> 2,4

concatDelayError() / mergeDelayError()

image.png

a. 無使用concatDelayError()的情況

Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 發(fā)送Error事件,因為無使用concatDelayError,所以第2個Observable將不會發(fā)送事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

測試結(jié)果:第1個被觀察者發(fā)送Error事件后苏遥,第2個被觀察者則不會繼續(xù)發(fā)送事件


image.png
<-- 使用了concatDelayError()的情況 -->
Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 發(fā)送Error事件饼拍,因為使用了concatDelayError,所以第2個Observable將會發(fā)送事件田炭,等發(fā)送完畢后师抄,再發(fā)送錯誤事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

測試結(jié)果:第1個被觀察者的Error事件將在第2個被觀察者發(fā)送完事件后再繼續(xù)發(fā)送


image.png

3.2合并多個事件

該類型的操作符主要是對多個被觀察者中的事件進行合并處理

Zip()

  • 作用
    合并 多個被觀察者(Observable)發(fā)送的事件,生成一個新的事件序列(即組合過后的事件序列)教硫,并最終發(fā)送
  • 特別注意
    1.事件組合方式 = 嚴格按照原先事件序列 進行對位合并
    2.最終合并的事件數(shù)量 = 多個被觀察者(Observable)中數(shù)量最少的數(shù)量
<-- 創(chuàng)建第1個被觀察者 -->
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被觀察者1發(fā)送了事件1");
                emitter.onNext(1);
                // 為了方便展示效果叨吮,所以在發(fā)送事件后加入2s的延遲
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1發(fā)送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1發(fā)送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 設(shè)置被觀察者1在工作線程1中工作

<-- 創(chuàng)建第2個被觀察者 -->
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被觀察者2發(fā)送了事件A");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2發(fā)送了事件B");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2發(fā)送了事件C");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2發(fā)送了事件D");
                emitter.onNext("D");
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 設(shè)置被觀察者2在工作線程2中工作
        // 假設(shè)不作線程控制,則該兩個被觀察者會在同一個線程中工作瞬矩,即發(fā)送事件存在先后順序茶鉴,而不是同時發(fā)送

<-- 使用zip變換操作符進行事件合并 -->
// 注:創(chuàng)建BiFunction對象傳入的第3個參數(shù) = 合并后數(shù)據(jù)的數(shù)據(jù)類型
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最終接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
  • 特別注意:
    盡管被觀察者2的事件D沒有事件與其合并,但還是會繼續(xù)發(fā)送
    若在被觀察者1 & 被觀察者2的事件序列最后發(fā)送onComplete()事件景用,則被觀察者2的事件D也不會發(fā)送


    image.png

    image.png

combineLatest()

  • 作用
    當兩個Observables中的任何一個發(fā)送了數(shù)據(jù)后涵叮,將先發(fā)送了數(shù)據(jù)的Observables 的最新(最后)一個數(shù)據(jù) 與 另外一個Observable發(fā)送的每個數(shù)據(jù)結(jié)合,最終基于該函數(shù)的結(jié)果發(fā)送數(shù)據(jù)

與Zip()的區(qū)別:Zip() = 按個數(shù)合并伞插,即1對1合并割粮;CombineLatest() = 按時間合并,即在同一個時間點上合并

Observable.combineLatest(
                    Observable.just(1L, 2L, 3L), // 第1個發(fā)送數(shù)據(jù)事件的Observable
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2個發(fā)送數(shù)據(jù)事件的Observable:從0開始發(fā)送媚污、共發(fā)送3個數(shù)據(jù)舀瓢、第1次事件延遲發(fā)送時間 = 1s、間隔時間 = 1s
                    new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long o1, Long o2) throws Exception {
                    // o1 = 第1個Observable發(fā)送的最新(最后)1個數(shù)據(jù)
                    // o2 = 第2個Observable發(fā)送的每1個數(shù)據(jù)
                    Log.e(TAG, "合并的數(shù)據(jù)是: "+ o1 + " "+ o2);
                    return o1 + o2;
                    // 合并的邏輯 = 相加
                    // 即第1個Observable發(fā)送的最后1個數(shù)據(jù) 與 第2個Observable發(fā)送的每1個數(shù)據(jù)進行相加
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.e(TAG, "合并的結(jié)果是: "+s);
                }
            });

reduce()

  • 作用
    把被觀察者需要發(fā)送的事件聚合成1個事件 & 發(fā)送
    聚合的邏輯根據(jù)需求撰寫杠步,但本質(zhì)都是前2個數(shù)據(jù)聚合氢伟,然后與后1個數(shù)據(jù)繼續(xù)進行聚合,依次類推
Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在該復寫方法中復寫聚合的邏輯
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次計算的數(shù)據(jù)是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 本次聚合的邏輯是:全部數(shù)據(jù)相乘起來
                        // 原理:第1次取前2個數(shù)據(jù)相乘幽歼,之后每次獲取到的數(shù)據(jù) = 返回的數(shù)據(jù)x原始下1個數(shù)據(jù)每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最終計算的結(jié)果是: "+s);

            }
        });
image.png

collect()

  • 作用
    將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結(jié)構(gòu)里
Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 創(chuàng)建數(shù)據(jù)結(jié)構(gòu)(容器)朵锣,用于收集被觀察者發(fā)送的數(shù)據(jù)
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 對發(fā)送的數(shù)據(jù)進行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 參數(shù)說明:list = 容器,integer = 后者數(shù)據(jù)
                                list.add(integer);
                                // 對發(fā)送的數(shù)據(jù)進行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次發(fā)送的數(shù)據(jù)是: "+s);

            }
        });

3.3 發(fā)送事件前追加發(fā)送事件

startWith() / startWithArray()

  • 作用
    在一個被觀察者發(fā)送事件前甸私,追加發(fā)送一些數(shù)據(jù) / 一個新的被觀察者
<-- 在一個被觀察者發(fā)送事件前诚些,追加發(fā)送一些數(shù)據(jù) -->
        // 注:追加數(shù)據(jù)順序 = 后調(diào)用先追加
        Observable.just(4, 5, 6)
                  .startWith(0)  // 追加單個數(shù)據(jù) = startWith()
                  .startWithArray(1, 2, 3) // 追加多個數(shù)據(jù) = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });


<-- 在一個被觀察者發(fā)送事件前,追加發(fā)送被觀察者 & 發(fā)送數(shù)據(jù) -->
        // 注:追加數(shù)據(jù)順序 = 后調(diào)用先追加
        Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
image.png
image.png

3.4 統(tǒng)計發(fā)送事件數(shù)量

count()

  • 作用
    統(tǒng)計被觀察者發(fā)送事件的數(shù)量
// 注:返回結(jié)果 = Long類型
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "發(fā)送的事件數(shù)量 =  "+aLong);

                    }
                });
image.png

4. 實際開發(fā)需求案例

組合 / 合并操作符的常見實際需求:

  1. 從緩存(磁盤皇型、內(nèi)存)中獲取緩存數(shù)據(jù)
  2. 合并數(shù)據(jù)源
  3. 聯(lián)合判斷

4.1 從緩存(磁盤诬烹、內(nèi)存)中獲取緩存數(shù)據(jù)

  • 需求場景


    image.png
  • 功能說明


    image.png
// 該2變量用于模擬內(nèi)存緩存 & 磁盤緩存中的數(shù)據(jù)
        String memoryCache = null;
        String diskCache = "從磁盤緩存中獲取數(shù)據(jù)";

        /*
         * 設(shè)置第1個Observable:檢查內(nèi)存緩存是否有該數(shù)據(jù)的緩存
         **/
        Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                // 先判斷內(nèi)存緩存有無數(shù)據(jù)
                if (memoryCache != null) {
                    // 若有該數(shù)據(jù),則發(fā)送
                    emitter.onNext(memoryCache);
                } else {
                    // 若無該數(shù)據(jù)弃鸦,則直接發(fā)送結(jié)束事件
                    emitter.onComplete();
                }

            }
        });

        /*
         * 設(shè)置第2個Observable:檢查磁盤緩存是否有該數(shù)據(jù)的緩存
         **/
        Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                // 先判斷磁盤緩存有無數(shù)據(jù)
                if (diskCache != null) {
                    // 若有該數(shù)據(jù)绞吁,則發(fā)送
                    emitter.onNext(diskCache);
                } else {
                    // 若無該數(shù)據(jù),則直接發(fā)送結(jié)束事件
                    emitter.onComplete();
                }

            }
        });

        /*
         * 設(shè)置第3個Observable:通過網(wǎng)絡(luò)獲取數(shù)據(jù)
         **/
        Observable<String> network = Observable.just("從網(wǎng)絡(luò)中獲取數(shù)據(jù)");
        // 此處僅作網(wǎng)絡(luò)請求的模擬


        /*
         * 通過concat() 和 firstElement()操作符實現(xiàn)緩存功能
         **/

        // 1. 通過concat()合并memory唬格、disk家破、network 3個被觀察者的事件(即檢查內(nèi)存緩存颜说、磁盤緩存 & 發(fā)送網(wǎng)絡(luò)請求)
        //    并將它們按順序串聯(lián)成隊列
        Observable.concat(memory, disk, network)
                // 2. 通過firstElement(),從串聯(lián)隊列中取出并發(fā)送第1個有效事件(Next事件)汰聋,即依次判斷檢查memory门粪、disk、network
                .firstElement()
                // 即本例的邏輯為:
                // a. firstElement()取出第1個事件 = memory烹困,即先判斷內(nèi)存緩存中有無數(shù)據(jù)緩存玄妈;由于memoryCache = null,即內(nèi)存緩存中無數(shù)據(jù)髓梅,所以發(fā)送結(jié)束事件(視為無效事件)
                // b. firstElement()繼續(xù)取出第2個事件 = disk拟蜻,即判斷磁盤緩存中有無數(shù)據(jù)緩存:由于diskCache ≠ null,即磁盤緩存中有數(shù)據(jù)枯饿,所以發(fā)送Next事件(有效事件)
                // c. 即firstElement()已發(fā)出第1個有效事件(disk事件)瞭郑,所以停止判斷。

                // 3. 觀察者訂閱
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept( String s) throws Exception {
                        Log.d(TAG,"最終獲取的數(shù)據(jù)來源 =  "+ s);
                    }
                });
image.png

4.2 合并數(shù)據(jù)源 & 同時展示數(shù)據(jù)

  • 需求場景:


    image.png
  • 功能說明
    同時向2個數(shù)據(jù)源獲取數(shù)據(jù) -> 合并數(shù)據(jù) -> 統(tǒng)一展示到客戶端
  • 具體實現(xiàn)
    此處采用Merge() & Zip()操作符進行講解鸭你,其中:
    Merge()例子 :實現(xiàn)較為簡單的從(網(wǎng)絡(luò) + 本地)獲取數(shù)據(jù) & 統(tǒng)一展示
    Zip()例子:結(jié)合Retrofit 與RxJava,實現(xiàn)較為復雜的合并2個網(wǎng)絡(luò)請求向2個服務(wù)器獲取數(shù)據(jù) & 統(tǒng)一展示

4.2.1 采用 Merge()操作符

// 用于存放最終展示的數(shù)據(jù)
        String result = "數(shù)據(jù)源來自 = " ;

        /*
         * 設(shè)置第1個Observable:通過網(wǎng)絡(luò)獲取數(shù)據(jù)
         * 此處僅作網(wǎng)絡(luò)請求的模擬
         **/
        Observable<String> network = Observable.just("網(wǎng)絡(luò)");

        /*
         * 設(shè)置第2個Observable:通過本地文件獲取數(shù)據(jù)
         * 此處僅作本地文件請求的模擬
         **/
        Observable<String> file = Observable.just("本地文件");


        /*
         * 通過merge()合并事件 & 同時發(fā)送事件
         **/
        Observable.merge(network, file)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "數(shù)據(jù)源有: "+ value  );
                        result += value + "+";
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    // 接收合并事件后擒权,統(tǒng)一展示
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "獲取數(shù)據(jù)完成");
                        Log.d(TAG,  result  );
                    }
                });
image.png

4.2.2 采用Zip()操作符

  • 功能說明
    從不同數(shù)據(jù)源(2個服務(wù)器)獲取數(shù)據(jù)袱巨,即 合并網(wǎng)絡(luò)請求的發(fā)送
    統(tǒng)一顯示結(jié)果
public class MainActivity extends AppCompatActivity {


        private static final String TAG = "Rxjava";


        // 定義Observable接口類型的網(wǎng)絡(luò)請求對象
        Observable<Translation1> observable1;
        Observable<Translation2> observable2;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            // 步驟1:創(chuàng)建Retrofit對象
            Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl("http://fy.iciba.com/") // 設(shè)置 網(wǎng)絡(luò)請求 Url
                    .addConverterFactory(GsonConverterFactory.create()) //設(shè)置使用Gson解析(記得加入依賴)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                    .build();

            // 步驟2:創(chuàng)建 網(wǎng)絡(luò)請求接口 的實例
            GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);

            // 步驟3:采用Observable<...>形式 對 2個網(wǎng)絡(luò)請求 進行封裝
            observable1 = request.getCall().subscribeOn(Schedulers.io()); // 新開線程進行網(wǎng)絡(luò)請求1
            observable2 = request.getCall_2().subscribeOn(Schedulers.io());// 新開線程進行網(wǎng)絡(luò)請求2
            // 即2個網(wǎng)絡(luò)請求異步 & 同時發(fā)送

            // 步驟4:通過使用Zip()對兩個網(wǎng)絡(luò)請求進行合并再發(fā)送
            Observable.zip(observable1, observable2,
                    new BiFunction<Translation1, Translation2, String>() {
                        // 注:創(chuàng)建BiFunction對象傳入的第3個參數(shù) = 合并后數(shù)據(jù)的數(shù)據(jù)類型
                        @Override
                        public String apply(Translation1 translation1,
                                            Translation2 translation2) throws Exception {
                            return translation1.show() + " & " +translation2.show();
                        }
                    }).observeOn(AndroidSchedulers.mainThread()) // 在主線程接收 & 處理數(shù)據(jù)
                    .subscribe(new Consumer<String>() {
                        // 成功返回數(shù)據(jù)時調(diào)用
                        @Override
                        public void accept(String combine_infro) throws Exception {
                            // 結(jié)合顯示2個網(wǎng)絡(luò)請求的數(shù)據(jù)結(jié)果
                            Log.d(TAG, "最終接收到的數(shù)據(jù)是:" + combine_infro);
                        }
                    }, new Consumer<Throwable>() {
                        // 網(wǎng)絡(luò)請求錯誤時調(diào)用
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            System.out.println("登錄失敗");
                        }
                    });
        }
}

4.3 聯(lián)合判斷

  • 需求場景
    需要同時對多個事件進行聯(lián)合判斷

如,填寫表單時碳抄,需要表單里所有信息(姓名愉老、年齡、職業(yè)等)都被填寫后剖效,才允許點擊 “提交” 按鈕

  • 具體實現(xiàn)
    采用 RxJava 組合操作符中的combineLatest() 實現(xiàn)
  /*
         * 步驟1:設(shè)置控件變量 & 綁定
         **/
        EditText name,age,job;
        Button list;

        name = (EditText) findViewById(R.id.name);
        age = (EditText) findViewById(R.id.age);
        job = (EditText) findViewById(R.id.job);
        list = (Button) findViewById(R.id.list);

        /*
         * 步驟2:為每個EditText設(shè)置被觀察者嫉入,用于發(fā)送監(jiān)聽事件
         * 說明:
         * 1. 此處采用了RxBinding:RxTextView.textChanges(name) = 對對控件數(shù)據(jù)變更進行監(jiān)聽(功能類似TextWatcher),需要引入依賴:compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
         * 2. 傳入EditText控件璧尸,點擊任1個EditText撰寫時咒林,都會發(fā)送數(shù)據(jù)事件 = Function3()的返回值(下面會詳細說明)
         * 3. 采用skip(1)原因:跳過 一開始EditText無任何輸入時的空值
         **/
        Observable<CharSequence> nameObservable = RxTextView.textChanges(name).skip(1);
        Observable<CharSequence> ageObservable = RxTextView.textChanges(age).skip(1);
        Observable<CharSequence> jobObservable = RxTextView.textChanges(job).skip(1);

        /*
         * 步驟3:通過combineLatest()合并事件 & 聯(lián)合判斷
         **/
        Observable.combineLatest(nameObservable,ageObservable,jobObservable,new Function3<CharSequence, CharSequence, CharSequence,Boolean>() {
            @Override
            public Boolean apply(@NonNull CharSequence charSequence, @NonNull CharSequence charSequence2, @NonNull CharSequence charSequence3) throws Exception {

                /*
                 * 步驟4:規(guī)定表單信息輸入不能為空
                 **/
                // 1. 姓名信息
                boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) ;
                // 除了設(shè)置為空,也可設(shè)置長度限制
                // boolean isUserNameValid = !TextUtils.isEmpty(name.getText()) && (name.getText().toString().length() > 2 && name.getText().toString().length() < 9);

                // 2. 年齡信息
                boolean isUserAgeValid = !TextUtils.isEmpty(age.getText());
                // 3. 職業(yè)信息
                boolean isUserJobValid = !TextUtils.isEmpty(job.getText()) ;

                /*
                 * 步驟5:返回信息 = 聯(lián)合判斷爷光,即3個信息同時已填寫垫竞,"提交按鈕"才可點擊
                 **/
                return isUserNameValid && isUserAgeValid && isUserJobValid;
            }

                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean s) throws Exception {
                /*
                 * 步驟6:返回結(jié)果 & 設(shè)置按鈕可點擊樣式
                 **/
                Log.e(TAG, "提交按鈕是否可點擊: "+s);
                list.setEnabled(s);
            }
        });

原文鏈接:
Android RxJava操作符詳解 系列:組合 / 合并操作符
Android RxJava 實戰(zhàn)系列:從磁盤 / 內(nèi)存緩存中 獲取緩存數(shù)據(jù)
Android RxJava 實戰(zhàn)講解:合并數(shù)據(jù)源 & 同時展示數(shù)據(jù)
Android RxJava 實戰(zhàn)系列:聯(lián)合判斷

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蛀序,隨后出現(xiàn)的幾起案子欢瞪,更是在濱河造成了極大的恐慌,老刑警劉巖徐裸,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遣鼓,死亡現(xiàn)場離奇詭異,居然都是意外死亡重贺,警方通過查閱死者的電腦和手機骑祟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門回懦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人曾我,你說我怎么就攤上這事粉怕。” “怎么了抒巢?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵贫贝,是天一觀的道長。 經(jīng)常有香客問我蛉谜,道長稚晚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任型诚,我火速辦了婚禮客燕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘狰贯。我一直安慰自己也搓,他們只是感情好,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布涵紊。 她就那樣靜靜地躺著傍妒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪摸柄。 梳的紋絲不亂的頭發(fā)上颤练,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天,我揣著相機與錄音驱负,去河邊找鬼嗦玖。 笑死,一個胖子當著我的面吹牛跃脊,可吹牛的內(nèi)容都是我干的宇挫。 我是一名探鬼主播,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼酪术,長吁一口氣:“原來是場噩夢啊……” “哼捞稿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拼缝,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤娱局,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后咧七,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體衰齐,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年继阻,在試婚紗的時候發(fā)現(xiàn)自己被綠了耻涛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片废酷。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖抹缕,靈堂內(nèi)的尸體忽然破棺而出澈蟆,到底是詐尸還是另有隱情,我是刑警寧澤卓研,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布趴俘,位于F島的核電站,受9級特大地震影響奏赘,放射性物質(zhì)發(fā)生泄漏寥闪。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一磨淌、第九天 我趴在偏房一處隱蔽的房頂上張望疲憋。 院中可真熱鬧,春花似錦梁只、人聲如沸缚柳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽喂击。三九已至,卻和暖如春淤翔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背佩谷。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工旁壮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人谐檀。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓抡谐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親桐猬。 傳聞我的和親對象是個殘疾皇子麦撵,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容