RxJava2.x常用操作符總結(jié)(二)

四、功能操作符

1、delay

Observable.just(1, 2, 3)

.delay(2, TimeUnit.SECONDS)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "=======================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "=======================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "=======================onComplete");

? ? }

});

打印:

=======================onSubscribe

=======================onNext 1

=======================onNext 2

=======================onNext 3

=======================onComplete

說明:從打印結(jié)果可以看出 onSubscribe 回調(diào)2秒之后 onNext 才會回調(diào)。

2、doOnEach

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? //? ? ? e.onError(new NumberFormatException());

? ? ? ? e.onComplete();

? ? }

})

.doOnEach(new Consumer < Notification < Integer >> () {

? ? @Override

? ? public void accept(Notification < Integer > integerNotification) throws Exception {

? ? ? ? Log.d(TAG, "==================doOnEach " + integerNotification.getValue());

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================doOnEach 1

==================onNext 1

==================doOnEach 2

==================onNext 2

==================doOnEach 3

==================onNext 3

==================doOnEach null

==================onComplete

說明:

Observable 每發(fā)送一件事件之前都會先回調(diào)這個方法彻消。從結(jié)果就可以看出每發(fā)送一個事件之前都會回調(diào) doOnEach 方法,并且可以取出 onNext() 發(fā)送的值宙拉。

3宾尚、doOnNext

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doOnNext(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "==================doOnNext " + integer);

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================doOnNext 1

==================onNext 1

==================doOnNext 2

==================onNext 2

==================doOnNext 3

==================onNext 3

==================onComplete

說明:Observable 每發(fā)送 onNext() 之前都會先回調(diào)這個方法谢澈。

4煌贴、doAfterNext

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doAfterNext(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "==================doAfterNext " + integer);

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================doAfterNext 1

==================onNext 2

==================doAfterNext 2

==================onNext 3

==================doAfterNext 3

==================onComplete

說明:Observable 每發(fā)送 onNext() 之后都會回調(diào)這個方法锥忿。

5牛郑、doOnComplete

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doOnComplete(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doOnComplete ");

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnComplete

==================onComplete

說明:Observable 每發(fā)送 onComplete() 之前都會回調(diào)這個方法敬鬓。

6淹朋、doOnError()

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new NullPointerException());

? ? }

})

.doOnError(new Consumer < Throwable > () {

? ? @Override

? ? public void accept(Throwable throwable) throws Exception {

? ? ? ? Log.d(TAG, "==================doOnError " + throwable);

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnError java.lang.NullPointerException

==================onError

說明:Observable 每發(fā)送 onError() 之前都會回調(diào)這個方法钉答。

7础芍、doOnSubscribe

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doOnSubscribe(new Consumer < Disposable > () {

? ? @Override

? ? public void accept(Disposable disposable) throws Exception {

? ? ? ? Log.d(TAG, "==================doOnSubscribe ");

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================doOnSubscribe

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

說明:Observable 每發(fā)送 onSubscribe() 之前都會回調(diào)這個方法数尿。

8仑性、doOnDispose

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doOnDispose(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doOnDispose ");

? ? }

})

.subscribe(new Observer < Integer > () {

? ? private Disposable d;


? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? ? ? this.d = d;

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? ? ? d.dispose();

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================doOnDispose

說明:當調(diào)用 Disposable 的 dispose() 之后回調(diào)該方法右蹦。

9诊杆、doOnLifecycle

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doOnLifecycle(new Consumer<Disposable>() {

? ? @Override

? ? public void accept(Disposable disposable) throws Exception {

? ? ? ? Log.d(TAG, "==================doOnLifecycle accept");

? ? }

}, new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doOnLifecycle Action");

? ? }

})

.doOnDispose(

? ? new Action() {

? ? ? ? @Override

? ? ? ? public void run() throws Exception {

? ? ? ? ? ? Log.d(TAG, "==================doOnDispose Action");

? ? ? ? }

})

.subscribe(new Observer<Integer>() {

? ? private Disposable d;

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? ? ? this.d = d;

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? ? ? d.dispose();

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }


});

打印:

==================doOnLifecycle accept

==================onSubscribe

==================onNext 1

==================doOnDispose Action

==================doOnLifecycle Action

說明:

在回調(diào) onSubscribe 之前回調(diào)該方法的第一個參數(shù)的回調(diào)方法何陆,可以使用該回調(diào)方法決定是否取消訂閱刽辙。

doOnLifecycle() 第二個參數(shù)的回調(diào)方法的作用與 doOnDispose() 是一樣的。

可以看到當在 onNext() 方法進行取消訂閱操作后甲献,doOnDispose() 和 doOnLifecycle() 都會被回調(diào)宰缤。

如果使用 doOnLifecycle 進行取消訂閱

打印:

==================doOnLifecycle accept

==================onSubscrib

說明:

可以發(fā)現(xiàn) doOnDispose Action 和 doOnLifecycle Action 都沒有被回調(diào)晃洒。

10慨灭、doOnTerminate() & doAfterTerminate()

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

//? ? ? e.onError(new NullPointerException());

? ? ? ? e.onComplete();

? ? }

})

