RxJava操作符--->組合/合并


引言

該篇文章主要是關(guān)于RxJava的組合/變換操作符使用的代碼講解卿堂。組合/變換操作符總共有四大類:

(1)組合多個(gè)被觀察者

  • 按發(fā)送順序:concat()蜂绎、concatArray()
  • 按時(shí)間:merge()、mergeArray()
  • 錯(cuò)誤處理:concatDelayError()、mergeDelayError()

(2)合并多個(gè)事件

  • 按數(shù)量:zip()
  • 按時(shí)間:combineLatest()、combineLatestDelayError()
  • 合并成一個(gè)事件發(fā)送:reduce()、collect()

(3)發(fā)送事件前追加發(fā)送事件

  • startWith()
  • startWithArray()

(4)統(tǒng)計(jì)發(fā)送事件數(shù)量

  • count()

1. concat()/concatArray()

組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)存哲,合并后按發(fā)送順序串行執(zhí)行因宇。

二者區(qū)別:組合被觀察者的數(shù)量,即concat()組合被觀察者數(shù)量≤4個(gè)祟偷,而concatArray()則可>4個(gè)察滑。

        Observable.concat(
                Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(7,8,9),
                Observable.just(10,11,12))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(Constant.TAG,"接收到了事件"+value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
                    }
                });

        Observable.concatArray(Observable.just(1,2),
                Observable.just(3,4),
                Observable.just(5,6),
                Observable.just(7,8),
                Observable.just(9,10))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(Constant.TAG,"接收到了事件"+value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
                    }
                });

concat()的log信息:

06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件1
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件2
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件3
06-22 14:00:51.142 12967-12967/? D/RxJava: 接收到了事件4
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件5
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件6
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件7
06-22 14:00:51.141 12967-12967/? D/RxJava: 接收到了事件8
06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)

concatArray()的log信息:

06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件1
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件2
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件3
06-22 14:00:51.144 12967-12967/? D/RxJava: 接收到了事件4
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件5
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件6
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件7
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件8
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件9
06-22 14:00:51.143 12967-12967/? D/RxJava: 接收到了事件10
06-22 14:00:51.143 12967-12967/? D/RxJava: 對(duì)Complete事件做出響應(yīng)

2. merge()/mergeArray()

組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后按時(shí)間線并行執(zhí)行修肠。

1.二者區(qū)別:和上述的concat和concatArray的一樣贺辰;
2.區(qū)別上述concat操作符,同樣是組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)嵌施,但concat操作符合并后是按發(fā)送順序串行執(zhí)行饲化。

        Observable.merge(
                Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
                Observable.intervalRange(2,3,1,1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(Constant.TAG,"接收到了事件"+value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(Constant.TAG,"對(duì)Error事件做出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
                    }
                });

log信息:

06-22 14:23:11.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件0
06-22 14:23:11.366 14031-14083/com.gjj.frame D/RxJava: 接收到了事件2
06-22 14:23:12.357 14031-14082/com.gjj.frame D/RxJava: 接收到了事件1
06-22 14:23:12.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件3
06-22 14:23:13.358 14031-14082/com.gjj.frame D/RxJava: 接收到了事件2
06-22 14:23:13.359 14031-14082/com.gjj.frame D/RxJava: 接收到了事件4
06-22 14:23:13.362 14031-14082/com.gjj.frame D/RxJava: 對(duì)Complete事件做出響應(yīng)

3. concatArrayDelayError()/mergeArrayDelayError()

使用concat和merge操作符時(shí),若其中一個(gè)被觀察者發(fā)出onError事件吗伤,則會(huì)馬上終止其他被觀察者繼續(xù)發(fā)送事件吃靠,若希望onError事件推遲到其他被觀察者發(fā)送事件結(jié)束后才處罰,就需要使用對(duì)應(yīng)的concatDelayError或mergeDelayError()操作符足淆。

(1)無使用concatArrayDelayError()的情況

        Observable.concat(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                //對(duì)error事件巢块,因?yàn)闊o使用concatDelayError,所以第二個(gè)Observable將不會(huì)發(fā)送事件
                e.onError(new NullPointerException());
                e.onComplete();
            }
        }),Observable.just(4,5,6)).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(Constant.TAG,"接收到了事件"+value);
           }

            @Override
            public void onError(Throwable e) {
                Log.d(Constant.TAG,"對(duì)error事件做出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
            }
        });

