concat/concatArray
組合多個被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行
區(qū)別:concat()組合被觀察者數(shù)量<=4個皆刺,concatArray數(shù)量大于4個
Observable.concat(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
concat
Observable.concatArray(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12),
Observable.just(11,12,13))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
concatArray
merge()/mergeArray()
組合多個被觀察者一起發(fā)送數(shù)據(jù)难衰,將同一時刻的事件合并然后發(fā)送俘陷,再順序合并下面的事件
區(qū)別與concat/concatArray一樣
Observable.merge(Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubScribe");
}
@Override
public void onNext(Long aLong) {
Log.e("yzh","onNext--"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
merge
concatDelayError() / mergeDelayError()
當合并的被觀察中有一個發(fā)出onError事件時取视,其他的被觀察者的事件也會被阻止發(fā)送甥材,使用上面這兩個方法可以將onError事件推遲到其他被觀察者發(fā)送事件結束后才觸發(fā)
Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
}
}),Observable.just(1,2,3))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer serializable) {
Log.e("yzh","onNext--"+serializable);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError-"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
concatArrayDelayError
如果直接使用concat結果如下
onNext--1
onNext--2
onError--java.lang.NullPointException
zip
合并多個被觀察者發(fā)送的事件绑改,生成一個新的事件序列谢床,然后發(fā)送
Observable<Integer> observable1 =Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e("yzh","被觀察者1發(fā)送事件1");
e.onNext(1);
Thread.sleep(1000);
Log.e("yzh","被觀察者1發(fā)送事件2");
e.onNext(2);
Thread.sleep(1000);
// e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 =Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e("yzh","觀察者2發(fā)送事件1");
e.onNext("a");
Thread.sleep(1000);
Log.e("yzh","觀察者2發(fā)送事件2");
e.onNext("b");
Thread.sleep(1000);
Log.e("yzh","被觀察者2發(fā)送事件3");
e.onNext("c");
Thread.sleep(1000);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
Log.e("yzh","apply") ;
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh", "onSubscribe");
}
@Override
public void onNext(String value) {
Log.e("yzh", "onNext = " + value);
}
@Override
public void onError(Throwable e) {
Log.e("yzh", "onError");
}
@Override
public void onComplete() {
Log.e("yzh", "onComplete");
}
});
打印結果
zip
注意 例子中的兩個觀察者用subscribeOn使用了不同的 線程,如果不加上這句代碼厘线,zip效果與concat一樣识腿,可以試一試。
combineLatest()
對兩個被觀察者中的事件組合再發(fā)送造壮,特點是將第一個被觀察者中最后一個事件分別與另一個被觀察者中的事件組合再發(fā)送渡讼。
Observable.combineLatest(Observable.just(1L, 2L, 3L),
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long integer, Long aLong) throws Exception {
Log.e("yzh","合并的對象--"+integer+"--"+aLong);
return integer+aLong;
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Long s) {
Log.e("yzh","onNext--"+s);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
combineLatest
reduce()
把被觀察者需要發(fā)送的事件聚合成1個事件然后發(fā)送,有點斐波那契數(shù)列的意思
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("yzh","操作數(shù)據(jù)--"+integer+"---"+integer2);
return integer*integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("yzh","接受到的數(shù)據(jù)--"+integer);
}
});
打印結果
reduce
collect()
將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結構里
Observable.just(1,2,3,4,5)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.e("yzh","accept--"+integers.toString());
}
});
打印結果
collect
startWith() / startWithArray()
在一個被觀察者發(fā)送事件前耳璧,追加發(fā)送一些數(shù)據(jù) / 一個新的被觀察者
注意 后面的方法添加的數(shù)據(jù)在前面
Observable.just(4,5,6)
.startWith(0)
.startWithArray(1,2,3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});
打印結果
startWith