(1) concat and concatArray
Combines several Observables into one. The sources are subscribed to one after another, so their items are emitted serially, in source order.
// concat accepts an Iterable of sources
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2));
list.add(Observable.just(3, 4));
list.add(Observable.just(5, 6));
Observable.concat(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });

// concatArray accepts the sources as varargs
Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
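Both approaches produce the same output: 1 2 3 4 5 6
(2) merge and mergeArray
Combines several Observables into one. All sources are subscribed to at once, so their items are emitted in parallel, in the order they arrive on the timeline.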
// merge accepts an Iterable of sources; the first source is delayed by 2 seconds
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS));
list.add(Observable.just(3, 4));
list.add(Observable.just(5, 6));
list.add(Observable.just(7, 8));
Observable
        .merge(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });

// mergeArray accepts the sources as varargs
Observable
        .mergeArray(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS),
                Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
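Both approaches produce the same output: 3 4 5 6 7 8 1 2. The delayed source (1, 2) does not hold up the other sources, so its items arrive last.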
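The difference between the two operators can be modelled without RxJava. The sketch below is a plain-Java approximation, not the library's implementation: the `Event` class and the `CombineModel` name are made up for illustration, and each source is assumed to be a list of (time, value) pairs. `concat` drains the sources in order, while `merge` orders all items by arrival time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class CombineModel {
    /** One emission: time in ms after subscription, plus the value (hypothetical type). */
    static final class Event {
        final long timeMs;
        final int value;
        Event(long timeMs, int value) { this.timeMs = timeMs; this.value = value; }
    }

    /** concat: finish each source completely before moving on to the next one. */
    static List<Integer> concat(List<List<Event>> sources) {
        List<Integer> out = new ArrayList<>();
        for (List<Event> source : sources) {
            for (Event e : source) out.add(e.value);
        }
        return out;
    }

    /** merge: all sources run at once, so items come out in arrival order. */
    static List<Integer> merge(List<List<Event>> sources) {
        List<Event> all = new ArrayList<>();
        for (List<Event> source : sources) all.addAll(source);
        // List.sort is stable, so items with equal times keep their source order.
        all.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        List<Integer> out = new ArrayList<>();
        for (Event e : all) out.add(e.value);
        return out;
    }

    /** The four sources of the merge example; the first one is delayed by 2000 ms. */
    static List<List<Event>> sample() {
        List<List<Event>> sources = new ArrayList<>();
        sources.add(Arrays.asList(new Event(2000, 1), new Event(2000, 2)));
        sources.add(Arrays.asList(new Event(0, 3), new Event(0, 4)));
        sources.add(Arrays.asList(new Event(0, 5), new Event(0, 6)));
        sources.add(Arrays.asList(new Event(0, 7), new Event(0, 8)));
        return sources;
    }

    public static void main(String[] args) {
        System.out.println(concat(sample())); // [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(merge(sample()));  // [3, 4, 5, 6, 7, 8, 1, 2]
    }
}
```

With the same delayed source, concat would still print 1 and 2 first, because it waits for the first source to finish before subscribing to the next.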
(3) concatDelayError and mergeDelayError
concatDelayError: combines several Observables and emits their items in source order. If a source fails, emission is not cut off immediately; the error is held back and delivered later.
mergeDelayError: combines several Observables and emits their items in parallel. If a source fails, emission is not cut off immediately; the error is held back and delivered later.
List<Observable<Integer>> list = new ArrayList<>();
// Each source emits two values and then fails
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new NullPointerException("exception"));
        e.onComplete(); // ignored: the stream has already terminated with onError
    }
}).delay(2000, TimeUnit.MILLISECONDS));
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(3);
        e.onNext(4);
        e.onError(new NullPointerException("exception"));
        e.onComplete();
    }
}));
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onError(new NullPointerException("exception"));
        e.onComplete();
    }
}));
Observable
        .concatDelayError(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // Called once, after all sources have terminated
                Log.d("aaa", "an exception occurred");
            }
        });
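Output: the values are logged first, and the error callback fires only once, after all three sources have terminated.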
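The "hold the error back" rule described above can be sketched in plain Java. This is a simplified model and not RxJava's implementation: the `Source` interface, the `Result` class and `failingAfter` are hypothetical names, and timing (including the `delay` call in the example) is deliberately ignored. It only shows that a failing source does not stop the later sources and that the errors are reported at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class DelayErrorModel {
    /** Hypothetical source: pushes values into the sink and may throw to signal an error. */
    interface Source {
        void emitTo(Consumer<Integer> sink) throws Exception;
    }

    /** Values seen so far plus the errors that were held back. */
    static final class Result {
        final List<Integer> values = new ArrayList<>();
        final List<Exception> delayedErrors = new ArrayList<>();
    }

    /** Runs the sources in order; an error ends only the current source. */
    static Result concatDelayError(List<Source> sources) {
        Result result = new Result();
        for (Source source : sources) {
            try {
                source.emitTo(result.values::add);
            } catch (Exception e) {
                result.delayedErrors.add(e); // remember it and carry on with the next source
            }
        }
        return result;
    }

    /** A source that emits two values and then fails, like the sources in the example. */
    static Source failingAfter(int a, int b) {
        return sink -> {
            sink.accept(a);
            sink.accept(b);
            throw new NullPointerException("exception");
        };
    }

    static Result sample() {
        return concatDelayError(Arrays.asList(
                failingAfter(1, 2), failingAfter(3, 4), failingAfter(5, 6)));
    }

    public static void main(String[] args) {
        Result r = sample();
        System.out.println(r.values);               // [1, 2, 3, 4, 5, 6]
        System.out.println(r.delayedErrors.size()); // 3
    }
}
```

Without the delay-error behaviour, the loop would stop at the first exception and the values 3 to 6 would never be produced.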
(4) zip
Merges the data streams of several Observables and emits the combined results. Items are paired one-to-one: the n-th item of one source is combined with the n-th item of the other.
// Emits 1..4, one item per second, on an IO thread
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        SystemClock.sleep(1000);
        e.onNext(2);
        SystemClock.sleep(1000);
        e.onNext(3);
        SystemClock.sleep(1000);
        e.onNext(4);
        SystemClock.sleep(1000);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
// Emits A..C, one item per second, on an IO thread
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("A");
        SystemClock.sleep(1000);
        e.onNext("B");
        SystemClock.sleep(1000);
        e.onNext("C");
        SystemClock.sleep(1000);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer a, String b) throws Exception {
        return a + b;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d("aaa", s);
    }
});
Output: 1A 2B 3C. The fourth item of observable1 (4) has no partner in observable2, so it is never emitted.
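The one-to-one pairing rule can be illustrated with ordinary lists. The sketch below is a plain-Java model, not RxJava code: `ZipModel` is a made-up name, and `BiFunction` here is `java.util.function.BiFunction` rather than the RxJava interface. It pairs items by index and stops as soon as the shorter input runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipModel {
    /** Pairs the i-th item of each list; unpaired trailing items are dropped. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    /** Same inputs as the zip example: four numbers and three letters. */
    static List<String> sample() {
        return zip(Arrays.asList(1, 2, 3, 4), Arrays.asList("A", "B", "C"), (a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(sample()); // [1A, 2B, 3C]
    }
}
```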
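(5) combineLatest
Merges along a shared timeline: once every source has emitted at least one item, each new item from any source is combined with the latest item from the other source.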
// Emits 1..4 at 700 ms intervals, starting at 700 ms
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        SystemClock.sleep(700);
        e.onNext(1);
        SystemClock.sleep(700);
        e.onNext(2);
        SystemClock.sleep(700);
        e.onNext(3);
        SystemClock.sleep(700);
        e.onNext(4);
        SystemClock.sleep(700);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
// Emits A..C at 600 ms intervals, starting immediately
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("A");
        SystemClock.sleep(600);
        e.onNext("B");
        SystemClock.sleep(600);
        e.onNext("C");
        SystemClock.sleep(600);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer a, String b) throws Exception {
        return a + b;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d("aaa", s);
    }
});
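Next, let's draw a diagram based on the code.
Draw one timeline each for observable1 and observable2, place the emissions at the times given in the code, and then check which latest values meet whenever either source emits:
observable1 (ms): 1 at 700, 2 at 1400, 3 at 2100, 4 at 2800
observable2 (ms): A at 0, B at 600, C at 1200
Assuming the sleeps hold to within normal thread-scheduling jitter, the code's output matches the diagram: 1B 1C 2C 3C 4C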
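The timeline reasoning for this example can be checked with a small plain-Java simulation. It is only a model under stated assumptions, not RxJava code: `CombineLatestModel` and `Event` are hypothetical names, the emission times are taken from the sleeps in the example, and thread-scheduling jitter is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class CombineLatestModel {
    /** One emission on the shared timeline (hypothetical type). */
    static final class Event {
        final long timeMs;
        final boolean fromFirst; // true: observable1, false: observable2
        final String value;
        Event(long timeMs, boolean fromFirst, String value) {
            this.timeMs = timeMs;
            this.fromFirst = fromFirst;
            this.value = value;
        }
    }

    /** Replays the events in time order and combines the latest value of each source. */
    static List<String> combineLatest(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        String latestFirst = null;
        String latestSecond = null;
        List<String> out = new ArrayList<>();
        for (Event e : sorted) {
            if (e.fromFirst) latestFirst = e.value; else latestSecond = e.value;
            // Nothing is emitted until both sources have produced at least one item.
            if (latestFirst != null && latestSecond != null) out.add(latestFirst + latestSecond);
        }
        return out;
    }

    /** Emission times derived from the sleeps in the combineLatest example. */
    static List<String> sample() {
        return combineLatest(Arrays.asList(
                new Event(700, true, "1"), new Event(1400, true, "2"),
                new Event(2100, true, "3"), new Event(2800, true, "4"),
                new Event(0, false, "A"), new Event(600, false, "B"),
                new Event(1200, false, "C")));
    }

    public static void main(String[] args) {
        System.out.println(sample()); // [1B, 1C, 2C, 3C, 4C]
    }
}
```

A and B produce no output on their own because observable1 has not emitted anything yet; the first combination appears only when 1 arrives at 700 ms.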
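(6) combineLatestDelayError
Works like concatDelayError() / mergeDelayError(): it is the error-delaying variant of combineLatest. The behaviour is the same as described above, so it is not repeated here.
(7) reduce
Aggregates all the items an Observable emits into a single item and emits that item.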
Observable.just(1, 2, 3, 4, 5)
        // The function receives the running result and the next item
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
The log shows a single value: 15
This is equivalent to computing 1+2+3+4+5 = 15 and then emitting 15.
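For readers who know the Java Streams API, the same fold can be written without RxJava. The sketch below is an analogy rather than RxJava code, and `ReduceModel` is a made-up name. `Stream.reduce` returns an `Optional`, which plays a similar role to the `Maybe` that RxJava 2's `reduce(BiFunction)` returns: there is no result when the source is empty.

```java
import java.util.Optional;
import java.util.stream.Stream;

class ReduceModel {
    /** Folds the values pairwise: ((((1 + 2) + 3) + 4) + 5). */
    static Optional<Integer> sum(Integer... values) {
        return Stream.of(values).reduce((acc, next) -> acc + next);
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 2, 3, 4, 5)); // Optional[15]
        System.out.println(sum());              // Optional.empty
    }
}
```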
(8) collect
Collects the items emitted by an Observable into a single data structure.
Observable
        .just(1, 2, 3, 4, 5, 6)
        // First argument: creates the container; second: adds each item to it
        .collect(new Callable<ArrayList<Integer>>() {
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
        }, new BiConsumer<ArrayList<Integer>, Integer>() {
            @Override
            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                list.add(integer);
            }
        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> list) throws Exception {
                for (int result : list) {
                    Log.d("aaa", String.valueOf(result));
                }
            }
        });
Output: 1 2 3 4 5 6 (one value per log line)
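The two arguments of collect (a container factory and an "add one item" step) have a close counterpart in Java Streams. The sketch below is an analogy and not RxJava code; `CollectModel` is a hypothetical name. The third argument of `Stream.collect` merges partial containers and has no equivalent in the RxJava call above.

```java
import java.util.ArrayList;
import java.util.stream.Stream;

class CollectModel {
    /** Creates an empty ArrayList, then adds every value to it in order. */
    static ArrayList<Integer> collect(Integer... values) {
        return Stream.of(values).collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        System.out.println(collect(1, 2, 3, 4, 5, 6)); // [1, 2, 3, 4, 5, 6]
    }
}
```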
(9) startWith and startWithArray
startWith: prepends one item or a group of items to an existing stream.
startWith accepts a single item, an Iterable of items, or another Observable.
Observable.just(1, 2, 3)
        .startWith(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
Output: 4 1 2 3
startWithArray: prepends a group of items, passed as varargs, to an existing stream.
Observable.just(1, 2, 3)
        .startWithArray(4, 5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
Output: 4 5 1 2 3
(10) count
Counts the number of items an Observable emits and emits that count as a Long.
Observable.just(1, 2, 3, 4)
        .count()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d("aaa", String.valueOf(aLong));
            }
        });
Output: 4