RxJava2.0 操作符(6)—— Utility 輔助操作符

這個(gè)頁(yè)面展示的操作符可用于組合多個(gè) Observables。

Delay — 延時(shí)發(fā)射 Observable 的結(jié)果。
DelaySubscription — 延時(shí)處理訂閱請(qǐng)求。
DoOnEach — 注冊(cè)一個(gè)動(dòng)作,對(duì) Observable 發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用。
DoOnComplete — 注冊(cè)一個(gè)動(dòng)作猾浦,對(duì)正常完成的 Observable 使用。
DoOnError — 注冊(cè)一個(gè)動(dòng)作灯抛,對(duì)發(fā)生錯(cuò)誤的 Observable 使用金赦。
DoOnTerminate — 注冊(cè)一個(gè)動(dòng)作,對(duì)完成的 Observable 使用对嚼,無(wú)論是否發(fā)生錯(cuò)誤夹抗。
DoOnSubscribe — 注冊(cè)一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用猪半。
DoOnUnsubscribe — 注冊(cè)一個(gè)動(dòng)作兔朦,在觀察者取消訂閱時(shí)使用偷线。
Dematerialize — 將上面的結(jié)果逆轉(zhuǎn)回一個(gè) Observable
ObserveOn — 指定觀察者觀察 Observable 的調(diào)度器
Materialize — 將 Observable 轉(zhuǎn)換成一個(gè)通知列表
Serialize — 強(qiáng)制一個(gè) Observable 連續(xù)調(diào)用并保證行為正確
Subscribe — 操作來(lái)自 Observable 的發(fā)射物和通知。
SubscribeOn — 指定 Observable 執(zhí)行任務(wù)的調(diào)度器沽甥。
TimeInterval — 定期發(fā)射數(shù)據(jù)声邦。
Timeout - 對(duì)原始 Observable 的一個(gè)鏡像,如果過(guò)了一個(gè)指定的時(shí)長(zhǎng)仍沒(méi)有發(fā)射數(shù)據(jù)摆舟,它會(huì)發(fā)一個(gè)錯(cuò)誤通知亥曹。
Timestamp — 給 Observable 發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳。

6.1 Delay

延遲一段指定的時(shí)間再發(fā)射來(lái)自 Observable 的請(qǐng)求恨诱。

Delay
Delay

RxJava 的實(shí)現(xiàn)是 delay 和 delaySubscription媳瞪。不同之處在于 Delay 是延時(shí)數(shù)據(jù)的發(fā)射,而 DelaySubscription 是延時(shí)注冊(cè) Subscriber照宝。

6.1.1 Delay

delay
delay

示例代碼:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        if (integer == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + integer);
    }
});

輸出結(jié)果:

delay Time :2408
accept:1
accept:2

6.1.2 delaySubscription

delaySubscription
delaySubscription

示例代碼:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer aLong) throws Exception {
        if (aLong == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + aLong);
    }
});

輸出結(jié)果:

delay Time :2500
accept:1
accept:2

6.2 Do

注冊(cè)一個(gè)動(dòng)作作為原始 Observable 生命周期事件的一種占位符蛇受。

Do
Do

Do 操作符就是給 Observable 的生命周期的各個(gè)階段加上一系列的回調(diào)監(jiān)聽,當(dāng) Observable 執(zhí)行到這個(gè)階段的時(shí)候厕鹃,這些回調(diào)就會(huì)被觸發(fā)兢仰。
在 Rxjava2.0 中實(shí)現(xiàn)了很多的 do 操作符的變體。

6.2.1 doAfterNext

實(shí)現(xiàn)方法:doAfterNext(Consumer)
從上流向下流發(fā)射后被調(diào)用剂碴。

示例代碼:

public static void demo_doAfterNext(){
    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    ob1.doAfterNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG,"doAfterNext="+integer);
        }
    }).subscribe(getNormalObserver());
}


public static Disposable mDisposable ;
//可重復(fù)使用
public static Observer<Integer> getNormalObserver(){
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            mDisposable = d;
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG,"normal,onNext:"+integer);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            Log.e(TAG,"normal,Error: " + error.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"normal,onComplete");
        }
    };
}

輸出結(jié)果:

normal,onNext:1
doAfterNext : 1
normal,onNext:2
doAfterNext : 2
normal,onNext:3
doAfterNext : 3
normal,onComplete

6.2.2 doAfterTerminate

doAfterTerminate
doAfterTerminate

實(shí)現(xiàn)方法: doAfterTerminate(Action)

注冊(cè)一個(gè) Action,當(dāng) Observable 調(diào)用 onComplete 或 onError 觸發(fā)把将。

示例代碼:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doAfterTerminate run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doAfterTerminate run

6.2.3 doFinally

實(shí)現(xiàn)方法: doFinally(Action onDispose)

當(dāng) Observable 調(diào)用 onError 或 onCompleted 之后調(diào)用指定的操作,或由下游處理忆矛。
doFinally 優(yōu)先于 doAfterTerminate 的調(diào)用察蹲。

示例代碼:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//      emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doFinally run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doFinally run

6.2.4 doOnDispose

doOnDispose
doOnDispose

實(shí)現(xiàn)方法:doOnDispose(Action onDispose)

當(dāng) Observable 取消訂閱時(shí),它就會(huì)被調(diào)用催训。

示例代碼:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {

        emitter.onNext(1);
        //mDisposable 參考6.2.1
        if (mDisposable != null) {
            mDisposable.dispose();
        }
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doOnDispose run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
doOnDispose run

6.2.5 doOnComplete

doOnComplete
doOnComplete

當(dāng)它產(chǎn)生的 Observable 正常終止調(diào)用 onCompleted 時(shí)會(huì)被調(diào)用洽议。
Javadoc: doOnCompleted(Action)

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothingerror"));
        emitter.onComplete();

    }
});
ob1.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnComplete run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
normal,onNext:2
doOnComplete run
normal,onComplete

6.2.6 doOnEach

doOnEach
doOnEach

doOnEach 操作符讓你可以注冊(cè)一個(gè)回調(diào),它產(chǎn)生的 Observable 每發(fā)射一項(xiàng)數(shù)據(jù)就會(huì)調(diào)用它一次瞳腌。不僅包括 onNext 還包括 onError 和 onCompleted绞铃。

示例代碼:

 Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothingerror"));
        emitter.onComplete();

    }
});
ob1.doOnEach(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Integer integer) {
        Log.e(TAG, "doOnEach,onNext:" + integer);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "doOnEach,onError:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "doOnEach,onComplete");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

doOnEach,onNext:1
normal,onNext:1
doOnEach,onNext:2
normal,onNext:2
doOnEach,onComplete
normal,onComplete

6.2.7 doOnError

doOnError 操作符注冊(cè)一個(gè)動(dòng)作镜雨,當(dāng)它產(chǎn)生的 Observable 異常終止調(diào)用 onError 時(shí)會(huì)被調(diào)用嫂侍。

doOnError
doOnError

實(shí)現(xiàn)方法 doOnError(Consumer<? super Throwable>);

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        Log.e(TAG,"doOnError : "+throwable.getMessage());
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
normal,onNext:2
doOnError : nothing error
normal,Error: nothing error

6.2.8 doOnLifecycle

調(diào)用相應(yīng)的 onXXX 方法(在所有 Observer 之間共享),用于序列的生命周期事件(訂閱荚坞,取消挑宠,請(qǐng)求)。


doOnLifecycle
doOnLifecycle

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothing error"));
        if (mDisposable != null) {
            mDisposable.dispose();
        }
        emitter.onComplete();

    }
});
ob1.doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(@NonNull Disposable disposable) throws Exception {
        Log.e(TAG, "doOnLifecycle ,disposable:" + disposable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnLifecycle ,run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

doOnLifecycle ,disposable:null
normal,onNext:1
normal,onNext:2
doOnLifecycle ,run

6.2.9 doOnNext

doOnNext操作符類似于 doOnEach(Consumer)颓影。

doOnNext
doOnNext

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();
    }
});
ob1.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
    Log.e(TAG, "doOnNext ,onNext:"+integer);
}
}).subscribe(getNormalObserver());

輸出結(jié)果:

doOnNext ,onNext:1
normal,onNext:1
doOnNext ,onNext:2
normal,onNext:2
normal,onComplete

6.2.10 doOnSubscribe

doOnSubscribe各淀,當(dāng)觀察者訂閱它生成的 Observable 它就會(huì)被調(diào)用。

doOnSubscribe
doOnSubscribe

實(shí)踐:在 Observable 發(fā)射前做一些初始化操作(比如開始加載數(shù)據(jù)時(shí)顯示載入中界面)诡挂。

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(@NonNull Disposable disposable) throws Exception {
        Log.e(TAG, "doOnSubscribe,disposable:" + disposable);
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

doOnSubscribe,disposable:null
normal,onNext:1
normal,onNext:2
normal,onComplete

6.2.11 doOnTerminate

doOnTerminate 操作符注冊(cè)一個(gè)動(dòng)作碎浇,當(dāng)它產(chǎn)生的 Observable 終止之前會(huì)被調(diào)用临谱,無(wú)論是正常還是異常終止。

doOnTerminate
doOnTerminate

實(shí)現(xiàn)方法:doOnTerminate(Action)
實(shí)踐:不管消息流最終以 onError() / onComplete() 結(jié)束奴璃,都會(huì)被調(diào)用(類似 Java 的 finally )悉默,對(duì)于某些需要 onError() / onComplete() 后都要執(zhí)行的操作(如網(wǎng)絡(luò)加載成功/失敗都要隱藏載入中界面),可以放在這里苟穆。

注意:取消訂閱時(shí)抄课,不會(huì)調(diào)用 doOnTerminate 方法。

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doOnTerminate,run");
    }
}).subscribe(getNormalObserver());

輸出結(jié)果:

normal,onNext:1
normal,onNext:2
doOnTerminate,run
normal,onComplete

6.2.12 onTerminateDetach

當(dāng)執(zhí)行了反注冊(cè) unsubscribes 或者發(fā)送數(shù)據(jù)序列中斷了雳旅,解除上游生產(chǎn)者對(duì)下游接受者的引用跟磨。
實(shí)踐:onTerminateDetach 會(huì)使 Observable 調(diào)用 UnSubscriber 時(shí),對(duì) Subscriber 的引用會(huì)被釋放攒盈,從而避免造成內(nèi)存泄漏抵拘。

6.3 Meterialize / Dematerialize

6.3.1 Materialize

Materialize 將數(shù)據(jù)項(xiàng)和事件通知都當(dāng)做數(shù)據(jù)項(xiàng)發(fā)射,Dematerialize 剛好相反型豁。


Meterialize
Meterialize

一個(gè)合法的有限的 Obversable 將調(diào)用它的觀察者的 onNext 方法零次或多次仑濒,然后調(diào)用觀察者的 onCompleted 或 onError 僅一次。Materialize 操作符將這一系列調(diào)用偷遗,包括原來(lái)的 onNext 通知和終止通知 onCompleted 或 onError 都轉(zhuǎn)換為一個(gè) Observable 發(fā)射的數(shù)據(jù)序列墩瞳。
通俗一點(diǎn)的說(shuō)法:Meterialize 操作符將 OnNext / OnError / OnComplet e都轉(zhuǎn)化為一個(gè) Notification 對(duì)象并按照原來(lái)的順序發(fā)射出來(lái)。

示例代碼:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("love world"));
        emitter.onComplete();
    }
});

ob1.materialize().subscribe(new Consumer<Notification<Integer>>() {
    @Override
    public void accept(@NonNull Notification<Integer> in) throws Exception {
        if (in.isOnNext()) {
            Log.e(TAG, "materialize,onNext: " + in.isOnNext());
            return;
        }
        if (in.isOnError()) {
            Log.e(TAG, "materialize,onError: "+in.getError().getMessage());
            return;
        }
        if (in.isOnComplete()) {
            Log.e(TAG, "materialize,OnComplete");
            return;
        }
    }
});

輸出結(jié)果:

materialize,onNext: true
materialize,onNext: true
materialize,OnComplete

6.3.2 Dematerialize

而 Dematerialize 執(zhí)行相反的過(guò)程氏豌。


Dematerialize
Dematerialize

示例代碼:

Observable<Notification<Integer>> ob1 = Observable.create(new ObservableOnSubscribe<Notification<Integer>>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Notification<Integer>> e) throws Exception {
        e.onNext(Notification.createOnNext(1));
        e.onNext(Notification.<Integer>createOnError(new Throwable("My error!")));
        e.onNext(Notification.<Integer>createOnComplete());
        
    }
});
ob1.dematerialize().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Object o) {
        Log.e(TAG, "onNext:" + o.toString());
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "onError:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

輸出結(jié)果:

onNext:1
onComplete

6.4 ObserveOn / SubscribeOn

指定一個(gè)觀察者在哪個(gè)調(diào)度器(線程)上觀察這個(gè) Observable喉酌。


ObserveOn
ObserveOn

SubscribeOn
SubscribeOn

ObserverOn 用來(lái)指定觀察者所運(yùn)行的線程,也就是發(fā)射出的數(shù)據(jù)在那個(gè)線程上使用泵喘。
在 Android 中泪电,如果經(jīng)常會(huì)遇見這樣場(chǎng)景,我們需要從網(wǎng)絡(luò)中讀取數(shù)據(jù)纪铺,之后修改 UI 界面相速,觀察者就必須在主線程上運(yùn)行,就如同 AsyncTask 的 onPostExecute鲜锚。

.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程  
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程  

注意:當(dāng)遇到一個(gè)異常時(shí) ObserveOn 會(huì)立即向前傳遞這個(gè) onError 終止通知突诬,它不會(huì)等待慢速消費(fèi)的 Observable 接受任何之前它已經(jīng)收到但還沒(méi)有發(fā)射的數(shù)據(jù)項(xiàng)。這可能意味著 onError 通知會(huì)跳到(并吞掉)原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)前面芜繁,正如下圖所示的旺隙。


ObserveOn
ObserveOn

示例代碼:

/**
 Schedulers.io() 代表 io 操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等 io 密集型的操作
 Schedulers.computation() 代表 CPU 計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
 Schedulers.newThread() 代表一個(gè)常規(guī)的新線程
 */
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
        Log.e(TAG, "onNext:" +integer);
    }
});

輸出結(jié)果:

subscribeOn:RxCachedThreadScheduler-1
observerOn:RxNewThreadScheduler-1
onNext:1

6.4.1 unsubscribeOn

修改原 Observable,以便訂閱者將其配置在指定的調(diào)度器(線程)上骏令。

示例代碼:

//將線程從 computation 換到 io 中
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
    
            Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
            emitter.onNext(1);
        }
    }).subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "doOnNext,observerOn:" + Thread.currentThread().getName());
            Log.e(TAG, "doOnNext,onNext:" + integer);
        }
    })
    .unsubscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
            Log.e(TAG, "onNext:" + integer);
        }
    });

輸出結(jié)果:

subscribeOn:RxNewThreadScheduler-1
doOnNext,observerOn:RxComputationThreadPool-1
doOnNext,onNext:1
observerOn:RxComputationThreadPool-1
onNext:1

6.5 Serialize

強(qiáng)制一個(gè) Observable 連續(xù)調(diào)用并保證行為正確蔬捷。

Serialize
Serialize

一個(gè) Observable 可以異步調(diào)用它的觀察者的方法,可能是從不同的線程調(diào)用榔袋。這可能會(huì)讓 Observable 行為不正確周拐,它可能會(huì)在某一個(gè) onNext 調(diào)用之前嘗試調(diào)用 onCompleted 或 onError 方法铡俐,或者從兩個(gè)不同的線程同時(shí)調(diào)用 onNext 方法。使用 serialize 操作符妥粟,你可以糾正這個(gè) Observable 的行為高蜂,保證它的行為是正確的且是同步的。

6.6 TimeInterval

將一個(gè)發(fā)射數(shù)據(jù)的 Observable 轉(zhuǎn)換為發(fā)射那些數(shù)據(jù)發(fā)射時(shí)間間隔的 Observable罕容。

TimeInterval
TimeInterval

TimeInterval 操作符攔截原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)备恤,替換為兩個(gè)連續(xù)發(fā)射物之間流逝的時(shí)間長(zhǎng)度。 也就是說(shuō)這個(gè)使用這個(gè)操作符后發(fā)射的不再是原始數(shù)據(jù)锦秒,而是原始數(shù)據(jù)發(fā)射的時(shí)間間隔露泊。新的 Observable 的第一個(gè)發(fā)射物表示的是在觀察者訂閱原始 Observable 到原始 Observable 發(fā)射它的第一項(xiàng)數(shù)據(jù)之間流逝的時(shí)間長(zhǎng)度。 不存在與原始 Observable 發(fā)射最后一項(xiàng)數(shù)據(jù)和發(fā)射 onCompleted 通知之間時(shí)長(zhǎng)對(duì)應(yīng)的發(fā)射物旅择。timeInterval 默認(rèn)在 immediate 調(diào)度器上執(zhí)行惭笑,你可以通過(guò)傳參數(shù)修改。

示例代碼:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(3)
    .timeInterval()
    .subscribe(new Consumer<Timed<Long>>() {
        @Override
        public void accept(@NonNull Timed<Long> t) throws Exception {
            Log.e(TAG, "onNext: " + t.value() + " , time = " + t.time());
        }
    });

輸出結(jié)果:

onNext: 0 , time = 104
onNext: 1 , time = 113
onNext: 2 , time = 100

6.7 Timeout

對(duì)原始 Observable 的一個(gè)鏡像生真,如果過(guò)了一個(gè)指定的時(shí)長(zhǎng)仍沒(méi)有發(fā)射數(shù)據(jù)沉噩,它會(huì)發(fā)一個(gè)錯(cuò)誤通知。


Timeout
Timeout

Timeout 操作符給 Observable 加上超時(shí)時(shí)間柱蟀,每發(fā)射一個(gè)數(shù)據(jù)后就重置計(jì)時(shí)器川蒙,當(dāng)超過(guò)預(yù)定的時(shí)間還沒(méi)有發(fā)射下一個(gè)數(shù)據(jù),就拋出一個(gè)超時(shí)的異常长已。
RxJava2.0 中的實(shí)現(xiàn)的 Timeout 操作符有好幾個(gè)變體:

  • timeout(long,TimeUnit): 第一個(gè)變體接受一個(gè)時(shí)長(zhǎng)參數(shù)畜眨,每當(dāng)原始 Observable 發(fā)射了一項(xiàng)數(shù)據(jù),timeout就啟動(dòng)一個(gè)計(jì)時(shí)器术瓮,如果計(jì)時(shí)器超過(guò)了指定指定的時(shí)長(zhǎng)而原始 Observable 沒(méi)有發(fā)射另一項(xiàng)數(shù)據(jù)康聂,timeout 就拋出 TimeoutException,以一個(gè)錯(cuò)誤通知終止 Observable胞四。 這個(gè)timeout默認(rèn)在computation調(diào)度器上執(zhí)行恬汁,你可以通過(guò)參數(shù)指定其它的調(diào)度器。
  • timeout(long,TimeUnit,Observable): 這個(gè)版本的 timeout 在超時(shí)時(shí)會(huì)切換到使用一個(gè)你指定的備用的 Observable辜伟,而不是發(fā)錯(cuò)誤通知氓侧。它也默認(rèn)在 computation 調(diào)度器上執(zhí)行。
  • timeout(Function):這個(gè)版本的 timeout 使用一個(gè)函數(shù)針對(duì)原始 Observable 的每一項(xiàng)返回一個(gè) Observable游昼,如果當(dāng)這個(gè) Observable 終止時(shí)原始 Observable 還沒(méi)有發(fā)射另一項(xiàng)數(shù)據(jù)甘苍,就會(huì)認(rèn)為是超時(shí)了,timeout 就拋出 TimeoutException烘豌,以一個(gè)錯(cuò)誤通知終止 Observable。
  • timeout(Function,Observable): 這個(gè)版本的 timeout 同時(shí)指定超時(shí)時(shí)長(zhǎng)和備用的 Observable看彼。它默認(rèn)在immediate調(diào)度器上執(zhí)行

