Rxjava2 可謂是日常開發(fā)中的利器亩进,特別是在異步任務中更能發(fā)揮作用。響應式編程以及流式api的良好支持膨俐,給予了更好的編碼體驗勇皇。越來越多開發(fā)者漸漸用起來了。學習rxjava2最好的地方無外乎官方文檔焚刺,詳細且完整敛摘。以下結合官方文檔和我自己的理解以及例子,解釋各個操作符的用法乳愉,給各位以及我自己作一篇參考兄淫。
怎么用Rxjava2
要使用RxJava屯远,需要先創(chuàng)建Observables(發(fā)出數(shù)據(jù)項),以各種方式轉換這些Observable以獲取所需要的精確數(shù)據(jù)項(通過使用Observable運算符)捕虽,然后觀察并響應這些需要的項目序列(通過實現(xiàn)觀察者)
或者訂閱者氓润,然后將它們訂閱到最終的變換后的Observables)。
Creating Observables 創(chuàng)建操作符
just
通過獲取預先存在的對象并在訂閱時將該特定對象發(fā)布給下游使用者來構造反應類型薯鳍。為方便起見咖气,存在2到9個參數(shù)的重載,這些對象(具有相同的常見類型)將按指定的順序發(fā)出挖滤。就像From類似崩溪,但請注意From將傳入一個數(shù)組或一個iterable或類似的東西來取出要發(fā)出的項目,而Just只是簡單地發(fā)出數(shù)組或者迭代器斩松。
請注意伶唯,如果將null傳遞給Just,它將返回一個Observable惧盹,它將null作為項發(fā)出乳幸。不要錯誤地假設這將返回一個空的Observable(一個根本不發(fā)出任何項目)。為此钧椰,需要使用Empty運算符粹断。
just
fun testOpJust() {
val arr = arrayOf("mary", "tom", "ben", "lisa", "ken")
Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)
val list = arrayListOf("mary", "tom", "ben", "lisa", "ken")
Observable.just(list).forEach { it -> System.out.println(it + "s") }
list.stream().filter { it -> it.length > 3 }.map { "$it s" }.forEach(System.out::println)
}
from
根據(jù)預先存在的源或生成器類型構造序列。當使用Observable時嫡霞,如果使用的所有數(shù)據(jù)都可以表示為Observables瓶埋,而不是Observables和其他類型的混合,則可以更方便诊沪。這允許使用一組運算符來控制數(shù)據(jù)流的整個生命周期养筒。例如,Iterables可以被認為是一種的Observable;作為一種始終只發(fā)出單一項目的Observable端姚。通過將這些對象顯式轉換為Observable晕粪,可以將它們作為對等體與其他Observable進行交互。因此渐裸,大多數(shù)ReactiveX實現(xiàn)都具有允許將特定于語言的對象和數(shù)據(jù)結構轉換為Observable的方法巫湘。
注意:這些靜態(tài)方法使用后綴命名約定(即,在方法名稱中重復參數(shù)類型)以避免重載解析模糊橄仆。
fromIterable
從java.lang.Iterable源(例如Lists剩膘,Sets或Collections或custom Iterables)發(fā)出信號,然后完成序列盆顾。
可用于 Flowable ,Observable
fromArray
發(fā)信號通知給定數(shù)組的元素怠褐,然后完成序列。
可用于Flowable,Observable
注意:RxJava不支持原始數(shù)組您宪,只支持(通用)引用數(shù)組奈懒。
fun testOpFrom(){
val list = arrayListOf<Int>(1,2,3,4,5,6)
Observable.fromIterable(list).subscribe(System.out::println)
Observable.fromArray(1,2,3,4,5,6).subscribe(System.out::println)
}
fromCallable
當消費者訂閱時奠涌,調用給定的java.util.concurrent.Callable并將其返回值(或拋出的異常)轉發(fā)給該使用者。
可用于:Observable,Flowable,Maybe,Single,Completable
備注:在Completable中磷杏,忽略實際返回值溜畅,并且Completable完成。
Observable.fromCallable<String> {
"hello"
}.subscribe(System.out::println)
Completable.fromCallable{
"complatable from callable"
}.subscribe {
System.out.println("complete")
}
fromAction
當消費者訂閱時极祸,調用給定的io.reactivex.function.Action并且消費者完成或接收Action拋出的異常慈格。
可用于: Maybe,Completable
Maybe.fromAction<String>{
System.out.println("maybe from action")
}.subscribe(System.out::println)
以下標星先不多做解釋,用得不多
*fromRunnable
*fromFuture
*from{reactive type}
將另一種反應類型包裹或轉換為目標反應類型遥金。具有以下簽名模式的各種反應類型中提供以下組合:targetType.from {sourceType}()
*注意:并非所有可能的轉換都是通過from {reactive type}方法系列實現(xiàn)的浴捆。查看to {reactive type}方法系列以獲得進一步的轉換可能性。
注意:fromAction和fromRunnable之間的區(qū)別在于Action接口允許拋出已受檢的異常稿械,而java.lang.Runnable則不然选泻。
error
可用于Observable,Flowable,Maybe,Single,Completable
通過java.util.concurrent.Callable向消費者發(fā)出預先存在或生成的錯誤信號。
fun testOpError(){
Observable.error<Throwable>(IOException(""))
.subscribe({
System.out.print("不會打印吧")
},{
it.printStackTrace()
},{
System.out.println("也不會打印")
})
}
一個典型的用例是使用onErrorResumeNext有條件地映射或抑制鏈中的異常:
/**
* 抑制鏈上發(fā)生的異常
*/
@Test
fun testOpOnErrorResumeNext() {
val observable = Observable.fromCallable {
if (Math.random() < 0.5f) {
throw IllegalArgumentException()
}
throw IOException()
}
observable.onErrorResumeNext(Function {
if (it is IllegalArgumentException) {
Observable.empty()
} else {
Observable.error(it)
}
}).subscribe({
System.out.println("nothing")
},{
it.printStackTrace()
},{
System.out.println("empty")
})
}
這個onErrorResumeNext 厲害了美莫,可以說之前一直不太明白怎么很好的處理页眯。通過此操作符可以抑制錯誤的傳遞,本來如果subscribe發(fā)生了錯誤會觸發(fā)onError回調厢呵。事實上可能發(fā)生了錯誤窝撵,需要不處理或者抑制產(chǎn)生。在onErrorResumeNext的function參數(shù)中述吸,可以根據(jù)錯誤類型返回處理流程忿族。
- empty 這種類型的源在訂閱后立即表示完成。
可用于Observable,Flowable,Maybe,Single,Completable
示例可見onErrorResumeNext的例子
empty發(fā)送直接表示完成蝌矛,就是訂閱者直接調用onComplete回調。onNext 不會執(zhí)行
- never 這種類型的源不會發(fā)出任何onNext错英,onSuccess入撒,onError或onComplete的信號。這種類型的反應源可用于測試或“禁用”組合子操作符中的某些源椭岩。
可用于Observable,Flowable,Maybe,Single,Completable
不會對訂閱者的任何回調進行調用茅逮。禁用也可理解,比如發(fā)送了錯誤判哥,都不往下執(zhí)行
- interval 定期生成無限的献雅,不斷增加的數(shù)字(Long類型)。intervalRange變體生成有限數(shù)量的此類數(shù)字塌计。
可用于Observable,Flowable
fun testOpInterval(){
Observable.interval(1,TimeUnit.SECONDS)
.onErrorResumeNext(Function {
Observable.error(it)
})
.subscribe({
if (it.rem(5) == 0L) {
System.out.println("tick")
} else {
System.out.println("tock")
}
},{
it.printStackTrace()
},{
System.out.println("interval complete")
})
}
-
Timer運算符創(chuàng)建一個Observable挺身,在指定的一段時間后發(fā)出一個特定項。
Timer
也就是說在給定的時間之后發(fā)送事件
- range 為每個消費者生成一系列值锌仅。range()方法生成Integers章钾,rangeLong()生成Longs墙贱。Range運算符按順序發(fā)出一系列順序整數(shù),您可以在其中選擇范圍的起點及其長度贱傀。
可用于 Observable,Flowable
fun testOpRange(){
val s = "test range operation now"
Observable.range(0,s.length- 3)
.map { "${s[it]} in range"}
.subscribe {
System.out.println(it)
}
}
發(fā)出一系列值惨撇,參數(shù)為起點,和長度府寒。
- generate 創(chuàng)建一個冷魁衙,同步和有狀態(tài)的值生成器。
可用于Observable,Flowable
@Test
fun testOpGenerate(){
val start = 1
val increaseValue = 2
Observable.generate<Int,Int>(Callable<Int> {
start
}, BiFunction<Int, Emitter<Int>,Int> {
t1, t2 ->
t2.onNext(t1 + increaseValue)
t1 + increaseValue
}).subscribe {
System.out.println("generate value : $it")
}
}
不太明白干啥的株搔,具體應用場景纺棺。只是一直不間斷的產(chǎn)生值
Filtering Observables 過濾Observable
過濾操作是非常常用且重要的,而且相關的操作符也很多
Debounce
可用于Observable,Flowable
刪除響應源發(fā)出的項目邪狞,在給定的超時值到期之前祷蝌,這些項目后面跟著更新的項目。計時器重置每次發(fā)射帆卓。此運算符會跟蹤最近發(fā)出的項目巨朦,并且僅在有足夠的時間過去而沒有源發(fā)出任何其他項目時才會發(fā)出此項目。
按照我得理解就是debounde傳入了超時值剑令,在該時間之內如果多次發(fā)射糊啡,取離超時值最近得值。既然又超時那么也應該又開始時間吁津,開始時間就是一組發(fā)射最開始值得時間棚蓄,這一組發(fā)射得值的時的差是在debounce超時時間之內。
// Diagram:
// -A--------------B----C-D-------------------E-|---->
// a---------1s
// b---------1s
// c---------1s
// d---------1s
// e-|---->
// -----------A---------------------D-----------E-|-->
fun testOpDebounce(){
Observable.create<String>{
it.onNext("A")
Thread.sleep(1_500)
it.onNext("B")
Thread.sleep(500)
it.onNext("C")
Thread.sleep(250)
it.onNext("D")
Thread.sleep(2_000)
it.onNext("E")
}.debounce(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
}
distinct
可用于Observable Flowable
通過僅發(fā)出與先前項目相比不同的項目來過濾反應源碍脏∷笠溃可以指定io.reactivex.functions.Function,將源發(fā)出的每個項目映射到一個新值中典尾,該值將用于與先前的映射值進行比較役拴。Distinct運算符通過僅允許尚未發(fā)出的項目來過濾Observable。在一些實現(xiàn)中钾埂,存在允許調整兩個項被視為“不同”的標準的變體河闰。在一些實施例中,存在操作符的變體褥紫,其僅將項目與其前一個項目進行比較以獲得更精確的比較姜性,從而僅過濾連續(xù)的重復項目,序列中的項目髓考。
fun testOpDistinct(){
Observable.fromArray(1,2,3,3,4,5)
.distinct()
.subscribe(System.out::println)
// 用來過濾序列中一組值前后是否相同得值
Observable.fromArray(1,1,2,3,2)
.distinct { "呵呵" }
.subscribe(System.out::println)
}
重載的方法部念,傳入keySelectro ,作用是對每個元素應用方法得到得新得值,再決定怎么去重
distinctUntilChanged
可用于Observable Flowable
通過僅發(fā)出與其前一個元素相比較不同的項目來過濾反應源∮』可以指定io.reactivex.functions.Function矢腻,將源發(fā)出的每個項目映射到一個新值中,該值將用于與先前的映射值進行比較射赛《喔蹋或者,可以指定io.reactivex.functions.BiPredicate作為比較器函數(shù)來比較前一個楣责。
Observable.fromArray(1,2,3,3,4,5)
// .distinctUntilChanged()
.distinctUntilChanged { t1, t2 ->
t1 == t2
}
.subscribe(System.out::println)
可以說是distinct的加強版竣灌,多了一個可以傳入比較器的重載方法
elementAt
課用于Flowable,Observable
在來自反應源的一系列發(fā)射的數(shù)據(jù)項中,以指定的從零開始的索引發(fā)出單個項目秆麸。如果指定的索引不在序列中初嘹,則可以指定將發(fā)出的默認項。
簡單說就是按照發(fā)出項的次序獲取指定的位置的元素
Observable.fromArray(1,2,3,3,4,5)
.elementAt(2)
.subscribe(System.out::println)
elementAtOrError
filter
可用于Observable,Flowable,Maybe,Single
通過僅發(fā)出滿足指定函數(shù)的項來過濾由反應源發(fā)出的項沮趣。
過濾偶數(shù)
Observable.fromArray(1,2,3,3,4,5)
.filter {
it.rem(2) == 0
}
.subscribe(System.out::println)}
first
可用于Flowable,Observable
僅發(fā)出反應源發(fā)出的第一個項目屯烦,或者如果源完成而不發(fā)出項目則發(fā)出給定的默認項目。這與firstElement的不同之處在于此運算符返回Single房铭,而firstElement返回Maybe驻龟。
Observable.fromArray(1,2,3,3,4,5)
.first(-1)
.subscribe(Consumer<Int> {
System.out.println("onNext :$it")
})
Observable.fromArray(1,2,3,3,4,5)
.firstElement()
.subscribe {
System.out.println("onNext :$it")
}
firstOrError
僅發(fā)出響應源發(fā)出的第一個項目,或者如果源完成而不發(fā)出項目則發(fā)出java.util.NoSuchElementException信號缸匪。
ignoreElement
可用于Maybe Single
忽略Single或Maybe源發(fā)出的單個項目翁狐,并返回一個Completable,它僅從源中發(fā)出錯誤或完成事件的信號凌蔬。
Maybe.timer(1L,TimeUnit.SECONDS)
.ignoreElement()
.doOnComplete {
System.out.println("done")
}
.blockingAwait()
ignoreElements
忽略Single或Maybe源發(fā)出的單個項目露懒,并返回一個Completable,它僅從源中發(fā)出錯誤或完成事件的信號砂心。
Observable.timer(1L,TimeUnit.SECONDS)
.ignoreElements()
.doOnComplete {
System.out.println("completed")
}
.blockingAwait()
last
可用于Observable,Flowable
僅發(fā)出反應源發(fā)出的最后一個項目懈词,或者如果源完成而不發(fā)出項目則發(fā)出給定的默認項目。這與lastElement的不同之處在于此運算符返回Single计贰,而lastElement返回Maybe钦睡。
Observable.fromArray(1,2,3,3,4,5)
.last(-1)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
lastElement
Observable.fromArray(1,2,3,3,4,5)
.lastElement()
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
lastOnError
僅發(fā)出響應源發(fā)出的最后一項,或者如果源完成而不發(fā)出項躁倒,則發(fā)出java.util.NoSuchElementException信號。
ofType
可用于Flowable洒琢,Observable,Maybe
通過僅發(fā)出指定類型的項目來過濾反應源發(fā)出的項目秧秉。
Observable.fromArray(1,2.1f,3,3,4,5)
.ofType(Int::class.java)
.subscribe(Consumer<Int>{
System.out.println("last $it")
})
sample
可用于Observable Flowable
通過僅在周期性時間間隔內發(fā)出最近發(fā)出的項目來過濾反應源發(fā)出的項目。
Observable.create<String> {
it.onNext("A")
Thread.sleep(1_000)
it.onNext("B")
Thread.sleep(300)
it.onNext("C")
Thread.sleep(700)
it.onNext("D")
it.onComplete()
}.sample(1,TimeUnit.SECONDS)
.blockingSubscribe(System.out::println)
skip
刪除響應源發(fā)出的前n個項目衰抑,并發(fā)出剩余項目象迎。您可以通過使用Skip運算符修改Observable來忽略Observable發(fā)出的前n個項目,并僅參加之后的項目。
Observable.fromArray("hehe",2.1f,3,3,4,5)
// .ofType(String::class.java)
.skip(3)
.subscribe {
System.out.println(it)
}
skipLast
丟棄反應源發(fā)出的最后n個項目砾淌,并發(fā)出剩余的項目啦撮。
take
可用于Flowable Observable
僅發(fā)出反應源發(fā)出的前n項。
Observable.fromArray("hehe",2.1f,3,3,4,5)
.take(2)
.subscribe(System.out::println)
takeLast
可用于Flowable Observable
僅發(fā)出反應源發(fā)出的最后n個項目汪厨。
throttleFirst
可用于Flowable Observable
跟debounce有些相似赃春,是取時間范圍內第一個,在點擊事件過濾很常用
在指定持續(xù)時間的連續(xù)時間窗口期間僅發(fā)出由反應源發(fā)出的第一個項目劫乱。
Observable.create<String> {
it.onNext("A")
Thread.sleep(300)
it.onNext("B")
Thread.sleep(400)
}.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
throttleLast
可用于Observable,Flowable
在指定持續(xù)時間的連續(xù)時間期間僅發(fā)出由反應源發(fā)出的最后一個項目织中。跟throttleFirst相反,取最后一個值
throttleWithTimeout
跟debounce的別名
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return debounce(timeout, unit);
}
timeout
從Observable或Flowable源發(fā)出項目衷戈,但如果在從上一項開始的指定超時持續(xù)時間內未發(fā)出下一項狭吼,則以java.util.concurrent.TimeoutException終止。對于Maybe殖妇,Single和Completable刁笙,指定的超時持續(xù)時間指定等待成功或完成事件到達的最長時間。如果Maybe谦趣,Single或Completable在給定時間內沒有完成疲吸,將發(fā)出java.util.concurrent.TimeoutException。
Observable.create<String>{
it.onNext("A")
Thread.sleep(600)
it.onNext("B")
Thread.sleep(1_500)
it.onNext("C")
Thread.sleep(500)
}.subscribeOn(Schedulers.io())
.subscribe({
System.out.println(it)
},{
it.printStackTrace()
})
捕獲處理
一下為Kotlin編寫的代碼蔚润,可以看到在發(fā)生錯誤的情況下磅氨,通過onError() 拋出了錯誤,并且需要在訂閱者嫡纠,第二個參數(shù)傳入烦租,處理錯誤的回調。
fun testErrorHandle() {
Observable.create<String> {
it.onNext("start")
Thread {
try {
System.out.println("start open ...")
it.onNext("start open ...")
val stream = URL("https://www.baidu.com").openStream()
System.out.println("after url ...")
it.onNext("after url")
val br = stream.bufferedReader()
if (!it.isDisposed) {
var text = br.readText()
it.onNext(text)
}
stream.close()
br.close()
it.onNext("after open ...")
if (!it.isDisposed) {
it.onComplete()
}
}catch (e : java.lang.Exception) {
System.out.println(e)
e.printStackTrace()
it.onError(e)
}
}.start()
}.subscribe(System.out::println) {
it.printStackTrace()
System.out.println("what the fuck")
}
}
Observable通常不會拋出異常除盏。相反叉橱,它會通過使用onError通知終止Observable序列來通知任何觀察者發(fā)生了不可恢復的錯誤。
這有一些例外者蠕。例如窃祝,如果onError()調用本身失敗,Observable將不會嘗試通過再次調用onError來通知觀察者踱侣,但會拋出RuntimeException粪小,OnErrorFailedException或OnErrorNotImplementedException。
從onError通知中恢復的技術
因此抡句,不是捕獲異常探膊,而是觀察者或操作者應該更通常地響應異常的onError通知。還有各種Observable運算符可用于對來自Observable的onError通知作出反應或從中恢復待榔。例如逞壁,可以使用運算符:
- 吞下錯誤并切換到備份Observable以繼續(xù)序列
- 吞下錯誤并發(fā)出默認項
- 吞下錯誤并立即嘗試重啟失敗的Observable
- 吞下錯誤并嘗試在一些退避間隔后重新啟動失敗的Observable
可以使用錯誤處理運算符中描述的運算符來實現(xiàn)這些策略流济。
吞下的意思,應該是不處理異常
RxJava特定的異常以及如何處理它們
CompositeException
這表明發(fā)生了多個異常腌闯∩粒可以使用異常的getExceptions()方法來檢索構成組合的各個異常。
MissingBackpressureException
這表示試圖將過多發(fā)出數(shù)據(jù)項應用于它的Observable姿骏。有關背壓(https://github.com/ReactiveX/RxJava/wiki/Backpressure)的Observable的解決方法糖声,請參閱Backpressure。
OnErrorFailedException
這表明Observable試圖調用其觀察者的onError()方法工腋,但該方法本身引發(fā)了異常姨丈。
OnErrorNotImplementedException
這表明Observable試圖調用其觀察者的onError()方法,但是沒有這樣的方法存在擅腰◇瘢可以通過修復Observable以使其不再達到錯誤條件,通過在觀察者中實現(xiàn)onError處理程序趁冈,或通過使用本頁其他地方描述的其中一個運算符到達觀察者之前截獲onError通知來消除此問題歼争。。
OnErrorThrowable
觀察者將這種類型的throwable傳遞給他們的觀察者的onError()處理程序渗勘。此變量的Throwable包含有關錯誤的更多信息以及錯誤發(fā)生時系統(tǒng)的Observable特定狀態(tài)沐绒,而不是標準Throwable。