1.Rxjava1中的背壓
Rxjava2中有這么一個(gè)被觀察者Flowable敷扫,同樣作為被觀察者晴楔,它和Observable有什么區(qū)別呢摆昧,在Rxjava2中撩满,Observable不再支持背壓,而新增的Flowable支持背壓,何為背壓伺帘,就是上游發(fā)送事件的速度大于下游處理事件的速度所產(chǎn)生的現(xiàn)象昭躺。
我們來看個(gè)例子,先把rxjava切換到rxjava1.0:
implementation 'io.reactivex:rxjava:1.1.6'
implementation 'io.reactivex:rxandroid:1.2.1'
然后執(zhí)行如下代碼:
//被觀察者在主線程中伪嫁,每1ms發(fā)送一個(gè)事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//觀察者每1s才處理一個(gè)事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + aLong);
}
});
執(zhí)行結(jié)果如下:
我特?說好的背壓呢张咳,說好的異常呢帝洪,不要慌,因?yàn)樯厦娴拇a是同步的情況,都是運(yùn)行在祝線程的,所以同步的情況下觅赊,被觀察者每發(fā)送一個(gè)事件,觀察者就會處理一個(gè)事件砰奕,等觀察者處理完當(dāng)前事件后,被觀察者才會繼續(xù)發(fā)送事件提鸟,兩者分工明確军援,恩愛和睦,不存在發(fā)送速度不一致的情況称勋。
下面我們來看下異步的情況:
//被觀察者在主線程中胸哥,每1ms發(fā)送一個(gè)事件
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
//觀察者在子線程中每1s處理一個(gè)事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + aLong);
}
});
運(yùn)行后就會出現(xiàn)如下異常:
出現(xiàn)了背壓的情況,拋出了MissingBackpressureException異常赡鲜,異步情況下被觀察者發(fā)送事件是比較暴力的空厌,一次性全部發(fā)完,放在緩存池蝗蛙,然后觀察者一條條慢慢去處理蝇庭,發(fā)送過快就會出現(xiàn)背壓的情況.
背壓產(chǎn)生的條件:必須是異步的場景下才會出現(xiàn)醉鳖,即被觀察者和觀察者處于不同的線程中捡硅。
rxjava1中默認(rèn)的緩存池大小是16,當(dāng)事件超過就會出現(xiàn)MissingBackpressureException盗棵,看如下例子:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 17; i++) {
Log.w("tag", "send ----> i = " + i);
subscriber.onNext("i = "+i);
}
}
})
.subscribeOn(Schedulers.newThread())
//將觀察者的工作放在新線程環(huán)境中
.observeOn(Schedulers.newThread())
//觀察者處理每1000ms才處理一個(gè)事件
.subscribe(new Action1<String>() {
@Override
public void call(String value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + value);
}
});
你看:
嗯壮韭,默認(rèn)的緩存池為什么是16,這個(gè)問題問的好纹因,因?yàn)槿思襯xjava給的默認(rèn)值就是16啊喷屋,不信你看:
public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
return buffer(boundary, 16);
}
rxjava1中也提供了處理背壓的操作符onBackpressureBuffer和onBackpressureDrop,下面我們來簡單看下onBackpressureBuffer:
//被觀察者在主線程中瞭恰,每1ms發(fā)送一個(gè)事件
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 10000; i++) {
Log.w("tag", "send ----> i = " + i);
subscriber.onNext("i = "+i);
}
}
})
.onBackpressureBuffer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<String>() {
@Override
public void call(String value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + value);
}
});
運(yùn)行結(jié)果如下:
其實(shí)onBackpressureBuffer也就是增加了緩存池的大小屯曹,這個(gè)值為Long.MAX_VALUE,當(dāng)然我們也可以自己指定onBackpressureBuffer(size)的大小:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 100; i++) {
Log.w("tag", "send ----> i = " + i);
subscriber.onNext("i = "+i);
}
}
})
.onBackpressureBuffer(100)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<String>() {
@Override
public void call(String value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + value);
}
});
onBackpressureDrop的作用是當(dāng)觀察者來不及處理事件的時(shí)候恶耽,會把事件給丟棄掉密任,而onBackpressureLatest操作符表示當(dāng)被觀察者Observable發(fā)出事件的速度比觀察者消耗得要快,觀察者會接收Observable最新發(fā)出的事件進(jìn)行處理偷俭,這兩種情況大家可以自行測試感受下浪讳。
從上面的例子可以看出,在rxjava1中涌萤,interval操作符默認(rèn)是不支持背壓的淹遵,我們來試試range操作符:
Observable.range(1,10000)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("tag", "---->" + value);
}
});
運(yùn)行結(jié)果如下:
尼瑪,竟然沒有出現(xiàn)背壓负溪,納尼透揣?
表情包好像放錯(cuò)了,走錯(cuò)片場了笙以,哈哈哈淌实,難道range操作符有毛病,不應(yīng)該啊猖腕,最后經(jīng)過一番查找拆祈,發(fā)現(xiàn)問題在observeOn操作符上,observeOn這個(gè)操作符內(nèi)部有一個(gè)緩沖區(qū)倘感,Android環(huán)境下長度是16放坏,它會告訴range最多發(fā)送16個(gè)事件,充滿緩沖區(qū)即可老玛。
這樣可以看出淤年,之前使用的interval操作符是不支持背壓的,而range則支持背壓蜡豹,那么到底什么樣的Observable支持背壓或不支持背壓呢麸粮?
其實(shí)在rxjava1中,不是所有Observable都支持背壓镜廉,從上面的例子也可以看出來這一點(diǎn)弄诲,我們知道Observable有hot和cold之分,rxjava1中hot observable是不支持背壓的娇唯,而cold observable中也有一部分不支持背壓齐遵,這里不再深究,想繼續(xù)了解可以自行g(shù)oogle塔插,另外一個(gè)原因是現(xiàn)在都tm Rxjava2了梗摇,我還在這扯rxjava1,罪過罪過想许,我也是為了引出問題伶授。
簡單扯一下解決背壓的思路断序,無非是限制發(fā)送的速度,俗稱限流糜烹,很多操作符都可以做到這些逢倍,比如sample在一段時(shí)間內(nèi)只處理最后一個(gè)數(shù)據(jù)等,也可以使用rxjava1中提供的onBackpressureBuffer景图,onBackpressureDrop较雕,onBackpressureLatest。
雖然rxjava1也有處理背壓的方法挚币,但設(shè)計(jì)并不完美亮蒋,緩存池大小只有16,而且被觀察者無法得知下游觀察者對事件的處理速度妆毕,一次性把事件拋給了下游觀察者慎玖,所以rxjava2中對背壓進(jìn)行了改進(jìn)。
2.Rxjava2中的背壓
Rxjava2中新增了一個(gè)被觀察者Flowable用來專門支持背壓笛粘,默認(rèn)隊(duì)列大小128趁怔,并且其所有的操作符都強(qiáng)制支持背壓,先看個(gè)簡單的例子:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0;i < 1000000; i++) {
emitter.onNext("i = "+i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag","----> "+s);
}
});
運(yùn)行結(jié)果如下:
說好的支持背壓呢薪前,怎么這個(gè)熟悉的異常又出現(xiàn)了润努??示括?铺浇?
細(xì)心的同學(xué)肯定發(fā)現(xiàn)了,F(xiàn)lowable.create方法第二個(gè)參數(shù)BackpressureStrategy.ERROR垛膝,這個(gè)BackpressureStrategy類其實(shí)就是處理背壓的策略類鳍侣,看下這個(gè)類的源碼:
public enum BackpressureStrategy {
//不指定背壓策略
MISSING,
//出現(xiàn)背壓就拋出異常
ERROR,
//指定無限大小的緩存池,此時(shí)不會出現(xiàn)異常吼拥,但無限制大量發(fā)送會發(fā)生OOM
BUFFER,
//如果緩存池滿了就丟棄掉之后發(fā)出的事件
DROP,
//在DROP的基礎(chǔ)上倚聚,強(qiáng)制將最后一條數(shù)據(jù)加入到緩存池中
LATEST
}
依次來看下這幾種策略的區(qū)別吧!
MISSING
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0;i < 1000000; i++) {
emitter.onNext("i = "+i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag","----> "+s);
}
});
不出所料凿可,果然拋出了異常:
ERROR
BackpressureStrategy.ERROR上面已經(jīng)測試過了惑折,不再重復(fù)了,依然會報(bào)異常矿酵。
BUFFER
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0;i < 1000000; i++) {
emitter.onNext("i = "+i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag","----> "+s);
}
});
運(yùn)行結(jié)果如下唬复,確實(shí)不會出現(xiàn)背壓異常了矗积,但是內(nèi)存占用嗖嗖的升高全肮,數(shù)據(jù)量足夠大足夠快的時(shí)候,OOM指日可待棘捣,哈哈哈9枷佟!!
DROP
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0;i < 1000000; i++) {
emitter.onNext("i = "+i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag","----> "+s);
}
});
運(yùn)行結(jié)果如下:
可以發(fā)現(xiàn)评疗,在填充滿了默認(rèn)的128個(gè)大小的緩存池后测砂,丟棄了很多數(shù)據(jù),DROP就是干這事的百匆,發(fā)不下就不放了砌些,有點(diǎn)狠啊<有佟存璃!
LATEST
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0;i < 1000; i++) {
emitter.onNext("i = "+i);
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("tag","----> "+s);
}
});
這次我們只發(fā)送1000個(gè)事件,運(yùn)行結(jié)果如下:
LATEST策略下雕拼,當(dāng)緩存池滿了之后也是會丟棄事件的纵东,不僅如此,它還會把事件的最后一個(gè)強(qiáng)制放入到緩存池中啥寇,所以可以看到999被觀察者收到了偎球。
上面我們都是用的Flowable的create創(chuàng)建的被觀察者,如果我們使用just辑甜,fromArray等操作符該如何指定背壓策略呢衰絮?其實(shí)也很簡單,因?yàn)閞xjava2像rxjava1那樣也提供了onBackpressureBuffer()磷醋,onBackpressureDrop()岂傲,onBackpressureLatest(),這樣用就可以了:
Flowable.range(1,1000)
.onBackpressureBuffer(1000)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
Log.e("tag","----> "+s);
}
});
嗯子檀,運(yùn)行結(jié)果很穩(wěn):
那么可能我們會有個(gè)疑問镊掖,上面的例子都是觀察者被動的接收事件,能不能主動拉取事件呢褂痰,當(dāng)然可以亩进,我們看下下面這個(gè)例子:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext("i = " + i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
}
@Override
public void onNext(String s) {
Log.e("tag", "----> " + s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
看下運(yùn)行結(jié)果:
此時(shí)我們的觀察者使用了Subscriber,它有一個(gè)onSubscribe方法缩歪,參數(shù)為Subscription归薛,其實(shí)關(guān)鍵點(diǎn)就是這個(gè)Subscription,它的作用就是從緩存池拉取事件匪蝙,它有一個(gè)request(count)方法主籍,它的作用就是拉取事件,并可以指定拉取事件的個(gè)數(shù)逛球。我們在上面的例子中千元,使用subscription.request(5)每次拉取5個(gè)事件,其實(shí)也是很簡單的颤绕。
其實(shí)搞了半天幸海,文章基本也要結(jié)束了祟身,雖然rxjava提供了處理背壓的策略,但是最好還是能盡量避免上游被觀察者發(fā)送事件過快過多物独,實(shí)在需要處理袜硫,就結(jié)合各種策略和操作符進(jìn)行按需處理。
3.項(xiàng)目中的使用
上周在項(xiàng)目中遇到了這么一個(gè)場景挡篓,就是在跳轉(zhuǎn)頁面之前需要釋放camera婉陷,這是個(gè)耗時(shí)操作,返回當(dāng)前頁面的時(shí)候需要重新open Camera官研,而且open Camera的時(shí)機(jī)需要在SurfaceView的create中執(zhí)行憨攒,這個(gè)場景剛好用request可以解決,例子和上面類似阀参,就不再上代碼了肝集。
關(guān)于Flowable的使用就先到這吧,由于個(gè)人水平有限蛛壳,難免會犯些錯(cuò)誤杏瞻,有問題歡迎留言討論。