Rxjava2 Observable的結(jié)合操作詳解及實例

簡要:

需求了解:

在使用 RxJava 開發(fā)的過程中,很多時候需要結(jié)合多個條件或者數(shù)據(jù)的邏輯判斷缀去,比如登錄功能的表單驗證,實時數(shù)據(jù)比對等。這個時候我們就需要使用 RxJava 的結(jié)合操作符來完成這一需求欢搜,Rx中提供了豐富的結(jié)合操作處理的操作方法。

可用于組合多個Observables的操作方法:

  • CombineLatest:當(dāng)Observables中的任何一個發(fā)射了一個數(shù)據(jù)時谴轮,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)炒瘟,然后發(fā)射這個函數(shù)的結(jié)果。
  • Join:只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi)第步,這個Observable發(fā)射了一條數(shù)據(jù)疮装,就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)。
  • Merge:合并多個Observables的發(fā)射物粘都,可以將多個Observables的輸出合并廓推,就好像它們是一個單個的Observable一樣。
  • Zip:通過一個函數(shù)將多個Observables的發(fā)射物結(jié)合到一起翩隧,基于這個函數(shù)的結(jié)果為每個結(jié)合體嚴(yán)格按照數(shù)量以及順序發(fā)射單個數(shù)據(jù)項受啥。
  • StartWith:在數(shù)據(jù)序列的開頭插入一條指定的數(shù)據(jù)項或者數(shù)據(jù)序列。
  • SwitchOnNext:將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最新發(fā)射的Observable的數(shù)據(jù)項滚局。

1. CombineLatest

當(dāng) Observables 中的任何一個發(fā)射了數(shù)據(jù)時居暖,使用一個函數(shù)結(jié)合每個 Observable 發(fā)射的最近數(shù)據(jù)項,并且基于這個函數(shù)的結(jié)果發(fā)射數(shù)據(jù)藤肢。

CombineLatest 操作符行為類似于zip太闺,但是只有當(dāng)原始的Observable中的每一個都發(fā)射了一條數(shù)據(jù)時 zip 才發(fā)射數(shù)據(jù)。 CombineLatest 則在原始的Observable中任意一個發(fā)射了數(shù)據(jù)時發(fā)射一條數(shù)據(jù)嘁圈。當(dāng)原始Observables的任何一個發(fā)射了一條數(shù)據(jù)時省骂, CombineLatest 使用一 個函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù),然后發(fā)射這個函數(shù)的返回值最住。

img-CombineLatest

解析: combineLatest 操作符可以結(jié)合多個Observable钞澳,可以接收 2-9 個Observable對象, 在其中原始Observables的任何一個發(fā)射了一條數(shù)據(jù)時涨缚, CombineLatest 使用一個函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù)轧粟,然后發(fā)射這個函數(shù)的返回值。此外combineLatest 操作符還有一些接收 Iterable 脓魏, 數(shù)組方式的變體兰吟,以及其他指定參數(shù)combiner、bufferSize茂翔、和combineLatestDelayError方法等變體混蔼,在此就不在詳細(xì)展開了,有興趣的可以查看官方的相關(guān)API文檔了解珊燎。

實例代碼:

    // Observables 創(chuàng)建
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
    Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
    Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
    
    // 1. combineLatest(ObservableSource, ObservableSource [支持2-9個參數(shù)]...,  BiFunction)
    // 結(jié)合多個Observable, 當(dāng)他們其中任意一個發(fā)射了數(shù)據(jù)時惭嚣,使用函數(shù)結(jié)合他們最近發(fā)射的一項數(shù)據(jù)
    Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
    
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
            if (t1 + t2 == 10) {
                return "Success";   // 滿足一定條件,返回指定的字符串
            }
            return t1 + t2 + "";    // 計算所有數(shù)據(jù)的和并轉(zhuǎn)換為字符串
        }
    }).subscribe(new Consumer<String>() {
    
        @Override
        public void accept(String t) throws Exception {
            System.out.println("----> accept combineLatest(1): " + t);
        }
    });
    
    System.out.println("--------------------------------------------------------");
    // 2. combineLatest(T1, T2, T3, Function)
    // Observables的結(jié)合
    Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2, Long t3) throws Exception {
            System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
            return t1 + t2 + t3 + "";   // 計算所有數(shù)據(jù)的和并轉(zhuǎn)換為字符串
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(2): " + t);
        }
    });