.doOnTerminate(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doOnTerminate ");

? ? }

})

.subscribe(new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }


});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================doOnTerminate

==================onComplete

說明:doOnTerminate 是在 onError 或者 onComplete 發(fā)送之前回調(diào)球及,而 doAfterTerminate 則是 onError 或者 onComplete 發(fā)送之后回調(diào)氧骤。doAfterTerminate 也是差不多,這里就不再贅述吃引。

11筹陵、doFinally

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.doFinally(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doFinally ");

? ? }

})

.doOnDispose(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doOnDispose ");

? ? }

})

.doAfterTerminate(new Action() {

? ? @Override

? ? public void run() throws Exception {

? ? ? ? Log.d(TAG, "==================doAfterTerminate ");

? ? }

})

.subscribe(new Observer<Integer>() {

? ? private Disposable d;

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? ? ? this.d = d;

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? ? ? d.dispose();

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打庸舸浮:

==================onSubscribe

==================onNext 1

==================doOnDispose

==================doFinally

說明:在所有事件發(fā)送完畢之后回調(diào)該方法。doFinally() 和 doAfterTerminate() 到底有什么區(qū)別朦佩?區(qū)別就是在于取消訂閱并思,如果取消訂閱之后 doAfterTerminate() 就不會被回調(diào),而 doFinally() 無論怎么樣都會被回調(diào)语稠,且都會在事件序列的最后宋彼。可以看到如果調(diào)用了 dispose() 方法仙畦,doAfterTerminate() 不會被回調(diào)输涕。

現(xiàn)在試試把 dispose() 注釋掉看看

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

==================doAfterTerminate

==================doFinally

說明:doAfterTerminate() 已經(jīng)成功回調(diào)慨畸,doFinally() 還是會在事件序列的最后莱坎。

12、onErrorReturn

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new NullPointerException());

? ? }

})

.onErrorReturn(new Function<Throwable, Integer>() {

? ? @Override

? ? public Integer apply(Throwable throwable) throws Exception {

? ? ? ? Log.d(TAG, "==================onErrorReturn " + throwable);

? ? ? ? return 404;

? ? }

})

.subscribe(new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打哟缡俊:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onErrorReturn java.lang.NullPointerException

==================onNext 404

==================onComplete

說明:當接收到一個 onError() 事件之后回調(diào)檐什,返回的值會回調(diào) onNext() 方法,并正常結(jié)束該事件序列碉京。

13厢汹、onErrorResumeNext

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new NullPointerException());

? ? }

})

.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {

? ? @Override

? ? public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {

? ? ? ? Log.d(TAG, "==================onErrorResumeNext " + throwable);

? ? ? ? return Observable.just(4, 5, 6);

? ? }

})

.subscribe(new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打用睢:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onErrorResumeNext java.lang.NullPointerException

==================onNext 4

==================onNext 5

==================onNext 6

==================onComplete

說明:當接收到 onError() 事件時谐宙,返回一個新的 Observable,并正常結(jié)束事件序列界弧。

14凡蜻、onExceptionResumeNext

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new Exception("404"));

