Rxjava2.0筆記-004-合并,過濾操作符實(shí)際應(yīng)用

關(guān)于合并數(shù)據(jù)源:之前使用了flatMap()以及concatMap()進(jìn)行嵌套調(diào)用入录,,注冊之后登陸

合并數(shù)據(jù)源2:合并數(shù)據(jù)(獲取圖書詳情以及評論)統(tǒng)一展示到客戶端:采用merge()或者zip()操作符

merge()例子:實(shí)現(xiàn)較為簡單的從(網(wǎng)絡(luò)+本地)獲取數(shù)據(jù)举塔,奋隶,統(tǒng)一展示

zip()例子:結(jié)合Retrofit以及Rxjava,實(shí)現(xiàn)多個(gè)網(wǎng)絡(luò)請求合并獲得數(shù)據(jù)俭驮,回溺,統(tǒng)一展示

二者區(qū)別為:merge()只添加被觀察者合并數(shù)據(jù)源的操作在observable觀察者的onnext()里面處理,進(jìn)行合并混萝,合并的結(jié)果在onComplete()處理遗遵,zip()可以直接添加發(fā)射者,再添加合并數(shù)據(jù)源的bean逸嘀,在轉(zhuǎn)主線程车要,訂閱,可以使用new Consumer<Bean>() )里面處理合并結(jié)果

/**
     * 合并發(fā)射者崭倘,按時(shí)間線執(zhí)行
     * 合并事件翼岁,還是merge()比較方便好用
     */

    String resultss = "數(shù)據(jù)源來自:";

    private void merge() {

//        Observable.merge(
//                //延遲發(fā)送操作符
//                //從0開始發(fā)送,發(fā)送3個(gè)數(shù)據(jù)司光,第一次發(fā)件延遲時(shí)間1秒琅坡。間隔時(shí)間1s
//                //
//                Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
//                Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
//        ).subscribe(aLong -> {
//
//        });

        Observable.merge(
                Observable.just("網(wǎng)絡(luò)"),
                Observable.just("本地文件")
        ).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String s) {
                resultss += s;
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {

                KLog.d(TTAG, "接收完成統(tǒng)一處理事件:" + resultss);
            }
        });
    }

下面使用zip操作:

  /**
     * 合并數(shù)據(jù)源
     */
    private void zip() {

        Observable.zip(
                retrofitApi.getCall().subscribeOn(Schedulers.io()),
                retrofitApi.getCall().subscribeOn(Schedulers.io()),
                (translation, translation2) ->
                        translation.toString() + translation2.toString())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(s -> {

                    KLog.d(TTAG, "合并的數(shù)據(jù)源是:" + s.toString());
                }, throwable -> {

                });
    }

concat()實(shí)例

 /**
     * 該類型的操作符的作用 = 組合多個(gè)被觀察者
     * 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行
     * concat()
     * concatArray()
     * 
     * 實(shí)例:從內(nèi)存以及磁盤和網(wǎng)絡(luò)獲取緩存
     */

    String memoryCache = null;
    String diskCache = "磁盤緩存數(shù)據(jù)";

    private void concat() {

    
        Observable.concat(
                Observable.create(emitter -> {

                    //判斷內(nèi)存是否含有緩存
                    if (null == memoryCache) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(memoryCache);
                    }
                }),
                Observable.create(emitter -> {

                    //判斷磁盤
                    if (null == diskCache) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(diskCache);
                    }
                }),
                Observable.create((ObservableOnSubscribe<String>) emitter -> {

                    emitter.onNext("從網(wǎng)絡(luò)獲取緩存數(shù)據(jù)");
                })
                //通過firstElement()残家,從串聯(lián)隊(duì)列中取出并發(fā)送第1個(gè)有效事件(Next事件)榆俺,即依次判斷檢查memory、disk坞淮、network
        ).firstElement()
                // 即本例的邏輯為:
                // a. firstElement()取出第1個(gè)事件 = memory茴晋,即先判斷內(nèi)存緩存中有無數(shù)據(jù)緩存;由于memoryCache = null回窘,即內(nèi)存緩存中無數(shù)據(jù)诺擅,所以發(fā)送結(jié)束事件(視為無效事件)
                // b. firstElement()繼續(xù)取出第2個(gè)事件 = disk,即判斷磁盤緩存中有無數(shù)據(jù)緩存:由于diskCache ≠ null啡直,即磁盤緩存中有數(shù)據(jù)烁涌,所以發(fā)送Next事件(有效事件)
                // c. 即firstElement()已發(fā)出第1個(gè)有效事件(disk事件)苍碟,所以停止判斷。

                .subscribe(s -> {

                    KLog.d(TTAG, "緩存獲得路徑是:" + s.toString());
                });
    }

