Rxjava2 Observable的輔助操作詳解及實例(二)

接續(xù)上篇: Rxjava2 Observable的輔助操作詳解及實例(一)

8. TimeInterval

將一個發(fā)射數(shù)據(jù)的Observable轉(zhuǎn)換為發(fā)射那些數(shù)據(jù)發(fā)射時間間隔的Observable旭从。

img-TimeInterval

TimeInterval 操作符攔截原始Observable發(fā)射的數(shù)據(jù)項胧谈,替換為發(fā)射表示相鄰發(fā)射物時間間隔的對象。

這個操作符將原始 Observable 轉(zhuǎn)換為另一個 Observable ,后者發(fā)射一個標志替換前者的數(shù)據(jù)項扮念,這個標志表示前者的兩個連續(xù)發(fā)射物之間流逝的時間長度。新的Observable的第一個發(fā)射物表示的是在觀察者訂閱原始 Observable 到原始 Observable 發(fā)射它的第一項數(shù)據(jù)之間流逝的時間長度。不存在與原始 Observable 發(fā)射最后一項數(shù)據(jù)和發(fā)射 onCompleted 通知之間時長對應(yīng)的發(fā)射物。

示例代碼:

    /**
     * 1. timeInterval(Scheduler scheduler)
     *  scheduler: 可選參數(shù)削锰,指定調(diào)度線程
     *  接收原始數(shù)據(jù)項,發(fā)射射表示相鄰發(fā)射物時間間隔的對象
     */
    Observable.intervalRange(1, 10, 100, 100, TimeUnit.MILLISECONDS)
            .timeInterval()
         // .timeInterval(Schedulers.newThread())       // 指定工作線程
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    long time = longTimed.time();           // 連續(xù)數(shù)據(jù)間的間隔時間
                    TimeUnit unit = longTimed.unit();       // 連續(xù)數(shù)據(jù)間的時間間隔單位
                    Long value = longTimed.value();         // Observable發(fā)送的數(shù)據(jù)項
                    System.out.println("--> onNext(1): " + longTimed.toString());
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();
    System.out.println("-------------------------------------------------");
    /**
     *  2. timeInterval(TimeUnit unit, Scheduler scheduler)
     *  指定時間間隔單位和指定工作線程毕莱,接收原始數(shù)據(jù)項器贩,發(fā)射射表示相鄰發(fā)射物時間間隔的對象
     */
    Observable.intervalRange(1, 10, 1000, 1200, TimeUnit.MILLISECONDS)
        //  .timeInterval(TimeUnit.SECONDS)                             // 指定時間間隔單位
            .timeInterval(TimeUnit.SECONDS, Schedulers.newThread())     // 指定時間間隔單位和指定工作線程
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    System.out.println("--> onNext(2): " + longTimed.toString());
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

輸出:

--> onSubscribe(1)
--> onNext(1): Timed[time=104, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=5]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=6]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=7]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=8]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=9]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=10]
--> onComplete(1)
-------------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1, unit=SECONDS, value=5]
--> onNext(2): Timed[time=2, unit=SECONDS, value=6]
--> onNext(2): Timed[time=1, unit=SECONDS, value=7]
--> onNext(2): Timed[time=1, unit=SECONDS, value=8]
--> onNext(2): Timed[time=1, unit=SECONDS, value=9]
--> onNext(2): Timed[time=1, unit=SECONDS, value=10]
--> onComplete(2)

Javadoc: timeInterval()
Javadoc: timeInterval(Scheduler scheduler)
Javadoc: timeInterval(TimeUnit unit)
Javadoc: timeInterval(TimeUnit unit, Scheduler scheduler)

9. Timeout

對原始Observable的一個鏡像,如果過了一個指定的時長仍沒有發(fā)射數(shù)據(jù)朋截,它會發(fā)一個錯誤通知蛹稍。

RxJava中的實現(xiàn)為 timeout 操作符,具有多個不同的變體部服。