? ? }

})

.onExceptionResumeNext(new Observable<Integer>() {

? ? @Override

? ? protected void subscribeActual(Observer<? super Integer> observer) {

? ? ? ? observer.onNext(333);

? ? ? ? observer.onComplete();

? ? }

})

.subscribe(new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 333

==================onComplete

說明:與 onErrorResumeNext() 作用基本一致垢箕,但是這個方法只能捕捉 Exception划栓。

15、retry

Observable.create(new ObservableOnSubscribe<Integer>() {

? ? @Override

? ? public void subscribe(ObservableEmitter<Integer> e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new Exception("404"));

? ? }

})

.retry(2)

.subscribe(new Observer<Integer>() {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打犹趸瘛:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 1

==================onNext 2

==================onNext 3

==================onError

說明:如果出現(xiàn)錯誤事件忠荞,則會重新發(fā)送所有事件序列。times 是代表重新發(fā)的次數(shù)帅掘。

16委煤、retryUntil

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onError(new Exception("404"));

? ? }

})

.retryUntil(new BooleanSupplier() {

? ? @Override

? ? public boolean getAsBoolean() throws Exception {

? ? ? ? if (i == 6) {

? ? ? ? ? ? return true;

? ? ? ? }

? ? ? ? return false;

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onError

說明:出現(xiàn)錯誤事件之后修档,可以通過此方法判斷是否繼續(xù)發(fā)送事件碧绞。

17、retryWhen

Observable.create(new ObservableOnSubscribe < String > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < String > e) throws Exception {

? ? ? ? e.onNext("chan");

? ? ? ? e.onNext("ze");

? ? ? ? e.onNext("de");

? ? ? ? e.onError(new Exception("404"));

? ? ? ? e.onNext("haha");

? ? }

})

.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {

? ? @Override

? ? public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {

? ? ? ? return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {

? ? ? ? ? ? @Override

? ? ? ? ? ? public ObservableSource <? > apply(Throwable throwable) throws Exception {

? ? ? ? ? ? ? ? if(!throwable.toString().equals("java.lang.Exception: 404")) {

? ? ? ? ? ? ? ? ? ? return Observable.just("可以忽略的異常");

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? return Observable.error(new Throwable("終止啦"));

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? });

? ? }

})

.subscribe(new Observer < String > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.d(TAG, "==================onNext " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError " + e.toString());

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打又ㄎ选:

==================onSubscribe

==================onNext chan

==================onNext ze

==================onNext de

==================onError java.lang.Throwable: 終止啦

說明:當被觀察者接收到異臣チ冢或者錯誤事件時會回調(diào)該方法迫靖,這個方法會返回一個新的被觀察者。如果返回的被觀察者發(fā)送 Error 事件則之前的被觀察者不會繼續(xù)發(fā)送事件兴使,如果發(fā)送正常事件則之前的被觀察者會繼續(xù)不斷重試發(fā)送事件系宜。

將 onError(new Exception("404")) 改為 onError(new Exception("303"))

打印:

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

==================onNext ze

==================onNext de

==================onNext chan

......

說明:不斷的重復發(fā)送消息

18鲫惶、repeat

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.repeat(2)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "===================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "===================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "===================onComplete ");

? ? }

});

打域谑住:

===================onSubscribe

===================onNext 1

===================onNext 2

===================onNext 3

===================onNext 1

===================onNext 2

===================onNext 3

===================onComplete

說明:重復發(fā)送被觀察者的事件,times 為發(fā)送次數(shù)欠母。該事件發(fā)送了兩次欢策。

19、repeatWhen

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {

? ? @Override

? ? public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {

? ? ? ? return Observable.empty();

? ? //? return Observable.error(new Exception("404"));

? ? //? return Observable.just(4); null;

? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "===================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "===================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "===================onComplete ");

? ? }

});

打由吞省:

===================onSubscribe

===================onComplete

說明:這個方法可以會返回一個新的被觀察者設定一定邏輯來決定是否重復發(fā)送事件踩寇。這里分三種情況,如果新的被觀察者返回 onComplete 或者 onError 事件六水,則舊的被觀察者不會繼續(xù)發(fā)送事件俺孙。如果被觀察者返回其他事件,則會重復發(fā)送事件掷贾。

