本篇文章已授權(quán)微信公眾號(hào) YYGeeker
獨(dú)家發(fā)布轉(zhuǎn)載請(qǐng)標(biāo)明出處
CSDN學(xué)院課程地址
- RxJava2從入門到精通-初級(jí)篇:https://edu.csdn.net/course/detail/10036
- RxJava2從入門到精通-中級(jí)篇:https://edu.csdn.net/course/detail/10037
- RxJava2從入門到精通-進(jìn)階篇:https://edu.csdn.net/course/detail/10038
- RxJava2從入門到精通-源碼分析篇:https://edu.csdn.net/course/detail/10138
5. RxJava背壓策略(BackpressureStrategy)
5.1 背壓是什么
背壓的概念是在平時(shí)業(yè)務(wù)開(kāi)發(fā)時(shí)較為常見(jiàn)很泊,大多數(shù)是針對(duì)高并發(fā)的業(yè)務(wù),背壓是必須考慮的因素之一。在異步場(chǎng)景中继蜡,由于數(shù)據(jù)流的發(fā)射速度高于數(shù)據(jù)流的接收速度,就會(huì)導(dǎo)致數(shù)據(jù)不能及時(shí)處理蝙场,從而導(dǎo)致數(shù)據(jù)流的阻塞嗜憔。背壓所要做的事情就是主動(dòng)控制數(shù)據(jù)流發(fā)射的速度
在RxJava2.0中夺鲜,推出了Flowable用來(lái)支持背壓,去除了Observable對(duì)背壓的支持呐舔,下面在背壓策略的講解中币励,我們都使用Flowable作為我們的響應(yīng)類型。在使用背壓時(shí)珊拼,只需要在create()
方法中第二個(gè)參數(shù)添加背壓策略即可
- 在訂閱的時(shí)候如果使用
FlowableSubscriber
食呻,那么需要通過(guò)s.request(Long.MAX_VALUE)
去主動(dòng)請(qǐng)求上游的數(shù)據(jù)項(xiàng)。如果遇到背壓報(bào)錯(cuò)的時(shí)候,FlowableSubscriber
默認(rèn)已經(jīng)將錯(cuò)誤try-catch仅胞,并通過(guò)onError()
進(jìn)行回調(diào)每辟,程序并不會(huì)崩潰 - 在訂閱的時(shí)候如果使用
Consumer
,那么不需要主動(dòng)去請(qǐng)求上游數(shù)據(jù)干旧,默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)
渠欺。如果遇到背壓報(bào)錯(cuò)、且對(duì)Throwable的Consumer
沒(méi)有new出來(lái)椎眯,則程序直接崩潰 - 背壓策略的上游的默認(rèn)緩存池是128
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
}
5.2 MISSING
MISSING表示OnNext事件沒(méi)有任何緩存和丟棄挠将,下游要處理任何溢出,可以理解為相當(dāng)于沒(méi)有指定背壓策略编整。Flowable相當(dāng)于沒(méi)有指定背壓策略可以將下游要處理任何溢出理解為捐名,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來(lái)闹击,當(dāng)緩存容量達(dá)到128時(shí),再增加一個(gè)未處理的數(shù)據(jù)項(xiàng)成艘,就會(huì)拋出MissingBackpressureException赏半,且?guī)в嘘?duì)列已經(jīng)滿了的友好提示。這里就好比一個(gè)大水缸淆两,當(dāng)水注滿的時(shí)候断箫,它就會(huì)把蓋子蓋上,不讓你再繼續(xù)注水了
這里我們模擬上游發(fā)送速度高于下游數(shù)據(jù)流的處理速度秋冰,在數(shù)據(jù)處理的時(shí)候加上
Thread.sleep(1000)
public void missing() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
5.3 ERROR
ERROR表示在下游無(wú)法跟上時(shí)仲义,會(huì)拋出MissingBackpressureException〗9矗可以將下游無(wú)法跟上理解為埃撵,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來(lái)虽另,當(dāng)緩存容量達(dá)到128時(shí)暂刘,再增加一個(gè)未處理的數(shù)據(jù)項(xiàng),就會(huì)拋出MissingBackpressureException捂刺。這里好比一個(gè)大水缸谣拣,當(dāng)水注滿的時(shí)候,它會(huì)把水缸撐破了族展,直接破裂
public void error() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
5.4 BUFFER
上游不斷的發(fā)出onNext請(qǐng)求森缠,直到下游處理完,上游發(fā)射的數(shù)據(jù)項(xiàng)的緩存池是無(wú)限大的仪缸,程序也不會(huì)拋出錯(cuò)誤贵涵,但是要注意程序OOM的現(xiàn)象,因?yàn)榫彺嬖酱螅加玫膬?nèi)存就越多独悴。例子中發(fā)射129個(gè)數(shù)據(jù)項(xiàng)例书,然而程序并沒(méi)有崩潰,只會(huì)一直讀取緩存池的數(shù)據(jù)項(xiàng)刻炒,直到數(shù)據(jù)項(xiàng)被處理完决采。這里就是一個(gè)無(wú)限大的水缸
背壓策略除了BUFFER策略的緩存池是無(wú)限大之外,其他默認(rèn)的緩存池都是128
public void buffer() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999
5.5 DROP
會(huì)在下游跟不上速度時(shí)坟奥,把onNext的值丟棄树瞭,簡(jiǎn)單的說(shuō)就是,超過(guò)緩存區(qū)大邪(128)的數(shù)據(jù)項(xiàng)都會(huì)被丟棄晒喷。例子中通過(guò)發(fā)射800個(gè)數(shù)據(jù)項(xiàng),那么我們只會(huì)收到0-127的數(shù)據(jù)項(xiàng)访敌。如果我們?cè)俅握{(diào)用request()
凉敲,這時(shí)候取到的數(shù)據(jù)就是上一次request()后的128個(gè)數(shù)據(jù)。這里好比一個(gè)大水缸寺旺,當(dāng)水注滿的時(shí)候爷抓,水還是在繼續(xù)的流,一旦有request調(diào)用的時(shí)候阻塑,它就會(huì)去取出水缸里的所有水蓝撇,這時(shí)候水缸就是空的,但水一直在流陈莽,所以水缸馬上又會(huì)被注滿渤昌,這個(gè)時(shí)候就要等request再次取出水缸里的水
public void drop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
onNext=2
......
onNext=127
5.6 LATEST
LATEST與Drop策略一樣,如果超過(guò)緩存池容量大小的數(shù)據(jù)項(xiàng)都會(huì)被丟棄走搁。不同的是独柑,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中私植。這里的水缸容納下了最后一滴水
public void latest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999
5.7 小結(jié)
- MISSING:沒(méi)有任何緩存和丟棄群嗤,下游要處理任何溢出
- ERROR:下游的處理速度無(wú)法跟上上游的發(fā)射速度時(shí)報(bào)錯(cuò)
- BUFFER:數(shù)據(jù)項(xiàng)的緩存池?zé)o限大
- DROP:下游的處理速度無(wú)法跟上上游的發(fā)射速度時(shí)丟棄
- LATEST:最后一條數(shù)據(jù)項(xiàng)被強(qiáng)行放入緩存池