在RxJava2(一)教程中秃诵,已經(jīng)跟著大神們學(xué)習(xí)了RxJava2的基本使用轨功,現(xiàn)在我們來(lái)學(xué)習(xí)一下RxJava2很強(qiáng)大的操作符
Android RxJava2操作符
Map
- Map是RxJava中的一個(gè)變換操作符丰辣,它的作用就是對(duì)上游發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù)乍恐,使得每一個(gè)事件都按照指定的函數(shù)去變化默蚌。通過(guò)Map可以將上游發(fā)來(lái)的事件轉(zhuǎn)換為任意的類型辆琅,可以是一個(gè)Object也可以是一個(gè)集合漱办,圖示表示如下:
- 代碼表示:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
String mapStr = String.valueOf(integer + 1);
return mapStr;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
FlatMap
flatMap是一個(gè)非常強(qiáng)大的操作符,flatMap將一個(gè)發(fā)送事件的上游Observable變換為多個(gè)發(fā)送事件的Observables婉烟,然后將它們發(fā)射的事件合并后放進(jìn)一個(gè)單獨(dú)的Observable里娩井。
- 圖示:
上游發(fā)送三個(gè)事件,分別是1,2,3注意它們的顏色似袁,中間flatMap的作用是將圓形的事件轉(zhuǎn)換為一個(gè)發(fā)送矩形事件和三角形事件的新的上游Observable
上游每發(fā)送一個(gè)事件洞辣,flatMap都將創(chuàng)建一個(gè)新的水管,然后發(fā)送轉(zhuǎn)換之后的新的事件昙衅,下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù)扬霜。flatMap不能保證事件的順序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
String iStr = "flatMap value" + integer;
arrayList.add(iStr);
}
return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
concatMap
concatMap和flatMap的作用是一樣的,它的結(jié)果是嚴(yán)格按照上游發(fā)送的順序來(lái)發(fā)送的而涉。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(11);
e.onNext(111);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
arrayList.add("concatMap value" + i);
}
return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
- 運(yùn)行結(jié)果
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value2
Buffer
Buffer操作符會(huì)定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹著瓶,然后發(fā)射這些包裹,并不是一次發(fā)射一個(gè)值
Buffer操作符將一個(gè)Observable變換為另一個(gè)啼县,原來(lái)的Observable正常發(fā)射數(shù)據(jù)材原,變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。如果原來(lái)的Observable發(fā)射了一個(gè)onError通知季眷,Buffer會(huì)立即傳遞這個(gè)通知余蟹,而不是首先發(fā)射緩存的數(shù)據(jù)。
Buffer變體
- Buffer(count) 以列表List的形式發(fā)射非重疊的緩存子刮,每一個(gè)緩存至多包含來(lái)自原始Observable的count項(xiàng)數(shù)據(jù)
- Buffer(count,skip) 從原始Observable的第一項(xiàng)數(shù)據(jù)開(kāi)始創(chuàng)建新的緩存威酒。每當(dāng)接收到skip數(shù)據(jù),用count項(xiàng)數(shù)據(jù)來(lái)填充‘
Scan
Scan連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果
Scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)兼搏,然后將這個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射卵慰。將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來(lái)產(chǎn)生自己的第二項(xiàng)數(shù)據(jù)。持續(xù)進(jìn)行這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)序列佛呻。
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Window
Window定期將來(lái)自原始Observable的數(shù)據(jù)分解為一個(gè)Observable窗口裳朋,發(fā)射這些窗口而不是每次發(fā)射一項(xiàng)數(shù)據(jù)
window和Buffer類似,但不是發(fā)射來(lái)自原始Observable的數(shù)據(jù)包吓著,發(fā)射的是Observables鲤嫡,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集,最后發(fā)射一個(gè)onComplete通知绑莺。
Observable.range(1, 10).window(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
Log.d(TAG, "subscribeActual: ");
observer.onNext(1);
observer.onNext(1);
observer.onNext(1);
}
}).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
});
ZIP操作符
ZIP通過(guò)一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起暖眼,然后發(fā)送這些組合到一起的事件。按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)纺裁,只發(fā)射與發(fā)射項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)
從圖中看到诫肠,有兩個(gè)上游的水管,通過(guò)ZIP操作符欺缘,使得兩個(gè)事件合并為了一個(gè)事件
- 分解動(dòng)作
組合的過(guò)程是分別從 兩根水管里各取出一個(gè)事件 來(lái)進(jìn)行組合, 并且一個(gè)事件只能被使用一次, 組合的順序是嚴(yán)格按照事件發(fā)送的順利 來(lái)進(jìn)行的, 也就是說(shuō)不會(huì)出現(xiàn)圓形1 事件和三角形B 事件進(jìn)行合并, 也不可能出現(xiàn)圓形2 和三角形A 進(jìn)行合并的情況.
最終下游收到的事件數(shù)量 是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量 相同. 這個(gè)也很好理解, 因?yàn)槭菑拿恳桓?里取一個(gè)事件來(lái)進(jìn)行合并, 最少的 那個(gè)肯定就最先取完 , 這個(gè)時(shí)候其他的水管盡管還有事件 , 但是已經(jīng)沒(méi)有足夠的事件來(lái)組合了, 因此下游就不會(huì)收到剩余的事件了.
//上游水管第一個(gè)事件
Observable<Integer> observable1 = Observable.range(1, 5);
//上游水管第二個(gè)事件
Observable<Integer> observable2 = Observable.range(6, 10);
//合并事件
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return String.valueOf(integer + integer2);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
實(shí)踐
- zip在Android中的使用栋豫,可以適用于如下場(chǎng)景,一個(gè)界面需要展示用戶的一些信息谚殊,這些信息分別要從兩個(gè)服務(wù)器接口中獲取丧鸯,只有當(dāng)兩個(gè)數(shù)據(jù)都獲取后才能進(jìn)行展示。這類同時(shí)的信息請(qǐng)求比較適用zip
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
zip打包
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
基本的操作符就是這些了嫩絮,以后再學(xué)習(xí)到其它的運(yùn)算符再繼續(xù)補(bǔ)充