示例代碼1:

/**
 * 在 150 毫秒間隔內(nèi)如果沒(méi)有發(fā)射數(shù)據(jù)廊佩。發(fā)送一個(gè) TimeoutException 通知終止囚聚。
 * */
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(i * 100);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    })
    .timeout(150, TimeUnit.MILLISECONDS)
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            
        }
    
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG, "onNext:" + integer);
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "onError:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
        }
    });

輸出結(jié)果:

onNext:0
onNext:1
onError:null

示例代碼 2:

 /**
 * 只接收 200 毫秒間隔內(nèi)發(fā)送的數(shù)據(jù),如果超時(shí)則切換到 Observable.just(100, 200)
 * */
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(i * 100);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    })
    .timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

輸出結(jié)果:

accept:0
accept:1
accept:100
accept:200

6.8 Timestamp

給 Observable 發(fā)射的數(shù)據(jù)項(xiàng)附加一個(gè)時(shí)間戳标锄。


Timestamp
Timestamp

它將一個(gè)發(fā)射 T 類型數(shù)據(jù)的 Observable 轉(zhuǎn)換為一個(gè)發(fā)射類型為 Timestamped 的數(shù)據(jù)的 Observable顽铸,每一項(xiàng)都包含數(shù)據(jù)的發(fā)射時(shí)間。也就是把 Observable 發(fā)射的數(shù)據(jù)重新包裝了一下料皇,將數(shù)據(jù)發(fā)射的時(shí)間打包一起發(fā)射出去谓松,這樣觀察者不僅能得到數(shù)據(jù),還能得到數(shù)據(jù)的發(fā)射時(shí)間践剂。 timestamp 默認(rèn)在 immediate 調(diào)度器上執(zhí)行鬼譬,但是可以通過(guò)參數(shù)指定其它的調(diào)度器。

示例代碼:

Observable.range(1, 3)
    .timestamp()
    .subscribe(new Consumer<Timed<Integer>>() {
        @Override
        public void accept(@NonNull Timed<Integer> t) throws Exception {
            Log.e(TAG, "accept ,onNext:" + t.value() + ",time = " + t.time());
        }
    });

輸出結(jié)果:

accept ,onNext:1,time = 1494606809418
accept ,onNext:2,time = 1494606809420
accept ,onNext:3,time = 1494606809420

6.9 Using

創(chuàng)建一個(gè)只在 Observable 生命周期內(nèi)存在的一次性資源.


Using
Using

當(dāng)一個(gè)觀察者訂閱 using 返回的 Observable 時(shí)逊脯,using 將會(huì)使用 Observable 工廠函數(shù)創(chuàng)建觀察者要觀察 Observable优质,同時(shí)使用資源工廠函數(shù)創(chuàng)建一個(gè)你想要?jiǎng)?chuàng)建的資源。當(dāng)觀察者取消訂閱這個(gè) Observable 時(shí)军洼,或者當(dāng)觀察者終止時(shí)(無(wú)論是正常終止還是因錯(cuò)誤而終止)巩螃,using 使用第三個(gè)函數(shù)釋放它創(chuàng)建的資源。

using 操作符接受三個(gè)參數(shù):

  • 一個(gè)用戶創(chuàng)建一次性資源的工廠函數(shù)
  • 一個(gè)用于創(chuàng)建 Observable 的工廠函數(shù)
  • 一個(gè)用于釋放資源的函數(shù)

示例代碼:

Observable.using(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return new Random().nextInt(10);
        }
    }, new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
            return Observable.just("hello+" + integer, "world+" + integer);
        }
    }, new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "using,accept - >" + integer);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.e(TAG, "subscribe,accept -> " + s);
        }
    });

輸出結(jié)果:

subscribe,accept -> hello+8
subscribe,accept -> world+8
using,accept - >8

6.10 To

將 Observable 轉(zhuǎn)換為另一個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)匕争。