下面直接看看發(fā)送 onError 事件和其他事件的打印結(jié)果睛榄。

===================onSubscribe

===================onError

發(fā)送其他事件的打印結(jié)果:

===================onSubscribe

===================onNext 1

===================onNext 2

===================onNext 3

===================onComplete

20、subscribeOn

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

//.subscribeOn(Schedulers.newThread())

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "======================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "======================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "======================onError");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "======================onComplete");

? ? }

});

打酉胨А:

======================onSubscribe

=========================currentThread name: main

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

說明:上面的打印是不調(diào)用subscribeOn

打映⊙ァ:

======================onSubscribe

=========================currentThread name: RxNewThreadScheduler-1

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

說明:調(diào)動了subscribeOn(Schedulers.newThread())

現(xiàn)在看看多次調(diào)用代碼如下:

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());

? ? ? ? e.onNext(1);

? ? ? ? e.onNext(2);

? ? ? ? e.onNext(3);

? ? ? ? e.onComplete();

? ? }

})

.subscribeOn(Schedulers.computation())

.subscribeOn(Schedulers.newThread())

.subscribe(new Observer < Integer > () {@Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "======================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "======================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "======================onError");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "======================onComplete");

? ? }

});

打印:

======================onSubscribe

=========================currentThread name: RxComputationThreadPool-1

======================onNext 1

======================onNext 2

======================onNext 3

======================onComplete

說明:可以看到第二次調(diào)動的 subscribeOn(Schedulers.newThread()) 并沒有效果港准。

21旨剥、observeOn

Observable.just(1, 2, 3)

.observeOn(Schedulers.newThread())

.flatMap(new Function < Integer, ObservableSource < String >> () {

? ? @Override

? ? public ObservableSource < String > apply(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());

? ? ? ? return Observable.just("chan" + integer);

? ? }

})

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer < String > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "======================onSubscribe");

? ? }

? ? @Override

? ? public void onNext(String s) {

? ? ? ? Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());

? ? ? ? Log.d(TAG, "======================onNext " + s);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "======================onError");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "======================onComplete");

? ? }

});

打印:

======================onSubscribe

======================flatMap Thread name RxNewThreadScheduler-1

======================flatMap Thread name RxNewThreadScheduler-1

======================flatMap Thread name RxNewThreadScheduler-1

======================onNext Thread name main

======================onNext chan1

======================onNext Thread name main

======================onNext chan2

======================onNext Thread name main

======================onNext chan3

======================onComplete

說明:指定觀察者的線程浅缸,每指定一次就會生效一次轨帜。從打印結(jié)果可以知道,observeOn 成功切換了線程衩椒。

總結(jié)RxJava中的調(diào)度器:

Schedulers.computation(?)

用于使用計算任務蚌父,如事件循環(huán)和回調(diào)處理

Schedulers.immediate(?)

當前線程

Schedulers.io(?)

用于 IO 密集型任務,如果異步阻塞 IO 操作毛萌。

Schedulers.newThread(?)

創(chuàng)建一個新的線程

AndroidSchedulers.mainThread()

Android 的 UI 線程苟弛,用于操作 UI。

五朝聋、過濾操作符

1嗡午、filter

Observable.just(1, 2, 3)

? ? .filter(new Predicate < Integer > () {

? ? ? ? @Override

? ? ? ? public boolean test(Integer integer) throws Exception {

? ? ? ? ? ? return integer < 2;

? ? ? ? }

})

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onComplete

說明:通過一定邏輯來過濾被觀察者發(fā)送的事件冀痕,如果返回 true 則會發(fā)送事件荔睹,否則不會發(fā)送亿柑。以上代碼只有小于2的事件才會發(fā)送责语,

2孵户、ofType

Observable.just(1, 2, 3, "chan", "zhide")

.ofType(Integer.class)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打踊防俊:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

說明:可以過濾不符合該類型事件

3、skip

Observable.just(1, 2, 3)

