使用 UT 高效地玩轉(zhuǎn) RxJava 的操作符

RxJava 博大精深暴心,想要入門和進階姐浮,操作符是一個切入點。 所以超歌,我們希望尋找一種可以把操作符寫得比較爽砍艾,同時可以快速驗證輸入輸出是否準確的玩法。思路有以下兩點:

  1. 使用 UT(JUnit Test) 來對每一個操作符進行實現(xiàn)巍举,如此一來可以脫離 Android 平臺的依賴脆荷,專注于操作符本身。
  2. 對于每一種操作符懊悯,使用 RX Marbles 蜓谋,或者 RxJava 官方的彈珠圖(marble diagrams)進行實現(xiàn)。

比如下面兩張圖炭分,分別來自 RX Marbles 和官方的彈珠圖桃焕,我們要做的就是用 UT 有目的性、精確地實現(xiàn)這兩張圖的輸入和輸出捧毛。

來自 RX Marbles的彈珠圖
來自官方的彈珠圖

所謂有目的性观堂、精確地輸入輸出,意思就是根據(jù)所有操作符的彈珠圖的每條數(shù)據(jù)流呀忧,以及操作符的含義师痕,嚴格按照圖片表達的意思進行代碼的實現(xiàn)。通過這種方強迫癥一般的方式而账,對理解操作符和 RxJava 的體系有很大的幫助胰坟。

(一)預備知識

我們希望把精力專注于操作符的實現(xiàn),而不是單元測試的技巧泞辐,但由于 RxJava 的異步特性笔横,有很多操作符是跟線程相關(guān)的,因此我們要先掌握單元測試中如何對線程進行處理的預備知識铛碑。

讓測試線程最晚結(jié)束

在線程相關(guān)的測試代碼中狠裹,有個很棘手的現(xiàn)象是:測試線程早于子線程執(zhí)行完畢,如下代碼:

@Test
public void test_thread_early() {

    //測試線程啟動
    System.out.println("測試線程-start");

    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("子線程-start");
            OperatorUtils.sleep(3000);
            System.out.println("子線程-end");
        }
    }).start();

    //測試線程結(jié)束后汽烦,子線程還未執(zhí)行完畢涛菠,因此子線程無法完整的輸出測試結(jié)果
    System.out.println("測試線程-end");
}

在上述代碼中,測試線程瞬間就執(zhí)行完畢了撇吞,而子線程需要執(zhí)行3s俗冻,測試線程早于子線程執(zhí)行完畢,因此子線程將無法完整的執(zhí)行牍颈,因此迄薄,輸出的結(jié)果是:

測試線程-start
測試線程-end
子線程-start

于此對應的,我們來看看 RxJava 操作符的例子煮岁,通過 timer 操作符實現(xiàn)延遲3s發(fā)送數(shù)據(jù):

@Test
public void test_thread_early_observable() {
    System.out.println("測試線程-start,所在線程:" + Thread.currentThread().getName());

    //消息源在Schedulers.computation()線程中執(zhí)行讥蔽,3s后執(zhí)行涣易,此時測試線程已經(jīng)執(zhí)行完畢,無法正常輸出結(jié)果
    Observable.timer(3, TimeUnit.SECONDS)
            .subscribe(num -> {
                System.out.println("Observable和Subscriber所在線程:" + Thread.currentThread().getName());
                System.out.println("獲取訂閱數(shù)據(jù):" + num);

            });
    System.out.println("測試線程--end");
}

與上面的代碼一樣冶伞,由于測試線程早早的結(jié)束了新症,timer 操作符所在的線程 Schedulers.computation() 將無法完整地執(zhí)行完畢,因此輸出的結(jié)果是:

測試線程-start,所在線程:main
測試線程-end

