RxJava學(xué)習(xí)筆記

RxJava

RxJava是響應(yīng)式程序設(shè)計(jì)的一種實(shí)現(xiàn)脂新。在響應(yīng)式程序設(shè)計(jì)中,當(dāng)數(shù)據(jù)到達(dá)的時(shí)候粗梭,消費(fèi)者做出響應(yīng)争便。響應(yīng)式編程可以將事件傳遞給注冊(cè)了的觀察者Observer。RxJava為觀察者模式提供了一種通用的實(shí)現(xiàn)断医,并且提供了豐富的操作符來處理數(shù)據(jù)流滞乙。它同時(shí)支持不同線程之間切換。

什么是響應(yīng)式編程

響應(yīng)式編程(Reactive)是一種基于異步數(shù)據(jù)流概念的編程模式鉴嗤。數(shù)據(jù)流就像一條河:它可以被觀測(cè)斩启,被過濾,被操作醉锅,或者為新的消費(fèi)者與另外一條流合并為一條新的流兔簇。
響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件。事件可以被等待硬耍,可以觸發(fā)過程垄琐,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶默垄。同樣的此虑,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI口锭;汽車上的車道偏移系統(tǒng)探測(cè)到車輛偏移了正常路線就會(huì)提醒駕駛者糾正朦前,就是是響應(yīng)事件。
今天鹃操,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用韭寸、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上荆隘,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此恩伺。

觀察者模式

大家可能都知道, RxJava 以觀察者模式為骨架椰拒,在 2.0 中依舊如此晶渠。不過此次更新中凰荚,出現(xiàn)了兩種觀察者模式:

  • Observable ( 被觀察者 ) / Observer ( 觀察者 )
  • Flowable (被觀察者)/ Subscriber (觀察者)

在 RxJava 2.x 中,Observable 用于訂閱 Observer褒脯,不再支持背壓(1.x 中可以使用背壓策略)便瑟,而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的番川。

接口變化

RxJava 2.x 擁有了新的特性到涂,其依賴于4個(gè)基礎(chǔ)接口,它們分別是:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

其中最核心的莫過于 Publisher 和 Subscriber颁督。Publisher 可以發(fā)出一系列的事件践啄,而 Subscriber 負(fù)責(zé)和處理這些事件。其中用的比較多的自然是 PublisherFlowable沉御,它支持背壓屿讽。

Base Reactive Interfaces

可訂閱的對(duì)象在RxJava1中只有Observable一種,之前我們經(jīng)常會(huì)直接把數(shù)據(jù)源稱作Observable嚷节。而在RxJava2中擴(kuò)充成了4種 Observable聂儒、Flowable、Singles硫痰、Completable衩婚,因此在之后還是把他們統(tǒng)稱為數(shù)據(jù)源為宜。

public abstract class Flowable<T> implements Publisher<T> {

public abstract class Observable<T> implements ObservableSource<T> {

public abstract class Single<T> implements SingleSource<T> {

public abstract class Completable implements CompletableSource {

Single

和Observable效斑,F(xiàn)lowable一樣會(huì)發(fā)送數(shù)據(jù)非春,不同的是訂閱后只能接受到一次。普通Observable可以使用toSingle轉(zhuǎn)換:Observable.just(1).toSingle()

Completable

與Single類似缓屠,只能接受到完成(onComplete)和錯(cuò)誤(onError)奇昙。同樣也可以由普通的Observable轉(zhuǎn)換而來:Observable.just(1).toCompletable()

和Flowable的接口Publisher類似,Observable敌完、Single储耐、Completable也有類似的基類

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}

interface CompletableSource {
    void subscribe(CompletableObserver observer);
}

Observable的創(chuàng)建

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //執(zhí)行一些其他操作
                //.............
                //執(zhí)行完畢,觸發(fā)回調(diào)滨溉,通知觀察者
                e.onNext("我來發(fā)射數(shù)據(jù)");
            }
        });

