Rx系列<第十五篇>:RxJava之組合/合并操作符

(1)concat和concatArray

組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)夸溶,合并后 按發(fā)送順序串行執(zhí)行

    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1,2));
    list.add(Observable.just(3, 4));
    list.add(Observable.just(5, 6));
    Observable.concat(list)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("aaa", String.valueOf(integer));
        }
    });


    Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("aaa", String.valueOf(integer));
        }
    });

以上兩種方式的合并揩慕,返回結(jié)果是:1 2 3 4 5 6

(2)merge和mergeArray

組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù)岩瘦,合并后 按時(shí)間線(xiàn)并行執(zhí)行

    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS));
    list.add(Observable.just(3, 4));
    list.add(Observable.just(5, 6));
    list.add(Observable.just(7, 8));
    Observable
            .merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("aaa", String.valueOf(integer));
                }
            });


    Observable
            .mergeArray(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("aaa", String.valueOf(integer));
                }
            });

以上兩種方法的返回結(jié)果是:3 4 5 6 7 8 1 2

(3)concatDelayError和mergeDelayError

concatDelayError:多個(gè)Observable合并揉燃,并按順序發(fā)射數(shù)據(jù)掸哑, 如果發(fā)生異常约急,則不會(huì)立即中斷發(fā)射數(shù)據(jù),異常將延遲發(fā)射苗分。
mergeDelayError:多個(gè)Observable合并厌蔽,并行發(fā)射數(shù)據(jù), 如果發(fā)生異常摔癣,則不會(huì)立即中斷發(fā)射數(shù)據(jù)奴饮,異常將延遲發(fā)射。

    List<Observable<Integer>> list = new ArrayList<>();
    list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onError(new NullPointerException("exception"));
            e.onComplete();
        }
    }).delay(2000, TimeUnit.MILLISECONDS));

    list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(3);
            e.onNext(4);
            e.onError(new NullPointerException("exception"));
            e.onComplete();
        }
    }));

    list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(5);
            e.onNext(6);
            e.onError(new NullPointerException("exception"));
            e.onComplete();
        }
    }));

    Observable
            .concatDelayError(list)
            .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("aaa", String.valueOf(integer));
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d("aaa", "發(fā)生了異常");
        }
    });

返回結(jié)果是:

圖片.png
(4)zip

合并多個(gè)被觀察者的數(shù)據(jù)流供填, 然后發(fā)送(Emit)最終合并的數(shù)據(jù)拐云。(數(shù)據(jù)和數(shù)據(jù)之間是一對(duì)一的關(guān)系)

    Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            SystemClock.sleep(1000);
            e.onNext(2);
            SystemClock.sleep(1000);
            e.onNext(3);
            SystemClock.sleep(1000);
            e.onNext(4);
            SystemClock.sleep(1000);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("A");
            SystemClock.sleep(1000);
            e.onNext("B");
            SystemClock.sleep(1000);
            e.onNext("C");
            SystemClock.sleep(1000);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable.zip(observable1, observable2, new BiFunction<Integer,String,String>() {
        @Override
        public String apply(Integer a,String b) throws Exception {
            return a+b;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("aaa", s);
        }
    });

返回結(jié)果:

圖片.png
(5)combineLatest

按照同一時(shí)間線(xiàn)來(lái)進(jìn)行合并。

    Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            SystemClock.sleep(700);
            e.onNext(1);
            SystemClock.sleep(700);
            e.onNext(2);
            SystemClock.sleep(700);
            e.onNext(3);
            SystemClock.sleep(700);
            e.onNext(4);
            SystemClock.sleep(700);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable observable2=Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("A");
            SystemClock.sleep(600);
            e.onNext("B");
            SystemClock.sleep(600);
            e.onNext("C");
            SystemClock.sleep(600);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable.combineLatest(observable1, observable2, new BiFunction<Integer,String,String>() {
        @Override
        public String apply(Integer a,String b) throws Exception {
            return a+b;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("aaa", s);
        }
    });

接下來(lái)近她,根據(jù)代碼叉瘩,我們來(lái)畫(huà)一張圖。

畫(huà)兩個(gè)時(shí)間線(xiàn)observable1和observable2粘捎,根據(jù)代碼中指定的時(shí)間畫(huà)時(shí)間線(xiàn)薇缅,最后觀察兩個(gè)被觀察者時(shí)間線(xiàn)重合的地方。

圖片.png

實(shí)際上代碼輸出的結(jié)果也是:

圖片.png
(6)combineLatestDelayError

