前言
Rxjava
妻柒,由于其基于事件流的鏈?zhǔn)秸{(diào)用祝沸、邏輯簡潔 & 使用簡單的特點(diǎn)封恰,深受各大 Android
開發(fā)者的歡迎麻养。
如果還不了解RxJava,請(qǐng)看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程
- 本文主要講解的是
RxJava
中的 背壓控制策略诺舔,希望你們會(huì)喜歡鳖昌。
Carson帶你學(xué)RxJava系列文章,包括 原理低飒、操作符许昨、應(yīng)用場(chǎng)景、背壓等等褥赊,請(qǐng)關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南
目錄
1. 引言
1.1 背景
- 觀察者 & 被觀察者 之間存在2種訂閱關(guān)系:同步 & 異步糕档。具體如下:
- 對(duì)于異步訂閱關(guān)系,存在 被觀察者發(fā)送事件速度 與觀察者接收事件速度 不匹配的情況
- 發(fā)送 & 接收事件速度 = 單位時(shí)間內(nèi) 發(fā)送&接收事件的數(shù)量
- 大多數(shù)情況崭倘,主要是 被觀察者發(fā)送事件速度 > 觀察者接收事件速度
1.2 問題
- 被觀察者 發(fā)送事件速度太快,而觀察者 來不及接收所有事件类垫,從而導(dǎo)致觀察者無法及時(shí)響應(yīng) / 處理所有發(fā)送過來事件的問題司光,最終導(dǎo)致緩存區(qū)溢出、事件丟失 & OOM
- 如悉患,點(diǎn)擊按鈕事件:連續(xù)過快的點(diǎn)擊按鈕10次残家,則只會(huì)造成點(diǎn)擊2次的效果;
- 解釋:因?yàn)辄c(diǎn)擊速度太快了售躁,所以按鈕來不及響應(yīng)
下面再舉個(gè)例子:
- 被觀察者的發(fā)送事件速度 = 10ms / 個(gè)
- 觀察者的接收事件速度 = 5s / 個(gè)
即出現(xiàn)發(fā)送 & 接收事件嚴(yán)重不匹配的問題
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
Log.d(TAG, "發(fā)送了事件"+ i );
Thread.sleep(10);
// 發(fā)送事件速度:10ms / 個(gè)
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
try {
// 接收事件速度:5s / 個(gè)
Thread.sleep(5000);
Log.d(TAG, "接收到了事件"+ value );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 結(jié)果
由于被觀察者發(fā)送事件速度 > 觀察者接收事件速度坞淮,所以出現(xiàn)流速不匹配問題,從而導(dǎo)致OOM
示意圖
1.3 解決方案
采用 背壓策略陪捷。
下面回窘,我將開始介紹背壓策略。
2. 背壓策略簡介
2.1 定義
一種 控制事件流速 的策略
2.2 作用
在 異步訂閱關(guān)系 中市袖,控制事件發(fā)送 & 接收的速度
注:背壓的作用域 = 異步訂閱關(guān)系啡直,即 被觀察者 & 觀察者處在不同線程中
2.3 解決的問題
解決了 因被觀察者發(fā)送事件速度 與 觀察者接收事件速度 不匹配(一般是前者 快于 后者),從而導(dǎo)致觀察者無法及時(shí)響應(yīng) / 處理所有 被觀察者發(fā)送事件 的問題
2.4 應(yīng)用場(chǎng)景
- 被觀察者發(fā)送事件速度 與 觀察者接收事件速度 不匹配的場(chǎng)景
- 具體場(chǎng)景就取決于 該事件的類型苍碟,如:網(wǎng)絡(luò)請(qǐng)求酒觅,那么具體場(chǎng)景:有很多網(wǎng)絡(luò)請(qǐng)求需要執(zhí)行,但執(zhí)行者的執(zhí)行速度沒那么快微峰,此時(shí)就需要使用背壓策略來進(jìn)行控制舷丹。
3. 背壓策略的原理
- 那么,RxJava實(shí)現(xiàn)背壓策略(
Backpressure
)的原理是什么呢蜓肆? - 解決方案 & 思想主要如下:
- 示意圖如下
- 與
RxJava1.0
中被觀察者的舊實(shí)現(xiàn)Observable
對(duì)比
- 好了颜凯,那么上圖中在
RxJava 2.0
觀察者模型中谋币,Flowable
到底是什么呢?它其實(shí)是RxJava 2.0
中被觀察者的一種新實(shí)現(xiàn)装获,同時(shí)也是背壓策略實(shí)現(xiàn)的承載者 - 請(qǐng)繼續(xù)看下一節(jié)的介紹:背壓策略的具體實(shí)現(xiàn) -
Flowable
4. 背壓策略的具體實(shí)現(xiàn):Flowable
在 RxJava2.0
中瑞信,采用 Flowable
實(shí)現(xiàn) 背壓策略
正確來說,應(yīng)該是 “非阻塞式背壓” 策略
4.1 Flowable 介紹
- 定義:在
RxJava2.0
中穴豫,被觀察者(Observable
)的一種新實(shí)現(xiàn)
同時(shí)凡简,
RxJava1.0
中被觀察者(Observable
)的舊實(shí)現(xiàn):Observable
依然保留
- 作用:實(shí)現(xiàn) 非阻塞式背壓 策略
4.2 Flowable 特點(diǎn)
-
Flowable
的特點(diǎn) 具體如下
- 下面再貼出一張
RxJava2.0
與RxJava1.0
的觀察者模型的對(duì)比圖
實(shí)際上,
RxJava2.0
也有保留(被觀察者)Observerble - Observer(觀察者)的觀察者模型精肃,此處只是為了做出對(duì)比讓讀者了解
4.3 與 RxJava1.0 中被觀察者的舊實(shí)現(xiàn) Observable 的關(guān)系
- 具體如下圖
- 那么秤涩,為什么要采用新實(shí)現(xiàn)
Flowable
實(shí)現(xiàn)背壓,而不采用舊的Observable
呢司抱? - 主要原因:舊實(shí)現(xiàn)
Observable
無法很好解決背壓問題筐眷。
4.4 Flowable的基礎(chǔ)使用
-
Flowable
的基礎(chǔ)使用非常類似于Observable
- 具體如下
/**
* 步驟1:創(chuàng)建被觀察者 = Flowable
*/
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
// 需要傳入背壓參數(shù)BackpressureStrategy,下面會(huì)詳細(xì)講解
/**
* 步驟2:創(chuàng)建觀察者 = Subscriber
*/
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 對(duì)比Observer傳入的Disposable參數(shù)习柠,Subscriber此處傳入的參數(shù) = Subscription
// 相同點(diǎn):Subscription具備Disposable參數(shù)的作用匀谣,即Disposable.dispose()切斷連接, 同樣的調(diào)用Subscription.cancel()切斷連接
// 不同點(diǎn):Subscription增加了void request(long n)
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
// 關(guān)于request()下面會(huì)繼續(xù)詳細(xì)說明
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
/**
* 步驟3:建立訂閱關(guān)系
*/
upstream.subscribe(downstream);
- 更加優(yōu)雅的鏈?zhǔn)秸{(diào)用
// 步驟1:創(chuàng)建被觀察者 = Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "發(fā)送事件 1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送事件 2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送事件 3");
emitter.onNext(3);
Log.d(TAG, "發(fā)送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
// 步驟2:創(chuàng)建觀察者 = Subscriber & 建立訂閱關(guān)系
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
- 至此,
Flowable
的基礎(chǔ)使用講解完 - 關(guān)于更深層次的使用會(huì)結(jié)合 背壓策略的實(shí)現(xiàn) 來講解
5. 背壓策略的使用
- 在本節(jié)中资溃,我將結(jié)合 背壓策略的原理 & Flowable的使用武翎,為大家介紹在RxJava 2.0 中該如何使用Flowable來實(shí)現(xiàn)背壓策略功能,即背壓策略的使用
-
Flowable
與Observable
在功能上的區(qū)別主要是 多了背壓的功能 - 下面溶锭,我將順著第3節(jié)中講解背壓策略實(shí)現(xiàn)原理 & 解決方案(如下圖)宝恶,來講解
Flowable
在背壓策略功能上的使用
注:
- 由于第2節(jié)中提到,使用背壓的場(chǎng)景 = 異步訂閱關(guān)系趴捅,所以下文中講解的主要是異步訂閱關(guān)系場(chǎng)景垫毙,即 被觀察者 & 觀察者 工作在不同線程中
- 但由于在同步訂閱關(guān)系的場(chǎng)景也可能出現(xiàn)流速不匹配的問題,所以在講解異步情況后拱绑,會(huì)稍微講解一下同步情況综芥,以方便對(duì)比
5.1 控制 觀察者接收事件 的速度
5.1.1 異步訂閱情況
- 簡介
- 具體原理圖
- 具體使用
// 1. 創(chuàng)建被觀察者Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 一共發(fā)送4個(gè)事件
Log.d(TAG, "發(fā)送事件 1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送事件 2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送事件 3");
emitter.onNext(3);
Log.d(TAG, "發(fā)送事件 4");
emitter.onNext(4);
Log.d(TAG, "發(fā)送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 對(duì)比Observer傳入的Disposable參數(shù),Subscriber此處傳入的參數(shù) = Subscription
// 相同點(diǎn):Subscription參數(shù)具備Disposable參數(shù)的作用猎拨,即Disposable.dispose()切斷連接, 同樣的調(diào)用Subscription.cancel()切斷連接
// 不同點(diǎn):Subscription增加了void request(long n)
s.request(3);
// 作用:決定觀察者能夠接收多少個(gè)事件
// 如設(shè)置了s.request(3)毫痕,這就說明觀察者能夠接收3個(gè)事件(多出的事件存放在緩存區(qū))
// 官方默認(rèn)推薦使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
- 效果圖
- 有2個(gè)結(jié)論是需要大家注意的
下圖 = 當(dāng)緩存區(qū)存滿時(shí)(128個(gè)事件)溢出報(bào)錯(cuò)的原理圖
- 代碼演示1:觀察者不接收事件的情況下迟几,被觀察者繼續(xù)發(fā)送事件 & 存放到緩存區(qū)消请;再按需取出
/**
* 步驟1:設(shè)置變量
*/
private static final String TAG = "Rxjava";
private Button btn; // 該按鈕用于調(diào)用Subscription.request(long n )
private Subscription mSubscription; // 用于保存Subscription對(duì)象
/**
* 步驟2:設(shè)置點(diǎn)擊事件 = 調(diào)用Subscription.request(long n )
*/
btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mSubscription.request(2);
}
});
/**
* 步驟3:異步調(diào)用
*/
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "發(fā)送事件 1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送事件 2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送事件 3");
emitter.onNext(3);
Log.d(TAG, "發(fā)送事件 4");
emitter.onNext(4);
Log.d(TAG, "發(fā)送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 保存Subscription對(duì)象,等待點(diǎn)擊按鈕時(shí)(調(diào)用request(2))觀察者再接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
- 代碼演示2:觀察者不接收事件的情況下类腮,被觀察者繼續(xù)發(fā)送事件至超出緩存區(qū)大须(128)
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 一共發(fā)送129個(gè)事件,即超出了緩存區(qū)的大小
for (int i = 0;i< 129; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 默認(rèn)不設(shè)置可接收事件大小
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
5.1.2 同步訂閱情況
同步訂閱 & 異步訂閱 的區(qū)別在于:
- 同步訂閱中蚜枢,被觀察者 & 觀察者工作于同1線程
- 同步訂閱關(guān)系中沒有緩存區(qū)
- 被觀察者在發(fā)送1個(gè)事件后缸逃,必須等待觀察者接收后针饥,才能繼續(xù)發(fā)下1個(gè)事件
/**
* 步驟1:創(chuàng)建被觀察者 = Flowable
*/
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 發(fā)送3個(gè)事件
Log.d(TAG, "發(fā)送了事件1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送了事件2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送了事件3");
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
/**
* 步驟2:創(chuàng)建觀察者 = Subscriber
*/
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
// 每次可接收事件 = 3 二次匹配
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件 " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
/**
* 步驟3:建立訂閱關(guān)系
*/
upstream.subscribe(downstream);
所以,實(shí)際上并不會(huì)出現(xiàn)被觀察者發(fā)送事件速度 > 觀察者接收事件速度的情況需频。可是丁眼,卻會(huì)出現(xiàn)被觀察者發(fā)送事件數(shù)量 > 觀察者接收事件數(shù)量的問題。
- 如:觀察者只能接受3個(gè)事件昭殉,但被觀察者卻發(fā)送了4個(gè)事件苞七,所以出現(xiàn)了不匹配情況
/**
* 步驟1:創(chuàng)建被觀察者 = Flowable
*/
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 被觀察者發(fā)送事件數(shù)量 = 4個(gè)
Log.d(TAG, "發(fā)送了事件1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送了事件2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送了事件3");
emitter.onNext(3);
Log.d(TAG, "發(fā)送了事件4");
emitter.onNext(4);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
/**
* 步驟2:創(chuàng)建觀察者 = Subscriber
*/
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
// 觀察者接收事件 = 3個(gè) ,即不匹配
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件 " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
/**
* 步驟3:建立訂閱關(guān)系
*/
upstream.subscribe(downstream);
所以挪丢,對(duì)于沒有緩存區(qū)概念的同步訂閱關(guān)系來說蹂风,單純采用控制觀察者的接收事件數(shù)量(響應(yīng)式拉取)實(shí)際上就等于 “單相思”乾蓬,雖然觀察者控制了要接收3個(gè)事件惠啄,但假設(shè)被觀察者需要發(fā)送4個(gè)事件,還是會(huì)出現(xiàn)問題任内。
在下面講解 5.2 控制被觀察者發(fā)送事件速度 時(shí)會(huì)解決這個(gè)問題撵渡。
- 有1個(gè)特殊情況需要注意
- 代碼演示
/**
* 同步情況
*/
/**
* 步驟1:創(chuàng)建被觀察者 = Flowable
*/
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "發(fā)送了事件1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送了事件2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送了事件3");
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
/**
* 步驟2:創(chuàng)建觀察者 = Subscriber
*/
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 不設(shè)置request(long n)
// s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
/**
* 步驟3:建立訂閱關(guān)系
*/
upstream.subscribe(downstream);
在被觀察者發(fā)送第1個(gè)事件后, 就拋出MissingBackpressureException
異常 & 觀察者沒有收到任何事件
5.2 控制 被觀察者發(fā)送事件 的速度
- 簡介
-
FlowableEmitter
類的requested()
介紹
public interface FlowableEmitter<T> extends Emitter<T> {
// FlowableEmitter = 1個(gè)接口,繼承自Emitter
// Emitter接口方法包括:onNext(),onComplete() & onError
long requested();
// 作用:返回當(dāng)前線程中request(a)中的a值
// 該request(a)則是措施1中講解的方法死嗦,作用 = 設(shè)置
....// 僅貼出關(guān)鍵代碼
}
每個(gè)線程中的
requested()
的返回值 = 該線程中的request(a)
的a值對(duì)應(yīng)于同步 & 異步訂閱情況 的原理圖
為了方便大家理解該策略中的requested()
使用趋距,該節(jié)會(huì)先講解同步訂閱情況,再講解異步訂閱情況
5.2.1 同步訂閱情況
- 原理說明
即在同步訂閱情況中越走,被觀察者 通過 FlowableEmitter.requested()
獲得了觀察者自身接收事件能力棚品,從而根據(jù)該信息控制事件發(fā)送速度靠欢,從而達(dá)到了觀察者反向控制被觀察者的效果
- 具體使用
下面的例子 = 被觀察者根據(jù)觀察者自身接收事件能力(10個(gè)事件)廊敌,從而僅發(fā)送10個(gè)事件
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
long n = emitter.requested();
Log.d(TAG, "觀察者可接收事件" + n);
// 根據(jù)emitter.requested()的值,即當(dāng)前觀察者需要接收的事件數(shù)量來發(fā)送事件
for (int i = 0; i < n; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// 設(shè)置觀察者每次能接受10個(gè)事件
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
- 特別注意
在同步訂閱情況中使用FlowableEmitter.requested()
時(shí)门怪,有以下幾種使用特性需要注意的:
情況1:可疊加性
- 即:觀察者可連續(xù)要求接收事件骡澈,被觀察者會(huì)進(jìn)行疊加并一起發(fā)送
Subscription.request(a1);
Subscription.request(a2)掷空;
FlowableEmitter.requested()的返回值 = a1 + a2
- 代碼演示
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
Log.d(TAG, "觀察者可接收事件" + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10); // 第1次設(shè)置觀察者每次能接受10個(gè)事件
s.request(20); // 第2次設(shè)置觀察者每次能接受20個(gè)事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
情況2:實(shí)時(shí)更新性
- 即肋殴,每次發(fā)送事件后,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
- 即一開始觀察者要接收10個(gè)事件坦弟,發(fā)送了1個(gè)后护锤,會(huì)實(shí)時(shí)更新為9個(gè)
- 僅計(jì)算
Next
事件,complete & error
事件不算酿傍。
Subscription.request(10)烙懦;
// FlowableEmitter.requested()的返回值 = 10
FlowableEmitter.onNext(1); // 發(fā)送了1個(gè)事件
// FlowableEmitter.requested()的返回值 = 9
- 代碼演示
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 1. 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());
// 2. 每次發(fā)送事件后,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
// 即一開始觀察者要接收10個(gè)事件赤炒,發(fā)送了1個(gè)后氯析,會(huì)實(shí)時(shí)更新為9個(gè)
Log.d(TAG, "發(fā)送了事件 1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送了事件1后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());
Log.d(TAG, "發(fā)送了事件 2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送事件2后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());
Log.d(TAG, "發(fā)送了事件 3");
emitter.onNext(3);
Log.d(TAG, "發(fā)送事件3后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10); // 設(shè)置觀察者每次能接受10個(gè)事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
情況3:異常
- 當(dāng)
FlowableEmitter.requested()
減到0時(shí)亏较,則代表觀察者已經(jīng)不可接收事件 - 此時(shí)被觀察者若繼續(xù)發(fā)送事件,則會(huì)拋出
MissingBackpressureException
異常
如觀察者可接收事件數(shù)量 = 1掩缓,當(dāng)被觀察者發(fā)送第2個(gè)事件時(shí)叹阔,就會(huì)拋出異常
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 1. 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());
// 2. 每次發(fā)送事件后赠尾,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
// 即一開始觀察者要接收10個(gè)事件,發(fā)送了1個(gè)后,會(huì)實(shí)時(shí)更新為9個(gè)
Log.d(TAG, "發(fā)送了事件 1");
emitter.onNext(1);
Log.d(TAG, "發(fā)送了事件1后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());
Log.d(TAG, "發(fā)送了事件 2");
emitter.onNext(2);
Log.d(TAG, "發(fā)送事件2后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(1); // 設(shè)置觀察者每次能接受1個(gè)事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
額外
- 若觀察者沒有設(shè)置可接收事件數(shù)量冈在,即無調(diào)用
Subscription.request()
- 那么被觀察者默認(rèn)觀察者可接收事件數(shù)量 = 0,即
FlowableEmitter.requested()
的返回值 = 0
5.2.2 異步訂閱情況
- 原理說明
從上面可以看出令杈,由于二者處于不同線程掌敬,所以被觀察者 無法通過 FlowableEmitter.requested()
知道觀察者自身接收事件能力,即 被觀察者不能根據(jù) 觀察者自身接收事件的能力 控制發(fā)送事件的速度蠢熄。具體請(qǐng)看下面例子
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(150);
// 該設(shè)置僅影響觀察者線程中的requested跪解,卻不會(huì)影響的被觀察者中的FlowableEmitter.requested()的返回值
// 因?yàn)镕lowableEmitter.requested()的返回值 取決于RxJava內(nèi)部調(diào)用request(n),而該內(nèi)部調(diào)用會(huì)在一開始就調(diào)用request(128)
// 為什么是調(diào)用request(128)下面再講解
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
而在異步訂閱關(guān)系中签孔,反向控制的原理是:通過RxJava
內(nèi)部固定調(diào)用被觀察者線程中的request(n)
從而 反向控制被觀察者的發(fā)送事件速度
那么該什么時(shí)候調(diào)用被觀察者線程中的request(n)
& n
的值該是多少呢叉讥?請(qǐng)繼續(xù)往下看。
- 具體使用
關(guān)于RxJava
內(nèi)部調(diào)用request(n)(n = 128饥追、96图仓、0)
的邏輯如下:
至于為什么是調(diào)用
request(128)
&request(96)
&request(0)
,感興趣的讀者可自己閱讀Flowable
的源碼
- 代碼演示
下面我將用一個(gè)例子來演示該原理的邏輯
// 被觀察者:一共需要發(fā)送500個(gè)事件但绕,但真正開始發(fā)送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 觀察者:每次接收事件數(shù)量 = 48(點(diǎn)擊按鈕)
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());
boolean flag; //設(shè)置標(biāo)記位控制
// 被觀察者一共需要發(fā)送500個(gè)事件
for (int i = 0; i < 500; i++) {
flag = false;
// 若requested() == 0則不發(fā)送
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "不再發(fā)送");
flag = true;
}
}
// requested() ≠ 0 才發(fā)送
Log.d(TAG, "發(fā)送了事件" + i + "救崔,觀察者可接收事件數(shù)量 = " + emitter.requested());
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 初始狀態(tài) = 不接收事件;通過點(diǎn)擊按鈕接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
// 點(diǎn)擊按鈕才會(huì)接收事件 = 48 / 次
btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mSubscription.request(48);
// 點(diǎn)擊按鈕 則 接收48個(gè)事件
}
});
整個(gè)流程 & 測(cè)試結(jié)果 請(qǐng)看下圖
5.3 采用背壓策略模式:BackpressureStrategy
5.3.1 背壓模式介紹
在Flowable的使用中捏顺,會(huì)被要求傳入背壓模式參數(shù)
- 面向?qū)ο螅横槍?duì)緩存區(qū)
- 作用:當(dāng)緩存區(qū)大小存滿六孵、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí),該如何處理的策略方式
緩存區(qū)大小存滿幅骄、溢出 = 發(fā)送事件速度 > 接收事件速度 的結(jié)果 = 發(fā)送 & 接收事件不匹配的結(jié)果
5.3.2 背壓模式類型
下面我將對(duì)每種模式逐一說明劫窒。
模式1:BackpressureStrategy.ERROR
- 問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配
具體表現(xiàn):出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)拆座、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
- 處理方式:直接拋出異常
MissingBackpressureException
// 創(chuàng)建被觀察者Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 發(fā)送 129個(gè)事件
for (int i = 0;i< 129; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR) // 設(shè)置背壓模式 = BackpressureStrategy.ERROR
.subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
模式2:BackpressureStrategy.MISSING
- 問題:發(fā)送事件速度 > 接收事件 速度主巍,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
- 處理方式:友好提示:緩存區(qū)滿了
// 創(chuàng)建被觀察者Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 發(fā)送 129個(gè)事件
for (int i = 0;i< 129; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.MISSING) // 設(shè)置背壓模式 = BackpressureStrategy.MISSING
.subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
模式3:BackpressureStrategy.BUFFER
- 問題:發(fā)送事件速度 > 接收事件 速度挪凑,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)孕索、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
- 處理方式:將緩存區(qū)大小設(shè)置成無限大
- 即 被觀察者可無限發(fā)送事件 觀察者,但實(shí)際上是存放在緩存區(qū)
- 但要注意內(nèi)存情況躏碳,防止出現(xiàn)OOM
// 創(chuàng)建被觀察者Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 發(fā)送 129個(gè)事件
for (int i = 1;i< 130; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER) // 設(shè)置背壓模式 = BackpressureStrategy.BUFFER
.subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
可以接收超過原先緩存區(qū)大懈阈瘛(128)的事件數(shù)量了
模式4: BackpressureStrategy.DROP
- 問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
- 處理方式:超過緩存區(qū)大醒〖埂(128)的事件丟棄
如發(fā)送了150個(gè)事件杭抠,僅保存第1 - 第128個(gè)事件,第129 -第150事件將被丟棄
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
// 發(fā)送150個(gè)事件
for (int i = 0;i< 150; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.DROP) // 設(shè)置背壓模式 = BackpressureStrategy.DROP
.subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 通過按鈕進(jìn)行接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mSubscription.request(128);
// 每次接收128個(gè)事件
}
});
被觀察者一下子發(fā)送了150個(gè)事件恳啥,點(diǎn)擊按鈕接收時(shí)觀察者接收了128個(gè)事件偏灿;再次點(diǎn)擊接收時(shí)卻無法接受事件,這說明超過緩存區(qū)大小的事件被丟棄了钝的。
模式5:BackpressureStrategy.LATEST
- 問題:發(fā)送事件速度 > 接收事件 速度翁垂,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
- 處理方式:只保存最新(最后)事件硝桩,超過緩存區(qū)大醒夭隆(128)的事件丟棄
即如果發(fā)送了150個(gè)事件,緩存區(qū)里會(huì)保存129個(gè)事件(第1-第128 + 第150事件)
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0;i< 150; i++) {
Log.d(TAG, "發(fā)送了事件" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST) // // 設(shè)置背壓模式 = BackpressureStrategy.LATEST
.subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
.observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
// 通過按鈕進(jìn)行接收事件
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
btn = (Button) findViewById(R.id.btn);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mSubscription.request(128);
// 每次接收128個(gè)事件
}
});
- 被觀察者一下子發(fā)送了150個(gè)事件碗脊,點(diǎn)擊按鈕接收時(shí)觀察者接收了128個(gè)事件啼肩;
- 再次點(diǎn)擊接收時(shí)卻接收到1個(gè)事件(第150個(gè)事件),這說明超過緩存區(qū)大小的事件僅保留最后的事件(第150個(gè)事件)
5.3.3 特別注意
在使用背壓策略模式的時(shí)候衙伶,有1種情況是需要注意的:
a. 背景
FLowable
可通過自己創(chuàng)建(如上面例子)祈坠,或通過其他方式自動(dòng)創(chuàng)建,如interval操作符
interval操作符簡介
- 作用:每隔1段時(shí)間就產(chǎn)生1個(gè)數(shù)字(Long型)矢劲,從0開始赦拘、1次遞增1,直至無窮大
- 默認(rèn)運(yùn)行在1個(gè)新線程上
- 與timer操作符區(qū)別:timer操作符可結(jié)束發(fā)送
b. 沖突
對(duì)于自身手動(dòng)創(chuàng)建
FLowable
的情況芬沉,可通過傳入背壓模式參數(shù)選擇背壓策略
(即上面描述的)可是對(duì)于自動(dòng)創(chuàng)建
FLowable
躺同,卻無法手動(dòng)傳入傳入背壓模式參數(shù),那么出現(xiàn)流速不匹配的情況下丸逸,該如何選擇 背壓模式呢蹋艺?
// 通過interval自動(dòng)創(chuàng)建被觀察者Flowable
// 每隔1ms將當(dāng)前數(shù)字(從0開始)加1,并發(fā)送出去
// interval操作符會(huì)默認(rèn)新開1個(gè)新的工作線程
Flowable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread()) // 觀察者同樣工作在一個(gè)新開線程中
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE); //默認(rèn)可以接收Long.MAX_VALUE個(gè)事件
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
// 每次延時(shí)1秒再接收事件
// 因?yàn)榘l(fā)送事件 = 延時(shí)1ms椭员,接收事件 = 延時(shí)1s车海,出現(xiàn)了發(fā)送速度 & 接收速度不匹配的問題
// 緩存區(qū)很快就存滿了128個(gè)事件笛园,從而拋出MissingBackpressureException異常隘击,請(qǐng)看下圖結(jié)果
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
c. 解決方案
RxJava 2.0
內(nèi)部提供 封裝了背壓策略模式的方法
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
默認(rèn)采用
BackpressureStrategy.ERROR
模式
具體使用如下:
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer() // 添加背壓策略封裝好的方法,此處選擇Buffer模式研铆,即緩存區(qū)大小無限制
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
從而很好地解決了發(fā)送事件 & 接收事件 速度不匹配的問題埋同。
其余方法的作用類似于上面的說背壓模式參數(shù),此處不作過多描述棵红。
背壓策略模式小結(jié)
- 至此凶赁,對(duì)
RxJava 2.0
的背壓模式終于講解完畢 - 所有代碼Demo均存放在Carson_Ho的Github地址
6. 總結(jié)
本文主要對(duì)
Rxjava
的背壓模式知識(shí)進(jìn)行講解Carson帶你學(xué)RxJava系列文章:
入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時(shí)候應(yīng)該使用Rxjava?(開發(fā)場(chǎng)景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava虱肄、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷
歡迎關(guān)注Carson_Ho的簡書
不定期分享關(guān)于安卓開發(fā)的干貨致板,追求短、平咏窿、快斟或,但卻不缺深度。