本文中通過(guò)圖解的方式解釋Rxjava中復(fù)雜的操作符,值得收藏怖糊。其中用到的demo地址:RxJava2-Android-Samples
1. buffer
demo:
Observable<List<String>> buffered = getObservable().buffer(2叹侄, 3);
第一個(gè)參數(shù)表示在emit數(shù)據(jù)之前坏瘩,Observable需要緩存多少個(gè)數(shù)據(jù)
第二個(gè)參數(shù)表示每次emit數(shù)據(jù)之后跳過(guò)幾個(gè)數(shù)據(jù)山林。
圖示中就是每次buffer 2個(gè)數(shù)據(jù)之后emit志秃,每次emit之后跳過(guò)3個(gè)數(shù)據(jù)怔球。
2. concat
demo:
Observable.concat(aObservable, bObservable)
第一個(gè)參數(shù)為第一個(gè)Observable
第二個(gè)參數(shù)為第二個(gè)Observable
圖示中連接兩個(gè)Observable之后嚼酝,數(shù)據(jù)會(huì)連接起來(lái)浮还,emit a1, a2, a3,b1,b2,b3
3. debounce
demo:
getObservable() .debounce(500, TimeUnit.MILLISECONDS)
第一個(gè)參數(shù)是時(shí)間間隔
第二個(gè)參數(shù)是時(shí)間單位
debounce表示emit數(shù)據(jù)之后一定時(shí)間內(nèi)沒(méi)有其他數(shù)據(jù)出現(xiàn)才真正emit數(shù)據(jù)。
圖示中emit黃球后闽巩,在規(guī)定時(shí)間內(nèi)又emit綠球钧舌,則黃球不會(huì)被emit。
4. defer
defer為每一個(gè)observer創(chuàng)建一個(gè)ObservableSource涎跨,這樣當(dāng)?shù)谝粋€(gè)observer訂閱之后如果ObservableSource中的數(shù)據(jù)發(fā)生變化洼冻,第二個(gè)訂閱的Observer會(huì)得到不同的數(shù)據(jù)。
demo:
Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(brand);
}
});
demo中可以隨時(shí)改變brand
的值隅很,這樣不同的Observer可能會(huì)得到不同的值撞牢。
5. distinct
distinct可以對(duì) emit 的數(shù)據(jù)做去重處理
demo:
Observable.just(1, 2, 1, 1, 2, 3, 4 ,6, 4)
.distinct()
.subscribe(getObserver());
demo中最后emit的數(shù)據(jù)只有1,2叔营,3屋彪,4,6
6. filter
filter按照一定的規(guī)則過(guò)濾數(shù)據(jù)
demo:
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(getObserver());
demo中原始數(shù)據(jù)中奇數(shù)會(huì)被過(guò)濾掉绒尊。
7. reduce
reduce 對(duì)所有數(shù)據(jù)進(jìn)行處理畜挥,最終emit一個(gè)數(shù)據(jù)。
demo:
Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());
demo中把 50 + 1 +2 +3 +4 的結(jié)果60 emit婴谱。
8. interval
demo:
Observable.interval(0, 2, TimeUnit.SECONDS);
interval可以延時(shí)一定時(shí)間后開(kāi)始按周期emit數(shù)據(jù)蟹但,emit的數(shù)據(jù)從0開(kāi)始一次遞增。
第一個(gè)參數(shù)為第一次emit數(shù)據(jù)時(shí)延時(shí)時(shí)間
第二個(gè)參數(shù)為emit數(shù)據(jù)周期
第三個(gè)參數(shù)為時(shí)間單位
9 .last
如果Observable有數(shù)據(jù)則只emit最后一個(gè)數(shù)據(jù)谭羔,如果沒(méi)有數(shù)據(jù)則emit默認(rèn)數(shù)據(jù)华糖。
demo:
Observable.just("A1", "A2", "A3", "A4", "A5", "A6").last("A1") // the default item ("A1") to emit if the source ObservableSource is empty
.subscribe(getObserver());
demo中只emit A6,如果Observable沒(méi)有數(shù)據(jù)瘟裸,則會(huì)emit 默認(rèn)數(shù)據(jù)A1客叉。
10. map
map可以對(duì)數(shù)據(jù)執(zhí)行一些操作后再emit出去。
demo:
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<List<ApiUser>, List<User>>() {
@Override
public List<User> apply(List<ApiUser> apiUsers) throws Exception {
return Utils.convertApiUserListToUserList(apiUsers);
}
})
.subscribe(getObserver());
demo中把一個(gè)ApiUser list轉(zhuǎn)為 User list 了。
11. merge
merge 與concat不同的是把兩個(gè) Observable的數(shù)據(jù)合成一列數(shù)據(jù)十办,就像是從一個(gè)Observable emit秀撇,但是順序不一定。
demo:
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable, bObservable)
.subscribe(getObserver());
demo 中最終emit的數(shù)據(jù)可能是"A1", "B1", "A2", "A3", "A4", "B2", "B3"向族,還可能是其他順序呵燕。
12. scan
** demo:**
Observable.just(1, 2, 3, 4, 5)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer int1, Integer int2) throws Exception {
return int1 + int2;
}
})
.subscribe(getObserver());
demo中依次輸出1,3件相,6再扭,10,15夜矗,即依次把BiFunction作用在前一個(gè)輸出結(jié)果和當(dāng)前數(shù)據(jù)上泛范。
13. skip
demo:
Observable.just(1, 2, 3, 4, 5)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.skip(3)
.subscribe(getObserver());
skip比較簡(jiǎn)單,會(huì)跳過(guò)前幾個(gè)數(shù)據(jù)紊撕,具體可以通過(guò)參數(shù)設(shè)置罢荡,demo中是跳過(guò)前三個(gè)數(shù)據(jù)。
14. take
demo:
Observable.just(1, 2, 3, 4, 5)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver());
take比較簡(jiǎn)單对扶,只取前幾個(gè)數(shù)據(jù)emit区赵,demo中取前三個(gè)數(shù)據(jù)。
15. throttleLast
throttleLast emit一定周期內(nèi)的最后一個(gè)數(shù)據(jù)浪南。
demo:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).throttleLast(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
demo中每隔500ms emit當(dāng)時(shí)的最后一個(gè)數(shù)據(jù)笼才,demo中最終emit 2,6络凿,7骡送。
16. timer
timer比較簡(jiǎn)單,就是延時(shí)一定時(shí)間emit 數(shù)據(jù)0絮记。
demo:
Observable.timer(2, TimeUnit.SECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
17 zip
demo:
private void doSomeWork() {
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<List<User>> getCricketFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
}
private Observable<List<User>> getFootballFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesFootball());
e.onComplete();
}
}
});
}
public static List<User> filterUserWhoLovesBoth(List<User> cricketFans, List<User> footballFans) {
List<User> userWhoLovesBoth = new ArrayList<User>();
for (User cricketFan : cricketFans) {
for (User footballFan : footballFans) {
if (cricketFan.id == footballFan.id) {
userWhoLovesBoth.add(cricketFan);
}
}
}
return userWhoLovesBoth;
}
zip對(duì)兩個(gè)Observable的數(shù)據(jù)進(jìn)行BiFunction操作摔踱,之后再emit出去。demo中g(shù)etCricketFansObservable 獲取到喜歡cricket 的人到千,getFootballFansObservable獲取到喜歡football的人昌渤,最終經(jīng)過(guò)BiFunction之后獲取到喜歡兩項(xiàng)運(yùn)動(dòng)的人。
關(guān)于Rxjava的更多operators可以參考其官網(wǎng)憔四,地址:http://reactivex.io/documentation/operators.html