簡要:
需求了解:
在使用 RxJava
開發(fā)的過程中,很多時候需要結(jié)合多個條件或者數(shù)據(jù)的邏輯判斷缀去,比如登錄功能的表單驗證,實時數(shù)據(jù)比對等。這個時候我們就需要使用 RxJava 的結(jié)合操作符來完成這一需求欢搜,Rx中提供了豐富的結(jié)合操作處理的操作方法。
可用于組合多個Observables的操作方法:
- CombineLatest:當(dāng)Observables中的任何一個發(fā)射了一個數(shù)據(jù)時谴轮,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)炒瘟,然后發(fā)射這個函數(shù)的結(jié)果。
- Join:只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi)第步,這個Observable發(fā)射了一條數(shù)據(jù)疮装,就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)。
- Merge:合并多個Observables的發(fā)射物粘都,可以將多個Observables的輸出合并廓推,就好像它們是一個單個的Observable一樣。
- Zip:通過一個函數(shù)將多個Observables的發(fā)射物結(jié)合到一起翩隧,基于這個函數(shù)的結(jié)果為每個結(jié)合體嚴(yán)格按照數(shù)量以及順序發(fā)射單個數(shù)據(jù)項受啥。
- StartWith:在數(shù)據(jù)序列的開頭插入一條指定的數(shù)據(jù)項或者數(shù)據(jù)序列。
- SwitchOnNext:將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最新發(fā)射的Observable的數(shù)據(jù)項滚局。
1. CombineLatest
當(dāng) Observables 中的任何一個發(fā)射了數(shù)據(jù)時居暖,使用一個函數(shù)結(jié)合每個 Observable 發(fā)射的最近數(shù)據(jù)項,并且基于這個函數(shù)的結(jié)果發(fā)射數(shù)據(jù)藤肢。
CombineLatest
操作符行為類似于zip
太闺,但是只有當(dāng)原始的Observable中的每一個都發(fā)射了一條數(shù)據(jù)時 zip 才發(fā)射數(shù)據(jù)。 CombineLatest
則在原始的Observable中任意一個發(fā)射了數(shù)據(jù)時發(fā)射一條數(shù)據(jù)嘁圈。當(dāng)原始Observables的任何一個發(fā)射了一條數(shù)據(jù)時省骂, CombineLatest 使用一 個函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù),然后發(fā)射這個函數(shù)的返回值最住。
解析:
combineLatest
操作符可以結(jié)合多個Observable钞澳,可以接收 2-9 個Observable對象, 在其中原始Observables的任何一個發(fā)射了一條數(shù)據(jù)時涨缚, CombineLatest 使用一個函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù)轧粟,然后發(fā)射這個函數(shù)的返回值。此外combineLatest
操作符還有一些接收 Iterable 脓魏, 數(shù)組方式的變體兰吟,以及其他指定參數(shù)combiner、bufferSize茂翔、和combineLatestDelayError方法等變體混蔼,在此就不在詳細(xì)展開了,有興趣的可以查看官方的相關(guān)API文檔了解珊燎。
實例代碼:
// Observables 創(chuàng)建
Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
// 1. combineLatest(ObservableSource, ObservableSource [支持2-9個參數(shù)]..., BiFunction)
// 結(jié)合多個Observable, 當(dāng)他們其中任意一個發(fā)射了數(shù)據(jù)時惭嚣,使用函數(shù)結(jié)合他們最近發(fā)射的一項數(shù)據(jù)
Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long t1, Long t2) throws Exception {
System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
if (t1 + t2 == 10) {
return "Success"; // 滿足一定條件,返回指定的字符串
}
return t1 + t2 + ""; // 計算所有數(shù)據(jù)的和并轉(zhuǎn)換為字符串
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("----> accept combineLatest(1): " + t);
}
});
System.out.println("--------------------------------------------------------");
// 2. combineLatest(T1, T2, T3, Function)
// Observables的結(jié)合
Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
@Override
public String apply(Long t1, Long t2, Long t3) throws Exception {
System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
return t1 + t2 + t3 + ""; // 計算所有數(shù)據(jù)的和并轉(zhuǎn)換為字符串
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
輸出:
--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114
2. Join
任何時候悔政,只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi)晚吞,這個Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)卓箫。
Join
操作符結(jié)合兩個Observable發(fā)射的數(shù)據(jù)载矿,基于時間窗口(你定義的針對每條數(shù)據(jù)特定的原則)選擇待集合的數(shù)據(jù)項垄潮。你將這些時間窗口實現(xiàn)為一些Observables烹卒,它們的生命周期從任何一條Observable發(fā)射的每一條數(shù)據(jù)開始。當(dāng)這個定義時間窗口的Observable發(fā)射了一條數(shù)據(jù)或者完成時弯洗,與這條數(shù)據(jù)關(guān)聯(lián)的窗口也會關(guān)閉旅急。只要這條數(shù)據(jù)的窗口是打開的,它將繼續(xù)結(jié)合其它Observable發(fā)射的任何數(shù)據(jù)項牡整。你定義一個用于結(jié)合數(shù)據(jù)的函數(shù)藐吮。
解析: join(other, leftEnd, rightEnd, resultSelector)
相關(guān)參數(shù)的解析
- other: 源Observable與其組合的目標(biāo)Observable。
- leftEnd: 接收一個源數(shù)據(jù)項,返回一個Observable谣辞,這個Observable的生命周期就是源Observable發(fā)射數(shù)據(jù)的有效期。
- rightEnd: 接收一個源數(shù)據(jù)項,返回一個Observable躯嫉,這個Observable的生命周期就是目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期。
- resultSelector: 接收源Observable和目標(biāo)Observable發(fā)射的數(shù)據(jù)項变骡, 處理后的數(shù)據(jù)返回給觀察者對象。
注意: 這是源Observable和目標(biāo)Observable發(fā)射數(shù)據(jù)在任意一個基于時間窗口的有效期內(nèi)才會接收到組合數(shù)據(jù)懊缺,這就意味著可能有數(shù)據(jù)丟失的情況猛遍,在其中一個已經(jīng)發(fā)射完所有數(shù)據(jù),并且沒有處于時間窗口的數(shù)據(jù)情況涕刚,另一個Observable的數(shù)據(jù)發(fā)射將不會收到組合數(shù)據(jù)晨缴。
示例代碼:
// Observable的創(chuàng)建
Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);
// 1. join(other, leftEnd, rightEnd, resultSelector)
// other: 目標(biāo)組合的Observable
// leftEnd: 接收一個源數(shù)據(jù)項装盯,返回一個Observable,這個Observable的生命周期就是源Observable發(fā)射數(shù)據(jù)的有效期
// rightEnd: 接收一個源數(shù)據(jù)項,返回一個Observable,這個Observable的生命周期就是目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期
// resultSelector: 接收源Observable和目標(biāo)Observable發(fā)射的數(shù)據(jù)項幌衣, 處理后的數(shù)據(jù)返回給觀察者對象
sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t1 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t2 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
}
}, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long t1, Long t2) throws Exception {
return "t1 = " + t1 + ", t2 = " + t2; // 對數(shù)據(jù)進行組合后返回和觀察者
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(1): " + t);
}
});
System.in.read();
輸出:
-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14 // 此時源t1中已經(jīng)沒有數(shù)據(jù)還處于時間窗口有效期內(nèi)
groupJoin
groupJoin
操作符與 join
相同趁耗,只是參數(shù)傳遞有所區(qū)別右冻。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector
可以將原始數(shù)據(jù)轉(zhuǎn)換為 Observable 類型的數(shù)據(jù)發(fā)送給觀察者。
示例代碼:
// Observable的創(chuàng)建
Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);
// 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
// groupJoin操作符與join相同著拭,只是參數(shù)傳遞有所區(qū)別纱扭。
// resultSelector可以將原始數(shù)據(jù)轉(zhuǎn)換為Observable類型的數(shù)據(jù)發(fā)送給觀察者。
sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t1 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long t) throws Exception {
System.out.println("-----> t2 is emitter: " + t);
return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目標(biāo)Observable發(fā)射數(shù)據(jù)的有效期為1000毫秒
}
}, new BiFunction<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
System.out.println("--> apply(2) combine: " + t1); // 結(jié)合操作
return t2.map(new Function<Long, String>() {
@Override
public String apply(Long t) throws Exception {
System.out.println("-----> apply(2) operation: " + t);
return "t1 = " + t1 + ", t2 = " + t;
}
});
}
}).subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
}
});
輸出:
-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14
Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)
3. Merge
合并多個Observables的發(fā)射物儡遮。
使用 Merge
操作符你可以將多個Observables的輸出合并乳蛾,就好像它們是一個單個的 Observable 一樣。
3.1 merge
Merge 可能會讓合并的Observables發(fā)射的數(shù)據(jù)交錯(有一個類似的操作符 Concat
不會讓數(shù)據(jù)交錯鄙币,它會按順序一個接著一個發(fā)射多個Observables的發(fā)射物)肃叶,任何一個原始Observable的 onError
通知會被立即傳遞給觀察者,而且會終止合并后的Observable十嘿。
除了傳遞多個Observable給 merge 因惭,你還可以傳遞一個Observable列表 List ,數(shù)組绩衷,甚至是一個發(fā)射Observable序列的Observable蹦魔, merge 將合并它們的輸出作為單個Observable的輸出激率。
如果你傳遞一個發(fā)射Observables序列的Observable,你可以指定 merge 應(yīng)該同時訂閱的 Observable 的最大數(shù)量勿决。一旦達到訂閱數(shù)的限制乒躺,它將不再訂閱原始Observable發(fā)射的任何其它Observable,直到某個已經(jīng)訂閱的Observable發(fā)射了 onCompleted 通知低缩。
示例代碼:
// 創(chuàng)建Observable對象
Observable<Integer> odd = Observable.just(1, 3, 5);
Observable<Integer> even = Observable.just(2, 4, 6);
Observable<Integer> big = Observable.just(188888, 688888, 888888);
// 創(chuàng)建list對象
List<Observable<Integer>> list = new ArrayList<>();
list.add(odd);
list.add(even);
list.add(big);
// 創(chuàng)建Array對象
Observable<Integer>[] observables = new Observable[3];
observables[0] = odd;
observables[1] = even;
observables[2] = big;
// 創(chuàng)建發(fā)射Observable序列的Observable
Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
@Override
public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
emitter.onNext(Observable.just(1));
emitter.onNext(Observable.just(1, 2));
emitter.onNext(Observable.just(1, 2, 3));
emitter.onNext(Observable.just(1, 2, 3, 4));
emitter.onNext(Observable.just(1, 2, 3, 4, 5));
emitter.onComplete();
}
});
// 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
// 可接受 2-4 個Observable對象進行merge
Observable.merge(odd, even)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
// 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù), bufferSize: 緩存的數(shù)量(從每個內(nèi)部觀察資源預(yù)取的項數(shù))
// 接受一個Observable的列表List
Observable.merge(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(2): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
// 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù), bufferSize: 緩存的數(shù)量(從每個內(nèi)部觀察資源預(yù)取的項數(shù))
// 接受一個Observable的數(shù)組Array
Observable.mergeArray(observables)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(3): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
// 可選參數(shù), maxConcurrency: 最大的并發(fā)處理數(shù)
// 接受一個發(fā)射Observable序列的Observable
Observable.merge(sources)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(4): " + integer);
}
});
System.out.println("-----------------------------------------------");
// 5. mergeWith(other)
// merge 是靜態(tài)方法嘉冒, mergeWith 是對象方法: Observable.merge(odd,even) 等價于 odd.mergeWith(even)
odd.mergeWith(even)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(5): " + integer);
}
});
輸出:
--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6
Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSource<ObservableSource> sources, int maxConcurrency)
3.2 mergeDelayError
如果傳遞給 merge 的任何一個的Observable發(fā)射了 onError
通知終止了, merge 操作符生成的Observable也會立即以onError
通知終止咆繁。如果你想讓它繼續(xù)發(fā)射數(shù)據(jù)讳推,在最后才報告錯誤,可以使用 mergeDelayError
玩般。
MergeDelayError
操作符娜遵,mergeDelayError 在合并與交錯輸出的使用上與 merge
相同,區(qū)別在于它會保留 onError
通知直到其他沒有Error的Observable所有的數(shù)據(jù)發(fā)射完成壤短,在那時它才會把onError
傳遞給觀察者设拟。
注意: 如果有多個原始Observable出現(xiàn)了Error
, 這些Error通知會被合并成一個 CompositeException
久脯,保留在CompositeException 內(nèi)部的 List<Throwable> exceptions
中纳胧,但是如果只有一個原始Observable出現(xiàn)了Error,則不會生成 CompositeException 帘撰,只會發(fā)送這個Error通知跑慕。
由于MergeDelayError
使用上和merge
相同 ,所以這里就不做詳細(xì)分析了摧找,這里就簡單描述其中的一種的使用實例核行。
實例代碼:
// 創(chuàng)建有Error的Observable序列的Observable
Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
@Override
public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
emitter.onNext(Observable.just(1));
emitter.onNext(Observable.error(new Exception("Error Test1"))); // 發(fā)射一個Error的通知的Observable
emitter.onNext(Observable.just(2, 3));
emitter.onNext(Observable.error(new Exception("Error Test2"))); // 發(fā)射一個Error的通知的Observable
emitter.onNext(Observable.just(4, 5, 6));
emitter.onComplete();
}
});
// 6. mergeDelayError
// 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成,在那時它才會把onError傳遞給觀察者
Observable.mergeDelayError(DelayErrorObservable)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(6)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(6): " + integer);
}
@Override
public void onError(Throwable e) {
// 判斷是否是CompositeException對象(發(fā)生多個Observable出現(xiàn)Error時會發(fā)送的對象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(6): " + exceptions);
} else {
System.out.println("--> onError(6): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(6)");
}
});
輸出:
--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]
Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)
4. Zip
通過一個函數(shù)將多個Observables的發(fā)射物結(jié)合到一起蹬耘,基于這個函數(shù)的結(jié)果為每個 結(jié)合體 發(fā)射單個數(shù)據(jù)項芝雪。
Zip
操作符與 Merge
類似,都是合并多個Observables的數(shù)據(jù)综苔,返回一個Obversable惩系,主要不同的是它使用這個函數(shù)按順序結(jié)合兩個或多個Observables發(fā)射的數(shù)據(jù)項,然后它發(fā)射這個函數(shù)返回的結(jié)果如筛。它按照嚴(yán)格的順序應(yīng)用這個函數(shù)堡牡。 它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)。
解析:
-
Zip
操作符與Merge
的使用上基本一致杨刨,主要不同的是 zip 發(fā)射的數(shù)據(jù)取決于發(fā)射數(shù)據(jù)項最少的那個Observable并且按照嚴(yán)格的順序去結(jié)合數(shù)據(jù)晤柄。 - 同樣具備靜態(tài)方法
zip
與對象方法zipWith
,可以傳遞一個Observable列表 List 妖胀,數(shù)組芥颈,甚至是一個發(fā)射Observable序列的Observable惠勒。
使用上在此就不做詳細(xì)的展開了,可參照上面的 Merge
使用方法浇借,下面就針對 zip
的特性實現(xiàn)一個簡單的實例。
實例代碼:
// 創(chuàng)建Observable
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
// zip(sources)
// 可接受2-9個參數(shù)的Observable怕品,對其進行順序合并操作妇垢,最終合并的數(shù)據(jù)項取決于最少的數(shù)據(jù)項的Observable
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer t1, Integer t2) throws Exception {
System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
return t1 + t2 + "";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--> accept: " + s); // 最終接受observable1全部數(shù)據(jù)項與observable2相同數(shù)量順序部分?jǐn)?shù)據(jù)
}
});
輸出:
--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6
Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )
5. StartWith
在數(shù)據(jù)序列的開頭插入一條指定的數(shù)據(jù)項或者數(shù)據(jù)序列。
如果你想要一個Observable在發(fā)射數(shù)據(jù)之前先發(fā)射一個指定的數(shù)據(jù)或者數(shù)據(jù)序列(可以是單個數(shù)據(jù)肉康、數(shù)組闯估、列表,Observable中的數(shù)據(jù))吼和,可以使 用
StartWith
操作符涨薪。(如果你想一個Observable發(fā)射的數(shù)據(jù)末尾追加一個數(shù)據(jù)序列可以使用 Concat
操作符。)
實例代碼:
// 創(chuàng)建列表List
List<Integer> lists = new ArrayList<>();
lists.add(999);
lists.add(9999);
lists.add(99999);
// 創(chuàng)建數(shù)組Array
Integer[] arrays = new Integer[3];
arrays[0] = 999;
arrays[1] = 9999;
arrays[2] = 9999;
// 1. startWith(item)
// 在Observable數(shù)據(jù)發(fā)射前發(fā)射item數(shù)據(jù)項
Observable.just(1, 2, 3)
.startWith(999)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.out.println("-----------------------------------------");
// 2. startWith(Iterable items)
// 在Observable數(shù)據(jù)發(fā)射前發(fā)射items列表中的數(shù)據(jù)序列
Observable.just(1, 2, 3)
.startWith(lists)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(2): " + integer);
}
});
System.out.println("-----------------------------------------");
// 3. startWithArray(items)
// 在Observable數(shù)據(jù)發(fā)射前發(fā)射items數(shù)組中的數(shù)據(jù)序列
Observable.just(1, 2, 3)
.startWithArray(arrays)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(3): " + integer);
}
});
System.out.println("-----------------------------------------");
// 4. startWith(ObservableSource other)
// 在Observable數(shù)據(jù)發(fā)射前發(fā)射other中的數(shù)據(jù)序列
Observable.just(1, 2, 3)
.startWith(Observable.just(999, 9999, 99999))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(4): " + integer);
}
});
輸出:
--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)
6. SwitchOnNext
將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable炫乓,后者發(fā)射那些 Observables最近發(fā)射的數(shù)據(jù)項刚夺。
6.1 switchOnNext
switchOnNext
訂閱一個發(fā)射多個Observables的Observable。它每次觀察那些Observables中的一個末捣, switchOnNext
發(fā)射的這個新Observable并取消訂閱前一個發(fā)射數(shù)據(jù)的舊Observable侠姑,開始發(fā)射最新的Observable發(fā)射的數(shù)據(jù)。
注意: 當(dāng)原始Observables發(fā)射了一個新的Observable時(不是這個新的Observable發(fā)射了一條數(shù)據(jù)時)箩做,它將取消訂閱之前的那個Observable莽红。這意味著,在 后來那個Observable產(chǎn)生之后到它開始發(fā)射數(shù)據(jù)之前的這段時間里邦邦,前一個Observable發(fā)射 的數(shù)據(jù)將被丟棄(就像圖例上的那個黃色圓圈一樣)安吁。
6.2 switchOnNextDelayError
當(dāng)Observables
發(fā)射一個新的Observable后,則會取消訂閱前面的舊observable燃辖,直接開始接受新Observable的數(shù)據(jù)鬼店,如果Observables中的Observable有 Error
異常,將保留 onError
通知直到其他沒有Error的Observable所有的數(shù)據(jù)發(fā)射完成黔龟,在那時它才會把 onError 傳遞給觀察者薪韩。
注意: 如果有多個原始Observable出現(xiàn)了Error
, 這些Error通知會被合并成一個 CompositeException
捌锭,保留在CompositeException 內(nèi)部的 List<Throwable> exceptions
中俘陷,但是如果只有一個原始Observable出現(xiàn)了Error,則不會生成 CompositeException 观谦,只會發(fā)送這個Error通知拉盾。
實例代碼:
// 創(chuàng)建Observable
Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
// 創(chuàng)建發(fā)射Observable序列的Observable
Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
Thread.sleep(1000);
// 此時發(fā)射一個新的observable2,將會取消訂閱observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 創(chuàng)建發(fā)射含有Error通知的Observable序列的Observable
Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
@Override
public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
emitter.onNext(observable1);
emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發(fā)射一個發(fā)射Error通知的Observable
emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發(fā)射一個發(fā)射Error通知的Observable
Thread.sleep(1000);
// 此時發(fā)射一個新的observable2豁状,將會取消訂閱observable1
emitter.onNext(observable2);
emitter.onComplete();
}
});
// 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
// 可選參數(shù) bufferSize: 緩存數(shù)據(jù)項大小
// 接受一個發(fā)射Observable序列的Observable類型的sources捉偏,
// 當(dāng)sources發(fā)射一個新的Observable后倒得,則會取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù)
Observable.switchOnNext(sources)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.in.read();
System.out.println("--------------------------------------------------------------------");
// 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
// 可選參數(shù) prefetch: 與讀取數(shù)據(jù)項大小
// 當(dāng)sources發(fā)射一個新的Observable后夭禽,則會取消訂閱前面的舊observable霞掺,直接開始接受新Observable的數(shù)據(jù),
// 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成讹躯,在那時它才會把onError傳遞給觀察者
Observable.switchOnNextDelayError(sourcesError)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long t) {
System.out.println("--> onNext(2): " + t);
}
@Override
public void onError(Throwable e) {
// 判斷是否是CompositeException對象(發(fā)生多個Observable出現(xiàn)Error時會發(fā)送的對象)
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
List<Throwable> exceptions = compositeException.getExceptions();
System.out.println("--> onError(2): " + exceptions);
} else {
System.out.println("--> onError(2): " + e);
}
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
輸出:
--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]
Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
小結(jié)
Rxjava 的合并操作符能夠同時處理多個被觀察者菩彬,并發(fā)送相應(yīng)的事件通知以及數(shù)據(jù)。常常應(yīng)用于多業(yè)務(wù)合并處理場景潮梯,比如表單的聯(lián)動驗證骗灶,網(wǎng)絡(luò)交互性數(shù)據(jù)的校驗等,rxjava的合并操作符能夠很好的去實現(xiàn)和處理秉馏。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: