Carson帶你學(xué)Android:圖文解析RxJava背壓策略


前言

Rxjava妻柒,由于其基于事件流的鏈?zhǔn)秸{(diào)用祝沸、邏輯簡潔 & 使用簡單的特點(diǎn)封恰,深受各大 Android開發(fā)者的歡迎麻养。

如果還不了解RxJava,請(qǐng)看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程

  • 本文主要講解的是RxJava中的 背壓控制策略诺舔,希望你們會(huì)喜歡鳖昌。

Carson帶你學(xué)RxJava系列文章,包括 原理低飒、操作符许昨、應(yīng)用場(chǎng)景、背壓等等褥赊,請(qǐng)關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南


目錄

示意圖

1. 引言

1.1 背景

  • 觀察者 & 被觀察者 之間存在2種訂閱關(guān)系:同步 & 異步糕档。具體如下:
示意圖
  • 對(duì)于異步訂閱關(guān)系,存在 被觀察者發(fā)送事件速度 與觀察者接收事件速度 不匹配的情況
  1. 發(fā)送 & 接收事件速度 = 單位時(shí)間內(nèi) 發(fā)送&接收事件的數(shù)量
  2. 大多數(shù)情況崭倘,主要是 被觀察者發(fā)送事件速度 > 觀察者接收事件速度

1.2 問題

  • 被觀察者 發(fā)送事件速度太快,而觀察者 來不及接收所有事件类垫,從而導(dǎo)致觀察者無法及時(shí)響應(yīng) / 處理所有發(fā)送過來事件的問題司光,最終導(dǎo)致緩存區(qū)溢出、事件丟失 & OOM
  1. 如悉患,點(diǎn)擊按鈕事件:連續(xù)過快的點(diǎn)擊按鈕10次残家,則只會(huì)造成點(diǎn)擊2次的效果;
  2. 解釋:因?yàn)辄c(diǎn)擊速度太快了售躁,所以按鈕來不及響應(yīng)

下面再舉個(gè)例子:

  • 被觀察者的發(fā)送事件速度 = 10ms / 個(gè)
  • 觀察者的接收事件速度 = 5s / 個(gè)

即出現(xiàn)發(fā)送 & 接收事件嚴(yán)重不匹配的問題

 Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                for (int i = 0; ; i++) {
                    Log.d(TAG, "發(fā)送了事件"+ i );
                    Thread.sleep(10);
                    // 發(fā)送事件速度:10ms / 個(gè) 
                    emitter.onNext(i);

                }
                
            }
        }).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
             .subscribe(new Observer<Integer>() {
            // 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
                 
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }

            @Override
            public void onNext(Integer value) {

                try {
                    // 接收事件速度:5s / 個(gè) 
                    Thread.sleep(5000);
                    Log.d(TAG, "接收到了事件"+ value  );
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
            }

        });
  • 結(jié)果
    由于被觀察者發(fā)送事件速度 > 觀察者接收事件速度坞淮,所以出現(xiàn)流速不匹配問題,從而導(dǎo)致OOM
    示意圖

1.3 解決方案

采用 背壓策略陪捷。

下面回窘,我將開始介紹背壓策略。


2. 背壓策略簡介

2.1 定義

一種 控制事件流速 的策略

2.2 作用

異步訂閱關(guān)系 中市袖,控制事件發(fā)送 & 接收的速度

注:背壓的作用域 = 異步訂閱關(guān)系啡直,即 被觀察者 & 觀察者處在不同線程中

2.3 解決的問題

解決了 因被觀察者發(fā)送事件速度 與 觀察者接收事件速度 不匹配(一般是前者 快于 后者),從而導(dǎo)致觀察者無法及時(shí)響應(yīng) / 處理所有 被觀察者發(fā)送事件 的問題

2.4 應(yīng)用場(chǎng)景

  • 被觀察者發(fā)送事件速度 與 觀察者接收事件速度 不匹配的場(chǎng)景
  • 具體場(chǎng)景就取決于 該事件的類型苍碟,如:網(wǎng)絡(luò)請(qǐng)求酒觅,那么具體場(chǎng)景:有很多網(wǎng)絡(luò)請(qǐng)求需要執(zhí)行,但執(zhí)行者的執(zhí)行速度沒那么快微峰,此時(shí)就需要使用背壓策略來進(jìn)行控制舷丹。