輸出:

--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114

Javadoc: combineLatest(T1, T2, T3... , T9, combiner)

2. Join

任何時候悔政,只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi)晚吞,這個Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)卓箫。

img-join

Join 操作符結(jié)合兩個Observable發(fā)射的數(shù)據(jù)载矿,基于時間窗口(你定義的針對每條數(shù)據(jù)特定的原則)選擇待集合的數(shù)據(jù)項垄潮。你將這些時間窗口實現(xiàn)為一些Observables烹卒,它們的生命周期從任何一條Observable發(fā)射的每一條數(shù)據(jù)開始。當(dāng)這個定義時間窗口的Observable發(fā)射了一條數(shù)據(jù)或者完成時弯洗,與這條數(shù)據(jù)關(guān)聯(lián)的窗口也會關(guān)閉旅急。只要這條數(shù)據(jù)的窗口是打開的,它將繼續(xù)結(jié)合其它Observable發(fā)射的任何數(shù)據(jù)項牡整。你定義一個用于結(jié)合數(shù)據(jù)的函數(shù)藐吮。

解析: join(other, leftEnd, rightEnd, resultSelector) 相關(guān)參數(shù)的解析

  • other: 源Observable與其組合的目標(biāo)Observable。
  • leftEnd: 接收一個源數(shù)據(jù)項,返回一個Observable谣辞,這個Observable的生命周期就是源Observable發(fā)射數(shù)據(jù)的有效期。
  • rightEnd: 接收一個源數(shù)據(jù)項,返回一個Observable躯嫉,這個Observable的生命周期就是目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期。
  • resultSelector: 接收源Observable和目標(biāo)Observable發(fā)射的數(shù)據(jù)項变骡, 處理后的數(shù)據(jù)返回給觀察者對象。

注意: 這是源Observable和目標(biāo)Observable發(fā)射數(shù)據(jù)在任意一個基于時間窗口的有效期內(nèi)才會接收到組合數(shù)據(jù)懊缺,這就意味著可能有數(shù)據(jù)丟失的情況猛遍,在其中一個已經(jīng)發(fā)射完所有數(shù)據(jù),并且沒有處于時間窗口的數(shù)據(jù)情況涕刚,另一個Observable的數(shù)據(jù)發(fā)射將不會收到組合數(shù)據(jù)晨缴。

示例代碼:

    // Observable的創(chuàng)建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 1. join(other, leftEnd, rightEnd, resultSelector)
    // other: 目標(biāo)組合的Observable
    // leftEnd: 接收一個源數(shù)據(jù)項装盯,返回一個Observable,這個Observable的生命周期就是源Observable發(fā)射數(shù)據(jù)的有效期
    // rightEnd: 接收一個源數(shù)據(jù)項,返回一個Observable,這個Observable的生命周期就是目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期
    // resultSelector: 接收源Observable和目標(biāo)Observable發(fā)射的數(shù)據(jù)項幌衣, 處理后的數(shù)據(jù)返回給觀察者對象
    sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
        }
    }, new BiFunction<Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            return "t1 = " + t1 + ", t2 = " + t2;                   // 對數(shù)據(jù)進行組合后返回和觀察者
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(1): " + t);
        }
    });

    System.in.read();

輸出:

-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14   // 此時源t1中已經(jīng)沒有數(shù)據(jù)還處于時間窗口有效期內(nèi)

Javadoc: join(other, leftEnd, rightEnd, resultSelector)

groupJoin

groupJoin 操作符與 join 相同趁耗,只是參數(shù)傳遞有所區(qū)別右冻。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector 可以將原始數(shù)據(jù)轉(zhuǎn)換為 Observable 類型的數(shù)據(jù)發(fā)送給觀察者。

