【Android】Rxjava2 Flowable詳解與背壓那些事

1.Rxjava1中的背壓

Rxjava2中有這么一個(gè)被觀察者Flowable敷扫,同樣作為被觀察者晴楔,它和Observable有什么區(qū)別呢摆昧,在Rxjava2中撩满,Observable不再支持背壓,而新增的Flowable支持背壓,何為背壓伺帘,就是上游發(fā)送事件的速度大于下游處理事件的速度所產(chǎn)生的現(xiàn)象昭躺。

我們來看個(gè)例子,先把rxjava切換到rxjava1.0:

implementation 'io.reactivex:rxjava:1.1.6'
implementation 'io.reactivex:rxandroid:1.2.1'

然后執(zhí)行如下代碼:

        //被觀察者在主線程中伪嫁,每1ms發(fā)送一個(gè)事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                //觀察者每1s才處理一個(gè)事件
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + aLong);
                    }
                });

執(zhí)行結(jié)果如下:


image.png

我特?說好的背壓呢张咳,說好的異常呢帝洪,不要慌,因?yàn)樯厦娴拇a是同步的情況,都是運(yùn)行在祝線程的,所以同步的情況下觅赊,被觀察者每發(fā)送一個(gè)事件,觀察者就會處理一個(gè)事件砰奕,等觀察者處理完當(dāng)前事件后,被觀察者才會繼續(xù)發(fā)送事件提鸟,兩者分工明確军援,恩愛和睦,不存在發(fā)送速度不一致的情況称勋。

下面我們來看下異步的情況:

        //被觀察者在主線程中胸哥,每1ms發(fā)送一個(gè)事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                //觀察者在子線程中每1s處理一個(gè)事件
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + aLong);
                    }
                });

運(yùn)行后就會出現(xiàn)如下異常:


image.png

出現(xiàn)了背壓的情況,拋出了MissingBackpressureException異常赡鲜,異步情況下被觀察者發(fā)送事件是比較暴力的空厌,一次性全部發(fā)完,放在緩存池蝗蛙,然后觀察者一條條慢慢去處理蝇庭,發(fā)送過快就會出現(xiàn)背壓的情況.

背壓產(chǎn)生的條件:必須是異步的場景下才會出現(xiàn)醉鳖,即被觀察者和觀察者處于不同的線程中捡硅。

rxjava1中默認(rèn)的緩存池大小是16,當(dāng)事件超過就會出現(xiàn)MissingBackpressureException盗棵,看如下例子:

    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 17; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .subscribeOn(Schedulers.newThread())
                //將觀察者的工作放在新線程環(huán)境中
                .observeOn(Schedulers.newThread())
                //觀察者處理每1000ms才處理一個(gè)事件
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

你看:


image.png

嗯壮韭,默認(rèn)的緩存池為什么是16,這個(gè)問題問的好纹因,因?yàn)槿思襯xjava給的默認(rèn)值就是16啊喷屋,不信你看:

    public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
        return buffer(boundary, 16);
    }

rxjava1中也提供了處理背壓的操作符onBackpressureBuffer和onBackpressureDrop,下面我們來簡單看下onBackpressureBuffer:

        //被觀察者在主線程中瞭恰,每1ms發(fā)送一個(gè)事件
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 10000; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

運(yùn)行結(jié)果如下:


image.png

其實(shí)onBackpressureBuffer也就是增加了緩存池的大小屯曹,這個(gè)值為Long.MAX_VALUE,當(dāng)然我們也可以自己指定onBackpressureBuffer(size)的大小:

    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 100; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .onBackpressureBuffer(100)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });
image.png

onBackpressureDrop的作用是當(dāng)觀察者來不及處理事件的時(shí)候恶耽,會把事件給丟棄掉密任,而onBackpressureLatest操作符表示當(dāng)被觀察者Observable發(fā)出事件的速度比觀察者消耗得要快,觀察者會接收Observable最新發(fā)出的事件進(jìn)行處理偷俭,這兩種情況大家可以自行測試感受下浪讳。

從上面的例子可以看出,在rxjava1中涌萤,interval操作符默認(rèn)是不支持背壓的淹遵,我們來試試range操作符:

    Observable.range(1,10000)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

運(yùn)行結(jié)果如下:


image.png

尼瑪,竟然沒有出現(xiàn)背壓负溪,納尼透揣?


image.png

