Rxjava2(二)、五種觀察者模式創(chuàng)建及背壓

Android進階系列之第三方庫知識點整理掠拳。

知識點總結癞揉,整理也是學習的過程,如有錯誤溺欧,歡迎批評指出喊熟。

上一篇:Rxjava2(一)、基礎概念及使用

image

直接開整姐刁,上一篇基礎概念里面說了芥牌,rxjava2 擴展于觀察者模式,我們上篇的只是簡單的介紹了用Observable來創(chuàng)建使用聂使,其實rxjava2給我們提供了五種觀察者模式的創(chuàng)建方式壁拉。

image

1、Observable 和 Observer

能夠發(fā)射0或n個數(shù)據(jù)柏靶,并以成功或錯誤事件終止弃理,在第一篇中已經(jīng)舉例說明了,這里就不再詳細說明屎蜓。

2痘昌、Flowable 和 Subscriber

能夠發(fā)射0或n個數(shù)據(jù),并以成功或錯誤事件終止梆靖。 支持背壓控汉,可以控制數(shù)據(jù)源發(fā)射的速度。

我們看到ObservableFlowable這兩個的區(qū)別就是后者支持背壓返吻,那么何為背壓姑子?

2.1、什么是背壓

背壓是一種現(xiàn)象测僵,簡單來說就是在異步操作中街佑,上游發(fā)送數(shù)據(jù)速度快于下游處理數(shù)據(jù)的速度,下游來不及處理捍靠,Buffer 溢出沐旨,導致事件阻塞,從而引起的各種問題榨婆,比如事件丟失磁携,OOM等。

rxjava1中并不支持背壓良风,當出現(xiàn)事件阻塞時候谊迄,會直接拋出 MissingBackpressureException 異常闷供,但是在rxjava2中,提供了 Flowable 來創(chuàng)建被觀察者统诺,通過Flowable 來處理背壓問題歪脏,我們可以簡單通過demo分析。

[站外圖片上傳中...(image-7e5758-1577706002259)]

A:我們上游模擬循環(huán)發(fā)送數(shù)據(jù)粮呢。

B:線程切換婿失,異步操作。

C:下游每隔一秒獲取數(shù)據(jù)啄寡。

我們Observable 創(chuàng)建豪硅,來模擬了背壓這個現(xiàn)象,我們在上游模擬無限循環(huán)的發(fā)送數(shù)據(jù)这难,下游每次都休眠一秒再獲取數(shù)據(jù)舟误,這樣肯定會造成我們前面提的問題葡秒,就是上游發(fā)送太他丫的快了姻乓,下游根本處理不過來,我們先看結果眯牧。

image

看日志蹋岩,打印結果停留在了13就沒有繼續(xù)打印了?同時可以看到程序已經(jīng)崩了学少,是因為在rxjava2中剪个,Observable并不支持背壓操作,遇到背壓問題版确,它并不會報錯扣囊,也不會拋MissingBackpressureException 異常,但是內(nèi)存會一直飆高绒疗,最后導致內(nèi)存不足程序直接掛掉侵歇。

[站外圖片上傳中...(image-41c5f6-1577706002259)]

可以看到內(nèi)存一直在往上飆,針對背壓這種現(xiàn)象吓蘑,rxjava2中提出用 Flowable 來處理惕虑。

下面由淺入深,慢慢揭開Flowable 的神秘面紗磨镶。

我們先用Flowable創(chuàng)建一個基本的demo:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件一");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件二");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件三");
                emitter.onNext("事件四");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件四");
                emitter.onComplete();
                LogUtil.d(TAG + "--subscribe  發(fā)送完成");
            }
        }, BackpressureStrategy.ERROR) // 這里需要傳入背壓策略溃蔫,跟線程池里面飽和策略類似,當緩存區(qū)存滿時候采取的處理策略
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()) // 線程切換琳猫,異步操作
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        // 決定觀察者能接收多少個事件伟叛,多余事件放入緩存區(qū)
                        // Flowable 默認緩存區(qū)大小為128,即最大能存放128個事件
                        s.request(3);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

