四、功能操作符
1、delay
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "=======================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "=======================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "=======================onComplete");
? ? }
});
打印:
=======================onSubscribe
=======================onNext 1
=======================onNext 2
=======================onNext 3
=======================onComplete
說明:從打印結(jié)果可以看出 onSubscribe 回調(diào)2秒之后 onNext 才會回調(diào)。
2、doOnEach
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? //? ? ? e.onError(new NumberFormatException());
? ? ? ? e.onComplete();
? ? }
})
.doOnEach(new Consumer < Notification < Integer >> () {
? ? @Override
? ? public void accept(Notification < Integer > integerNotification) throws Exception {
? ? ? ? Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete
說明:
Observable 每發(fā)送一件事件之前都會先回調(diào)這個方法彻消。從結(jié)果就可以看出每發(fā)送一個事件之前都會回調(diào) doOnEach 方法,并且可以取出 onNext() 發(fā)送的值宙拉。
3宾尚、doOnNext
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doOnNext(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "==================doOnNext " + integer);
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete
說明:Observable 每發(fā)送 onNext() 之前都會先回調(diào)這個方法谢澈。
4煌贴、doAfterNext
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doAfterNext(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "==================doAfterNext " + integer);
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete
說明:Observable 每發(fā)送 onNext() 之后都會回調(diào)這個方法锥忿。
5牛郑、doOnComplete
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doOnComplete(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doOnComplete ");
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete
==================onComplete
說明:Observable 每發(fā)送 onComplete() 之前都會回調(diào)這個方法敬鬓。
6淹朋、doOnError()
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new NullPointerException());
? ? }
})
.doOnError(new Consumer < Throwable > () {
? ? @Override
? ? public void accept(Throwable throwable) throws Exception {
? ? ? ? Log.d(TAG, "==================doOnError " + throwable);
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError
說明:Observable 每發(fā)送 onError() 之前都會回調(diào)這個方法钉答。
7础芍、doOnSubscribe
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doOnSubscribe(new Consumer < Disposable > () {
? ? @Override
? ? public void accept(Disposable disposable) throws Exception {
? ? ? ? Log.d(TAG, "==================doOnSubscribe ");
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================doOnSubscribe
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
說明:Observable 每發(fā)送 onSubscribe() 之前都會回調(diào)這個方法数尿。
8仑性、doOnDispose
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doOnDispose(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doOnDispose ");
? ? }
})
.subscribe(new Observer < Integer > () {
? ? private Disposable d;
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? ? ? this.d = d;
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? ? ? d.dispose();
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================doOnDispose
說明:當調(diào)用 Disposable 的 dispose() 之后回調(diào)該方法右蹦。
9诊杆、doOnLifecycle
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doOnLifecycle(new Consumer<Disposable>() {
? ? @Override
? ? public void accept(Disposable disposable) throws Exception {
? ? ? ? Log.d(TAG, "==================doOnLifecycle accept");
? ? }
}, new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doOnLifecycle Action");
? ? }
})
.doOnDispose(
? ? new Action() {
? ? ? ? @Override
? ? ? ? public void run() throws Exception {
? ? ? ? ? ? Log.d(TAG, "==================doOnDispose Action");
? ? ? ? }
})
.subscribe(new Observer<Integer>() {
? ? private Disposable d;
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? ? ? this.d = d;
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? ? ? d.dispose();
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================doOnLifecycle accept
==================onSubscribe
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action
說明:
在回調(diào) onSubscribe 之前回調(diào)該方法的第一個參數(shù)的回調(diào)方法何陆,可以使用該回調(diào)方法決定是否取消訂閱刽辙。
doOnLifecycle() 第二個參數(shù)的回調(diào)方法的作用與 doOnDispose() 是一樣的。
可以看到當在 onNext() 方法進行取消訂閱操作后甲献,doOnDispose() 和 doOnLifecycle() 都會被回調(diào)宰缤。
如果使用 doOnLifecycle 進行取消訂閱
打印:
==================doOnLifecycle accept
==================onSubscrib
說明:
可以發(fā)現(xiàn) doOnDispose Action 和 doOnLifecycle Action 都沒有被回調(diào)晃洒。
10慨灭、doOnTerminate() & doAfterTerminate()
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
//? ? ? e.onError(new NullPointerException());
? ? ? ? e.onComplete();
? ? }
})
.doOnTerminate(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doOnTerminate ");
? ? }
})
.subscribe(new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnTerminate
==================onComplete
說明:doOnTerminate 是在 onError 或者 onComplete 發(fā)送之前回調(diào)球及,而 doAfterTerminate 則是 onError 或者 onComplete 發(fā)送之后回調(diào)氧骤。doAfterTerminate 也是差不多,這里就不再贅述吃引。
11筹陵、doFinally
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.doFinally(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doFinally ");
? ? }
})
.doOnDispose(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doOnDispose ");
? ? }
})
.doAfterTerminate(new Action() {
? ? @Override
? ? public void run() throws Exception {
? ? ? ? Log.d(TAG, "==================doAfterTerminate ");
? ? }
})
.subscribe(new Observer<Integer>() {
? ? private Disposable d;
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? ? ? this.d = d;
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? ? ? d.dispose();
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打庸舸浮:
==================onSubscribe
==================onNext 1
==================doOnDispose
==================doFinally
說明:在所有事件發(fā)送完畢之后回調(diào)該方法。doFinally() 和 doAfterTerminate() 到底有什么區(qū)別朦佩?區(qū)別就是在于取消訂閱并思,如果取消訂閱之后 doAfterTerminate() 就不會被回調(diào),而 doFinally() 無論怎么樣都會被回調(diào)语稠,且都會在事件序列的最后宋彼。可以看到如果調(diào)用了 dispose() 方法仙畦,doAfterTerminate() 不會被回調(diào)输涕。
現(xiàn)在試試把 dispose() 注釋掉看看
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
==================doAfterTerminate
==================doFinally
說明:doAfterTerminate() 已經(jīng)成功回調(diào)慨畸,doFinally() 還是會在事件序列的最后莱坎。
12、onErrorReturn
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new NullPointerException());
? ? }
})
.onErrorReturn(new Function<Throwable, Integer>() {
? ? @Override
? ? public Integer apply(Throwable throwable) throws Exception {
? ? ? ? Log.d(TAG, "==================onErrorReturn " + throwable);
? ? ? ? return 404;
? ? }
})
.subscribe(new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打哟缡俊:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete
說明:當接收到一個 onError() 事件之后回調(diào)檐什,返回的值會回調(diào) onNext() 方法,并正常結(jié)束該事件序列碉京。
13厢汹、onErrorResumeNext
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new NullPointerException());
? ? }
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
? ? @Override
? ? public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
? ? ? ? Log.d(TAG, "==================onErrorResumeNext " + throwable);
? ? ? ? return Observable.just(4, 5, 6);
? ? }
})
.subscribe(new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打用睢:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete
說明:當接收到 onError() 事件時谐宙,返回一個新的 Observable,并正常結(jié)束事件序列界弧。
14凡蜻、onExceptionResumeNext
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new Exception("404"));
? ? }
})
.onExceptionResumeNext(new Observable<Integer>() {
? ? @Override
? ? protected void subscribeActual(Observer<? super Integer> observer) {
? ? ? ? observer.onNext(333);
? ? ? ? observer.onComplete();
? ? }
})
.subscribe(new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 333
==================onComplete
說明:與 onErrorResumeNext() 作用基本一致垢箕,但是這個方法只能捕捉 Exception划栓。
15、retry
Observable.create(new ObservableOnSubscribe<Integer>() {
? ? @Override
? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new Exception("404"));
? ? }
})
.retry(2)
.subscribe(new Observer<Integer>() {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打犹趸瘛:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
說明:如果出現(xiàn)錯誤事件忠荞,則會重新發(fā)送所有事件序列。times 是代表重新發(fā)的次數(shù)帅掘。
16委煤、retryUntil
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onError(new Exception("404"));
? ? }
})
.retryUntil(new BooleanSupplier() {
? ? @Override
? ? public boolean getAsBoolean() throws Exception {
? ? ? ? if (i == 6) {
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? return false;
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onError
說明:出現(xiàn)錯誤事件之后修档,可以通過此方法判斷是否繼續(xù)發(fā)送事件碧绞。
17、retryWhen
Observable.create(new ObservableOnSubscribe < String > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < String > e) throws Exception {
? ? ? ? e.onNext("chan");
? ? ? ? e.onNext("ze");
? ? ? ? e.onNext("de");
? ? ? ? e.onError(new Exception("404"));
? ? ? ? e.onNext("haha");
? ? }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
? ? @Override
? ? public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
? ? ? ? return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
? ? ? ? ? ? @Override
? ? ? ? ? ? public ObservableSource <? > apply(Throwable throwable) throws Exception {
? ? ? ? ? ? ? ? if(!throwable.toString().equals("java.lang.Exception: 404")) {
? ? ? ? ? ? ? ? ? ? return Observable.just("可以忽略的異常");
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? return Observable.error(new Throwable("終止啦"));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? });
? ? }
})
.subscribe(new Observer < String > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.d(TAG, "==================onNext " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError " + e.toString());
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打又ㄎ选:
==================onSubscribe
==================onNext chan
==================onNext ze
==================onNext de
==================onError java.lang.Throwable: 終止啦
說明:當被觀察者接收到異臣チ冢或者錯誤事件時會回調(diào)該方法迫靖,這個方法會返回一個新的被觀察者。如果返回的被觀察者發(fā)送 Error 事件則之前的被觀察者不會繼續(xù)發(fā)送事件兴使,如果發(fā)送正常事件則之前的被觀察者會繼續(xù)不斷重試發(fā)送事件系宜。
將 onError(new Exception("404")) 改為 onError(new Exception("303"))
打印:
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
說明:不斷的重復發(fā)送消息
18鲫惶、repeat
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.repeat(2)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "===================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "===================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "===================onComplete ");
? ? }
});
打域谑住:
===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete
說明:重復發(fā)送被觀察者的事件,times 為發(fā)送次數(shù)欠母。該事件發(fā)送了兩次欢策。
19、repeatWhen
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {
? ? @Override
? ? public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {
? ? ? ? return Observable.empty();
? ? //? return Observable.error(new Exception("404"));
? ? //? return Observable.just(4); null;
? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "===================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "===================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "===================onComplete ");
? ? }
});
打由吞省:
===================onSubscribe
===================onComplete
說明:這個方法可以會返回一個新的被觀察者設定一定邏輯來決定是否重復發(fā)送事件踩寇。這里分三種情況,如果新的被觀察者返回 onComplete 或者 onError 事件六水,則舊的被觀察者不會繼續(xù)發(fā)送事件俺孙。如果被觀察者返回其他事件,則會重復發(fā)送事件掷贾。
下面直接看看發(fā)送 onError 事件和其他事件的打印結(jié)果睛榄。
===================onSubscribe
===================onError
發(fā)送其他事件的打印結(jié)果:
===================onSubscribe
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete
20、subscribeOn
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
//.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "======================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "======================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "======================onError");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "======================onComplete");
? ? }
});
打酉胨А:
======================onSubscribe
=========================currentThread name: main
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
說明:上面的打印是不調(diào)用subscribeOn
打映⊙ァ:
======================onSubscribe
=========================currentThread name: RxNewThreadScheduler-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
說明:調(diào)動了subscribeOn(Schedulers.newThread())
現(xiàn)在看看多次調(diào)用代碼如下:
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
? ? ? ? e.onNext(1);
? ? ? ? e.onNext(2);
? ? ? ? e.onNext(3);
? ? ? ? e.onComplete();
? ? }
})
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {@Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "======================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "======================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "======================onError");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "======================onComplete");
? ? }
});
打印:
======================onSubscribe
=========================currentThread name: RxComputationThreadPool-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
說明:可以看到第二次調(diào)動的 subscribeOn(Schedulers.newThread()) 并沒有效果港准。
21旨剥、observeOn
Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
? ? @Override
? ? public ObservableSource < String > apply(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
? ? ? ? return Observable.just("chan" + integer);
? ? }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "======================onSubscribe");
? ? }
? ? @Override
? ? public void onNext(String s) {
? ? ? ? Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
? ? ? ? Log.d(TAG, "======================onNext " + s);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "======================onError");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "======================onComplete");
? ? }
});
打印:
======================onSubscribe
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
======================onComplete
說明:指定觀察者的線程浅缸,每指定一次就會生效一次轨帜。從打印結(jié)果可以知道,observeOn 成功切換了線程衩椒。
總結(jié)RxJava中的調(diào)度器:
Schedulers.computation(?)
用于使用計算任務蚌父,如事件循環(huán)和回調(diào)處理
Schedulers.immediate(?)
當前線程
Schedulers.io(?)
用于 IO 密集型任務,如果異步阻塞 IO 操作毛萌。
Schedulers.newThread(?)
創(chuàng)建一個新的線程
AndroidSchedulers.mainThread()
Android 的 UI 線程苟弛,用于操作 UI。
五朝聋、過濾操作符
1嗡午、filter
Observable.just(1, 2, 3)
? ? .filter(new Predicate < Integer > () {
? ? ? ? @Override
? ? ? ? public boolean test(Integer integer) throws Exception {
? ? ? ? ? ? return integer < 2;
? ? ? ? }
})
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onComplete
說明:通過一定邏輯來過濾被觀察者發(fā)送的事件冀痕,如果返回 true 則會發(fā)送事件荔睹,否則不會發(fā)送亿柑。以上代碼只有小于2的事件才會發(fā)送责语,
2孵户、ofType
Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打踊防俊:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
說明:可以過濾不符合該類型事件
3、skip
Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打佣洲帧:
==================onSubscribe
==================onNext 3
==================onComplete
說明:跳過正序某些事件满哪,count 代表跳過事件的數(shù)量(skipLast() 作用也是跳過某些事件,不過它是用來跳過正序的后面的事件劝篷,這里就不再講解了哨鸭。)
4、distinct
Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打咏考恕:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
說明:過濾事件序列中的重復事件像鸡。
5、distinctUntilChanged
Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打印:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 2
==================onNext 1
==================onComplete
說明:過濾掉連續(xù)重復的事件哈恰,因為事件序列中連續(xù)出現(xiàn)兩次3只估,所以第二次3并不會發(fā)出。
6着绷、take
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "==================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? i += integer;
? ? ? ? Log.d(TAG, "==================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "==================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "==================onComplete ");
? ? }
});
打踊赘啤:
==================onSubscribe
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete
說明:控制觀察者接收的事件的數(shù)量,takeLast() 的作用就是控制觀察者只能接受事件序列的后面幾件事情荠医,這里就不再講解了吁脱,大家可以自己試試。
7子漩、debounce
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onNext(1);
? ? ? ? Thread.sleep(900);
? ? ? ? e.onNext(2);
? ? }
})
.debounce(1, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "===================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Integer integer) {
? ? ? ? Log.d(TAG, "===================onNext " + integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "===================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "===================onComplete ");
? ? }
});
打釉バ:
===================onSubscribe
===================onNext 2
說明:如果兩件事件發(fā)送的時間間隔小于設定的時間間隔則前一件事件就不會發(fā)送給觀察者石洗。
可以看到事件1并沒有發(fā)送出去幢泼,現(xiàn)在將間隔時間改為1000,
打咏采馈:
===================onSubscribe
===================onNext 1
===================onNext 2
throttleWithTimeout() 與此方法的作用一樣缕棵,這里就不再贅述了。
8涉兽、firstElement() && lastElement()
Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "====================firstElement " + integer);
? ? }
});
Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "====================lastElement " + integer);
? ? }
});
打诱新俊:
====================firstElement 1
====================lastElement 4
說明:firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最后一個元素枷畏。
9别厘、elementAt() & elementAtOrError()
Observable.just(1, 2, 3, 4)
.elementAt(0)
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "====================accept " + integer);
? ? }
});
打印:
====================accept 1
說明:elementAt() 可以指定取出事件序列中事件拥诡,但是輸入的 index 超出事件序列的總數(shù)的話就不會出現(xiàn)任何結(jié)果触趴。這種情況下氮发,你想發(fā)出異常信息的話就用 elementAtOrError() 。
將 elementAt() 的值改為5冗懦,這時是沒有打印結(jié)果的爽冕,因為沒有滿足條件的元素。
替換 elementAt() 為 elementAtOrError()披蕉,代碼如下:
Observable.just(1, 2, 3, 4)
.elementAtOrError(5)
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "====================accept " + integer);
? ? }
});
打泳被:
io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread. - wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
Caused by: java.util.NoSuchElementException
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread. - wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
說明:這時候會拋出 NoSuchElementException 異常。
六没讲、條件操作符
1眯娱、all
Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
? ? @Override
? ? public boolean test(Integer integer) throws Exception {
? ? ? ? return integer < 5;
? ? }
})
.subscribe(new Consumer < Boolean > () {
? ? @Override
? ? public void accept(Boolean aBoolean) throws Exception {
? ? ? ? Log.d(TAG, "==================aBoolean " + aBoolean);
? ? }
});
打印:
==================aBoolean true
說明:判斷事件序列是否全部滿足某個事件爬凑,如果都滿足則返回 true困乒,反之則返回 false。
2贰谣、takeWhile
Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
? ? @Override
? ? public boolean test(Integer integer) throws Exception {
? ? ? ? return integer < 3;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "========================integer " + integer);
? ? }
});
打幽嚷А:
========================integer 1
========================integer 2
說明:可以設置條件,當某個數(shù)據(jù)滿足條件時就會發(fā)送該數(shù)據(jù)吱抚,反之則不發(fā)送百宇。
3、skipWhile
Observable.just(1, 2, 3, 4)
.skipWhile(new Predicate < Integer > () {
? ? @Override
? ? public boolean test(Integer integer) throws Exception {
? ? ? ? return integer < 3;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "========================integer " + integer);
? ? }
});
打用乇:
========================integer 3
========================integer 4
說明:可以設置條件携御,當某個數(shù)據(jù)滿足條件時不發(fā)送該數(shù)據(jù),反之則發(fā)送既绕。
4啄刹、takeUntil
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
? ? @Override
? ? public boolean test(Integer integer) throws Exception {
? ? ? ? return integer > 3;
? ? }
})
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "========================integer " + integer);
? ? }
});
打印:
========================integer 1
========================integer 2
========================integer 3
========================integer 4
說明:可以設置條件凄贩,當事件滿足此條件時誓军,下一次的事件就不會被發(fā)送了。
5疲扎、skipUntil
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
? ? @Override
? ? public void onSubscribe(Disposable d) {
? ? ? ? Log.d(TAG, "========================onSubscribe ");
? ? }
? ? @Override
? ? public void onNext(Long along) {
? ? ? ? Log.d(TAG, "========================onNext " + along);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
? ? ? ? Log.d(TAG, "========================onError ");
? ? }
? ? @Override
? ? public void onComplete() {
? ? ? ? Log.d(TAG, "========================onComplete ");
? ? }
});
打雨鞘薄:
========================onSubscribe
========================onNext 4
========================onNext 5
========================onComplete
說明:當 skipUntil() 中的 Observable 發(fā)送事件了,原來的 Observable 才會發(fā)送事件給觀察者椒丧。
6壹甥、sequenceEqual
Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {
? ? @Override
? ? public void accept(Boolean aBoolean) throws Exception {
? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);
? ? }
});
打印:
========================onNext true
說明:判斷兩個 Observable 發(fā)送的事件是否相同壶熏。
7句柠、contains
Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {
? ? @Override
? ? public void accept(Boolean aBoolean) throws Exception {
? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);
? ? }
});
打印:
========================onNext true
說明:判斷事件序列中是否含有某個元素,如果有則返回 true溯职,如果沒有則返回 false管怠。
8、isEmpty
Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onComplete();
? ? }
})
.isEmpty()
.subscribe(new Consumer < Boolean > () {
? ? @Override
? ? public void accept(Boolean aBoolean) throws Exception {
? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);
? ? }
});
打痈组:
========================onNext true
說明:判斷事件序列是否為空渤弛。
9、amb
ArrayList < Observable < Long >> list = new ArrayList < > ();
list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Consumer < Long > () {
? ? @Override
? ? public void accept(Long aLong) throws Exception {
? ? ? ? Log.d(TAG, "========================aLong " + aLong);
? ? }
});
打由醮:
========================aLong 6
========================aLong 7
========================aLong 8
========================aLong 9
========================aLong 10
說明:amb() 要傳入一個 Observable 集合她肯,但是只會發(fā)送最先發(fā)送事件的 Observable 中的事件,其余 Observable 將會被丟棄鹰贵。
10晴氨、Observable.create(new ObservableOnSubscribe < Integer > () {
? ? @Override
? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {
? ? ? ? e.onComplete();
? ? }
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
? ? @Override
? ? public void accept(Integer integer) throws Exception {
? ? ? ? Log.d(TAG, "========================onNext " + integer);
? ? }
});
打印:
========================onNext 666
說明:如果被觀察者只發(fā)送一個 onComplete() 事件碉输,則可以利用這個方法發(fā)送一個值籽前。