這個(gè)頁(yè)面展示的操作符可用于組合多個(gè) Observables。
Delay — 延時(shí)發(fā)射 Observable 的結(jié)果。
DelaySubscription — 延時(shí)處理訂閱請(qǐng)求。
DoOnEach — 注冊(cè)一個(gè)動(dòng)作,對(duì) Observable 發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用。
DoOnComplete — 注冊(cè)一個(gè)動(dòng)作猾浦,對(duì)正常完成的 Observable 使用。
DoOnError — 注冊(cè)一個(gè)動(dòng)作灯抛,對(duì)發(fā)生錯(cuò)誤的 Observable 使用金赦。
DoOnTerminate — 注冊(cè)一個(gè)動(dòng)作,對(duì)完成的 Observable 使用对嚼,無(wú)論是否發(fā)生錯(cuò)誤夹抗。
DoOnSubscribe — 注冊(cè)一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用猪半。
DoOnUnsubscribe — 注冊(cè)一個(gè)動(dòng)作兔朦,在觀察者取消訂閱時(shí)使用偷线。
Dematerialize — 將上面的結(jié)果逆轉(zhuǎn)回一個(gè) Observable
ObserveOn — 指定觀察者觀察 Observable 的調(diào)度器
Materialize — 將 Observable 轉(zhuǎn)換成一個(gè)通知列表
Serialize — 強(qiáng)制一個(gè) Observable 連續(xù)調(diào)用并保證行為正確
Subscribe — 操作來(lái)自 Observable 的發(fā)射物和通知。
SubscribeOn — 指定 Observable 執(zhí)行任務(wù)的調(diào)度器沽甥。
TimeInterval — 定期發(fā)射數(shù)據(jù)声邦。
Timeout - 對(duì)原始 Observable 的一個(gè)鏡像,如果過(guò)了一個(gè)指定的時(shí)長(zhǎng)仍沒(méi)有發(fā)射數(shù)據(jù)摆舟,它會(huì)發(fā)一個(gè)錯(cuò)誤通知亥曹。
Timestamp — 給 Observable 發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳。
6.1 Delay
延遲一段指定的時(shí)間再發(fā)射來(lái)自 Observable 的請(qǐng)求恨诱。
![Delay](http://opgvsfix4.bkt.clouddn.com/rxjava_delay.c.png)
RxJava 的實(shí)現(xiàn)是 delay 和 delaySubscription媳瞪。不同之處在于 Delay 是延時(shí)數(shù)據(jù)的發(fā)射,而 DelaySubscription 是延時(shí)注冊(cè) Subscriber照宝。
6.1.1 Delay
![delay](http://opgvsfix4.bkt.clouddn.com/rxjava_delay.png)
示例代碼:
final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
if (integer == 1) {
Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
}
Log.e(TAG, "accept:" + integer);
}
});
輸出結(jié)果:
delay Time :2408
accept:1
accept:2
6.1.2 delaySubscription
![delaySubscription](http://opgvsfix4.bkt.clouddn.com/rxjava_delaySubscription.png)
示例代碼:
final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer aLong) throws Exception {
if (aLong == 1) {
Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
}
Log.e(TAG, "accept:" + aLong);
}
});
輸出結(jié)果:
delay Time :2500
accept:1
accept:2
6.2 Do
注冊(cè)一個(gè)動(dòng)作作為原始 Observable 生命周期事件的一種占位符蛇受。
![Do](http://opgvsfix4.bkt.clouddn.com/txjava_do.c.png)
Do 操作符就是給 Observable 的生命周期的各個(gè)階段加上一系列的回調(diào)監(jiān)聽,當(dāng) Observable 執(zhí)行到這個(gè)階段的時(shí)候厕鹃,這些回調(diào)就會(huì)被觸發(fā)兢仰。
在 Rxjava2.0 中實(shí)現(xiàn)了很多的 do 操作符的變體。
6.2.1 doAfterNext
實(shí)現(xiàn)方法:doAfterNext(Consumer)
從上流向下流發(fā)射后被調(diào)用剂碴。
示例代碼:
public static void demo_doAfterNext(){
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
ob1.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG,"doAfterNext="+integer);
}
}).subscribe(getNormalObserver());
}
public static Disposable mDisposable ;
//可重復(fù)使用
public static Observer<Integer> getNormalObserver(){
return new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,"normal,onNext:"+integer);
}
@Override
public void onError(@NonNull Throwable error) {
Log.e(TAG,"normal,Error: " + error.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"normal,onComplete");
}
};
}
輸出結(jié)果:
normal,onNext:1
doAfterNext : 1
normal,onNext:2
doAfterNext : 2
normal,onNext:3
doAfterNext : 3
normal,onComplete
6.2.2 doAfterTerminate
![doAfterTerminate](http://opgvsfix4.bkt.clouddn.com/rxjava_doAfterTerminate.png)
實(shí)現(xiàn)方法: doAfterTerminate(Action)
注冊(cè)一個(gè) Action,當(dāng) Observable 調(diào)用 onComplete 或 onError 觸發(fā)把将。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doAfterTerminate run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
normal,onNext:2
normal,onComplete
doAfterTerminate run
6.2.3 doFinally
實(shí)現(xiàn)方法: doFinally(Action onDispose)
當(dāng) Observable 調(diào)用 onError 或 onCompleted 之后調(diào)用指定的操作,或由下游處理忆矛。
doFinally 優(yōu)先于 doAfterTerminate 的調(diào)用察蹲。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doFinally run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
normal,onNext:2
normal,onComplete
doFinally run
6.2.4 doOnDispose
![doOnDispose](http://opgvsfix4.bkt.clouddn.com/doOnUnsubscribe.png)
實(shí)現(xiàn)方法:doOnDispose(Action onDispose)
當(dāng) Observable 取消訂閱時(shí),它就會(huì)被調(diào)用催训。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
//mDisposable 參考6.2.1
if (mDisposable != null) {
mDisposable.dispose();
}
emitter.onNext(2);
emitter.onComplete();
// emitter.onError(new Throwable("nothingerro"));
}
});
ob1.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doOnDispose run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
doOnDispose run
6.2.5 doOnComplete
![doOnComplete](http://opgvsfix4.bkt.clouddn.com/doOnComplete.png)
當(dāng)它產(chǎn)生的 Observable 正常終止調(diào)用 onCompleted 時(shí)會(huì)被調(diào)用洽议。
Javadoc: doOnCompleted(Action)
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothingerror"));
emitter.onComplete();
}
});
ob1.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
normal,onNext:2
doOnComplete run
normal,onComplete
6.2.6 doOnEach
![doOnEach](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnEach.png)
doOnEach 操作符讓你可以注冊(cè)一個(gè)回調(diào),它產(chǎn)生的 Observable 每發(fā)射一項(xiàng)數(shù)據(jù)就會(huì)調(diào)用它一次瞳腌。不僅包括 onNext 還包括 onError 和 onCompleted绞铃。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothingerror"));
emitter.onComplete();
}
});
ob1.doOnEach(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "doOnEach,onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "doOnEach,onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "doOnEach,onComplete");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
doOnEach,onNext:1
normal,onNext:1
doOnEach,onNext:2
normal,onNext:2
doOnEach,onComplete
normal,onComplete
6.2.7 doOnError
doOnError 操作符注冊(cè)一個(gè)動(dòng)作镜雨,當(dāng)它產(chǎn)生的 Observable 異常終止調(diào)用 onError 時(shí)會(huì)被調(diào)用嫂侍。
![doOnError](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnError.png)
實(shí)現(xiàn)方法 doOnError(Consumer<? super Throwable>);
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG,"doOnError : "+throwable.getMessage());
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
normal,onNext:2
doOnError : nothing error
normal,Error: nothing error
6.2.8 doOnLifecycle
調(diào)用相應(yīng)的 onXXX 方法(在所有 Observer 之間共享),用于序列的生命周期事件(訂閱荚坞,取消挑宠,請(qǐng)求)。
![doOnLifecycle](http://opgvsfix4.bkt.clouddn.com/doOnLifecycle.png)
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
if (mDisposable != null) {
mDisposable.dispose();
}
emitter.onComplete();
}
});
ob1.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnLifecycle ,disposable:" + disposable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnLifecycle ,run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
doOnLifecycle ,disposable:null
normal,onNext:1
normal,onNext:2
doOnLifecycle ,run
6.2.9 doOnNext
doOnNext操作符類似于 doOnEach(Consumer)颓影。
![doOnNext](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnNext.png)
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext ,onNext:"+integer);
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
doOnNext ,onNext:1
normal,onNext:1
doOnNext ,onNext:2
normal,onNext:2
normal,onComplete
6.2.10 doOnSubscribe
doOnSubscribe各淀,當(dāng)觀察者訂閱它生成的 Observable 它就會(huì)被調(diào)用。
![doOnSubscribe](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnSubscribe.png)
實(shí)踐:在 Observable 發(fā)射前做一些初始化操作(比如開始加載數(shù)據(jù)時(shí)顯示載入中界面)诡挂。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe,disposable:" + disposable);
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
doOnSubscribe,disposable:null
normal,onNext:1
normal,onNext:2
normal,onComplete
6.2.11 doOnTerminate
doOnTerminate 操作符注冊(cè)一個(gè)動(dòng)作碎浇,當(dāng)它產(chǎn)生的 Observable 終止之前會(huì)被調(diào)用临谱,無(wú)論是正常還是異常終止。
![doOnTerminate](http://opgvsfix4.bkt.clouddn.com/rxjava_doOnTerminate.png)
實(shí)現(xiàn)方法:doOnTerminate(Action)
實(shí)踐:不管消息流最終以 onError() / onComplete() 結(jié)束奴璃,都會(huì)被調(diào)用(類似 Java 的 finally )悉默,對(duì)于某些需要 onError() / onComplete() 后都要執(zhí)行的操作(如網(wǎng)絡(luò)加載成功/失敗都要隱藏載入中界面),可以放在這里苟穆。
注意:取消訂閱時(shí)抄课,不會(huì)調(diào)用 doOnTerminate 方法。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("nothing error"));
emitter.onComplete();
}
});
ob1.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG,"doOnTerminate,run");
}
}).subscribe(getNormalObserver());
輸出結(jié)果:
normal,onNext:1
normal,onNext:2
doOnTerminate,run
normal,onComplete
6.2.12 onTerminateDetach
當(dāng)執(zhí)行了反注冊(cè) unsubscribes 或者發(fā)送數(shù)據(jù)序列中斷了雳旅,解除上游生產(chǎn)者對(duì)下游接受者的引用跟磨。
實(shí)踐:onTerminateDetach 會(huì)使 Observable 調(diào)用 UnSubscriber 時(shí),對(duì) Subscriber 的引用會(huì)被釋放攒盈,從而避免造成內(nèi)存泄漏抵拘。
6.3 Meterialize / Dematerialize
6.3.1 Materialize
Materialize 將數(shù)據(jù)項(xiàng)和事件通知都當(dāng)做數(shù)據(jù)項(xiàng)發(fā)射,Dematerialize 剛好相反型豁。
![Meterialize](http://opgvsfix4.bkt.clouddn.com/materialize.c.png)
一個(gè)合法的有限的 Obversable 將調(diào)用它的觀察者的 onNext 方法零次或多次仑濒,然后調(diào)用觀察者的 onCompleted 或 onError 僅一次。Materialize 操作符將這一系列調(diào)用偷遗,包括原來(lái)的 onNext 通知和終止通知 onCompleted 或 onError 都轉(zhuǎn)換為一個(gè) Observable 發(fā)射的數(shù)據(jù)序列墩瞳。
通俗一點(diǎn)的說(shuō)法:Meterialize 操作符將 OnNext / OnError / OnComplet e都轉(zhuǎn)化為一個(gè) Notification 對(duì)象并按照原來(lái)的順序發(fā)射出來(lái)。
示例代碼:
Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
// emitter.onError(new Throwable("love world"));
emitter.onComplete();
}
});
ob1.materialize().subscribe(new Consumer<Notification<Integer>>() {
@Override
public void accept(@NonNull Notification<Integer> in) throws Exception {
if (in.isOnNext()) {
Log.e(TAG, "materialize,onNext: " + in.isOnNext());
return;
}
if (in.isOnError()) {
Log.e(TAG, "materialize,onError: "+in.getError().getMessage());
return;
}
if (in.isOnComplete()) {
Log.e(TAG, "materialize,OnComplete");
return;
}
}
});
輸出結(jié)果:
materialize,onNext: true
materialize,onNext: true
materialize,OnComplete
6.3.2 Dematerialize
而 Dematerialize 執(zhí)行相反的過(guò)程氏豌。
![Dematerialize](http://opgvsfix4.bkt.clouddn.com/dematerialize.c.png)
示例代碼:
Observable<Notification<Integer>> ob1 = Observable.create(new ObservableOnSubscribe<Notification<Integer>>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Notification<Integer>> e) throws Exception {
e.onNext(Notification.createOnNext(1));
e.onNext(Notification.<Integer>createOnError(new Throwable("My error!")));
e.onNext(Notification.<Integer>createOnComplete());
}
});
ob1.dematerialize().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e(TAG, "onNext:" + o.toString());
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
輸出結(jié)果:
onNext:1
onComplete
6.4 ObserveOn / SubscribeOn
指定一個(gè)觀察者在哪個(gè)調(diào)度器(線程)上觀察這個(gè) Observable喉酌。
![ObserveOn](http://opgvsfix4.bkt.clouddn.com/observeOn.c.png)
![SubscribeOn](http://opgvsfix4.bkt.clouddn.com/subscribeOn.c.png)
ObserverOn 用來(lái)指定觀察者所運(yùn)行的線程,也就是發(fā)射出的數(shù)據(jù)在那個(gè)線程上使用泵喘。
在 Android 中泪电,如果經(jīng)常會(huì)遇見這樣場(chǎng)景,我們需要從網(wǎng)絡(luò)中讀取數(shù)據(jù)纪铺,之后修改 UI 界面相速,觀察者就必須在主線程上運(yùn)行,就如同 AsyncTask 的 onPostExecute鲜锚。
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
注意:當(dāng)遇到一個(gè)異常時(shí) ObserveOn 會(huì)立即向前傳遞這個(gè) onError 終止通知突诬,它不會(huì)等待慢速消費(fèi)的 Observable 接受任何之前它已經(jīng)收到但還沒(méi)有發(fā)射的數(shù)據(jù)項(xiàng)。這可能意味著 onError 通知會(huì)跳到(并吞掉)原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)前面芜繁,正如下圖所示的旺隙。
![ObserveOn](http://opgvsfix4.bkt.clouddn.com/observeOn.e.png)
示例代碼:
/**
Schedulers.io() 代表 io 操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等 io 密集型的操作
Schedulers.computation() 代表 CPU 計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
Schedulers.newThread() 代表一個(gè)常規(guī)的新線程
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "onNext:" +integer);
}
});
輸出結(jié)果:
subscribeOn:RxCachedThreadScheduler-1
observerOn:RxNewThreadScheduler-1
onNext:1
6.4.1 unsubscribeOn
修改原 Observable,以便訂閱者將其配置在指定的調(diào)度器(線程)上骏令。
示例代碼:
//將線程從 computation 換到 io 中
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext,observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "doOnNext,onNext:" + integer);
}
})
.unsubscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
Log.e(TAG, "onNext:" + integer);
}
});
輸出結(jié)果:
subscribeOn:RxNewThreadScheduler-1
doOnNext,observerOn:RxComputationThreadPool-1
doOnNext,onNext:1
observerOn:RxComputationThreadPool-1
onNext:1
6.5 Serialize
強(qiáng)制一個(gè) Observable 連續(xù)調(diào)用并保證行為正確蔬捷。
![Serialize](http://opgvsfix4.bkt.clouddn.com/serialize.c.png)
一個(gè) Observable 可以異步調(diào)用它的觀察者的方法,可能是從不同的線程調(diào)用榔袋。這可能會(huì)讓 Observable 行為不正確周拐,它可能會(huì)在某一個(gè) onNext 調(diào)用之前嘗試調(diào)用 onCompleted 或 onError 方法铡俐,或者從兩個(gè)不同的線程同時(shí)調(diào)用 onNext 方法。使用 serialize 操作符妥粟,你可以糾正這個(gè) Observable 的行為高蜂,保證它的行為是正確的且是同步的。
6.6 TimeInterval
將一個(gè)發(fā)射數(shù)據(jù)的 Observable 轉(zhuǎn)換為發(fā)射那些數(shù)據(jù)發(fā)射時(shí)間間隔的 Observable罕容。
![TimeInterval](http://opgvsfix4.bkt.clouddn.com/timeInterval.c.png)
TimeInterval 操作符攔截原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)备恤,替換為兩個(gè)連續(xù)發(fā)射物之間流逝的時(shí)間長(zhǎng)度。 也就是說(shuō)這個(gè)使用這個(gè)操作符后發(fā)射的不再是原始數(shù)據(jù)锦秒,而是原始數(shù)據(jù)發(fā)射的時(shí)間間隔露泊。新的 Observable 的第一個(gè)發(fā)射物表示的是在觀察者訂閱原始 Observable 到原始 Observable 發(fā)射它的第一項(xiàng)數(shù)據(jù)之間流逝的時(shí)間長(zhǎng)度。 不存在與原始 Observable 發(fā)射最后一項(xiàng)數(shù)據(jù)和發(fā)射 onCompleted 通知之間時(shí)長(zhǎng)對(duì)應(yīng)的發(fā)射物旅择。timeInterval 默認(rèn)在 immediate 調(diào)度器上執(zhí)行惭笑,你可以通過(guò)傳參數(shù)修改。
示例代碼:
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.timeInterval()
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(@NonNull Timed<Long> t) throws Exception {
Log.e(TAG, "onNext: " + t.value() + " , time = " + t.time());
}
});
輸出結(jié)果:
onNext: 0 , time = 104
onNext: 1 , time = 113
onNext: 2 , time = 100
6.7 Timeout
對(duì)原始 Observable 的一個(gè)鏡像生真,如果過(guò)了一個(gè)指定的時(shí)長(zhǎng)仍沒(méi)有發(fā)射數(shù)據(jù)沉噩,它會(huì)發(fā)一個(gè)錯(cuò)誤通知。
![Timeout](http://opgvsfix4.bkt.clouddn.com/timeout.c.png)
Timeout 操作符給 Observable 加上超時(shí)時(shí)間柱蟀,每發(fā)射一個(gè)數(shù)據(jù)后就重置計(jì)時(shí)器川蒙,當(dāng)超過(guò)預(yù)定的時(shí)間還沒(méi)有發(fā)射下一個(gè)數(shù)據(jù),就拋出一個(gè)超時(shí)的異常长已。
RxJava2.0 中的實(shí)現(xiàn)的 Timeout 操作符有好幾個(gè)變體:
- timeout(long,TimeUnit): 第一個(gè)變體接受一個(gè)時(shí)長(zhǎng)參數(shù)畜眨,每當(dāng)原始 Observable 發(fā)射了一項(xiàng)數(shù)據(jù),timeout就啟動(dòng)一個(gè)計(jì)時(shí)器术瓮,如果計(jì)時(shí)器超過(guò)了指定指定的時(shí)長(zhǎng)而原始 Observable 沒(méi)有發(fā)射另一項(xiàng)數(shù)據(jù)康聂,timeout 就拋出 TimeoutException,以一個(gè)錯(cuò)誤通知終止 Observable胞四。 這個(gè)timeout默認(rèn)在computation調(diào)度器上執(zhí)行恬汁,你可以通過(guò)參數(shù)指定其它的調(diào)度器。
- timeout(long,TimeUnit,Observable): 這個(gè)版本的 timeout 在超時(shí)時(shí)會(huì)切換到使用一個(gè)你指定的備用的 Observable辜伟,而不是發(fā)錯(cuò)誤通知氓侧。它也默認(rèn)在 computation 調(diào)度器上執(zhí)行。
- timeout(Function):這個(gè)版本的 timeout 使用一個(gè)函數(shù)針對(duì)原始 Observable 的每一項(xiàng)返回一個(gè) Observable游昼,如果當(dāng)這個(gè) Observable 終止時(shí)原始 Observable 還沒(méi)有發(fā)射另一項(xiàng)數(shù)據(jù)甘苍,就會(huì)認(rèn)為是超時(shí)了,timeout 就拋出 TimeoutException烘豌,以一個(gè)錯(cuò)誤通知終止 Observable。
- timeout(Function,Observable): 這個(gè)版本的 timeout 同時(shí)指定超時(shí)時(shí)長(zhǎng)和備用的 Observable看彼。它默認(rèn)在immediate調(diào)度器上執(zhí)行
示例代碼1:
/**
* 在 150 毫秒間隔內(nèi)如果沒(méi)有發(fā)射數(shù)據(jù)廊佩。發(fā)送一個(gè) TimeoutException 通知終止囚聚。
* */
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
Thread.sleep(i * 100);
emitter.onNext(i);
}
emitter.onComplete();
}
})
.timeout(150, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "onNext:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
輸出結(jié)果:
onNext:0
onNext:1
onError:null
示例代碼 2:
/**
* 只接收 200 毫秒間隔內(nèi)發(fā)送的數(shù)據(jù),如果超時(shí)則切換到 Observable.just(100, 200)
* */
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 5; i++) {
Thread.sleep(i * 100);
emitter.onNext(i);
}
emitter.onComplete();
}
})
.timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
輸出結(jié)果:
accept:0
accept:1
accept:100
accept:200
6.8 Timestamp
給 Observable 發(fā)射的數(shù)據(jù)項(xiàng)附加一個(gè)時(shí)間戳标锄。
![Timestamp](http://opgvsfix4.bkt.clouddn.com/timestamp.c.png)
它將一個(gè)發(fā)射 T 類型數(shù)據(jù)的 Observable 轉(zhuǎn)換為一個(gè)發(fā)射類型為 Timestamped 的數(shù)據(jù)的 Observable顽铸,每一項(xiàng)都包含數(shù)據(jù)的發(fā)射時(shí)間。也就是把 Observable 發(fā)射的數(shù)據(jù)重新包裝了一下料皇,將數(shù)據(jù)發(fā)射的時(shí)間打包一起發(fā)射出去谓松,這樣觀察者不僅能得到數(shù)據(jù),還能得到數(shù)據(jù)的發(fā)射時(shí)間践剂。 timestamp 默認(rèn)在 immediate 調(diào)度器上執(zhí)行鬼譬,但是可以通過(guò)參數(shù)指定其它的調(diào)度器。
示例代碼:
Observable.range(1, 3)
.timestamp()
.subscribe(new Consumer<Timed<Integer>>() {
@Override
public void accept(@NonNull Timed<Integer> t) throws Exception {
Log.e(TAG, "accept ,onNext:" + t.value() + ",time = " + t.time());
}
});
輸出結(jié)果:
accept ,onNext:1,time = 1494606809418
accept ,onNext:2,time = 1494606809420
accept ,onNext:3,time = 1494606809420
6.9 Using
創(chuàng)建一個(gè)只在 Observable 生命周期內(nèi)存在的一次性資源.
![Using](http://opgvsfix4.bkt.clouddn.com/using.c.png)
當(dāng)一個(gè)觀察者訂閱 using 返回的 Observable 時(shí)逊脯,using 將會(huì)使用 Observable 工廠函數(shù)創(chuàng)建觀察者要觀察 Observable优质,同時(shí)使用資源工廠函數(shù)創(chuàng)建一個(gè)你想要?jiǎng)?chuàng)建的資源。當(dāng)觀察者取消訂閱這個(gè) Observable 時(shí)军洼,或者當(dāng)觀察者終止時(shí)(無(wú)論是正常終止還是因錯(cuò)誤而終止)巩螃,using 使用第三個(gè)函數(shù)釋放它創(chuàng)建的資源。
using 操作符接受三個(gè)參數(shù):
- 一個(gè)用戶創(chuàng)建一次性資源的工廠函數(shù)
- 一個(gè)用于創(chuàng)建 Observable 的工廠函數(shù)
- 一個(gè)用于釋放資源的函數(shù)
示例代碼:
Observable.using(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt(10);
}
}, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
return Observable.just("hello+" + integer, "world+" + integer);
}
}, new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "using,accept - >" + integer);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "subscribe,accept -> " + s);
}
});
輸出結(jié)果:
subscribe,accept -> hello+8
subscribe,accept -> world+8
using,accept - >8
6.10 To
將 Observable 轉(zhuǎn)換為另一個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)匕争。
![To](http://opgvsfix4.bkt.clouddn.com/to.c.png)
ReactiveX 的很多語(yǔ)言特定實(shí)現(xiàn)都有一種操作符讓你可以將 Observable 或者 Observable 發(fā)射的數(shù)據(jù)序列轉(zhuǎn)換為另一個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)避乏。它們中的一些會(huì)阻塞直到 Observable 終止,然后生成一個(gè)等價(jià)的對(duì)象或數(shù)據(jù)結(jié)構(gòu)甘桑;另一些返回一個(gè)發(fā)射那個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)的 Observable淑际。
在某些 ReactiveX 實(shí)現(xiàn)中,還有一個(gè)操作符用于將 Observable 轉(zhuǎn)換成阻塞式的扇住。一個(gè)阻塞式的 Ogbservable 在普通的 Observable 的基礎(chǔ)上增加了幾個(gè)方法春缕,用于操作 Observable 發(fā)射的數(shù)據(jù)項(xiàng)。
RxJava2.x 中實(shí)現(xiàn)了多種 To 操作符:
6.10.1 To
示例代碼:
輸出結(jié)果:
6.10.2 toFuture
返回表示該 Observable 發(fā)出的單個(gè)值的 Future艘蹋。
如果 Observable 發(fā)出多個(gè)項(xiàng)目锄贼,F(xiàn)uture 將會(huì)收到一個(gè) IllegalArgumentException。 如果 Observable 為空女阀,F(xiàn)uture 將收到一個(gè) NoSuchElementException宅荤。
如果 Observable 可能會(huì)發(fā)出多個(gè)項(xiàng)目,請(qǐng)使用Observable.toList() 浸策、toBlocking() 冯键、toFuture()。
![toFuture](http://opgvsfix4.bkt.clouddn.com/B.toFuture.png)