3. 背壓策略的原理

  • 那么,RxJava實(shí)現(xiàn)背壓策略(Backpressure)的原理是什么呢蜓肆?
  • 解決方案 & 思想主要如下:
示意圖
  • 示意圖如下
示意圖
  • RxJava1.0 中被觀察者的舊實(shí)現(xiàn) Observable 對(duì)比
示意圖
  • 好了颜凯,那么上圖中在RxJava 2.0觀察者模型中谋币,Flowable到底是什么呢?它其實(shí)是RxJava 2.0中被觀察者的一種新實(shí)現(xiàn)装获,同時(shí)也是背壓策略實(shí)現(xiàn)的承載者
  • 請(qǐng)繼續(xù)看下一節(jié)的介紹:背壓策略的具體實(shí)現(xiàn) - Flowable

4. 背壓策略的具體實(shí)現(xiàn):Flowable

RxJava2.0中瑞信,采用 Flowable 實(shí)現(xiàn) 背壓策略

正確來說,應(yīng)該是 “非阻塞式背壓” 策略

4.1 Flowable 介紹

  • 定義:在 RxJava2.0中穴豫,被觀察者(Observable)的一種新實(shí)現(xiàn)

同時(shí)凡简,RxJava1.0 中被觀察者(Observable)的舊實(shí)現(xiàn): Observable依然保留

  • 作用:實(shí)現(xiàn) 非阻塞式背壓 策略

4.2 Flowable 特點(diǎn)

  • Flowable的特點(diǎn) 具體如下
示意圖
  • 下面再貼出一張RxJava2.0RxJava1.0的觀察者模型的對(duì)比圖

實(shí)際上,RxJava2.0 也有保留(被觀察者)Observerble - Observer(觀察者)的觀察者模型精肃,此處只是為了做出對(duì)比讓讀者了解

示意圖

4.3 與 RxJava1.0 中被觀察者的舊實(shí)現(xiàn) Observable 的關(guān)系

  • 具體如下圖
示意圖
  • 那么秤涩,為什么要采用新實(shí)現(xiàn)Flowable實(shí)現(xiàn)背壓,而不采用舊的Observable呢司抱?
  • 主要原因:舊實(shí)現(xiàn)Observable無法很好解決背壓問題筐眷。
示意圖

4.4 Flowable的基礎(chǔ)使用

  • Flowable的基礎(chǔ)使用非常類似于 Observable
  • 具體如下
/**
  * 步驟1:創(chuàng)建被觀察者 =  Flowable
  */
        Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);
        // 需要傳入背壓參數(shù)BackpressureStrategy,下面會(huì)詳細(xì)講解

 /**
   * 步驟2:創(chuàng)建觀察者 =  Subscriber
   */
        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                // 對(duì)比Observer傳入的Disposable參數(shù)习柠,Subscriber此處傳入的參數(shù) = Subscription
                // 相同點(diǎn):Subscription具備Disposable參數(shù)的作用匀谣,即Disposable.dispose()切斷連接, 同樣的調(diào)用Subscription.cancel()切斷連接
                // 不同點(diǎn):Subscription增加了void request(long n)
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);
               // 關(guān)于request()下面會(huì)繼續(xù)詳細(xì)說明
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

 /**
   * 步驟3:建立訂閱關(guān)系
   */
        upstream.subscribe(downstream);