To
To

ReactiveX 的很多語(yǔ)言特定實(shí)現(xiàn)都有一種操作符讓你可以將 Observable 或者 Observable 發(fā)射的數(shù)據(jù)序列轉(zhuǎn)換為另一個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)避乏。它們中的一些會(huì)阻塞直到 Observable 終止,然后生成一個(gè)等價(jià)的對(duì)象或數(shù)據(jù)結(jié)構(gòu)甘桑;另一些返回一個(gè)發(fā)射那個(gè)對(duì)象或數(shù)據(jù)結(jié)構(gòu)的 Observable淑际。

在某些 ReactiveX 實(shí)現(xiàn)中,還有一個(gè)操作符用于將 Observable 轉(zhuǎn)換成阻塞式的扇住。一個(gè)阻塞式的 Ogbservable 在普通的 Observable 的基礎(chǔ)上增加了幾個(gè)方法春缕,用于操作 Observable 發(fā)射的數(shù)據(jù)項(xiàng)。

RxJava2.x 中實(shí)現(xiàn)了多種 To 操作符:

6.10.1 To

示例代碼:


輸出結(jié)果:


6.10.2 toFuture

返回表示該 Observable 發(fā)出的單個(gè)值的 Future艘蹋。
如果 Observable 發(fā)出多個(gè)項(xiàng)目锄贼,F(xiàn)uture 將會(huì)收到一個(gè) IllegalArgumentException。 如果 Observable 為空女阀,F(xiàn)uture 將收到一個(gè) NoSuchElementException宅荤。

如果 Observable 可能會(huì)發(fā)出多個(gè)項(xiàng)目,請(qǐng)使用Observable.toList() 浸策、toBlocking() 冯键、toFuture()。


toFuture
toFuture
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末庸汗,一起剝皮案震驚了整個(gè)濱河市惫确,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖改化,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掩蛤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡陈肛,警方通過(guò)查閱死者的電腦和手機(jī)揍鸟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)句旱,“玉大人阳藻,你說(shuō)我怎么就攤上這事√溉觯” “怎么了腥泥?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)港华。 經(jīng)常有香客問(wèn)我道川,道長(zhǎng),這世上最難降的妖魔是什么立宜? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任冒萄,我火速辦了婚禮,結(jié)果婚禮上橙数,老公的妹妹穿的比我還像新娘尊流。我一直安慰自己,他們只是感情好灯帮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布崖技。 她就那樣靜靜地躺著,像睡著了一般钟哥。 火紅的嫁衣襯著肌膚如雪迎献。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天腻贰,我揣著相機(jī)與錄音吁恍,去河邊找鬼。 笑死播演,一個(gè)胖子當(dāng)著我的面吹牛冀瓦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播写烤,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼翼闽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了洲炊?” 一聲冷哼從身側(cè)響起感局,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤尼啡,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后蓝厌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玄叠,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡古徒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年拓提,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片隧膘。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡代态,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出疹吃,到底是詐尸還是另有隱情蹦疑,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布萨驶,位于F島的核電站歉摧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏腔呜。R本人自食惡果不足惜叁温,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望核畴。 院中可真熱鬧膝但,春花似錦、人聲如沸谤草。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)丑孩。三九已至冀宴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間温学,已是汗流浹背略贮。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枫浙,地道東北人刨肃。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像箩帚,于是被迫代替她去往敵國(guó)和親真友。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位紧帕,與響應(yīng)式編程作為結(jié)合使用的盔然,對(duì)什么是操作桅打、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,862評(píng)論 0 10
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,195評(píng)論 2 8
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,829評(píng)論 0 1
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符愈案。對(duì)于擴(kuò)展包挺尾,由于使用率較低,如有需求站绪,請(qǐng)讀者自行查閱文檔遭铺。 創(chuàng)...
    maplejaw_閱讀 45,670評(píng)論 8 93
  • RxJava正在Android開發(fā)者中變的越來(lái)越流行。唯一的問(wèn)題就是上手不容易恢准,尤其是大部分人之前都是使用命令式編...
    劉啟敏閱讀 1,871評(píng)論 1 7