img-groupJoin

示例代碼:

    // Observable的創(chuàng)建
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
    // groupJoin操作符與join相同著拭,只是參數(shù)傳遞有所區(qū)別纱扭。
    // resultSelector可以將原始數(shù)據(jù)轉(zhuǎn)換為Observable類型的數(shù)據(jù)發(fā)送給觀察者。
    sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
        }
    }, new BiFunction<Long, Observable<Long>, Observable<String>>() {
        @Override
        public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
            System.out.println("--> apply(2) combine: " + t1);            // 結(jié)合操作
            return t2.map(new Function<Long, String>() {
                @Override
                public String apply(Long t) throws Exception {
                    System.out.println("-----> apply(2) operation: " + t);
                    return "t1 = " + t1 + ", t2 = " + t;
                }
            });
        }
    }).subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
        }
    });

輸出:

-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14

Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)

3. Merge

合并多個Observables的發(fā)射物儡遮。

img-Merge

使用 Merge 操作符你可以將多個Observables的輸出合并乳蛾,就好像它們是一個單個的 Observable 一樣。

3.1 merge

Merge 可能會讓合并的Observables發(fā)射的數(shù)據(jù)交錯(有一個類似的操作符 Concat 不會讓數(shù)據(jù)交錯鄙币,它會按順序一個接著一個發(fā)射多個Observables的發(fā)射物)肃叶,任何一個原始Observable的 onError 通知會被立即傳遞給觀察者,而且會終止合并后的Observable十嘿。

img-merge

除了傳遞多個Observable給 merge 因惭,你還可以傳遞一個Observable列表 List ,數(shù)組绩衷,甚至是一個發(fā)射Observable序列的Observable蹦魔, merge 將合并它們的輸出作為單個Observable的輸出激率。

img-merge-observables

如果你傳遞一個發(fā)射Observables序列的Observable,你可以指定 merge 應(yīng)該同時訂閱的 Observable 的最大數(shù)量勿决。一旦達到訂閱數(shù)的限制乒躺,它將不再訂閱原始Observable發(fā)射的任何其它Observable,直到某個已經(jīng)訂閱的Observable發(fā)射了 onCompleted 通知低缩。

示例代碼:

    // 創(chuàng)建Observable對象
    Observable<Integer> odd = Observable.just(1, 3, 5);
    Observable<Integer> even = Observable.just(2, 4, 6);
    Observable<Integer> big = Observable.just(188888, 688888, 888888);

    // 創(chuàng)建list對象
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(odd);
    list.add(even);
    list.add(big);

    // 創(chuàng)建Array對象
    Observable<Integer>[] observables = new Observable[3];
    observables[0] = odd;
    observables[1] = even;
    observables[2] = big;

    // 創(chuàng)建發(fā)射Observable序列的Observable
    Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.just(1, 2));
            emitter.onNext(Observable.just(1, 2, 3));
            emitter.onNext(Observable.just(1, 2, 3, 4));
            emitter.onNext(Observable.just(1, 2, 3, 4, 5));
            emitter.onComplete();
        }
    });

    // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
    // 可接受 2-4 個Observable對象進行merge
    Observable.merge(odd, even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
    // 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù), bufferSize: 緩存的數(shù)量(從每個內(nèi)部觀察資源預(yù)取的項數(shù))
    // 接受一個Observable的列表List
    Observable.merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
    // 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù), bufferSize: 緩存的數(shù)量(從每個內(nèi)部觀察資源預(yù)取的項數(shù))
    // 接受一個Observable的數(shù)組Array
    Observable.mergeArray(observables)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
    // 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù)
    // 接受一個發(fā)射Observable序列的Observable
    Observable.merge(sources)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 5. mergeWith(other)
    // merge 是靜態(tài)方法嘉冒, mergeWith 是對象方法: Observable.merge(odd,even) 等價于 odd.mergeWith(even)
    odd.mergeWith(even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(5): " + integer);
                }
            });

