Rxjava2 Observable的數(shù)據(jù)過濾詳解及實(shí)例(一)

簡(jiǎn)要:

需求了解:

對(duì)于數(shù)據(jù)的觀察以及處理過程中往往有需要過濾一些不需要的數(shù)據(jù)的需求插爹,比如防抖(防止快速操作),獲取第一項(xiàng)崇败、指定序列項(xiàng)或者最后一項(xiàng)的需要,獲取指定時(shí)間內(nèi)的有效數(shù)據(jù)等奈偏。Rx中提供了豐富的數(shù)據(jù)過濾處理的操作方法。

可用于過濾和選擇Observable發(fā)射的數(shù)據(jù)序列的方法:

  • Debounce:過濾發(fā)射速率較快的數(shù)據(jù)項(xiàng)躯护,防抖操作惊来。
  • Throttle: 對(duì)數(shù)據(jù)序列進(jìn)行限流操作,可以指定獲取周期內(nèi)的指定數(shù)據(jù)項(xiàng)棺滞,也可以用于防抖裁蚁。
  • Sample: 允許通過將序列劃分為時(shí)間片段收集數(shù)據(jù)矢渊,并從每片中取出一個(gè)值來稀疏序列。
  • Distinct: 過濾掉重復(fù)數(shù)據(jù)枉证。
  • Skip: 跳過指定的N項(xiàng)數(shù)據(jù)矮男。
  • Filter: 通過函數(shù)指定過濾的數(shù)據(jù)。
  • First: 只發(fā)射第一項(xiàng)或者滿足某個(gè)條件的第一項(xiàng)數(shù)據(jù)室谚。
  • Single: 與 first 類似毡鉴,但是如果原始Observable在完成之前不是正好發(fā)射一次數(shù)據(jù),它會(huì)拋出一個(gè)NoSuchElementException 的異常通知秒赤。
  • ElementAt: 獲取原始Observable發(fā)射的數(shù)據(jù)序列指定索引位置的數(shù)據(jù)項(xiàng)猪瞬,然后當(dāng)做自己的唯一數(shù)據(jù)發(fā)射。
  • ignoreElements: 不發(fā)射任何數(shù)據(jù)入篮,只發(fā)射Observable的終止通知陈瘦。
  • Last: 只發(fā)射最后一項(xiàng)(或者滿足某個(gè)條件的最后一項(xiàng))數(shù)據(jù)。
  • Take: 只返回Observable發(fā)送數(shù)據(jù)項(xiàng)序列前面的N項(xiàng)數(shù)據(jù)潮售,忽略剩余的數(shù)據(jù)痊项。
  • TakeLast: 只發(fā)射Observable發(fā)送數(shù)據(jù)項(xiàng)序列的后N項(xiàng)數(shù)據(jù),忽略其他數(shù)據(jù)酥诽。
  • ofType: 過濾一個(gè)Observable只返回指定類型的數(shù)據(jù)鞍泉。

1. Debounce

僅在過了一段指定的時(shí)間還沒發(fā)射數(shù)據(jù)時(shí)才發(fā)射一個(gè)數(shù)據(jù)。Debounce 操作符會(huì)過濾掉發(fā)射速率過快的數(shù)據(jù)項(xiàng)盆均。

提示: 操作默認(rèn)在 computation 調(diào)度器上執(zhí)行塞弊,但是你可以指定其它的調(diào)度器漱逸。

1.1 debounce(timeout, unit)

指定每個(gè)數(shù)據(jù)發(fā)射后在 timeout 時(shí)間內(nèi)泪姨,原始數(shù)據(jù)序列中沒有下一個(gè)數(shù)據(jù)發(fā)射時(shí),發(fā)射此項(xiàng)數(shù)據(jù)饰抒,否則丟棄這項(xiàng)數(shù)據(jù)肮砾。此操作與 throttleWithTimeout 方法相同。

注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted 時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)袋坑,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)仗处。

img-debounce(timeout, unit)

