RxJava 博大精深暴心,想要入門和進階姐浮,操作符是一個切入點。 所以超歌,我們希望尋找一種可以把操作符寫得比較爽砍艾,同時可以快速驗證輸入輸出是否準確的玩法。思路有以下兩點:
- 使用 UT(JUnit Test) 來對每一個操作符進行實現(xiàn)巍举,如此一來可以脫離 Android 平臺的依賴脆荷,專注于操作符本身。
- 對于每一種操作符懊悯,使用 RX Marbles 蜓谋,或者 RxJava 官方的彈珠圖(marble diagrams)進行實現(xiàn)。
比如下面兩張圖炭分,分別來自 RX Marbles 和官方的彈珠圖桃焕,我們要做的就是用 UT 有目的性、精確地實現(xiàn)這兩張圖的輸入和輸出捧毛。
所謂有目的性观堂、精確地輸入輸出,意思就是根據(jù)所有操作符的彈珠圖的每條數(shù)據(jù)流呀忧,以及操作符的含義师痕,嚴格按照圖片表達的意思進行代碼的實現(xiàn)。通過這種方強迫癥一般的方式而账,對理解操作符和 RxJava 的體系有很大的幫助胰坟。
(一)預備知識
我們希望把精力專注于操作符的實現(xiàn),而不是單元測試的技巧泞辐,但由于 RxJava 的異步特性笔横,有很多操作符是跟線程相關(guān)的,因此我們要先掌握單元測試中如何對線程進行處理的預備知識铛碑。
讓測試線程最晚結(jié)束
在線程相關(guān)的測試代碼中狠裹,有個很棘手的現(xiàn)象是:測試線程早于子線程執(zhí)行完畢,如下代碼:
@Test
public void test_thread_early() {
//測試線程啟動
System.out.println("測試線程-start");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程-start");
OperatorUtils.sleep(3000);
System.out.println("子線程-end");
}
}).start();
//測試線程結(jié)束后汽烦,子線程還未執(zhí)行完畢涛菠,因此子線程無法完整的輸出測試結(jié)果
System.out.println("測試線程-end");
}
在上述代碼中,測試線程瞬間就執(zhí)行完畢了撇吞,而子線程需要執(zhí)行3s俗冻,測試線程早于子線程執(zhí)行完畢,因此子線程將無法完整的執(zhí)行牍颈,因此迄薄,輸出的結(jié)果是:
測試線程-start
測試線程-end
子線程-start
于此對應的,我們來看看 RxJava 操作符的例子煮岁,通過 timer
操作符實現(xiàn)延遲3s發(fā)送數(shù)據(jù):
@Test
public void test_thread_early_observable() {
System.out.println("測試線程-start,所在線程:" + Thread.currentThread().getName());
//消息源在Schedulers.computation()線程中執(zhí)行讥蔽,3s后執(zhí)行涣易,此時測試線程已經(jīng)執(zhí)行完畢,無法正常輸出結(jié)果
Observable.timer(3, TimeUnit.SECONDS)
.subscribe(num -> {
System.out.println("Observable和Subscriber所在線程:" + Thread.currentThread().getName());
System.out.println("獲取訂閱數(shù)據(jù):" + num);
});
System.out.println("測試線程--end");
}
與上面的代碼一樣冶伞,由于測試線程早早的結(jié)束了新症,timer
操作符所在的線程 Schedulers.computation()
將無法完整地執(zhí)行完畢,因此輸出的結(jié)果是:
測試線程-start,所在線程:main
測試線程-end
如果無法保證所有線程都執(zhí)行完畢响禽,便無法得到預期的輸出結(jié)果徒爹。那么,如何解決這個問題芋类?有種最笨的方法便是讓測試線程成為最晚結(jié)束的線程隆嗅,我們?yōu)闇y試線程增加類似于 Thread.sleep(4000)
的邏輯,便可保證以上兩份代碼可以在正常輸出侯繁。(此文不希望涉及太多的測試技巧胖喳,如果需要更嚴謹和更強大的線程異步測試,可以參考些第三方框架巫击,如 awaitility)
使用TestScheduler操縱時間
除了這種笨方法之外禀晓,RxJava 提供了 TestScheduler
,通過這個調(diào)度器可以實現(xiàn)對時間的操縱坝锰。
對于上文提到的 timer
操作符,通過testScheduler.advanceTimeBy(3, TimeUnit.SECONDS)
可以將時間提前3s重付,此時測試線程和 timer
操作符所在的線程均可順利的執(zhí)行完畢顷级,完整代碼如下:
@Test
public void test_thread_with_TestScheduler() {
TestScheduler testScheduler = Schedulers.test();
System.out.println("測試線程:" + Thread.currentThread().getName());
//指定調(diào)度器
Observable.timer(3, TimeUnit.SECONDS, testScheduler)
.subscribe(num -> {
System.out.println("Observable和Subscriber線程:" + Thread.currentThread().getName());
System.out.println("獲取訂閱數(shù)據(jù):" + num);
});
//將時間提前了3s
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
}
聚合操作符的線程處理
很多聚合操作符,如 merge
确垫、zip
等弓颈,需要在多個不同的線程中構(gòu)造不同的數(shù)據(jù)流,從而體現(xiàn)數(shù)據(jù)流發(fā)送的先后關(guān)系删掀,以及所對應的不同的輸出結(jié)果翔冀。如何讓多個線程完整的執(zhí)行完畢?結(jié)合上文所講的讓測試線程最晚結(jié)束以及**使用 TestScheduler
**便可做到披泪。筆者在下文的聚合操作符一節(jié)中將會具體講解纤子。
有了這些預備知識,基本上可以實現(xiàn) RxJava 的所有操作符款票,接下來針對不同類型的操作符分別舉例一二進行講解控硼,如需完整代碼,請前往Github:
https://github.com/geniusmart/RxJavaOperatorsUTSample
(二)不同類型的操作符實現(xiàn)
interval
interval
作為創(chuàng)建型的操作符艾少,具備間隔一段時間發(fā)送數(shù)據(jù)的能力卡乾,是我們寫其他操作符的基礎(chǔ),因此先來講解下interval
缚够。
這張圖要表達的意思很簡單幔妨,自頂而下的分析如下:
- 操作符:由于
interval
操作符是創(chuàng)建型的鹦赎,因此直接調(diào)用操作符來產(chǎn)生數(shù)據(jù)流,根據(jù) api 參數(shù)误堡,需定義其間隔時長钙姊,這個數(shù)值我們設(shè)置為100ms。 - 輸入:執(zhí)行了
Observable.interval()
之后埂伦,每間隔指定時間將輸出0煞额、1、2沾谜、3……
的無窮數(shù)據(jù)(注:通過彈珠圖膊毁,可以看到第一個數(shù)據(jù)也是有間隔時間的)。 - 輸出:即數(shù)據(jù)消費者基跑,在 RxJava 中體現(xiàn)為
Subscriber
婚温。這張圖里并沒有畫出輸出的數(shù)據(jù)流,為了觀察輸出媳否,我們自定義訂閱者栅螟。 - 實現(xiàn)思路:
interval
默認在Schedulers.computation()
線程中執(zhí)行,執(zhí)行的時間將會超過測試線程篱竭,根據(jù)上文的「預備知識」這一節(jié)所述力图,我們使用TestScheduler
來操縱時間,比如掺逼,為了輸出4個數(shù)據(jù)吃媒,interval
需要4個單位的間隔時間(400ms),將時間提前400ms可輸出我們想要的結(jié)果吕喘。具體實現(xiàn)如下:
@Test
public void interval() {
Observable.interval(100, TimeUnit.MILLISECONDS, mTestScheduler)
.subscribe(mList::add);
//時間提早400ms前
mTestScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L));
//時間提早(400 + 200)ms前
mTestScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L));
}
以此類推赘那,range
倒淫、just
伪很、repeat
等創(chuàng)建型的操作符均可按照這種方式實現(xiàn)彈珠圖,這類操作符的實現(xiàn)代碼請查看:CreatingOperatorsTest.java
delay
delay
是工具類型的操作符盹兢,可以對數(shù)據(jù)流進行延時發(fā)送闻察。
與創(chuàng)建型操作符 interval
彈珠圖不一樣拱礁,delay
有輸入和輸出兩條數(shù)據(jù)流,中間是操作符的轉(zhuǎn)換過程蜓陌。輸入需借助創(chuàng)建型操作符實現(xiàn)(如 just
)觅彰,輸出則由訂閱者完成。
- 輸入:使用比較簡單的
just
操作符钮热,即Observable.just(1, 2, 1)
填抬。 - 輸出:經(jīng)過
delay
的變換,在延遲指定的時間之后隧期,輸出與輸入一致的輸入流飒责。 - 實現(xiàn)思路:此操作符也是與時間相關(guān)的操作符赘娄,通用
TestScheduler
來操縱時間,并且驗證「延時時間內(nèi)」和「超過延時時間」是否有數(shù)據(jù)流輸出宏蛉。代碼如下:
@Test
public void delay() {
Observable.just(1, 2, 1)
.delay(3000, TimeUnit.SECONDS, mTestScheduler)
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(2000, TimeUnit.SECONDS);
System.out.println("after 2000ms,result = " + mList);
assertTrue(mList.isEmpty());
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
System.out.println("after 3000ms,result = " + mList);
assertEquals(mList, Arrays.asList(1, 2, 1));
}
工具型的操作符還有非常多遣臼,比如變換線程的 observeOn
和 subscribeOn
,比如 Observable
生命周期的事件監(jiān)聽操作符 doOnSubscribe
拾并、doOnNext
揍堰、doOnCompleted
,延遲訂閱的 delaySubscription
等嗅义,這類型的操作符實現(xiàn)請查看:UtilityOperatorsTest.java屏歹。
amb
amb
是條件型的操作符(Conditional Operators),滿足一定的條件數(shù)據(jù)流才會開始發(fā)送之碗,而 amb
需要滿足的條件便是:多個數(shù)據(jù)流中最早產(chǎn)生數(shù)據(jù)的數(shù)據(jù)流進行發(fā)送蝙眶,彈珠圖也明確地表達出了這層含義。
- 輸入:這里有3條數(shù)據(jù)流褪那,開始發(fā)送數(shù)據(jù)的時間各不一樣幽纷,通過之前的操作符講解,這里使用
just
+delay
即可實現(xiàn)博敬。 - 輸出:經(jīng)過
amb
變化后友浸,輸出了最早發(fā)送數(shù)據(jù)的數(shù)據(jù)流,即第二條數(shù)據(jù)流冶忱。 - 實現(xiàn)思路:通過
delay
操作符分別延時500s尾菇、200s和1000s,然后通用TestScheduler
將時間提早1000s囚枪,訂閱數(shù)據(jù)流后,驗證下輸出劳淆。代碼如下:
@Test
public void amb() {
Observable<Integer> o1 = Observable.just(20, 40, 60)
.delay(500, TimeUnit.SECONDS, mTestScheduler);
Observable<Integer> o2 = Observable.just(1, 2, 3)
.delay(200, TimeUnit.SECONDS, mTestScheduler);
Observable<Integer> o3 = Observable.just(0, 0, 0)
.delay(1000, TimeUnit.SECONDS, mTestScheduler);
Observable.amb(o1, o2, o3)
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(1, 2, 3));
}
以此類推链沼,更多條件行的操作符,如 skipUntil
沛鸵、takeUntil
等括勺,請查看ConditionalAndBooleanOperatorsTest.java。
buffer
buffer
是轉(zhuǎn)換型的操作符曲掰,他可以將單個數(shù)據(jù)緩存起來疾捍,批量發(fā)送,發(fā)送的數(shù)據(jù)類型是 List
栏妖。
上圖要表達的意思很明確乱豆,發(fā)送6個數(shù)據(jù),每三個做一次緩存吊趾,然后批量發(fā)送宛裕,代碼實現(xiàn)如下:
@Test
public void buffer() {
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3)
.subscribe(mList::add);
System.out.println(mList);
List<List<Integer>> exceptList = Arrays.asList(Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6));
assertEquals(mList, exceptList);
}
flatMap 和 concatMap
接下來瑟啃,來對比一組轉(zhuǎn)換型的操作符:flatMap
和 concatMap
,這兩者充分體現(xiàn)了 marble diagrams 給我們帶來的各種有價值的信息揩尸。以下是這兩個操作符的 marble diagrams:
- 輸入:兩者完全一模一樣的輸入蛹屿,這里要重點關(guān)注彈珠的顏色,顏色代表了數(shù)據(jù)流的順序岩榆。
- 輸出:輸入的數(shù)據(jù)流經(jīng)過變換后错负,每份數(shù)據(jù)都變成了兩份,此外勇边,**
flatMap
變換后犹撒,綠色的◇和藍色◇是交叉的,而concatMap
則保持了與輸入一致的順序**粥诫,這個細節(jié)決定了我們?nèi)绾蝸韺崿F(xiàn)這兩張圖油航。 - 實現(xiàn)思路:在
flatMap
或concatMap
之后,3個數(shù)據(jù)變成了6個數(shù)據(jù)怀浆,假設(shè)輸入為1谊囚、2、3
执赡,則輸出為1镰踏、1、2沙合、2奠伪、3、3
首懈,我們要想辦法讓變換后的輸出有時間差绊率,即按照1、1究履、2滤否、3、2最仑、3
的順序輸出藐俺,思考再三,interval
可以實現(xiàn)這個場景泥彤,將原始的輸入流1欲芹、2、3分別作為interval
時間間隔的變量吟吝,來模擬交叉的輸出菱父。具體實現(xiàn)如下:
@Test
public void flatMap() {
Observable.just(1, 2, 3)
.flatMap((Func1<Integer, Observable<?>>) num -> Observable.interval(num - 1,
TimeUnit.SECONDS, mTestScheduler)
.take(2)
.map(value -> num + "◇"))
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList("1◇", "1◇", "2◇", "3◇", "2◇", "3◇"));
System.out.println(mList);
}
上述代碼中,只需把 flatMap
修改為 concatMap
,便可獲得 "1◇", "1◇", "2◇", "2◇", "3◇", "3◇"
的數(shù)據(jù)流滞伟,與彈珠圖所要表達的意思完全一致揭鳞。通過這個例子,我們可以感受到梆奈,彈珠圖包含了操作符的諸多細節(jié)野崇,嚴謹?shù)貙崿F(xiàn)彈珠圖的輸入輸出,可以更深入的了解操作符亩钟。
更多轉(zhuǎn)換型的操作符的實現(xiàn)乓梨,如 switchMap
、groupBy
清酥、window
等扶镀,請查看TransformingOperatorsTest.java。
debounce
debounce
是過濾型的操作符焰轻,所以會按一定的規(guī)則過濾數(shù)據(jù)流臭觉。這個規(guī)則是:Observable每產(chǎn)生一個結(jié)果后,如果在規(guī)定的間隔時間內(nèi)沒有別的結(jié)果產(chǎn)生辱志,則把這個結(jié)果提交給訂閱者處理蝠筑,否則忽略該結(jié)果。
- 輸入:對于輸入的數(shù)據(jù)流可以這樣定義:先產(chǎn)生
1
的數(shù)據(jù)揩懒,間隔500ms后產(chǎn)生2什乙、3、4已球、5
臣镣,再間隔500ms,產(chǎn)生6
智亮,使用create
操作符結(jié)合Thread.sleep()
來實現(xiàn)輸入忆某。 - 輸出:
debounce
的間隔時間設(shè)置為400ms,在三段間隔周期內(nèi)阔蛉,將依次輸出 1褒繁、5、6 馍忽。具體代碼如下:
@Test
public void debounce() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
OperatorUtils.sleep(500);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
OperatorUtils.sleep(500);
subscriber.onNext(6);
subscriber.onCompleted();
}
})
.subscribeOn(mTestScheduler)
.doOnNext(System.out::println)
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(mList::add);
// 測試線程將時間提早10ms,可以保證create操作符順利執(zhí)行完畢
mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
System.out.println(mList);
assertEquals(mList, Arrays.asList(1, 5, 6));
}
以此類推燕差,按照這種方式可以實現(xiàn) sample
遭笋、throttleFirst
、throttleLast
等過濾型的操作符徒探,具體代碼請查看:FilteringOperatorsTest.java 瓦呼。
merge
merge
是聚合型的操作符。既然是聚合测暗,因此需要2條以上的數(shù)據(jù)流央串,聚合之后磨澡,輸出一條全新的數(shù)據(jù)流。
- 輸入:兩條數(shù)據(jù)流质和,并且要重點關(guān)注下數(shù)據(jù)發(fā)送的順序稳摄。
- 輸出:根據(jù)輸入的數(shù)據(jù)順序,原封不動的合并之后饲宿,進行輸出厦酬。
- 實現(xiàn)思路:兩條數(shù)據(jù)流均使用
interval
創(chuàng)建,第一條的間隔時間定義為5s瘫想,第二條數(shù)據(jù)流在第一條數(shù)據(jù)流產(chǎn)生了三個數(shù)據(jù)之后才發(fā)出第一個數(shù)據(jù)仗阅,因此時間間隔設(shè)置為18s,具體實現(xiàn)如下:
@Test
public void merge() {
Observable<Long> observable1 = Observable.interval(5, TimeUnit.SECONDS, mTestScheduler)
.take(5)
.map(aLong -> (aLong + 1) * 20)
.doOnNext(System.out::println);
Observable<Long> observable2 = Observable.interval(18, TimeUnit.SECONDS, mTestScheduler)
.take(2)
.map(aLong -> 1L)
.doOnNext(System.out::println);
Observable.merge(observable1, observable2).subscribe(mList::add);
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(20L, 40L, 60L, 1L, 80L, 100L, 1L));
}
combineLatest
combineLatest
是聚合型的操作符国夜, 其聚合的規(guī)則是:每條數(shù)據(jù)流中的每個數(shù)據(jù)都要與另外一條數(shù)據(jù)流已發(fā)送的最近的數(shù)據(jù)進行兩兩結(jié)合减噪。
構(gòu)造出如彈珠圖所示的兩條數(shù)據(jù)流,重點在于制造時間差和多線程:
- 使用
create
+Thread.sleep()
來制造數(shù)據(jù)流產(chǎn)生的時間差车吹。 - 讓兩條數(shù)據(jù)流在不同的線程中發(fā)送數(shù)據(jù)筹裕,使用
subscribeOn
操作符可以實現(xiàn)線程的調(diào)度。
- 首先是第一條數(shù)據(jù)流的構(gòu)造礼搁,讓其在
TestScheduler.test()
線程中產(chǎn)生數(shù)據(jù)(其實便是測試線程饶碘,增加了操縱時間的能力),代碼如下:
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
OperatorUtils.logThread("observable1");
subscriber.onNext(1);
OperatorUtils.sleep(500);
subscriber.onNext(2);
OperatorUtils.sleep(1500);
subscriber.onNext(3);
OperatorUtils.sleep(250);
subscriber.onNext(4);
OperatorUtils.sleep(500);
subscriber.onNext(5);
subscriber.onCompleted();
}
}).subscribeOn(mTestScheduler).doOnNext(System.out::println);
- 其次是第二條數(shù)據(jù)流馒吴,將其生產(chǎn)數(shù)據(jù)的線程定義為
Schedulers.newThread()
扎运,代碼如下:
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
OperatorUtils.logThread("observable2");
OperatorUtils.sleep(250);
subscriber.onNext("A");
OperatorUtils.sleep(300);
subscriber.onNext("B");
OperatorUtils.sleep(500);
subscriber.onNext("C");
OperatorUtils.sleep(100);
subscriber.onNext("D");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread()).doOnNext(System.out::println);
- 前面2點完成了輸入,接下來就是進行聚合變換饮戳,以及消費數(shù)據(jù)豪治,產(chǎn)生輸出,并驗證與彈珠圖的輸出一致扯罐。
(Func2<Integer, String, Object>) (integer, s) -> integer + s).subscribe(mList::add);
//測試線程提前一定時間负拟,讓observable1能順利開始發(fā)送數(shù)據(jù)
mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
System.out.println(mList);
assertEquals(mList, Arrays.asList("1A", "2A", "2B", "2C", "2D", "3D", "4D", "5D"));
與 merge
和 combineLatest
類似,我們可以依次實現(xiàn) zip
歹河、switchOnNext
掩浙、 withLatestFrom
等聚合型操作符,并了解他們之間的區(qū)別秸歧。聚合型的操作符所有代碼請前往:CombiningOperatorsTest.java 厨姚。
connect
之前介紹的創(chuàng)建型操作符均創(chuàng)建了 cold 類型的 Observable
,其特點是只有訂閱者訂閱數(shù)據(jù)時键菱,數(shù)據(jù)流才會開始發(fā)送數(shù)據(jù)谬墙。于此相反,hot 類型的 Observable
不管有沒有訂閱者,都可以直接開始發(fā)送數(shù)據(jù)拭抬。publish
和 connect
是與 hot Observable
相關(guān)的一類操作符部默。
這張彈珠圖并不好理解,但如果能完整實現(xiàn)造虎,對 hot Observable
的便能了然于胸傅蹂。這張圖中,輸出有三條數(shù)據(jù)流累奈,代表有三個訂閱者贬派,但是訂閱的時間不一致,最終接收到的數(shù)據(jù)也不一致澎媒,此外搞乏,這張圖中,體現(xiàn)了 publish
和 connect
兩種操作符戒努。
- 輸入:數(shù)據(jù)流的產(chǎn)生比較清晰请敦,用上文講過的創(chuàng)建型操作符即可實現(xiàn)。由于需要時間差储玫,因此采用
interval
來產(chǎn)生數(shù)據(jù)流侍筛,時間間隔定義為3s。此外撒穷,interval
產(chǎn)生的數(shù)據(jù)流是 cold 類型的匣椰,如何由 cold 變成 hot,其實這便是publish
操作符要做的事情端礼。 - 輸出:輸出的信息量比較大禽笑,我們需要好好捋一捋:
- 首先可以明確有三個訂閱者,且訂閱的時間各不一樣蛤奥。延時訂閱可以使用
delaySubscription
操作符佳镜。 - 第一個訂閱者即刻訂閱,不延時凡桥,而他在訂閱時蟀伸,數(shù)據(jù)流還未開始發(fā)送數(shù)據(jù),因此可以訂閱到完整的數(shù)據(jù)流缅刽。
- 第一個訂閱者的數(shù)據(jù)流中有個操作符不可忽視——
connect
啊掏,他決定著Observable
何時開始發(fā)送數(shù)據(jù)。根據(jù)圖中所示衰猛,將時間定義為2秒后脖律。 - 第二個訂閱者在數(shù)據(jù)發(fā)送了2個之后才開始訂閱,因此將訂閱時間設(shè)置為延遲6秒訂閱腕侄。他將只能訂閱到最后一個數(shù)據(jù)。
- 第三個訂閱者與第一個區(qū)別并不大,我們將他定義為延時1秒后訂閱冕杠。
完整的代碼實現(xiàn)如下:
public void connect() {
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
List<Integer> list3 = new ArrayList<>();
//構(gòu)造1,2,3的數(shù)據(jù)流微姊,每隔3s發(fā)射數(shù)據(jù)
ConnectableObservable<Integer> connectableObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
OperatorUtils.sleep(3000);
subscriber.onNext(2);
OperatorUtils.sleep(3000);
subscriber.onNext(3);
}
}).publish();
System.out.println("Subscriber1-0s后開始訂閱數(shù)據(jù)");
//立刻訂閱完整的數(shù)據(jù)流
connectableObservable.doOnNext(num -> System.out.println("Subscriber1-->" + num))
.subscribe(list1::add);
//延遲6s后再訂閱,將只訂閱到3的數(shù)據(jù)流
connectableObservable.delaySubscription(6, TimeUnit.SECONDS, Schedulers.newThread())
.doOnSubscribe(()->{
System.out.println("Subscriber2-6s后開始訂閱數(shù)據(jù)");
})
.doOnNext(num -> System.out.println("Subscriber2-->" + num))
.subscribe(list2::add);
//延遲1s后再訂閱分预,將只訂閱到完整的數(shù)據(jù)流
connectableObservable.delaySubscription(1, TimeUnit.SECONDS, Schedulers.newThread())
.doOnSubscribe(()->{
System.out.println("Subscriber3-1s后開始訂閱數(shù)據(jù)");
})
.doOnNext(num -> System.out.println("Subscriber3-->" + num))
.subscribe(list3::add);
//延時2s執(zhí)行connect()
OperatorUtils.sleep(2000);
System.out.println("Observable 2s后觸發(fā)connect()");
connectableObservable.connect();
assertEquals(list1, Arrays.asList(1, 2, 3));
assertEquals(list2, Collections.singletonList(3));
assertEquals(list3, Arrays.asList(1, 2, 3));
}
以此類推兢交,可以實現(xiàn)其他與 hot Observable
相關(guān)的操作符,如 refCount笼痹、replay配喳、cache 等,具體代碼請查看ConnectableOperatorsTest.java 凳干。
其他類型的操作符
除了上文介紹的7種不同類型的操作符之外晴裹,還有錯誤處理類型(如 retry
、retryWhen
)救赐、背壓類型(如 onBackpressureBuffer
)涧团、Convert 類型(如toList
、toMap
)的操作符未涉及到经磅,以及一些彈珠圖無法完全詮釋操作符本身的諸多細節(jié)的講解泌绣,篇幅所限,請移步這篇文章查看预厌。
(三)本文代碼
本文的所有代碼請前往這個地址查看:
https://github.com/geniusmart/RxJavaOperatorsUTSample
目前已經(jīng)實現(xiàn)了的彈珠圖(marble diagrams)的操作符種類如下:
(四)結(jié)束語
授人以魚不如授人以漁阿迈。本文側(cè)重介紹一種學習 RxJava 、全面且深入了解操作符的方式轧叽,總結(jié)起來有如下關(guān)鍵點:
- 使用單元測試實現(xiàn)苗沧,消除對 Android 的依賴,且不要涉及太多的測試技巧犹芹,專注于操作符的實現(xiàn)崎页。
- 有目的性且嚴謹?shù)貙崿F(xiàn)輸入輸出。每個操作符腰埂,讀懂 marble diagrams 飒焦,并通過代碼實現(xiàn)。
- marble diagrams 圖片來自于RX Marbles 屿笼,或者 RxJava 官方 牺荠。
- 一些有更深層次含義或細節(jié)的,marble diagrams 無法完整詮釋的驴一,如
defer
休雌,retryWhen
,查閱更多的文章實現(xiàn)肝断。這部分的講解請移步到另外一篇文章:《使用 UT 玩轉(zhuǎn) defer 和 retryWhen》杈曲。