RxJava2 中的背壓
在RxJava2里摘能,引入了Flowable這個類來處理backpressure湾蔓,而Observable不包含backpressure處理洲劣。Flowable的三種Backpressure策略:
- BackpressureStrategy.BUFFER
onBackpressureBuffer是不丟棄數(shù)據(jù)的處理方式瓢颅。把上游收到的全部緩存下來碘饼,等下游來請求再發(fā)給下游熙兔。相當(dāng)于一個水庫。但上游太快艾恼,水庫(buffer)就會溢出住涉。 - BackpressureStrategy.DROP 與 BackpressureStrategy.LATEST
Drop 和Latest 類似,都會丟棄數(shù)據(jù)钠绍,下游通過request請求產(chǎn)生令牌給上游舆声,上游接收到多少令牌,就發(fā)送多少柳爽,當(dāng)令牌為0的時候媳握,上游開始丟棄數(shù)據(jù)。區(qū)別在于磷脯,drop直接丟棄數(shù)據(jù)不緩存數(shù)據(jù)蛾找。而latest緩存最新的一條數(shù)據(jù),當(dāng)上游收到令牌赵誓,就把緩存的上一條“最新”數(shù)據(jù)發(fā)送給下游打毛。
例如 :
Flowable<Integer> flowable =
Flowable.create((FlowableOnSubscribe<Integer>) e -> {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
Log.d(TAG, "onNext : " + i);
e.onNext(i);
Thread.sleep(10);
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Flowable以10毫秒一次派發(fā)數(shù)據(jù),注意我們讓Flowable和訂閱者運行在不同的線程俩功,這樣才能看出生產(chǎn)與消費在不同效率下時的差異性隘冲,如果Flowable和訂閱者在同一線程,背壓是沒什么意義的绑雄。假設(shè)訂閱他們的方法都是:
Subscription mSubscription;
flowable.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
mSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
我們在onSubscribe
中保存了Subscription ,以后 方便我們可以在任何時候request 數(shù)據(jù)奥邮。我們添加一個按鈕万牺,以實現(xiàn)手動request數(shù)據(jù) 罗珍,代碼如下:
if(mSubscription != null) {
mSubscription.request(64);
}
我們一開始request 64個數(shù)據(jù),我們啟動Flowable后脚粟,隔一段時間才點擊request , log打印 0 ~ 63 :
D/SimpleExampleActivity: onNext: 0
D/SimpleExampleActivity: onNext: 1
D/SimpleExampleActivity: onNext: 2
...
D/SimpleExampleActivity: onNext: 63
D/SimpleExampleActivity: onNext: 64
隔一段時間第二次點擊request , log打印 64~ 127 :
D/SimpleExampleActivity: onNext: 64
D/SimpleExampleActivity: onNext: 65
D/SimpleExampleActivity: onNext: 66
...
D/SimpleExampleActivity: onNext: 126
D/SimpleExampleActivity: onNext: 127
隔一段時間第三次點擊request , log打印 1243~ 1306:
D/SimpleExampleActivity: onNext: 1243
D/SimpleExampleActivity: onNext: 1244
D/SimpleExampleActivity: onNext: 1245
...
D/SimpleExampleActivity: onNext: 1304
D/SimpleExampleActivity: onNext: 1305
D/SimpleExampleActivity: onNext: 1306
我們使用是BackpressureStrategy.DROP 覆旱, 與就是 直接丟棄數(shù)據(jù)不緩存數(shù)據(jù) 『宋蓿可是我們一開始隔了點時間再request時扣唱,還是打印從 0 ~ 127 , 這說明 Flowable 本身就會存儲緩存 128 個數(shù)據(jù)团南,超過128個后執(zhí)行我們的策略噪沙,也就是丟棄。所以 1243~ 1306 其實是我們在第二次點擊后吐根,重新緩存的128數(shù)據(jù)正歼。如果我們換成 BackpressureStrategy.BUFFER , 那么不管你點擊多少次拷橘,數(shù)據(jù)都是連續(xù)的局义,因為 BackpressureStrategy.BUFFER 策略會把數(shù)據(jù)一直放到內(nèi)存中,直到發(fā)生OutOfMemoryError冗疮。
我們現(xiàn)在修改request 的數(shù)目 萄唇,改成 每次 request 96 個 , 代碼如下 :
if(mSubscription != null) {
mSubscription.request(96);
}
第一次點擊request , log打印 0 ~ 95 , 沒什么問題
D/SimpleExampleActivity: onNext: 0
D/SimpleExampleActivity: onNext: 1
D/SimpleExampleActivity: onNext: 2
...
D/SimpleExampleActivity: onNext: 94
D/SimpleExampleActivity: onNext: 95
隔一段時間第二次點擊request , log打印 96 ~ 127 , 188 ~ 251 :
D/SimpleExampleActivity: onNext: 96
D/SimpleExampleActivity: onNext: 97
D/SimpleExampleActivity: onNext: 98
...
D/SimpleExampleActivity: onNext: 126
D/SimpleExampleActivity: onNext: 127
D/SimpleExampleActivity: onNext: 188
D/SimpleExampleActivity: onNext: 189
...
D/SimpleExampleActivity: onNext: 250
D/SimpleExampleActivity: onNext: 251
隔一段時間第三次點擊request , log打印 252 ~ 283 , 650 ~ 713:
D/SimpleExampleActivity: onNext: 252
D/SimpleExampleActivity: onNext: 253
...
D/SimpleExampleActivity: onNext: 282
D/SimpleExampleActivity: onNext: 283
D/SimpleExampleActivity: onNext: 650
D/SimpleExampleActivity: onNext: 651
...
D/SimpleExampleActivity: onNext: 712
D/SimpleExampleActivity: onNext: 713
我們可以看到第二次术幔、第三次時打印的數(shù)據(jù)出現(xiàn)了中斷的情況另萤。我們知道Flowable 默認會緩存 127個數(shù)據(jù),那么第一次點擊之后應(yīng)該剩下 128 - 96 = 32個 特愿, 所以第二次首先打印 96 ~ 127 仲墨, 之后再打印 188 ~ 251 64個數(shù)據(jù)。第三次又打印了 252 ~ 283 32個數(shù)據(jù)揍障。第二次打印中斷之后打印的 64個數(shù)據(jù) 加上 第三次打印中斷前打印的 32個目养,剛好是 96個數(shù)據(jù),也就是打印中斷的時間點的數(shù)據(jù)剛好是96個毒嫡。
這個96就是Flowable 重新去拉取緩存的限制癌蚁,這是在源碼上設(shè)定的,就是說首先緩存了 128個數(shù)據(jù)之后兜畸,被消費了96個數(shù)據(jù)時才會重新緩存努释。所以在第二次時,從127后就打印了 188咬摇,因為這個188是在第一次點擊之后就重新緩存了伐蒂。
總結(jié)
Flowable 有三種Backpressure策略,分別是BackpressureStrategy.BUFFER肛鹏、BackpressureStrategy.DROP 和 BackpressureStrategy.LATEST逸邦。默認會緩存 127個數(shù)據(jù)恩沛,被消費了96個數(shù)據(jù)后才會重新緩存。