.skip(2)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打佣洲帧:

==================onSubscribe

==================onNext 3

==================onComplete

說明:跳過正序某些事件满哪,count 代表跳過事件的數(shù)量(skipLast() 作用也是跳過某些事件,不過它是用來跳過正序的后面的事件劝篷,這里就不再講解了哨鸭。)

4、distinct

Observable.just(1, 2, 3, 3, 2, 1)

.distinct()

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打咏考恕:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

說明:過濾事件序列中的重復事件像鸡。

5、distinctUntilChanged

Observable.just(1, 2, 3, 3, 2, 1)

.distinctUntilChanged()

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打印:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onNext 2

==================onNext 1

==================onComplete

說明:過濾掉連續(xù)重復的事件哈恰,因為事件序列中連續(xù)出現(xiàn)兩次3只估,所以第二次3并不會發(fā)出。

6着绷、take

Observable.just(1, 2, 3, 4, 5)

.take(3)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "==================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? i += integer;

? ? ? ? Log.d(TAG, "==================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "==================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "==================onComplete ");

? ? }

});

打踊赘啤:

==================onSubscribe

==================onNext 1

==================onNext 2

==================onNext 3

==================onComplete

說明:控制觀察者接收的事件的數(shù)量,takeLast() 的作用就是控制觀察者只能接受事件序列的后面幾件事情荠医,這里就不再講解了吁脱,大家可以自己試試。

7子漩、debounce

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onNext(1);

? ? ? ? Thread.sleep(900);

? ? ? ? e.onNext(2);

? ? }

})

.debounce(1, TimeUnit.SECONDS)

.subscribe(new Observer < Integer > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "===================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Integer integer) {

? ? ? ? Log.d(TAG, "===================onNext " + integer);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "===================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "===================onComplete ");

? ? }

});

打釉バ:

===================onSubscribe

===================onNext 2

說明:如果兩件事件發(fā)送的時間間隔小于設定的時間間隔則前一件事件就不會發(fā)送給觀察者石洗。

可以看到事件1并沒有發(fā)送出去幢泼,現(xiàn)在將間隔時間改為1000,

打咏采馈:

===================onSubscribe

===================onNext 1

===================onNext 2

throttleWithTimeout() 與此方法的作用一樣缕棵,這里就不再贅述了。

8涉兽、firstElement() && lastElement()

Observable.just(1, 2, 3, 4)

.firstElement()

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "====================firstElement " + integer);

? ? }

});

Observable.just(1, 2, 3, 4)

.lastElement()

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "====================lastElement " + integer);

? ? }

});

打诱新俊:

====================firstElement 1

====================lastElement 4

說明:firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最后一個元素枷畏。

9别厘、elementAt() & elementAtOrError()

Observable.just(1, 2, 3, 4)

.elementAt(0)

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "====================accept " + integer);

? ? }

});

打印:

====================accept 1

說明:elementAt() 可以指定取出事件序列中事件拥诡,但是輸入的 index 超出事件序列的總數(shù)的話就不會出現(xiàn)任何結(jié)果触趴。這種情況下氮发,你想發(fā)出異常信息的話就用 elementAtOrError() 。

將 elementAt() 的值改為5冗懦,這時是沒有打印結(jié)果的爽冕,因為沒有滿足條件的元素。

替換 elementAt() 為 elementAtOrError()披蕉,代碼如下:

Observable.just(1, 2, 3, 4)

.elementAtOrError(5)

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "====================accept " + integer);

? ? }

});

打泳被:

io.reactivex.exceptions.OnErrorNotImplementedException

at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)

at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)

at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)

at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)

at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)

at io.reactivex.Observable.subscribe(Observable.java: 10903)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)

at io.reactivex.Single.subscribe(Single.java: 2707)

at io.reactivex.Single.subscribe(Single.java: 2693)

at io.reactivex.Single.subscribe(Single.java: 2664)

at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)

at android.app.Activity.performCreate(Activity.java: 6942)

at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)

at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)

at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)

at android.app.ActivityThread. - wrap14(ActivityThread.java)

at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)

at android.os.Handler.dispatchMessage(Handler.java: 102)

