Carson帶你學(xué)Android:RxJava功能性操作符

前言

Rxjava技羔,由于其基于事件流的鏈?zhǔn)秸{(diào)用瓮顽、邏輯簡(jiǎn)潔 & 使用簡(jiǎn)單的特點(diǎn)霜大,深受各大 Android開發(fā)者的歡迎。

如果還不了解 RxJava捡絮,請(qǐng)看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程

  • RxJava如此受歡迎的原因熬芜,在于其提供了豐富 & 功能強(qiáng)大的操作符,幾乎能完成所有的功能需求
  • 今天福稳,我將為大家詳細(xì)介紹RxJava操作符中最常用的 功能性操作符涎拉,并附帶 Retrofit 結(jié)合 RxJava的實(shí)例Demo教學(xué),希望你們會(huì)喜歡的圆。

Carson帶你學(xué)RxJava系列文章鼓拧,包括 原理、操作符越妈、應(yīng)用場(chǎng)景季俩、背壓等等,請(qǐng)關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南


目錄

示意圖

1. 作用

輔助被觀察者(Observable) 在發(fā)送事件時(shí)實(shí)現(xiàn)一些功能性需求

如錯(cuò)誤處理叮称、線程調(diào)度等等


2. 類型

  • RxJava 2 中种玛,常見的功能性操作符 主要有:
示意圖
  • 下面,我將對(duì)每個(gè)操作符進(jìn)行詳細(xì)講解

3. 應(yīng)用場(chǎng)景 & 對(duì)應(yīng)操作符詳解

注:在使用RxJava 2操作符前瓤檐,記得在項(xiàng)目的Gradle中添加依賴:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 與 RxJava1 不能共存赂韵,即依賴不能同時(shí)存在
}

3.1 連接被觀察者 & 觀察者

  • 需求場(chǎng)景
    即使得被觀察者 & 觀察者 形成訂閱關(guān)系

  • 對(duì)應(yīng)操作符

subscribe()

  • 作用
    訂閱,即連接觀察者 & 被觀察者

  • 具體使用

observable.subscribe(observer);
// 前者 = 被觀察者(observable)挠蛉;后者 = 觀察者(observer 或 subscriber)


<-- 1. 分步驟的完整調(diào)用 -->
//  步驟1: 創(chuàng)建被觀察者 Observable 對(duì)象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });

// 步驟2:創(chuàng)建觀察者 Observer 并 定義響應(yīng)事件行為
        Observer<Integer> observer = new Observer<Integer>() {
            // 通過復(fù)寫對(duì)應(yīng)方法來 響應(yīng) 被觀察者
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
            // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
            }
        };

        
        // 步驟3:通過訂閱(subscribe)連接觀察者和被觀察者
        observable.subscribe(observer);


<-- 2. 基于事件流的鏈?zhǔn)秸{(diào)用 -->
        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
            // 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
            // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
            }

        });
    }
}

  • 測(cè)試結(jié)果
示意圖
  • 擴(kuò)展說明
<-- Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn) -->

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    // 在觀察者 subscriber抽象類復(fù)寫的方法 onSubscribe.call(subscriber)祭示,用于初始化工作
    // 通過該調(diào)用,從而回調(diào)觀察者中的對(duì)應(yīng)方法從而響應(yīng)被觀察者生產(chǎn)的事件
    // 從而實(shí)現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞谴古,即觀察者模式
    // 同時(shí)也看出:Observable只是生產(chǎn)事件质涛,真正的發(fā)送事件是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行時(shí)
}

3.2 線程調(diào)度

  • 需求場(chǎng)景
    快速掰担、方便指定 & 控制被觀察者 & 觀察者 的工作線程

3.3 延遲操作

  • 需求場(chǎng)景
    即在被觀察者發(fā)送事件前進(jìn)行一些延遲的操作

  • 對(duì)應(yīng)操作符使用

delay()

  • 作用
    使得被觀察者延遲一段時(shí)間再發(fā)送事件

  • 方法介紹
    delay() 具備多個(gè)重載方法带饱,具體如下:

// 1. 指定延遲時(shí)間
// 參數(shù)1 = 時(shí)間毡代;參數(shù)2 = 時(shí)間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時(shí)間 & 調(diào)度器
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位勺疼;參數(shù)3 = 線程調(diào)度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時(shí)間  & 錯(cuò)誤延遲
// 錯(cuò)誤延遲教寂,即:若存在Error事件,則如常執(zhí)行执庐,執(zhí)行后再拋出錯(cuò)誤異常
// 參數(shù)1 = 時(shí)間酪耕;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時(shí)間 & 調(diào)度器 & 錯(cuò)誤延遲
// 參數(shù)1 = 時(shí)間轨淌;參數(shù)2 = 時(shí)間單位迂烁;參數(shù)3 = 線程調(diào)度器看尼;參數(shù)4 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長(zhǎng)時(shí)間并添加調(diào)度器,錯(cuò)誤通知可以設(shè)置是否延遲
  • 具體使用
Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS) // 延遲3s再發(fā)送婚被,由于使用類似狡忙,所以此處不作全部展示
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
示意圖

3.4 在事件的生命周期中操作

  • 需求場(chǎng)景
    在事件發(fā)送 & 接收的整個(gè)生命周期過程中進(jìn)行操作

如發(fā)送事件前的初始化、發(fā)送事件后的回調(diào)請(qǐng)求等

  • 對(duì)應(yīng)操作符使用

do()

  • 作用
    在某個(gè)事件的生命周期中調(diào)用
  • 類型
    do()操作符有很多個(gè)址芯,具體如下:
示意圖
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
                 }
               })
                // 1. 當(dāng)Observable每發(fā)送1次數(shù)據(jù)事件就會(huì)調(diào)用1次
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: " + integerNotification.getValue());
                    }
                })
                // 2. 執(zhí)行Next事件前調(diào)用
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: " + integer);
                    }
                })
                // 3. 執(zhí)行Next事件后調(diào)用
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: " + integer);
                    }
                })
                // 4. Observable正常發(fā)送事件完畢后調(diào)用
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnComplete: ");
                    }
                })
                // 5. Observable發(fā)送錯(cuò)誤事件時(shí)調(diào)用
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError: " + throwable.getMessage());
                    }
                })
                // 6. 觀察者訂閱時(shí)調(diào)用
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe: ");
                    }
                })
                // 7. Observable發(fā)送事件完畢后調(diào)用灾茁,無論正常發(fā)送完畢 / 異常終止
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doAfterTerminate: ");
                    }
                })
                // 8. 最后執(zhí)行
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doFinally: ");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
示意圖

3.5 錯(cuò)誤處理

  • 需求場(chǎng)景
    發(fā)送事件過程中,遇到錯(cuò)誤時(shí)的處理機(jī)制

  • 對(duì)應(yīng)操作符類型

示意圖
  • 對(duì)應(yīng)操作符使用

onErrorReturn()

  • 作用
    遇到錯(cuò)誤時(shí)谷炸,發(fā)送1個(gè)特殊事件 & 正常終止

