Rx第六章
組合Observables
上一章中涌乳,我們學到如何轉(zhuǎn)換可觀測序列胞谭。我們也看到了map()
,scan()
,groupBY()
,以及更多有用的函數(shù)的實際例子露氮,它們幫助我們操作Observable來創(chuàng)建我們想要的Observable须揣。
本章中葛虐,我們將研究組合函數(shù)并學習如何同時處理多個Observables來創(chuàng)建我們想要的Observable。
Merge
在”異步的世界“中經(jīng)常會創(chuàng)建這樣的場景译株,我們有多個來源但是又只想有一個結(jié)果:多輸入瓜喇,單輸出。RxJava的merge()
方法將幫助你把兩個甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項里歉糜。下圖給出了把兩個序列合并在一個最終發(fā)射的Observable乘寒。
正如你看到的那樣,發(fā)射的數(shù)據(jù)被交叉合并到一個Observable里面现恼。注意如果你同步的合并Observable,它們將連接在一起并且不會交叉黍檩。
像往常一樣叉袍,我們用我們的App和已安裝的App列表來創(chuàng)建了一個“真實世界”的例子。為此我們還需要第二個Observable刽酱。我們可以創(chuàng)建一個單獨的應用列表然后讓它逆序排列喳逛。當然這沒有實際的意義,只是為了這個例子棵里。對于第二個列表润文,我們的loadList()
函數(shù)像下面這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
List reversedApps = Lists.reverse(apps);
Observable<AppInfo> observableApps =Observable.from(apps);
Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(new Observer<AppInfo>(){
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfoappInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
我們創(chuàng)建了Observable和observableApps數(shù)據(jù)項以及新的observableReversedApps逆序列表姐呐。使用Observable.merge()
,我們可以創(chuàng)建新的Observable MergedObservable
典蝌,它在單個可觀測序列中發(fā)射源Observables發(fā)出的所有數(shù)據(jù)曙砂。
正如你能看到的,每個方法簽名都是一樣的,因此我們的觀察者無需在意任何不同就可以復用代碼骏掀。結(jié)果如下:
注意錯誤時的toast消息鸠澈,你可以認為每個Observable拋出的錯誤都將會打斷合并。如果你需要避免這種情況截驮,RxJava提供了mergeDelayError()
笑陈,它能從一個Observable中繼續(xù)發(fā)射數(shù)據(jù)即便是其中有一個拋出了錯誤。當所有的Observables都完成時葵袭,mergeDelayError()
將會發(fā)射onError()
涵妥,如下圖所示:
ZIP
在一種新的可能場景中處理多個數(shù)據(jù)來源時會帶來:多從個Observables接收數(shù)據(jù),處理它們坡锡,然后將它們合并成一個新的可觀測序列來使用蓬网。RxJava有一個特殊的方法可以完成:zip()
合并兩個或者多個Observables發(fā)射出的數(shù)據(jù)項,根據(jù)指定的函數(shù)Func*
變換它們娜氏,并發(fā)射一個新值拳缠。下圖展示了zip()
方法如何處理發(fā)射的“numbers”和“l(fā)etters”然后將它們合并一個新的數(shù)據(jù)項:
對于“真實世界”的例子來說,我們將使用已安裝的應用列表和一個新的動態(tài)的Observable來讓例子變得有點有趣味贸弥。
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
tictoc
Observable變量使用interval()
函數(shù)每秒生成一個Long類型的數(shù)據(jù):雖簡單但有效窟坐,正如之前所說的,我們需要一個Func
對象绵疲。因為它需要傳兩個參數(shù)哲鸳,所以是Func2
:
private AppInfo updateTitle(AppInfoappInfo, Long time) {
appInfo.setName(time + " " + appInfo.getName());
return appInfo;
}
現(xiàn)在我們的loadList()
函數(shù)變成這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Observable.zip(observableApp, tictoc,
(AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
正如你看到的那樣,zip()
函數(shù)有三個參數(shù):兩個Observables和一個Func2
盔憨。
仔細一看會發(fā)現(xiàn)observeOn()
函數(shù)徙菠。它將在下一章中講解:現(xiàn)在我們可以小試一下。
結(jié)果如下:
Join
前面兩個方法郁岩,zip()
和merge()
方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi)婿奔,在決定如何操作值之前有些場景我們需要考慮時間的。RxJava的join()
函數(shù)基于時間窗口將兩個Observables發(fā)射的數(shù)據(jù)結(jié)合在一起问慎。
為了正確的理解上一張圖萍摊,我們解釋下join()
需要的參數(shù):
- 第二個Observable和源Observable結(jié)合。
-
Func1
參數(shù):在指定的由時間窗口定義時間間隔內(nèi)如叼,源Observable發(fā)射的數(shù)據(jù)和從第二個Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable冰木。 -
Func1
參數(shù):在指定的由時間窗口定義時間間隔內(nèi),第二個Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。 -
Func2
參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項相結(jié)合踊沸。
如下練習的例子歇终,我們可以修改loadList()
函數(shù)像下面這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position -> {
return apps.get(position.intValue());
});
Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);
appsSequence.join(
tictoc,
appInfo -> Observable.timer(2,TimeUnit.SECONDS),
time -> Observable.timer(0, TimeUnit.SECONDS),
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.take(10)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
我們有一個新的對象appsSequence
,它是一個每秒從我們已安裝的app列表發(fā)射app數(shù)據(jù)的可觀測序列逼龟。tictoc
這個Observable數(shù)據(jù)每秒只發(fā)射一個新的Long
型整數(shù)评凝。為了合并它們,我們需要指定兩個Func1
變量:
appInfo -> Observable.timer(2, TimeUnit.SECONDS)
time -> Observable.timer(0, TimeUnit.SECONDS)
上面描述了兩個時間窗口审轮。下面一行描述我們?nèi)绾问褂?code>Func2將兩個發(fā)射的數(shù)據(jù)結(jié)合在一起肥哎。
this::updateTitle
結(jié)果如下:
它看起來有點亂,但是注意app的名字和我們指定的時間窗口疾渣,我們可以看到:一旦第二個數(shù)據(jù)發(fā)射了我們就會將它與源數(shù)據(jù)結(jié)合篡诽,但我們用同一個源數(shù)據(jù)有2秒鐘。這就是為什么標題重復數(shù)字增加的原因榴捡。
值得一提的是杈女,為了簡單起見,也有一個join()
操作符作用于字符串然后簡單的和發(fā)射的字符串連接成最終的字符串吊圾。
combineLatest
RxJava的combineLatest()
函數(shù)有點像zip()
函數(shù)的特殊形式达椰。正如我們已經(jīng)學習的,zip()
作用于最近未打包的兩個Observables项乒。相反啰劲,combineLatest()
作用于最近發(fā)射的數(shù)據(jù)項:如果Observable1
發(fā)射了A并且Observable2
發(fā)射了B和C,combineLatest()
將會分組處理AB和AC檀何,如下圖所示:
combineLatest()
函數(shù)接受二到九個Observable作為參數(shù)蝇裤,如果有需要的話或者單個Observables列表作為參數(shù)。
從之前的例子中把loadList()
函數(shù)借用過來频鉴,我們可以修改一下來用于combineLatest()
實現(xiàn)“真實世界”這個例子:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position ->apps.get(position.intValue()));
Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(appsSequence, tictoc,
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
這我們使用了兩個Observables:一個是每秒鐘從我們已安裝的應用列表發(fā)射一個App數(shù)據(jù)栓辜,第二個是每隔1.5秒發(fā)射一個Long
型整數(shù)。我們將他們結(jié)合起來并執(zhí)行updateTitle()
函數(shù)垛孔,結(jié)果如下:
正如你看到的藕甩,由于不同的時間間隔,AppInfo
對象如我們所預料的那樣有時候會重復周荐。
And,Then和When
在將來還有一些zip()
滿足不了的場景狭莱。如復雜的架構(gòu),或者是僅僅為了個人愛好概作,你可以使用And/Then/When解決方案腋妙。它們在RxJava的joins包下,使用Pattern和Plan作為中介仆嗦,將發(fā)射的數(shù)據(jù)集合并到一起辉阶。
我們的loadList()
函數(shù)將會被修改從這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc);
Plan0<AppInfo> plan = pattern.then(this::updateTitle);
JoinObservable
.when(plan)
.toObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
}
});
}
和通常一樣先壕,我們有兩個發(fā)射的序列瘩扼,observableApp
谆甜,發(fā)射我們安裝的應用列表數(shù)據(jù),tictoc
每秒發(fā)射一個Long
型整數(shù)〖拢現(xiàn)在我們用and()
連接源Observable和第二個Observable规辱。
JoinObservable.from(observableApp).and(tictoc);
這里創(chuàng)建一個pattern
對象,使用這個對象我們可以創(chuàng)建一個Plan
對象:"我們有兩個發(fā)射數(shù)據(jù)的Observables,then()
是做什么的栽燕?"
pattern.then(this::updateTitle);
現(xiàn)在我們有了一個Plan
對象并且當plan發(fā)生時我們可以決定接下來發(fā)生的事情罕袋。
.when(plan).toObservable()
這時候,我們可以訂閱新的Observable碍岔,正如我們總是做的那樣浴讯。
Switch
有這樣一個復雜的場景就是在一個subscribe-unsubscribe
的序列里我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。
RxJava的switch()
蔼啦,正如定義的榆纽,將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項捏肢。
給出一個發(fā)射多個Observables序列的源Observable奈籽,switch()
訂閱到源Observable然后開始發(fā)射由第一個發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)。當源Observable發(fā)射一個新的Observable時鸵赫,switch()
立即取消訂閱前一個發(fā)射數(shù)據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個新的Observable衣屏,并開始發(fā)射它的數(shù)據(jù)。
StartWith
我們已經(jīng)學到如何連接多個Observables并追加指定的值到一個發(fā)射序列里辩棒。RxJava的startWith()
是concat()
的對應部分狼忱。正如concat()
向發(fā)射數(shù)據(jù)的Observable追加數(shù)據(jù)那樣,在Observable開始發(fā)射他們的數(shù)據(jù)之前盗温, startWith()
通過傳遞一個參數(shù)來先發(fā)射一個數(shù)據(jù)序列藕赞。
總結(jié)
這章中,我們學習了如何將兩個或者更多個Observable結(jié)合來創(chuàng)建一個新的可觀測序列卖局。我們將能夠merge
Observable斧蜕,join
Observables ,zip
Observables 并在幾種情況下把他們結(jié)合在一起砚偶。
下一章批销,我們將介紹調(diào)度器,它將很容易的幫助我們創(chuàng)建主線程以及提高我們應用程序的性能染坯。我們也將學習如何正確的執(zhí)行長任務(wù)或者I/O任務(wù)來獲得更好的性能均芽。