可以看到Flowable創(chuàng)建和Observable基本差不多脐嫂,只是在create方法中多傳入BackpressureStrategy.ERROR 這么一個背壓策略统刮,這個后面會詳講侄榴。

onSubscribe 的回調(diào)中,參數(shù)變成了Subscription网沾,我們可以通過這個參數(shù)癞蚕,讓觀察者自己設置要接收多少個事件,如果發(fā)送的事件大于觀察者設置接收的事件辉哥,多余事件將會存入Flowable緩存區(qū)中桦山。

Flowable緩存區(qū)隊列大小只能存放128個事件,如果超過醋旦,就會報異常恒水。

結果:

[站外圖片上傳中...(image-af462-1577706002259)]

發(fā)送四個事件,觀察者通過Subscription.request(3)設置只接收三個事件饲齐,所以下游只接收三個钉凌,剩下一個放入Flowable緩存區(qū)中。

如果我們觀察者不設置Subscription.request(x),即不接收事件捂人,被觀察者仍然會發(fā)送事件御雕,并存入緩存區(qū)中,觀察者可以動態(tài)調(diào)用Subscription.request(x)方法來獲取事件滥搭。

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

                for (int x = 0; x <= 10; x++) {
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                    emitter.onNext(x + "事件");
                }
            }
        }, BackpressureStrategy.ERROR) 
                // 線程切換酸纲,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        subscription = s;
                        // s.request(3);  這里不指定觀察者接收事件個數(shù)
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

動態(tài)獲取

findViewById(R.id.bt_get_event).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (subscription != null) {
                    LogUtil.d(TAG + "--onClick");
                    subscription.request(4);
                }
            }
        });

可以看到我們觀察者一開始并沒有指定接收多少個事件,而是通過外接點擊事件瑟匆,來動態(tài)設置接收事件個數(shù)闽坡,我們看結果,當點擊觸發(fā)后愁溜,我們收到了最先存入隊列的四個事件疾嗅。

結果:

image

2.2、背壓策略

我們前面提到冕象,Flowable 默認的緩存區(qū)隊列大小為128代承,即只能存放上游發(fā)送的128個事件,如果上游發(fā)送的事件超過128交惯,就需要我們指定相應的背壓策略來做不同的處理次泽,BackpressureStrategy為我們提供了五種背壓策略。

[站外圖片上傳中...(image-287d84-1577706002259)]

整理如下:

策略 作用
MISSING 當緩存區(qū)大小存滿(128)席爽,被觀察者仍然繼續(xù)發(fā)送下一個事件時意荤,拋出異常MissingBackpressureException , 提示緩存區(qū)滿了
ERROR 當緩存區(qū)大小存滿(128)(默認緩存區(qū)大小128),被觀察者仍然繼續(xù)發(fā)送下一個事件時只锻,直接拋出異常MissingBackpressureException
BUFFER 當緩存區(qū)大小存滿(128)玖像,被觀察者仍然繼續(xù)發(fā)送下一個事件時,緩存區(qū)大小設置無限大, 即被觀察者可無限發(fā)送事件,但實際上是存放在緩存區(qū)
DROP 當緩存區(qū)大小存滿捐寥,被觀察者仍然繼續(xù)發(fā)送下一個事件時笤昨, 超過緩存區(qū)大小(128)的事件會被全部丟棄
LATEST 當緩存區(qū)大小存滿握恳,被觀察者仍然繼續(xù)發(fā)送下一個事件時瞒窒,只保存最新/最后發(fā)送的事件, 其他超過緩存區(qū)大邢缤荨(128)的事件會被全部丟棄

2.2.1崇裁、BackpressureStrategy.MISSING

當緩存區(qū)大小存滿(128),被觀察者仍然繼續(xù)發(fā)送下一個事件時束昵,拋出異常MissingBackpressureException , 提示緩存區(qū)滿了

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 發(fā)送129個事件拔稳,模擬超出緩存區(qū)
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.MISSING) // 使用BackpressureStrategy.MISSING背壓策略
                // 線程切換,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

我們使用BackpressureStrategy.MISSING背壓策略锹雏,觀察者接收request(Integer.MAX_VALUE)巴比,此值也為推薦值。

結果:

image

我們看到礁遵,當發(fā)送了128個事件后轻绞,再發(fā)送第129個事件時候,拋了MissingBackpressureException異常榛丢,而且我們設置了觀察者接收也未接收到數(shù)據(jù)铲球,說明是先存入緩存區(qū)隊列,再發(fā)送晰赞,當緩存區(qū)中拋異常后,就停止了onNext()事件选侨,我們可以驗證一下掖鱼,當設置被觀察者發(fā)送128事件。

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // *******  發(fā)送128個事件  ********
                for (int x = 0; x < 128; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.MISSING)
                // 線程切換援制,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

就是在上面demo的基礎上戏挡,改了發(fā)送的事件個數(shù),上游發(fā)送128個事件晨仑,剛好為緩存區(qū)大小褐墅,并不拋異常。

結果:

image

我們看到程序沒有拋異常洪己,并且正常打印了緩存區(qū)中的128個數(shù)據(jù)(從0開始)妥凳,可以印證兩點

1、緩存區(qū)大小確實為128

2答捕、先存入緩存區(qū)后再獲仁旁俊(如果異常,onNext直接不調(diào)用)

2.2.2拱镐、BackpressureStrategy.ERROR

當緩存區(qū)大小存滿(128)(默認緩存區(qū)大小128)艘款,被觀察者仍然繼續(xù)發(fā)送下一個事件時持际,直接拋出異常MissingBackpressureException

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 發(fā)送129個事件,模擬超出緩存區(qū)
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.ERROR)
                // 線程切換哗咆,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.ERROR 背壓策略

結果:

[站外圖片上傳中...(image-b907e7-1577706002259)]

跟Missing一樣蜘欲,直接拋了MissingBackpressureException異常且下游未接收到數(shù)據(jù),同理晌柬,如果上游發(fā)送數(shù)據(jù)小于等于128芒填,正常發(fā)送和接收。

2.2.3空繁、BackpressureStrategy.BUFFER

當緩存區(qū)大小存滿(128)殿衰,被觀察者仍然繼續(xù)發(fā)送下一個事件時,緩存區(qū)大小設置無限大, 即被觀察者可無限發(fā)送事件,但實際上是存放在緩存區(qū)。

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 發(fā)送129個事件亦鳞,模擬超出緩存區(qū)
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.BUFFER)
                // 線程切換晨抡,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.BUFFER 背壓策略

更改緩存區(qū)大小,不做限制见剩。

結果:

image

可以看到,我們發(fā)送的129個事件全部發(fā)送且接收到了。

2.2.4悟衩、BackpressureStrategy.DROP

當緩存區(qū)大小存滿,被觀察者仍然繼續(xù)發(fā)送下一個事件時栓拜, 超過緩存區(qū)大凶尽(128)的事件會被全部丟棄

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 發(fā)送129個事件,模擬超出緩存區(qū)
                for (int x = 0; x < 129; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.DROP)
                // 線程切換幕与,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.DROP 背壓策略

丟掉大于緩存區(qū)的事件挑势。

結果:

[站外圖片上傳中...(image-ea3cc7-1577706002259)]

結果很明了,并沒有拋異常同時也正常打印了啦鸣,但是超過緩存區(qū)的那個事件被拋棄潮饱,并沒有獲取到。

2.2.5诫给、BackpressureStrategy.LATEST

當緩存區(qū)大小存滿香拉,被觀察者仍然繼續(xù)發(fā)送下一個事件時,只保存最新/最后發(fā)送的事件中狂, 其他超過緩存區(qū)大匈炻怠(128)的事件會被全部丟棄

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 發(fā)送150個事件
                for (int x = 0; x < 150; x++) {
                    emitter.onNext(x + "事件");
                    LogUtil.d(TAG + "--subscribe  發(fā)送了" + x + "個事件");
                }
            }
        }, BackpressureStrategy.LATEST)
                // 線程切換,異步操作
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtil.d(TAG + "--onSubscribe");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(String s) {
                        LogUtil.d(TAG + "--onNext  接收到:" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.d(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.d(TAG + "--onComplete");
                    }
                });

使用 BackpressureStrategy.LATEST 背壓策略

發(fā)送了150個事件

當超出128時吃型,會保存最新的一個事件证鸥,即會接收129個事件。

結果:

image

我們可以看到,觀察者端接收到129個數(shù)據(jù)枉层,分別為緩存區(qū)內(nèi)數(shù)據(jù)泉褐,加上最新/最后一條數(shù)據(jù),中間數(shù)據(jù)均被丟棄鸟蜡。

2.3膜赃、同步情況下Flowable

前面說過,背壓前提是異步操作下揉忘,在同步下跳座,我們并不會有背壓一說,因為在同一個線程泣矛,發(fā)送數(shù)據(jù)后總是要等下游處理了才會發(fā)送第二條數(shù)據(jù)疲眷,不會存在緩沖區(qū),如下:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                LogUtil.d(TAG + "--subscribe  發(fā)送事件一");
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件二");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件三");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.d(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

結果:

image

可以看到您朽,事件都是順序執(zhí)行狂丝,發(fā)送一條接收一條,然后再執(zhí)行下一條哗总。

但是几颜,我們可能會遇到這個一個情況,當上游發(fā)送了四條數(shù)據(jù)讯屈,但是下游只接收三條蛋哭?我們改一下demo如下:

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                LogUtil.d(TAG + "--subscribe  發(fā)送事件一");
                emitter.onNext("事件一");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件二");
                emitter.onNext("事件二");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件三");
                emitter.onNext("事件三");
                LogUtil.d(TAG + "--subscribe  發(fā)送事件四");
                emitter.onNext("事件四");
                LogUtil.d(TAG + "--subscribe  發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                s.request(3);

            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.d(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

可以看到,被觀察者發(fā)送了四個事件涮母,但是觀察者只接收了三條谆趾。

結果:

[站外圖片上傳中...(image-2790ee-1577706002259)]

可以看到,同樣拋了MissingBackpressureException異常

這里可以使用BUFFER的背壓策略來處理哈蝇,但是我們?yōu)榱苏f明觀察者反向控制被觀察者棺妓,我們采用如下方案:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                // 通過emitter.requested()獲取觀察者設置的接收的事件數(shù)目
                long requested = emitter.requested();
                LogUtil.d(TAG + "--subscribe 觀察者設置接收的事件數(shù)目:" + requested);

                for (int x = 0; x < requested; x++) {
                    LogUtil.d(TAG + "--subscribe  發(fā)送事件" + x);
                    emitter.onNext("發(fā)送事件" + x);
                }
                LogUtil.d(TAG + "--subscribe  發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                LogUtil.d(TAG + "--onSubscribe");
                // 設置觀察者接收事件數(shù)目為3
                s.request(3);

            }

            @Override
            public void onNext(String s) {
                LogUtil.d(TAG + "--onNext  接收到:" + s);
            }

            @Override
            public void onError(Throwable t) {
                LogUtil.e(TAG + "--onError  error=" + t);
            }

            @Override
            public void onComplete() {
                LogUtil.d(TAG + "--onComplete");
            }
        });

我們在subscribe中通過emitter.requested()獲取觀察者中設置的接收事件數(shù)目,來動態(tài)的發(fā)送數(shù)據(jù)炮赦,這樣就避免了上下游數(shù)據(jù)不同步問題。

結果:

image

2.4样勃、使用操作符時背壓處理

我們前面都是通過create來創(chuàng)建Flowable吠勘,可以在Create第二個參數(shù)中傳入相應的背壓策略,Flowable所有的操作符都支持背壓峡眶,但是通過操作符創(chuàng)建的背壓策略默認為BackpressureStrategy.ERROR剧防,我們可以通過

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

三種方式來指定相應的背壓策略。

        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        subscription = s;
                        s.request(Long.MAX_VALUE); //默認可以接收Long.MAX_VALUE個事件
                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtil.i(TAG + "--onNext  aLong=" + aLong);
                        try {
                            // 延時一秒接收
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtil.e(TAG + "--onError  error=" + t);
                    }

                    @Override
                    public void onComplete() {
                        LogUtil.i(TAG + "--onComplete");
                    }
                });

這里我們通過 interval來創(chuàng)建Flowable辫樱,可以看到下游每一毫秒發(fā)送一條數(shù)據(jù)峭拘,下游一秒處理一條,上游明顯快于下游,處理不過來數(shù)據(jù)放入緩存池中鸡挠,當緩存池中隊列滿時辉饱,就會拋異常,因為其默認的背壓策略為BackpressureStrategy.ERROR

結果:

image

我們可以通過onBackpressureXXX其指定相應的背壓策略拣展。

image

結果:

[站外圖片上傳中...(image-b45029-1577706002259)]

當我們指定背壓策略為BUFFER后彭沼,可以看到并沒有異常拋出,程序一直在打印輸出备埃。

3姓惑、Single和SingleObserver

只發(fā)射單個數(shù)據(jù)或錯誤事件。

        Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                // 只能發(fā)送onSuccess或者onError按脚,發(fā)射多條數(shù)據(jù)于毙,只接受第一條
                emitter.onSuccess("Success");
                emitter.onError(new NullPointerException(""));
            }
        }).subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtil.d(TAG + "--onSubscribe");
            }

            @Override
            public void onSuccess(String s) {
                LogUtil.d(TAG + "--onSuccess  s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                LogUtil.e(TAG + "--onError  error=" + e.getMessage());
            }
        });

SingleEmitter發(fā)射器只能發(fā)送一條onSuccess或者onError數(shù)據(jù),如果發(fā)射器發(fā)射多條數(shù)據(jù)辅搬,觀察者只能接收到第一條數(shù)據(jù)唯沮。

結果:

image

4、Completable和CompletableObserver

不發(fā)射數(shù)據(jù)伞辛,只處理 onComplete 和 onError 事件烂翰。

[圖片上傳失敗...(image-34cb09-1577706002259)]

方法onCompleteonError只可調(diào)用一個,同時調(diào)用蚤氏,第一個生效甘耿。

5、Maybe和MaybeObserver

能夠發(fā)射0或者1個數(shù)據(jù)竿滨,要么成功佳恬,要么失敗。有點類似于Optional于游。

[站外圖片上傳中...(image-63ca30-1577706002259)]

onSuccess方法一次訂閱只能發(fā)送一次毁葱。

方法onCompleteonError只可調(diào)用一個,同時調(diào)用贰剥,第一個生效倾剿。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蚌成,隨后出現(xiàn)的幾起案子前痘,更是在濱河造成了極大的恐慌,老刑警劉巖担忧,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件芹缔,死亡現(xiàn)場離奇詭異,居然都是意外死亡瓶盛,警方通過查閱死者的電腦和手機最欠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門示罗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人芝硬,你說我怎么就攤上這事蚜点。” “怎么了吵取?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵禽额,是天一觀的道長。 經(jīng)常有香客問我皮官,道長脯倒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任捺氢,我火速辦了婚禮藻丢,結果婚禮上,老公的妹妹穿的比我還像新娘摄乒。我一直安慰自己悠反,他們只是感情好,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布馍佑。 她就那樣靜靜地躺著斋否,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拭荤。 梳的紋絲不亂的頭發(fā)上茵臭,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音舅世,去河邊找鬼旦委。 笑死,一個胖子當著我的面吹牛雏亚,可吹牛的內(nèi)容都是我干的缨硝。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼罢低,長吁一口氣:“原來是場噩夢啊……” “哼查辩!你這毒婦竟也來了?” 一聲冷哼從身側響起网持,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤宜肉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后翎碑,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡之斯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年日杈,在試婚紗的時候發(fā)現(xiàn)自己被綠了遣铝。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡莉擒,死狀恐怖酿炸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涨冀,我是刑警寧澤填硕,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站鹿鳖,受9級特大地震影響扁眯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜翅帜,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一姻檀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧涝滴,春花似錦绣版、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至韩脏,卻和暖如春缩麸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背骤素。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工匙睹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人济竹。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓痕檬,卻偏偏與公主長得像,于是被迫代替她去往敵國和親送浊。 傳聞我的和親對象是個殘疾皇子梦谜,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359