9.1 timeout(timeout, timeUnit)

如果原始 Observable 過了指定的一段時長沒有發(fā)射任何數(shù)據(jù)唆姐,Timeout操作符會以一個 onError 通知終止這個Observable。

img-TImeout

示例代碼:

    /**
     *  1. timeout(long timeout, TimeUnit timeUnit)
     *  接受一個時長參數(shù)饲宿,如果在指定的時間段內(nèi)沒有數(shù)據(jù)項發(fā)射,將會發(fā)射一個Error通知胆描,
     *  或者每當原始Observable發(fā)射了一項數(shù)據(jù)瘫想, timeout 就啟動一個計時器,
     *  如果計時器超過了指定指定的時長而原始Observable沒有發(fā)射另一項數(shù)據(jù)昌讲, 
     *  就拋出 TimeoutException 国夜,以一個錯誤通知終止Observable。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            //  Thread.sleep(2000);     // 延遲2秒后發(fā)射數(shù)據(jù)短绸,此時會有TimeoutException
            emitter.onNext(1L);
            Thread.sleep(2000);     // 延遲2秒后發(fā)射數(shù)據(jù)车吹,此時會有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(1, TimeUnit.SECONDS)     // 指定超時時間段為1秒
      .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(1)");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("--> onNext(1): " + aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1)");
            }
      });

    System.in.read();

輸出:

--> onSubscribe(1)
--> onNext(1): 1
--> onError(1): java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

Javadoc: timeout(long timeout, TimeUnit timeUnit)

9.2 timeout(timeout, timeUnit, scheduler, other)

在指定時間段后超時時會切換到使用一個你指定的備用的 Observable,而不是發(fā)onError通知醋闭,可以通過scheduler 來指定工作線程窄驹。

img-Timeout-Other

示例代碼:

    /**
     *  2. timeout(long timeout, TimeUnit timeUnit,
     *  Scheduler scheduler,        // 可選參數(shù),指定線程調(diào)度器
     *  ObservableSource other      // 可選參數(shù)证逻,超時備用Observable
     *  )
     *
     *  在指定時間段后超時時會切換到使用一個你指定的備用的Observable乐埠,而不是發(fā)onError通知。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            //  Thread.sleep(2000);     // 延遲2秒后發(fā)射數(shù)據(jù),此時會有TimeoutException
            emitter.onNext(1L);
            Thread.sleep(2000);         // 延遲2秒后發(fā)射數(shù)據(jù)丈咐,此時會有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(1, TimeUnit.SECONDS,             // 指定超時時間段為1秒
            Schedulers.newThread(),             // 指定工作線程為子線程
            Observable.just(888L))              // 超時后默認發(fā)射的Observable
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(2): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });
    
    System.in.read();

輸出:

--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 888
--> onComplete(2)

Javadoc: timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource other)

9.3 timeout(Function itemTimeoutIndicator, ObservableSource other)

使用一個函數(shù) itemTimeoutIndicator 針對原始 Observable 的每一項返回一個 Observable瑞眼,如果當這個 Observable 終止時原始 Observable 還沒有發(fā)射另一項數(shù)據(jù),就會認為是超時了棵逊,如果沒有指定超時備用的 other伤疙,就拋出 TimeoutException,以一個錯誤通知終止 bservable辆影,否則超時后發(fā)射備用的 Observable徒像。

img-Timeout-Function-Other

示例代碼:

    /**
     *  3. timeout(Function<T, ObservableSource> itemTimeoutIndicator
     *  ObservableSource other      // 可選參數(shù),當超時后發(fā)射的備用Observable
     *  )
     *  對原始Observable的每一項返回一個Observable秸歧,
     *  如果當這個Observable終止時原始Observable還沒有發(fā)射另一項數(shù)據(jù)厨姚,就會認為是超時了,
     *  如果沒有指定超時備用的Observable键菱,就拋出TimeoutException谬墙,以一個錯誤通知終止Observable,
     *  否則超時后發(fā)射備用的Observable经备。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            emitter.onNext(1L);
            Thread.sleep(3000);     // 延遲3秒后發(fā)射數(shù)據(jù)拭抬,此時會有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long aLong) throws Exception {
            // 為每一個原始數(shù)據(jù)發(fā)射一個Observable來指示下一個數(shù)據(jù)發(fā)射的Timeout,這里指定1秒超時時間
            return Observable.timer(1, TimeUnit.SECONDS);
        }
    }, Observable.just(888L))  // 超時后默認發(fā)射的Observable
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(3)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(3): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(3): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(3)");
                }
            });

    System.in.read();

輸出:

--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 888
--> onComplete(3)

Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator)
Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator, ObservableSource other)

10. Timestamp

給Observable發(fā)射的數(shù)據(jù)項附加一個指定的時間戳侵蒙。

img-Timestamp

timestamp 造虎,它將一個發(fā)射Timed類型數(shù)據(jù)的Observable轉(zhuǎn)換為一個發(fā)射類型為 Timestamped<Timed> 的數(shù)據(jù)的Observable,每一項都包含數(shù)據(jù)的原始發(fā)射時間信息和原始數(shù)據(jù)纷闺。

示例代碼:

    /**
     *  1. timestamp(Scheduler scheduler)
     *  scheduler: 可選參數(shù)算凿,指定線程調(diào)度器
     *
     *  給Observable發(fā)射的數(shù)據(jù)項附加一個時間戳信息
     */
    Observable.intervalRange(1, 5, 1, 100, TimeUnit.MILLISECONDS)
            .timestamp(Schedulers.newThread())      // 指定在子線程調(diào)度處理
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    long time = longTimed.time();           // 連續(xù)數(shù)據(jù)間的間隔時間
                    TimeUnit unit = longTimed.unit();       // 連續(xù)數(shù)據(jù)間的時間間隔單位
                    Long value = longTimed.value();         // Observable發(fā)送的數(shù)據(jù)項
                    System.out.println("--> onNext(1): " + longTimed);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();
    System.out.println("-------------------------------------------");
    /**
     *  2. timestamp(TimeUnit unit, Scheduler scheduler)
     *  scheduler: 可選參數(shù),指定線程調(diào)度器
     *
     *  給Observable發(fā)射的數(shù)據(jù)項附加一個指定單位的時間戳信息
     */
    Observable.intervalRange(1, 5, 1, 1200, TimeUnit.MILLISECONDS)
            .timestamp(TimeUnit.SECONDS, Schedulers.newThread())    // 指定時間單位為秒犁功,在子線程調(diào)度處理
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    System.out.println("--> onNext(2): " + longTimed);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

