一.引言
Operators 操作符是為了實(shí)現(xiàn)對(duì)被觀察者發(fā)射事件或數(shù)據(jù)的變換,用于在被觀察者Observable/Flowable和目標(biāo)觀察者Observer/Subscriber之間修改Observable發(fā)出的事件斜做。
RxJava提供了很多很有用的操作符场躯,按照功能可以主要分為一下類型:
● 創(chuàng)建操作符-- Creating Observables
● 變換操作符--Transforming Observables
● 過(guò)濾操作符-- Filtering Observables
● 合并操作符-- Combining Observables
● 布爾操作符-- Boolean Operators
● 事件流操作符--Observable Utility Operators
1.創(chuàng)建操作符 Creating Observables
用來(lái)創(chuàng)建新的Observables.主要有以下方法:
●Create
●Defer
●Empty/Never/Throw
●From
● Interval
●Just
● Range
● Repeat
● Start
● Timer
創(chuàng)建操作符在之前講Observable/Flowable的創(chuàng)建時(shí)已經(jīng)講過(guò)谈为。
2.變換操作符Transforming Observables
將上級(jí)的數(shù)據(jù)處理變換后再發(fā)射出去,主要有以下方法:
●Buffer
●FlatMap
● GroupBy
● Map
● Scan
● Window
1.Buffer
Buffer操作符定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)集合里踢关,然后發(fā)射這些數(shù)據(jù)集合伞鲫,而不是一次發(fā)射一個(gè)源數(shù)據(jù)的值。
Observable.fromArray(arr)
.buffer(3,1)
.subscribe(getStringListObserver("buffer"));
// 3 means, 每創(chuàng)建一個(gè)list的長(zhǎng)度 (it takes max of three from its start index and create list)
// 1 means, 每創(chuàng)建一個(gè)list,開始下標(biāo)跳過(guò)的個(gè)數(shù)(it jumps one step every time)
//輸出
// a,b,c
// b,c,d
// c,d,e
// d,e
// e
Observable.fromArray(arr)
.buffer(2)//默認(rèn)skip 和count相同
.subscribe(getStringListObserver("buffer"));
//輸出
// a,b,
// c,d,
// e
```
**2.Map**
將上級(jí)數(shù)據(jù)修改签舞,變換處理后秕脓,在發(fā)射到下一級(jí),變換數(shù)據(jù)是一對(duì)一的儒搭。
```java
Observable.just(1,2,3,4,5)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer+".xxxxx";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print(s);
}
});
//輸出結(jié)果
// 1.xxxxx
// 2.xxxxx
// 3.xxxxx
// 4.xxxxx
// 5.xxxxx
3. FlatMap
將上級(jí)數(shù)據(jù)修改撒会,變換處理后,在發(fā)射到下一級(jí)师妙,變換數(shù)據(jù)可以一對(duì)一诵肛,也可以一對(duì)多。
Observable.just(1,2,3,4,5)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(integer+".x");
e.onNext(integer+".xx");
e.onNext(integer+".xxx");
e.onNext(integer+".xxxx");
e.onComplete();
}
});
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print("accept:"+s);
}
});
Observable.just(1,2,3,4,5).flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
return Observable.just(integer * 2);
}
}, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
print("accept:"+integer);
}
});
//輸出結(jié)果
// accept:1.x
// accept:1.xx
// accept:1.xxx
// accept:1.xxxx
// accept:2.x
// accept:2.xx
// accept:2.xxx
// accept:2.xxxx
// accept:3.x
// accept:3.xx
// accept:3.xxx
// accept:3.xxxx
// accept:4.x
// accept:4.xx
// accept:4.xxx
// accept:4.xxxx
// accept:5.x
// accept:5.xx
// accept:5.xxx
// accept:5.xxxx
// accept:3
// accept:6
// accept:9
// accept:12
// accept:15
4.GroupBy
GroupBy操作符將原始Observable發(fā)射的數(shù)據(jù)按照key來(lái)拆分成一些小的Observable默穴,然后這些小的Observable分別發(fā)射其所包含的的數(shù)據(jù)怔檩。
String[] arr = new String[]{"aaa", "bb", "ccc", "dd", "eee"};
Observable.fromArray(arr).groupBy(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
return s.length()== 3;
}
}).subscribe(new Consumer<GroupedObservable<Boolean, String>>() {
@Override
public void accept(final GroupedObservable<Boolean, String> booleanStringGroupedObservable) throws Exception {
if(booleanStringGroupedObservable.getKey()) {
booleanStringGroupedObservable.toList().subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
print("key=" + booleanStringGroupedObservable.getKey() + ",val=" + strings);
}
});
}else{
booleanStringGroupedObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
print("key=" + booleanStringGroupedObservable.getKey() + ",val=" + s+s);
}
});
}
}
});
//輸出結(jié)果
// key=false,val=bbbb
// key=false,val=dddd
// key=true,val=[aaa, ccc, eee]
5. Scan
Scan操作符對(duì)一個(gè)序列的數(shù)據(jù)應(yīng)用一個(gè)函數(shù),并將這個(gè)函數(shù)的結(jié)果發(fā)射出去作為下個(gè)數(shù)據(jù)應(yīng)用這個(gè)函數(shù)時(shí)候的第一個(gè)參數(shù)使用蓄诽。
Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
print(integer+"");
}
});
Observable.fromArray(new String[]{"a","b","c","d","e"}).scan(new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s+s2;
}
}).subscribe(getStringObserver("Scan"));
//輸出結(jié)果
//1 3 6 10 15
// Scan-->String onSubscribe
// Scan-->String onNext : value :a
// Scan-->String onNext : value :ab
// Scan-->String onNext : value :abc
// Scan-->String onNext : value :abcd
// Scan-->String onNext : value :abcde
// Scan-->String onComplete
6.Window
window操作符會(huì)在時(shí)間間隔內(nèi)緩存結(jié)果薛训,類似于buffer緩存一個(gè)list集合,區(qū)別在于window將這個(gè)結(jié)果集合封裝成了observable
Observable.interval(300, TimeUnit.MILLISECONDS).take(50)
.window(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> longObservable) throws Exception {
print("Window accept---------");
longObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
print(aLong+"");
}
});
}
});
//輸出結(jié)果:
//Window accept---------
//0
//1
//...
//15
//Window accept---------
//16
//...
//32
//Window accept---------
//33
//...
//48
//Window accept---------
//49