如果無法保證所有線程都執(zhí)行完畢响禽,便無法得到預期的輸出結(jié)果徒爹。那么,如何解決這個問題芋类?有種最笨的方法便是讓測試線程成為最晚結(jié)束的線程隆嗅,我們?yōu)闇y試線程增加類似于 Thread.sleep(4000) 的邏輯,便可保證以上兩份代碼可以在正常輸出侯繁。(此文不希望涉及太多的測試技巧胖喳,如果需要更嚴謹和更強大的線程異步測試,可以參考些第三方框架巫击,如 awaitility

使用TestScheduler操縱時間

除了這種笨方法之外禀晓,RxJava 提供了 TestScheduler,通過這個調(diào)度器可以實現(xiàn)對時間的操縱坝锰。

對于上文提到的 timer 操作符,通過testScheduler.advanceTimeBy(3, TimeUnit.SECONDS) 可以將時間提前3s重付,此時測試線程和 timer 操作符所在的線程均可順利的執(zhí)行完畢顷级,完整代碼如下:

@Test
public void test_thread_with_TestScheduler() {

    TestScheduler testScheduler = Schedulers.test();
    System.out.println("測試線程:" + Thread.currentThread().getName());

    //指定調(diào)度器
    Observable.timer(3, TimeUnit.SECONDS, testScheduler)
            .subscribe(num -> {
                System.out.println("Observable和Subscriber線程:" + Thread.currentThread().getName());
                System.out.println("獲取訂閱數(shù)據(jù):" + num);
            });

    //將時間提前了3s
    testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
}

聚合操作符的線程處理

很多聚合操作符,如 merge 确垫、zip 等弓颈,需要在多個不同的線程中構(gòu)造不同的數(shù)據(jù)流,從而體現(xiàn)數(shù)據(jù)流發(fā)送的先后關(guān)系删掀,以及所對應的不同的輸出結(jié)果翔冀。如何讓多個線程完整的執(zhí)行完畢?結(jié)合上文所講的讓測試線程最晚結(jié)束以及**使用 TestScheduler **便可做到披泪。筆者在下文的聚合操作符一節(jié)中將會具體講解纤子。

有了這些預備知識,基本上可以實現(xiàn) RxJava 的所有操作符款票,接下來針對不同類型的操作符分別舉例一二進行講解控硼,如需完整代碼,請前往Github:
https://github.com/geniusmart/RxJavaOperatorsUTSample

(二)不同類型的操作符實現(xiàn)

interval

interval 作為創(chuàng)建型的操作符艾少,具備間隔一段時間發(fā)送數(shù)據(jù)的能力卡乾,是我們寫其他操作符的基礎(chǔ),因此先來講解下interval 缚够。

這張圖要表達的意思很簡單幔妨,自頂而下的分析如下:

  1. 操作符:由于 interval 操作符是創(chuàng)建型的鹦赎,因此直接調(diào)用操作符來產(chǎn)生數(shù)據(jù)流,根據(jù) api 參數(shù)误堡,需定義其間隔時長钙姊,這個數(shù)值我們設(shè)置為100ms。
  2. 輸入:執(zhí)行了 Observable.interval() 之后埂伦,每間隔指定時間將輸出 0煞额、1、2沾谜、3…… 的無窮數(shù)據(jù)(注:通過彈珠圖膊毁,可以看到第一個數(shù)據(jù)也是有間隔時間的)。
  3. 輸出:即數(shù)據(jù)消費者基跑,在 RxJava 中體現(xiàn)為 Subscriber 婚温。這張圖里并沒有畫出輸出的數(shù)據(jù)流,為了觀察輸出媳否,我們自定義訂閱者栅螟。
  4. 實現(xiàn)思路: interval 默認在 Schedulers.computation() 線程中執(zhí)行,執(zhí)行的時間將會超過測試線程篱竭,根據(jù)上文的「預備知識」這一節(jié)所述力图,我們使用 TestScheduler 來操縱時間,比如掺逼,為了輸出4個數(shù)據(jù)吃媒, interval 需要4個單位的間隔時間(400ms),將時間提前400ms可輸出我們想要的結(jié)果吕喘。具體實現(xiàn)如下:
@Test
public void interval() {
    Observable.interval(100, TimeUnit.MILLISECONDS, mTestScheduler)
            .subscribe(mList::add);

    //時間提早400ms前
    mTestScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
    assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L));

    //時間提早(400 + 200)ms前
    mTestScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L));
}

以此類推赘那,range 倒淫、just 伪很、repeat 等創(chuàng)建型的操作符均可按照這種方式實現(xiàn)彈珠圖,這類操作符的實現(xiàn)代碼請查看:CreatingOperatorsTest.java

delay

delay 是工具類型的操作符盹兢,可以對數(shù)據(jù)流進行延時發(fā)送闻察。

delay 彈珠圖來自于rxmarbles.com

與創(chuàng)建型操作符 interval 彈珠圖不一樣拱礁,delay 有輸入和輸出兩條數(shù)據(jù)流,中間是操作符的轉(zhuǎn)換過程蜓陌。輸入需借助創(chuàng)建型操作符實現(xiàn)(如 just)觅彰,輸出則由訂閱者完成。

  1. 輸入:使用比較簡單的 just 操作符钮热,即 Observable.just(1, 2, 1)填抬。
  2. 輸出:經(jīng)過 delay 的變換,在延遲指定的時間之后隧期,輸出與輸入一致的輸入流飒责。
  3. 實現(xiàn)思路:此操作符也是與時間相關(guān)的操作符赘娄,通用 TestScheduler 來操縱時間,并且驗證「延時時間內(nèi)」和「超過延時時間」是否有數(shù)據(jù)流輸出宏蛉。代碼如下:
@Test
public void delay() {
    Observable.just(1, 2, 1)
            .delay(3000, TimeUnit.SECONDS, mTestScheduler)
            .subscribe(mList::add);

    mTestScheduler.advanceTimeBy(2000, TimeUnit.SECONDS);
    System.out.println("after 2000ms,result = " + mList);
    assertTrue(mList.isEmpty());

    mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
    System.out.println("after 3000ms,result = " + mList);
    assertEquals(mList, Arrays.asList(1, 2, 1));
}

工具型的操作符還有非常多遣臼,比如變換線程的 observeOnsubscribeOn ,比如 Observable 生命周期的事件監(jiān)聽操作符 doOnSubscribe 拾并、doOnNext揍堰、doOnCompleted,延遲訂閱的 delaySubscription 等嗅义,這類型的操作符實現(xiàn)請查看:UtilityOperatorsTest.java屏歹。

amb

amb 是條件型的操作符(Conditional Operators),滿足一定的條件數(shù)據(jù)流才會開始發(fā)送之碗,而 amb 需要滿足的條件便是:多個數(shù)據(jù)流中最早產(chǎn)生數(shù)據(jù)的數(shù)據(jù)流進行發(fā)送蝙眶,彈珠圖也明確地表達出了這層含義。

  1. 輸入:這里有3條數(shù)據(jù)流褪那,開始發(fā)送數(shù)據(jù)的時間各不一樣幽纷,通過之前的操作符講解,這里使用 just + delay 即可實現(xiàn)博敬。
  2. 輸出:經(jīng)過 amb 變化后友浸,輸出了最早發(fā)送數(shù)據(jù)的數(shù)據(jù)流,即第二條數(shù)據(jù)流冶忱。
  3. 實現(xiàn)思路:通過 delay 操作符分別延時500s尾菇、200s和1000s,然后通用 TestScheduler 將時間提早1000s囚枪,訂閱數(shù)據(jù)流后,驗證下輸出劳淆。代碼如下:
@Test
public void amb() {
    Observable<Integer> o1 = Observable.just(20, 40, 60)
            .delay(500, TimeUnit.SECONDS, mTestScheduler);

    Observable<Integer> o2 = Observable.just(1, 2, 3)
            .delay(200, TimeUnit.SECONDS, mTestScheduler);

    Observable<Integer> o3 = Observable.just(0, 0, 0)
            .delay(1000, TimeUnit.SECONDS, mTestScheduler);

    Observable.amb(o1, o2, o3)
            .subscribe(mList::add);

    mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
    assertEquals(mList, Arrays.asList(1, 2, 3));
}

以此類推链沼,更多條件行的操作符,如 skipUntil 沛鸵、takeUntil 等括勺,請查看ConditionalAndBooleanOperatorsTest.java

buffer

buffer 是轉(zhuǎn)換型的操作符曲掰,他可以將單個數(shù)據(jù)緩存起來疾捍,批量發(fā)送,發(fā)送的數(shù)據(jù)類型是 List 栏妖。

buffer

上圖要表達的意思很明確乱豆,發(fā)送6個數(shù)據(jù),每三個做一次緩存吊趾,然后批量發(fā)送宛裕,代碼實現(xiàn)如下:

@Test
public void buffer() {

    Observable.just(1, 2, 3, 4, 5, 6)
            .buffer(3)
            .subscribe(mList::add);

    System.out.println(mList);
    List<List<Integer>> exceptList = Arrays.asList(Arrays.asList(1, 2, 3),
            Arrays.asList(4, 5, 6));
    assertEquals(mList, exceptList);
}

flatMap 和 concatMap

接下來瑟啃,來對比一組轉(zhuǎn)換型的操作符:flatMapconcatMap,這兩者充分體現(xiàn)了 marble diagrams 給我們帶來的各種有價值的信息揩尸。以下是這兩個操作符的 marble diagrams:

flatMap
concatMap
  1. 輸入:兩者完全一模一樣的輸入蛹屿,這里要重點關(guān)注彈珠的顏色,顏色代表了數(shù)據(jù)流的順序岩榆。
  2. 輸出:輸入的數(shù)據(jù)流經(jīng)過變換后错负,每份數(shù)據(jù)都變成了兩份,此外勇边,** flatMap 變換后犹撒,綠色的◇和藍色◇是交叉的,而 concatMap 則保持了與輸入一致的順序**粥诫,這個細節(jié)決定了我們?nèi)绾蝸韺崿F(xiàn)這兩張圖油航。
  3. 實現(xiàn)思路:在 flatMapconcatMap 之后,3個數(shù)據(jù)變成了6個數(shù)據(jù)怀浆,假設(shè)輸入為 1谊囚、2、3 执赡,則輸出為 1镰踏、1、2沙合、2奠伪、3、3 首懈,我們要想辦法讓變換后的輸出有時間差绊率,即按照 1、1究履、2滤否、3、2最仑、3 的順序輸出藐俺,思考再三, interval 可以實現(xiàn)這個場景泥彤,將原始的輸入流1欲芹、2、3分別作為 interval 時間間隔的變量吟吝,來模擬交叉的輸出菱父。具體實現(xiàn)如下:
@Test
public void flatMap() {
    Observable.just(1, 2, 3)
            .flatMap((Func1<Integer, Observable<?>>) num -> Observable.interval(num - 1,
                    TimeUnit.SECONDS, mTestScheduler)
                    .take(2)
                    .map(value -> num + "◇"))
            .subscribe(mList::add);

    mTestScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
    assertEquals(mList, Arrays.asList("1◇", "1◇", "2◇", "3◇", "2◇", "3◇"));
    System.out.println(mList);
}

上述代碼中,只需把 flatMap 修改為 concatMap ,便可獲得 "1◇", "1◇", "2◇", "2◇", "3◇", "3◇" 的數(shù)據(jù)流滞伟,與彈珠圖所要表達的意思完全一致揭鳞。通過這個例子,我們可以感受到梆奈,彈珠圖包含了操作符的諸多細節(jié)野崇,嚴謹?shù)貙崿F(xiàn)彈珠圖的輸入輸出,可以更深入的了解操作符亩钟。

