RxJava
RxJava是響應(yīng)式程序設(shè)計(jì)的一種實(shí)現(xiàn)脂新。在響應(yīng)式程序設(shè)計(jì)中,當(dāng)數(shù)據(jù)到達(dá)的時(shí)候粗梭,消費(fèi)者做出響應(yīng)争便。響應(yīng)式編程可以將事件傳遞給注冊(cè)了的觀察者Observer。RxJava為觀察者模式提供了一種通用的實(shí)現(xiàn)断医,并且提供了豐富的操作符來處理數(shù)據(jù)流滞乙。它同時(shí)支持不同線程之間切換。
什么是響應(yīng)式編程
響應(yīng)式編程(Reactive)是一種基于異步數(shù)據(jù)流概念的編程模式鉴嗤。數(shù)據(jù)流就像一條河:它可以被觀測(cè)斩启,被過濾,被操作醉锅,或者為新的消費(fèi)者與另外一條流合并為一條新的流兔簇。
響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件。事件可以被等待硬耍,可以觸發(fā)過程垄琐,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶默垄。同樣的此虑,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI口锭;汽車上的車道偏移系統(tǒng)探測(cè)到車輛偏移了正常路線就會(huì)提醒駕駛者糾正朦前,就是是響應(yīng)事件。
今天鹃操,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用韭寸、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上荆隘,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此恩伺。
觀察者模式
大家可能都知道, RxJava 以觀察者模式為骨架椰拒,在 2.0 中依舊如此晶渠。不過此次更新中凰荚,出現(xiàn)了兩種觀察者模式:
- Observable ( 被觀察者 ) / Observer ( 觀察者 )
- Flowable (被觀察者)/ Subscriber (觀察者)
在 RxJava 2.x 中,Observable 用于訂閱 Observer褒脯,不再支持背壓(1.x 中可以使用背壓策略)便瑟,而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的番川。
接口變化
RxJava 2.x 擁有了新的特性到涂,其依賴于4個(gè)基礎(chǔ)接口,它們分別是:
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫過于 Publisher 和 Subscriber颁督。Publisher 可以發(fā)出一系列的事件践啄,而 Subscriber 負(fù)責(zé)和處理這些事件。其中用的比較多的自然是 Publisher
的 Flowable
沉御,它支持背壓屿讽。
Base Reactive Interfaces
可訂閱的對(duì)象在RxJava1中只有Observable一種,之前我們經(jīng)常會(huì)直接把數(shù)據(jù)源稱作Observable嚷节。而在RxJava2中擴(kuò)充成了4種 Observable聂儒、Flowable、Singles硫痰、Completable衩婚,因此在之后還是把他們統(tǒng)稱為數(shù)據(jù)源為宜。
public abstract class Flowable<T> implements Publisher<T> {
public abstract class Observable<T> implements ObservableSource<T> {
public abstract class Single<T> implements SingleSource<T> {
public abstract class Completable implements CompletableSource {
Single
和Observable效斑,F(xiàn)lowable一樣會(huì)發(fā)送數(shù)據(jù)非春,不同的是訂閱后只能接受到一次。普通Observable可以使用toSingle轉(zhuǎn)換:Observable.just(1).toSingle()
Completable
與Single類似缓屠,只能接受到完成(onComplete)和錯(cuò)誤(onError)奇昙。同樣也可以由普通的Observable轉(zhuǎn)換而來:Observable.just(1).toCompletable()
和Flowable的接口Publisher類似,Observable敌完、Single储耐、Completable也有類似的基類
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
Observable的創(chuàng)建
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//執(zhí)行一些其他操作
//.............
//執(zhí)行完畢,觸發(fā)回調(diào)滨溉,通知觀察者
e.onNext("我來發(fā)射數(shù)據(jù)");
}
});
使用create( )創(chuàng)建Observable最基本的創(chuàng)建方式』拊埽可以看到,這里傳入了一個(gè) ObservableOnSubscribe對(duì)象作為參數(shù)哟旗,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable被訂閱的時(shí)候闸餐,ObservableOnSubscribe的subscribe()方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼近尚,就是觀察者Observer 將會(huì)被調(diào)用一次 onNext())场勤。這樣歼跟,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞留瞳,即觀察者模式。
Observer的創(chuàng)建
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
//觀察者接收到通知,進(jìn)行相關(guān)操作
public void onNext(String aLong) {
System.out.println("我接收到數(shù)據(jù)了");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface SingleObserver<T> {
void onSubscribe(@NonNull Disposable d);
void onSuccess(@NonNull T t);
void onError(@NonNull Throwable e);
}
public interface CompletableObserver {
void onSubscribe(@NonNull Disposable d);
void onComplete();
void onError(@NonNull Throwable e);
}
Subscribe訂閱
observable.subscribe(observer);
subscribe()
有多個(gè)重載的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
- 不帶任何參數(shù)的
subscribe()
表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么. - 帶有一個(gè)
Consumer
參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見
Observable的其他創(chuàng)建方式
just()方式
Observable<String> observable = Observable.just("Hello");
使用just( )骚秦,將為你創(chuàng)建一個(gè)Observable并自動(dòng)為你調(diào)用onNext( )發(fā)射數(shù)據(jù)她倘。通過just( )方式 直接觸發(fā)onNext(),just中傳遞的參數(shù)將直接在Observer的onNext()方法中接收到作箍。
fromIterable()方式
List<String> list = new ArrayList<String>();
for(int i =0;i<10;i++){
list.add("Hello"+i);
}
Observable<String> observable = Observable.fromIterable((Iterable<String>) list);
使用fromIterable()硬梁,遍歷集合,發(fā)送每個(gè)item胞得。相當(dāng)于多次回調(diào)onNext()方法荧止,每次傳入一個(gè)item。
注意:Collection接口是Iterable接口的子接口阶剑,所以所有Collection接口的實(shí)現(xiàn)類都可以作為Iterable對(duì)象直接傳入fromIterable()方法跃巡。
簡(jiǎn)便的觀察者模式
除了Observable(被觀察者)的創(chuàng)建之外,RxJava 2.x 還提供了多個(gè)函數(shù)式接口 牧愁,用于實(shí)現(xiàn)簡(jiǎn)便式的觀察者模式素邪。具體的函數(shù)式接口包括以下:
Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
其中Consumer中的accept()方法接收一個(gè)來自O(shè)bservable的單個(gè)值。Consumer就是一個(gè)觀察者猪半。其他函數(shù)式接口可以類似應(yīng)用兔朦。
注意:Observable (被觀察者)只有在被Observer (觀察者)訂閱后才能執(zhí)行其內(nèi)部的相關(guān)邏輯
Cold Observable 和 Hot Observable
Hot Observable 無論有沒有 Subscriber 訂閱,事件始終都會(huì)發(fā)生办龄。當(dāng) Hot Observable 有多個(gè)訂閱者時(shí)烘绽,Hot Observable 與訂閱者們的關(guān)系是一對(duì)多的關(guān)系,可以與多個(gè)訂閱者共享信息俐填。
然而安接,Cold Observable 只有 Subscriber 訂閱時(shí),才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼。并且 Cold Observable 和 Subscriber 只能是一對(duì)一的關(guān)系盏檐,當(dāng)有多個(gè)不同的訂閱者時(shí)歇式,消息是重新完整發(fā)送的。也就是說對(duì) Cold Observable 而言胡野,有多個(gè)Subscriber的時(shí)候,他們各自的事件是獨(dú)立的龙巨。
盡管 Cold Observable 很好旨别,但是對(duì)于某些事件不確定何時(shí)發(fā)生以及不確定 Observable 發(fā)射的元素?cái)?shù)量秸弛,那還得使用 Hot Observable递览。比如:UI交互的事件绞铃、網(wǎng)絡(luò)環(huán)境的變化憎兽、地理位置的變化纯命、服務(wù)器推送消息的到達(dá)等等。
Cold Observable
只有當(dāng)有訂閱者訂閱的時(shí)候揪阿, Cold Observable 才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼吴裤。并且每個(gè)訂閱者訂閱的時(shí)候都獨(dú)立的執(zhí)行一遍數(shù)據(jù)流代碼麦牺。 Observable.interval 就是一個(gè) Cold Observable剖膳。每一個(gè)訂閱者都會(huì)獨(dú)立的收到他們的數(shù)據(jù)流吱晒。
Observable 的 just叹话、creat渣刷、range、fromXXX 等操作符都能生成Cold Observable瞭吃。
Hot Observable
Hot observable 不管有沒有訂閱者訂閱歪架,他們創(chuàng)建后就開發(fā)發(fā)射數(shù)據(jù)流和蚪。 一個(gè)比較好的示例就是 鼠標(biāo)事件攒霹。 不管系統(tǒng)有沒有訂閱者監(jiān)聽鼠標(biāo)事件催束,鼠標(biāo)事件一直在發(fā)生抠刺,當(dāng)有訂閱者訂閱后速妖,從訂閱后的事件開始發(fā)送給這個(gè)訂閱者,之前的事件這個(gè)訂閱者是接受不到的;如果訂閱者取消訂閱了烘跺,鼠標(biāo)事件依然繼續(xù)發(fā)射。
Cold Observable 如何轉(zhuǎn)換成 Hot Observable砌左?
- 使用publish屁擅,生成 ConnectableObservable
使用 publish 操作符派歌,可以讓 Cold Observable 轉(zhuǎn)換成 Hot Observable胶果。它將原先的 Observable 轉(zhuǎn)換成 ConnectableObservable早抠。
- 使用Subject/Processor
Subject 和 Processor 的作用是相同的蕊连。Processor 是 RxJava2.x 新增的類甘苍,繼承自 Flowable 支持背壓控制羊赵。而 Subject 則不支持背壓控制。
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
當(dāng) Subject 作為 Subscriber 時(shí)靡挥,它可以訂閱目標(biāo) Cold Observable 使對(duì)方開始發(fā)送事件跋破。同時(shí)它又作為Observable 轉(zhuǎn)發(fā)或者發(fā)送新的事件,讓 Cold Observable 借助 Subject 轉(zhuǎn)換為 Hot Observable租幕。
注意劲绪,Subject 并不是線程安全的贾富,如果想要其線程安全需要調(diào)用toSerialized()方法。(在RxJava1.x的時(shí)代還可以用 SerializedSubject 代替 Subject淑际,但是在RxJava2.x以后SerializedSubject不再是一個(gè)public class)
然而霍骄,很多基于 EventBus 改造的 RxBus 并沒有這么做,這樣的做法是非常危險(xiǎn)的咱娶,因?yàn)闀?huì)遇到并發(fā)的情況膘侮。
Hot Observable 如何轉(zhuǎn)換成 Cold Observable琼了?
- ConnectableObservable的refCount操作符
RefCount操作符把從一個(gè)可連接的 Observable 連接和斷開的過程自動(dòng)化了雕薪。它操作一個(gè)可連接的Observable所袁,返回一個(gè)普通的Observable燥爷。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè)Observable時(shí),RefCount連接到下層的可連接Observable暖璧。RefCount跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接Observable的連接愤惰。
- Observable的share操作符
share操作符封裝了publish().refCount()調(diào)用
Flowable的產(chǎn)生
在原來的RxJava 1.x版本中并沒有Flowable的存在,Backpressure問題是由Observable來處理的商模。在RxJava 2.x中對(duì)于backpressure的處理進(jìn)行了改動(dòng)响疚,為此將原來的Observable拆分成了新的Observable和Flowable忿晕,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分践盼。原先的Observable已經(jīng)不具備背壓處理能力咕幻。
被觀察者發(fā)送消息十分迅速以至于觀察者不能及時(shí)的響應(yīng)這些消息肄程。
背壓是指在異步場(chǎng)景中蓝厌,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略崎苗。簡(jiǎn)而言之,背壓是流速控制的一種策略肌蜻。
處理Backpressure的策略
處理Backpressure的策略僅僅是處理Subscriber接收事件的方式蒋搜,并不影響Flowable發(fā)送事件的方法豆挽。即使采用了處理Backpressure的策略,F(xiàn)lowable原來以什么樣的速度產(chǎn)生事件锰镀,現(xiàn)在還是什么樣的速度不會(huì)變化憾筏,主要處理的是Subscriber接收事件的方式氧腰。
什么情況下才會(huì)產(chǎn)生Backpressure問題容贝?
- 如果生產(chǎn)者和消費(fèi)者在一個(gè)線程的情況下,無論生產(chǎn)者的生產(chǎn)速度有多快锻狗,每生產(chǎn)一個(gè)事件都會(huì)通知消費(fèi)者轻纪,等待消費(fèi)者消費(fèi)完畢刻帚,再生產(chǎn)下一個(gè)事件崇众。所以在這種情況下顷歌,根本不存在Backpressure問題。即同步情況下芹扭,Backpressure問題不存在舱卡。
- 如果生產(chǎn)者和消費(fèi)者不在同一線程的情況下轮锥,如果生產(chǎn)者的速度大于消費(fèi)者的速度交胚,就會(huì)產(chǎn)生Backpressure問題蝴簇。即異步情況下熬词,Backpressure問題才會(huì)存在互拾。
ERROR
這種方式會(huì)在產(chǎn)生Backpressure問題的時(shí)候直接拋出一個(gè)異常,這個(gè)異常就是著名的MissingBackpressureException颜矿。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
上述代碼創(chuàng)建了一個(gè)Flowable(被觀察者)和一個(gè)Subscriber(觀察者)。不同的是 onSubscribe(Subscription s)中傳給我們的不再是Disposable了, 而是Subscription箍铭。然而Subscription也可以用于切斷觀察者與被觀察者之間的聯(lián)系诈火,調(diào)用Subscription.cancel()方法便可冷守。 不同的地方在于Subscription增加了一個(gè)void request(long n)方法, 這個(gè)方法有什么用呢, 在上面的代碼中也有這么一句代碼:
s.request(Long.MAX_VALUE);
這個(gè)方法就是用來向生產(chǎn)者申請(qǐng)可以消費(fèi)的事件數(shù)量教沾。這樣我們便可以根據(jù)本身的消費(fèi)能力進(jìn)行消費(fèi)事件授翻。
當(dāng)調(diào)用了request()方法后,生產(chǎn)者便發(fā)送對(duì)應(yīng)數(shù)量的事件供消費(fèi)者消費(fèi)巡语。
這是因?yàn)镕lowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取的方式,你要求多少,我便傳給你多少枢赔。
注意:如果不顯示調(diào)用request就表示消費(fèi)能力為0拥知。
雖然并不限制向request()方法中傳入任意數(shù)字速梗,但是如果消費(fèi)者并沒有這么多的消費(fèi)能力襟齿,依舊會(huì)造成資源浪費(fèi)位隶,最后產(chǎn)生OOM钓试。形象點(diǎn)就是不能打腫臉充胖子。而ERROR策略就避免了這種情況的出現(xiàn)糠睡。
在異步調(diào)用時(shí)狈孔,RxJava中有個(gè)緩存池,用來緩存消費(fèi)者處理不了暫時(shí)緩存下來的數(shù)據(jù)嫁赏,緩存池的默認(rèn)大小為128潦蝇,即只能緩存128個(gè)事件攘乒。無論request()中傳入的數(shù)字比128大或小则酝,緩存池中在剛開始都會(huì)存入128個(gè)事件沽讹。當(dāng)然如果本身并沒有這么多事件需要發(fā)送爽雄,則不會(huì)存128個(gè)事件盲链。
在ERROR策略下刽沾,如果緩存池溢出,就會(huì)立刻拋出MissingBackpressureException異常监氢。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
});
讓Flowable發(fā)送129個(gè)事件,而Subscriber一個(gè)也不處理(如果不顯示調(diào)用request就表示消費(fèi)能力為0)泽谨,就產(chǎn)生了異常吧雹。
因此雄卷,ERROR即保證在異步操作中妒潭,事件累積不能超過128杜耙,超過即出現(xiàn)異常佑女。消費(fèi)者不能再接收事件了团驱,但生產(chǎn)者并不會(huì)停止嚎花。
BUFFER
所謂BUFFER就是把RxJava中默認(rèn)的只能存128個(gè)事件的緩存池?fù)Q成一個(gè)大的緩存池紊选,支持存很多很多的數(shù)據(jù)兵罢。
這樣,消費(fèi)者通過request()即使傳入一個(gè)很大的數(shù)字此蜈,生產(chǎn)者也會(huì)生產(chǎn)事件,并將處理不了的事件緩存跺嗽。
但是這種方式任然比較消耗內(nèi)存战授,除非是我們比較了解消費(fèi)者的消費(fèi)能力舔庶,能夠把握具體情況,不會(huì)產(chǎn)生OOM陈醒。
總之BUFFER要慎用。
DROP
看名字就可以了解其作用:當(dāng)消費(fèi)者處理不了事件瞧甩,就丟棄钉跷。
消費(fèi)者通過request()傳入其需求n,然后生產(chǎn)者把n個(gè)事件傳遞給消費(fèi)者供其消費(fèi)肚逸。其他消費(fèi)不掉的事件就丟掉爷辙。
mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP);
mSubscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(50);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
};
}
public void start(View view){
mFlowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
}
public void consume(View view){
mSubscription.request(50);
}
生產(chǎn)者一次性傳入128個(gè)事件進(jìn)入緩存池血当。點(diǎn)擊“開始”按鈕箩退,消費(fèi)了50個(gè)啥刻。然后第一次點(diǎn)擊“消費(fèi)”按鈕,又消費(fèi)了50個(gè),第二次點(diǎn)擊“消費(fèi)”按鈕,再次消費(fèi)50個(gè)。然而此時(shí)原來的128個(gè)緩存只剩下28個(gè)了,所以先消費(fèi)掉28個(gè),然后剩下22個(gè)是后來傳入的(其實(shí)后來的是在消費(fèi)了96個(gè)后傳入,并一次性在緩存池中又傳入了96個(gè),具體可以看源碼署照,這里不解釋了)禁荸。
LATEST
LATEST與DROP功能基本一致映砖。
消費(fèi)者通過request()傳入其需求n,然后生產(chǎn)者把n個(gè)事件傳遞給消費(fèi)者供其消費(fèi)。其他消費(fèi)不掉的事件就丟掉。
唯一的區(qū)別就是LATEST總能使消費(fèi)者能夠接收到生產(chǎn)者產(chǎn)生的最后一個(gè)事件。
還是以上述例子展示滔悉,唯一的區(qū)別就是Flowable不再無限發(fā)事件歉提,只發(fā)送1000000個(gè)。
上述例子Flowable對(duì)象的獲取都是通過create()獲取的肖方,自然可以通過BackpressureStrategy.LATEST之類的方式指定處理背壓的策略艰垂。如果Flowable對(duì)象不是自己創(chuàng)建的峦树,可以采用onBackpressureBuffer()葬馋、onBackpressureDrop()蒋院、onBackpressureLatest()的方式指定踏枣。
Flowable.just(1).onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
背壓和流量控制
Flow Control有哪些思路呢?
- 背壓(Backpressure)
- 節(jié)流(Throttling)
- 打包處理
- 調(diào)用棧阻塞(Callstack blocking)
Flow Control的幾種思路
背壓(Backpressure)
Backpressure泼菌,也稱為Reactive Pull虐块,就是下游需要多少(具體是通過下游的request請(qǐng)求指定需要多少)喉悴,上游就發(fā)送多少吟宦。這有點(diǎn)類似于TCP里的流量控制,接收方根據(jù)自己的接收窗口的情況來控制接收速率,并通過反向的ACK包來控制發(fā)送方的發(fā)送速率晌梨。
這種方案只對(duì)于所謂的cold Observable有效。cold Observable指的是那些允許降低速率的發(fā)送源癌刽,比如兩臺(tái)機(jī)器傳一個(gè)文件,速率可大可小确虱,即使降低到每秒幾個(gè)字節(jié)故黑,只要時(shí)間足夠長扳炬,還是能夠完成的。相反的例子是音視頻直播圈纺,數(shù)據(jù)速率低于某個(gè)值整個(gè)功能就沒法用了(這種就屬于hot Observable了)。
節(jié)流(Throttling)
節(jié)流(Throttling)所禀,說白了就是丟棄邢锯。消費(fèi)不過來翎冲,就處理其中一部分,剩下的丟棄。還是舉音視頻直播的例子欣舵,在下游處理不過來的時(shí)候缘圈,就需要丟棄數(shù)據(jù)包。
而至于處理哪些和丟棄哪些數(shù)據(jù)辨液,就有不同的策略榜揖。主要有三種策略:
- sample (也叫throttleLast)
- throttleFirst
- debounce (也叫throttleWithTimeout)
從細(xì)的方面分別解釋一下。
sample威兜,采樣销斟。類比一下音頻采樣,8kHz的音頻就是每125微秒采一個(gè)值椒舵。sample可以配置成蚂踊,比如每100毫秒采樣一個(gè)值,但100毫秒內(nèi)上游可能過來很多值笔宿,選哪個(gè)值呢犁钟,就是選最后那個(gè)值。所以它也叫throttleLast泼橘。
打包處理
打包就是把上游來的小包裹打成大包裹涝动,分發(fā)到下游。這樣下游需要處理的包裹的個(gè)數(shù)就減少了炬灭。RxJava中提供了兩類這樣的機(jī)制:buffer和window醋粟。
調(diào)用棧阻塞(Callstack blocking)
這是一種特殊情況,阻塞住整個(gè)調(diào)用棧(Callstack blocking)担败。之所以說這是一種特殊情況昔穴,是因?yàn)檫@種方式只適用于整個(gè)調(diào)用鏈都在一個(gè)線程上同步執(zhí)行的情況,這要求中間的各個(gè)operator都不能啟動(dòng)新的線程提前。在平常使用中這種應(yīng)該是比較少見的吗货,因?yàn)槲覀兘?jīng)常使用subscribeOn或observeOn來切換執(zhí)行線程,而且有些復(fù)雜的operator本身也會(huì)在內(nèi)部啟動(dòng)新的線程來處理狈网。另外宙搬,如果真的出現(xiàn)了完全同步的調(diào)用鏈,前面的另外三種Flow Control思路仍然可能是適用的拓哺,只不過這種阻塞的方式更簡(jiǎn)單勇垛,不需要額外的支持。
這里舉個(gè)例子把調(diào)用棧阻塞和前面的Backpressure比較一下士鸥∠泄拢“調(diào)用棧阻塞”相當(dāng)于很多車行駛在盤山公路上,而公路只有一條車道烤礁。那么排在最前面的第一輛車就擋住了整條路讼积,后面的車也只能排在后面。而“Backpressure”相當(dāng)于銀行辦業(yè)務(wù)時(shí)的窗口叫號(hào)脚仔,窗口主動(dòng)叫某個(gè)號(hào)過去(相當(dāng)于請(qǐng)求)勤众,那個(gè)人才過去辦理。
如何讓Observable支持Backpressure鲤脏?
在RxJava 1.x中们颜,有些Observable是支持Backpressure的吕朵,而有些不支持。但不支持Backpressure的Observable可以通過一些operator來轉(zhuǎn)化成支持Backpressure的Observable窥突。這些operator包括:
- onBackpressureBuffer
- onBackpressureDrop
- onBackpressureLatest
- onBackpressureBlock(已過期)
它們轉(zhuǎn)化成的Observable分別具有不同的Backpressure策略努溃。
而在RxJava 2.x中,Observable不再支持Backpressure阻问,而是改用Flowable來專門支持Backpressure茅坛。上面提到的四種operator的前三種分別對(duì)應(yīng)Flowable的三種Backpressure策略:
- BackpressureStrategy.BUFFER
- BackpressureStrategy.DROP
- BackpressureStrategy.LATEST