at android.os.Looper.loop(Looper.java: 154)

at android.app.ActivityThread.main(ActivityThread.java: 6682)

at java.lang.reflect.Method.invoke(Native Method)

at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)

at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)

Caused by: java.util.NoSuchElementException

at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)

at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)

at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)

at io.reactivex.Observable.subscribe(Observable.java: 10903)

at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)

at io.reactivex.Single.subscribe(Single.java: 2707)

at io.reactivex.Single.subscribe(Single.java: 2693)

at io.reactivex.Single.subscribe(Single.java: 2664)

at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)

at android.app.Activity.performCreate(Activity.java: 6942)

at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)

at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)

at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)

at android.app.ActivityThread. - wrap14(ActivityThread.java)

at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)

at android.os.Handler.dispatchMessage(Handler.java: 102)

at android.os.Looper.loop(Looper.java: 154)

at android.app.ActivityThread.main(ActivityThread.java: 6682)

at java.lang.reflect.Method.invoke(Native Method)

at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)

at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)

說明:這時候會拋出 NoSuchElementException 異常。

六没讲、條件操作符

1眯娱、all

Observable.just(1, 2, 3, 4)

.all(new Predicate < Integer > () {

? ? @Override

? ? public boolean test(Integer integer) throws Exception {

? ? ? ? return integer < 5;

? ? }

})

.subscribe(new Consumer < Boolean > () {

? ? @Override

? ? public void accept(Boolean aBoolean) throws Exception {

? ? ? ? Log.d(TAG, "==================aBoolean " + aBoolean);

? ? }

});

打印:

==================aBoolean true

說明:判斷事件序列是否全部滿足某個事件爬凑,如果都滿足則返回 true困乒,反之則返回 false。

2贰谣、takeWhile

Observable.just(1, 2, 3, 4)

.takeWhile(new Predicate < Integer > () {

? ? @Override

? ? public boolean test(Integer integer) throws Exception {

? ? ? ? return integer < 3;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "========================integer " + integer);

? ? }

});

打幽嚷А:

========================integer 1

========================integer 2

說明:可以設置條件,當某個數(shù)據(jù)滿足條件時就會發(fā)送該數(shù)據(jù)吱抚,反之則不發(fā)送百宇。

3、skipWhile

Observable.just(1, 2, 3, 4)

.skipWhile(new Predicate < Integer > () {

? ? @Override

? ? public boolean test(Integer integer) throws Exception {

? ? ? ? return integer < 3;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "========================integer " + integer);

? ? }

});

打用乇:

========================integer 3

========================integer 4

說明:可以設置條件携御,當某個數(shù)據(jù)滿足條件時不發(fā)送該數(shù)據(jù),反之則發(fā)送既绕。

4啄刹、takeUntil

Observable.just(1, 2, 3, 4, 5, 6)

.takeUntil(new Predicate < Integer > () {

? ? @Override

? ? public boolean test(Integer integer) throws Exception {

? ? ? ? return integer > 3;

? ? }

})

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "========================integer " + integer);

? ? }

});

打印:

========================integer 1

========================integer 2

========================integer 3

========================integer 4

說明:可以設置條件凄贩,當事件滿足此條件時誓军,下一次的事件就不會被發(fā)送了。

5疲扎、skipUntil

Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))

.subscribe(new Observer < Long > () {

? ? @Override

? ? public void onSubscribe(Disposable d) {

? ? ? ? Log.d(TAG, "========================onSubscribe ");

? ? }

? ? @Override

? ? public void onNext(Long along) {

? ? ? ? Log.d(TAG, "========================onNext " + along);

? ? }

? ? @Override

? ? public void onError(Throwable e) {

? ? ? ? Log.d(TAG, "========================onError ");

? ? }

? ? @Override

? ? public void onComplete() {

? ? ? ? Log.d(TAG, "========================onComplete ");

? ? }

});

打雨鞘薄:

========================onSubscribe

========================onNext 4

========================onNext 5

========================onComplete

說明:當 skipUntil() 中的 Observable 發(fā)送事件了,原來的 Observable 才會發(fā)送事件給觀察者椒丧。