更多轉(zhuǎn)換型的操作符的實現(xiàn)乓梨,如 switchMapgroupBy 清酥、window 等扶镀,請查看TransformingOperatorsTest.java

debounce

debounce 是過濾型的操作符焰轻,所以會按一定的規(guī)則過濾數(shù)據(jù)流臭觉。這個規(guī)則是:Observable每產(chǎn)生一個結(jié)果后,如果在規(guī)定的間隔時間內(nèi)沒有別的結(jié)果產(chǎn)生辱志,則把這個結(jié)果提交給訂閱者處理蝠筑,否則忽略該結(jié)果。

  1. 輸入:對于輸入的數(shù)據(jù)流可以這樣定義:先產(chǎn)生 1 的數(shù)據(jù)揩懒,間隔500ms后產(chǎn)生 2什乙、3、4已球、5 臣镣,再間隔500ms,產(chǎn)生 6 智亮,使用 create 操作符結(jié)合 Thread.sleep() 來實現(xiàn)輸入忆某。
  2. 輸出: debounce 的間隔時間設(shè)置為400ms,在三段間隔周期內(nèi)阔蛉,將依次輸出 1褒繁、5、6 馍忽。具體代碼如下:
@Test
public void debounce() {

    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            OperatorUtils.sleep(500);

            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onNext(5);

            OperatorUtils.sleep(500);
            subscriber.onNext(6);
            subscriber.onCompleted();
        }
    })
            .subscribeOn(mTestScheduler)
            .doOnNext(System.out::println)
            .debounce(400, TimeUnit.MILLISECONDS)
            .subscribe(mList::add);

    // 測試線程將時間提早10ms,可以保證create操作符順利執(zhí)行完畢
    mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    System.out.println(mList);
    assertEquals(mList, Arrays.asList(1, 5, 6));
}

以此類推燕差,按照這種方式可以實現(xiàn) sample遭笋、throttleFirstthrottleLast 等過濾型的操作符徒探,具體代碼請查看:FilteringOperatorsTest.java 瓦呼。

merge

merge 是聚合型的操作符。既然是聚合测暗,因此需要2條以上的數(shù)據(jù)流央串,聚合之后磨澡,輸出一條全新的數(shù)據(jù)流。

  1. 輸入:兩條數(shù)據(jù)流质和,并且要重點關(guān)注下數(shù)據(jù)發(fā)送的順序稳摄。
  2. 輸出:根據(jù)輸入的數(shù)據(jù)順序,原封不動的合并之后饲宿,進行輸出厦酬。
  3. 實現(xiàn)思路:兩條數(shù)據(jù)流均使用 interval 創(chuàng)建,第一條的間隔時間定義為5s瘫想,第二條數(shù)據(jù)流在第一條數(shù)據(jù)流產(chǎn)生了三個數(shù)據(jù)之后才發(fā)出第一個數(shù)據(jù)仗阅,因此時間間隔設(shè)置為18s,具體實現(xiàn)如下:
@Test
public void merge() {
    Observable<Long> observable1 = Observable.interval(5, TimeUnit.SECONDS, mTestScheduler)
            .take(5)
            .map(aLong -> (aLong + 1) * 20)
            .doOnNext(System.out::println);

    Observable<Long> observable2 = Observable.interval(18, TimeUnit.SECONDS, mTestScheduler)
            .take(2)
            .map(aLong -> 1L)
            .doOnNext(System.out::println);

    Observable.merge(observable1, observable2).subscribe(mList::add);

    mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
    assertEquals(mList, Arrays.asList(20L, 40L, 60L, 1L, 80L, 100L, 1L));
}

combineLatest

combineLatest 是聚合型的操作符国夜, 其聚合的規(guī)則是:每條數(shù)據(jù)流中的每個數(shù)據(jù)都要與另外一條數(shù)據(jù)流已發(fā)送的最近的數(shù)據(jù)進行兩兩結(jié)合减噪。