使用create( )創(chuàng)建Observable最基本的創(chuàng)建方式』拊埽可以看到,這里傳入了一個(gè) ObservableOnSubscribe對(duì)象作為參數(shù)哟旗,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable被訂閱的時(shí)候闸餐,ObservableOnSubscribe的subscribe()方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼近尚,就是觀察者Observer 將會(huì)被調(diào)用一次 onNext())场勤。這樣歼跟,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞留瞳,即觀察者模式。

Observer的創(chuàng)建

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    //觀察者接收到通知,進(jìn)行相關(guān)操作
    public void onNext(String aLong) {
        System.out.println("我接收到數(shù)據(jù)了");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};
public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);
 
    void onError(@NonNull Throwable e);

    void onComplete();
}


public interface Subscriber<T> {
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

public interface SingleObserver<T> {

    void onSubscribe(@NonNull Disposable d);

    void onSuccess(@NonNull T t);

    void onError(@NonNull Throwable e);
}

public interface CompletableObserver {

    void onSubscribe(@NonNull Disposable d);

    void onComplete();

    void onError(@NonNull Throwable e);
}

Subscribe訂閱

 observable.subscribe(observer);

subscribe()有多個(gè)重載的方法:

public final Disposable subscribe() {}

public final Disposable subscribe(Consumer<? super T> onNext) {}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}

public final void subscribe(Observer<? super T> observer) {}
  • 不帶任何參數(shù)的subscribe() 表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
  • 帶有一個(gè)Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見

Observable的其他創(chuàng)建方式

just()方式

Observable<String> observable = Observable.just("Hello");

使用just( )骚秦,將為你創(chuàng)建一個(gè)Observable并自動(dòng)為你調(diào)用onNext( )發(fā)射數(shù)據(jù)她倘。通過just( )方式 直接觸發(fā)onNext(),just中傳遞的參數(shù)將直接在Observer的onNext()方法中接收到作箍。

fromIterable()方式

List<String> list = new ArrayList<String>();
for(int i =0;i<10;i++){
    list.add("Hello"+i);
}
Observable<String> observable = Observable.fromIterable((Iterable<String>) list);

使用fromIterable()硬梁,遍歷集合,發(fā)送每個(gè)item胞得。相當(dāng)于多次回調(diào)onNext()方法荧止,每次傳入一個(gè)item。
注意:Collection接口是Iterable接口的子接口阶剑,所以所有Collection接口的實(shí)現(xiàn)類都可以作為Iterable對(duì)象直接傳入fromIterable()方法跃巡。

簡(jiǎn)便的觀察者模式

除了Observable(被觀察者)的創(chuàng)建之外,RxJava 2.x 還提供了多個(gè)函數(shù)式接口 牧愁,用于實(shí)現(xiàn)簡(jiǎn)便式的觀察者模式素邪。具體的函數(shù)式接口包括以下:

Observable.just("hello").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

其中Consumer中的accept()方法接收一個(gè)來自O(shè)bservable的單個(gè)值。Consumer就是一個(gè)觀察者猪半。其他函數(shù)式接口可以類似應(yīng)用兔朦。
注意:Observable (被觀察者)只有在被Observer (觀察者)訂閱后才能執(zhí)行其內(nèi)部的相關(guān)邏輯

Cold Observable 和 Hot Observable

Hot Observable 無論有沒有 Subscriber 訂閱,事件始終都會(huì)發(fā)生办龄。當(dāng) Hot Observable 有多個(gè)訂閱者時(shí)烘绽,Hot Observable 與訂閱者們的關(guān)系是一對(duì)多的關(guān)系,可以與多個(gè)訂閱者共享信息俐填。

然而安接,Cold Observable 只有 Subscriber 訂閱時(shí),才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼。并且 Cold Observable 和 Subscriber 只能是一對(duì)一的關(guān)系盏檐,當(dāng)有多個(gè)不同的訂閱者時(shí)歇式,消息是重新完整發(fā)送的。也就是說對(duì) Cold Observable 而言胡野,有多個(gè)Subscriber的時(shí)候,他們各自的事件是獨(dú)立的龙巨。

