RxJava中的常用操作符

創(chuàng)建操作

  • create
  • defer
    直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable执泰。
    defer.c.png

    Defer操作符會(huì)一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個(gè)Observable距潘。它對(duì)每個(gè)觀察者都這樣做罗侯,因此盡管每個(gè)訂閱者都以為自己訂閱的是同一個(gè)Observable,事實(shí)上每個(gè)訂閱者獲取的是它們自己的單獨(dú)的數(shù)據(jù)序列伏钠。
    在某些情況下,等待直到最后一分鐘(就是直到訂閱發(fā)生時(shí))才生成Observable可以確保Observable包含最新的數(shù)據(jù)谨设。
  • Empty/Never/Throw
    Empty
    創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
    Never
    創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable
    Throw
    創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable
  • fromArray/fromIterable
    將其它種類(lèi)的對(duì)象和數(shù)據(jù)類(lèi)型轉(zhuǎn)換為Observable
from.png

在RxJava中熟掂,from操作符可以轉(zhuǎn)換Iterable和Array等。對(duì)于Iterable和數(shù)組扎拣,產(chǎn)生的Observable會(huì)發(fā)射Iterable或數(shù)組

  • Interval
    創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射無(wú)線遞增的整數(shù)序列的Observable,RxJava將這個(gè)操作符實(shí)現(xiàn)為interval方法赴肚。它接受一個(gè)表示時(shí)間間隔的參數(shù)和一個(gè)表示時(shí)間單位的參數(shù)。
interval(long,TimeUnit)
interval(long,TimeUnit,Scheduler)
  • just
    創(chuàng)建一個(gè)發(fā)射指定值的Observable,Just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable二蓝。
    Just類(lèi)似于From誉券,但是From會(huì)將數(shù)組或Iterable的數(shù)據(jù)取出然后逐個(gè)發(fā)射,而Just只是簡(jiǎn)單的原樣發(fā)射刊愚,將數(shù)組或Iterable當(dāng)做單個(gè)數(shù)據(jù)踊跟。just方法最多接受10個(gè)參數(shù),返回一個(gè)按參數(shù)列表順序發(fā)射這些數(shù)據(jù)的Observable
  • Range
    創(chuàng)建一個(gè)發(fā)射特定整數(shù)序列的Observable
    RxJava將這個(gè)操作符實(shí)現(xiàn)為range函數(shù),它接受兩個(gè)參數(shù)鸥诽,一個(gè)是范圍的起始值商玫,一個(gè)是范圍的數(shù)據(jù)的數(shù)目。如果你將第二個(gè)參數(shù)設(shè)為0牡借,將導(dǎo)致Observable不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù)决帖,會(huì)拋異常)。
  • Repeat
    創(chuàng)建一個(gè)發(fā)射特定數(shù)據(jù)重復(fù)多次的Observable
    RxJava將這個(gè)操作符實(shí)現(xiàn)為repeat方法蓖捶。它不是創(chuàng)建一個(gè)Observable地回,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個(gè)序列或者是無(wú)限的,或者通過(guò)repeat(n)指定重復(fù)次數(shù)

變換操作

  • map
    對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)刻像,執(zhí)行變換操作
map.png

Map操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù)畅买,然后返回一個(gè)發(fā)射這些結(jié)果的Observable。
RxJava將這個(gè)操作符實(shí)現(xiàn)為map函數(shù)细睡。這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行谷羞。

Observable.just(1,2,3)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "integer is" + integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "accept: s = " + s);     
                    }
                });
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 1
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 2
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 3
  • flatMap
    FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable
mergeMap.png

FlatMap操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作溜徙,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable湃缎,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射蠢壹。
這個(gè)方法是很有用的嗓违,例如,當(dāng)你有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列图贸,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable蹂季,因此你可以創(chuàng)建一個(gè)新的Observable發(fā)射這些次級(jí)Observable發(fā)射的數(shù)據(jù)的完整集合。
注意:FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作疏日,因此它們可能是交錯(cuò)的偿洁。如果想要按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù),使用ConcatMap操作符即可
舉個(gè)例子:

Observable.fromIterable(getData())
                .flatMap(new Function<NoteBook, ObservableSource<Note>>() {
                    @Override
                    public ObservableSource<Note> apply(@NonNull NoteBook noteBook) throws Exception {
                        return Observable.fromIterable(noteBook.getNotes());
                    }
                })
                .subscribe(new Consumer<Note>() {
                    @Override
                    public void accept(Note note) throws Exception {
                        Log.i(TAG, "accept: " + note);
                    }
                });
