引言
該篇文章主要是關(guān)于RxJava的組合/變換操作符使用的代碼講解卿堂。組合/變換操作符總共有四大類:
(1)組合多個(gè)被觀察者
- 按發(fā)送順序:concat()蜂绎、concatArray()
- 按時(shí)間:merge()、mergeArray()
- 錯(cuò)誤處理:concatDelayError()、mergeDelayError()
(2)合并多個(gè)事件
- 按數(shù)量:zip()
- 按時(shí)間:combineLatest()、combineLatestDelayError()
- 合并成一個(gè)事件發(fā)送:reduce()、collect()
(3)發(fā)送事件前追加發(fā)送事件
- startWith()
- startWithArray()
(4)統(tǒng)計(jì)發(fā)送事件數(shù)量
- count()
1. concat()/concatArray()
組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)存哲,合并后按發(fā)送順序串行執(zhí)行因宇。
二者區(qū)別:組合被觀察者的數(shù)量,即concat()組合被觀察者數(shù)量≤4個(gè)祟偷,而concatArray()則可>4個(gè)察滑。
Observable.concat(
Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(7,8,9),
Observable.just(10,11,12))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(Constant.TAG,"接收到了事件"+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
}
});
Observable.concatArray(Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8),
Observable.just(9,10))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(Constant.TAG,"接收到了事件"+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
}
});
concat()的log信息:
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件1
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件2
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件3
06-22 14:00:51.142 12967-12967/? D/RxJava: 接收到了事件4
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件5
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件6
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件7
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件8
06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)
concatArray()的log信息:
06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件1
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件2
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件3
06-22 14:00:51.144 12967-12967/? D/RxJava: 接收到了事件4
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件5
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件6
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件7
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件8
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件9
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件10
06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)
2. merge()/mergeArray()
組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后按時(shí)間線并行執(zhí)行修肠。
1.二者區(qū)別:和上述的concat和concatArray的一樣贺辰;
2.區(qū)別上述concat操作符,同樣是組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)嵌施,但concat操作符合并后是按發(fā)送順序串行執(zhí)行饲化。
Observable.merge(
Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
Observable.intervalRange(2,3,1,1, TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
Log.d(Constant.TAG,"接收到了事件"+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
}
});
log信息:
06-22 14:23:11.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件0
06-22 14:23:11.366 14031-14083/com.gjj.frame D/RxJava: 接收到了事件2
06-22 14:23:12.357 14031-14082/com.gjj.frame D/RxJava: 接收到了事件1
06-22 14:23:12.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件3
06-22 14:23:13.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件2
06-22 14:23:13.359 14031-14082/com.gjj.frame D/RxJava: 接收到了事件4
06-22 14:23:13.362 14031-14082/com.gjj.frame D/RxJava: 對(duì)Complete事件做出響應(yīng)
3. concatArrayDelayError()/mergeArrayDelayError()
使用concat和merge操作符時(shí),若其中一個(gè)被觀察者發(fā)出onError事件吗伤,則會(huì)馬上終止其他被觀察者繼續(xù)發(fā)送事件吃靠,若希望onError事件推遲到其他被觀察者發(fā)送事件結(jié)束后才處罰,就需要使用對(duì)應(yīng)的concatDelayError或mergeDelayError()操作符足淆。
(1)無使用concatArrayDelayError()的情況
Observable.concat(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
//對(duì)error事件巢块,因?yàn)闊o使用concatDelayError,所以第二個(gè)Observable將不會(huì)發(fā)送事件
e.onError(new NullPointerException());
e.onComplete();
}
}),Observable.just(4,5,6)).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(Constant.TAG,"接收到了事件"+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"對(duì)error事件做出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
}
});
測(cè)試結(jié)果:第一個(gè)悲觀者發(fā)送Error事件后,第2個(gè)被觀察者則不會(huì)繼續(xù)發(fā)送事件缸浦。
log信息:
06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件1
06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件2
06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件3
06-25 11:03:06.906 21337-21337/com.gjj.frame D/RxJava: 對(duì)error事件做出響應(yīng)
(2)使用concatArrayDelayError()的情況
Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
//對(duì)error事件夕冲,因?yàn)闊o使用concatDelayError,所以第二個(gè)Observable將不會(huì)發(fā)送事件
e.onError(new NullPointerException());
e.onComplete();
}
}),Observable.just(4,5,6)).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(Constant.TAG,"接收到了事件"+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"對(duì)error事件做出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
}
});
測(cè)試結(jié)果:第1個(gè)被觀察者的error事件將在第2個(gè)被觀察者發(fā)送完事件后再繼續(xù)發(fā)送。
log信息:
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件1
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件2
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件3
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件4
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件5
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件6
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 對(duì)error事件做出響應(yīng)
4. Zip()
合并多個(gè)被觀察者(Observable)發(fā)送的事件裂逐,生成一個(gè)新的事件序列(即組合過后的事件序列),并最終發(fā)送泣栈。
//創(chuàng)建第1個(gè)觀察者
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).subscribeOn(Schedulers.io());//設(shè)置被觀察者1再工作線程1中工作
//創(chuàng)建第2個(gè)觀察者
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
e.onNext("C");
e.onNext("D");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());//設(shè)置被觀察者2再工作線程2中工作
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer+s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(Constant.TAG,"最終收到的事件 = "+value);
}
@Override
public void onError(Throwable e) {
Log.d(Constant.TAG,"onError");
}
@Override
public void onComplete() {
Log.d(Constant.TAG,"onComplete");
}
});
log信息:
06-26 16:30:02.147 29926-29985/com.gjj.frame D/RxJava: 最終收到的事件 = 1A
06-26 16:30:02.150 29926-29984/com.gjj.frame D/RxJava: 最終收到的事件 = 2B
06-26 16:30:02.151 29926-29984/com.gjj.frame D/RxJava: 最終收到的事件 = 3C
注意:最終合并的事件數(shù)量是多個(gè)被觀察者中最少的數(shù)量卜高,多余的事件將不會(huì)發(fā)送。
5. combineLatest()
當(dāng)兩個(gè)Observable中的任何一個(gè)發(fā)送了數(shù)據(jù)后南片,將先發(fā)送了數(shù)據(jù)的Observables的最新(最后)一個(gè)數(shù)據(jù)與另外一個(gè)Observable發(fā)送的每一個(gè)數(shù)據(jù)結(jié)合掺涛,最終基于該函數(shù)的結(jié)果發(fā)送數(shù)據(jù)。
Observable.combineLatest(Observable.just(1L, 2L, 3L), Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long aLong, Long aLong2) throws Exception {
Log.d(Constant.TAG,"合并的數(shù)據(jù)是:"+aLong+" "+aLong2);
return aLong+aLong2;
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(Constant.TAG,"合并的結(jié)果是:"+aLong);
}
});
log信息:
06-26 16:48:37.010 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 0
06-26 16:48:37.011 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:3
06-26 16:48:38.010 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 1
06-26 16:48:38.011 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:4
06-26 16:48:39.012 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 2
06-26 16:48:39.013 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:5
6. combineLatestDelayError()
作用類似于concatArrayDelayError()疼进。
7. reduce()
把被觀察者需要發(fā)送的事件聚合成一個(gè)事件&發(fā)送
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(Constant.TAG,"本次計(jì)算的數(shù)據(jù)是:"+integer+"乘"+integer2);
return integer * integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(Constant.TAG,"最終計(jì)算的結(jié)果是:"+integer);
}
});
log信息:
06-26 16:59:56.401 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:1乘2
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:2乘3
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:6乘4
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 最終計(jì)算的結(jié)果是:24
8. collect()
將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里
Observable.just(1,2,3,4,5,6)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
list.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.d(Constant.TAG,"本次發(fā)送的數(shù)據(jù)是:"+integers);
}
});
log信息:
06-26 17:04:40.264 31785-31785/com.gjj.frame D/RxJava: 本次發(fā)送的數(shù)據(jù)是:[1, 2, 3, 4, 5, 6]
9. startWith()/startWithArray()
在一個(gè)被觀察者發(fā)送事件錢薪缆,追加發(fā)送一些數(shù)據(jù)/一個(gè)新的被觀察者
Observable.just(3,4)
.startWith(0)//追加單個(gè)數(shù)據(jù)
.startWithArray(1,2)//追加多個(gè)數(shù)據(jù)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(Constant.TAG,"接收到了事件"+integer);
}
});
log信息:
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件1
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件2
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件0
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件3
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件4
10.count()
統(tǒng)計(jì)被觀察者發(fā)送事件的數(shù)量。
Observable.just(1,2,3,4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
Log.d(Constant.TAG,"發(fā)送的事件數(shù)量 = "+integer);
}
});
log信息:
06-26 17:42:20.639 4750-4750/com.gjj.frame D/RxJava: 發(fā)送的事件數(shù)量 = 4