6壹甥、sequenceEqual

Observable.sequenceEqual(Observable.just(1, 2, 3),

Observable.just(1, 2, 3))

.subscribe(new Consumer < Boolean > () {

? ? @Override

? ? public void accept(Boolean aBoolean) throws Exception {

? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);

? ? }

});

打印:

========================onNext true

說明:判斷兩個 Observable 發(fā)送的事件是否相同壶熏。

7句柠、contains

Observable.just(1, 2, 3)

.contains(3)

.subscribe(new Consumer < Boolean > () {

? ? @Override

? ? public void accept(Boolean aBoolean) throws Exception {

? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);

? ? }

});

打印:

========================onNext true

說明:判斷事件序列中是否含有某個元素,如果有則返回 true溯职,如果沒有則返回 false管怠。

8、isEmpty

Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onComplete();

? ? }

})

.isEmpty()

.subscribe(new Consumer < Boolean > () {

? ? @Override

? ? public void accept(Boolean aBoolean) throws Exception {

? ? ? ? Log.d(TAG, "========================onNext " + aBoolean);

? ? }

});

打痈组:

========================onNext true

說明:判斷事件序列是否為空渤弛。

9、amb

ArrayList < Observable < Long >> list = new ArrayList < > ();

list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));

list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));

Observable.amb(list)

.subscribe(new Consumer < Long > () {

? ? @Override

? ? public void accept(Long aLong) throws Exception {

? ? ? ? Log.d(TAG, "========================aLong " + aLong);

? ? }

});

打由醮:

========================aLong 6

========================aLong 7

========================aLong 8

========================aLong 9

========================aLong 10

說明:amb() 要傳入一個 Observable 集合她肯,但是只會發(fā)送最先發(fā)送事件的 Observable 中的事件,其余 Observable 將會被丟棄鹰贵。

10晴氨、Observable.create(new ObservableOnSubscribe < Integer > () {

? ? @Override

? ? public void subscribe(ObservableEmitter < Integer > e) throws Exception {

? ? ? ? e.onComplete();

? ? }

})

.defaultIfEmpty(666)

.subscribe(new Consumer < Integer > () {

? ? @Override

? ? public void accept(Integer integer) throws Exception {

? ? ? ? Log.d(TAG, "========================onNext " + integer);

? ? }

});

打印:

========================onNext 666

說明:如果被觀察者只發(fā)送一個 onComplete() 事件碉输,則可以利用這個方法發(fā)送一個值籽前。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市敷钾,隨后出現(xiàn)的幾起案子枝哄,更是在濱河造成了極大的恐慌,老刑警劉巖阻荒,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挠锥,死亡現(xiàn)場離奇詭異,居然都是意外死亡侨赡,警方通過查閱死者的電腦和手機蓖租,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來羊壹,“玉大人蓖宦,你說我怎么就攤上這事∮兔ǎ” “怎么了稠茂?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長眨攘。 經(jīng)常有香客問我主慰,道長嚣州,這世上最難降的妖魔是什么鲫售? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮该肴,結(jié)果婚禮上情竹,老公的妹妹穿的比我還像新娘。我一直安慰自己匀哄,他們只是感情好秦效,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布雏蛮。 她就那樣靜靜地躺著,像睡著了一般阱州。 火紅的嫁衣襯著肌膚如雪挑秉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天苔货,我揣著相機與錄音犀概,去河邊找鬼。 笑死夜惭,一個胖子當著我的面吹牛姻灶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播诈茧,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼产喉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了敢会?” 一聲冷哼從身側(cè)響起曾沈,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鸥昏,沒想到半個月后晦譬,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡互广,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年敛腌,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惫皱。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡像樊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出旅敷,到底是詐尸還是另有隱情生棍,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布媳谁,位于F島的核電站涂滴,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏晴音。R本人自食惡果不足惜柔纵,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锤躁。 院中可真熱鬧搁料,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至昭伸,卻和暖如春梧乘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背庐杨。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工宋下, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辑莫。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓学歧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親各吨。 傳聞我的和親對象是個殘疾皇子枝笨,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容