官方WIKI What's different in 2.0
RxJava2已經(jīng)發(fā)布了兩周了乙嘀,相比RxJava1,它的改動(dòng)還是很大的:
Observable and Flowable
在前一個(gè)版本里backpressure
被集成到了Observable
中,官方也提供了很多方法讓我們來處理backpressure
問題禀倔。但是有一些特殊的場(chǎng)景根本無法用其來解決熬芜,最常見的例如UI事件。而不處理backpressure
有可能導(dǎo)致MissingBackpressureException
的出現(xiàn)曙强。
關(guān)于
backpressure
的概念可以看一下RxJava中backpressure這個(gè)概念的理解
為了解決這個(gè)問題残拐,在RxJava2里,引入了Flowable這個(gè)類:Observable不包含backpressure
處理碟嘴,而Flowable包含溪食。
例如:
Flowable<Long> flowable =
Flowable.create((FlowableOnSubscribe<Long>) e -> {
Observable.interval(10, TimeUnit.MILLISECONDS)
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}, FlowableEmitter.BackpressureMode.DROP);
Observable<Long> observable =
Observable.create((ObservableOnSubscribe<Long>) e -> {
Observable.interval(10, TimeUnit.MILLISECONDS)
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
});
兩個(gè)對(duì)象都以10毫秒一次派發(fā)數(shù)據(jù),假設(shè)訂閱他們的方法都是:
i -> {
Thread.sleep(100);
Log.v("TEST", "out : " + i);
}
以100毫秒一次消費(fèi)數(shù)據(jù)娜扇,消費(fèi)數(shù)據(jù)的效率是生產(chǎn)的1/10错沃。那么
對(duì)于observable
他會(huì)按照0,1,2,3,4...
的順序依次消費(fèi)栅组,并輸出log,而沒有消費(fèi)的數(shù)據(jù)將會(huì)都存在內(nèi)存中枢析。如果在RxJava1中玉掸,內(nèi)存數(shù)據(jù)超過128個(gè)時(shí)將會(huì)拋出MissingBackpressureException
錯(cuò)誤;而在RxJava2中并不會(huì)報(bào)錯(cuò)醒叁,數(shù)據(jù)會(huì)一直放到內(nèi)存中司浪,直到發(fā)生OutOfMemoryError
。對(duì)于flowable, 在創(chuàng)建時(shí)我們?cè)O(shè)定了
FlowableEmitter.BackpressureMode.DROP
把沼,一開始他會(huì)輸出0,1,2,3....127
但之后會(huì)忽然跳躍到966,967,968 ...
啊易。中間的部分?jǐn)?shù)據(jù)由于緩存不了,被拋棄掉了饮睬。
Single
和Observable租谈,F(xiàn)lowable一樣會(huì)發(fā)送數(shù)據(jù),不同的是訂閱后只能接受到一次:
Single<Long> single = Single.just(1l);
single.subscribe(new SingleObserver<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Long value) {
// 和onNext是一樣的
}
@Override
public void onError(Throwable e) {
}
});
普通Observable可以使用toSingle轉(zhuǎn)換:Observable.just(1).toSingle()
Completable
與Single類似捆愁,只能接受到完成(onComplete)和錯(cuò)誤(onError)
同樣也可以由普通的Observable轉(zhuǎn)換而來:Observable.just(1).toCompletable()
Base reactive interfaces
和Flowable的接口Publisher類似垦垂,Observable、Single牙瓢、Completable也有類似的基類
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
因此許多操作符接受的參數(shù)從以前的具體對(duì)象劫拗,變成了現(xiàn)在的接口:
Flowable<R> flatMap(
Function<? super T, ? extends Publisher<? extends R>> mapper
);
Observable<R> flatMap(
Function<? super T, ? extends ObservableSource<? extends R>> mapper
);
------
// 以前
Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
由于接收的都是接口,在使用其他遵循Reactive-Streams
設(shè)計(jì)的第三方庫的時(shí)候矾克,就不需要把他自定義的Flowable
轉(zhuǎn)換成標(biāo)準(zhǔn)Flowable了页慷。
Subjects and Processors
io.reactivex.subjects.AsyncSubject
,
io.reactivex.subjects.BehaviorSubject
,
io.reactivex.subjects.PublishSubject
,
io.reactivex.subjects.ReplaySubject
,
io.reactivex.subjects.UnicastSubject
在RxJava2中依然存在,但現(xiàn)在他們不支持backpressure
胁附。新出現(xiàn)的
io.reactivex.processors.AsyncProcessor
,
io.reactivex.processors.BehaviorProcessor
,
io.reactivex.processors.PublishProcessor
,
io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor
支持backpressure
Other classes
rx.observables.ConnectableObservable
變成了io.reactivex.observables.ConnectableObservable<T>
和io.reactivex.flowables.ConnectableFlowable<T>
類似的還有rx.observables.GroupedObservable
酒繁。
Functional interfaces
需要注意的一點(diǎn)是,現(xiàn)在RxJava2的接口方法里加上了throws Exception
:
ublic interface Consumer<T> {
void accept(T t) throws Exception;
}
意味著在這些方法里調(diào)用一些會(huì)發(fā)生異常的方法不需要try-catch
了
Actions
另外大部分接口方法都按照J(rèn)ava8的接口方法名進(jìn)行了相應(yīng)的修改控妻,比如上面那個(gè)Consumer<T>
接口原來叫Action1<T>
州袒,而Action2<T>
改名成了BiConsumer
Action3
-Action9
被刪掉了,大概因?yàn)闆]人用弓候。郎哭。
Functions
同上,基本就是名字的修改和不常用類的刪除
Subscriber
RxJava1里Subscriber
只是一個(gè)空接口菇存,在新版里Subscriber
被賦予了更多的作用夸研,有幾個(gè)實(shí)現(xiàn)類可以供我們使用,例如
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
};
request()
方法可以控制當(dāng)前subscriber需要接收幾個(gè)事件依鸥。而且亥至,還可以調(diào)用subscriber.dispose()
來斷開對(duì)信號(hào)的監(jiān)聽。
同時(shí),onCompleted
方法改成了onComplete
姐扮。意味著完成時(shí)調(diào)用這個(gè)方法絮供,而不是完成后d
由于Subscription
被改掉了(下面會(huì)講到)。如果需要類似以前CompositeSubscription的用法茶敏,可以使用:
CompositeDisposable composite2 = new CompositeDisposable();
composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));
注意這里需要使用subscribeWith而不是subscribe杯缺,因?yàn)閟ubscribe方法現(xiàn)在返回void
Subscription
在RxJava1里,Subscription起到的是訂閱橋梁的作用睡榆。在2中萍肆,由于Subscription本身和Reactive-Streams
里的另外一個(gè)同名概念沖突。因此把原本的Subscription改名成了Disposable
胀屿。
除了上一節(jié)里subscribe(Subscriber )
方法返回void
塘揣,其他名為subscribe
的方法都返回Disposable
相應(yīng)的,
-
CompositeSubscription
改名成了CompositeDisposable
-
SerialSubscription
和MultipleAssignmentSubscription
被合并到了SerialDisposable
里. set() 方法會(huì)處理掉就的值宿崭,而replace()方法不會(huì)亲铡。 -
RefCountSubscription
被移除了
Backpressure
在第一節(jié)Observable and Flowable里已經(jīng)說到了這個(gè)問題,在2中葡兑,Observable將不會(huì)處理backpressure奖蔓,也就不會(huì)發(fā)生MissingBackpressureException
問題,但內(nèi)存仍然會(huì)緩存多余的數(shù)據(jù)讹堤。
而在使用Flowable時(shí)吆鹤,如果配置Backpressure有問題,那么MissingBackpressureException
依然存在
Schedulers
RxJava2里仍然包含了computation
, io
, newThread
和 trampoline
這些默認(rèn)線程調(diào)度洲守。而immediate
被移除了疑务,因?yàn)樗?jīng)常被人錯(cuò)誤使用。同時(shí)Schedulers.test
也被移除了梗醇。
Entering the reactive world
將普通方法轉(zhuǎn)換成RxJava的數(shù)據(jù)源知允,在RxJava1中,提供了Observable.create()
方法叙谨,但是這個(gè)方法過于強(qiáng)大温鸽,但使用時(shí)需要注意的東西太多經(jīng)常會(huì)發(fā)生錯(cuò)誤。
因此在RxJava2中手负,把原來的fromAsync
重命名成了create
涤垫,fromAsync
是一個(gè)和create
類似但更為簡(jiǎn)單和安全的方法。這樣大部分舊代碼都能夠繼續(xù)使用虫溜。
Leaving the reactive world
之前如果想把數(shù)據(jù)源轉(zhuǎn)換成普通的數(shù)據(jù)對(duì)象雹姊,需要先轉(zhuǎn)換成BlockingObservable
。而在2中衡楞,可以調(diào)用blockingXXX
方法直接把數(shù)據(jù)源轉(zhuǎn)換成對(duì)象:
List<Integer> list = Flowable.range(1, 100).toList().blockingFirst();
有一點(diǎn)需要特別注意,在RxJava2里,不建議在Subscriber
里拋出錯(cuò)誤瘾境,這意味著下面的代碼可能有一天就不能繼續(xù)運(yùn)行了:
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
public void onNext(Integer t) {
if (t == 1) {
throw new IllegalArgumentException();
}
}
public void onError(Throwable e) {
if (e instanceof IllegalArgumentException) {
throw new UnsupportedOperationException();
}
}
public void onComplete() {
throw new NoSuchElementException();
}
};
Flowable.just(1).subscribe(subscriber);
由于上面類似的代碼實(shí)際中出現(xiàn)得很多歧杏,因此在2中提供了safeSubscribe
方法,使用它就可以繼續(xù)在subscriber
里拋出錯(cuò)誤迷守。
當(dāng)然犬绒,你可以繞過subscribe(subscriber)
這個(gè)方法,使用類似:
Flowable.just(1).subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
這樣的方法兑凿,之前的代碼仍然可以繼續(xù)throw錯(cuò)誤凯力。
Operator differences
操作符的改動(dòng)不大,大部分是擴(kuò)充了參數(shù)數(shù)量礼华。
或者是加入prefetch
代表可以加入預(yù)置數(shù)據(jù)咐鹤。
總結(jié)
可以明顯的看到,RxJava2最大的改動(dòng)就是對(duì)于backpressure
的處理圣絮,為此將原來的Observable
拆分成了新的Observable
和Flowable
祈惶,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分。
除此之外扮匠,他和我們最熟悉和喜愛的RxJava~
引用