輸出:

--> onSubscribe(1)
--> onNext(1): Timed[time=1577455367446, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=1577455367545, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=1577455367645, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=1577455367745, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=1577455367845, unit=MILLISECONDS, value=5]
--> onComplete(1)
-------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1577455369, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1577455370, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1577455371, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1577455373, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1577455374, unit=SECONDS, value=5]
--> onComplete(2)

Javadoc: timestamp()
Javadoc: timestamp(Scheduler scheduler)
Javadoc: timestamp(TimeUnit unit)
Javadoc: timestamp(TimeUnit unit, Scheduler scheduler)

11. Using

創(chuàng)建一個只在Observable生命周期內(nèi)存在的一次性資源氓轰。

Using 操作符讓你可以指示Observable創(chuàng)建一個只在它的生命周期內(nèi)存在的資源,當Observable終止時這個資源會被自動釋放浸卦。

img-Using

using 操作符接受三個參數(shù):

  1. observableFactory:一個用戶創(chuàng)建一次性資源的工廠函數(shù)
  2. resourceFactory:一個用于創(chuàng)建Observable的工廠函數(shù)
  3. disposeFunction:一個用于釋放資源的函數(shù)

當一個觀察者訂閱 using 返回的Observable時署鸡, using 將會使用Observable工廠函數(shù)創(chuàng)建觀察者要觀察的Observable,同時使用資源工廠函數(shù)創(chuàng)建一個你想要創(chuàng)建的資源限嫌。當觀察者取消訂閱這個Observable時靴庆,或者當觀察者終止時(無論是正常終止還是因錯誤而終止), using 使用第三個函數(shù)釋放它創(chuàng)建的資源怒医。