盡管 Cold Observable 很好旨别,但是對(duì)于某些事件不確定何時(shí)發(fā)生以及不確定 Observable 發(fā)射的元素?cái)?shù)量秸弛,那還得使用 Hot Observable递览。比如:UI交互的事件绞铃、網(wǎng)絡(luò)環(huán)境的變化憎兽、地理位置的變化纯命、服務(wù)器推送消息的到達(dá)等等。

Cold Observable

只有當(dāng)有訂閱者訂閱的時(shí)候揪阿, Cold Observable 才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼吴裤。并且每個(gè)訂閱者訂閱的時(shí)候都獨(dú)立的執(zhí)行一遍數(shù)據(jù)流代碼麦牺。 Observable.interval 就是一個(gè) Cold Observable剖膳。每一個(gè)訂閱者都會(huì)獨(dú)立的收到他們的數(shù)據(jù)流吱晒。

Observable 的 just叹话、creat渣刷、range、fromXXX 等操作符都能生成Cold Observable瞭吃。

Hot Observable

Hot observable 不管有沒有訂閱者訂閱歪架,他們創(chuàng)建后就開發(fā)發(fā)射數(shù)據(jù)流和蚪。 一個(gè)比較好的示例就是 鼠標(biāo)事件攒霹。 不管系統(tǒng)有沒有訂閱者監(jiān)聽鼠標(biāo)事件催束,鼠標(biāo)事件一直在發(fā)生抠刺,當(dāng)有訂閱者訂閱后速妖,從訂閱后的事件開始發(fā)送給這個(gè)訂閱者,之前的事件這個(gè)訂閱者是接受不到的;如果訂閱者取消訂閱了烘跺,鼠標(biāo)事件依然繼續(xù)發(fā)射。

Cold Observable 如何轉(zhuǎn)換成 Hot Observable砌左?

  1. 使用publish屁擅,生成 ConnectableObservable

使用 publish 操作符派歌,可以讓 Cold Observable 轉(zhuǎn)換成 Hot Observable胶果。它將原先的 Observable 轉(zhuǎn)換成 ConnectableObservable早抠。

  1. 使用Subject/Processor

Subject 和 Processor 的作用是相同的蕊连。Processor 是 RxJava2.x 新增的類甘苍,繼承自 Flowable 支持背壓控制羊赵。而 Subject 則不支持背壓控制。

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

當(dāng) Subject 作為 Subscriber 時(shí)靡挥,它可以訂閱目標(biāo) Cold Observable 使對(duì)方開始發(fā)送事件跋破。同時(shí)它又作為Observable 轉(zhuǎn)發(fā)或者發(fā)送新的事件,讓 Cold Observable 借助 Subject 轉(zhuǎn)換為 Hot Observable租幕。

注意劲绪,Subject 并不是線程安全的贾富,如果想要其線程安全需要調(diào)用toSerialized()方法。(在RxJava1.x的時(shí)代還可以用 SerializedSubject 代替 Subject淑际,但是在RxJava2.x以后SerializedSubject不再是一個(gè)public class)
然而霍骄,很多基于 EventBus 改造的 RxBus 并沒有這么做,這樣的做法是非常危險(xiǎn)的咱娶,因?yàn)闀?huì)遇到并發(fā)的情況膘侮。

Hot Observable 如何轉(zhuǎn)換成 Cold Observable琼了?

  1. ConnectableObservable的refCount操作符

RefCount操作符把從一個(gè)可連接的 Observable 連接和斷開的過程自動(dòng)化了雕薪。它操作一個(gè)可連接的Observable所袁,返回一個(gè)普通的Observable燥爷。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè)Observable時(shí),RefCount連接到下層的可連接Observable暖璧。RefCount跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接Observable的連接愤惰。

  1. Observable的share操作符

share操作符封裝了publish().refCount()調(diào)用

Flowable的產(chǎn)生