示意圖
  • 更加優(yōu)雅的鏈?zhǔn)秸{(diào)用
        // 步驟1:創(chuàng)建被觀察者 =  Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "發(fā)送事件 1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送事件 2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送事件 3");
                emitter.onNext(3);
                Log.d(TAG, "發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                // 步驟2:創(chuàng)建觀察者 =  Subscriber & 建立訂閱關(guān)系

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        s.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
  • 至此,Flowable的基礎(chǔ)使用講解完
  • 關(guān)于更深層次的使用會(huì)結(jié)合 背壓策略的實(shí)現(xiàn) 來講解

5. 背壓策略的使用

  • 在本節(jié)中资溃,我將結(jié)合 背壓策略的原理 & Flowable的使用武翎,為大家介紹在RxJava 2.0 中該如何使用Flowable來實(shí)現(xiàn)背壓策略功能,即背壓策略的使用
  • FlowableObservable在功能上的區(qū)別主要是 多了背壓的功能
  • 下面溶锭,我將順著第3節(jié)中講解背壓策略實(shí)現(xiàn)原理 & 解決方案(如下圖)宝恶,來講解Flowable在背壓策略功能上的使用
示意圖

注:

  1. 由于第2節(jié)中提到,使用背壓的場(chǎng)景 = 異步訂閱關(guān)系趴捅,所以下文中講解的主要是異步訂閱關(guān)系場(chǎng)景垫毙,即 被觀察者 & 觀察者 工作在不同線程中
  2. 但由于在同步訂閱關(guān)系的場(chǎng)景也可能出現(xiàn)流速不匹配的問題,所以在講解異步情況后拱绑,會(huì)稍微講解一下同步情況综芥,以方便對(duì)比

5.1 控制 觀察者接收事件 的速度

5.1.1 異步訂閱情況
  • 簡介
示意圖
  • 具體原理圖
示意圖
  • 具體使用
// 1. 創(chuàng)建被觀察者Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                // 一共發(fā)送4個(gè)事件
                Log.d(TAG, "發(fā)送事件 1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送事件 2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送事件 3");
                emitter.onNext(3);
                Log.d(TAG, "發(fā)送事件 4");
                emitter.onNext(4);
                Log.d(TAG, "發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        // 對(duì)比Observer傳入的Disposable參數(shù),Subscriber此處傳入的參數(shù) = Subscription
                        // 相同點(diǎn):Subscription參數(shù)具備Disposable參數(shù)的作用猎拨,即Disposable.dispose()切斷連接, 同樣的調(diào)用Subscription.cancel()切斷連接
                        // 不同點(diǎn):Subscription增加了void request(long n)

                        s.request(3);
                        // 作用:決定觀察者能夠接收多少個(gè)事件
                        // 如設(shè)置了s.request(3)毫痕,這就說明觀察者能夠接收3個(gè)事件(多出的事件存放在緩存區(qū))
                        // 官方默認(rèn)推薦使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

  • 效果圖
示意圖
  • 有2個(gè)結(jié)論是需要大家注意的
示意圖

下圖 = 當(dāng)緩存區(qū)存滿時(shí)(128個(gè)事件)溢出報(bào)錯(cuò)的原理圖

示意圖
  • 代碼演示1:觀察者不接收事件的情況下迟几,被觀察者繼續(xù)發(fā)送事件 & 存放到緩存區(qū)消请;再按需取出
 /**
    * 步驟1:設(shè)置變量
    */
    private static final String TAG = "Rxjava";
    private Button btn; // 該按鈕用于調(diào)用Subscription.request(long n )
    private Subscription mSubscription; // 用于保存Subscription對(duì)象
    
  /**
    * 步驟2:設(shè)置點(diǎn)擊事件 = 調(diào)用Subscription.request(long n )
    */
        btn = (Button) findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mSubscription.request(2);
            }

        });

        /**
         * 步驟3:異步調(diào)用
         */
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "發(fā)送事件 1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送事件 2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送事件 3");
                emitter.onNext(3);
                Log.d(TAG, "發(fā)送事件 4");
                emitter.onNext(4);
                Log.d(TAG, "發(fā)送完成");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        // 保存Subscription對(duì)象,等待點(diǎn)擊按鈕時(shí)(調(diào)用request(2))觀察者再接收事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

示意圖
  • 代碼演示2:觀察者不接收事件的情況下类腮,被觀察者繼續(xù)發(fā)送事件至超出緩存區(qū)大须(128)
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                // 一共發(fā)送129個(gè)事件,即超出了緩存區(qū)的大小
                for (int i = 0;i< 129; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        // 默認(rèn)不設(shè)置可接收事件大小
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

5.1.2 同步訂閱情況

同步訂閱 & 異步訂閱 的區(qū)別在于:

  • 同步訂閱中蚜枢,被觀察者 & 觀察者工作于同1線程
  • 同步訂閱關(guān)系中沒有緩存區(qū)
示意圖
  • 被觀察者在發(fā)送1個(gè)事件后缸逃,必須等待觀察者接收后针饥,才能繼續(xù)發(fā)下1個(gè)事件
/**
         * 步驟1:創(chuàng)建被觀察者 =  Flowable
         */
        Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                
                // 發(fā)送3個(gè)事件
                Log.d(TAG, "發(fā)送了事件1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送了事件2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送了事件3");
                emitter.onNext(3);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);

        /**
         * 步驟2:創(chuàng)建觀察者 =  Subscriber
         */
        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                 s.request(3);
                 // 每次可接收事件 = 3 二次匹配
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "接收到了事件 " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        /**
         * 步驟3:建立訂閱關(guān)系
         */
        upstream.subscribe(downstream);