構(gòu)造出如彈珠圖所示的兩條數(shù)據(jù)流,重點在于制造時間差和多線程:

  • 使用 create + Thread.sleep() 來制造數(shù)據(jù)流產(chǎn)生的時間差车吹。
  • 讓兩條數(shù)據(jù)流在不同的線程中發(fā)送數(shù)據(jù)筹裕,使用 subscribeOn 操作符可以實現(xiàn)線程的調(diào)度。
  1. 首先是第一條數(shù)據(jù)流的構(gòu)造礼搁,讓其在 TestScheduler.test() 線程中產(chǎn)生數(shù)據(jù)(其實便是測試線程饶碘,增加了操縱時間的能力),代碼如下:
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {

   @Override
   public void call(Subscriber<? super Integer> subscriber) {
       OperatorUtils.logThread("observable1");
       subscriber.onNext(1);
       OperatorUtils.sleep(500);
       subscriber.onNext(2);
       OperatorUtils.sleep(1500);
       subscriber.onNext(3);
       OperatorUtils.sleep(250);
       subscriber.onNext(4);
       OperatorUtils.sleep(500);
       subscriber.onNext(5);
       subscriber.onCompleted();
   }
}).subscribeOn(mTestScheduler).doOnNext(System.out::println);
  1. 其次是第二條數(shù)據(jù)流馒吴,將其生產(chǎn)數(shù)據(jù)的線程定義為 Schedulers.newThread() 扎运,代碼如下:
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
   @Override
   public void call(Subscriber<? super String> subscriber) {
       OperatorUtils.logThread("observable2");
       OperatorUtils.sleep(250);
       subscriber.onNext("A");
       OperatorUtils.sleep(300);
       subscriber.onNext("B");
       OperatorUtils.sleep(500);
       subscriber.onNext("C");
       OperatorUtils.sleep(100);
       subscriber.onNext("D");
       subscriber.onCompleted();
   }
}).subscribeOn(Schedulers.newThread()).doOnNext(System.out::println);
  1. 前面2點完成了輸入,接下來就是進行聚合變換饮戳,以及消費數(shù)據(jù)豪治,產(chǎn)生輸出,并驗證與彈珠圖的輸出一致扯罐。
 (Func2<Integer, String, Object>) (integer, s) -> integer + s).subscribe(mList::add);
//測試線程提前一定時間负拟,讓observable1能順利開始發(fā)送數(shù)據(jù)
mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
System.out.println(mList);
assertEquals(mList, Arrays.asList("1A", "2A", "2B", "2C", "2D", "3D", "4D", "5D"));

mergecombineLatest 類似,我們可以依次實現(xiàn) zip 歹河、switchOnNext 掩浙、 withLatestFrom 等聚合型操作符,并了解他們之間的區(qū)別秸歧。聚合型的操作符所有代碼請前往:CombiningOperatorsTest.java 厨姚。

connect

之前介紹的創(chuàng)建型操作符均創(chuàng)建了 cold 類型的 Observable ,其特點是只有訂閱者訂閱數(shù)據(jù)時键菱,數(shù)據(jù)流才會開始發(fā)送數(shù)據(jù)谬墙。于此相反,hot 類型的 Observable 不管有沒有訂閱者,都可以直接開始發(fā)送數(shù)據(jù)拭抬。publishconnect 是與 hot Observable 相關(guān)的一類操作符部默。