在原來的RxJava 1.x版本中并沒有Flowable的存在,Backpressure問題是由Observable來處理的商模。在RxJava 2.x中對(duì)于backpressure的處理進(jìn)行了改動(dòng)响疚,為此將原來的Observable拆分成了新的Observable和Flowable忿晕,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分践盼。原先的Observable已經(jīng)不具備背壓處理能力咕幻。

被觀察者發(fā)送消息十分迅速以至于觀察者不能及時(shí)的響應(yīng)這些消息肄程。

背壓是指在異步場(chǎng)景中蓝厌,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略崎苗。簡(jiǎn)而言之,背壓是流速控制的一種策略肌蜻。

處理Backpressure的策略

處理Backpressure的策略僅僅是處理Subscriber接收事件的方式蒋搜,并不影響Flowable發(fā)送事件的方法豆挽。即使采用了處理Backpressure的策略,F(xiàn)lowable原來以什么樣的速度產(chǎn)生事件锰镀,現(xiàn)在還是什么樣的速度不會(huì)變化憾筏,主要處理的是Subscriber接收事件的方式氧腰。

什么情況下才會(huì)產(chǎn)生Backpressure問題容贝?
  1. 如果生產(chǎn)者和消費(fèi)者在一個(gè)線程的情況下,無論生產(chǎn)者的生產(chǎn)速度有多快锻狗,每生產(chǎn)一個(gè)事件都會(huì)通知消費(fèi)者轻纪,等待消費(fèi)者消費(fèi)完畢刻帚,再生產(chǎn)下一個(gè)事件崇众。所以在這種情況下顷歌,根本不存在Backpressure問題。即同步情況下芹扭,Backpressure問題不存在舱卡。
  2. 如果生產(chǎn)者和消費(fèi)者不在同一線程的情況下轮锥,如果生產(chǎn)者的速度大于消費(fèi)者的速度交胚,就會(huì)產(chǎn)生Backpressure問題蝴簇。即異步情況下熬词,Backpressure問題才會(huì)存在互拾。
ERROR

這種方式會(huì)在產(chǎn)生Backpressure問題的時(shí)候直接拋出一個(gè)異常,這個(gè)異常就是著名的MissingBackpressureException颜矿。

Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
}, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }
            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
};

flowable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

上述代碼創(chuàng)建了一個(gè)Flowable(被觀察者)和一個(gè)Subscriber(觀察者)。不同的是 onSubscribe(Subscription s)中傳給我們的不再是Disposable了, 而是Subscription箍铭。然而Subscription也可以用于切斷觀察者與被觀察者之間的聯(lián)系诈火,調(diào)用Subscription.cancel()方法便可冷守。 不同的地方在于Subscription增加了一個(gè)void request(long n)方法, 這個(gè)方法有什么用呢, 在上面的代碼中也有這么一句代碼:

s.request(Long.MAX_VALUE);
這個(gè)方法就是用來向生產(chǎn)者申請(qǐng)可以消費(fèi)的事件數(shù)量教沾。這樣我們便可以根據(jù)本身的消費(fèi)能力進(jìn)行消費(fèi)事件授翻。
當(dāng)調(diào)用了request()方法后,生產(chǎn)者便發(fā)送對(duì)應(yīng)數(shù)量的事件供消費(fèi)者消費(fèi)巡语。
這是因?yàn)镕lowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取的方式,你要求多少,我便傳給你多少枢赔。

注意:如果不顯示調(diào)用request就表示消費(fèi)能力為0拥知。

雖然并不限制向request()方法中傳入任意數(shù)字速梗,但是如果消費(fèi)者并沒有這么多的消費(fèi)能力襟齿,依舊會(huì)造成資源浪費(fèi)位隶,最后產(chǎn)生OOM钓试。形象點(diǎn)就是不能打腫臉充胖子。而ERROR策略就避免了這種情況的出現(xiàn)糠睡。