示意圖

所以,實(shí)際上并不會(huì)出現(xiàn)被觀察者發(fā)送事件速度 > 觀察者接收事件速度的情況需频。可是丁眼,卻會(huì)出現(xiàn)被觀察者發(fā)送事件數(shù)量 > 觀察者接收事件數(shù)量的問題。

  • 如:觀察者只能接受3個(gè)事件昭殉,但被觀察者卻發(fā)送了4個(gè)事件苞七,所以出現(xiàn)了不匹配情況
/**
         * 步驟1:創(chuàng)建被觀察者 =  Flowable
         */
        Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 被觀察者發(fā)送事件數(shù)量 = 4個(gè)
                Log.d(TAG, "發(fā)送了事件1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送了事件2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送了事件3");
                emitter.onNext(3);
                Log.d(TAG, "發(fā)送了事件4");
                emitter.onNext(4);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);

        /**
         * 步驟2:創(chuàng)建觀察者 =  Subscriber
         */
        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                 s.request(3);
                 // 觀察者接收事件 = 3個(gè) ,即不匹配
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "接收到了事件 " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        /**
         * 步驟3:建立訂閱關(guān)系
         */
        upstream.subscribe(downstream);
示意圖

所以挪丢,對(duì)于沒有緩存區(qū)概念的同步訂閱關(guān)系來說蹂风,單純采用控制觀察者的接收事件數(shù)量(響應(yīng)式拉取)實(shí)際上就等于 “單相思”乾蓬,雖然觀察者控制了要接收3個(gè)事件惠啄,但假設(shè)被觀察者需要發(fā)送4個(gè)事件,還是會(huì)出現(xiàn)問題任内。

在下面講解 5.2 控制被觀察者發(fā)送事件速度 時(shí)會(huì)解決這個(gè)問題撵渡。

  • 有1個(gè)特殊情況需要注意
示意圖
  • 代碼演示
/**
  * 同步情況
  */

        /**
         * 步驟1:創(chuàng)建被觀察者 =  Flowable
         */
        Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "發(fā)送了事件1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送了事件2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送了事件3");
                emitter.onNext(3);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);

        /**
         * 步驟2:創(chuàng)建觀察者 =  Subscriber
         */
        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                // 不設(shè)置request(long n)
                // s.request(Long.MAX_VALUE);

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        /**
         * 步驟3:建立訂閱關(guān)系
         */
        upstream.subscribe(downstream);

在被觀察者發(fā)送第1個(gè)事件后, 就拋出MissingBackpressureException異常 & 觀察者沒有收到任何事件

示意圖

5.2 控制 被觀察者發(fā)送事件 的速度

  • 簡介
示意圖
  • FlowableEmitter類的requested()介紹

public interface FlowableEmitter<T> extends Emitter<T> {
// FlowableEmitter = 1個(gè)接口,繼承自Emitter
// Emitter接口方法包括:onNext(),onComplete() & onError

    
    long requested();
    // 作用:返回當(dāng)前線程中request(a)中的a值
    // 該request(a)則是措施1中講解的方法死嗦,作用  = 設(shè)置
   
    ....// 僅貼出關(guān)鍵代碼

}

  • 每個(gè)線程中的requested()的返回值 = 該線程中的request(a)的a值

  • 對(duì)應(yīng)于同步 & 異步訂閱情況 的原理圖

示意圖

為了方便大家理解該策略中的requested()使用趋距,該節(jié)會(huì)先講解同步訂閱情況,再講解異步訂閱情況


5.2.1 同步訂閱情況

  • 原理說明
示意圖