這張彈珠圖并不好理解,但如果能完整實現(xiàn)造虎,對 hot Observable 的便能了然于胸傅蹂。這張圖中,輸出有三條數(shù)據(jù)流累奈,代表有三個訂閱者贬派,但是訂閱的時間不一致,最終接收到的數(shù)據(jù)也不一致澎媒,此外搞乏,這張圖中,體現(xiàn)了 publishconnect兩種操作符戒努。

  1. 輸入:數(shù)據(jù)流的產(chǎn)生比較清晰请敦,用上文講過的創(chuàng)建型操作符即可實現(xiàn)。由于需要時間差储玫,因此采用 interval 來產(chǎn)生數(shù)據(jù)流侍筛,時間間隔定義為3s。此外撒穷, interval 產(chǎn)生的數(shù)據(jù)流是 cold 類型的匣椰,如何由 cold 變成 hot,其實這便是 publish 操作符要做的事情端礼。
  2. 輸出:輸出的信息量比較大禽笑,我們需要好好捋一捋:
  • 首先可以明確有三個訂閱者,且訂閱的時間各不一樣蛤奥。延時訂閱可以使用 delaySubscription 操作符佳镜。
  • 第一個訂閱者即刻訂閱,不延時凡桥,而他在訂閱時蟀伸,數(shù)據(jù)流還未開始發(fā)送數(shù)據(jù),因此可以訂閱到完整的數(shù)據(jù)流缅刽。
  • 第一個訂閱者的數(shù)據(jù)流中有個操作符不可忽視——connect 啊掏,他決定著 Observable 何時開始發(fā)送數(shù)據(jù)。根據(jù)圖中所示衰猛,將時間定義為2秒后脖律。
  • 第二個訂閱者在數(shù)據(jù)發(fā)送了2個之后才開始訂閱,因此將訂閱時間設(shè)置為延遲6秒訂閱腕侄。他將只能訂閱到最后一個數(shù)據(jù)。
  • 第三個訂閱者與第一個區(qū)別并不大,我們將他定義為延時1秒后訂閱冕杠。

完整的代碼實現(xiàn)如下:

public void connect() {

    List<Integer> list1 = new ArrayList<>();
    List<Integer> list2 = new ArrayList<>();
    List<Integer> list3 = new ArrayList<>();

    //構(gòu)造1,2,3的數(shù)據(jù)流微姊,每隔3s發(fā)射數(shù)據(jù)
    ConnectableObservable<Integer> connectableObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            OperatorUtils.sleep(3000);
            subscriber.onNext(2);
            OperatorUtils.sleep(3000);
            subscriber.onNext(3);
        }
    }).publish();

    System.out.println("Subscriber1-0s后開始訂閱數(shù)據(jù)");
    //立刻訂閱完整的數(shù)據(jù)流
    connectableObservable.doOnNext(num -> System.out.println("Subscriber1-->" + num))
            .subscribe(list1::add);

    //延遲6s后再訂閱,將只訂閱到3的數(shù)據(jù)流
    connectableObservable.delaySubscription(6, TimeUnit.SECONDS, Schedulers.newThread())
            .doOnSubscribe(()->{
                System.out.println("Subscriber2-6s后開始訂閱數(shù)據(jù)");
            })
            .doOnNext(num -> System.out.println("Subscriber2-->" + num))
            .subscribe(list2::add);

    //延遲1s后再訂閱分预,將只訂閱到完整的數(shù)據(jù)流
    connectableObservable.delaySubscription(1, TimeUnit.SECONDS, Schedulers.newThread())
            .doOnSubscribe(()->{
                System.out.println("Subscriber3-1s后開始訂閱數(shù)據(jù)");
            })
            .doOnNext(num -> System.out.println("Subscriber3-->" + num))
            .subscribe(list3::add);


    //延時2s執(zhí)行connect()
    OperatorUtils.sleep(2000);
    System.out.println("Observable 2s后觸發(fā)connect()");
    connectableObservable.connect();

    assertEquals(list1, Arrays.asList(1, 2, 3));
    assertEquals(list2, Collections.singletonList(3));
    assertEquals(list3, Arrays.asList(1, 2, 3));
}

以此類推兢交,可以實現(xiàn)其他與 hot Observable 相關(guān)的操作符,如 refCount笼痹、replay配喳、cache 等,具體代碼請查看ConnectableOperatorsTest.java 凳干。

其他類型的操作符

