RxJava入門(4):組合合并操作符

concat/concatArray

組合多個被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行
區(qū)別:concat()組合被觀察者數(shù)量<=4個皆刺,concatArray數(shù)量大于4個

 Observable.concat(Observable.just(1, 2, 3),
                Observable.just(4, 5, 6),
                Observable.just(7, 8, 9),
                Observable.just(10, 11, 12))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印結果


concat
 Observable.concatArray(Observable.just(1, 2, 3),
                Observable.just(4, 5, 6),
                Observable.just(7, 8, 9),
                Observable.just(10, 11, 12),
                Observable.just(11,12,13))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印結果


concatArray

merge()/mergeArray()

組合多個被觀察者一起發(fā)送數(shù)據(jù)难衰,將同一時刻的事件合并然后發(fā)送俘陷,再順序合并下面的事件
區(qū)別與concat/concatArray一樣

 Observable.merge(Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
                Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubScribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e("yzh","onNext--"+aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印結果


merge

concatDelayError() / mergeDelayError()

當合并的被觀察中有一個發(fā)出onError事件時取视,其他的被觀察者的事件也會被阻止發(fā)送甥材,使用上面這兩個方法可以將onError事件推遲到其他被觀察者發(fā)送事件結束后才觸發(fā)

 Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(ObservableEmitter<Integer> e) throws Exception {
               e.onNext(1);
               e.onNext(2);

               e.onError(new NullPointerException());
           }
       }),Observable.just(1,2,3))
               .subscribe(new Observer<Integer>() {
                   @Override
                   public void onSubscribe(Disposable d) {

                   }

                   @Override
                   public void onNext(Integer serializable) {
                       Log.e("yzh","onNext--"+serializable);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.e("yzh","onError-"+e.toString());
                   }

                   @Override
                   public void onComplete() {
                        Log.e("yzh","onComplete");
                   }
               });

打印結果


concatArrayDelayError

如果直接使用concat結果如下

onNext--1
onNext--2
onError--java.lang.NullPointException

zip

合并多個被觀察者發(fā)送的事件绑改,生成一個新的事件序列谢床,然后發(fā)送

 Observable<Integer> observable1 =Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    Log.e("yzh","被觀察者1發(fā)送事件1");
                    e.onNext(1);
                    Thread.sleep(1000);
                    Log.e("yzh","被觀察者1發(fā)送事件2");
                    e.onNext(2);
                    Thread.sleep(1000);

//                    e.onComplete();
            }
        }).subscribeOn(Schedulers.io());
        Observable<String> observable2 =Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.e("yzh","觀察者2發(fā)送事件1");
                e.onNext("a");
                Thread.sleep(1000);
                Log.e("yzh","觀察者2發(fā)送事件2");
                e.onNext("b");
                Thread.sleep(1000);
                Log.e("yzh","被觀察者2發(fā)送事件3");
                e.onNext("c");
                Thread.sleep(1000);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                Log.e("yzh","apply") ;
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.e("yzh", "onNext =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError");
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });

打印結果

zip

注意 例子中的兩個觀察者用subscribeOn使用了不同的 線程,如果不加上這句代碼厘线,zip效果與concat一樣识腿,可以試一試。

combineLatest()

對兩個被觀察者中的事件組合再發(fā)送造壮,特點是將第一個被觀察者中最后一個事件分別與另一個被觀察者中的事件組合再發(fā)送渡讼。

Observable.combineLatest(Observable.just(1L, 2L, 3L),
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
                    @Override
                    public Long apply(Long integer, Long aLong) throws Exception {
                        Log.e("yzh","合并的對象--"+integer+"--"+aLong);
                        return integer+aLong;
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh","onSubscribe");
            }

            @Override
            public void onNext(Long s) {
                Log.e("yzh","onNext--"+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh","onError--"+e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh","onComplete");
            }
        });

打印結果


combineLatest

reduce()

把被觀察者需要發(fā)送的事件聚合成1個事件然后發(fā)送,有點斐波那契數(shù)列的意思

Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.e("yzh","操作數(shù)據(jù)--"+integer+"---"+integer2);
                        return integer*integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("yzh","接受到的數(shù)據(jù)--"+integer);
            }
        });

打印結果


reduce

collect()

將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結構里

 Observable.just(1,2,3,4,5)
                .collect(new Callable<ArrayList<Integer>>() {

                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                                integers.add(integer);
                    }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> integers) throws Exception {
                    Log.e("yzh","accept--"+integers.toString());
            }
        });

打印結果


collect

startWith() / startWithArray()

在一個被觀察者發(fā)送事件前耳璧,追加發(fā)送一些數(shù)據(jù) / 一個新的被觀察者
注意 后面的方法添加的數(shù)據(jù)在前面

 Observable.just(4,5,6)
                .startWith(0)
                .startWithArray(1,2,3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印結果


startWith
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末成箫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子旨枯,更是在濱河造成了極大的恐慌蹬昌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件攀隔,死亡現(xiàn)場離奇詭異皂贩,居然都是意外死亡栖榨,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門明刷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婴栽,“玉大人,你說我怎么就攤上這事辈末∮拚” “怎么了?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵挤聘,是天一觀的道長准脂。 經(jīng)常有香客問我,道長檬洞,這世上最難降的妖魔是什么狸膏? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮添怔,結果婚禮上湾戳,老公的妹妹穿的比我還像新娘。我一直安慰自己广料,他們只是感情好砾脑,可當我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著艾杏,像睡著了一般韧衣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上购桑,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天畅铭,我揣著相機與錄音,去河邊找鬼勃蜘。 笑死硕噩,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的缭贡。 我是一名探鬼主播炉擅,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼阳惹!你這毒婦竟也來了谍失?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤莹汤,失蹤者是張志新(化名)和其女友劉穎快鱼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡攒巍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年嗽仪,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片柒莉。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡闻坚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出兢孝,到底是詐尸還是另有隱情窿凤,我是刑警寧澤,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布跨蟹,位于F島的核電站雳殊,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏窗轩。R本人自食惡果不足惜夯秃,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望痢艺。 院中可真熱鬧仓洼,春花似錦、人聲如沸堤舒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舌缤。三九已至箕戳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間国撵,已是汗流浹背陵吸。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留卸留,地道東北人走越。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像耻瑟,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子赏酥,可洞房花燭夜當晚...
    茶點故事閱讀 44,974評論 2 355