實(shí)例代碼:

     // 1. debounce(long timeout, TimeUnit unit)
     // 發(fā)送一個(gè)數(shù)據(jù),如果在包含timeout時(shí)間內(nèi)枣宫,沒有第二個(gè)數(shù)據(jù)發(fā)射婆誓,那么就會(huì)發(fā)射此數(shù)據(jù),否則丟棄此數(shù)據(jù)
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 30 < timeout    --> skip            
            Thread.sleep(30);
            emitter.onNext(2);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 100 > timeout   --> deliver
            Thread.sleep(100);
            emitter.onNext(3);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, 50 = timeout    --> skip:           
            Thread.sleep(50);
            emitter.onNext(4);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, onCompleted     --> deliver
            emitter.onComplete();

        }
    }).debounce(50, TimeUnit.MILLISECONDS)  // 指定防抖丟棄時(shí)間段為50毫秒
    //  .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline())   // 指定調(diào)度為當(dāng)前線程排隊(duì)
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept debounce(1-1): " + t);
            }
        });

輸出:

--> accept debounce(1-1): 2
--> accept debounce(1-1): 4

Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)

1.2 debounce(debounceSelector)

原始數(shù)據(jù)發(fā)射每一個(gè)序列都通過綁定監(jiān)聽debounceSelector的數(shù)據(jù)通知也颤,在debounceSelector數(shù)據(jù)發(fā)送前洋幻,如果有下一個(gè)數(shù)據(jù),則丟棄當(dāng)前項(xiàng)數(shù)據(jù)翅娶,繼續(xù)監(jiān)視下一個(gè)數(shù)據(jù)文留。

注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted 時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)好唯,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)。

img-debounce(debounceSelector)

實(shí)例代碼:

    // 2. debounce(debounceSelector)
    // 原始數(shù)據(jù)發(fā)射每一個(gè)序列的通過監(jiān)聽debounceSelector的數(shù)據(jù)通知燥翅,
    // 在debounceSelector數(shù)據(jù)發(fā)送前骑篙,如果有下一個(gè)數(shù)據(jù),則丟棄當(dāng)前項(xiàng)數(shù)據(jù)森书,繼續(xù)監(jiān)視下一個(gè)數(shù)據(jù)
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(1000); 
            emitter.onNext(2);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(200);
            emitter.onNext(3);      // deliver  --> debounceSelector is emitter(>2s)
            Thread.sleep(2500);
            emitter.onNext(4);      // skip     --> debounceSelector is no emitter(=2s)
            Thread.sleep(2000);
            emitter.onNext(5);      // deliver  --> onComplete
            Thread.sleep(500);
            emitter.onComplete();
        }
    }).debounce(new Function<Integer, ObservableSource<Long>>() {

            @Override
            public ObservableSource<Long> apply(Integer t) throws Exception {
                System.out.println("--> apply(1-2): " + t);
                // 設(shè)置過濾延遲時(shí)間為2秒靶端,此時(shí)返回的Observable從訂閱到發(fā)送數(shù)據(jù)時(shí)間段即為timeout
                return Observable.timer(2, TimeUnit.SECONDS)
                        .doOnSubscribe(new Consumer<Disposable>() {

                            @Override
                            public void accept(Disposable t) throws Exception {
                                // 開始訂閱,監(jiān)聽數(shù)據(jù)的發(fā)送來過濾數(shù)據(jù)
                                System.out.println("--> debounceSelector(1-2) is onSubscribe!");
                            }
                        }).doOnDispose(new Action() {
        
                            @Override
                            public void run() throws Exception {
                                // 發(fā)射數(shù)據(jù)后拄氯,丟棄當(dāng)前的數(shù)據(jù)躲查,解除當(dāng)前綁定
                                System.out.println("--> debounceSelector(1-2) is unSubscribe!");
                            }
                        });
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("----------> accept(1-2): " + t);
            }
        });

輸出:

--> apply(1-2): 1
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 2
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 3
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
----------> accept(1-2): 3
--> apply(1-2): 4
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 5
--> debounceSelector(1-2) is onSubscribe!
----------> accept(1-2): 5
--> debounceSelector(1-2) is unSubscribe!

Javadoc: debounce(debounceSelector)

2. Throttle