表情包好像放錯(cuò)了,走錯(cuò)片場了笙以,哈哈哈淌实,難道range操作符有毛病,不應(yīng)該啊猖腕,最后經(jīng)過一番查找拆祈,發(fā)現(xiàn)問題在observeOn操作符上,observeOn這個(gè)操作符內(nèi)部有一個(gè)緩沖區(qū)倘感,Android環(huán)境下長度是16放坏,它會告訴range最多發(fā)送16個(gè)事件,充滿緩沖區(qū)即可老玛。

這樣可以看出淤年,之前使用的interval操作符是不支持背壓的,而range則支持背壓蜡豹,那么到底什么樣的Observable支持背壓或不支持背壓呢麸粮?


image.png

其實(shí)在rxjava1中,不是所有Observable都支持背壓镜廉,從上面的例子也可以看出來這一點(diǎn)弄诲,我們知道Observable有hot和cold之分,rxjava1中hot observable是不支持背壓的娇唯,而cold observable中也有一部分不支持背壓齐遵,這里不再深究,想繼續(xù)了解可以自行g(shù)oogle塔插,另外一個(gè)原因是現(xiàn)在都tm Rxjava2了梗摇,我還在這扯rxjava1,罪過罪過想许,我也是為了引出問題伶授。

簡單扯一下解決背壓的思路断序,無非是限制發(fā)送的速度,俗稱限流糜烹,很多操作符都可以做到這些逢倍,比如sample在一段時(shí)間內(nèi)只處理最后一個(gè)數(shù)據(jù)等,也可以使用rxjava1中提供的onBackpressureBuffer景图,onBackpressureDrop较雕,onBackpressureLatest。

雖然rxjava1也有處理背壓的方法挚币,但設(shè)計(jì)并不完美亮蒋,緩存池大小只有16,而且被觀察者無法得知下游觀察者對事件的處理速度妆毕,一次性把事件拋給了下游觀察者慎玖,所以rxjava2中對背壓進(jìn)行了改進(jìn)。

2.Rxjava2中的背壓

Rxjava2中新增了一個(gè)被觀察者Flowable用來專門支持背壓笛粘,默認(rèn)隊(duì)列大小128趁怔,并且其所有的操作符都強(qiáng)制支持背壓,先看個(gè)簡單的例子:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

運(yùn)行結(jié)果如下:


image.png

說好的支持背壓呢薪前,怎么這個(gè)熟悉的異常又出現(xiàn)了润努??示括?铺浇?


image.png

細(xì)心的同學(xué)肯定發(fā)現(xiàn)了,F(xiàn)lowable.create方法第二個(gè)參數(shù)BackpressureStrategy.ERROR垛膝,這個(gè)BackpressureStrategy類其實(shí)就是處理背壓的策略類鳍侣,看下這個(gè)類的源碼:
public enum BackpressureStrategy {
    //不指定背壓策略
    MISSING,
    //出現(xiàn)背壓就拋出異常
    ERROR,
    //指定無限大小的緩存池,此時(shí)不會出現(xiàn)異常吼拥,但無限制大量發(fā)送會發(fā)生OOM
    BUFFER,
    //如果緩存池滿了就丟棄掉之后發(fā)出的事件
    DROP,
    //在DROP的基礎(chǔ)上倚聚,強(qiáng)制將最后一條數(shù)據(jù)加入到緩存池中
    LATEST
}

依次來看下這幾種策略的區(qū)別吧!

MISSING

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.MISSING)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

不出所料凿可,果然拋出了異常:


MISSING

ERROR

BackpressureStrategy.ERROR上面已經(jīng)測試過了惑折,不再重復(fù)了,依然會報(bào)異常矿酵。

BUFFER

      Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

運(yùn)行結(jié)果如下唬复,確實(shí)不會出現(xiàn)背壓異常了矗积,但是內(nèi)存占用嗖嗖的升高全肮,數(shù)據(jù)量足夠大足夠快的時(shí)候,OOM指日可待棘捣,哈哈哈9枷佟!!


BUFFER

DROP

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

運(yùn)行結(jié)果如下:


DROP

可以發(fā)現(xiàn)评疗,在填充滿了默認(rèn)的128個(gè)大小的緩存池后测砂,丟棄了很多數(shù)據(jù),DROP就是干這事的百匆,發(fā)不下就不放了砌些,有點(diǎn)狠啊<有佟存璃!

LATEST

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

這次我們只發(fā)送1000個(gè)事件,運(yùn)行結(jié)果如下:


LATEST