08-24 17:14:09.512 32091-32091/? I/MainActivity: accept: Note{id='1', noteBookId='1', title='Introduction', content='$$$$$$$$$$$$$$$$$$'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='2', noteBookId='1', title='ReactiveX', content='########################'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='3', noteBookId='1', title='Observables', content='@@@@@@@@@@@@@@@@@@@@@@@@@@'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='4', noteBookId='1', title='Operators Categories', content='********************'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='5', noteBookId='1', title='RxJava文檔和教程', content='********************'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='6', noteBookId='2', title='Retrofit入門(mén)教程1', content='$$$$$$$$$$$$$$$$$$'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='7', noteBookId='2', title='Retrofit入門(mén)教程2', content='########################'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='8', noteBookId='2', title='Retrofit入門(mén)教程3', content='@@@@@@@@@@@@@@@@@@@@@@@@@@'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='9', noteBookId='2', title='Retrofit入門(mén)教程4', content='********************'}
08-24 17:14:09.514 32091-32091/? I/MainActivity: accept: Note{id='10', noteBookId='2', title='Retrofit入門(mén)教程5', content='********************'}
  • Buffer
    定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹沟优,而不是一次發(fā)射一個(gè)值涕滋。
    buffer.png

    Buffer操作符將一個(gè)Observable變換為另一個(gè),原來(lái)的Observable正常發(fā)射數(shù)據(jù)挠阁,變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合何吝。Buffer操作符在很多語(yǔ)言特定的實(shí)現(xiàn)中有很多種變體,它們?cè)谌绾尉彺孢@個(gè)問(wèn)題上存在區(qū)別鹃唯。
    注意:如果原來(lái)的Observable發(fā)射了一個(gè)onError通知,Buffer會(huì)立即傳遞這個(gè)通知瓣喊,而不是首先發(fā)射緩存的數(shù)據(jù)坡慌,即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Observable.interval(1000,TimeUnit.MILLISECONDS)
                .buffer(5)
                .subscribe(new Consumer<List<Long>>() {
                    @Override
                    public void accept(List<Long> longs) throws Exception {
                        Log.i(TAG, "accept: longs = " + longs);
                    }
                });
08-24 17:04:12.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [0, 1, 2, 3, 4]
08-24 17:04:17.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [5, 6, 7, 8, 9]
08-24 17:04:22.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [10, 11, 12, 13, 14]
08-24 17:04:27.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [15, 16, 17, 18, 19]
08-24 17:04:32.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [20, 21, 22, 23, 24]
08-24 17:04:37.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [25, 26, 27, 28, 29]
  • GroupBy
    將一個(gè)Observable分拆為一些Observables集合藻三,它們中的每一個(gè)發(fā)射原始Observable的一個(gè)子序列
groupBy.c.png

GroupBy操作符將原始Observable分拆為一些Observables集合洪橘,它們中的每一個(gè)發(fā)射原始Observable數(shù)據(jù)序列的一個(gè)子序列。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè)Observable發(fā)射是由一個(gè)函數(shù)判定的棵帽,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key熄求,Key相同的數(shù)據(jù)會(huì)被同一個(gè)Observable發(fā)射。
舉個(gè)例子,

Observable.interval(1, TimeUnit.SECONDS)
                .take(30)
                .groupBy(new Function<Long, Integer>() {
                    @Override
                    public Integer apply(@NonNull Long aLong) throws Exception {
                        if (aLong % 3 == 0) {
                            return 3;
                        } else if (aLong % 4 == 0) {
                            return 4;
                        } else if (aLong % 5 == 0) {
                            return 5;
                        } else {
                            return 1;
                        }

                    }
                })
                .subscribe(new Consumer<GroupedObservable<Integer, Long>>() {
                    @Override
                    public void accept(GroupedObservable<Integer, Long> longLongGroupedObservable) throws Exception {
                        Integer key = longLongGroupedObservable.getKey();
                        Log.i(TAG, "accept: key = " + key);
                        if (key == 3) {
                            longLongGroupedObservable.subscribe(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    Log.i(TAG, "accept: key = 3,3的倍數(shù),aLong = " + aLong);
                                }
                            });
                        } else if (key == 4) {
                            longLongGroupedObservable.subscribe(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    Log.i(TAG, "accept: key = 4,4的倍數(shù) aLong = " + aLong);
                                }
                            });
                        } else if (key == 5) {
                            longLongGroupedObservable.subscribe(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    Log.i(TAG, "accept: key = 5,5的倍數(shù) aLong = " + aLong);
                                }
                            });
                        } else {
                            longLongGroupedObservable.subscribe(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    Log.i(TAG, "accept: key = 1,不是3逗概、4弟晚、5的倍數(shù) aLong = " + aLong);
                                }
                            });
                        }
                    }
                });