即在同步訂閱情況中越走,被觀察者 通過 FlowableEmitter.requested()獲得了觀察者自身接收事件能力棚品,從而根據(jù)該信息控制事件發(fā)送速度靠欢,從而達(dá)到了觀察者反向控制被觀察者的效果

  • 具體使用
    下面的例子 = 被觀察者根據(jù)觀察者自身接收事件能力(10個(gè)事件)廊敌,從而僅發(fā)送10個(gè)事件
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                
                // 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
                long n = emitter.requested();

                Log.d(TAG, "觀察者可接收事件" + n);

                // 根據(jù)emitter.requested()的值,即當(dāng)前觀察者需要接收的事件數(shù)量來發(fā)送事件
                for (int i = 0; i < n; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");

                        // 設(shè)置觀察者每次能接受10個(gè)事件
                        s.request(10);

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖
  • 特別注意
    在同步訂閱情況中使用FlowableEmitter.requested()時(shí)门怪,有以下幾種使用特性需要注意的:
示意圖

情況1:可疊加性

  • 即:觀察者可連續(xù)要求接收事件骡澈,被觀察者會(huì)進(jìn)行疊加并一起發(fā)送
Subscription.request(a1);
Subscription.request(a2)掷空;

FlowableEmitter.requested()的返回值 = a1 + a2
  • 代碼演示
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        
                // 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
                Log.d(TAG, "觀察者可接收事件" + emitter.requested());

            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");

                        s.request(10); // 第1次設(shè)置觀察者每次能接受10個(gè)事件
                        s.request(20); // 第2次設(shè)置觀察者每次能接受20個(gè)事件

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

情況2:實(shí)時(shí)更新性

  • 即肋殴,每次發(fā)送事件后,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
  1. 即一開始觀察者要接收10個(gè)事件坦弟,發(fā)送了1個(gè)后护锤,會(huì)實(shí)時(shí)更新為9個(gè)
  2. 僅計(jì)算Next事件,complete & error事件不算酿傍。

Subscription.request(10)烙懦;
// FlowableEmitter.requested()的返回值 = 10

FlowableEmitter.onNext(1); // 發(fā)送了1個(gè)事件
// FlowableEmitter.requested()的返回值 = 9
  • 代碼演示
Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 1. 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
                Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());

                // 2. 每次發(fā)送事件后,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
                // 即一開始觀察者要接收10個(gè)事件赤炒,發(fā)送了1個(gè)后氯析,會(huì)實(shí)時(shí)更新為9個(gè)
                Log.d(TAG, "發(fā)送了事件 1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送了事件1后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());

                Log.d(TAG, "發(fā)送了事件 2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送事件2后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());

                Log.d(TAG, "發(fā)送了事件 3");
                emitter.onNext(3);
                Log.d(TAG, "發(fā)送事件3后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());

                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");

                        s.request(10); // 設(shè)置觀察者每次能接受10個(gè)事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

情況3:異常

  • 當(dāng)FlowableEmitter.requested()減到0時(shí)亏较,則代表觀察者已經(jīng)不可接收事件
  • 此時(shí)被觀察者若繼續(xù)發(fā)送事件,則會(huì)拋出MissingBackpressureException異常

如觀察者可接收事件數(shù)量 = 1掩缓,當(dāng)被觀察者發(fā)送第2個(gè)事件時(shí)叹阔,就會(huì)拋出異常

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 1. 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
                Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());

                // 2. 每次發(fā)送事件后赠尾,emitter.requested()會(huì)實(shí)時(shí)更新觀察者能接受的事件
                // 即一開始觀察者要接收10個(gè)事件,發(fā)送了1個(gè)后,會(huì)實(shí)時(shí)更新為9個(gè)
                Log.d(TAG, "發(fā)送了事件 1");
                emitter.onNext(1);
                Log.d(TAG, "發(fā)送了事件1后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());

                Log.d(TAG, "發(fā)送了事件 2");
                emitter.onNext(2);
                Log.d(TAG, "發(fā)送事件2后, 還需要發(fā)送事件數(shù)量 = " + emitter.requested());

                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                        Log.d(TAG, "onSubscribe");
                        s.request(1); // 設(shè)置觀察者每次能接受1個(gè)事件

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

額外

  • 若觀察者沒有設(shè)置可接收事件數(shù)量冈在,即無調(diào)用Subscription.request()
  • 那么被觀察者默認(rèn)觀察者可接收事件數(shù)量 = 0,即FlowableEmitter.requested()的返回值 = 0

5.2.2 異步訂閱情況

  • 原理說明
示意圖

從上面可以看出令杈,由于二者處于不同線程掌敬,所以被觀察者 無法通過 FlowableEmitter.requested()知道觀察者自身接收事件能力,即 被觀察者不能根據(jù) 觀察者自身接收事件的能力 控制發(fā)送事件的速度蠢熄。具體請(qǐng)看下面例子

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 調(diào)用emitter.requested()獲取當(dāng)前觀察者需要接收的事件數(shù)量
                Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());

            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        s.request(150);
                        // 該設(shè)置僅影響觀察者線程中的requested跪解,卻不會(huì)影響的被觀察者中的FlowableEmitter.requested()的返回值
                        // 因?yàn)镕lowableEmitter.requested()的返回值 取決于RxJava內(nèi)部調(diào)用request(n),而該內(nèi)部調(diào)用會(huì)在一開始就調(diào)用request(128)
                        // 為什么是調(diào)用request(128)下面再講解
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

而在異步訂閱關(guān)系中签孔,反向控制的原理是:通過RxJava內(nèi)部固定調(diào)用被觀察者線程中的request(n) 從而 反向控制被觀察者的發(fā)送事件速度

那么該什么時(shí)候調(diào)用被觀察者線程中的request(n) & n 的值該是多少呢叉讥?請(qǐng)繼續(xù)往下看。

  • 具體使用

關(guān)于RxJava內(nèi)部調(diào)用request(n)(n = 128饥追、96图仓、0)的邏輯如下:

示意圖

至于為什么是調(diào)用request(128) & request(96) & request(0),感興趣的讀者可自己閱讀 Flowable的源碼

  • 代碼演示

下面我將用一個(gè)例子來演示該原理的邏輯

// 被觀察者:一共需要發(fā)送500個(gè)事件但绕,但真正開始發(fā)送事件的前提 = FlowableEmitter.requested()返回值 ≠ 0
// 觀察者:每次接收事件數(shù)量 = 48(點(diǎn)擊按鈕)

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                Log.d(TAG, "觀察者可接收事件數(shù)量 = " + emitter.requested());
                    boolean flag; //設(shè)置標(biāo)記位控制

                    // 被觀察者一共需要發(fā)送500個(gè)事件
                    for (int i = 0; i < 500; i++) {
                        flag = false;

                        // 若requested() == 0則不發(fā)送
                        while (emitter.requested() == 0) {
                            if (!flag) {
                                Log.d(TAG, "不再發(fā)送");
                                flag = true;
                            }
                        }
                        // requested() ≠ 0 才發(fā)送
                        Log.d(TAG, "發(fā)送了事件" + i + "救崔,觀察者可接收事件數(shù)量 = " + emitter.requested());
                        emitter.onNext(i);


                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                       // 初始狀態(tài) = 不接收事件;通過點(diǎn)擊按鈕接收事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });


// 點(diǎn)擊按鈕才會(huì)接收事件 = 48 / 次
btn = (Button) findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mSubscription.request(48);
                // 點(diǎn)擊按鈕 則 接收48個(gè)事件
            }

        });

整個(gè)流程 & 測(cè)試結(jié)果 請(qǐng)看下圖

示意圖

5.3 采用背壓策略模式:BackpressureStrategy

5.3.1 背壓模式介紹

在Flowable的使用中捏顺,會(huì)被要求傳入背壓模式參數(shù)

示意圖
  • 面向?qū)ο螅横槍?duì)緩存區(qū)
  • 作用:當(dāng)緩存區(qū)大小存滿六孵、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí),該如何處理的策略方式

緩存區(qū)大小存滿幅骄、溢出 = 發(fā)送事件速度 > 接收事件速度 的結(jié)果 = 發(fā)送 & 接收事件不匹配的結(jié)果

5.3.2 背壓模式類型

示意圖

下面我將對(duì)每種模式逐一說明劫窒。

模式1:BackpressureStrategy.ERROR

  • 問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配

具體表現(xiàn):出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)拆座、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)

  • 處理方式:直接拋出異常MissingBackpressureException
 // 創(chuàng)建被觀察者Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 發(fā)送 129個(gè)事件
                for (int i = 0;i< 129; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR) // 設(shè)置背壓模式 = BackpressureStrategy.ERROR
                .subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

模式2:BackpressureStrategy.MISSING

  • 問題:發(fā)送事件速度 > 接收事件 速度主巍,即流速不匹配