主要應(yīng)用于數(shù)據(jù)序列的節(jié)流操作,在指定的采樣周期內(nèi)獲取指定的數(shù)據(jù)译柏。Throttling 也用于稀疏序列镣煮。當(dāng)生產(chǎn)者發(fā)出的值超出我們想要的值時(shí),我們不需要每個(gè)序列值鄙麦,我們可以通過限制它來稀釋序列典唇。

注意: 時(shí)間的劃分不一定是統(tǒng)一的。例如胯府,發(fā)射數(shù)據(jù)的時(shí)間間隔與劃分?jǐn)?shù)據(jù)的時(shí)間間隔一致時(shí)介衔,在原始數(shù)據(jù)發(fā)送的一個(gè)時(shí)間點(diǎn)(此時(shí)數(shù)據(jù)還沒有實(shí)際發(fā)送),此時(shí)可能由于劃分時(shí)間已到骂因,劃分的數(shù)據(jù)片直接關(guān)閉了炎咖,所以有的時(shí)間片數(shù)據(jù)會(huì)有時(shí)間間隙差異。

提示: 操作默認(rèn)在 computation 調(diào)度器上執(zhí)行寒波,但是你可以指定其它的調(diào)度器乘盼。

2.1 throttleFirst(windowDuration, unit)

獲取每個(gè) windowDuration 時(shí)間段內(nèi)的原始數(shù)據(jù)序列中的第一項(xiàng)數(shù)據(jù),直到原始數(shù)據(jù)全部發(fā)送完畢俄烁。

img-throttleFirst(windowDuration, unit)

解析: 實(shí)際在每個(gè)采樣周期內(nèi)绸栅,先發(fā)送第一項(xiàng)接收到的數(shù)據(jù),然后丟棄后續(xù)周期內(nèi)的數(shù)據(jù)項(xiàng)页屠。

實(shí)例代碼:

    // 1. throttleFirst(long windowDuration, TimeUnit unit)
    // 指定每個(gè)指定時(shí)間內(nèi)取第一項(xiàng)數(shù)據(jù), 直到原始數(shù)據(jù)序列全部發(fā)送結(jié)束
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleFirst(2, TimeUnit.SECONDS)                           // 獲取每隔2秒之內(nèi)收集的第一項(xiàng)數(shù)據(jù)
     //   .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread())   // 指定調(diào)度線程為newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleFirst onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleFirst onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleFirst onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleFirst onComplete");
                }
            });

輸出:

--> throttleFirst onSubscribe
--> DataSource doOnNext : 1
-------------> throttleFirst onNext: 1
--> DataSource doOnNext : 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleFirst onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
--> DataSource doOnNext : 7
-------------> throttleFirst onNext: 7
--> DataSource doOnNext : 8
--> DataSource doOnNext : 9
-------------> throttleFirst onNext: 9
--> DataSource doOnNext : 10
--> throttleFirst onComplete

Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)

2.2 throttleLast(intervalDuration, unit)

獲取每個(gè) windowDuration 時(shí)間段內(nèi)的原始數(shù)據(jù)序列中的最近的一項(xiàng)數(shù)據(jù)粹胯,直到原始數(shù)據(jù)全部發(fā)送完畢。throttleLast 運(yùn)算符以固定間隔而不是相對(duì)于最后一項(xiàng)來劃分時(shí)間辰企。它會(huì)在每個(gè)窗口中發(fā)出最后一個(gè)值风纠,而不是它后面的第一個(gè)值。

img-throttleLast(intervalDuration, unit)

解析: 實(shí)際在每個(gè)采樣周期內(nèi)牢贸,先緩存收集的數(shù)據(jù)竹观,等周期結(jié)束發(fā)送最后一項(xiàng)數(shù)據(jù),丟棄最后數(shù)據(jù)項(xiàng)前面的數(shù)據(jù)十减。

實(shí)例代碼:

    // 2. throttleLast(long intervalDuration, TimeUnit unit)
    // 指定間隔時(shí)間內(nèi)取最后一項(xiàng)數(shù)據(jù)栈幸,直到原始數(shù)據(jù)序列全部發(fā)送結(jié)束
    Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleLast(2, TimeUnit.SECONDS)                            // 獲取每隔2秒之內(nèi)收集的最后一項(xiàng)數(shù)據(jù)
     //   .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread())    // 指定調(diào)度線程為newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleLast onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleLast onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleLast onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleLast onComplete");
                }
            });