可捕獲在它之前發(fā)生的異常

  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
                 }
               })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(@NonNull Throwable throwable) throws Exception {
                        // 捕捉錯(cuò)誤異常
                        Log.e(TAG, "在onErrorReturn處理了錯(cuò)誤: "+throwable.toString() );

                        return 666;
                        // 發(fā)生錯(cuò)誤事件后北专,發(fā)送一個(gè)"666"事件,最終正常結(jié)束
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
示意圖

onErrorResumeNext()

  • 作用
    遇到錯(cuò)誤時(shí)旬陡,發(fā)送1個(gè)新的Observable

注:

  1. onErrorResumeNext()攔截的錯(cuò)誤 = Throwable拓颓;若需攔截Exception請(qǐng)用onExceptionResumeNext()
  2. onErrorResumeNext()攔截的錯(cuò)誤 = Exception,則會(huì)將錯(cuò)誤傳遞給觀察者的onError方法
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
                 }
               })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {

                        // 1. 捕捉錯(cuò)誤異常
                        Log.e(TAG, "在onErrorReturn處理了錯(cuò)誤: "+throwable.toString() );

                        // 2. 發(fā)生錯(cuò)誤事件后描孟,發(fā)送一個(gè)新的被觀察者 & 發(fā)送事件序列
                        return Observable.just(11,22);
                        
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
示意圖

onExceptionResumeNext()

  • 作用
    遇到錯(cuò)誤時(shí)驶睦,發(fā)送1個(gè)新的Observable

注:

  1. onExceptionResumeNext()攔截的錯(cuò)誤 = Exception;若需攔截Throwable請(qǐng)用onErrorResumeNext()
  2. onExceptionResumeNext()攔截的錯(cuò)誤 = Throwable匿醒,則會(huì)將錯(cuò)誤傳遞給觀察者的onError方法
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                 }
               })
                .onExceptionResumeNext(new Observable<Integer>() {
                    @Override
                    protected void subscribeActual(Observer<? super Integer> observer) {
                        observer.onNext(11);
                        observer.onNext(22);
                        observer.onComplete();
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
示意圖

retry()

  • 作用
    重試场航,即當(dāng)出現(xiàn)錯(cuò)誤時(shí),讓被觀察者(Observable)重新發(fā)射數(shù)據(jù)
  1. 接收到 onError()時(shí)廉羔,重新訂閱 & 發(fā)送事件
  2. ThrowableException都可攔截
  • 類型

共有5種重載方法

<-- 1. retry() -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)溉痢,讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯(cuò)誤,則一直重新發(fā)送

<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)憋他,讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
 
<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后孩饼,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯(cuò)誤,則持續(xù)重試)
// 參數(shù) = 判斷邏輯

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯(cuò)誤后竹挡,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤镀娶,則持續(xù)重試
// 參數(shù) =  判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯(cuò)誤信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯

  • 具體使用
<-- 1. retry() -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)揪罕,讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯(cuò)誤梯码,則一直重新發(fā)送

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
                 }
               })
                .retry() // 遇到錯(cuò)誤時(shí),讓被觀察者重新發(fā)射數(shù)據(jù)(若一直錯(cuò)誤耸序,則一直重新發(fā)送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });


<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯(cuò)誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
                 }
               })
                .retry(3) // 設(shè)置重試次數(shù) = 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });

<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后鲁猩,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯(cuò)誤坎怪,則持續(xù)重試)
// 參數(shù) = 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
                 }
               })
                // 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "retry錯(cuò)誤: "+throwable.toString());

                        //返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError結(jié)束
                        //返回true = 重新發(fā)送請(qǐng)求(若持續(xù)遇到錯(cuò)誤廓握,就持續(xù)重新發(fā)送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯(cuò)誤后搅窿,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤嘁酿,則持續(xù)重試
// 參數(shù) =  判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯(cuò)誤信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
                 }
               })

                // 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "異常錯(cuò)誤 =  "+throwable.toString());

                        // 獲取當(dāng)前重試次數(shù)
                        Log.e(TAG, "當(dāng)前重試次數(shù) =  "+integer);

                        //返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError結(jié)束
                        //返回true = 重新發(fā)送請(qǐng)求(若持續(xù)遇到錯(cuò)誤男应,就持續(xù)重新發(fā)送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });


<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后闹司,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
                 }
               })
                // 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
                .retry(3, new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕獲異常
                        Log.e(TAG, "retry錯(cuò)誤: "+throwable.toString());

                        //返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError()結(jié)束
                        //返回true = 重新發(fā)送請(qǐng)求(最多重新發(fā)送3次)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });

retryUntil()

  • 作用
    出現(xiàn)錯(cuò)誤后沐飘,判斷是否需要重新發(fā)送數(shù)據(jù)
  1. 若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤游桩,則持續(xù)重試
  2. 作用類似于retry(Predicate predicate)
  • 具體使用
    具體使用類似于retry(Predicate predicate),唯一區(qū)別:返回 true 則不重新發(fā)送數(shù)據(jù)事件耐朴。此處不作過多描述

retryWhen()

  • 作用
    遇到錯(cuò)誤時(shí)借卧,將發(fā)生的錯(cuò)誤傳遞給一個(gè)新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件
  • 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
            }
        })
                // 遇到error事件才會(huì)回調(diào)
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        // 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常筛峭,可通過該條件來判斷異常的類型
                        // 返回Observable<?> = 新的被觀察者 Observable(任意類型)
                        // 此處有兩種情況:
                            // 1. 若 新的被觀察者 Observable發(fā)送的事件 = Error事件铐刘,那么 原始Observable則不重新發(fā)送事件:
                            // 2. 若 新的被觀察者 Observable發(fā)送的事件 = Next事件 ,那么原始的Observable則重新發(fā)送事件:
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                                // 1. 若返回的Observable發(fā)送的事件 = Error事件影晓,則原始的Observable不重新發(fā)送事件
                                // 該異常錯(cuò)誤信息可在觀察者中的onError()中獲得
                                 return Observable.error(new Throwable("retryWhen終止啦"));
                                
                                // 2. 若返回的Observable發(fā)送的事件 = Next事件镰吵,則原始的Observable重新發(fā)送事件(若持續(xù)遇到錯(cuò)誤,則持續(xù)重試)
                                 // return Observable.just(1);
                            }
                        });

                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)" + e.toString());
                        // 獲取異常錯(cuò)誤信息
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 測(cè)試結(jié)果
新的Observable發(fā)送錯(cuò)誤事件 = 原始Observable終止發(fā)送
新的Observable發(fā)送數(shù)據(jù)事件 = 原始Observable 持續(xù)重試

3.6 重復(fù)發(fā)送

  • 需求場(chǎng)景
    重復(fù)不斷地發(fā)送被觀察者事件

  • 對(duì)應(yīng)操作符類型
    repeat() & repeatWhen()

repeat()

  • 作用
    無條件地挂签、重復(fù)發(fā)送 被觀察者事件

具備重載方法疤祭,可設(shè)置重復(fù)創(chuàng)建次數(shù)

  • 具體使用
// 不傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù) = 無限次
        repeat();
        // 傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù)有限
        repeatWhen(Integer int )竹握;

// 注:
  // 1. 接收到.onCompleted()事件后画株,觸發(fā)重新訂閱 & 發(fā)送
  // 2. 默認(rèn)運(yùn)行在一個(gè)新的線程上

        // 具體使用
        Observable.just(1, 2, 3, 4)
                .repeat(3) // 重復(fù)創(chuàng)建次數(shù) =- 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始采用subscribe連接");
                    }
                    
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }

                });



  • 測(cè)試結(jié)果
示意圖

repeatWhen()

  • 作用
    有條件地、重復(fù)發(fā)送 被觀察者事件

  • 原理
    將原始 Observable 停止發(fā)送事件的標(biāo)識(shí)(Complete() / Error())轉(zhuǎn)換成1個(gè) Object 類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable)啦辐,以此決定是否重新訂閱 & 發(fā)送原來的 Observable

  1. 若新被觀察者(Observable)返回1個(gè)Complete / Error事件谓传,則不重新訂閱 & 發(fā)送原來的 Observable
  2. 若新被觀察者(Observable)返回其余事件時(shí),則重新訂閱 & 發(fā)送原來的 Observable
  • 具體使用
 Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            // 在Function函數(shù)中芹关,必須對(duì)輸入的 Observable<Object>進(jìn)行處理续挟,這里我們使用的是flatMap操作符接收上游的數(shù)據(jù)
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 將原始 Observable 停止發(fā)送事件的標(biāo)識(shí)(Complete() /  Error())轉(zhuǎn)換成1個(gè) Object 類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable)
                // 以此決定是否重新訂閱 & 發(fā)送原來的 Observable
                // 此處有2種情況:
                // 1. 若新被觀察者(Observable)返回1個(gè)Complete() /  Error()事件,則不重新訂閱 & 發(fā)送原來的 Observable
                // 2. 若新被觀察者(Observable)返回其余事件侥衬,則重新訂閱 & 發(fā)送原來的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 情況1:若新被觀察者(Observable)返回1個(gè)Complete() /  Error()事件诗祸,則不重新訂閱 & 發(fā)送原來的 Observable
                        return Observable.empty();
                        // Observable.empty() = 發(fā)送Complete事件,但不會(huì)回調(diào)觀察者的onComplete()

                        // return Observable.error(new Throwable("不再重新訂閱事件"));
                        // 返回Error事件 = 回調(diào)onError()事件轴总,并接收傳過去的錯(cuò)誤信息直颅。

                        // 情況2:若新被觀察者(Observable)返回其余事件,則重新訂閱 & 發(fā)送原來的 Observable
                        // return Observable.just(1);
                       // 僅僅是作為1個(gè)觸發(fā)重新訂閱被觀察者的通知怀樟,發(fā)送的是什么數(shù)據(jù)并不重要功偿,只要不是Complete() /  Error()事件
                    }
                });

            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始采用subscribe連接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng):" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }

                });
  • 測(cè)試結(jié)果