測(cè)試結(jié)果:第一個(gè)悲觀者發(fā)送Error事件后,第2個(gè)被觀察者則不會(huì)繼續(xù)發(fā)送事件缸浦。
log信息:

06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件1
06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件2
06-25 11:03:06.905 21337-21337/com.gjj.frame D/RxJava: 接收到了事件3
06-25 11:03:06.906 21337-21337/com.gjj.frame D/RxJava: 對(duì)error事件做出響應(yīng)

(2)使用concatArrayDelayError()的情況

        Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                //對(duì)error事件夕冲,因?yàn)闊o使用concatDelayError,所以第二個(gè)Observable將不會(huì)發(fā)送事件
                e.onError(new NullPointerException());
                e.onComplete();
            }
        }),Observable.just(4,5,6)).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(Constant.TAG,"接收到了事件"+value);
           }

            @Override
            public void onError(Throwable e) {
                Log.d(Constant.TAG,"對(duì)error事件做出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(Constant.TAG,"對(duì)Complete事件做出響應(yīng)");
            }
        });

測(cè)試結(jié)果:第1個(gè)被觀察者的error事件將在第2個(gè)被觀察者發(fā)送完事件后再繼續(xù)發(fā)送。
log信息:

06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件1
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件2
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件3
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件4
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件5
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 接收到了事件6
06-25 11:08:55.097 21509-21509/com.gjj.frame D/RxJava: 對(duì)error事件做出響應(yīng)

4. Zip()

合并多個(gè)被觀察者(Observable)發(fā)送的事件裂逐,生成一個(gè)新的事件序列(即組合過后的事件序列),并最終發(fā)送泣栈。

        //創(chuàng)建第1個(gè)觀察者
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).subscribeOn(Schedulers.io());//設(shè)置被觀察者1再工作線程1中工作

        //創(chuàng)建第2個(gè)觀察者
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                e.onNext("B");
                e.onNext("C");
                e.onNext("D");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());//設(shè)置被觀察者2再工作線程2中工作

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String value) {
                Log.d(Constant.TAG,"最終收到的事件 = "+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(Constant.TAG,"onError");

            }

            @Override
            public void onComplete() {
                Log.d(Constant.TAG,"onComplete");

            }
        });

log信息:

06-26 16:30:02.147 29926-29985/com.gjj.frame D/RxJava: 最終收到的事件 = 1A
06-26 16:30:02.150 29926-29984/com.gjj.frame D/RxJava: 最終收到的事件 = 2B
06-26 16:30:02.151 29926-29984/com.gjj.frame D/RxJava: 最終收到的事件 = 3C

注意:最終合并的事件數(shù)量是多個(gè)被觀察者中最少的數(shù)量卜高,多余的事件將不會(huì)發(fā)送。

5. combineLatest()

當(dāng)兩個(gè)Observable中的任何一個(gè)發(fā)送了數(shù)據(jù)后南片,將先發(fā)送了數(shù)據(jù)的Observables的最新(最后)一個(gè)數(shù)據(jù)與另外一個(gè)Observable發(fā)送的每一個(gè)數(shù)據(jù)結(jié)合掺涛,最終基于該函數(shù)的結(jié)果發(fā)送數(shù)據(jù)。

        Observable.combineLatest(Observable.just(1L, 2L, 3L), Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
            @Override
            public Long apply(Long aLong, Long aLong2) throws Exception {
                Log.d(Constant.TAG,"合并的數(shù)據(jù)是:"+aLong+" "+aLong2);
                return aLong+aLong2;
            }
        }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(Constant.TAG,"合并的結(jié)果是:"+aLong);
            }
        });

log信息:

06-26 16:48:37.010 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 0
06-26 16:48:37.011 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:3
06-26 16:48:38.010 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 1
06-26 16:48:38.011 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:4
06-26 16:48:39.012 30604-30634/com.gjj.frame D/RxJava: 合并的數(shù)據(jù)是:3 2
06-26 16:48:39.013 30604-30634/com.gjj.frame D/RxJava: 合并的結(jié)果是:5