具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)

  • 處理方式:友好提示:緩存區(qū)滿了
// 創(chuàng)建被觀察者Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 發(fā)送 129個(gè)事件
                for (int i = 0;i< 129; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.MISSING) // 設(shè)置背壓模式 = BackpressureStrategy.MISSING
                .subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

模式3:BackpressureStrategy.BUFFER

  • 問題:發(fā)送事件速度 > 接收事件 速度挪凑,即流速不匹配

具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)孕索、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)

  • 處理方式:將緩存區(qū)大小設(shè)置成無限大
  1. 即 被觀察者可無限發(fā)送事件 觀察者,但實(shí)際上是存放在緩存區(qū)
  2. 但要注意內(nèi)存情況躏碳,防止出現(xiàn)OOM
// 創(chuàng)建被觀察者Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                // 發(fā)送 129個(gè)事件
                for (int i = 1;i< 130; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER) // 設(shè)置背壓模式 = BackpressureStrategy.BUFFER
                .subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

可以接收超過原先緩存區(qū)大懈阈瘛(128)的事件數(shù)量了


示意圖

模式4: BackpressureStrategy.DROP

  • 問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配

具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)

  • 處理方式:超過緩存區(qū)大醒〖埂(128)的事件丟棄

如發(fā)送了150個(gè)事件杭抠,僅保存第1 - 第128個(gè)事件,第129 -第150事件將被丟棄

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                // 發(fā)送150個(gè)事件
                for (int i = 0;i< 150; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.DROP)      // 設(shè)置背壓模式 = BackpressureStrategy.DROP
                .subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        // 通過按鈕進(jìn)行接收事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });


btn = (Button) findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mSubscription.request(128);
                // 每次接收128個(gè)事件
            }

        });

被觀察者一下子發(fā)送了150個(gè)事件恳啥,點(diǎn)擊按鈕接收時(shí)觀察者接收了128個(gè)事件偏灿;再次點(diǎn)擊接收時(shí)卻無法接受事件,這說明超過緩存區(qū)大小的事件被丟棄了钝的。


示意圖

模式5:BackpressureStrategy.LATEST

  • 問題:發(fā)送事件速度 > 接收事件 速度翁垂,即流速不匹配

具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)

  • 處理方式:只保存最新(最后)事件硝桩,超過緩存區(qū)大醒夭隆(128)的事件丟棄

即如果發(fā)送了150個(gè)事件,緩存區(qū)里會(huì)保存129個(gè)事件(第1-第128 + 第150事件)


        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;i< 150; i++) {
                    Log.d(TAG, "發(fā)送了事件" + i);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.LATEST) // // 設(shè)置背壓模式 = BackpressureStrategy.LATEST
                 .subscribeOn(Schedulers.io()) // 設(shè)置被觀察者在io線程中進(jìn)行
                .observeOn(AndroidSchedulers.mainThread()) // 設(shè)置觀察者在主線程中進(jìn)行
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        // 通過按鈕進(jìn)行接收事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

btn = (Button) findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mSubscription.request(128);
                // 每次接收128個(gè)事件
            }

        });
  • 被觀察者一下子發(fā)送了150個(gè)事件碗脊,點(diǎn)擊按鈕接收時(shí)觀察者接收了128個(gè)事件啼肩;
  • 再次點(diǎn)擊接收時(shí)卻接收到1個(gè)事件(第150個(gè)事件),這說明超過緩存區(qū)大小的事件僅保留最后的事件(第150個(gè)事件)
示意圖

5.3.3 特別注意

在使用背壓策略模式的時(shí)候衙伶,有1種情況是需要注意的:

a. 背景
FLowable 可通過自己創(chuàng)建(如上面例子)祈坠,或通過其他方式自動(dòng)創(chuàng)建,如interval操作符

interval操作符簡介

  1. 作用:每隔1段時(shí)間就產(chǎn)生1個(gè)數(shù)字(Long型)矢劲,從0開始赦拘、1次遞增1,直至無窮大
  2. 默認(rèn)運(yùn)行在1個(gè)新線程上
  3. 與timer操作符區(qū)別:timer操作符可結(jié)束發(fā)送

