一跋理、簡(jiǎn)介
RxJava是響應(yīng)式編程(Reactive Extensions)的java實(shí)現(xiàn),它基于觀察者模式的實(shí)現(xiàn)了異步編程接口筷厘。
Rxjava 3.x 的github官網(wǎng)鸣峭;
RxJava2將被支持到2021年2月28日宏所,錯(cuò)誤的會(huì)同時(shí)在2.x和3.x修復(fù),但新功能只會(huì)在3.x上添加摊溶;
Rxjava 3.0的一些改變:官方Wiki爬骤;
Rxjava 3.x 文檔可以在官方j(luò)avadoc中找到;
使用Rxjava3.x之前的準(zhǔn)備工作:
1.1 添加依賴:
//RxJava的依賴包
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//RxAndroid的依賴包
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
1.2 將項(xiàng)目的編譯目標(biāo)設(shè)置更改為 java8:
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
二莫换、Rx概念
2.1 字段含義
Reactive 直譯為反應(yīng)性的霞玄,有活性的,根據(jù)上下文一般翻譯為反應(yīng)式拉岁、響應(yīng)式
Iterable 可迭代對(duì)象坷剧,支持以迭代器的形式遍歷,許多語(yǔ)言中都存在這個(gè)概念
Observable 可觀察對(duì)象喊暖,在Rx中定義為更強(qiáng)大的Iterable惫企,在觀察者模式中是被觀察的對(duì)象,一旦數(shù)據(jù)產(chǎn)生或發(fā)生變化陵叽,會(huì)通過(guò)某種方式通知觀察者或訂閱者
Observer 觀察者對(duì)象雅任,監(jiān)聽Observable發(fā)射的數(shù)據(jù)并做出響應(yīng),Subscriber是它的一個(gè)特殊實(shí)現(xiàn)
emit 直譯為發(fā)射咨跌,發(fā)布,發(fā)出硼婿,含義是Observable在數(shù)據(jù)產(chǎn)生或變化時(shí)發(fā)送通知給Observer锌半,調(diào)用Observer對(duì)應(yīng)的方法,文章里一律譯為發(fā)射
items 直譯為項(xiàng)目寇漫,條目刊殉,在Rx里是指Observable發(fā)射的數(shù)據(jù)項(xiàng),文章里一律譯為數(shù)據(jù)州胳,數(shù)據(jù)項(xiàng)记焊。
2.2 上/下流
在RxJava中,數(shù)據(jù)以流的方式組織:Rxjava包括一個(gè)源數(shù)據(jù)流栓撞,源數(shù)據(jù)流后跟著若干個(gè)用于消費(fèi)數(shù)據(jù)流的步驟遍膜。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
在代碼中,對(duì)于operator2來(lái)說(shuō)瓤湘,在它前面叫做上流瓢颅,在它后面的叫做下流。
2.3 流對(duì)象
在RxJava的文檔中弛说,emission, emits, item, event, signal, data and message都被認(rèn)為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對(duì)象挽懦。
2.4 背壓(Backpressure)
當(dāng)上下游在不同的線程中,通過(guò)Observable發(fā)射木人,處理信柿,響應(yīng)數(shù)據(jù)流時(shí)冀偶,如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對(duì)于那些沒來(lái)得及處理的數(shù)據(jù)就會(huì)造成積壓渔嚷,這些數(shù)據(jù)既不會(huì)丟失进鸠,也不會(huì)被垃圾回收機(jī)制回收,而是存放在一個(gè)異步緩存池中圃伶,如果緩存池中的數(shù)據(jù)一直得不到處理堤如,越積越多,最后就會(huì)造成內(nèi)存溢出窒朋,這便是響應(yīng)式編程中的背壓(backpressure)問(wèn)題搀罢。
為此,RxJava帶來(lái)了backpressure的概念侥猩。背壓是一種流量的控制步驟榔至,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)欺劳。背壓是指在異步場(chǎng)景中唧取,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略
在Rxjava1.0中划提,有的Observable支持背壓枫弟,有的不支持,為了解決這種問(wèn)題鹏往,2.0把支持背壓和不支持背壓的Observable區(qū)分開來(lái):支持背壓的有Flowable類淡诗,不支持背壓的有Observable,Single, Maybe and Completable類伊履。
- 在訂閱的時(shí)候如果使用FlowableSubscriber韩容,那么需要通過(guò)s.request(Long.MAX_VALUE)去主動(dòng)請(qǐng)求上游的數(shù)據(jù)項(xiàng)。如果遇到背壓報(bào)錯(cuò)的時(shí)候唐瀑,F(xiàn)lowableSubscriber默認(rèn)已經(jīng)將錯(cuò)誤try-catch群凶,并通過(guò)onError()進(jìn)行回調(diào),程序并不會(huì)崩潰哄辣;
- 在訂閱的時(shí)候如果使用Consumer请梢,那么不需要主動(dòng)去請(qǐng)求上游數(shù)據(jù),默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)力穗。如果遇到背壓報(bào)錯(cuò)溢陪、且對(duì)Throwable的Consumer沒有new出來(lái),則程序直接崩潰睛廊;
- 背壓策略的上游的默認(rèn)緩存池是128形真。
背壓策略:
error, 緩沖區(qū)大概在128
buffer, 緩沖區(qū)在1000左右
drop咆霜, 把存不下的事件丟棄
latest邓馒, 只保留最新的
missing, 缺省設(shè)置,不做任何操作
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
2.5 線程調(diào)度器(Schedulers)
對(duì)于Android開發(fā)者而言蛾坯,RxJava最簡(jiǎn)單的是通過(guò)調(diào)度器來(lái)方便地切換線程光酣。在不同平臺(tái)還有不同的調(diào)度器,例如我們Android的主線程:AndroidSchedulers.mainThread()脉课。
屬性 | 類型 |
---|---|
AndroidSchedulers.mainThread() | 需要引用rxandroid, 切換到UI線程 |
Schedulers.computation() | 用于計(jì)算任務(wù)救军,如事件循環(huán)和回調(diào)處理,默認(rèn)線程數(shù)等于處理器數(shù)量 |
Schedulers.io() | 用于IO密集型任務(wù)倘零,如異步阻塞IO操作唱遭,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需求,它默認(rèn)是一個(gè)CacheThreadScheduler |
Schedulers.newThread() | 為每一個(gè)任務(wù)創(chuàng)建一個(gè)新線程 |
Schedulers.trampoline() | 在當(dāng)前線程中立刻執(zhí)行呈驶,如當(dāng)前線程中有任務(wù)在執(zhí)行則將其暫停拷泽, 等插入進(jìn)來(lái)的任務(wù)執(zhí)行完成之后,在將未完成的任務(wù)繼續(xù)完成袖瞻。 |
Scheduler.from(executor) | 指定Executor作為調(diào)度器 |
2.6 事件調(diào)度器
RxJava事件發(fā)出去并不是置之不顧司致,要有合理的管理者來(lái)管理它們,在合適的時(shí)機(jī)要進(jìn)行釋放事件聋迎,這樣才不會(huì)導(dǎo)致內(nèi)存泄漏脂矫,這里的管理者我們稱為事件調(diào)度器(或事件管理者)CompositeDisposable。
2.7 基類
RxJava 3 中的基類相比RxJava 2 沒啥改變霉晕,主要有以下幾個(gè)基類:
- io.reactivex.Flowable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù)羹唠,支持Reactive-Streams和背壓
- io.reactivex.Observable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù),不支持背壓娄昆,
- io.reactivex.Single:只能發(fā)送單個(gè)數(shù)據(jù)或者一個(gè)錯(cuò)誤
- io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù),但只處理 onComplete 和 onError 事件缝彬。
- io.reactivex.Maybe:能夠發(fā)射0或者1個(gè)數(shù)據(jù)萌焰,要么成功,要么失敗谷浅。
2.8 Observables的"熱"和"冷"
Observable什么時(shí)候開始發(fā)射數(shù)據(jù)序列扒俯?這取決于Observable的實(shí)現(xiàn),一個(gè)"熱"的Observable可能一創(chuàng)建完就開始發(fā)射數(shù)據(jù)一疯,因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過(guò)了)撼玄。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù)墩邀,因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列掌猛。
在一些ReactiveX實(shí)現(xiàn)里,還存在一種被稱作Connectable的Observable眉睹,不管有沒有觀察者訂閱它荔茬,這種Observable都不會(huì)開始發(fā)射數(shù)據(jù)废膘,除非Connect方法被調(diào)用。
基本使用
需要知道的是慕蔚,RxJava以觀察者模式為骨架丐黄,有兩種常見的觀察者模式:
- Observable(被觀察者)/Observer(觀察者):不支持背壓
- Flowable(被觀察者)/Subscriber(觀察者):支持背壓
3.1 Observable/Observer用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//這是新加入的方法,在訂閱后發(fā)送數(shù)據(jù)之前孔飒,
//回首先調(diào)用這個(gè)方法灌闺,而Disposable可用于取消訂閱
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("lucas", "onNext: "+value );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);
這種觀察者模型不支持背壓:當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理坏瞄,即使數(shù)據(jù)大量堆積桂对,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過(guò)大只會(huì)OOM。所以惦积,當(dāng)我們使用Observable/Observer的時(shí)候接校,我們需要考慮的是,數(shù)據(jù)量是不是很大(官方給出以1000個(gè)事件為分界線作為參考)狮崩。
3.2 Flowable/Subscriber用法
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//當(dāng)訂閱后蛛勉,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart()睦柴,
//傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});
Flowable是支持背壓的诽凌,也就是說(shuō),一般而言坦敌,上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求侣诵,下游調(diào)用request(n)來(lái)告訴上游發(fā)送多少個(gè)數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上狱窘,使內(nèi)存一直處于較低水平杜顺。
當(dāng)然,F(xiàn)lowable也可以通過(guò)creat()來(lái)創(chuàng)建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//需要指定背壓策略
, BackpressureStrategy.BUFFER);
Flowable雖然可以通過(guò)create()來(lái)創(chuàng)建蘸炸,但是你必須指定背壓的策略躬络,以保證你創(chuàng)建的Flowable是支持背壓的。
根據(jù)上面的代碼的結(jié)果輸出中可以看到搭儒,當(dāng)我們調(diào)用subscription.request(n)方法的時(shí)候穷当,不等onSubscribe()中后面的代碼執(zhí)行,就會(huì)立刻執(zhí)行到onNext方法淹禾,因此馁菜,如果你在onNext方法中使用到需要初始化的類時(shí),應(yīng)當(dāng)盡量在subscription.request(n)這個(gè)方法調(diào)用之前做好初始化的工作;
當(dāng)然铃岔,這也不是絕對(duì)的汪疮,我在測(cè)試的時(shí)候發(fā)現(xiàn),通過(guò)create()自定義Flowable的時(shí)候,即使調(diào)用了subscription.request(n)方法铲咨,也會(huì)等onSubscribe()方法中后面的代碼都執(zhí)行完之后躲胳,才開始調(diào)用onNext。
TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作纤勒,否則就有空指針的風(fēng)險(xiǎn)坯苹。
3.3 其他的觀察者
最常用的其實(shí)就是上面說(shuō)的兩種訂閱觀察者,但是一些情況下摇天,我們也會(huì)用到一些其他的一類觀察者比如
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
3.3.1 Single/SingleObserver用法
//被觀察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
e.onSuccess("test2");//錯(cuò)誤寫法粹湃,重復(fù)調(diào)用也不會(huì)處理,因?yàn)橹粫?huì)調(diào)用一次
}
});
//訂閱觀察者SingleObserver
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//相當(dāng)于onNext和onComplete
Log.d("lucas", s );
}
@Override
public void onError(Throwable e) {
}
});
//運(yùn)行結(jié)果
2020-04-03 23:02:37.337 15462-15462/com.ysalliance.getfan.myapplication D/lucas: test
Single類似于Observable泉坐,不同的是为鳄,它總是只發(fā)射一個(gè)值,或者一個(gè)錯(cuò)誤通知腕让,而不是發(fā)射一系列的值(當(dāng)然就不存在背壓?jiǎn)栴})孤钦,所以當(dāng)你使用一個(gè)單一連續(xù)事件流,這樣你可以使用Single纯丸。Single觀察者只包含兩個(gè)事件偏形,一個(gè)是正常處理成功的onSuccess,另一個(gè)是處理失敗的onError觉鼻。因此俊扭,不同于Observable需要三個(gè)方法onNext, onError, onCompleted,訂閱Single只需要兩個(gè)方法:
onSuccess - Single發(fā)射單個(gè)的值到這個(gè)方法
onError - 如果無(wú)法發(fā)射需要的值坠陈,Single發(fā)射一個(gè)Throwable對(duì)象到這個(gè)方法
Single只會(huì)調(diào)用這兩個(gè)方法中的一個(gè)萨惑,而且只會(huì)調(diào)用一次,調(diào)用了任何一個(gè)方法之后仇矾,訂閱關(guān)系終止庸蔼。
Single的操作符:
Single也可以組合使用多種操作,一些操作符讓你可以混合使用Observable和Single:
詳細(xì)可參考:Single操作符
3.3.2 Completable/CompletableObserver
如果你的觀察者連onNext事件都不關(guān)心贮匕,可以使用Completable姐仅,它只有onComplete和onError兩個(gè)事件:
Completable.create(new CompletableOnSubscribe() {//被觀察者
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();//單一onComplete或者onError
}
}).subscribe(new CompletableObserver() {//觀察者
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
Log.e("lucas", "onComplete: ");
}
@Override
public void onError(Throwable e) {
}
});
//打印結(jié)果
2020-04-03 23:12:08.099 16264-16264/com.ysalliance.getfan.myapplication E/lucas: onComplete:
3.3.3 Maybe/MaybeObserver
如果你有一個(gè)需求是可能發(fā)送一個(gè)數(shù)據(jù)或者不會(huì)發(fā)送任何數(shù)據(jù),這時(shí)候你就需要Maybe粗合,它類似于Single和Completable的混合體。
Maybe可能會(huì)調(diào)用以下其中一種情況(也就是所謂的Maybe):
onSuccess或者onError
onComplete或者onError
可以看到onSuccess和onComplete是互斥的存在乌昔,例子代碼如下:
//被觀察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("test");//發(fā)送一個(gè)數(shù)據(jù)的情況隙疚,或者onError,不需要再調(diào)用onComplete(調(diào)用了也不會(huì)觸發(fā)onComplete回調(diào)方法)
//e.onComplete();//不需要發(fā)送數(shù)據(jù)的情況磕道,或者onError
}
});
//訂閱觀察者
maybe.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//發(fā)送一個(gè)數(shù)據(jù)時(shí)供屉,相當(dāng)于onNext和onComplete,但不會(huì)觸發(fā)另一個(gè)方法onComplete
Log.i("lucas", s);
}
@Override
public void onComplete() {
//無(wú)數(shù)據(jù)發(fā)送時(shí)候的onComplete事件
Log.i("lucas", "onComplete");
}
@Override
public void onError(Throwable e) {
}
});
//打印結(jié)果
2020-04-03 23:14:40.266 16558-16558/com.ysalliance.getfan.myapplication I/lucas: test
要轉(zhuǎn)換成其他類型的被觀察者,也是可以使用toFlowable()伶丐、toObservable()等方法去轉(zhuǎn)換悼做。
//判斷是否登陸
Maybe.just(isLogin())
//可能涉及到IO操作,放在子線程
.subscribeOn(Schedulers.newThread())
//取回結(jié)果傳到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面就是Maybe/MaybeObserver的普通用法哗魂,你可以看到肛走,實(shí)際上,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù)录别,而是發(fā)送單個(gè)數(shù)據(jù)朽色,也就是說(shuō),當(dāng)你只想要某個(gè)事件的結(jié)果(true or false)的時(shí)候组题,你可以用這種觀察者模式
3.4 事件調(diào)度器釋放事件
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關(guān)注");
e.onNext("但還是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//對(duì)應(yīng)onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//對(duì)應(yīng)onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//對(duì)應(yīng)onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//對(duì)應(yīng)onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
CompositeDisposable提供的方法中葫男,都是對(duì)事件的管理
- dispose():釋放所有事件
- clear():釋放所有事件,實(shí)現(xiàn)同dispose()
- add():增加某個(gè)事件
- addAll():增加所有事件
- remove():移除某個(gè)事件并釋放
- delete():移除某個(gè)事件