作用類(lèi)似于concatDelayError() / mergeDelayError() 攒磨,即錯(cuò)誤處理泳桦,上面已經(jīng)介紹過(guò)類(lèi)似的了。

(7)reduce

把被觀察者需要發(fā)送的事件聚合成1個(gè)事件 & 發(fā)送

    Observable.just(1,2,3,4,5)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("aaa", String.valueOf(integer));
                }
            });

日志如下:

圖片.png

相當(dāng)于做了一個(gè)這樣的計(jì)算1+2+3+4+5 = 15娩缰,再將15發(fā)射出去灸撰。

(8)collect

將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里

    Observable
            .just(1, 2, 3, 4, 5, 6)
            .collect(new Callable<ArrayList<Integer>>() {

                @Override
                public ArrayList<Integer> call() throws Exception {
                    return new ArrayList();
                }
            }, new BiConsumer<ArrayList<Integer>, Integer>() {
                @Override
                public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                    list.add(integer);
                }
            }).subscribe(new Consumer<ArrayList<Integer>>() {
        @Override
        public void accept(ArrayList<Integer> list) throws Exception {
            for (int result : list){
                Log.d("aaa", String.valueOf(result));
            }
        }
    });

執(zhí)行結(jié)果:

圖片.png
(9)startWith和startWithArray

startWith: 在已有數(shù)據(jù)流之前追加一個(gè)或一組數(shù)據(jù)流。

圖片.png

startWith可以傳遞的參數(shù)是:一個(gè)數(shù)據(jù)拼坎,一個(gè)數(shù)據(jù)列表浮毯,一個(gè)Observable。

    Observable.just(1, 2, 3)
            .startWith(4)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("aaa", String.valueOf(integer));
                }
            });

返回結(jié)果: 4 1 2 3

startWithArray:在已有數(shù)據(jù)流之前追加一組數(shù)據(jù)流泰鸡。

圖片.png
    Observable.just(1, 2, 3)
            .startWithArray(4, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d("aaa", String.valueOf(integer));
                }
            });

返回結(jié)果:4 5 1 2 3

(10)count

統(tǒng)計(jì)被觀察者發(fā)送事件的數(shù)量

    Observable.just(1, 2, 3, 4)
            .count()
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d("aaa", String.valueOf(aLong));
                }
            });

返回結(jié)果:4

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末债蓝,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子盛龄,更是在濱河造成了極大的恐慌饰迹,老刑警劉巖芳誓,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異啊鸭,居然都是意外死亡锹淌,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)莉掂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)葛圃,“玉大人,你說(shuō)我怎么就攤上這事憎妙】庹” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵厘唾,是天一觀的道長(zhǎng)褥符。 經(jīng)常有香客問(wèn)我,道長(zhǎng)抚垃,這世上最難降的妖魔是什么喷楣? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮鹤树,結(jié)果婚禮上铣焊,老公的妹妹穿的比我還像新娘。我一直安慰自己罕伯,他們只是感情好曲伊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著追他,像睡著了一般坟募。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上邑狸,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天懈糯,我揣著相機(jī)與錄音,去河邊找鬼单雾。 笑死赚哗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的硅堆。 我是一名探鬼主播蜂奸,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼硬萍!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起围详,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤朴乖,失蹤者是張志新(化名)和其女友劉穎祖屏,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體买羞,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡袁勺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了畜普。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片期丰。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖吃挑,靈堂內(nèi)的尸體忽然破棺而出钝荡,到底是詐尸還是另有隱情,我是刑警寧澤舶衬,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布埠通,位于F島的核電站,受9級(jí)特大地震影響逛犹,放射性物質(zhì)發(fā)生泄漏端辱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一虽画、第九天 我趴在偏房一處隱蔽的房頂上張望舞蔽。 院中可真熱鬧,春花似錦码撰、人聲如沸渗柿。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)做祝。三九已至,卻和暖如春鸡岗,著一層夾襖步出監(jiān)牢的瞬間混槐,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工轩性, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留声登,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓揣苏,卻偏偏與公主長(zhǎng)得像悯嗓,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子卸察,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一脯厨、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    BrotherChen閱讀 1,615評(píng)論 0 10
  • 一坑质、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性合武,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    無(wú)求_95dd閱讀 3,082評(píng)論 0 21
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,827評(píng)論 0 1
  • 注:只包含標(biāo)準(zhǔn)包中的操作符临梗,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,195評(píng)論 2 8
  • 記錄RxJava操作符,方便查詢(xún)(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚(yú)閱讀 824評(píng)論 0 0