輸出:

--> throttleLast onSubscribe
--> DataSource doOnNext : 1
--> DataSource doOnNext : 2
-------------> throttleLast onNext: 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleLast onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
-------------> throttleLast onNext: 6
--> DataSource doOnNext : 7
--> DataSource doOnNext : 8
-------------> throttleLast onNext: 8
--> DataSource doOnNext : 9
--> DataSource doOnNext : 10
--> throttleLast onComplete

Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)

2.3 throttleWithTimeout(timeout, unit)

指定每個(gè)數(shù)據(jù)發(fā)射后在 timeout 時(shí)間內(nèi)愤估,原始數(shù)據(jù)序列中沒有下一個(gè)數(shù)據(jù)發(fā)射時(shí),發(fā)射此項(xiàng)數(shù)據(jù)速址,否則丟棄這項(xiàng)數(shù)據(jù)玩焰。此操作與 debounce 方法相同。

注意: 這個(gè)操作符會(huì)在原始數(shù)據(jù)的 onCompleted 時(shí)候直接發(fā)射發(fā)射數(shù)據(jù)芍锚,不會(huì)因?yàn)橄蘖鞫鴣G棄數(shù)據(jù)昔园。

img-throttleWithTimeout(timeout, unit)

實(shí)例代碼:

    // 3. throttleWithTimeout(long timeout, TimeUnit unit)
    // 發(fā)送一個(gè)數(shù)據(jù),如果在包含timeout時(shí)間內(nèi)并炮,沒有第二個(gè)數(shù)據(jù)發(fā)射默刚,那么就會(huì)發(fā)射此數(shù)據(jù),否則丟棄此數(shù)據(jù)
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> skip:       30 < timeout
            Thread.sleep(30);
            emitter.onNext(2);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> skip:       50 = timeout
            Thread.sleep(50);
            emitter.onNext(3);  // 下一個(gè)數(shù)據(jù)到此數(shù)據(jù)發(fā)射, --> deliver:    60 > timeout
            Thread.sleep(60);
            emitter.onNext(4);  // onComplete           --> deliver:    onComplete
            emitter.onComplete();
        }
    }).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時(shí)間段為50毫秒
 //   .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定調(diào)度線程為newThread()
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("--> accept throttleWithTimeout(3): " + t);
            }
        });

輸出:

--> accept throttleWithTimeout(3): 3
--> accept throttleWithTimeout(3): 4

Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)

3. Sample

sample 允許您通過將序列劃分為時(shí)間片段逃魄,并從每片中取出一個(gè)值來稀疏序列荤西。當(dāng)每片結(jié)束時(shí),將發(fā)出其中的最后一個(gè)值(如果有的話)伍俘。

注意: 時(shí)間的劃分不一定是統(tǒng)一的邪锌。例如,發(fā)射數(shù)據(jù)的時(shí)間間隔與劃分?jǐn)?shù)據(jù)的時(shí)間間隔一致時(shí)癌瘾,在原始數(shù)據(jù)發(fā)送的一個(gè)時(shí)間點(diǎn)(此時(shí)數(shù)據(jù)還沒有實(shí)際發(fā)送)觅丰,此時(shí)可能由于劃分時(shí)間已到,劃分的數(shù)據(jù)片直接關(guān)閉了妨退,所以有的時(shí)間片數(shù)據(jù)會(huì)有時(shí)間間隙差異妇萄。

3.1 sample(period, unit)

獲取每個(gè) period 時(shí)間片段內(nèi)手機(jī)收據(jù)序列的最后一項(xiàng),忽略此時(shí)間片內(nèi)收集的其他數(shù)據(jù)項(xiàng)咬荷。

實(shí)例代碼:

    // 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit)
    // 將序列分為 period 的時(shí)間片段冠句,從每片重取出最近的一個(gè)數(shù)據(jù)
    // 等同于throttleLast
    Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource onNext: " + t);
            }
        }).sample(2, TimeUnit.SECONDS)                              // 每3秒時(shí)間段數(shù)據(jù)中取最近一個(gè)值
    //    .sample(2, TimeUnit.SECONDS, true)                        // 參數(shù)emitLast,設(shè)置是否忽略未采樣的最后一個(gè)數(shù)據(jù)
    //    .sample(2, TimeUnit.SECONDS, Schedulers.newThread())      // 指定調(diào)度器為newThread()
          .subscribe(new Consumer<Long>() {
    
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
          });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(1): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(1): 4
