簡(jiǎn)要:
需求了解:
對(duì)于數(shù)據(jù)的觀察以及處理過程中往往有需要過濾一些不需要的數(shù)據(jù)的需求插爹,比如防抖(防止快速操作),獲取第一項(xiàng)崇败、指定序列項(xiàng)或者最后一項(xiàng)的需要,獲取指定時(shí)間內(nèi)的有效數(shù)據(jù)等奈偏。Rx中提供了豐富的數(shù)據(jù)過濾處理的操作方法。
可用于過濾和選擇Observable發(fā)射的數(shù)據(jù)序列的方法:
- Debounce:過濾發(fā)射速率較快的數(shù)據(jù)項(xiàng)躯护,防抖操作惊来。
- Throttle: 對(duì)數(shù)據(jù)序列進(jìn)行限流操作,可以指定獲取周期內(nèi)的指定數(shù)據(jù)項(xiàng)棺滞,也可以用于防抖裁蚁。
- Sample: 允許通過將序列劃分為時(shí)間片段收集數(shù)據(jù)矢渊,并從每片中取出一個(gè)值來稀疏序列。
- Distinct: 過濾掉重復(fù)數(shù)據(jù)枉证。
- Skip: 跳過指定的N項(xiàng)數(shù)據(jù)矮男。
- Filter: 通過函數(shù)指定過濾的數(shù)據(jù)。
- First: 只發(fā)射第一項(xiàng)或者滿足某個(gè)條件的第一項(xiàng)數(shù)據(jù)室谚。
- Single: 與 first 類似毡鉴,但是如果原始Observable在完成之前不是正好發(fā)射一次數(shù)據(jù),它會(huì)拋出一個(gè)NoSuchElementException 的異常通知秒赤。
- ElementAt: 獲取原始Observable發(fā)射的數(shù)據(jù)序列指定索引位置的數(shù)據(jù)項(xiàng)猪瞬,然后當(dāng)做自己的唯一數(shù)據(jù)發(fā)射。
- ignoreElements: 不發(fā)射任何數(shù)據(jù)入篮,只發(fā)射Observable的終止通知陈瘦。
- Last: 只發(fā)射最后一項(xiàng)(或者滿足某個(gè)條件的最后一項(xiàng))數(shù)據(jù)。
- Take: 只返回Observable發(fā)送數(shù)據(jù)項(xiàng)序列前面的N項(xiàng)數(shù)據(jù)潮售,忽略剩余的數(shù)據(jù)痊项。
- TakeLast: 只發(fā)射Observable發(fā)送數(shù)據(jù)項(xiàng)序列的后N項(xiàng)數(shù)據(jù),忽略其他數(shù)據(jù)酥诽。
- ofType: 過濾一個(gè)Observable只返回指定類型的數(shù)據(jù)鞍泉。
1. Debounce
僅在過了一段指定的時(shí)間還沒發(fā)射數(shù)據(jù)時(shí)才發(fā)射一個(gè)數(shù)據(jù)。Debounce
操作符會(huì)過濾掉發(fā)射速率過快的數(shù)據(jù)項(xiàng)盆均。
提示: 操作默認(rèn)在 computation 調(diào)度器上執(zhí)行塞弊,但是你可以指定其它的調(diào)度器漱逸。
1.1 debounce(timeout, unit)
指定每個(gè)數(shù)據(jù)發(fā)射后在 timeout
時(shí)間內(nèi)泪姨,原始數(shù)據(jù)序列中沒有下一個(gè)數(shù)據(jù)發(fā)射時(shí),發(fā)射此項(xiàng)數(shù)據(jù)饰抒,否則丟棄這項(xiàng)數(shù)據(jù)肮砾。此操作與 throttleWithTimeout
方法相同。
注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted
時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)袋坑,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)仗处。
實(shí)例代碼:
// 1. debounce(long timeout, TimeUnit unit)
// 發(fā)送一個(gè)數(shù)據(jù),如果在包含timeout時(shí)間內(nèi)枣宫,沒有第二個(gè)數(shù)據(jù)發(fā)射婆誓,那么就會(huì)發(fā)射此數(shù)據(jù),否則丟棄此數(shù)據(jù)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 30 < timeout --> skip
Thread.sleep(30);
emitter.onNext(2); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 100 > timeout --> deliver
Thread.sleep(100);
emitter.onNext(3); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 50 = timeout --> skip:
Thread.sleep(50);
emitter.onNext(4); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, onCompleted --> deliver
emitter.onComplete();
}
}).debounce(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時(shí)間段為50毫秒
// .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline()) // 指定調(diào)度為當(dāng)前線程排隊(duì)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept debounce(1-1): " + t);
}
});
輸出:
--> accept debounce(1-1): 2
--> accept debounce(1-1): 4
Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)
1.2 debounce(debounceSelector)
原始數(shù)據(jù)發(fā)射每一個(gè)序列都通過綁定監(jiān)聽debounceSelector
的數(shù)據(jù)通知也颤,在debounceSelector
數(shù)據(jù)發(fā)送前洋幻,如果有下一個(gè)數(shù)據(jù),則丟棄當(dāng)前項(xiàng)數(shù)據(jù)翅娶,繼續(xù)監(jiān)視下一個(gè)數(shù)據(jù)文留。
注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted
時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)好唯,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)。
實(shí)例代碼:
// 2. debounce(debounceSelector)
// 原始數(shù)據(jù)發(fā)射每一個(gè)序列的通過監(jiān)聽debounceSelector的數(shù)據(jù)通知燥翅,
// 在debounceSelector數(shù)據(jù)發(fā)送前骑篙,如果有下一個(gè)數(shù)據(jù),則丟棄當(dāng)前項(xiàng)數(shù)據(jù)森书,繼續(xù)監(jiān)視下一個(gè)數(shù)據(jù)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1); // skip --> debounceSelector is no emitter(<2s)
Thread.sleep(1000);
emitter.onNext(2); // skip --> debounceSelector is no emitter(<2s)
Thread.sleep(200);
emitter.onNext(3); // deliver --> debounceSelector is emitter(>2s)
Thread.sleep(2500);
emitter.onNext(4); // skip --> debounceSelector is no emitter(=2s)
Thread.sleep(2000);
emitter.onNext(5); // deliver --> onComplete
Thread.sleep(500);
emitter.onComplete();
}
}).debounce(new Function<Integer, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Integer t) throws Exception {
System.out.println("--> apply(1-2): " + t);
// 設(shè)置過濾延遲時(shí)間為2秒靶端,此時(shí)返回的Observable從訂閱到發(fā)送數(shù)據(jù)時(shí)間段即為timeout
return Observable.timer(2, TimeUnit.SECONDS)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable t) throws Exception {
// 開始訂閱,監(jiān)聽數(shù)據(jù)的發(fā)送來過濾數(shù)據(jù)
System.out.println("--> debounceSelector(1-2) is onSubscribe!");
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
// 發(fā)射數(shù)據(jù)后拄氯,丟棄當(dāng)前的數(shù)據(jù)躲查,解除當(dāng)前綁定
System.out.println("--> debounceSelector(1-2) is unSubscribe!");
}
});
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("----------> accept(1-2): " + t);
}
});
輸出:
--> apply(1-2): 1
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 2
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 3
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
----------> accept(1-2): 3
--> apply(1-2): 4
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 5
--> debounceSelector(1-2) is onSubscribe!
----------> accept(1-2): 5
--> debounceSelector(1-2) is unSubscribe!
Javadoc: debounce(debounceSelector)
2. Throttle
主要應(yīng)用于數(shù)據(jù)序列的節(jié)流操作,在指定的采樣周期內(nèi)獲取指定的數(shù)據(jù)译柏。Throttling
也用于稀疏序列镣煮。當(dāng)生產(chǎn)者發(fā)出的值超出我們想要的值時(shí),我們不需要每個(gè)序列值鄙麦,我們可以通過限制它來稀釋序列典唇。
注意: 時(shí)間的劃分不一定是統(tǒng)一的。例如胯府,發(fā)射數(shù)據(jù)的時(shí)間間隔與劃分?jǐn)?shù)據(jù)的時(shí)間間隔一致時(shí)介衔,在原始數(shù)據(jù)發(fā)送的一個(gè)時(shí)間點(diǎn)(此時(shí)數(shù)據(jù)還沒有實(shí)際發(fā)送),此時(shí)可能由于劃分時(shí)間已到骂因,劃分的數(shù)據(jù)片直接關(guān)閉了炎咖,所以有的時(shí)間片數(shù)據(jù)會(huì)有時(shí)間間隙差異。
提示: 操作默認(rèn)在 computation 調(diào)度器上執(zhí)行寒波,但是你可以指定其它的調(diào)度器乘盼。
2.1 throttleFirst(windowDuration, unit)
獲取每個(gè) windowDuration
時(shí)間段內(nèi)的原始數(shù)據(jù)序列中的第一項(xiàng)數(shù)據(jù),直到原始數(shù)據(jù)全部發(fā)送完畢俄烁。
解析: 實(shí)際在每個(gè)采樣周期內(nèi)绸栅,先發(fā)送第一項(xiàng)接收到的數(shù)據(jù),然后丟棄后續(xù)周期內(nèi)的數(shù)據(jù)項(xiàng)页屠。
實(shí)例代碼:
// 1. throttleFirst(long windowDuration, TimeUnit unit)
// 指定每個(gè)指定時(shí)間內(nèi)取第一項(xiàng)數(shù)據(jù), 直到原始數(shù)據(jù)序列全部發(fā)送結(jié)束
Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource doOnNext : " + t);
}
}).throttleFirst(2, TimeUnit.SECONDS) // 獲取每隔2秒之內(nèi)收集的第一項(xiàng)數(shù)據(jù)
// .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調(diào)度線程為newThread()
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> throttleFirst onSubscribe");
}
@Override
public void onNext(Long t) {
System.out.println("-------------> throttleFirst onNext: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> throttleFirst onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> throttleFirst onComplete");
}
});
輸出:
--> throttleFirst onSubscribe
--> DataSource doOnNext : 1
-------------> throttleFirst onNext: 1
--> DataSource doOnNext : 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleFirst onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
--> DataSource doOnNext : 7
-------------> throttleFirst onNext: 7
--> DataSource doOnNext : 8
--> DataSource doOnNext : 9
-------------> throttleFirst onNext: 9
--> DataSource doOnNext : 10
--> throttleFirst onComplete
Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)
2.2 throttleLast(intervalDuration, unit)
獲取每個(gè) windowDuration
時(shí)間段內(nèi)的原始數(shù)據(jù)序列中的最近的一項(xiàng)數(shù)據(jù)粹胯,直到原始數(shù)據(jù)全部發(fā)送完畢。throttleLast
運(yùn)算符以固定間隔而不是相對(duì)于最后一項(xiàng)來劃分時(shí)間辰企。它會(huì)在每個(gè)窗口中發(fā)出最后一個(gè)值风纠,而不是它后面的第一個(gè)值。
解析: 實(shí)際在每個(gè)采樣周期內(nèi)牢贸,先緩存收集的數(shù)據(jù)竹观,等周期結(jié)束發(fā)送最后一項(xiàng)數(shù)據(jù),丟棄最后數(shù)據(jù)項(xiàng)前面的數(shù)據(jù)十减。
實(shí)例代碼:
// 2. throttleLast(long intervalDuration, TimeUnit unit)
// 指定間隔時(shí)間內(nèi)取最后一項(xiàng)數(shù)據(jù)栈幸,直到原始數(shù)據(jù)序列全部發(fā)送結(jié)束
Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource doOnNext : " + t);
}
}).throttleLast(2, TimeUnit.SECONDS) // 獲取每隔2秒之內(nèi)收集的最后一項(xiàng)數(shù)據(jù)
// .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調(diào)度線程為newThread()
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> throttleLast onSubscribe");
}
@Override
public void onNext(Long t) {
System.out.println("-------------> throttleLast onNext: " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> throttleLast onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> throttleLast onComplete");
}
});
輸出:
--> throttleLast onSubscribe
--> DataSource doOnNext : 1
--> DataSource doOnNext : 2
-------------> throttleLast onNext: 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleLast onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
-------------> throttleLast onNext: 6
--> DataSource doOnNext : 7
--> DataSource doOnNext : 8
-------------> throttleLast onNext: 8
--> DataSource doOnNext : 9
--> DataSource doOnNext : 10
--> throttleLast onComplete
Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)
2.3 throttleWithTimeout(timeout, unit)
指定每個(gè)數(shù)據(jù)發(fā)射后在 timeout
時(shí)間內(nèi)愤估,原始數(shù)據(jù)序列中沒有下一個(gè)數(shù)據(jù)發(fā)射時(shí),發(fā)射此項(xiàng)數(shù)據(jù)速址,否則丟棄這項(xiàng)數(shù)據(jù)玩焰。此操作與 debounce
方法相同。
注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted
時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)芍锚,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)昔园。
實(shí)例代碼:
// 3. throttleWithTimeout(long timeout, TimeUnit unit)
// 發(fā)送一個(gè)數(shù)據(jù),如果在包含timeout時(shí)間內(nèi)并炮,沒有第二個(gè)數(shù)據(jù)發(fā)射默刚,那么就會(huì)發(fā)射此數(shù)據(jù),否則丟棄此數(shù)據(jù)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> skip: 30 < timeout
Thread.sleep(30);
emitter.onNext(2); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> skip: 50 = timeout
Thread.sleep(50);
emitter.onNext(3); // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> deliver: 60 > timeout
Thread.sleep(60);
emitter.onNext(4); // onComplete --> deliver: onComplete
emitter.onComplete();
}
}).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時(shí)間段為50毫秒
// .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定調(diào)度線程為newThread()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
// TODO Auto-generated method stub
System.out.println("--> accept throttleWithTimeout(3): " + t);
}
});
輸出:
--> accept throttleWithTimeout(3): 3
--> accept throttleWithTimeout(3): 4
Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)
3. Sample
sample
允許您通過將序列劃分為時(shí)間片段逃魄,并從每片中取出一個(gè)值來稀疏序列荤西。當(dāng)每片結(jié)束時(shí),將發(fā)出其中的最后一個(gè)值(如果有的話)伍俘。
注意: 時(shí)間的劃分不一定是統(tǒng)一的邪锌。例如,發(fā)射數(shù)據(jù)的時(shí)間間隔與劃分?jǐn)?shù)據(jù)的時(shí)間間隔一致時(shí)癌瘾,在原始數(shù)據(jù)發(fā)送的一個(gè)時(shí)間點(diǎn)(此時(shí)數(shù)據(jù)還沒有實(shí)際發(fā)送)觅丰,此時(shí)可能由于劃分時(shí)間已到,劃分的數(shù)據(jù)片直接關(guān)閉了妨退,所以有的時(shí)間片數(shù)據(jù)會(huì)有時(shí)間間隙差異妇萄。
3.1 sample(period, unit)
獲取每個(gè) period
時(shí)間片段內(nèi)手機(jī)收據(jù)序列的最后一項(xiàng),忽略此時(shí)間片內(nèi)收集的其他數(shù)據(jù)項(xiàng)咬荷。
實(shí)例代碼:
// 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit)
// 將序列分為 period 的時(shí)間片段冠句,從每片重取出最近的一個(gè)數(shù)據(jù)
// 等同于throttleLast
Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource onNext: " + t);
}
}).sample(2, TimeUnit.SECONDS) // 每3秒時(shí)間段數(shù)據(jù)中取最近一個(gè)值
// .sample(2, TimeUnit.SECONDS, true) // 參數(shù)emitLast,設(shè)置是否忽略未采樣的最后一個(gè)數(shù)據(jù)
// .sample(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調(diào)度器為newThread()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(1): " + t);
}
});
輸出:
--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(1): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(1): 4
--> DataSource onNext: 5
Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)
3.2 sample(sampler)
sample
的這個(gè)方法每當(dāng)?shù)诙€(gè) sampler
發(fā)射一個(gè)數(shù)據(jù)(或者當(dāng)它終止)時(shí)就對(duì)原始 Observable 進(jìn)行采樣萍丐。第二個(gè)Observable通過參數(shù)傳遞給 sample
轩端。
實(shí)例代碼:
// 2. sample(ObservableSource sampler)
// 每當(dāng)?shù)诙€(gè) sampler 發(fā)射一個(gè)數(shù)據(jù)(或者當(dāng)它終止)時(shí)就對(duì)原始 Observable進(jìn)行采樣
Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource onNext: " + t);
}
}).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒進(jìn)行一次采樣
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
輸出:
--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(2): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(2): 4
--> DataSource onNext: 5
Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)
4. Distinct
抑制(過濾掉)重復(fù)的數(shù)據(jù)項(xiàng)放典。Distinct 的過濾規(guī)則是:只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過逝变。
在某些實(shí)現(xiàn)中,有一些方法中允許你調(diào)整判定兩個(gè)數(shù)據(jù)不同( distinct )的標(biāo)準(zhǔn)奋构。還有一些實(shí)現(xiàn)只比較一項(xiàng)數(shù)據(jù)和它的直接前驅(qū)壳影,因此只會(huì)從序列中過濾掉連續(xù)重復(fù)的數(shù)據(jù)。
4.1 distinct()
只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過弥臼,過濾數(shù)據(jù)序列中的所有重復(fù)的數(shù)據(jù)項(xiàng)宴咧,保證處理后的數(shù)據(jù)序列沒有重復(fù)。
示例代碼:
// 1. distinct()
// 去除全部數(shù)據(jù)中重復(fù)的數(shù)據(jù)
Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept distinct(1): " + t);
}
});
輸出:
--> accept distinct(1): 1
--> accept distinct(1): 2
--> accept distinct(1): 3
--> accept distinct(1): 4
--> accept distinct(1): 5
--> accept distinct(1): 6
Javadoc: distinct()
4.2 distinct(keySelector)
這個(gè)操作符接受一個(gè)函數(shù)径缅。這個(gè)函數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生一個(gè) Key
掺栅,然后烙肺,比較這些Key而不是數(shù)據(jù)本身,來判定兩個(gè)數(shù)據(jù)是否是不同的氧卧。
實(shí)例代碼:
// 數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生一個(gè) Key桃笙,然后比較這些Key而不是數(shù)據(jù)本身,來判定兩個(gè)數(shù)據(jù)是否是不同的(去除全部數(shù)據(jù)中重復(fù)的數(shù)據(jù))
Observable.just(1, 2, 3, 3, 4, 5, 6, 6)
.distinct(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
// 根據(jù)奇數(shù)或偶數(shù)來判斷數(shù)據(jù)序列的重復(fù)的key
return t % 2 == 0 ? "even" : "odd";
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept distinct(2): " + t);
}
});
輸出:
--> accept distinct(2): 1
--> accept distinct(2): 2
Javadoc: distinct(keySelector)
4.3 distinctUntilChanged()
distinctUntilChanged
操作符沙绝,去除數(shù)據(jù)序列中的連續(xù)重復(fù)項(xiàng)搏明。它只判定一個(gè)數(shù)據(jù)和它的直接前驅(qū)是否是不同的。
實(shí)例代碼:
// 3. distinctUntilChanged()
// 去除連續(xù)重復(fù)的數(shù)據(jù)
Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept distinctUntilChanged(3): " + t);
}
});
輸出:
--> accept distinctUntilChanged(3): 1
--> accept distinctUntilChanged(3): 2
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 4
--> accept distinctUntilChanged(3): 5
--> accept distinctUntilChanged(3): 6
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 2
Javadoc: distinctUntilChanged()
4.4 distinctUntilChanged(keySelector)
distinctUntilChanged(keySelector)
操作符闪檬,根據(jù)一個(gè)函數(shù)產(chǎn)生的 Key
判定兩個(gè)相鄰的數(shù)據(jù)項(xiàng)是不是相同的星著,去除連續(xù)重復(fù)的數(shù)據(jù)。
實(shí)例代碼:
// 4. distinctUntilChanged(Function<T,K>)
// 數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生的 Key粗悯,去除連續(xù)重復(fù)的數(shù)據(jù)
Observable.just(8, 2, 3, 5, 9, 5, 6, 6)
.distinctUntilChanged(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
// 根據(jù)原始數(shù)據(jù)處理后添加key虚循,依據(jù)這個(gè)key來判斷是否重復(fù)(去除連續(xù)重復(fù)的數(shù)據(jù))
return t % 2 == 0 ? "even" : "odd";
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept distinctUntilChanged(4): " + t);
}
});
輸出:
--> accept distinctUntilChanged(4): 8
--> accept distinctUntilChanged(4): 3
--> accept distinctUntilChanged(4): 6
Javadoc: distinctUntilChanged(keySelector)
5. Skip
主要用于忽略O(shè)bservable發(fā)射的指定的 N 項(xiàng)數(shù)據(jù),如跳過數(shù)據(jù)序列的前面或后面 N 項(xiàng)數(shù)據(jù)样傍,指定時(shí)間段內(nèi)的數(shù)據(jù)項(xiàng)邮丰。
Skip
操作符的還有一些變體的操作方法如下:
5.1 skip(count)
忽略 Observable
發(fā)射的前 N
項(xiàng)數(shù)據(jù),只保留之后的數(shù)據(jù)铭乾。
實(shí)例代碼:
// 1. skip(long count)
// 跳過前count項(xiàng)數(shù)據(jù)剪廉,保留后面的數(shù)據(jù)
Observable.range(1, 10)
.skip(5) // 過濾數(shù)據(jù)序列前5項(xiàng)數(shù)據(jù)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept skip(1): " + t);
}
});
輸出:
--> accept skip(1): 6
--> accept skip(1): 7
--> accept skip(1): 8
--> accept skip(1): 9
--> accept skip(1): 10
Javadoc: skip(count)
5.2 skip(time, unit)
skip
的這個(gè)變體接受一個(gè)時(shí)長(zhǎng)參數(shù),它會(huì)丟棄原始Observable開始的那段時(shí)間段發(fā)射的數(shù)據(jù)炕檩,時(shí)長(zhǎng)和時(shí)間單位通過參數(shù)指定斗蒋。
實(shí)例代碼:
// 2. skip(long time, TimeUnit unit)
// 跳過開始的time時(shí)間段內(nèi)的數(shù)據(jù),保留后面的數(shù)據(jù)
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skip(2, TimeUnit.SECONDS) // 跳過前2秒的數(shù)據(jù)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept skip(2): " + t);
}
});
輸出:
--> accept skip(2): 4
--> accept skip(2): 5
Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)
5.3 skipLast(count)
使用 SkipLast
操作符修改原始Observable笛质,你可以忽略O(shè)bservable發(fā)射的后 N
項(xiàng)數(shù)據(jù)泉沾,只保留前面的數(shù)據(jù)。
實(shí)例代碼:
// 3. skipLast(int count)
// 跳過數(shù)據(jù)后面的count個(gè)數(shù)據(jù)
Observable.range(1, 10)
.skipLast(5) // 跳過數(shù)據(jù)序列的后5項(xiàng)數(shù)據(jù)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept skipLast(3): " + t);
}
});
輸出:
--> accept skipLast(3): 1
--> accept skipLast(3): 2
--> accept skipLast(3): 3
--> accept skipLast(3): 4
--> accept skipLast(3): 5
Javadoc: skipLast(count)
5.4 skipLast(time, unit)
還有一個(gè) skipLast
變體接受一個(gè)時(shí)間段參數(shù)妇押,它會(huì)丟棄在原始 Observable 的生命周期內(nèi)最后一段時(shí)間內(nèi)發(fā)射的數(shù)據(jù)跷究。時(shí)長(zhǎng)和時(shí)間單位通過參數(shù)指定。
注意: 這個(gè)機(jī)制是這樣實(shí)現(xiàn)的:延遲原始 Observable 發(fā)射的任何數(shù)據(jù)項(xiàng)敲霍,直到自原始數(shù)據(jù)發(fā)射之后過了給定的時(shí)長(zhǎng)之后俊马,才開始發(fā)送數(shù)據(jù)。
實(shí)例代碼:
// 4. skipLast(long time, TimeUnit unit, [boolean delayError])
// 丟棄在原始Observable的生命周 期內(nèi)最后time時(shí)間內(nèi)發(fā)射的數(shù)據(jù)
// 可選參數(shù)delayError:延遲異常通知
Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource: " + t);
}
}).skipLast(2, TimeUnit.SECONDS)
// .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 通過scheduler指定工作線程
// .skipLast(2, TimeUnit.SECONDS, true) // 延遲Error的通知肩杈,多用于組合Observable的場(chǎng)景
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept skipLast(4): " + t);
}
});
輸出:
--> DataSource: 1
--> DataSource: 2
--> DataSource: 3
--> accept skipLast(4): 1
--> DataSource: 4
--> accept skipLast(4): 2
--> DataSource: 5
--> accept skipLast(4): 3
--> DataSource: 6
--> accept skipLast(4): 4
--> DataSource: 7
--> accept skipLast(4): 5
--> DataSource: 8
--> accept skipLast(4): 6
--> DataSource: 9
--> accept skipLast(4): 7
--> DataSource: 10
--> accept skipLast(4): 8
注意: skipLast 的這個(gè)操作默認(rèn)在 computation 調(diào)度器上執(zhí)行柴我,但是你可以使用Scheduler參數(shù)指定其 它的調(diào)度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)
接續(xù):
后續(xù)的Rx相關(guān)數(shù)據(jù)過濾部分請(qǐng)參考: Rxjava2 Observable的數(shù)據(jù)過濾詳解及實(shí)例(二)
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實(shí)例