combineLatest()實(shí)例

進(jìn)行多個(gè)輸入框判斷烹玉,有一個(gè)為空時(shí)按鈕不可點(diǎn)擊驰怎,都不為空時(shí)才可以點(diǎn)擊(并且改變輸入框顏色)

 /**
     * 通過combineLatest()合并事件 & 聯(lián)合判斷
     * <p>
     * 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)送了數(shù)據(jù)后,
     * 將先發(fā)送了數(shù)據(jù)的Observables 的最新(最后)一個(gè)數(shù)據(jù) 與
     * 另外一個(gè)Observable發(fā)送的每個(gè)數(shù)據(jù)結(jié)合二打,最終基于該函數(shù)的結(jié)果發(fā)送數(shù)據(jù)
     */
    private void init() {

        nameObser = RxTextView.textChanges(name).skip(1);
        ageObser = RxTextView.textChanges(age).skip(1);
        jobObser = RxTextView.textChanges(job).skip(1);

        Observable.combineLatest(nameObser, ageObser, jobObser,
                (charSequence, charSequence2, charSequence3) -> {
                    boolean nameIsNOtEmpty = !TextUtils.isEmpty(name.getText());

                   // boolean nameIs = !TextUtils.isEmpty(name.getText()) && name.getText().toString().length() <= 10;
                    boolean ageIsNotEmpty = !TextUtils.isEmpty(age.getText());
                    boolean jobIsNotEmpty = !TextUtils.isEmpty(job.getText());

                    return nameIsNOtEmpty && ageIsNotEmpty && jobIsNotEmpty;
                }
        ).subscribe(aBoolean -> {

            KLog.d(TTAG, "點(diǎn)擊結(jié)果是:" + aBoolean);
            push.setEnabled(aBoolean);
        });
    }

有條件的輪詢操作:

使用關(guān)鍵字:repeatWhen