輸出:

--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6

Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSource<ObservableSource> sources, int maxConcurrency)

3.2 mergeDelayError

如果傳遞給 merge 的任何一個的Observable發(fā)射了 onError通知終止了, merge 操作符生成的Observable也會立即以onError通知終止咆繁。如果你想讓它繼續(xù)發(fā)射數(shù)據(jù)讳推,在最后才報告錯誤,可以使用 mergeDelayError 玩般。

img-mergeDelayError

MergeDelayError 操作符娜遵,mergeDelayError 在合并與交錯輸出的使用上與 merge 相同,區(qū)別在于它會保留 onError 通知直到其他沒有Error的Observable所有的數(shù)據(jù)發(fā)射完成壤短,在那時它才會把onError傳遞給觀察者设拟。

注意: 如果有多個原始Observable出現(xiàn)了Error, 這些Error通知會被合并成一個 CompositeException 久脯,保留在CompositeException 內(nèi)部的 List<Throwable> exceptions 中纳胧,但是如果只有一個原始Observable出現(xiàn)了Error,則不會生成 CompositeException 帘撰,只會發(fā)送這個Error通知跑慕。

由于MergeDelayError使用上和merge相同 ,所以這里就不做詳細(xì)分析了摧找,這里就簡單描述其中的一種的使用實例核行。

實例代碼:

    // 創(chuàng)建有Error的Observable序列的Observable
    Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {

        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.error(new Exception("Error Test1"))); // 發(fā)射一個Error的通知的Observable
            emitter.onNext(Observable.just(2, 3));
            emitter.onNext(Observable.error(new Exception("Error Test2"))); // 發(fā)射一個Error的通知的Observable
            emitter.onNext(Observable.just(4, 5, 6));
            emitter.onComplete();
        }
    });

    // 6. mergeDelayError
    // 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成,在那時它才會把onError傳遞給觀察者
    Observable.mergeDelayError(DelayErrorObservable)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(6)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(6): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    // 判斷是否是CompositeException對象(發(fā)生多個Observable出現(xiàn)Error時會發(fā)送的對象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(6): " + exceptions);
                    } else {
                        System.out.println("--> onError(6): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(6)");
                }
            });

輸出:

--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]

Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)

4. Zip

通過一個函數(shù)將多個Observables的發(fā)射物結(jié)合到一起蹬耘,基于這個函數(shù)的結(jié)果為每個 結(jié)合體 發(fā)射單個數(shù)據(jù)項芝雪。

img-Zip

Zip 操作符與 Merge 類似,都是合并多個Observables的數(shù)據(jù)综苔,返回一個Obversable惩系,主要不同的是它使用這個函數(shù)按順序結(jié)合兩個或多個Observables發(fā)射的數(shù)據(jù)項,然后它發(fā)射這個函數(shù)返回的結(jié)果如筛。它按照嚴(yán)格的順序應(yīng)用這個函數(shù)堡牡。 它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)。

img-Zip-Sources

解析:

  1. Zip 操作符與 Merge 的使用上基本一致杨刨,主要不同的是 zip 發(fā)射的數(shù)據(jù)取決于發(fā)射數(shù)據(jù)項最少的那個Observable并且按照嚴(yán)格的順序去結(jié)合數(shù)據(jù)晤柄。
  2. 同樣具備靜態(tài)方法 zip 與對象方法 zipWith,可以傳遞一個Observable列表 List 妖胀,數(shù)組芥颈,甚至是一個發(fā)射Observable序列的Observable惠勒。

使用上在此就不做詳細(xì)的展開了,可參照上面的 Merge 使用方法浇借,下面就針對 zip 的特性實現(xiàn)一個簡單的實例。