08-24 17:22:37.134 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1
08-24 17:22:37.135 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4、5的倍數(shù) aLong = 1
08-24 17:22:37.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3卿城、4枚钓、5的倍數(shù) aLong = 2
08-24 17:22:37.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3
08-24 17:22:37.332 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 3
08-24 17:22:37.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4
08-24 17:22:37.432 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 4
08-24 17:22:37.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5
08-24 17:22:37.532 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 5
08-24 17:22:37.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 6
08-24 17:22:37.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4瑟押、5的倍數(shù) aLong = 7
08-24 17:22:37.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 8
08-24 17:22:37.931 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 9
08-24 17:22:38.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 10
08-24 17:22:38.131 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3搀捷、4、5的倍數(shù) aLong = 11
08-24 17:22:38.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 12
08-24 17:22:38.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3多望、4嫩舟、5的倍數(shù) aLong = 13
08-24 17:22:38.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4怀偷、5的倍數(shù) aLong = 14
08-24 17:22:38.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 15
08-24 17:22:38.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 16
08-24 17:22:38.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3家厌、4、5的倍數(shù) aLong = 17
08-24 17:22:38.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 18
08-24 17:22:38.931 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3枢纠、4像街、5的倍數(shù) aLong = 19
08-24 17:22:39.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 20
08-24 17:22:39.131 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 21
08-24 17:22:39.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4晋渺、5的倍數(shù) aLong = 22
08-24 17:22:39.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3镰绎、4、5的倍數(shù) aLong = 23
08-24 17:22:39.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 24
08-24 17:22:39.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 25
08-24 17:22:39.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3木西、4畴栖、5的倍數(shù) aLong = 26
08-24 17:22:39.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 27
08-24 17:22:39.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 28
08-24 17:22:39.932 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4八千、5的倍數(shù) aLong = 29
08-24 17:22:40.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 30

過(guò)濾操作

  • Distinct
    抑制(過(guò)濾掉)重復(fù)的數(shù)據(jù)項(xiàng)
distinct.png
Observable.just(1,2,1,1,2,1,1,2,3,4)
                .distinct(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return integer;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: integer = " + integer);
                    }
                });
08-24 17:26:19.198 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 1
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 2
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 3
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 4
  • ElementAt
    只發(fā)射第N項(xiàng)數(shù)據(jù)
  • Filter
    Filter操作符使用你指定的一個(gè)謂詞函數(shù)測(cè)試數(shù)據(jù)項(xiàng)吗讶,只有通過(guò)測(cè)試的數(shù)據(jù)才會(huì)被發(fā)射。
    舉個(gè)例子,過(guò)濾整數(shù)序列中的奇數(shù),只發(fā)射偶數(shù)
    filter.png
Observable.interval(1,TimeUnit.SECONDS)
                .filter(new Predicate<Long>() {
                    @Override
                    public boolean test(@NonNull Long aLong) throws Exception {
                        if (aLong % 2 == 0){
                            return true;
                        }
                        return false;
                    }
                })
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.i(TAG, "accept: " + aLong);
                    }
                });
08-24 17:30:57.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 0
08-24 17:30:59.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 2
08-24 17:31:01.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 4
08-24 17:31:03.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 6
08-24 17:31:05.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 8
08-24 17:31:07.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 10
  • First
    只發(fā)射第一項(xiàng)(或者滿(mǎn)足某個(gè)條件的第一項(xiàng))數(shù)據(jù)
  • IgnoreElements
    不發(fā)射任何數(shù)據(jù)恋捆,只發(fā)射Observable的終止通知
  • skip/take
    skip
    image.png

    take
    image.png
  • skipLast/takeLast
image.png
image.png
  • Sample
    定期發(fā)射Observable最近發(fā)射的數(shù)據(jù)項(xiàng)
image.png

RxJava將這個(gè)操作符實(shí)現(xiàn)為sample和throttleLast照皆。

Observable.range(0,1000)
                .sample(1, TimeUnit.MICROSECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: " + integer);
                    }
                });
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 56
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 154
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 186
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 208
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 228
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 247

組合操作

  • zip
    通過(guò)一個(gè)函數(shù)將多個(gè)Observables的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)沸停。
    Zip操作符返回一個(gè)Obversable膜毁,它使用這個(gè)函數(shù)按順序結(jié)合兩個(gè)或多個(gè)Observables發(fā)射的數(shù)據(jù)項(xiàng),然后它發(fā)射這個(gè)函數(shù)返回的結(jié)果愤钾。它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)瘟滨。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。
image.png

上代碼:

Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Thread.sleep(1000);
                e.onNext(1);
                Thread.sleep(1000);
                e.onNext(2);
                Thread.sleep(1000);
                e.onNext(3);
                Thread.sleep(1000);
                e.onNext(4);
                Thread.sleep(1000);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

        Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Thread.sleep(700);
                e.onNext("A");
                Thread.sleep(700);
                e.onNext("B");
                Thread.sleep(700);
                e.onNext("C");
                Thread.sleep(700);
                e.onNext("D");
                Thread.sleep(700);
                e.onNext("E");
                Thread.sleep(700);
                e.onNext("F");
                Thread.sleep(700);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());
        Observable
                .zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {
                    @Override
                    public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                        return s + integer;
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.i(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        Log.i(TAG, "onNext: " +s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onComplete: ");
                    }
                });