新的Observable發(fā)送Complete 事件 = 原始Observable停止發(fā)送 & 不重新發(fā)送
新的Observable發(fā)送Error 事件 = 原始Observable停止發(fā)送 & 不重新發(fā)送
新的Observable發(fā)送其余事件 = 原始Observable重新發(fā)送

至此,RxJava 2中的功能性操作符講解完畢往堡。


4. 實(shí)際開發(fā)需求案例

  • 下面械荷,我將 結(jié)合Retrofit & RxJava共耍,講解功能性操作符的3個(gè)實(shí)際需求案例場(chǎng)景:
    1. 線程操作(切換 / 調(diào)度 / 控制 )
    2. 輪詢
    3. 發(fā)送網(wǎng)絡(luò)請(qǐng)求時(shí)的差錯(cuò)重試機(jī)制

4.1 線程控制(切換 / 調(diào)度 )


4.2 輪詢

  • 需求場(chǎng)景說明
示意圖

4.3 發(fā)送網(wǎng)絡(luò)請(qǐng)求時(shí)的差錯(cuò)重試機(jī)制


5. Demo地址

上述所有的Demo源代碼都存放在:Carson_Ho的Github地址:RxJava2_功能性操作符


6. 總結(jié)

  • 下面颤诀,我將用一張圖總結(jié) RxJava2 中常用的功能性操作符
示意圖
  • Carson帶你學(xué)RxJava系列文章:

入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么字旭?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時(shí)候應(yīng)該使用Rxjava?(開發(fā)場(chǎng)景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava着绊、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷


歡迎關(guān)注Carson_Ho的簡(jiǎn)書

不定期分享關(guān)于安卓開發(fā)的干貨谐算,追求短、平归露、快洲脂,但卻不缺深度


請(qǐng)點(diǎn)贊剧包!因?yàn)槟愕墓膭?lì)是我寫作的最大動(dòng)力恐锦!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市疆液,隨后出現(xiàn)的幾起案子一铅,更是在濱河造成了極大的恐慌,老刑警劉巖堕油,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件潘飘,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡掉缺,警方通過查閱死者的電腦和手機(jī)卜录,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來眶明,“玉大人艰毒,你說我怎么就攤上這事∷汛眩” “怎么了丑瞧?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)蜀肘。 經(jīng)常有香客問我绊汹,道長(zhǎng),這世上最難降的妖魔是什么扮宠? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任西乖,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浴栽。我一直安慰自己,他們只是感情好轿偎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布典鸡。 她就那樣靜靜地躺著,像睡著了一般坏晦。 火紅的嫁衣襯著肌膚如雪萝玷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天昆婿,我揣著相機(jī)與錄音球碉,去河邊找鬼。 笑死仓蛆,一個(gè)胖子當(dāng)著我的面吹牛睁冬,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播看疙,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼豆拨,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了能庆?” 一聲冷哼從身側(cè)響起施禾,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎搁胆,沒想到半個(gè)月后弥搞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡渠旁,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年攀例,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片一死。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肛度,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出投慈,到底是詐尸還是另有隱情承耿,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布伪煤,位于F島的核電站加袋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏抱既。R本人自食惡果不足惜职烧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蚀之,春花似錦蝗敢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至失受,卻和暖如春讶泰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拂到。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工痪署, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人兄旬。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓狼犯,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親领铐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子辜王,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容