示例代碼:

    /**
     * 用于在Observable的生命周期內(nèi)存在的資源對象
     */
    class MyResource {
        private String resource;

        public MyResource(String resource) {
            this.resource = resource;
        }

        @Override
        public String toString() {
            return "MyResource{" +
                    "resource='" + resource + '\'' +
                    '}';
        }

        public void releaseResource() {
            System.out.println("----> MyResource resource is release. ");
            resource = null;
        }
    }
    
    /**
     *  1. using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)
     *
     *  resourceSupplier:   // 一個用戶創(chuàng)建一次性資源的工廠函數(shù)
     *  sourceSupplier:     // 一個用于創(chuàng)建Observable的工廠函數(shù)
     *  disposer:           // 一個用于釋放資源的函數(shù)
     *  eager:              // 可選參數(shù)炉抒,如果為true的話,則第三個函數(shù)disposer的處理在Observable的結(jié)束前執(zhí)行
     *
     *  當一個觀察者訂閱 using 返回的Observable時稚叹, using 將會使用Observable工廠函數(shù)創(chuàng)建觀察者要觀察的Observable端礼,
     *  同時使用資源工廠函數(shù)創(chuàng)建一個你想要創(chuàng)建的資源禽笑。
     *  當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止)蛤奥, 
     *  using 使用第三個函數(shù)釋放它創(chuàng)建的資源佳镜。
     */
    Observable.using(
            // 一個用戶創(chuàng)建一次性資源的工廠函數(shù)
            new Callable<MyResource>() {
                @Override
                public MyResource call() throws Exception {
                    System.out.println("----> resourceSupplier call");
                    return new MyResource("This is Observable resource!");
                }
            },
            // 一個用于創(chuàng)建Observable的工廠函數(shù),這個函數(shù)返回的Observable就是最終被觀察的Observable
            new Function<MyResource, ObservableSource<Long>>() {
                @Override
                public ObservableSource<Long> apply(MyResource myResource) throws Exception {
                    System.out.println("----> sourceSupplier apply: " + myResource);
                    return Observable.rangeLong(1, 5);
                }
            },
            // 一個用于釋放資源的函數(shù)
            new Consumer<MyResource>() {
                @Override
                public void accept(MyResource myResource) throws Exception {
                    System.out.println("----> disposer accept: ");
                    myResource.releaseResource();
                }
            },
            // 可選參數(shù)凡桥,如果為true的話蟀伸,則在Observable的結(jié)束前執(zhí)行釋放資源的函數(shù)
            true).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext: " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

輸出:

----> resourceSupplier call(1)
----> sourceSupplier apply(1): MyResource{resource='This is Observable resource!'}
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
----> disposer accept(1): 
----> MyResource resource is release. 
--> onComplete

Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer)
Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)

12. To

將Observable轉(zhuǎn)換為另一個對象或數(shù)據(jù)結(jié)構(gòu)。

img-To

將 Observable 或者Observable 發(fā)射的數(shù)據(jù)序列轉(zhuǎn)換為另一個對象或數(shù)據(jù)結(jié)構(gòu)缅刽。它們中的一些會阻塞直到 Observable 終止啊掏,然后生成一個等價的對象或數(shù)據(jù)結(jié)構(gòu);另一些返回一個發(fā)射那個對象或數(shù)據(jù)結(jié)構(gòu)的 Observable衰猛。