在異步調(diào)用時(shí)狈孔,RxJava中有個(gè)緩存池,用來緩存消費(fèi)者處理不了暫時(shí)緩存下來的數(shù)據(jù)嫁赏,緩存池的默認(rèn)大小為128潦蝇,即只能緩存128個(gè)事件攘乒。無論request()中傳入的數(shù)字比128大或小则酝,緩存池中在剛開始都會(huì)存入128個(gè)事件沽讹。當(dāng)然如果本身并沒有這么多事件需要發(fā)送爽雄,則不會(huì)存128個(gè)事件盲链。
在ERROR策略下刽沾,如果緩存池溢出,就會(huì)立刻拋出MissingBackpressureException異常监氢。

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                    }
});

讓Flowable發(fā)送129個(gè)事件,而Subscriber一個(gè)也不處理(如果不顯示調(diào)用request就表示消費(fèi)能力為0)泽谨,就產(chǎn)生了異常吧雹。
因此雄卷,ERROR即保證在異步操作中妒潭,事件累積不能超過128杜耙,超過即出現(xiàn)異常佑女。消費(fèi)者不能再接收事件了团驱,但生產(chǎn)者并不會(huì)停止嚎花。

BUFFER

所謂BUFFER就是把RxJava中默認(rèn)的只能存128個(gè)事件的緩存池?fù)Q成一個(gè)大的緩存池紊选,支持存很多很多的數(shù)據(jù)兵罢。
這樣,消費(fèi)者通過request()即使傳入一個(gè)很大的數(shù)字此蜈,生產(chǎn)者也會(huì)生產(chǎn)事件,并將處理不了的事件緩存跺嗽。
但是這種方式任然比較消耗內(nèi)存战授,除非是我們比較了解消費(fèi)者的消費(fèi)能力舔庶,能夠把握具體情況,不會(huì)產(chǎn)生OOM陈醒。
總之BUFFER要慎用。

DROP

看名字就可以了解其作用:當(dāng)消費(fèi)者處理不了事件瞧甩,就丟棄钉跷。
消費(fèi)者通過request()傳入其需求n,然后生產(chǎn)者把n個(gè)事件傳遞給消費(fèi)者供其消費(fèi)肚逸。其他消費(fèi)不掉的事件就丟掉爷辙。

mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {

                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP);
        mSubscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                mSubscription = s;
                s.request(50);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {

            }
        };
    }
    
    public void start(View view){
        mFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mSubscriber);

    }
    
    public void consume(View view){
        mSubscription.request(50);

    } 

生產(chǎn)者一次性傳入128個(gè)事件進(jìn)入緩存池血当。點(diǎn)擊“開始”按鈕箩退,消費(fèi)了50個(gè)啥刻。然后第一次點(diǎn)擊“消費(fèi)”按鈕,又消費(fèi)了50個(gè),第二次點(diǎn)擊“消費(fèi)”按鈕,再次消費(fèi)50個(gè)。然而此時(shí)原來的128個(gè)緩存只剩下28個(gè)了,所以先消費(fèi)掉28個(gè),然后剩下22個(gè)是后來傳入的(其實(shí)后來的是在消費(fèi)了96個(gè)后傳入,并一次性在緩存池中又傳入了96個(gè),具體可以看源碼署照,這里不解釋了)禁荸。

LATEST

LATEST與DROP功能基本一致映砖。
消費(fèi)者通過request()傳入其需求n,然后生產(chǎn)者把n個(gè)事件傳遞給消費(fèi)者供其消費(fèi)。其他消費(fèi)不掉的事件就丟掉。

唯一的區(qū)別就是LATEST總能使消費(fèi)者能夠接收到生產(chǎn)者產(chǎn)生的最后一個(gè)事件。
還是以上述例子展示滔悉,唯一的區(qū)別就是Flowable不再無限發(fā)事件歉提,只發(fā)送1000000個(gè)。

上述例子Flowable對(duì)象的獲取都是通過create()獲取的肖方,自然可以通過BackpressureStrategy.LATEST之類的方式指定處理背壓的策略艰垂。如果Flowable對(duì)象不是自己創(chuàng)建的峦树,可以采用onBackpressureBuffer()葬馋、onBackpressureDrop()蒋院、onBackpressureLatest()的方式指定踏枣。