6. combineLatestDelayError()

作用類似于concatArrayDelayError()疼进。

7. reduce()

把被觀察者需要發(fā)送的事件聚合成一個(gè)事件&發(fā)送

        Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.d(Constant.TAG,"本次計(jì)算的數(shù)據(jù)是:"+integer+"乘"+integer2);
                        return integer * integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(Constant.TAG,"最終計(jì)算的結(jié)果是:"+integer);
            }
        });

log信息:

06-26 16:59:56.401 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:1乘2
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:2乘3
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 本次計(jì)算的數(shù)據(jù)是:6乘4
06-26 16:59:56.402 31613-31613/com.gjj.frame D/RxJava: 最終計(jì)算的結(jié)果是:24

8. collect()

將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里

        Observable.just(1,2,3,4,5,6)
                .collect(new Callable<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> integers) throws Exception {
                Log.d(Constant.TAG,"本次發(fā)送的數(shù)據(jù)是:"+integers);
            }
        });

log信息:

06-26 17:04:40.264 31785-31785/com.gjj.frame D/RxJava: 本次發(fā)送的數(shù)據(jù)是:[1, 2, 3, 4, 5, 6]

9. startWith()/startWithArray()

在一個(gè)被觀察者發(fā)送事件錢薪缆,追加發(fā)送一些數(shù)據(jù)/一個(gè)新的被觀察者

        Observable.just(3,4)
                .startWith(0)//追加單個(gè)數(shù)據(jù)
                .startWithArray(1,2)//追加多個(gè)數(shù)據(jù)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(Constant.TAG,"接收到了事件"+integer);
                    }
                });

log信息:

06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件1
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件2
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件0
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件3
06-26 17:39:24.075 4052-4052/com.gjj.frame D/RxJava: 接收到了事件4

10.count()

統(tǒng)計(jì)被觀察者發(fā)送事件的數(shù)量。

        Observable.just(1,2,3,4)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long integer) throws Exception {
                        Log.d(Constant.TAG,"發(fā)送的事件數(shù)量 = "+integer);
                    }
                });

log信息:

06-26 17:42:20.639 4750-4750/com.gjj.frame D/RxJava: 發(fā)送的事件數(shù)量 = 4

參考文章:
Android RxJava:組合 / 合并操作符 詳細(xì)教程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末伞广,一起剝皮案震驚了整個(gè)濱河市拣帽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嚼锄,老刑警劉巖减拭,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異区丑,居然都是意外死亡拧粪,警方通過查閱死者的電腦和手機(jī)修陡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來可霎,“玉大人魄鸦,你說我怎么就攤上這事⊙⒗剩” “怎么了拾因?”我有些...
    開封第一講書人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)斯棒。 經(jīng)常有香客問我盾致,道長(zhǎng),這世上最難降的妖魔是什么荣暮? 我笑而不...
    開封第一講書人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任庭惜,我火速辦了婚禮,結(jié)果婚禮上穗酥,老公的妹妹穿的比我還像新娘护赊。我一直安慰自己,他們只是感情好砾跃,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開白布骏啰。 她就那樣靜靜地躺著,像睡著了一般抽高。 火紅的嫁衣襯著肌膚如雪判耕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評(píng)論 1 305
  • 那天翘骂,我揣著相機(jī)與錄音壁熄,去河邊找鬼。 笑死碳竟,一個(gè)胖子當(dāng)著我的面吹牛草丧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播莹桅,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼昌执,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了诈泼?” 一聲冷哼從身側(cè)響起懂拾,我...
    開封第一講書人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎厂汗,沒想到半個(gè)月后委粉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡娶桦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年贾节,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了汁汗。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡栗涂,死狀恐怖知牌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斤程,我是刑警寧澤角寸,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站忿墅,受9級(jí)特大地震影響扁藕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜疚脐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一亿柑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧棍弄,春花似錦望薄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蛮原,卻和暖如春卧须,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背儒陨。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工故慈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人框全。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像干签,于是被迫代替她去往敵國(guó)和親津辩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容