b. 沖突

  • 對(duì)于自身手動(dòng)創(chuàng)建FLowable的情況芬沉,可通過傳入背壓模式參數(shù)選擇背壓策略
    (即上面描述的)

  • 可是對(duì)于自動(dòng)創(chuàng)建FLowable躺同,卻無法手動(dòng)傳入傳入背壓模式參數(shù),那么出現(xiàn)流速不匹配的情況下丸逸,該如何選擇 背壓模式呢蹋艺?

// 通過interval自動(dòng)創(chuàng)建被觀察者Flowable
        // 每隔1ms將當(dāng)前數(shù)字(從0開始)加1,并發(fā)送出去
        // interval操作符會(huì)默認(rèn)新開1個(gè)新的工作線程
        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread()) // 觀察者同樣工作在一個(gè)新開線程中
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE); //默認(rèn)可以接收Long.MAX_VALUE個(gè)事件
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                            // 每次延時(shí)1秒再接收事件
                            // 因?yàn)榘l(fā)送事件 = 延時(shí)1ms椭员,接收事件 = 延時(shí)1s车海,出現(xiàn)了發(fā)送速度 & 接收速度不匹配的問題
                            // 緩存區(qū)很快就存滿了128個(gè)事件笛园,從而拋出MissingBackpressureException異常隘击,請(qǐng)看下圖結(jié)果
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
示意圖

c. 解決方案
RxJava 2.0內(nèi)部提供 封裝了背壓策略模式的方法

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

默認(rèn)采用BackpressureStrategy.ERROR模式

具體使用如下:

Flowable.interval(1, TimeUnit.MILLISECONDS)
                .onBackpressureBuffer() // 添加背壓策略封裝好的方法,此處選擇Buffer模式研铆,即緩存區(qū)大小無限制
                .observeOn(Schedulers.newThread()) 
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE); 
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                            
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

從而很好地解決了發(fā)送事件 & 接收事件 速度不匹配的問題埋同。

封裝方法的示意圖.gif

其余方法的作用類似于上面的說背壓模式參數(shù),此處不作過多描述棵红。

背壓策略模式小結(jié)

示意圖
  • 至此凶赁,對(duì)RxJava 2.0的背壓模式終于講解完畢
  • 所有代碼Demo均存放在Carson_Ho的Github地址

6. 總結(jié)

  • 本文主要對(duì) Rxjava 的背壓模式知識(shí)進(jìn)行講解

  • Carson帶你學(xué)RxJava系列文章:

入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時(shí)候應(yīng)該使用Rxjava?(開發(fā)場(chǎng)景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava虱肄、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷


歡迎關(guān)注Carson_Ho的簡書

不定期分享關(guān)于安卓開發(fā)的干貨致板,追求短、平咏窿、快斟或,但卻不缺深度


請(qǐng)點(diǎn)贊集嵌!因?yàn)槟愕墓膭?lì)是我寫作的最大動(dòng)力萝挤!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市根欧,隨后出現(xiàn)的幾起案子怜珍,更是在濱河造成了極大的恐慌,老刑警劉巖凤粗,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酥泛,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡嫌拣,警方通過查閱死者的電腦和手機(jī)揭璃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來亭罪,“玉大人瘦馍,你說我怎么就攤上這事∮σ郏” “怎么了情组?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長箩祥。 經(jīng)常有香客問我院崇,道長,這世上最難降的妖魔是什么袍祖? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任底瓣,我火速辦了婚禮,結(jié)果婚禮上蕉陋,老公的妹妹穿的比我還像新娘捐凭。我一直安慰自己,他們只是感情好凳鬓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布茁肠。 她就那樣靜靜地躺著,像睡著了一般缩举。 火紅的嫁衣襯著肌膚如雪垦梆。 梳的紋絲不亂的頭發(fā)上匹颤,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音托猩,去河邊找鬼印蓖。 笑死,一個(gè)胖子當(dāng)著我的面吹牛京腥,可吹牛的內(nèi)容都是我干的另伍。 我是一名探鬼主播,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼绞旅,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼摆尝!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起因悲,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤堕汞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后晃琳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體讯检,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年卫旱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了人灼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡顾翼,死狀恐怖投放,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情适贸,我是刑警寧澤灸芳,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站拜姿,受9級(jí)特大地震影響烙样,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蕊肥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一谒获、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧壁却,春花似錦批狱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狼速。三九已至琅锻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背恼蓬。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來泰國打工惊完, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人处硬。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓小槐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親荷辕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子凿跳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容