由于 rxjava 的 To 操作符中有很多 toXXX 操作符的實現(xiàn)和不同的變體重載迟蜜,此處就不詳細的展開了,有興趣的可以查看官方的API 文檔 詳細參閱啡省。

下面幾個是常見的幾種To操作符的:

  • toList():讓Observable將多項數(shù)據(jù)組合成一個List娜睛,然后調(diào)用一次onNext方法傳遞整個列表。
  • toMap(Function keySelector,Function valueSelector):toMap收集原始Observable發(fā)射的所有數(shù)據(jù)項到一個Map(默認是HashMap)然后發(fā)射這個Map卦睹。 你可以提供一個用于生成Map的Key的函數(shù)畦戒,還可以提供一個函數(shù)轉(zhuǎn)換數(shù)據(jù)項到Map存儲的值(默認數(shù)據(jù)項本身就是值)。
  • toSortedList(): 它會對產(chǎn)生的列表排序结序,默認是自然升序障斋,如果發(fā)射的數(shù)據(jù)項沒有實現(xiàn)Comparable接口价匠,會拋出一個異常伴挚,你也可以傳遞一個函數(shù)作為用于比較兩個數(shù)據(jù)項。
  • toMultimap(Function keySelector, Function valueSelector):類似于toMap异赫,不同的是返敬,它生成的這個Map的value類型還是一個ArrayList遂庄。

示例代碼:

        /**
         *  1. toList()
         *  讓Observable將多項數(shù)據(jù)組合成一個List,然后調(diào)用一次onNext方法傳遞整個列表救赐。
         */
        range.toList()
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println("--> toList accept(1): " + integers);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  2. toMap(Function<? super T, ? extends K> keySelector,Function<? super T, ? extends V> valueSelector)
         *   toMap收集原始Observable發(fā)射的所有數(shù)據(jù)項到一個Map(默認是HashMap)然后發(fā)射這個Map涧团。
         *   你可以提供一個用于生成Map的Key的函數(shù)只磷,還可以提供一個函數(shù)轉(zhuǎn)換數(shù)據(jù)項到Map存儲的值(默認數(shù)據(jù)項本身就是值)经磅。
         */
        range.toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key" + integer;     // 返回一個Map的key
            }
        }, new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;                     // 返回一個Map的value
            }
        }).subscribe(new SingleObserver<Map<String, Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onSuccess(Map<String, Integer> stringIntegerMap) {
                System.out.println("--> onSuccess(2): " + stringIntegerMap);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }
        });

        System.out.println("------------------------------------------");
        /**
         *  3. toSortedList()
         *  它會對產(chǎn)生的列表排序,默認是自然升序钮追,如果發(fā)射的數(shù)據(jù)項沒有實現(xiàn)Comparable接口预厌,會拋出一個異常。
         *  然而元媚,你也可以傳遞一個函數(shù)作為用于比較兩個數(shù)據(jù)項
         */
        Observable.just(5, 3, 8, 6, 9, 10)
                .toSortedList()
                .subscribe(new SingleObserver<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(3)");
                    }

                    @Override
                    public void onSuccess(List<Integer> integers) {
                        System.out.println("--> onSuccess(3): " + integers);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError(3): " + e);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  4. toSortedList(Comparator comparator)
         *
         *  傳遞一個函數(shù)comparator作為用于比較兩個數(shù)據(jù)項轧叽,它會對產(chǎn)生的列表排序
         */
        Observable.just(5, 3, 8, 6, 9, 10)
                .toSortedList(new Comparator<Integer>() {
                    @Override
                    public int compare(Integer o1, Integer o2) {
                        System.out.println("--> compare: o1 = " + o1 + ", o2 = " + o2);
                        return o1 - o2;     // 比較器的排序邏輯
                    }
                }).subscribe(new SingleObserver<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(4)");
                    }

                    @Override
                    public void onSuccess(List<Integer> integers) {
                        System.out.println("--> onSuccess(4): " + integers);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError(4): " + e);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  5. toMultimap(Function<T, K> keySelector, Function<T, V> valueSelector)
         *  類似于 toMap 苗沧,不同的是,它生成的這個Map的value類型還是一個ArrayList
         */
        range.toMultimap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key" + integer;     // 返回一個Map的key
            }
        }, new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;                  // 返回一個Map的value
            }
        }).subscribe(new SingleObserver<Map<String, Collection<Integer>>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(5)");
            }

            @Override
            public void onSuccess(Map<String, Collection<Integer>> stringCollectionMap) {
                System.out.println("--> onSuccess(5): " + stringCollectionMap);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(5): " + e);
            }
        });

