Rx第六章

Rx第六章

組合Observables

上一章中涌乳,我們學到如何轉(zhuǎn)換可觀測序列胞谭。我們也看到了map(),scan(),groupBY(),以及更多有用的函數(shù)的實際例子露氮,它們幫助我們操作Observable來創(chuàng)建我們想要的Observable须揣。

本章中葛虐,我們將研究組合函數(shù)并學習如何同時處理多個Observables來創(chuàng)建我們想要的Observable。

Merge

在”異步的世界“中經(jīng)常會創(chuàng)建這樣的場景译株,我們有多個來源但是又只想有一個結(jié)果:多輸入瓜喇,單輸出。RxJava的merge()方法將幫助你把兩個甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項里歉糜。下圖給出了把兩個序列合并在一個最終發(fā)射的Observable乘寒。

正如你看到的那樣,發(fā)射的數(shù)據(jù)被交叉合并到一個Observable里面现恼。注意如果你同步的合并Observable,它們將連接在一起并且不會交叉黍檩。

像往常一樣叉袍,我們用我們的App和已安裝的App列表來創(chuàng)建了一個“真實世界”的例子。為此我們還需要第二個Observable刽酱。我們可以創(chuàng)建一個單獨的應用列表然后讓它逆序排列喳逛。當然這沒有實際的意義,只是為了這個例子棵里。對于第二個列表润文,我們的loadList()函數(shù)像下面這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    List reversedApps = Lists.reverse(apps);
    Observable<AppInfo> observableApps =Observable.from(apps);
    Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
    Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
    
    mergedObserbable.subscribe(new Observer<AppInfo>(){
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }
        
        @Override
        public void onNext(AppInfoappInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        } 
    });
}

我們創(chuàng)建了Observable和observableApps數(shù)據(jù)項以及新的observableReversedApps逆序列表姐呐。使用Observable.merge(),我們可以創(chuàng)建新的Observable MergedObservable典蝌,它在單個可觀測序列中發(fā)射源Observables發(fā)出的所有數(shù)據(jù)曙砂。

正如你能看到的,每個方法簽名都是一樣的,因此我們的觀察者無需在意任何不同就可以復用代碼骏掀。結(jié)果如下:


注意錯誤時的toast消息鸠澈,你可以認為每個Observable拋出的錯誤都將會打斷合并。如果你需要避免這種情況截驮,RxJava提供了mergeDelayError()笑陈,它能從一個Observable中繼續(xù)發(fā)射數(shù)據(jù)即便是其中有一個拋出了錯誤。當所有的Observables都完成時葵袭,mergeDelayError()將會發(fā)射onError()涵妥,如下圖所示:

ZIP

在一種新的可能場景中處理多個數(shù)據(jù)來源時會帶來:多從個Observables接收數(shù)據(jù),處理它們坡锡,然后將它們合并成一個新的可觀測序列來使用蓬网。RxJava有一個特殊的方法可以完成:zip()合并兩個或者多個Observables發(fā)射出的數(shù)據(jù)項,根據(jù)指定的函數(shù)Func*變換它們娜氏,并發(fā)射一個新值拳缠。下圖展示了zip()方法如何處理發(fā)射的“numbers”和“l(fā)etters”然后將它們合并一個新的數(shù)據(jù)項:

對于“真實世界”的例子來說,我們將使用已安裝的應用列表和一個新的動態(tài)的Observable來讓例子變得有點有趣味贸弥。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable變量使用interval()函數(shù)每秒生成一個Long類型的數(shù)據(jù):雖簡單但有效窟坐,正如之前所說的,我們需要一個Func對象绵疲。因為它需要傳兩個參數(shù)哲鸳,所以是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
    appInfo.setName(time + " " + appInfo.getName());
    return appInfo;
}

現(xiàn)在我們的loadList()函數(shù)變成這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);
    
    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
    
    Observable.zip(observableApp, tictoc,
    (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }
        
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

正如你看到的那樣,zip()函數(shù)有三個參數(shù):兩個Observables和一個Func2盔憨。

仔細一看會發(fā)現(xiàn)observeOn()函數(shù)徙菠。它將在下一章中講解:現(xiàn)在我們可以小試一下。

結(jié)果如下:

Join

前面兩個方法郁岩,zip()merge()方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi)婿奔,在決定如何操作值之前有些場景我們需要考慮時間的。RxJava的join()函數(shù)基于時間窗口將兩個Observables發(fā)射的數(shù)據(jù)結(jié)合在一起问慎。

為了正確的理解上一張圖萍摊,我們解釋下join()需要的參數(shù):

  • 第二個Observable和源Observable結(jié)合。
  • Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi)如叼,源Observable發(fā)射的數(shù)據(jù)和從第二個Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable冰木。
  • Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi),第二個Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func2參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項相結(jié)合踊沸。

