前言
第一次接觸RxJava是在學(xué)習(xí)Retrofit的時候,那個時候經(jīng)逞冶看到別人都是Retrofit+RxJava一起使用的,于是后來自己也上網(wǎng)研究了一下,經(jīng)過一段時間的學(xué)習(xí)總算是把RxJava給弄懂了,在這里就分享一下我的使用心得,給想入門的同學(xué)引一引路.
概述
RxJava是什么
對于RxJava,官方給的說法是一個使用Java虛擬機觀察序列異步和基于事件的程序庫;繞過這些官方語言,以我自己的話來說,它就是一個用觀察者模式對程序進行異步控制的這么一個庫.
觀察者模式
既然提到了RxJava的核心是觀察者模式,那么這里就簡單的說說什么是觀察者模式.觀察者模式完美的將觀察者和被觀察的對象分離開,在這種模式中有兩個對象:觀察者(Observer)和被觀察者(Observerable),他們兩個通過訂閱(Subscribe)的方式產(chǎn)生聯(lián)系,當(dāng)兩者建立起聯(lián)系以后,那么當(dāng)被觀察者作出一些變化之后,觀察者能夠立即獲知到,并根據(jù)被觀察者作出的變化作出相應(yīng)的反應(yīng).
舉個例子:護士和病人即觀察者和被觀察者,當(dāng)病人不舒服的時候就按鈴,護士聽到鈴聲以后趕來照顧病人,它們之間通過鈴作為紐帶把它們聯(lián)系到一起,這樣一來護士就不需要時時刻刻盯著病人
RxJava的優(yōu)點
說了那么多,那到底RxJava有啥好處呢,異步和簡潔應(yīng)該是它最大的優(yōu)點,它可以隨時隨地的切換線程,同時又能保證代碼的清晰簡明.至于它是怎么做到的,后面我會一點一點的來分析.
入門
環(huán)境搭建
1.RxJava的GitHub地址:https://github.com/ReactiveX/RxJava
2.RxAndroid的GitHub地址:https://github.com/ReactiveX/RxAndroid
3.導(dǎo)入Jar包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:latest.version'
rxandroid是一個對rxjava的擴展庫,這個庫里包含了一些和Android有關(guān)的內(nèi)容,下面我會具體介紹到
入門案例
實現(xiàn)RxJava的步驟分三步
第一步:創(chuàng)建被觀察者(Observable)
第二步:創(chuàng)建觀察者(Observer)
第三步:訂閱(Subscribe),即讓觀察者和被觀察者產(chǎn)生聯(lián)系
基于以上理論,下面我們用代碼來演示一下
1.創(chuàng)建Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("123");
e.onComplete();
}
});
2.創(chuàng)建Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(TAG,"onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(TAG,"onNext:"+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i(TAG,"onError");
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete");
}
};
3.訂閱
observable.subscribe(observer);
最終輸出結(jié)果為:onSubscribe念搬、onNext:123、onComplete,可以看到在創(chuàng)建Observable的時候,傳入了一個ObservableOnSubscribe對象作為參數(shù),它決定了事件序列執(zhí)行順序,而observer里的方法決定了,執(zhí)行內(nèi)容,最后通過訂閱將兩個對象綁定在一起
create()
是最基本的一種創(chuàng)建Observeable的方法,基于這個方法,RxJava為我們提供了很多種方法來創(chuàng)建事件隊列.
fromArray(T...item)
傳入多個參數(shù),并將這些參數(shù)依次發(fā)給觀察者
just(T...)
和fromArray(T...item)
差不多
操作符
可以說RxJava那么受歡迎,很大程度得益于操作符的存在,操作符可以讓事件隊列在發(fā)送的過程中進行轉(zhuǎn)換加工處理,比如你創(chuàng)建的時候往隊列里放了一個香蕉,但是經(jīng)過中間轉(zhuǎn)換以后,香蕉變成了蘋果,這么說可能會比較抽象,下面我就簡單的介紹幾個常用操作符,來加深一下對它的理解
map
Observable.fromArray("小明") //輸入內(nèi)容
.map(new Function<String, String>() { //map轉(zhuǎn)換
@Override
public String apply(@NonNull String s) throws Exception {
return s + "的爸爸";
}
})
.subscribe(new Consumer<String>() { //訂閱
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, s);
}
});
這里出現(xiàn)了幾個新東西Function,Consumer,咱們一一來解釋其作用,首先是Function<T,R>
,它就是一個接口,作用是傳入一個值,然后傳出另一個值,T代表傳入值的類型,R代表傳出值的類型,用它可以對輸入值進行一些運算,返回一些其他值.map的作用就是將事件序列中的對象或整個序列進行加工處理鸭栖,轉(zhuǎn)換成不同的事件或事件序列.而Consumer<T>
則是一個接受單一值的函數(shù)接口,作用類似于Observer里的onNext
用來處理接收過來的值;現(xiàn)在我們可以這樣來理解:Observable原先的序列中僅僅存儲了小明這個事件,而如今經(jīng)過map的變換,就把事件源中的小明改造成了小明的爸爸這一事件
flatMap
flatMap
算是一個比較有用的變換,它的作用和map
類似,但是我覺得它又比map
高級多了,到底是哪里高級呢,下面我們通過一段代碼來分析一下
Observable.fromArray("1","2")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(@NonNull String s) throws Exception {
Log.e(TAG,"原來的事件:"+s);
return Observable.fromArray("一","二");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG,"改造后的事件:"+s);
}
});
打印的Log如下
從結(jié)果看出
map
和flatMap
類似,都是對數(shù)據(jù)進行了變換,不同的是flatMap
返回的是Observeable
對象,我們可以把之前的事件處理以后放到一個新的Observeable
中,需要注意的是這個新的Observeable
并不是直接發(fā)送事件,它還是被放到了原先的Observeable
中由原先的Observeable
來發(fā)送事件,下面畫個圖來理解一下這個過程從圖里可以看出,原先的
Observeable
序列中的事件經(jīng)過變換,又變成了一個新的序列,最后再由原先Observeable
把這些事件統(tǒng)一交給Subscriber
的回調(diào)方法
當(dāng)然了,類似的操作符還有很多,比如zip操作符,它就是把多個數(shù)據(jù)流合并到一處,最終發(fā)送到一個地方;總而言之,操作符的作用就是在事件發(fā)送過程中對數(shù)據(jù)進行處理的.
線程調(diào)度(Scheduler)
說到這線程調(diào)度嘛,這就是RxJava另一個牛逼的地方了;利用RxJava給的Scheduler咱們可以隨時隨地的切換線程,這就是RxJava實現(xiàn)異步的方法,具體怎么使用呢,且聽我娓娓道來.
案例
在看代碼前,有幾個概念需要先說明一下,subscribeOn()
和 observeOn()
這兩個方法可以對線程進行切換,而具體切換到哪個線程則是由調(diào)度器Scheduler
來控制,下面講講兩個方法的使用以及作用范圍.
observeOn
observeOn()可以多次調(diào)用,一般放在map和flatmap等操作符前,用來控制操作符中的操作,作用范圍在下一個observeOn出現(xiàn)前
observeable.just(T...)
.observeOn(Schedulers.io())
.map(1)
.flatMap(2);
.observeOn(Schedulers.newThread())
.map(3)
.subscribe(4)
結(jié)果是1、2是在io線程里執(zhí)行的;3异袄、4是在newThread線程里執(zhí)行的
subscribeOn
subscribeOn()位置放在哪里都行,但只能調(diào)用一次;一般就是指定Observable創(chuàng)建和doOnSubscribe的線程;比如:Observeable.create(...)
;Observeable.fromArray(...)
或者Observeable.doOnSubscribe(...)
都由subscribeOn
來控制,如果程序中沒有調(diào)用過observeOn
,而只調(diào)用了subscribeOn
,那么程序里所有的map
舶掖、flatMap
都會在subscribeOn
指定的線程中執(zhí)行直到出現(xiàn)下個observeOn
Observable.fromArray(...)
.map(1)
.flatMap(2)
.subscribeOn(Schedulers.io())
1卑硫、2都是在io線程中執(zhí)行, 除非在程序中某處執(zhí)行了observeOn
常用的Scheduler
調(diào)度器類型 | 作用范圍 |
---|---|
AndroidSchedulers.mainThread() | AndroidUI線程 |
Schedulers.io() | IO線程(讀寫文件徒恋、數(shù)據(jù)庫和網(wǎng)絡(luò)信息交互等)都可以放在IO線程里來執(zhí)行,它的模式和newThread()差不多,但效率比newThread()高 |
Schedulers.newThread() | 開啟一個新線程 |
Schedulers.single() | 單例線程,可以把兩段程序放在同一個線程里順序執(zhí)行 |
背壓(Backpressure)
概念
在rxjava中會經(jīng)常遇到一種情況就是被觀察者發(fā)送消息太快以至于它的操作符或者訂閱者不能及時處理相關(guān)的消息。那么隨之而來的就是如何處理這些未處理的消息欢伏。舉個例子入挣,使用zip操作符將兩個無限大的Observable壓縮在一起,其中一個被觀察者發(fā)送消息的速度是另一個的兩倍硝拧。一個比較不靠譜的做法就是把發(fā)送比較快的消息緩存起來径筏,當(dāng)比較慢的Observable發(fā)送消息的時候取出來并將他們結(jié)合在一起。這樣做就使得rxjava變得笨重而且十分占用系統(tǒng)資源障陶。
解決這個問題的方法就是使用背壓策略,在RxJava1.0中,如果使用Observeable的時候產(chǎn)生了背壓問題系統(tǒng)是會拋MissingBackpressureException異常的,而在RxJava2.0中Observable不再支持背壓滋恬,多出了Flowable
(也是被觀察者)來支持背壓,當(dāng)然了這里既然提到了一個新的被觀察者,那么就不得不提它的搭檔觀察者了Subscriber
通常這兩個是一起使用的,下面來看看使用Flowable
是如何使用背壓策略的
Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
Flowable的幾種Backpressure策略
MISSING
如果流的速度無法保持同步,可能會拋出MissingBackpressureException或IllegalStateException。
BUFFER
上游不斷的發(fā)出onNext請求抱究,直到下游處理完恢氯,也就是和Observable一樣了,緩存池?zé)o限大鼓寺,最后直到程序崩潰
ERROR
會在下游跟不上速度時拋出MissingBackpressureException勋拟。
DROP
會在下游跟不上速度時把onNext的值丟棄。
LATEST
會一直保留最新的onNext的值妈候,直到被下游消費掉敢靡。
下面咱們通過一段代碼來具體分析
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int i = 0; i < 129; ++i){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError"+t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
在Flowable中背壓采取拉取響應(yīng)的方式來進行水流控制,也就是說Subscription是控制上下游水流的主要方式苦银,一般的啸胧,我們需要調(diào)用Subscription.request()來傳入一個下游目前能處理的事件的數(shù)量,不過你不傳也是可以的,因為在Flowable內(nèi)部已經(jīng)設(shè)置了一個默認值(128),具體的可以去看Flowable的源碼,如上面所寫,我設(shè)置了s.request(3);表示下游只能出來3個事件,而上游我傳了129個事件進來,下游顯然是無法處理的,因此程序會拋出MissingBackpressureException異常;如果感興趣的同學(xué)可以試試其他幾種策略模式,這里就不再贅述了.
補充
也不知道該補充些啥,那么簡單的介紹幾個知識點吧(可能有點亂,湊合著看吧)
- 用
Observablet.timer(long delay, TimeUnit unit)
可以定時發(fā)送事件 -
Filter
操作符可以在數(shù)據(jù)數(shù)據(jù)發(fā)送過程中過濾掉一些數(shù)據(jù) - 一般來說,Observable不會拋異常。它會調(diào)用 onError 終止Observable序列,以此通知所有的觀察者發(fā)生了一個不可恢復(fù)的錯誤. 但是,也存在一些異常.例如:如果
onError
調(diào)用失敗了,Observable不會嘗試再次調(diào)用onError
去通知觀察者,它會拋出RuntimeException,OnErrorFailedException或者OnErrorNotImplementedException.
4.暫時就這些了,以后想到的話我還會繼續(xù)補充
結(jié)語
看完上面的介紹,首先你得明白我們到底拿RxJava來做什么,怎么做,有啥好處,如果你把上面的內(nèi)容都看懂了,那么恭喜你你已經(jīng)入門了;剩下的只需要在實際開發(fā)中去應(yīng)用了,最后:如果有寫的不對的地方還請見諒,也歡迎各位大神指正.