--> DataSource onNext: 5

Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)

3.2 sample(sampler)

sample 的這個(gè)方法每當(dāng)?shù)诙€(gè) sampler 發(fā)射一個(gè)數(shù)據(jù)(或者當(dāng)它終止)時(shí)就對(duì)原始 Observable 進(jìn)行采樣萍丐。第二個(gè)Observable通過參數(shù)傳遞給 sample 轩端。

img-sample(sampler)

實(shí)例代碼:

    // 2. sample(ObservableSource sampler)
    // 每當(dāng)?shù)诙€(gè) sampler 發(fā)射一個(gè)數(shù)據(jù)(或者當(dāng)它終止)時(shí)就對(duì)原始 Observable進(jìn)行采樣
    Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> DataSource onNext: " + t);
                }
        }).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒進(jìn)行一次采樣
          .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
          });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(2): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(2): 4
--> DataSource onNext: 5

Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)

4. Distinct

抑制(過濾掉)重復(fù)的數(shù)據(jù)項(xiàng)放典。Distinct 的過濾規(guī)則是:只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過逝变。

在某些實(shí)現(xiàn)中,有一些方法中允許你調(diào)整判定兩個(gè)數(shù)據(jù)不同( distinct )的標(biāo)準(zhǔn)奋构。還有一些實(shí)現(xiàn)只比較一項(xiàng)數(shù)據(jù)和它的直接前驅(qū)壳影,因此只會(huì)從序列中過濾掉連續(xù)重復(fù)的數(shù)據(jù)。

4.1 distinct()

只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過弥臼,過濾數(shù)據(jù)序列中的所有重復(fù)的數(shù)據(jù)項(xiàng)宴咧,保證處理后的數(shù)據(jù)序列沒有重復(fù)。

img-distinct

示例代碼:

    // 1. distinct()
    // 去除全部數(shù)據(jù)中重復(fù)的數(shù)據(jù)
    Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6)
            .distinct()
            .subscribe(new Consumer<Integer>() {
            
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(1): " + t);
                }
            });

輸出:

--> accept distinct(1): 1
--> accept distinct(1): 2
--> accept distinct(1): 3
--> accept distinct(1): 4
--> accept distinct(1): 5
--> accept distinct(1): 6

Javadoc: distinct()

4.2 distinct(keySelector)

這個(gè)操作符接受一個(gè)函數(shù)径缅。這個(gè)函數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生一個(gè) Key掺栅,然后烙肺,比較這些Key而不是數(shù)據(jù)本身,來判定兩個(gè)數(shù)據(jù)是否是不同的氧卧。

img-distinct(keySelector)

實(shí)例代碼:

    // 數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生一個(gè) Key桃笙,然后比較這些Key而不是數(shù)據(jù)本身,來判定兩個(gè)數(shù)據(jù)是否是不同的(去除全部數(shù)據(jù)中重復(fù)的數(shù)據(jù))
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6)
            .distinct(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根據(jù)奇數(shù)或偶數(shù)來判斷數(shù)據(jù)序列的重復(fù)的key
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(2): " + t);
                }
            });

輸出:

--> accept distinct(2): 1
--> accept distinct(2): 2

Javadoc: distinct(keySelector)

4.3 distinctUntilChanged()

distinctUntilChanged 操作符沙绝,去除數(shù)據(jù)序列中的連續(xù)重復(fù)項(xiàng)搏明。它只判定一個(gè)數(shù)據(jù)和它的直接前驅(qū)是否是不同的。

img-distinctUntilChanged

實(shí)例代碼:

    // 3. distinctUntilChanged()
    // 去除連續(xù)重復(fù)的數(shù)據(jù)
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept distinctUntilChanged(3): " + t);
            }
        });

輸出:

--> accept distinctUntilChanged(3): 1
--> accept distinctUntilChanged(3): 2
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 4
--> accept distinctUntilChanged(3): 5
--> accept distinctUntilChanged(3): 6
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 2

Javadoc: distinctUntilChanged()

4.4 distinctUntilChanged(keySelector)

distinctUntilChanged(keySelector) 操作符闪檬,根據(jù)一個(gè)函數(shù)產(chǎn)生的 Key 判定兩個(gè)相鄰的數(shù)據(jù)項(xiàng)是不是相同的星著,去除連續(xù)重復(fù)的數(shù)據(jù)。

實(shí)例代碼:

    // 4. distinctUntilChanged(Function<T,K>)
    // 數(shù)根據(jù)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)產(chǎn)生的 Key粗悯,去除連續(xù)重復(fù)的數(shù)據(jù)
    Observable.just(8, 2, 3, 5, 9, 5, 6, 6)
            .distinctUntilChanged(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根據(jù)原始數(shù)據(jù)處理后添加key虚循,依據(jù)這個(gè)key來判斷是否重復(fù)(去除連續(xù)重復(fù)的數(shù)據(jù))
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinctUntilChanged(4): " + t);
                }
            });

輸出:

--> accept distinctUntilChanged(4): 8
--> accept distinctUntilChanged(4): 3
--> accept distinctUntilChanged(4): 6

Javadoc: distinctUntilChanged(keySelector)

5. Skip

主要用于忽略O(shè)bservable發(fā)射的指定的 N 項(xiàng)數(shù)據(jù),如跳過數(shù)據(jù)序列的前面或后面 N 項(xiàng)數(shù)據(jù)样傍,指定時(shí)間段內(nèi)的數(shù)據(jù)項(xiàng)邮丰。

Skip 操作符的還有一些變體的操作方法如下:

5.1 skip(count)

忽略 Observable 發(fā)射的前 N 項(xiàng)數(shù)據(jù),只保留之后的數(shù)據(jù)铭乾。

img-skip(count)

實(shí)例代碼:

    // 1. skip(long count)
    // 跳過前count項(xiàng)數(shù)據(jù)剪廉,保留后面的數(shù)據(jù)
    Observable.range(1, 10)
        .skip(5) // 過濾數(shù)據(jù)序列前5項(xiàng)數(shù)據(jù)
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skip(1): " + t);
            }
        });

輸出:

--> accept skip(1): 6
--> accept skip(1): 7
--> accept skip(1): 8
--> accept skip(1): 9
--> accept skip(1): 10

Javadoc: skip(count)

5.2 skip(time, unit)

skip 的這個(gè)變體接受一個(gè)時(shí)長(zhǎng)參數(shù),它會(huì)丟棄原始Observable開始的那段時(shí)間段發(fā)射的數(shù)據(jù)炕檩,時(shí)長(zhǎng)和時(shí)間單位通過參數(shù)指定斗蒋。

img-skip(time, unit)

實(shí)例代碼:

    // 2. skip(long time, TimeUnit unit)
    // 跳過開始的time時(shí)間段內(nèi)的數(shù)據(jù),保留后面的數(shù)據(jù)
    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
        .skip(2, TimeUnit.SECONDS)  // 跳過前2秒的數(shù)據(jù)
        .subscribe(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> accept skip(2): " + t);
            }
        });

輸出:

--> accept skip(2): 4
--> accept skip(2): 5

Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)

5.3 skipLast(count)

使用 SkipLast 操作符修改原始Observable笛质,你可以忽略O(shè)bservable發(fā)射的后 N 項(xiàng)數(shù)據(jù)泉沾,只保留前面的數(shù)據(jù)。

img-skipLast(count)

實(shí)例代碼:

    // 3. skipLast(int count)
    // 跳過數(shù)據(jù)后面的count個(gè)數(shù)據(jù)
    Observable.range(1, 10)
        .skipLast(5) // 跳過數(shù)據(jù)序列的后5項(xiàng)數(shù)據(jù)
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skipLast(3): " + t);
            }
        });

輸出:

--> accept skipLast(3): 1
--> accept skipLast(3): 2
--> accept skipLast(3): 3
--> accept skipLast(3): 4
--> accept skipLast(3): 5