Flowable.just(1).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
});

背壓和流量控制

Flow Control有哪些思路呢?

  • 背壓(Backpressure)
  • 節(jié)流(Throttling)
  • 打包處理
  • 調(diào)用棧阻塞(Callstack blocking)

Flow Control的幾種思路

背壓(Backpressure)

Backpressure泼菌,也稱為Reactive Pull虐块,就是下游需要多少(具體是通過下游的request請(qǐng)求指定需要多少)喉悴,上游就發(fā)送多少吟宦。這有點(diǎn)類似于TCP里的流量控制,接收方根據(jù)自己的接收窗口的情況來控制接收速率,并通過反向的ACK包來控制發(fā)送方的發(fā)送速率晌梨。

這種方案只對(duì)于所謂的cold Observable有效。cold Observable指的是那些允許降低速率的發(fā)送源癌刽,比如兩臺(tái)機(jī)器傳一個(gè)文件,速率可大可小确虱,即使降低到每秒幾個(gè)字節(jié)故黑,只要時(shí)間足夠長扳炬,還是能夠完成的。相反的例子是音視頻直播圈纺,數(shù)據(jù)速率低于某個(gè)值整個(gè)功能就沒法用了(這種就屬于hot Observable了)。

節(jié)流(Throttling)

節(jié)流(Throttling)所禀,說白了就是丟棄邢锯。消費(fèi)不過來翎冲,就處理其中一部分,剩下的丟棄。還是舉音視頻直播的例子欣舵,在下游處理不過來的時(shí)候缘圈,就需要丟棄數(shù)據(jù)包。

而至于處理哪些和丟棄哪些數(shù)據(jù)辨液,就有不同的策略榜揖。主要有三種策略:

  • sample (也叫throttleLast)
  • throttleFirst
  • debounce (也叫throttleWithTimeout)

從細(xì)的方面分別解釋一下。

sample威兜,采樣销斟。類比一下音頻采樣,8kHz的音頻就是每125微秒采一個(gè)值椒舵。sample可以配置成蚂踊,比如每100毫秒采樣一個(gè)值,但100毫秒內(nèi)上游可能過來很多值笔宿,選哪個(gè)值呢犁钟,就是選最后那個(gè)值。所以它也叫throttleLast泼橘。

打包處理

打包就是把上游來的小包裹打成大包裹涝动,分發(fā)到下游。這樣下游需要處理的包裹的個(gè)數(shù)就減少了炬灭。RxJava中提供了兩類這樣的機(jī)制:buffer和window醋粟。

調(diào)用棧阻塞(Callstack blocking)

這是一種特殊情況,阻塞住整個(gè)調(diào)用棧(Callstack blocking)担败。之所以說這是一種特殊情況昔穴,是因?yàn)檫@種方式只適用于整個(gè)調(diào)用鏈都在一個(gè)線程上同步執(zhí)行的情況,這要求中間的各個(gè)operator都不能啟動(dòng)新的線程提前。在平常使用中這種應(yīng)該是比較少見的吗货,因?yàn)槲覀兘?jīng)常使用subscribeOn或observeOn來切換執(zhí)行線程,而且有些復(fù)雜的operator本身也會(huì)在內(nèi)部啟動(dòng)新的線程來處理狈网。另外宙搬,如果真的出現(xiàn)了完全同步的調(diào)用鏈,前面的另外三種Flow Control思路仍然可能是適用的拓哺,只不過這種阻塞的方式更簡(jiǎn)單勇垛,不需要額外的支持。

這里舉個(gè)例子把調(diào)用棧阻塞和前面的Backpressure比較一下士鸥∠泄拢“調(diào)用棧阻塞”相當(dāng)于很多車行駛在盤山公路上,而公路只有一條車道烤礁。那么排在最前面的第一輛車就擋住了整條路讼积,后面的車也只能排在后面。而“Backpressure”相當(dāng)于銀行辦業(yè)務(wù)時(shí)的窗口叫號(hào)脚仔,窗口主動(dòng)叫某個(gè)號(hào)過去(相當(dāng)于請(qǐng)求)勤众,那個(gè)人才過去辦理。