// 設(shè)置變量 = 模擬輪詢服務(wù)器次數(shù)
    private int i = 0 ;
    /**
     * 有條件的輪詢
     * 使用操作符:repeatWhen
     */
    private void init3() {


        RetrofitApi retrofitApi = OkHttpUtils.newInstance().create(RetrofitApi.class);

        retrofitApi.getCall()
                .repeatWhen(objectObservable -> {

                    // 將原始 Observable 停止發(fā)送事件的標(biāo)識(Complete() /  Error())轉(zhuǎn)換成1個(gè) Object 類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable)
                    // 以此決定是否重新訂閱 & 發(fā)送原來的 Observable,即輪詢
                    // 此處有2種情況:
                    // 1. 若返回1個(gè)Complete() /  Error()事件掂榔,則不重新訂閱 & 發(fā)送原來的 Observable继效,即輪詢結(jié)束
                    // 2. 若返回其余事件,則重新訂閱 & 發(fā)送原來的 Observable装获,即繼續(xù)輪詢

                    return objectObservable.flatMap((Function<Object, ObservableSource<?>>) o -> {

                        // 加入判斷條件:當(dāng)輪詢次數(shù) = 5次后瑞信,就停止輪詢
                        if (i>3){
                            return Observable.error(new Throwable("輪詢結(jié)束"));
                        }
                        // 若輪詢次數(shù)<4次,則發(fā)送1Next事件以繼續(xù)輪詢
                        // 注:此處加入了delay操作符穴豫,作用 = 延遲一段時(shí)間發(fā)送(此處設(shè)置 = 2s)凡简,以實(shí)現(xiàn)輪詢間間隔設(shè)置

                        return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
                    });
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Translation translation) {

                        translation.show();
                        i++;
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }

有條件的網(wǎng)絡(luò)請求出錯(cuò),重試精肃,可以設(shè)置條件

 /**
     * 請求出錯(cuò)去重復(fù)查詢秤涩,可以設(shè)置條件
     * 使用操作符:retryWhen
     * 發(fā)送網(wǎng)絡(luò)請求 & 通過retryWhen()進(jìn)行重試
     * 主要異常才會回調(diào)retryWhen()進(jìn)行重試
     *   參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型
     */

    // 設(shè)置變量
    // 可重試次數(shù)
    private int maxConnectCount = 10;
    // 當(dāng)前已重試次數(shù)
    private int currentRetryCount = 0;
    // 重試等待時(shí)間
    private int waitRetryTime = 0;

    private void init4() {

        retrofitApi.getCall()
                .retryWhen(throwableObservable ->

                        throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {

                            if (throwable instanceof IOException) {

                                if (currentRetryCount < maxConnectCount) {
                                    currentRetryCount++;
                                    waitRetryTime = 1000 + currentRetryCount * 1000;

                                    return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                                } else {

                                    return Observable.error(new Throwable("超過重試次數(shù):" + currentRetryCount));
                                }
                            } else {
                                return Observable.error(new Throwable("發(fā)生異常司抱,非網(wǎng)絡(luò)"));
                            }
                        }))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Translation translation) {
                        translation.show();
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

有關(guān)過濾操作符

ofType

  /**
     * 過濾操作符
     */
    private void useOfType() {

        Observable.just(1, "asd", 2, 3, 4, "qwe")
                .ofType(Integer.class)
                .subscribe(integer -> {

                    KLog.d(TTAG, "獲得的整型消息事件是:" + integer);

                });
    }

Skip筐眷,,习柠,SkipLast

  /**
     * 跳轉(zhuǎn)開頭和跳過結(jié)尾消息
     */
    private void userSkipAndSkipLast() {

        // 使用1:根據(jù)順序跳過數(shù)據(jù)項(xiàng)
        Observable.just(1, 2, 3, 4, 5)
                .skip(1) // 跳過正序的前1項(xiàng)
                .skipLast(2) // 跳過正序的后2項(xiàng)
                .subscribe(integer -> KLog.d(TTAG, "獲取到的整型事件元素是: " + integer));

// 使用2:根據(jù)時(shí)間跳過數(shù)據(jù)項(xiàng)
        // 發(fā)送事件特點(diǎn):發(fā)送數(shù)據(jù)0-5匀谣,每隔1s發(fā)送一次,每次遞增1资溃;第1次發(fā)送延遲0s
        Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS) // 跳過第1s發(fā)送的數(shù)據(jù)
                .skipLast(1, TimeUnit.SECONDS) // 跳過最后1s發(fā)送的數(shù)據(jù)
                .subscribe(along -> KLog.d(TTAG, "獲取到的整型事件元素是: " + along));
    }

throttleFirst()武翎,,throttleLast()

在某段時(shí)間內(nèi)溶锭,只發(fā)送該段時(shí)間內(nèi)第1次事件 / 最后1次事件

<<- 在某段時(shí)間內(nèi)宝恶,只發(fā)送該段時(shí)間內(nèi)第1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件發(fā)送時(shí)間
                e.onNext(1);
                Thread.sleep(500);
                
                e.onNext(2);
                Thread.sleep(400);
                
                e.onNext(3);
                Thread.sleep(300);
               
                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始采用subscribe連接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應(yīng)");
                    }
                });


<<- 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最后1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件發(fā)送時(shí)間
                e.onNext(1);
                Thread.sleep(500);

                e.onNext(2);
                Thread.sleep(400);

                Thread.sleep(300);
                e.onComplete();
            }
        }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        KLog.d(TTAG, "開始采用subscribe連接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        KLog.d(TTAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        KLog.d(TTAG, "對Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                       K Log.d(TTAG, "對Complete事件作出響應(yīng)");
                    }
                });

實(shí)際應(yīng)用:規(guī)定時(shí)間內(nèi)暖途,多次點(diǎn)擊按鈕禁止多次操作使用throttleFirst卑惜,操作符

        RxView.clicks(button)
                .throttleFirst(2, TimeUnit.SECONDS)  // 才發(fā)送 2s內(nèi)第1次點(diǎn)擊按鈕的事件
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {  
                    }
                    @Override
                    public void onNext(Object value) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

Sample()實(shí)例應(yīng)用實(shí)時(shí)搜索

