前言
之前寫RxJava相關(guān)文章的時(shí)候先口,就有人想讓我談?wù)凴xJava2.0的新特性,說實(shí)話宇驾,一開始我是拒絕的倍靡。因?yàn)樵谖铱磥恚琑xJava2.0雖然是版本的重大升級(jí)课舍,但總歸還是RxJava塌西,升級(jí)一個(gè)版本還能上天是咋的他挎?了解一下它的更新文檔不就好了么?真的有必要單出一篇文章來談這個(gè)么捡需?
但是詳細(xì)的了解了RxJava2.0以及部分源碼之后办桨,我覺得還是有必要對(duì)RxJava2.0做一個(gè)說明,幫助大家對(duì)于RxJava有更好的認(rèn)識(shí)站辉。
鋪墊
假如你還不是很熟悉RxJava呢撞,或者對(duì)于背壓這個(gè)概念(2.0更新中會(huì)涉及到背壓的概念)很模糊,希望你也能讀一讀下面兩篇鋪墊的文章:
- 關(guān)于RxJava最友好的文章
- 關(guān)于RxJava最友好的文章----背壓
關(guān)于背壓的那篇文章本來是本文的一部分饰剥,因?yàn)槠^大殊霞,被剝離出去了,所以建議大家有時(shí)間也一并閱讀捐川。
正文
RxJava2.0有很多的更新脓鹃,一些改動(dòng)甚至沖擊了我之前的文章里的內(nèi)容,這也是我想寫這篇文章的原因之一古沥。不過想要寫這篇文章其實(shí)也是有難度的瘸右,因?yàn)橄嚓P(guān)的資料去其實(shí)是很少的,還得自己硬著頭皮上....不過俗話說得好岩齿,有困難要上太颤,沒有困難創(chuàng)造困難也要上。
在這里盹沈,我會(huì)按照我們之前關(guān)于RxJava的文章的講述順序:觀察者模式龄章,操作符,線程調(diào)度乞封,這三個(gè)方面依次看有哪些更新做裙。
添加依賴
這個(gè)估計(jì)得放在最前面。
Android端使用RxJava需要依賴新的包名:
//RxJava的依賴包(我使用的最新版本)
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
觀察者模式
首先聲明肃晚,RxJava以觀察者模式為骨架锚贱,在2.0中依然如此。
不過此次更新中关串,出現(xiàn)了兩種觀察者模式:
- Observable(被觀察者)/Observer(觀察者)
- Flowable(被觀察者)/Subscriber(觀察者)
RxJava2.X中拧廊,Observeable用于訂閱Observer,是不支持背壓的晋修,而Flowable用于訂閱Subscriber吧碾,是支持背壓(Backpressure)的。
關(guān)于背壓這個(gè)概念以及它在1.0版本中的缺憾在上一篇文章中我已經(jīng)介紹到了墓卦,如果你不是很清楚倦春,我在這里在做一個(gè)介紹:背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略溅漾,在1.0中山叮,關(guān)于背壓最大的遺憾,就是集中在Observable這個(gè)類中添履,導(dǎo)致有的Observable支持背壓,有的不支持脑又。為了解決這種缺憾暮胧,新版本把支持背壓和不支持背壓的Observable區(qū)分開來。
Observable/Observer
Observable正常用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//這是新加入的方法问麸,在訂閱后發(fā)送數(shù)據(jù)之前往衷,
//回首先調(diào)用這個(gè)方法,而Disposable可用于取消訂閱
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);
這種觀察者模型是不支持背壓的严卖。
啥叫不支持背壓呢席舍?
當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理哮笆,即使數(shù)據(jù)大量堆積来颤,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過大只會(huì)OOM
我在測(cè)試的時(shí)候,快速發(fā)送了100000個(gè)整形數(shù)據(jù)稠肘,下游延遲接收福铅,結(jié)果被觀察者的數(shù)據(jù)全部發(fā)送出去了,內(nèi)存確實(shí)明顯增加了项阴,遺憾的是沒有OOM滑黔。
所以,當(dāng)我們使用Observable/Observer的時(shí)候环揽,我們需要考慮的是略荡,數(shù)據(jù)量是不是很大(官方給出以1000個(gè)事件為分界線,僅供各位參考)
Flowable/Subscriber
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//當(dāng)訂閱后歉胶,會(huì)首先調(diào)用這個(gè)方法汛兜,其實(shí)就相當(dāng)于onStart(),
//傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});
輸出如下:
onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->10
onComplete
onsubscribe end
Flowable是支持背壓的跨扮,也就是說序无,一般而言,上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求衡创,下游調(diào)用request(n)來告訴上游發(fā)送多少個(gè)數(shù)據(jù)帝嗡。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平璃氢。
當(dāng)然哟玷,F(xiàn)lowable也可以通過creat()來創(chuàng)建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//需要指定背壓策略
, BackpressureStrategy.BUFFER);
Flowable雖然可以通過create()來創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的(這個(gè)在1.0的時(shí)候就很難保證巢寡,可以說RxJava2.0收緊了create()的權(quán)限)喉脖。
根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時(shí)候抑月,不等onSubscribe()中后面的代碼執(zhí)行树叽,就會(huì)立刻執(zhí)行到onNext方法,因此谦絮,如果你在onNext方法中使用到需要初始化的類時(shí)题诵,應(yīng)當(dāng)盡量在subscription.request(n)這個(gè)方法調(diào)用之前做好初始化的工作;
當(dāng)然,這也不是絕對(duì)的层皱,我在測(cè)試的時(shí)候發(fā)現(xiàn)性锭,通過create()自定義Flowable的時(shí)候,即使調(diào)用了subscription.request(n)方法叫胖,也會(huì)等onSubscribe()方法中后面的代碼都執(zhí)行完之后草冈,才開始調(diào)用onNext。
TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作瓮增,否則就有空指針的風(fēng)險(xiǎn)怎棱。
其他觀察者模式
當(dāng)然,除了上面這兩種觀察者钉赁,還有一類觀察者
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
其實(shí)這三者都差不多蹄殃,Maybe/MaybeObserver可以說是前兩者的復(fù)合體,因此以Maybe/MaybeObserver為例簡(jiǎn)單介紹一下這種觀察者模式的用法
//判斷是否登陸
Maybe.just(isLogin())
//可能涉及到IO操作你踩,放在子線程
.subscribeOn(Schedulers.newThread())
//取回結(jié)果傳到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面就是Maybe/MaybeObserver的普通用法诅岩,你可以看到,實(shí)際上带膜,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù)吩谦,而是發(fā)送單個(gè)數(shù)據(jù),也就是說膝藕,當(dāng)你只想要某個(gè)事件的結(jié)果(true or false)的時(shí)候式廷,你可以用這種觀察者模式
這是上面那些被觀察者的上層接口:
//Observable接口
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
其實(shí)我們可以看到,每一種觀察者都繼承自各自的接口芭挽,這也就把他們能完全的區(qū)分開滑废,各自獨(dú)立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操作符不會(huì)相互影響。
例如flatMap操作符實(shí)現(xiàn):
//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
假如你想為Flowable寫一個(gè)自定義的操作符袜爪,那么只要保證Function< Publisher >中的類型實(shí)現(xiàn)了Publisher接口即可蠕趁。這么說可能很抽象,大家不理解其實(shí)也沒關(guān)系辛馆,因?yàn)椴⒉煌扑]大家自定義操作符俺陋,RxJava中的操縱符的組合已經(jīng)可以滿足大家的需求了。
當(dāng)然,你也會(huì)注意到上面那些接口中的subscribe()方法的返回類型為void了腊状,在1.X中诱咏,這個(gè)方法一般會(huì)返回一個(gè)Subscription對(duì)象,用于取消訂閱〗赏冢現(xiàn)在袋狞,這個(gè)功能的對(duì)象已經(jīng)被放到觀察者Observer或者subscriber的內(nèi)部實(shí)現(xiàn)方法中了,
Flowable/Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
上面的實(shí)例中映屋,onSubscribe(Subscription s)傳入的參數(shù)s就肩負(fù)著取消訂閱的功能硕并,當(dāng)然,他也可以用于請(qǐng)求上游的數(shù)據(jù)秧荆。
在Observable/observer中,傳入的參數(shù)是另一個(gè)對(duì)象
Observable/Observer
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
在Observer接口中埃仪,onSubscribe(Disposable d)方法傳入的Disposable也是用于取消訂閱乙濒,基本功能是差不多的,只不過命名不一致卵蛉,大家知道就好颁股。
其實(shí)這種設(shè)計(jì)可以說還是符合邏輯的,因?yàn)槿∠嗛嗊@個(gè)動(dòng)作就只有觀察者(Observer等)才能做的傻丝,現(xiàn)在把它并入到觀察者內(nèi)部甘有,也算順理成章吧。
最后再提一點(diǎn)更新葡缰,就是被觀察者不再接收null作為數(shù)據(jù)源了亏掀。
操作符相關(guān)
這一塊其實(shí)可以說沒什么改動(dòng),大部分之前你用過的操作符都沒變泛释,即使有所變動(dòng)滤愕,也只是包名或類名的改動(dòng)。大家可能經(jīng)常用到的就是Action和Function怜校。
Action相關(guān)
之前我在文章里介紹過關(guān)于Action這類接口间影,在1.0中,這類接口是從Action0茄茁,Action1...往后排(數(shù)字代表可接受的參數(shù))魂贬,現(xiàn)在做出了改動(dòng)
Rx1.0-----------Rx2.0
Action1--------Action
Action1--------Consumer
Action2--------BiConsumer
后面的Action都去掉了,只保留了ActionN
Function相關(guān)
同上裙顽,也是命名方式的改變
上面那兩個(gè)類付燥,和RxJava1.0相比,他們都增加了throws Exception锦庸,也就是說机蔗,在這些方法做某些操作就不需要try-catch。
例如:
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
Files.readLines(name)這類io方法本來是需要try-catch的,現(xiàn)在直接拋出異常萝嘁,就可以放心的使用lambda 梆掸,保證代碼的簡(jiǎn)潔優(yōu)美。
doOnCancel/doOnDispose/unsubscribeOn
以doOnCancel為例牙言,大概就是當(dāng)取消訂閱時(shí)酸钦,會(huì)調(diào)用這個(gè)方法,例如:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);
take新操符會(huì)取消后面那些還未被發(fā)送的事件咱枉,因而會(huì)觸發(fā)doOnCancel
其他的一些操作符基本沒變卑硫,或者只是改變了名字,在這里就不一一介紹了蚕断,需要提一下的是欢伏,很多操作符都有兩套,一套用于Observable亿乳,一套用于Flowable硝拧。
線程調(diào)度
可以說這一塊兒基本也沒有改動(dòng),如果一定要說的話葛假。
- 那就是去掉了Schedulers.immediate()這個(gè)線程環(huán)境
- 移除的還有Schedulers.test()(我好像從來沒用過這個(gè)方法)
- io.reactivex.Scheduler這個(gè)抽象類支持直接調(diào)度自定義線程任務(wù)(這個(gè)我也沒怎么用)
補(bǔ)充
如果你想把自己的RxJava1.0的遷移到2.0的版本障陶,可以使用這個(gè)庫RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之間相互轉(zhuǎn)換聊训,也就是說抱究,不僅可以把1.0的代碼遷移到2.0,你還可以把2.0的代碼遷移到1.0,哈哈带斑。
補(bǔ)充2
在RxJava1.0中鼓寺,有的人會(huì)使用CompositeSubscription來收集Subscription,來統(tǒng)一取消訂閱遏暴,現(xiàn)在在RxJava2.0中侄刽,由于subscribe()方法現(xiàn)在返回void,那怎么辦呢朋凉?
其實(shí)在RxJava2.0中州丹,Flowable提供了subscribeWith這個(gè)方法可以返回當(dāng)前訂閱的觀察者,并且通過ResourceSubscriber DisposableSubscriber等觀察者來提供 Disposable的接口杂彭。
所以墓毒,如果想要達(dá)成RxJava1.0的效果,現(xiàn)在應(yīng)該是這樣做:
CompositeDisposable composite = new CompositeDisposable();
composite.add(Flowable.range(1, 8).subscribeWith(subscriber));
這個(gè)subscriber 應(yīng)該是 ResourceSubscriber 或者 DisposableSubscriber 的實(shí)例亲怠。
結(jié)尾
其實(shí)從整篇文章的分析來看所计,改動(dòng)最大的還是觀察者模式的實(shí)現(xiàn),被拆分和細(xì)化了团秽,主要分成了Observable和Flowable兩大類主胧,當(dāng)然還有與之相關(guān)聯(lián)的其他變動(dòng)叭首,總體來看這一版本可以說是對(duì)于觀察者和被觀察者的重構(gòu)。
RxJava2.0的范例代碼我沒精力去寫了踪栋,也正巧有位外國朋友已經(jīng)寫了RxJava2.0的demo,下面是他的項(xiàng)目地址:
在github上搜索:RxJava2-Android-Samples
當(dāng)然焙格,學(xué)習(xí)2.0 的過程中有什么問題也可以在這里留言討論。
附錄
下面我截圖展示一下2.0相對(duì)于1.0的一些改動(dòng)的細(xì)節(jié)夷都,僅做參考眷唉。
其實(shí)這些都是官方給出的列表,截圖在這里只是方便大家觀摩囤官。