Javadoc: skipLast(count)

5.4 skipLast(time, unit)

還有一個(gè) skipLast 變體接受一個(gè)時(shí)間段參數(shù)妇押,它會(huì)丟棄在原始 Observable 的生命周期內(nèi)最后一段時(shí)間內(nèi)發(fā)射的數(shù)據(jù)跷究。時(shí)長(zhǎng)和時(shí)間單位通過參數(shù)指定。

注意: 這個(gè)機(jī)制是這樣實(shí)現(xiàn)的:延遲原始 Observable 發(fā)射的任何數(shù)據(jù)項(xiàng)敲霍,直到自原始數(shù)據(jù)發(fā)射之后過了給定的時(shí)長(zhǎng)之后俊马,才開始發(fā)送數(shù)據(jù)。

img-skipLast(time, unit)

實(shí)例代碼:

    // 4. skipLast(long time, TimeUnit unit, [boolean delayError])
    // 丟棄在原始Observable的生命周 期內(nèi)最后time時(shí)間內(nèi)發(fā)射的數(shù)據(jù)
    // 可選參數(shù)delayError:延遲異常通知
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource: " + t);
            }
        }).skipLast(2, TimeUnit.SECONDS)
    //    .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 通過scheduler指定工作線程
    //    .skipLast(2, TimeUnit.SECONDS, true)                  // 延遲Error的通知肩杈,多用于組合Observable的場(chǎng)景
          .subscribe(new Consumer<Long>() {

              @Override
              public void accept(Long t) throws Exception {
                  System.out.println("--> accept skipLast(4): " + t);
              }
          });

輸出:

--> DataSource: 1
--> DataSource: 2
--> DataSource: 3
--> accept skipLast(4): 1
--> DataSource: 4
--> accept skipLast(4): 2
--> DataSource: 5
--> accept skipLast(4): 3
--> DataSource: 6
--> accept skipLast(4): 4
--> DataSource: 7
--> accept skipLast(4): 5
--> DataSource: 8
--> accept skipLast(4): 6
--> DataSource: 9
--> accept skipLast(4): 7
--> DataSource: 10
--> accept skipLast(4): 8

注意: skipLast 的這個(gè)操作默認(rèn)在 computation 調(diào)度器上執(zhí)行柴我,但是你可以使用Scheduler參數(shù)指定其 它的調(diào)度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)

接續(xù):

后續(xù)的Rx相關(guān)數(shù)據(jù)過濾部分請(qǐng)參考: Rxjava2 Observable的數(shù)據(jù)過濾詳解及實(shí)例(二)

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實(shí)例

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末扩然,一起剝皮案震驚了整個(gè)濱河市艘儒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖界睁,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件觉增,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡翻斟,警方通過查閱死者的電腦和手機(jī)抑片,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杨赤,“玉大人敞斋,你說我怎么就攤上這事〖采” “怎么了植捎?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)阳柔。 經(jīng)常有香客問我焰枢,道長(zhǎng),這世上最難降的妖魔是什么舌剂? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任济锄,我火速辦了婚禮,結(jié)果婚禮上霍转,老公的妹妹穿的比我還像新娘荐绝。我一直安慰自己,他們只是感情好避消,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布低滩。 她就那樣靜靜地躺著,像睡著了一般岩喷。 火紅的嫁衣襯著肌膚如雪恕沫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天纱意,我揣著相機(jī)與錄音婶溯,去河邊找鬼。 笑死偷霉,一個(gè)胖子當(dāng)著我的面吹牛迄委,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播腾它,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼跑筝,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼死讹!你這毒婦竟也來了瞒滴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎妓忍,沒想到半個(gè)月后虏两,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡世剖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年定罢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旁瘫。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡祖凫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出酬凳,到底是詐尸還是另有隱情惠况,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布宁仔,位于F島的核電站稠屠,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏翎苫。R本人自食惡果不足惜权埠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望煎谍。 院中可真熱鬧攘蔽,春花似錦、人聲如沸呐粘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)事哭。三九已至漫雷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鳍咱,已是汗流浹背降盹。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谤辜,地道東北人蓄坏。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像丑念,于是被迫代替她去往敵國(guó)和親涡戳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355