1.前言
- 閱讀本文需要對Rxjava了解女责,如果還沒有了解或者使用過Rxjava的兄die們耳鸯,可以觀看我另外一篇
Android Rxjava:不一樣的詮釋進行學習湿蛔。 -
Rxjava背壓
:被觀察者發(fā)送事件的速度大于觀察者接收事件的速度時,觀察者內(nèi)會創(chuàng)建一個無限制大少的緩沖池存儲未接收的事件县爬,因此當存儲的事件越來越多時就會導致OOM的出現(xiàn)阳啥。(注:當subscribeOn與observeOn不為同一個線程時,被觀察者與觀察者內(nèi)存在不同時長耗時任務财喳,就會使發(fā)送與接收速度存在差異察迟。) - 背壓例子
public void backpressureSample(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while(true){
Thread.sleep(500);
i++;
e.onNext(i);
Log.i(TAG,"每500ms發(fā)送一次數(shù)據(jù):"+i);
}
}
}).subscribeOn(Schedulers.newThread())//使被觀察者存在獨立的線程執(zhí)行
.observeOn(Schedulers.newThread())//使觀察者存在獨立的線程執(zhí)行
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(5000);
Log.e(TAG,"每5000m接收一次數(shù)據(jù):"+integer);
}
});
}
-
例子執(zhí)行效果
上述代碼執(zhí)行效果
backpressure.png - 通過上述例子可以大概了解背壓是如何產(chǎn)生,因此Rxjava2.0版本提供了 Flowable 解決背壓問題耳高。
- 本文章就是使用與分析 Flowable 是如何解決背壓問題扎瓶。
- 文章中實例 linhaojian的Github
2.目錄
目錄.png
3.簡介
簡介.png
_______________________________________________________________________________
4.使用與原理詳解
4.1 Flowable 與 Observable 的區(qū)別
-
flowable與observable對比.png
上圖可以很清楚看出二者的區(qū)別,其實Flowable
出來以上的區(qū)別之外泌枪,它其他所有使用與Observable完全一樣概荷。 -
Flowable
的create例子
public void flowable(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<=150;j++){
e.onNext(j);
Log.i(TAG," 發(fā)送數(shù)據(jù):"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //觀察者設置接收事件的數(shù)量,如果不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
}
4.2 BackpressureStrategy媒體類
- 從Flowable源碼查看,緩存池默認大少為:128
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
.....
}
- 通過上面的例子碌燕,我們可以看到create方法中的包含了一個BackpressureStrategy媒體類误证,其包含5種類型:
4.2.1. ERROR
-
把上面例子改為ERROR類型继薛,執(zhí)行結果如下:
error.png - 總結 :當被觀察者發(fā)送事件大于128時,觀察者拋出異常并終止接收事件雷厂,但不會影響被觀察者繼續(xù)發(fā)送事件惋增。
4.2.2. BUFFER
-
把上面例子改為BUFFER類型,執(zhí)行結果如下:
buffer.gif - 總結 :與Observable一樣存在背壓問題改鲫,但是接收性能比Observable低诈皿,因為BUFFER類型通過BufferAsyncEmitter添加了額外的邏輯處理,再發(fā)送至觀察者像棘。
4.2.3. DROP
-
把上面例子改為DROP類型稽亏,執(zhí)行結果如下:
drop.png - 總結 :每當觀察者接收128事件之后,就會丟棄部分事件缕题。
4.2.4. LATEST
-
把上面例子改為LATEST類型截歉,執(zhí)行結果如下:
laster.gif - 總結 :LATEST與DROP使用效果一樣,但LATEST會保證能接收最后一個事件烟零,而DROP則不會保證瘪松。
4.2.5. MISSING
-
把上面例子改為MISSING類型,執(zhí)行結果如下:
buffer.gif - 總結 :MISSING就是沒有采取背壓策略的類型锨阿,效果跟Obserable一樣宵睦。
- 在設置MISSING類型時,可以配合onBackPressure相關操作符使用墅诡,也可以到達上述其他類型的處理效果壳嚎。
4.3 onBackPressure相關操作符
- 使用例子:
Flowable.interval(50,TimeUnit.MILLISECONDS)
.onBackpressureDrop()//效果與Drop類型一樣
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(aLong));
}
});
- onBackpressureBuffer :與BUFFER類型一樣效果。
- onBackpressureDrop :與DROP類型一樣效果末早。
- onBackpressureLaster :與LASTER類型一樣效果烟馅。
4.4 request()
4.4.1 request(int count):設置接收事件的數(shù)量.
- 例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 發(fā)送數(shù)據(jù):"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10); //觀察者設置接收事件的數(shù)量,如果不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
request.png
4.4.2 request擴展使用
- request還可進行擴展使用,當遇到在接收事件時想追加接收數(shù)量(如:通信數(shù)據(jù)通過幾次接收然磷,驗證準確性的應用場景)郑趁,可以通過以下方式進行擴展:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 發(fā)送數(shù)據(jù):"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //觀察者設置接收事件的數(shù)量,如果不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
if(integer==5){
subscription.request(3);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
request擴展.png
總結:可以動態(tài)設置觀察者接收事件的數(shù)量,但不影響被觀察者繼續(xù)發(fā)送事件姿搜。
4.5 requested
- requested 與 request不是同一的函數(shù)穿撮,但它們都是屬于FlowableEmitter類里的方法,那么requested()是有什么作用呢痪欲,看看以下例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<15;j++){
e.onNext(j);
Log.i(TAG,e.requested()+" 發(fā)送數(shù)據(jù):"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
// .subscribeOn(Schedulers.newThread())
// .observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //觀察者設置接收事件的數(shù)量,如果不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
requested.png
- 從圖中我們可以發(fā)現(xiàn),requested打印的結果就是 剩余可接收的數(shù)量 ,它的作用就是可以檢測剩余可接收的事件數(shù)量攻礼。
5.總結
- 到此业踢,
Flowable
講解完畢。 - 如果喜歡我的分享礁扮,可以點擊 關注 或者 贊知举,你們支持是我分享的最大動力 瞬沦。
- linhaojian的Github
歡迎關注linhaojian_CSDN博客或者linhaojian_簡書!
不定期分享關于安卓開發(fā)的干貨雇锡。
寫技術文章初心
- 技術知識積累
- 技術知識鞏固
- 技術知識分享
- 技術知識交流