前言:Rxjava是通過觀察者模式設(shè)計的異步任務(wù)框架,他的有點在于簡潔性,不是代碼的簡潔性,而是邏輯的簡潔性,隨著項目的中異步任務(wù)邏輯越來越復(fù)雜,Rxjava可以一直保持代碼邏輯的簡潔,不會像handler,ThreadHandler這些在幾次線程間切換以后可能都已經(jīng)暈頭轉(zhuǎn)向了.Rxjava提供了多種類的操作符,比如ObserverOn可以為每次業(yè)務(wù)的處理的處理制定線程,flatmap這種操作符來幫助我們處理循環(huán)遍歷的問題,總的來說rxjava就是時代碼簡潔的異步任務(wù)框架.
但是在rxjava1.x版本中存在一個比較深的坑就是,有些操作符支持熬粗,背壓叶眉,有些操作符不支持背壓米罚,當我們使用不支持背壓的操作符時會報出rx.exceptions.MissingBackpressureException,當我們不理解Backpressure概念時導(dǎo)致我們很難處理這種錯誤嘱支。下面我們就來學(xué)習(xí)rxjava中背壓的概念。
Backpressure
Rx 中的數(shù)據(jù)流是從一個地方發(fā)射到另外一個地方。每個地方處理數(shù)據(jù)的速度是不一樣的零抬。如果生產(chǎn)者發(fā)射數(shù)據(jù)的速度比消費者處理的快會出現(xiàn)什么情況?在同步操作中宽涌,這不是個問題平夜,例如:
Observable<Integer> producer = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
});
// Consume
producer.subscribe(i -> {
try {
Thread.sleep(1000);
System.out.println(i);
} catch (Exception e) { }
});
雖然上面的消費者處理數(shù)據(jù)的速度慢,但是由于是同步調(diào)用的卸亮,所以當 o.onNext(1) 執(zhí)行后忽妒,一直阻塞到消費者處理完才執(zhí)行 o.onNext(2)。 但是生產(chǎn)者和消費者異步處理的情況很常見兼贸。如果是在異步的情況下會出現(xiàn)什么情況呢段直?
在傳統(tǒng)的 pull 模型中,當消費者請求數(shù)據(jù)的時候溶诞,如果生產(chǎn)者比較慢柬甥,則消費者會阻塞等待辛慰。如果生產(chǎn)者比較快猫妙,則生產(chǎn)者會等待消費者處理完后再生產(chǎn)新的數(shù)據(jù)串绩。
而 Rx 為 push 模型。 在 Rx 中枉圃,只要生產(chǎn)者數(shù)據(jù)好了就發(fā)射出去了功茴。如果生產(chǎn)者比較慢,則消費者就會等待新的數(shù)據(jù)到來讯蒲。如果生產(chǎn)者快痊土,則就會有很多數(shù)據(jù)發(fā)射給消費者,而不管消費者當前有沒有能力處理數(shù)據(jù)墨林。這樣會導(dǎo)致一個問題赁酝,例如:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結(jié)果;
0
1
rx.exceptions.MissingBackpressureException
上面的 MissingBackpressureException 告訴我們,生產(chǎn)者太快了旭等,我們的操作函數(shù)無法處理這種情況酌呆。
解決:
RxJava 實現(xiàn)了一種通過 Subscriber 來通知 Observable 發(fā)射數(shù)據(jù)的方式。Subscriber 有個函數(shù) request(n)搔耕,調(diào)用該函數(shù)用來通知 Observable 現(xiàn)在 Subscriber 準備接受下面 n 個數(shù)據(jù)了隙袁。在 Subscriber 的 onStart 函數(shù)里面調(diào)用 request 函數(shù)則就開啟了reactive pull backpressure痰娱。這并不是傳統(tǒng)的 pull 模型,并不會阻塞調(diào)用菩收。只是 Subscriber 通知 Observable 當前 Subscriber 的處理能力梨睁。 通過調(diào)用 request 可以發(fā)射更多的數(shù)據(jù)。
觀察者可以根據(jù)自身實際情況按需拉取數(shù)據(jù)娜饵,而不是被動接收(也就相當于告訴上游觀察者把速度慢下來)坡贺,最終實現(xiàn)了上游被觀察者發(fā)送事件的速度的控制,實現(xiàn)了背壓的策略箱舞。
class MySubscriber extends Subscriber<T> {
@Override
public void onStart() {
request(1); //要在onStart中通知被觀察者先發(fā)送一個事件
}
@Override
public void onCompleted() {
...
}
@Override
public void onError(Throwable e) {
...
}
@Override
public void onNext(T n) {
...
request(1); //處理完畢之后遍坟,在通知被觀察者發(fā)送下一個事件
}
}
Observable observable=Observable.range(1,100000);
observable.observeOn(Schedulers.newThread())
.subscribe(new MySubscriber());
在 onStart 函數(shù)中調(diào)用 request(1) 開啟了 backpressure 模式,告訴 Observable 一次只發(fā)射一個數(shù)據(jù)晴股。在 onNext 里面處理完該數(shù)據(jù)后愿伴,可以請求下一個數(shù)據(jù)。通過 quest(Long.MAX_VALUE) 可以取消 backpressure 模式电湘。
實際上隔节,在上面的代碼中,你也可以不需要調(diào)用request(n)方法去拉取數(shù)據(jù)胡桨,程序依然能完美運行官帘,這是因為range –> observeOn,這一段中間過程本身就是響應(yīng)式拉取數(shù)據(jù)瞬雹,observeOn這個操作符內(nèi)部有一個緩沖區(qū)昧谊,Android環(huán)境下長度是16,它會告訴range最多發(fā)送16個事件酗捌,充滿緩沖區(qū)即可呢诬。不過話說回來,在觀察者中使用request(n)這個方法可以使背壓的策略表現(xiàn)得更加直觀胖缤,更便于理解尚镰。
如果你足夠細心,會發(fā)現(xiàn)哪廓,在開頭展示異常情況的代碼中狗唉,使用的是interval這個操作符,但是在這里使用了range操作符涡真,為什么呢分俯?
這是因為interval操作符本身并不支持背壓策略,它并不響應(yīng)request(n)哆料,也就是說缸剪,它發(fā)送事件的速度是不受控制的,而range這類操作符是支持背壓的东亦,它發(fā)送事件的速度可以被控制杏节。
Backpressure 策略
很多 Rx 操作函數(shù)內(nèi)部都使用了 backpressure 從而避免過多的數(shù)據(jù)填滿內(nèi)部的隊列。這樣處理慢的消費者就會把這種情況傳遞給前面的消費者,前面的消費者開始緩沖數(shù)據(jù)直到他也緩存滿為止再告訴他前面的消費者奋渔。Backpressure 并沒有消除這種情況镊逝。只是讓錯誤延遲發(fā)生,我們還是需要處理這種情況嫉鲸。
Rx 中有操作函數(shù)可以用來處理這種消費者處理不過來的情況蹋半。
onBackpressureBuffer
onBackpressureBuffer 會緩存所有當前無法消費的數(shù)據(jù),直到 Observer 可以處理為止充坑。
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println
);
上面的示例减江,生產(chǎn)者比消費者快 100 倍。使用 1000個緩沖來處理這種消費者比較慢的情況捻爷。當消費者消費 11個數(shù)據(jù)的時候辈灼,緩沖區(qū)滿了,生產(chǎn)者生產(chǎn)了 1100個數(shù)據(jù)也榄。數(shù)據(jù)流就拋出異常了巡莹。
onBackpressureDrop
如果消費者無法處理數(shù)據(jù),則 onBackpressureDrop 就把該數(shù)據(jù)丟棄了甜紫。
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結(jié)果:
0
1
2
...
126
127
12861
12862
...
這個示例中降宅,前面 128 個數(shù)據(jù)正常的被處理的,這是應(yīng)為 observeOn 在切換線程的時候囚霸, 使用了一個 128 個數(shù)據(jù)的小緩沖腰根。
在RxJava1.X中,背壓的設(shè)計并不十分完美拓型。而是希望你對背壓有一個全面清晰的認識额嘿,對于它在RxJava1.X中的設(shè)計缺陷有所了解即可×哟欤可喜的是册养,RxJava2.X中解決了背壓的問題,推出了Flowable(Observable在RxJava2.0中新的實現(xiàn))压固,而且其中的操作符全部都實現(xiàn)了背壓