創(chuàng)建操作
- create
-
defer
直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable执泰。
Defer操作符會(huì)一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個(gè)Observable距潘。它對(duì)每個(gè)觀察者都這樣做罗侯,因此盡管每個(gè)訂閱者都以為自己訂閱的是同一個(gè)Observable,事實(shí)上每個(gè)訂閱者獲取的是它們自己的單獨(dú)的數(shù)據(jù)序列伏钠。
在某些情況下,等待直到最后一分鐘(就是直到訂閱發(fā)生時(shí))才生成Observable可以確保Observable包含最新的數(shù)據(jù)谨设。 -
Empty/Never/Throw
Empty
創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
Never
創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable
Throw
創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable -
fromArray/fromIterable
將其它種類(lèi)的對(duì)象和數(shù)據(jù)類(lèi)型轉(zhuǎn)換為Observable
在RxJava中熟掂,from操作符可以轉(zhuǎn)換Iterable和Array等。對(duì)于Iterable和數(shù)組扎拣,產(chǎn)生的Observable會(huì)發(fā)射Iterable或數(shù)組
-
Interval
創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射無(wú)線遞增的整數(shù)序列的Observable,RxJava將這個(gè)操作符實(shí)現(xiàn)為interval方法赴肚。它接受一個(gè)表示時(shí)間間隔的參數(shù)和一個(gè)表示時(shí)間單位的參數(shù)。
interval(long,TimeUnit)
interval(long,TimeUnit,Scheduler)
-
just
創(chuàng)建一個(gè)發(fā)射指定值的Observable,Just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable二蓝。
Just類(lèi)似于From誉券,但是From會(huì)將數(shù)組或Iterable的數(shù)據(jù)取出然后逐個(gè)發(fā)射,而Just只是簡(jiǎn)單的原樣發(fā)射刊愚,將數(shù)組或Iterable當(dāng)做單個(gè)數(shù)據(jù)踊跟。just方法最多接受10個(gè)參數(shù),返回一個(gè)按參數(shù)列表順序發(fā)射這些數(shù)據(jù)的Observable -
Range
創(chuàng)建一個(gè)發(fā)射特定整數(shù)序列的Observable
RxJava將這個(gè)操作符實(shí)現(xiàn)為range函數(shù),它接受兩個(gè)參數(shù)鸥诽,一個(gè)是范圍的起始值商玫,一個(gè)是范圍的數(shù)據(jù)的數(shù)目。如果你將第二個(gè)參數(shù)設(shè)為0牡借,將導(dǎo)致Observable不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù)决帖,會(huì)拋異常)。 -
Repeat
創(chuàng)建一個(gè)發(fā)射特定數(shù)據(jù)重復(fù)多次的Observable
RxJava將這個(gè)操作符實(shí)現(xiàn)為repeat方法蓖捶。它不是創(chuàng)建一個(gè)Observable地回,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個(gè)序列或者是無(wú)限的,或者通過(guò)repeat(n)指定重復(fù)次數(shù)
變換操作
-
map
對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)刻像,執(zhí)行變換操作
Map操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù)畅买,然后返回一個(gè)發(fā)射這些結(jié)果的Observable。
RxJava將這個(gè)操作符實(shí)現(xiàn)為map函數(shù)细睡。這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行谷羞。
Observable.just(1,2,3)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "integer is" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept: s = " + s);
}
});
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 1
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 2
08-24 17:08:19.573 27421-27421/com.example.wty.learnrxjava I/MainActivity: accept: s = integer is 3
-
flatMap
FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable
FlatMap操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作溜徙,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable湃缎,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射蠢壹。
這個(gè)方法是很有用的嗓违,例如,當(dāng)你有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列图贸,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable蹂季,因此你可以創(chuàng)建一個(gè)新的Observable發(fā)射這些次級(jí)Observable發(fā)射的數(shù)據(jù)的完整集合。
注意:FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作疏日,因此它們可能是交錯(cuò)的偿洁。如果想要按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù),使用ConcatMap操作符即可
舉個(gè)例子:
Observable.fromIterable(getData())
.flatMap(new Function<NoteBook, ObservableSource<Note>>() {
@Override
public ObservableSource<Note> apply(@NonNull NoteBook noteBook) throws Exception {
return Observable.fromIterable(noteBook.getNotes());
}
})
.subscribe(new Consumer<Note>() {
@Override
public void accept(Note note) throws Exception {
Log.i(TAG, "accept: " + note);
}
});
08-24 17:14:09.512 32091-32091/? I/MainActivity: accept: Note{id='1', noteBookId='1', title='Introduction', content='$$$$$$$$$$$$$$$$$$'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='2', noteBookId='1', title='ReactiveX', content='########################'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='3', noteBookId='1', title='Observables', content='@@@@@@@@@@@@@@@@@@@@@@@@@@'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='4', noteBookId='1', title='Operators Categories', content='********************'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='5', noteBookId='1', title='RxJava文檔和教程', content='********************'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='6', noteBookId='2', title='Retrofit入門(mén)教程1', content='$$$$$$$$$$$$$$$$$$'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='7', noteBookId='2', title='Retrofit入門(mén)教程2', content='########################'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='8', noteBookId='2', title='Retrofit入門(mén)教程3', content='@@@@@@@@@@@@@@@@@@@@@@@@@@'}
08-24 17:14:09.513 32091-32091/? I/MainActivity: accept: Note{id='9', noteBookId='2', title='Retrofit入門(mén)教程4', content='********************'}
08-24 17:14:09.514 32091-32091/? I/MainActivity: accept: Note{id='10', noteBookId='2', title='Retrofit入門(mén)教程5', content='********************'}
-
Buffer
定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹沟优,而不是一次發(fā)射一個(gè)值涕滋。
Buffer操作符將一個(gè)Observable變換為另一個(gè),原來(lái)的Observable正常發(fā)射數(shù)據(jù)挠阁,變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合何吝。Buffer操作符在很多語(yǔ)言特定的實(shí)現(xiàn)中有很多種變體,它們?cè)谌绾尉彺孢@個(gè)問(wèn)題上存在區(qū)別鹃唯。
注意:如果原來(lái)的Observable發(fā)射了一個(gè)onError通知,Buffer會(huì)立即傳遞這個(gè)通知瓣喊,而不是首先發(fā)射緩存的數(shù)據(jù)坡慌,即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Observable.interval(1000,TimeUnit.MILLISECONDS)
.buffer(5)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept: longs = " + longs);
}
});
08-24 17:04:12.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [0, 1, 2, 3, 4]
08-24 17:04:17.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [5, 6, 7, 8, 9]
08-24 17:04:22.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [10, 11, 12, 13, 14]
08-24 17:04:27.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [15, 16, 17, 18, 19]
08-24 17:04:32.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [20, 21, 22, 23, 24]
08-24 17:04:37.162 23975-24014/com.example.wty.learnrxjava I/MainActivity: accept: longs = [25, 26, 27, 28, 29]
-
GroupBy
將一個(gè)Observable分拆為一些Observables集合藻三,它們中的每一個(gè)發(fā)射原始Observable的一個(gè)子序列
GroupBy操作符將原始Observable分拆為一些Observables集合洪橘,它們中的每一個(gè)發(fā)射原始Observable數(shù)據(jù)序列的一個(gè)子序列。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè)Observable發(fā)射是由一個(gè)函數(shù)判定的棵帽,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key熄求,Key相同的數(shù)據(jù)會(huì)被同一個(gè)Observable發(fā)射。
舉個(gè)例子,
Observable.interval(1, TimeUnit.SECONDS)
.take(30)
.groupBy(new Function<Long, Integer>() {
@Override
public Integer apply(@NonNull Long aLong) throws Exception {
if (aLong % 3 == 0) {
return 3;
} else if (aLong % 4 == 0) {
return 4;
} else if (aLong % 5 == 0) {
return 5;
} else {
return 1;
}
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Long>>() {
@Override
public void accept(GroupedObservable<Integer, Long> longLongGroupedObservable) throws Exception {
Integer key = longLongGroupedObservable.getKey();
Log.i(TAG, "accept: key = " + key);
if (key == 3) {
longLongGroupedObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: key = 3,3的倍數(shù),aLong = " + aLong);
}
});
} else if (key == 4) {
longLongGroupedObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: key = 4,4的倍數(shù) aLong = " + aLong);
}
});
} else if (key == 5) {
longLongGroupedObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: key = 5,5的倍數(shù) aLong = " + aLong);
}
});
} else {
longLongGroupedObservable.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: key = 1,不是3逗概、4弟晚、5的倍數(shù) aLong = " + aLong);
}
});
}
}
});
08-24 17:22:37.134 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1
08-24 17:22:37.135 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4、5的倍數(shù) aLong = 1
08-24 17:22:37.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3卿城、4枚钓、5的倍數(shù) aLong = 2
08-24 17:22:37.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3
08-24 17:22:37.332 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 3
08-24 17:22:37.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4
08-24 17:22:37.432 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 4
08-24 17:22:37.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5
08-24 17:22:37.532 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 5
08-24 17:22:37.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 6
08-24 17:22:37.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4瑟押、5的倍數(shù) aLong = 7
08-24 17:22:37.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 8
08-24 17:22:37.931 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 9
08-24 17:22:38.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 10
08-24 17:22:38.131 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3搀捷、4、5的倍數(shù) aLong = 11
08-24 17:22:38.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 12
08-24 17:22:38.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3多望、4嫩舟、5的倍數(shù) aLong = 13
08-24 17:22:38.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4怀偷、5的倍數(shù) aLong = 14
08-24 17:22:38.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 15
08-24 17:22:38.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 16
08-24 17:22:38.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3家厌、4、5的倍數(shù) aLong = 17
08-24 17:22:38.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 18
08-24 17:22:38.931 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3枢纠、4像街、5的倍數(shù) aLong = 19
08-24 17:22:39.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 20
08-24 17:22:39.131 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 21
08-24 17:22:39.231 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4晋渺、5的倍數(shù) aLong = 22
08-24 17:22:39.331 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3镰绎、4、5的倍數(shù) aLong = 23
08-24 17:22:39.431 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 24
08-24 17:22:39.531 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 5,5的倍數(shù) aLong = 25
08-24 17:22:39.631 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3木西、4畴栖、5的倍數(shù) aLong = 26
08-24 17:22:39.731 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 27
08-24 17:22:39.831 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 4,4的倍數(shù) aLong = 28
08-24 17:22:39.932 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 1,不是3、4八千、5的倍數(shù) aLong = 29
08-24 17:22:40.031 7842-7881/com.example.wty.learnrxjava I/MainActivity: accept: key = 3,3的倍數(shù),aLong = 30
過(guò)濾操作
-
Distinct
抑制(過(guò)濾掉)重復(fù)的數(shù)據(jù)項(xiàng)
Observable.just(1,2,1,1,2,1,1,2,3,4)
.distinct(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept: integer = " + integer);
}
});
08-24 17:26:19.198 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 1
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 2
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 3
08-24 17:26:19.199 10816-10816/com.example.wty.learnrxjava I/MainActivity: accept: integer = 4
-
ElementAt
只發(fā)射第N項(xiàng)數(shù)據(jù) -
Filter
Filter操作符使用你指定的一個(gè)謂詞函數(shù)測(cè)試數(shù)據(jù)項(xiàng)吗讶,只有通過(guò)測(cè)試的數(shù)據(jù)才會(huì)被發(fā)射。
舉個(gè)例子,過(guò)濾整數(shù)序列中的奇數(shù),只發(fā)射偶數(shù)
Observable.interval(1,TimeUnit.SECONDS)
.filter(new Predicate<Long>() {
@Override
public boolean test(@NonNull Long aLong) throws Exception {
if (aLong % 2 == 0){
return true;
}
return false;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: " + aLong);
}
});
08-24 17:30:57.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 0
08-24 17:30:59.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 2
08-24 17:31:01.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 4
08-24 17:31:03.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 6
08-24 17:31:05.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 8
08-24 17:31:07.613 14530-14549/com.example.wty.learnrxjava I/MainActivity: accept: 10
-
First
只發(fā)射第一項(xiàng)(或者滿(mǎn)足某個(gè)條件的第一項(xiàng))數(shù)據(jù) -
IgnoreElements
不發(fā)射任何數(shù)據(jù)恋捆,只發(fā)射Observable的終止通知 -
skip/take
skip
take
- skipLast/takeLast
-
Sample
定期發(fā)射Observable最近發(fā)射的數(shù)據(jù)項(xiàng)
RxJava將這個(gè)操作符實(shí)現(xiàn)為sample和throttleLast照皆。
Observable.range(0,1000)
.sample(1, TimeUnit.MICROSECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept: " + integer);
}
});
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 56
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 154
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 186
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 208
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 228
08-24 17:41:23.376 22979-23017/com.example.wty.learnrxjava I/MainActivity: accept: 247
組合操作
- zip
通過(guò)一個(gè)函數(shù)將多個(gè)Observables的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)沸停。
Zip操作符返回一個(gè)Obversable膜毁,它使用這個(gè)函數(shù)按順序結(jié)合兩個(gè)或多個(gè)Observables發(fā)射的數(shù)據(jù)項(xiàng),然后它發(fā)射這個(gè)函數(shù)返回的結(jié)果愤钾。它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)瘟滨。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。
上代碼:
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(1000);
e.onNext(1);
Thread.sleep(1000);
e.onNext(2);
Thread.sleep(1000);
e.onNext(3);
Thread.sleep(1000);
e.onNext(4);
Thread.sleep(1000);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Thread.sleep(700);
e.onNext("A");
Thread.sleep(700);
e.onNext("B");
Thread.sleep(700);
e.onNext("C");
Thread.sleep(700);
e.onNext("D");
Thread.sleep(700);
e.onNext("E");
Thread.sleep(700);
e.onNext("F");
Thread.sleep(700);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
Observable
.zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(@NonNull String s) {
Log.i(TAG, "onNext: " +s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
08-24 17:50:11.195 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: A1
08-24 17:50:12.196 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: B2
08-24 17:50:13.197 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: C3
08-24 17:50:14.198 30117-30157/com.example.wty.learnrxjava I/MainActivity: onNext: D4
08-24 17:50:15.198 30117-30157/com.example.wty.learnrxjava I/MainActivity: onComplete:
錯(cuò)誤處理
- Retry
如果原始Observable遇到錯(cuò)誤(即接收到onError的時(shí)候觸發(fā))能颁,重新訂閱它期望它能正常終止
-
RetryWhen
retryWhen和retry類(lèi)似杂瘸,區(qū)別是,retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù)伙菊,這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable败玉,retryWhen觀察它的結(jié)果再?zèng)Q定是不是要重新訂閱原始的Observable敌土。如果這個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù),它就重新訂閱绒怨,如果這個(gè)Observable發(fā)射的是onError通知纯赎,它就將這個(gè)通知傳遞給觀察者然后終止。
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Thread.sleep(1000);
e.onNext(1);
Thread.sleep(1000);
e.onNext(2);
Thread.sleep(1000);
e.onNext(3);
int value = 1 / 0;
Thread.sleep(1000);
e.onNext(4);
Thread.sleep(1000);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
integerObservable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
if (throwable instanceof ArithmeticException)
return Observable.just(1);
else
return Observable.error(throwable);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
08-24 17:56:24.620 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:25.621 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:26.621 3895-3935/com.example.wty.learnrxjava I/MainActivity: onNext: 3
08-24 17:56:27.624 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:28.624 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:29.625 3895-3998/com.example.wty.learnrxjava I/MainActivity: onNext: 3
08-24 17:56:30.629 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 1
08-24 17:56:31.630 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 2
08-24 17:56:32.630 3895-4038/com.example.wty.learnrxjava I/MainActivity: onNext: 3
線程調(diào)度
-
SubscribeOn
指定Observable自身在哪個(gè)調(diào)度器上執(zhí)行 -
ObserveOn
指定一個(gè)觀察者在哪個(gè)調(diào)度器上觀察這個(gè)Observable