實例代碼:

    // 創(chuàng)建Observable
    Observable<Integer> observable1 = Observable.just(1, 2, 3);
    Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
    
    // zip(sources)
    // 可接受2-9個參數(shù)的Observable怕品,對其進行順序合并操作妇垢,最終合并的數(shù)據(jù)項取決于最少的數(shù)據(jù)項的Observable
    Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer t1, Integer t2) throws Exception {
            System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
            return t1 + t2 + "";
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept: " + s);  // 最終接受observable1全部數(shù)據(jù)項與observable2相同數(shù)量順序部分?jǐn)?shù)據(jù)
        }
    });

輸出:

--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6

Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )

5. StartWith

在數(shù)據(jù)序列的開頭插入一條指定的數(shù)據(jù)項或者數(shù)據(jù)序列。

img-StartWith

如果你想要一個Observable在發(fā)射數(shù)據(jù)之前先發(fā)射一個指定的數(shù)據(jù)或者數(shù)據(jù)序列(可以是單個數(shù)據(jù)肉康、數(shù)組闯估、列表,Observable中的數(shù)據(jù))吼和,可以使 用 StartWith 操作符涨薪。(如果你想一個Observable發(fā)射的數(shù)據(jù)末尾追加一個數(shù)據(jù)序列可以使用 Concat 操作符。)

img-StartWith-Items

實例代碼:

    // 創(chuàng)建列表List
    List<Integer> lists = new ArrayList<>();
    lists.add(999);
    lists.add(9999);
    lists.add(99999);

    // 創(chuàng)建數(shù)組Array
    Integer[] arrays = new Integer[3];
    arrays[0] = 999;
    arrays[1] = 9999;
    arrays[2] = 9999;

    // 1. startWith(item)
    // 在Observable數(shù)據(jù)發(fā)射前發(fā)射item數(shù)據(jù)項
    Observable.just(1, 2, 3)
            .startWith(999)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 2. startWith(Iterable items)
    // 在Observable數(shù)據(jù)發(fā)射前發(fā)射items列表中的數(shù)據(jù)序列
    Observable.just(1, 2, 3)
            .startWith(lists)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 3. startWithArray(items)
    // 在Observable數(shù)據(jù)發(fā)射前發(fā)射items數(shù)組中的數(shù)據(jù)序列
    Observable.just(1, 2, 3)
            .startWithArray(arrays)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 4. startWith(ObservableSource other)
    // 在Observable數(shù)據(jù)發(fā)射前發(fā)射other中的數(shù)據(jù)序列
    Observable.just(1, 2, 3)
            .startWith(Observable.just(999, 9999, 99999))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

輸出:

--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3

Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)

6. SwitchOnNext

將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable炫乓,后者發(fā)射那些 Observables最近發(fā)射的數(shù)據(jù)項刚夺。

6.1 switchOnNext

switchOnNext 訂閱一個發(fā)射多個Observables的Observable。它每次觀察那些Observables中的一個末捣, switchOnNext 發(fā)射的這個新Observable并取消訂閱前一個發(fā)射數(shù)據(jù)的舊Observable侠姑,開始發(fā)射最新的Observable發(fā)射的數(shù)據(jù)。

img-switchOnNext

注意: 當(dāng)原始Observables發(fā)射了一個新的Observable時(不是這個新的Observable發(fā)射了一條數(shù)據(jù)時)箩做,它將取消訂閱之前的那個Observable莽红。這意味著,在 后來那個Observable產(chǎn)生之后到它開始發(fā)射數(shù)據(jù)之前的這段時間里邦邦,前一個Observable發(fā)射 的數(shù)據(jù)將被丟棄(就像圖例上的那個黃色圓圈一樣)安吁。

6.2 switchOnNextDelayError

當(dāng)Observables發(fā)射一個新的Observable后,則會取消訂閱前面的舊observable燃辖,直接開始接受新Observable的數(shù)據(jù)鬼店,如果Observables中的Observable有 Error 異常,將保留 onError 通知直到其他沒有Error的Observable所有的數(shù)據(jù)發(fā)射完成黔龟,在那時它才會把 onError 傳遞給觀察者薪韩。

img-switchOnNextDelayError

