六、過(guò)濾操作符
6.1、filter()
通過(guò)一定邏輯來(lái)過(guò)濾被觀(guān)察者發(fā)送的事件,如果返回 true
則會(huì)發(fā)送事件场晶,否則不會(huì)發(fā)送
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 2
10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 4
6.2、ofType()
過(guò)濾不符合該類(lèi)型的事件
Observable.just(1, 2, "Hi", 3, 4, "Hello").ofType(Integer.class).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.3怠缸、skip()
以正序跳過(guò)指定數(shù)量的事件
Observable.just(1, 2, 3, 4).skip(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.4诗轻、skipLast()
以反序跳過(guò)指定數(shù)量的事件
Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 2
6.5、distinct()
過(guò)濾事件序列中的重復(fù)事件
Observable.just(1, 2, 1, 2, 3, 4, 3).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 4
6.6凯旭、distinctUntilChanged()
過(guò)濾掉連續(xù)重復(fù)的事件
Observable.just(1, 2, 2, 1, 3, 4, 3, 3).distinctUntilChanged().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:04:44.531 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 4
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
6.7概耻、take()
控制觀(guān)察者接收事件的數(shù)量
Observable.just(1, 2, 2, 1, 3, 4, 3, 3).take(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 1
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
6.8、debounce()
如果兩個(gè)事件發(fā)送的時(shí)間間隔小于設(shè)定的時(shí)間間隔罐呼,則前一件事件不會(huì)發(fā)送給觀(guān)察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Thread.sleep(900);
emitter.onNext(2);
}
}).debounce(1, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
10-06 08:08:59.337 13509-13523/leavesc.hello.rxjavademo E/MainActivity: accept : 2
6.9鞠柄、firstElement() && lastElement()
firstElement()
取事件序列的第一個(gè)元素,lastElement()
取事件序列的最后一個(gè)元素
Observable.just(1, 2, 3, 4, 5).firstElement().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
6.10嫉柴、elementAt() & elementAtOrError()
elementAt()
可以指定取出事件序列中事件厌杜,但是輸入的 index
超出事件序列的總數(shù)的話(huà)就不會(huì)觸發(fā)任何調(diào)用,想觸發(fā)異常信息的話(huà)就用 elementAtOrError()
Observable.just(1, 2, 3, 4, 5).elementAt(5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
以上代碼不會(huì)觸發(fā)任何
改用為 elementAtOrError()
计螺,則會(huì)拋出異常
Observable.just(1, 2, 3, 4, 5).elementAtOrError(5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept : " + integer);
}
});
Process: leavesc.hello.rxjavademo, PID: 13948
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | null
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:111)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
at io.reactivex.Observable.subscribe(Observable.java:12090)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
at io.reactivex.Single.subscribe(Single.java:3438)
at io.reactivex.Single.subscribe(Single.java:3424)
七夯尽、條件操作符
7.1、all()
判斷事件序列是否全部滿(mǎn)足某個(gè)事件登馒,如果都滿(mǎn)足則返回 true
匙握,反之則返回 false
Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.e(TAG, "accept: " + aBoolean);
}
});
10-06 08:16:10.212 14043-14043/leavesc.hello.rxjavademo E/MainActivity: accept: false
7.2、takeWhile()
發(fā)射原始 Observable
陈轿,直到指定的某個(gè)條件不成立的那一刻圈纺,它停止發(fā)射原始 Observable
,并終止自己的 Observable
Observable.just(1, 2, 3, 4, 5, 1, 2).takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer);
}
});
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 1
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 2
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 3
7.3麦射、skipWhile()
訂閱原始的 Observable
蛾娶,但是忽略它的發(fā)射物,直到指定的某個(gè)條件變?yōu)?false 時(shí)才開(kāi)始發(fā)射原始 Observable
Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "integer " + integer);
}
});
10-06 13:59:40.583 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 3
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
7.4潜秋、takeUntil()
用于設(shè)置一個(gè)條件蛔琅,當(dāng)事件滿(mǎn)足此條件時(shí),此事件會(huì)被發(fā)送峻呛,但之后的事件就不會(huì)被發(fā)送了
Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "integer " + integer);
}
});
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 1
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 2
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 4
7.5罗售、skipUntil()
當(dāng) skipUntil()
中的 Observable
發(fā)送事件了,原始的 Observable
才會(huì)發(fā)送事件給觀(guān)察者
Observable.intervalRange(1, 6, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(10, 3, 1, 1, TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Long along) {
Log.e(TAG, "onNext : " + along);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
10-06 08:51:16.926 16877-16877/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 08:51:17.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 08:51:18.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 08:51:19.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
10-06 08:51:20.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onComplete
7.6杀饵、sequenceEqual()
判斷兩個(gè) Observable
發(fā)送的事件是否相同莽囤,如果兩個(gè)序列是相同的(相同的數(shù)據(jù),相同的順序切距,相同的終止?fàn)顟B(tài))朽缎,它就發(fā)射 true,否則發(fā)射 false
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.e(TAG, "accept aBoolean : " + aBoolean);
}
});
10-06 08:46:59.369 16492-16492/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
7.7、contains()
判斷事件序列中是否含有某個(gè)元素话肖,如果有則返回 true北秽,如果沒(méi)有則返回 false
Observable.just(1, 2, 3, 4).contains(2).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.e(TAG, "accept aBoolean : " + aBoolean);
}
});
10-06 08:45:58.100 16386-16386/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true
7.8、isEmpty()
判斷事件序列是否為空
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).isEmpty().subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.e(TAG, "accept aBoolean: " + aBoolean);
}
});
10-06 08:43:43.201 16278-16278/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean: true
7.9最筒、amb()
amb()
接收一個(gè) Observable
集合贺氓,但是只會(huì)發(fā)送最先發(fā)送事件的 Observable
中的事件,不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè) onError
或 onCompleted
通知床蜘,其余 Observable
將會(huì)被丟棄
List<Observable<Long>> list = new ArrayList<>();
list.add(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(10, 3, 0, 1, TimeUnit.SECONDS));
Observable.amb(list).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: " + aLong);
}
});
10-06 08:41:45.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 10
10-06 08:41:46.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 11
10-06 08:41:47.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 12
7.10辙培、defaultIfEmpty()
如果 Observable 沒(méi)有發(fā)射任何值,則可以利用這個(gè)方法發(fā)送一個(gè)默認(rèn)值
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onComplete();
}
}).defaultIfEmpty(100).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer);
}
});
10-06 08:40:04.754 15945-15945/leavesc.hello.rxjavademo E/MainActivity: accept: 100