08-24 17:50:11.195 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: A1
08-24 17:50:12.196 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: B2
08-24 17:50:13.197 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: C3
08-24 17:50:14.198 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: D4
08-24 17:50:15.198 30117-30157/com.example.wty.learnrxjava I/MainActivity: onComplete: 

錯(cuò)誤處理

  • Retry
    如果原始Observable遇到錯(cuò)誤(即接收到onError的時(shí)候觸發(fā))能颁,重新訂閱它期望它能正常終止
image.png
  • RetryWhen
    retryWhen和retry類(lèi)似杂瘸,區(qū)別是,retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù)伙菊,這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable败玉,retryWhen觀察它的結(jié)果再?zèng)Q定是不是要重新訂閱原始的Observable敌土。如果這個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù),它就重新訂閱绒怨,如果這個(gè)Observable發(fā)射的是onError通知纯赎,它就將這個(gè)通知傳遞給觀察者然后終止。
 Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Thread.sleep(1000);
                e.onNext(1);
                Thread.sleep(1000);
                e.onNext(2);
                Thread.sleep(1000);
                e.onNext(3);
                int value = 1 / 0;
                Thread.sleep(1000);
                e.onNext(4);
                Thread.sleep(1000);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());
integerObservable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {

                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                        if (throwable instanceof  ArithmeticException)
                            return Observable.just(1);
                        else
                            return Observable.error(throwable);
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.i(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError: ", e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });
08-24 17:56:24.620 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:25.621 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:26.621 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 3
08-24 17:56:27.624 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:28.624 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:29.625 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 3
08-24 17:56:30.629 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:31.630 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:32.630 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 3

線程調(diào)度

  • SubscribeOn
    指定Observable自身在哪個(gè)調(diào)度器上執(zhí)行
  • ObserveOn
    指定一個(gè)觀察者在哪個(gè)調(diào)度器上觀察這個(gè)Observable
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末南蹂,一起剝皮案震驚了整個(gè)濱河市犬金,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌六剥,老刑警劉巖晚顷,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異疗疟,居然都是意外死亡该默,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén)策彤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)栓袖,“玉大人,你說(shuō)我怎么就攤上這事店诗」危” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵庞瘸,是天一觀的道長(zhǎng)捧弃。 經(jīng)常有香客問(wèn)我,道長(zhǎng)擦囊,這世上最難降的妖魔是什么违霞? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮瞬场,結(jié)果婚禮上买鸽,老公的妹妹穿的比我還像新娘。我一直安慰自己贯被,他們只是感情好眼五,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著刃榨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪双仍。 梳的紋絲不亂的頭發(fā)上枢希,一...
    開(kāi)封第一講書(shū)人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音朱沃,去河邊找鬼苞轿。 笑死茅诱,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的搬卒。 我是一名探鬼主播瑟俭,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼契邀!你這毒婦竟也來(lái)了摆寄?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤坯门,失蹤者是張志新(化名)和其女友劉穎微饥,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體古戴,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡欠橘,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了现恼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肃续。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖叉袍,靈堂內(nèi)的尸體忽然破棺而出始锚,到底是詐尸還是另有隱情,我是刑警寧澤畦韭,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布疼蛾,位于F島的核電站,受9級(jí)特大地震影響艺配,放射性物質(zhì)發(fā)生泄漏察郁。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一转唉、第九天 我趴在偏房一處隱蔽的房頂上張望皮钠。 院中可真熱鬧,春花似錦赠法、人聲如沸麦轰。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)款侵。三九已至,卻和暖如春侧纯,著一層夾襖步出監(jiān)牢的瞬間新锈,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工眶熬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留妹笆,地道東北人块请。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像拳缠,于是被迫代替她去往敵國(guó)和親墩新。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位窟坐,與響應(yīng)式編程作為結(jié)合使用的海渊,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,841評(píng)論 0 10
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,802評(píng)論 0 1
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符狸涌。對(duì)于擴(kuò)展包切省,由于使用率較低,如有需求帕胆,請(qǐng)讀者自行查閱文檔朝捆。 創(chuàng)...
    maplejaw_閱讀 45,600評(píng)論 8 93
  • RxJava正在Android開(kāi)發(fā)者中變的越來(lái)越流行。唯一的問(wèn)題就是上手不容易懒豹,尤其是大部分人之前都是使用命令式編...
    劉啟敏閱讀 1,846評(píng)論 1 7
  • 注:只包含標(biāo)準(zhǔn)包中的操作符芙盘,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,184評(píng)論 2 8