LATEST策略下雕拼,當(dāng)緩存池滿了之后也是會丟棄事件的纵东,不僅如此,它還會把事件的最后一個(gè)強(qiáng)制放入到緩存池中啥寇,所以可以看到999被觀察者收到了偎球。

上面我們都是用的Flowable的create創(chuàng)建的被觀察者,如果我們使用just辑甜,fromArray等操作符該如何指定背壓策略呢衰絮?其實(shí)也很簡單,因?yàn)閞xjava2像rxjava1那樣也提供了onBackpressureBuffer()磷醋,onBackpressureDrop()岂傲,onBackpressureLatest(),這樣用就可以了:

        Flowable.range(1,1000)
                .onBackpressureBuffer(1000)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

嗯子檀,運(yùn)行結(jié)果很穩(wěn):


onBackpressureBuffer

那么可能我們會有個(gè)疑問镊掖,上面的例子都是觀察者被動的接收事件,能不能主動拉取事件呢褂痰,當(dāng)然可以亩进,我們看下下面這個(gè)例子:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    emitter.onNext("i = " + i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("tag", "----> " + s);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

看下運(yùn)行結(jié)果:

image.png

此時(shí)我們的觀察者使用了Subscriber,它有一個(gè)onSubscribe方法缩歪,參數(shù)為Subscription归薛,其實(shí)關(guān)鍵點(diǎn)就是這個(gè)Subscription,它的作用就是從緩存池拉取事件匪蝙,它有一個(gè)request(count)方法主籍,它的作用就是拉取事件,并可以指定拉取事件的個(gè)數(shù)逛球。我們在上面的例子中千元,使用subscription.request(5)每次拉取5個(gè)事件,其實(shí)也是很簡單的颤绕。

其實(shí)搞了半天幸海,文章基本也要結(jié)束了祟身,雖然rxjava提供了處理背壓的策略,但是最好還是能盡量避免上游被觀察者發(fā)送事件過快過多物独,實(shí)在需要處理袜硫,就結(jié)合各種策略和操作符進(jìn)行按需處理。

3.項(xiàng)目中的使用

上周在項(xiàng)目中遇到了這么一個(gè)場景挡篓,就是在跳轉(zhuǎn)頁面之前需要釋放camera婉陷,這是個(gè)耗時(shí)操作,返回當(dāng)前頁面的時(shí)候需要重新open Camera官研,而且open Camera的時(shí)機(jī)需要在SurfaceView的create中執(zhí)行憨攒,這個(gè)場景剛好用request可以解決,例子和上面類似阀参,就不再上代碼了肝集。


大家拜拜

關(guān)于Flowable的使用就先到這吧,由于個(gè)人水平有限蛛壳,難免會犯些錯(cuò)誤杏瞻,有問題歡迎留言討論。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末衙荐,一起剝皮案震驚了整個(gè)濱河市捞挥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌忧吟,老刑警劉巖砌函,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異溜族,居然都是意外死亡讹俊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進(jìn)店門煌抒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來仍劈,“玉大人,你說我怎么就攤上這事寡壮》犯恚” “怎么了?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵况既,是天一觀的道長这溅。 經(jīng)常有香客問我,道長棒仍,這世上最難降的妖魔是什么悲靴? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮降狠,結(jié)果婚禮上对竣,老公的妹妹穿的比我還像新娘。我一直安慰自己榜配,他們只是感情好否纬,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蛋褥,像睡著了一般临燃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上烙心,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天膜廊,我揣著相機(jī)與錄音,去河邊找鬼淫茵。 笑死爪瓜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的匙瘪。 我是一名探鬼主播铆铆,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼丹喻!你這毒婦竟也來了薄货?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤碍论,失蹤者是張志新(化名)和其女友劉穎谅猾,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鳍悠,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡税娜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了藏研。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巧涧。...
    茶點(diǎn)故事閱讀 39,764評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖遥倦,靈堂內(nèi)的尸體忽然破棺而出谤绳,到底是詐尸還是另有隱情,我是刑警寧澤袒哥,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布缩筛,位于F島的核電站,受9級特大地震影響堡称,放射性物質(zhì)發(fā)生泄漏瞎抛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一却紧、第九天 我趴在偏房一處隱蔽的房頂上張望桐臊。 院中可真熱鬧胎撤,春花似錦、人聲如沸断凶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽认烁。三九已至肿男,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間却嗡,已是汗流浹背舶沛。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留窗价,地道東北人如庭。 一個(gè)月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像撼港,于是被迫代替她去往敵國和親柱彻。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,665評論 2 354

推薦閱讀更多精彩內(nèi)容