如下練習的例子歇终,我們可以修改loadList()函數(shù)像下面這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    
    Observable<AppInfo> appsSequence =
    Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(position -> {
                    return apps.get(position.intValue());
                });
                
    Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);
    
    appsSequence.join(
        tictoc, 
        appInfo -> Observable.timer(2,TimeUnit.SECONDS),
        time -> Observable.timer(0, TimeUnit.SECONDS),
        this::updateTitle)
        .observeOn(AndroidSchedulers.mainThread())
        .take(10)
        .subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }
            
            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }
            
            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) {
                    mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo);
                mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

我們有一個新的對象appsSequence,它是一個每秒從我們已安裝的app列表發(fā)射app數(shù)據(jù)的可觀測序列逼龟。tictoc這個Observable數(shù)據(jù)每秒只發(fā)射一個新的Long型整數(shù)评凝。為了合并它們,我們需要指定兩個Func1變量:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描述了兩個時間窗口审轮。下面一行描述我們?nèi)绾问褂?code>Func2將兩個發(fā)射的數(shù)據(jù)結(jié)合在一起肥哎。

this::updateTitle

結(jié)果如下:

它看起來有點亂,但是注意app的名字和我們指定的時間窗口疾渣,我們可以看到:一旦第二個數(shù)據(jù)發(fā)射了我們就會將它與源數(shù)據(jù)結(jié)合篡诽,但我們用同一個源數(shù)據(jù)有2秒鐘。這就是為什么標題重復數(shù)字增加的原因榴捡。

值得一提的是杈女,為了簡單起見,也有一個join()操作符作用于字符串然后簡單的和發(fā)射的字符串連接成最終的字符串吊圾。

combineLatest

RxJava的combineLatest()函數(shù)有點像zip()函數(shù)的特殊形式达椰。正如我們已經(jīng)學習的,zip()作用于最近未打包的兩個Observables项乒。相反啰劲,combineLatest()作用于最近發(fā)射的數(shù)據(jù)項:如果Observable1發(fā)射了A并且Observable2發(fā)射了B和C,combineLatest()將會分組處理AB和AC檀何,如下圖所示:

combineLatest()函數(shù)接受二到九個Observable作為參數(shù)蝇裤,如果有需要的話或者單個Observables列表作為參數(shù)。

從之前的例子中把loadList()函數(shù)借用過來频鉴,我們可以修改一下來用于combineLatest()實現(xiàn)“真實世界”這個例子:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
              .map(position ->apps.get(position.intValue()));
    Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
    Observable.combineLatest(appsSequence, tictoc,
               this::updateTitle)
       .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {
        
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        
        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }
        
        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        } 
    });
}

這我們使用了兩個Observables:一個是每秒鐘從我們已安裝的應用列表發(fā)射一個App數(shù)據(jù)栓辜,第二個是每隔1.5秒發(fā)射一個Long型整數(shù)。我們將他們結(jié)合起來并執(zhí)行updateTitle()函數(shù)垛孔,結(jié)果如下:

正如你看到的藕甩,由于不同的時間間隔,AppInfo對象如我們所預料的那樣有時候會重復周荐。

And,Then和When

在將來還有一些zip()滿足不了的場景狭莱。如復雜的架構(gòu),或者是僅僅為了個人愛好概作,你可以使用And/Then/When解決方案腋妙。它們在RxJava的joins包下,使用Pattern和Plan作為中介仆嗦,將發(fā)射的數(shù)據(jù)集合并到一起辉阶。

我們的loadList()函數(shù)將會被修改從這樣:

private void loadList(List<AppInfo> apps) {

    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> observableApp = Observable.from(apps);
    
    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
    
    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc); 
    
    Plan0<AppInfo> plan = pattern.then(this::updateTitle);
    
    JoinObservable
        .when(plan)
        .toObservable()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {
        
            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }
            
            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }
            
            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) { 
                mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

和通常一樣先壕,我們有兩個發(fā)射的序列瘩扼,observableApp谆甜,發(fā)射我們安裝的應用列表數(shù)據(jù),tictoc每秒發(fā)射一個Long型整數(shù)〖拢現(xiàn)在我們用and()連接源Observable和第二個Observable规辱。

JoinObservable.from(observableApp).and(tictoc);

這里創(chuàng)建一個pattern對象,使用這個對象我們可以創(chuàng)建一個Plan對象:"我們有兩個發(fā)射數(shù)據(jù)的Observables,then()是做什么的栽燕?"

pattern.then(this::updateTitle);

現(xiàn)在我們有了一個Plan對象并且當plan發(fā)生時我們可以決定接下來發(fā)生的事情罕袋。

.when(plan).toObservable()

這時候,我們可以訂閱新的Observable碍岔,正如我們總是做的那樣浴讯。

Switch

有這樣一個復雜的場景就是在一個subscribe-unsubscribe的序列里我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。

RxJava的switch()蔼啦,正如定義的榆纽,將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項捏肢。

給出一個發(fā)射多個Observables序列的源Observable奈籽,switch()訂閱到源Observable然后開始發(fā)射由第一個發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)。當源Observable發(fā)射一個新的Observable時鸵赫,switch()立即取消訂閱前一個發(fā)射數(shù)據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個新的Observable衣屏,并開始發(fā)射它的數(shù)據(jù)。