注意: 如果有多個原始Observable出現(xiàn)了Error, 這些Error通知會被合并成一個 CompositeException 捌锭,保留在CompositeException 內(nèi)部的 List<Throwable> exceptions 中俘陷,但是如果只有一個原始Observable出現(xiàn)了Error,則不會生成 CompositeException 观谦,只會發(fā)送這個Error通知拉盾。

實例代碼:

    // 創(chuàng)建Observable
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);

    // 創(chuàng)建發(fā)射Observable序列的Observable
    Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            Thread.sleep(1000);
            // 此時發(fā)射一個新的observable2,將會取消訂閱observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 創(chuàng)建發(fā)射含有Error通知的Observable序列的Observable
    Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發(fā)射一個發(fā)射Error通知的Observable
            emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發(fā)射一個發(fā)射Error通知的Observable
            Thread.sleep(1000);
            // 此時發(fā)射一個新的observable2豁状,將會取消訂閱observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
    // 可選參數(shù) bufferSize: 緩存數(shù)據(jù)項大小
    // 接受一個發(fā)射Observable序列的Observable類型的sources捉偏,
    // 當(dāng)sources發(fā)射一個新的Observable后倒得,則會取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù)
    Observable.switchOnNext(sources)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("--------------------------------------------------------------------");
    // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
    // 可選參數(shù) prefetch: 與讀取數(shù)據(jù)項大小
    // 當(dāng)sources發(fā)射一個新的Observable后夭禽,則會取消訂閱前面的舊observable霞掺,直接開始接受新Observable的數(shù)據(jù),
    // 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成讹躯,在那時它才會把onError傳遞給觀察者
    Observable.switchOnNextDelayError(sourcesError)
            .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("--> onNext(2): " + t);
                }

                @Override
                public void onError(Throwable e) {
                    // 判斷是否是CompositeException對象(發(fā)生多個Observable出現(xiàn)Error時會發(fā)送的對象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(2): " + exceptions);
                    } else {
                        System.out.println("--> onError(2): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

輸出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]

Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)

小結(jié)

Rxjava 的合并操作符能夠同時處理多個被觀察者菩彬,并發(fā)送相應(yīng)的事件通知以及數(shù)據(jù)。常常應(yīng)用于多業(yè)務(wù)合并處理場景潮梯,比如表單的聯(lián)動驗證骗灶,網(wǎng)絡(luò)交互性數(shù)據(jù)的校驗等,rxjava的合并操作符能夠很好的去實現(xiàn)和處理秉馏。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末耙旦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子萝究,更是在濱河造成了極大的恐慌免都,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件帆竹,死亡現(xiàn)場離奇詭異琴昆,居然都是意外死亡,警方通過查閱死者的電腦和手機馆揉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門业舍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人升酣,你說我怎么就攤上這事舷暮。” “怎么了噩茄?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵下面,是天一觀的道長。 經(jīng)常有香客問我绩聘,道長沥割,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任凿菩,我火速辦了婚禮窍箍,結(jié)果婚禮上极颓,老公的妹妹穿的比我還像新娘巫橄。我一直安慰自己膀斋,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蚀苛,像睡著了一般在验。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上堵未,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天腋舌,我揣著相機與錄音,去河邊找鬼渗蟹。 笑死块饺,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的拙徽。 我是一名探鬼主播刨沦,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼诗宣,長吁一口氣:“原來是場噩夢啊……” “哼膘怕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起召庞,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤岛心,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后篮灼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體忘古,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年诅诱,在試婚紗的時候發(fā)現(xiàn)自己被綠了髓堪。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡娘荡,死狀恐怖干旁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情炮沐,我是刑警寧澤争群,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站大年,受9級特大地震影響换薄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜翔试,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一轻要、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧垦缅,春花似錦伦腐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽幸冻。三九已至,卻和暖如春咳焚,著一層夾襖步出監(jiān)牢的瞬間洽损,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工革半, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留碑定,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓又官,卻偏偏與公主長得像延刘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子六敬,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容