在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最新(最后)1次事件,與 throttleLast() 操作符類似

throttleWithTimeout () / debounce()

發(fā)送數(shù)據(jù)事件時(shí)驻售,若2次發(fā)送事件的間隔<指定時(shí)間露久,就會丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒有新數(shù)據(jù)發(fā)射時(shí)才會發(fā)送后一次的數(shù)據(jù)

        RxTextView.textChanges(ed)
                .debounce(1, TimeUnit.SECONDS)
                .skip(1) //跳過 第1次請求 = 初始輸入框的空字符狀態(tài)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<CharSequence>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }
                    @Override
                    public void onNext(CharSequence charSequence) {
                        tv.setText("發(fā)送給服務(wù)器的字符 = " + charSequence.toString());
                    }
                    @Override
                    public void onError(Throwable e) {
                     
                    }
                    @Override
                    public void onComplete() {
                      }
                });

firstElement() 欺栗,毫痕, lastElement()

僅選取第1個(gè)元素 征峦,,最后一個(gè)元素

// 獲取第1個(gè)元素
        Observable.just(1, 2, 3, 4, 5)
                  .firstElement()
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept( Integer integer) throws Exception {
                          KLog.d(TTAG,"獲取到的第一個(gè)事件是: "+ integer);
                      }
        });

// 獲取最后1個(gè)元素
        Observable.just(1, 2, 3, 4, 5)
                .lastElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        KLog.d(TTAG,"獲取到的最后1個(gè)事件是: "+ integer);
                    }
                });

elementAt()

指定接收某個(gè)消息消请,根據(jù)索引栏笆,可以設(shè)置默認(rèn)消息

  private void userEleMentAt() {
        // 使用1:獲取位置索引 = 2的 元素
        // 位置索引從0開始
        Observable.just(1, 2, 3, 4, 5)
                .elementAt(2)
                .subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));

// 使用2:獲取的位置索引 > 發(fā)送事件序列長度時(shí),設(shè)置默認(rèn)參數(shù)
        Observable.just(1, 2, 3, 4, 5)
                .elementAt(6,10)
                .subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));
    }

elementAtOrError()

在elementAt()的基礎(chǔ)上臊泰,當(dāng)出現(xiàn)越界情況(即獲取的位置索引 > 發(fā)送事件序列長度)時(shí)蛉加,即拋出異常

 private void userElementAtOrError() {
        Observable.just(1, 2, 3, 4, 5)
                .elementAtOrError(6)
                .subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));

    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市缸逃,隨后出現(xiàn)的幾起案子针饥,更是在濱河造成了極大的恐慌,老刑警劉巖需频,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丁眼,死亡現(xiàn)場離奇詭異,居然都是意外死亡昭殉,警方通過查閱死者的電腦和手機(jī)苞七,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挪丢,“玉大人蹂风,你說我怎么就攤上這事〕钥浚” “怎么了硫眨?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長巢块。 經(jīng)常有香客問我礁阁,道長,這世上最難降的妖魔是什么族奢? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任姥闭,我火速辦了婚禮,結(jié)果婚禮上越走,老公的妹妹穿的比我還像新娘棚品。我一直安慰自己,他們只是感情好廊敌,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布铜跑。 她就那樣靜靜地躺著,像睡著了一般骡澈。 火紅的嫁衣襯著肌膚如雪锅纺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天肋殴,我揣著相機(jī)與錄音囤锉,去河邊找鬼坦弟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛官地,可吹牛的內(nèi)容都是我干的酿傍。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼驱入,長吁一口氣:“原來是場噩夢啊……” “哼赤炒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起沧侥,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤可霎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后宴杀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡拾因,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年旺罢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绢记。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扁达,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蠢熄,到底是詐尸還是另有隱情跪解,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布签孔,位于F島的核電站叉讥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏饥追。R本人自食惡果不足惜图仓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望但绕。 院中可真熱鬧救崔,春花似錦、人聲如沸捏顺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽幅骄。三九已至劫窒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間昌执,已是汗流浹背烛亦。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工诈泼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人煤禽。 一個(gè)月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓铐达,卻偏偏與公主長得像,于是被迫代替她去往敵國和親檬果。 傳聞我的和親對象是個(gè)殘疾皇子瓮孙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344