StartWith

我們已經(jīng)學到如何連接多個Observables并追加指定的值到一個發(fā)射序列里辩棒。RxJava的startWith()concat()的對應部分狼忱。正如concat()向發(fā)射數(shù)據(jù)的Observable追加數(shù)據(jù)那樣,在Observable開始發(fā)射他們的數(shù)據(jù)之前盗温, startWith()通過傳遞一個參數(shù)來先發(fā)射一個數(shù)據(jù)序列藕赞。

總結(jié)

這章中,我們學習了如何將兩個或者更多個Observable結(jié)合來創(chuàng)建一個新的可觀測序列卖局。我們將能夠merge Observable斧蜕,join Observables ,zip Observables 并在幾種情況下把他們結(jié)合在一起砚偶。

下一章批销,我們將介紹調(diào)度器,它將很容易的幫助我們創(chuàng)建主線程以及提高我們應用程序的性能染坯。我們也將學習如何正確的執(zhí)行長任務(wù)或者I/O任務(wù)來獲得更好的性能均芽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市单鹿,隨后出現(xiàn)的幾起案子掀宋,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,681評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件劲妙,死亡現(xiàn)場離奇詭異湃鹊,居然都是意外死亡,警方通過查閱死者的電腦和手機镣奋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評論 3 399
  • 文/潘曉璐 我一進店門币呵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人侨颈,你說我怎么就攤上這事余赢。” “怎么了哈垢?”我有些...
    開封第一講書人閱讀 169,421評論 0 362
  • 文/不壞的土叔 我叫張陵妻柒,是天一觀的道長。 經(jīng)常有香客問我耘分,道長蛤奢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,114評論 1 300
  • 正文 為了忘掉前任陶贼,我火速辦了婚禮啤贩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘拜秧。我一直安慰自己痹屹,他們只是感情好,可當我...
    茶點故事閱讀 69,116評論 6 398
  • 文/花漫 我一把揭開白布枉氮。 她就那樣靜靜地躺著志衍,像睡著了一般贫奠。 火紅的嫁衣襯著肌膚如雪倚喂。 梳的紋絲不亂的頭發(fā)上叶圃,一...
    開封第一講書人閱讀 52,713評論 1 312
  • 那天商源,我揣著相機與錄音,去河邊找鬼鸯旁。 笑死亭螟,一個胖子當著我的面吹牛碾褂,可吹牛的內(nèi)容都是我干的泣港。 我是一名探鬼主播暂殖,決...
    沈念sama閱讀 41,170評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼当纱!你這毒婦竟也來了呛每?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,116評論 0 277
  • 序言:老撾萬榮一對情侶失蹤坡氯,失蹤者是張志新(化名)和其女友劉穎晨横,沒想到半個月后洋腮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,651評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡手形,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,714評論 3 342
  • 正文 我和宋清朗相戀三年徐矩,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叁幢。...
    茶點故事閱讀 40,865評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖坪稽,靈堂內(nèi)的尸體忽然破棺而出曼玩,到底是詐尸還是另有隱情,我是刑警寧澤窒百,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布黍判,位于F島的核電站,受9級特大地震影響篙梢,放射性物質(zhì)發(fā)生泄漏顷帖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,211評論 3 336
  • 文/蒙蒙 一渤滞、第九天 我趴在偏房一處隱蔽的房頂上張望贬墩。 院中可真熱鬧,春花似錦妄呕、人聲如沸陶舞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至眉抬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間库北,已是汗流浹背寒瓦。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評論 1 274
  • 我被黑心中介騙來泰國打工垃你, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留惜颇,地道東北人。 一個月前我還...
    沈念sama閱讀 49,299評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像器予,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,870評論 2 361

推薦閱讀更多精彩內(nèi)容

  • 響應式編程簡介 響應式編程是一種基于異步數(shù)據(jù)流概念的編程模式懈玻。數(shù)據(jù)流就像一條河:它可以被觀測艺栈,被過濾湿右,被操作吭狡,或者...
    說碼解字閱讀 3,074評論 0 5
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應式編程作為結(jié)合使用的弛秋,對什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,867評論 0 10
  • 作者: maplejaw本篇只解析標準包中的操作符壁畸。對于擴展包太抓,由于使用率較低,如有需求掉丽,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 45,699評論 8 93
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableEm...
    rkua閱讀 1,836評論 0 1
  • RxJava技術(shù)分享 京金所—時光 2016.9.22 這里我拿出來給 Android 開發(fā)者的 RxJava 詳...
    JC_Mobile閱讀 5,570評論 3 55