輸出:

--> toList accept(1): [1, 2, 3, 4, 5]
------------------------------------------
--> onSubscribe(2)
--> onSuccess(2): {key1=1, key2=2, key5=5, key3=3, key4=4}
------------------------------------------
--> onSubscribe(3)
--> onSuccess(3): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(4)
--> compare: o1 = 3, o2 = 5
--> compare: o1 = 8, o2 = 3
--> compare: o1 = 8, o2 = 5
--> compare: o1 = 6, o2 = 5
--> compare: o1 = 6, o2 = 8
--> compare: o1 = 9, o2 = 6
--> compare: o1 = 9, o2 = 8
--> compare: o1 = 10, o2 = 6
--> compare: o1 = 10, o2 = 9
--> onSuccess(4): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(5)
--> onSuccess(5): {key1=[1], key2=[2], key5=[5], key3=[3], key4=[4]}

Javadoc: toList()
Javadoc: toMap(Function keySelector,Function valueSelector)
Javadoc: toSortedList()
Javadoc: toMultimap(Function keySelector, Function valueSelector)

小結(jié)

本節(jié)主要是介紹了 Rxjava 中的各種輔助操作符炭晒,比如延遲待逞、超時,事件監(jiān)聽等相關(guān)的輔助類型的操作网严,這在開發(fā)中是很有用處的识樱。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市震束,隨后出現(xiàn)的幾起案子怜庸,更是在濱河造成了極大的恐慌,老刑警劉巖垢村,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件割疾,死亡現(xiàn)場離奇詭異,居然都是意外死亡嘉栓,警方通過查閱死者的電腦和手機宏榕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來胸懈,“玉大人担扑,你說我怎么就攤上這事∪で” “怎么了涌献?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長首有。 經(jīng)常有香客問我燕垃,道長,這世上最難降的妖魔是什么井联? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任卜壕,我火速辦了婚禮,結(jié)果婚禮上烙常,老公的妹妹穿的比我還像新娘轴捎。我一直安慰自己,他們只是感情好蚕脏,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布侦副。 她就那樣靜靜地躺著,像睡著了一般驼鞭。 火紅的嫁衣襯著肌膚如雪秦驯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天挣棕,我揣著相機與錄音译隘,去河邊找鬼亲桥。 笑死,一個胖子當著我的面吹牛固耘,可吹牛的內(nèi)容都是我干的题篷。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼厅目,長吁一口氣:“原來是場噩夢啊……” “哼悼凑!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起璧瞬,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤户辫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后嗤锉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體渔欢,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年瘟忱,在試婚紗的時候發(fā)現(xiàn)自己被綠了奥额。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡访诱,死狀恐怖垫挨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情触菜,我是刑警寧澤九榔,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站涡相,受9級特大地震影響哲泊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜催蝗,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一切威、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧丙号,春花似錦先朦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至遍尺,卻和暖如春截酷,著一層夾襖步出監(jiān)牢的瞬間涮拗,已是汗流浹背乾戏。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工迂苛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人鼓择。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓三幻,卻偏偏與公主長得像,于是被迫代替她去往敵國和親呐能。 傳聞我的和親對象是個殘疾皇子念搬,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354