如何讓Observable支持Backpressure鲤脏?

在RxJava 1.x中们颜,有些Observable是支持Backpressure的吕朵,而有些不支持。但不支持Backpressure的Observable可以通過一些operator來轉(zhuǎn)化成支持Backpressure的Observable窥突。這些operator包括:

  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureBlock(已過期)

它們轉(zhuǎn)化成的Observable分別具有不同的Backpressure策略努溃。

而在RxJava 2.x中,Observable不再支持Backpressure阻问,而是改用Flowable來專門支持Backpressure茅坛。上面提到的四種operator的前三種分別對(duì)應(yīng)Flowable的三種Backpressure策略:

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市则拷,隨后出現(xiàn)的幾起案子贡蓖,更是在濱河造成了極大的恐慌,老刑警劉巖煌茬,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件斥铺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡坛善,警方通過查閱死者的電腦和手機(jī)晾蜘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來眠屎,“玉大人剔交,你說我怎么就攤上這事「鸟茫” “怎么了岖常?”我有些...
    開封第一講書人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長葫督。 經(jīng)常有香客問我竭鞍,道長,這世上最難降的妖魔是什么橄镜? 我笑而不...
    開封第一講書人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任偎快,我火速辦了婚禮,結(jié)果婚禮上洽胶,老公的妹妹穿的比我還像新娘晒夹。我一直安慰自己,他們只是感情好姊氓,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開白布丐怯。 她就那樣靜靜地躺著,像睡著了一般他膳。 火紅的嫁衣襯著肌膚如雪响逢。 梳的紋絲不亂的頭發(fā)上绒窑,一...
    開封第一講書人閱讀 49,806評(píng)論 1 290
  • 那天棕孙,我揣著相機(jī)與錄音,去河邊找鬼。 笑死蟀俊,一個(gè)胖子當(dāng)著我的面吹牛钦铺,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播肢预,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼矛洞,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了烫映?” 一聲冷哼從身側(cè)響起沼本,我...
    開封第一講書人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎锭沟,沒想到半個(gè)月后抽兆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡族淮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年辫红,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片祝辣。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贴妻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蝙斜,到底是詐尸還是另有隱情名惩,我是刑警寧澤,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布孕荠,位于F島的核電站绢片,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏岛琼。R本人自食惡果不足惜底循,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望槐瑞。 院中可真熱鬧熙涤,春花似錦、人聲如沸困檩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悼沿。三九已至等舔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間糟趾,已是汗流浹背慌植。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來泰國打工甚牲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蝶柿。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓丈钙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親交汤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子雏赦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 首先這是基于rxjava1.0的學(xué)習(xí),最近又出了2.0版本芙扎,有了一些改動(dòng)星岗,首先用法就有了一定的變化,就比如開始我在...
    cgzysan閱讀 1,037評(píng)論 1 24
  • 本文僅為學(xué)習(xí)筆記戒洼;不是原創(chuàng)文章伍茄; 給 Android 開發(fā)者的 RxJava 詳解 一:基本介紹 概念: 是一個(gè)實(shí)...
    shuixingge閱讀 662評(píng)論 0 1
  • 參閱給 Android 開發(fā)者的 RxJava 詳解什么是函數(shù)式編程RxJava 2.0 全新來襲基于RxJava...
    小編閱讀 1,959評(píng)論 0 5
  • 注明:摘選自給 Android 開發(fā)者的 RxJava 詳解,做該篇文章的筆記 一.創(chuàng)建 Observable-被...
    墨源為水閱讀 153評(píng)論 0 1
  • 在正文開始之前的最后,放上 GitHub 鏈接和引入依賴的 gradle 代碼: Github:https://g...
    CHSmile閱讀 1,592評(píng)論 0 10