除了上文介紹的7種不同類型的操作符之外晴裹,還有錯誤處理類型(如 retryretryWhen)救赐、背壓類型(如 onBackpressureBuffer)涧团、Convert 類型(如toListtoMap )的操作符未涉及到经磅,以及一些彈珠圖無法完全詮釋操作符本身的諸多細節(jié)的講解泌绣,篇幅所限,請移步這篇文章查看预厌。

(三)本文代碼

本文的所有代碼請前往這個地址查看:
https://github.com/geniusmart/RxJavaOperatorsUTSample

目前已經(jīng)實現(xiàn)了的彈珠圖(marble diagrams)的操作符種類如下:


(四)結(jié)束語

授人以魚不如授人以漁阿迈。本文側(cè)重介紹一種學習 RxJava 、全面且深入了解操作符的方式轧叽,總結(jié)起來有如下關(guān)鍵點:

  1. 使用單元測試實現(xiàn)苗沧,消除對 Android 的依賴,且不要涉及太多的測試技巧犹芹,專注于操作符的實現(xiàn)崎页。
  2. 有目的性且嚴謹?shù)貙崿F(xiàn)輸入輸出。每個操作符腰埂,讀懂 marble diagrams 飒焦,并通過代碼實現(xiàn)。
  3. marble diagrams 圖片來自于RX Marbles 屿笼,或者 RxJava 官方 牺荠。
  4. 一些有更深層次含義或細節(jié)的,marble diagrams 無法完整詮釋的驴一,如defer休雌,retryWhen,查閱更多的文章實現(xiàn)肝断。這部分的講解請移步到另外一篇文章:《使用 UT 玩轉(zhuǎn) defer 和 retryWhen》杈曲。

參考文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末驰凛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子担扑,更是在濱河造成了極大的恐慌恰响,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涌献,死亡現(xiàn)場離奇詭異胚宦,居然都是意外死亡,警方通過查閱死者的電腦和手機燕垃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門枢劝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人卜壕,你說我怎么就攤上這事您旁。” “怎么了印叁?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵被冒,是天一觀的道長。 經(jīng)常有香客問我轮蜕,道長昨悼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任跃洛,我火速辦了婚禮率触,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘汇竭。我一直安慰自己葱蝗,他們只是感情好,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布细燎。 她就那樣靜靜地躺著两曼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪玻驻。 梳的紋絲不亂的頭發(fā)上悼凑,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音璧瞬,去河邊找鬼户辫。 笑死,一個胖子當著我的面吹牛嗤锉,可吹牛的內(nèi)容都是我干的渔欢。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼瘟忱,長吁一口氣:“原來是場噩夢啊……” “哼奥额!你這毒婦竟也來了苫幢?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤披坏,失蹤者是張志新(化名)和其女友劉穎态坦,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體棒拂,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年玫氢,在試婚紗的時候發(fā)現(xiàn)自己被綠了帚屉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡漾峡,死狀恐怖攻旦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情生逸,我是刑警寧澤牢屋,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站槽袄,受9級特大地震影響烙无,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜遍尺,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一截酷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧乾戏,春花似錦迂苛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至呐能,卻和暖如春念搬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背催跪。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工锁蠕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人懊蒸。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓荣倾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親骑丸。 傳聞我的和親對象是個殘疾皇子舌仍,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位妒貌,與響應式編程作為結(jié)合使用的,對什么是操作铸豁、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,862評論 0 10
  • 作者: maplejaw本篇只解析標準包中的操作符灌曙。對于擴展包,由于使用率較低节芥,如有需求在刺,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 45,668評論 8 93
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableEm...
    rkua閱讀 1,829評論 0 1
  • 前言 按照官方的分類头镊,操作符大致分為以下幾種: Creating Observables(Observable的創(chuàng)...
    小玉1991閱讀 1,050評論 0 1
  • 作者寄語 很久之前就想寫一個專題蚣驼,專寫Android開發(fā)框架,專題的名字叫 XXX 從入門到放棄 相艇,沉淀了這么久颖杏,...
    戴定康閱讀 7,631評論 13 85