RxJava2 筆記

package com.github.cai.greendaotaste.rxjava;

import android.os.Bundle;
import android.support.annotation.Nullable;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;

import com.github.cai.greendaotaste.R;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

/**
 * Created by admin on 2017/6/19.
 * 參考自 碼個蛋 http://chuansong.me/n/1875207353935
 * 上游和下游就分別對應(yīng)著RxJava中的Observable和Observer酱塔,它們之間的連接就對應(yīng)著subscribe()
 * <p>
 * 知識點一:
 * 一,上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext.
 * 二弟蚀,當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
 * 三拌消,當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
 * 四挑豌,上游可以不發(fā)送onComplete或onError.
 * 五,最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然
 * 六墩崩,調(diào)用順序:先調(diào)用onSubscribe -> subscribe(上游方法) -> onNext -> onComplete或onError
 * <p>
 * 知識點二: 調(diào)用dispose()并不會導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會繼續(xù)發(fā)送剩余的事件.
 * <p>
 * 知識點三:
 * 帶有Observer參數(shù)的我們已經(jīng)使用過了,這里對其他幾個方法進行說明.
 * 一氓英,不帶任何參數(shù)的subscribe() 表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
 * 二,帶有一個Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫:
 * <p>
 * 知識點四:
 * 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發(fā)事件, 下游就在哪個線程接收事件.
 * <p>
 * 知識點五:
 * 一,多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
 * 二,多次指定下游的線程是可以的, 也就是說*每調(diào)用一次observeOn()* , 下游的線程就會切換一次.
 * <p>
 * 知識點六:
 * Schedulers.io()
 * 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
 * <p>
 * Schedulers.computation()
 * 代表CPU計算密集型的操作, 例如需要大量計算的操作
 * <p>
 * Schedulers.newThread()
 * 代表一個常規(guī)的新線程
 * <p>
 * AndroidSchedulers.mainThread()
 * 代表Android的主線程
 * <p>
 * 知識點七:
 * 那如果有多個Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個容器CompositeDisposable,
 * 每當(dāng)我們得到一個Disposable時就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時候,
 * 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.
 * <p>
 * 知識點八:
 * 上游每發(fā)送一個事件, flatMap都將創(chuàng)建一個新的水管, 然后發(fā)送轉(zhuǎn)換之后的新的事件, 下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù).
 * 這里需要注意的是, flatMap并不保證事件的順序, 也就是圖中所看到的, 并不是事件1就在事件2的前面.
 * 如果需要保證順序則需要使用concatMap.
 * <p>
 * 最終下游收到的事件數(shù)量是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同,這個也很好理解,
 * 因為是從每一根水管里取一個事件來進行合并,
 * 最少的那個肯定就最先取完, 這個時候其他的水管盡管還有事件,
 * 但是已經(jīng)沒有足夠的事件來組合了, 因此下游就不會收到剩余的事件
 * <p>
 */

public class RxJavaActivity extends AppCompatActivity implements View.OnClickListener {

    public static final String TAG = RxJavaActivity.class.getSimpleName();

    private TextView showRxMessage;
    private Button subscriber, observer,requestBtn;

    private Subscription mSubscription;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava);
        showRxMessage = (TextView) findViewById(R.id.show_rx_message);
        subscriber = (Button) findViewById(R.id.subscriber);
        observer = (Button) findViewById(R.id.observer);
        requestBtn = (Button) findViewById(R.id.request);

        subscriber.setOnClickListener(this);
        observer.setOnClickListener(this);
        requestBtn.setOnClickListener(this);

        Log.d(TAG, "onCreate: current thread is " + Thread.currentThread().getName());
    }

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.subscriber:
                //simpleUseRxJava();
                //onlyCareAboutOnNext();
                //rxJavaMapTaste();
                //flatMapTaste();
                //rxJavaZipTaste();
                //dealWithBackpressureWithFilter();
                //dealWithBackPressWithSample();
                rxJavaFlowableResponseFetchAync();
                break;
            case R.id.observer:
                //flowableTaste();
                //changeFlowableCacheSize();
                //rxJavaIntervalTaste();
                //rxJavaFlowableResponseFetch();
                lookWhatTimeHitRequestValue();
                break;

            case R.id.request:
                mSubscription.request(96);
                break;
        }
    }

    public void simpleUseRxJava() {
        //可觀察者 上游 可觀察者所以用訂閱subscribeOn
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                //ObservableEmitter: Emitter是發(fā)射器的意思 可以聯(lián)想為送報員
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
                e.onComplete();//發(fā)完一個后還可以發(fā)第二個呵燕,只是觀察者只收一個
                //e.onError(new Throwable());崩潰 onComplete和onError事件是互斥的昔汉,只能發(fā)一個

                Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
            }
        });
        //觀察者 所有用觀察observeOn
        Observer<Integer> observer = new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                // Disposable, 這個單詞的字面意思是一次性用品,用完即可丟棄的.
                // 那么在RxJava中怎么去理解它呢, 對應(yīng)于上面的水管的例子,
                // 我們可以把它理解成兩根管道之間的一個機關(guān), 當(dāng)調(diào)用它的dispose()方法時,
                // 它就會將兩根管道切斷, 從而導(dǎo)致下游收不到事件.
                // 因此我們可以在Activity中將這個Disposable 保存起來, 當(dāng)Activity退出時, 切斷它即可.
                showMessageInText("onSubscribe: ");
                disposable = d;

                Log.d(TAG, "onSubscribe: current thread is " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                //showMessageInText("onNext: " + integer.toString());

                if (integer == 2) {
                    disposable.dispose();
                }

                //showMessageInText("onNext: " + disposable.isDisposed());//切斷水管,之后的時間不在接收
                //但是不妨礙上游時間的傳遞

                Log.d(TAG, "onNext: current thread is " + Thread.currentThread().getName());
            }

            @Override
            public void onError(@NonNull Throwable e) {
                //showMessageInText("onError: ");

                Log.d(TAG, "onError: current thread is " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                //showMessageInText("onComplete: ");

                Log.d(TAG, "onComplete: current thread is " + Thread.currentThread().getName());
            }
        };
        //郵局派送報紙(聯(lián)想訂報紙,雖然是人去訂報社報紙,但是報社是實際遞送報紙給人的),上游向下游發(fā)送事件
        observable.subscribeOn(Schedulers.newThread())//指定發(fā)送事件的線程 上游發(fā)送事件的線程,
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())//指定接收事件的線程 下游接收事件的線程.
                .observeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribe(observer);
    }

    public void onlyCareAboutOnNext() {
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                //ObservableEmitter: Emitter是發(fā)射器的意思 可以聯(lián)想為送報員
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("test error"));
                //e.onComplete();
                Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
            }
        });
        Consumer<Integer> careAboutNext = new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                showMessageInText("accept: " + integer.toString() + " login is success");
            }
        };

        Consumer<Throwable> failureNext = new Consumer<Throwable>() {//注意:用于錯誤的Consumer只能使用泛型Throwable
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                showMessageInText("accept: login is failure");
            }
        };

        observable.subscribe(careAboutNext, failureNext);
    }

    //map是RxJava中最簡單的一個變換操作符了, 它的作用就是對上游發(fā)送的每一個事件應(yīng)用一個函數(shù), 使得每一個事件都按照指定的函數(shù)去變化
    public void rxJavaMapTaste() {
        //要傳遞的是Integer類型
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(10);
                e.onNext(11);
                e.onNext(12);
                e.onComplete();
            }
        });
        //要接收的是String類型
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                showMessageInText("accept: I receive value is" + s);
            }
        };

        //提供的轉(zhuǎn)換函數(shù)
        observable.map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return String.valueOf(integer);
            }
        }).subscribe(consumer);
    }

    //FlatMap將一個發(fā)送事件的上游Observable變換為多個發(fā)送事件的Observables啊鸭,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里.
    //自己的理解:
    public void flatMapTaste() {
        //要傳遞的是Integer類型
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(100);
                e.onNext(110);
                e.onNext(120);
                e.onComplete();
            }
        });

        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                showMessageInText("accept: After flat map I receive value is" + s);
            }
        };

        observable.flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                ArrayList<String> list = new ArrayList<>(3);
                for (int i = 0; i < 3; i++) {
                    list.add(String.valueOf(integer));
                }
                return Observable.fromIterable(list).delay(15, TimeUnit.MILLISECONDS);//不保證發(fā)送順序
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);

    }

    // Zip通過一個函數(shù)將多個Observable發(fā)送的事件結(jié)合到一起,然后發(fā)送這些組合到一起的事件.
    // 它按照*嚴格的順序*應(yīng)用這個函數(shù)匿值。它只發(fā)射與 發(fā)射數(shù)據(jù)項最少的那個Observable 一樣多的數(shù)據(jù)
    // (最終下游收到的事件數(shù)量是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同)
    public void rxJavaZipTaste() {
        //要傳遞的是Integer類型
        Observable<Integer> observableInt = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1000);
                e.onNext(1100);
                e.onNext(1200);
                e.onNext(1300);
                e.onComplete();
            }
        });

        //要傳遞的是Integer類型
        Observable<String> observableStr = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("thousand");
                e.onNext("thousand one hundred");
                e.onNext("thousand two hundred");
                e.onComplete();
            }
        });

        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                showMessageInText("accept: After zip I receive value is" + s);
            }
        };

        //泛型參數(shù)表示 第一個表示一個上游的類型赠制,第二個參數(shù)表示第二個上游的類型,第三個參數(shù)表示組合后返回的類型
        Observable.zip(observableInt, observableStr, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        }).subscribe(consumer);//如果沒有指定線程,默認全部都是在同一個線程中執(zhí)行
    }


    public void backpressureTaste() {
        /**
         * Zip可以將多個上游發(fā)送的事件組合起來發(fā)送給下游, 那大家有沒有想過一個問題,
         * 如果其中一個水管A發(fā)送事件特別快, 而另一個水管B 發(fā)送事件特別慢, 那就可能出現(xiàn)這種情況,
         * 發(fā)得快的水管A 已經(jīng)發(fā)送了1000個事件了, 而發(fā)的慢的水管B 才發(fā)一個出來, 組合了一個之后水管A 還剩999個事件,
         * 這些事件需要繼續(xù)等待水管B 發(fā)送事件出來組合, 那么這么多的事件是放在哪里的呢? 總有一個地方保存吧?
         * 沒錯, Zip給我們的每一根水管都弄了一個水缸 , 用來保存這些事件
         *
         * 所謂的Backpressure其實就是為了控制流量
         */

        /**
         * 知識點九:
         * 一钟些,當(dāng)上下游工作在同一個線程中時, 這時候是一個同步的訂閱關(guān)系, 也就是說上游每發(fā)送一個事件必須等到下游接收處理完了以后才能接著發(fā)送下一個事件
         * 相當(dāng)于直接調(diào)用方法的過程
         * 二烟号,當(dāng)上下游工作在不同的線程中時, 這時候是一個異步的訂閱關(guān)系, 這個時候上游發(fā)送數(shù)據(jù)不需要等待下游接收,
         * 為什么呢, 因為兩個線程并不能直接進行通信, 因此上游發(fā)送的事件并不能直接到下游里去,
         * 這個時候就需要一個田螺姑娘來幫助它們倆, 這個田螺姑娘就是我們剛才說的水缸 ! 上游把事件發(fā)送到水缸里去,
         * 下游從水缸里取出事件來處理, 因此, 當(dāng)上游發(fā)事件的速度太快, 下游取事件的速度太慢, 水缸就會迅速裝滿, 然后溢出來, 最后就OOM了.
         */
    }

    public void dealWithBackpressureWithFilter() {

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; ; i++)
                    e.onNext(i);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                showMessageInText("the receive value is " + integer);
            }
        };

        Consumer<Throwable> error = new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
            }
        };

        observable.observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {//過濾,只有符合條件的才放入水缸
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        if (integer % 10000 == 0)
                            return true;
                        return false;
                    }
                })
                .subscribe(consumer, error);
    }

    public void dealWithBackPressWithSample() {

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; ; i++)
                    e.onNext(i);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "accept: the receive value is " + integer);
            }
        };

        Consumer<Throwable> error = new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
            }
        };

        observable.subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .sample(2,TimeUnit.SECONDS)//這個操作符每隔指定的時間就從上游中取出一個事件發(fā)送給下游政恍,這里是每隔2秒鐘取樣一次
                .subscribe(consumer,error);
    }

    /**
     * 知識點一:
     * 我們注意到這次和Observable有些不同. 首先是創(chuàng)建Flowable的時候增加了一個參數(shù),
     * 這個參數(shù)是用來選擇背壓,也就是出現(xiàn)上下游流速不均衡的時候應(yīng)該怎么處理的辦法,
     * 這里我們直接用BackpressureStrategy.ERROR這種方式, 這種方式會在出現(xiàn)上下游流速不均衡的時候直接拋出一個異常,
     * 這個異常就是著名的MissingBackpressureException. 其余的策略后面再來講解
     *
     * 知識點二:
     * 另外的一個區(qū)別是在下游的onSubscribe方法中傳給我們的不再是Disposable了, 而是Subscription,
     * 它倆有什么區(qū)別呢, 首先它們都是上下游中間的一個開關(guān), 之前我們說調(diào)用Disposable.dispose()方法可以切斷水管,
     * 同樣的調(diào)用Subscription.cancel()也可以切斷水管, 不同的地方在于Subscription增加了一個void request(long n)方法,
     * 這個方法有什么用呢, 在上面的代碼中也有這么一句代碼:
     *
     * 知識點三:
     * 關(guān)于Subscription的request(int)方法
     * 這是因為Flowable在設(shè)計的時候采用了一種新的思路也就是響應(yīng)式拉取的方式來更好的解決上下游流速不均衡的問題,
     * 與我們之前所講的控制數(shù)量和控制速度不太一樣, 這種方式用通俗易懂的話來說就好比是葉問打鬼子, 我們把上游看成小日本,
     * 把下游當(dāng)作葉問, 當(dāng)調(diào)用Subscription.request(1)時, 葉問就說我要打一個! 然后小日本就拿出一個鬼子給葉問,
     * 讓他打, 等葉問打死這個鬼子之后, 再次調(diào)用request(10),  葉問就又說我要打十個! 然后小日本又派出十個鬼子給葉問,
     * 然后就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子后再繼續(xù)要鬼子接著打...

     * 所以我們把request當(dāng)做是一種能力, 當(dāng)成下游處理事件的能力, 下游能處理幾個就告訴上游我要幾個,
     * 這樣只要上游根據(jù)下游的處理能力來決定發(fā)送多少事件, 就不會造成一窩蜂的發(fā)出一堆事件來, 從而導(dǎo)致OOM.
     * 這也就完美的解決之前我們所學(xué)到的兩種方式的缺陷, 過濾事件會導(dǎo)致事件丟失, 減速又可能導(dǎo)致性能損失.
     * 而這種方式既解決了事件丟失的問題, 又解決了速度的問題, 完美 !
     *
     * 知識點四:
     * 首先第一個同步的代碼, 為什么上游發(fā)送第一個事件后下游就拋出了MissingBackpressureException異常(最新版的只是調(diào)用onError方法),
     * 這是因為下游沒有調(diào)用request, 上游就認為下游沒有處理事件的能力, 而這又是一個同步的訂閱, 既然下游處理不了,
     * 那上游不可能一直等待吧, 如果是這樣, 萬一這兩根水管工作在主線程里, 界面不就卡死了嗎, 因此只能拋個異常來提醒我們.
     * 那如何解決這種情況呢, 很簡單啦, 下游直接調(diào)用request(Long.MAX_VALUE)就行了, 或者根據(jù)上游發(fā)送事件的數(shù)量來request就行了,
     * 比如這里request(3)就可以了.

     * 然后我們再來看看第二段代碼, 為什么上下游沒有工作在同一個線程時, 上游卻正確的發(fā)送了所有的事件呢?
     * 這是因為在Flowable里默認有一個大小為128的水缸, 當(dāng)上下游工作在不同的線程中時, 上游就會先把事件發(fā)送到這個水缸中,
     * 因此, 下游雖然沒有調(diào)用request, 但是上游在水缸中保存著這些事件, 只有當(dāng)下游調(diào)用request時, 才從水缸里取出事件發(fā)給下游.
     */
    public void flowableTaste(){
        //上游變成了Flowable
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                e.onNext(10);
                e.onNext(11);
                e.onNext(12);
                e.onNext(13);
                e.onComplete();
            }
        }, BackpressureStrategy.ERROR);

        //下游變成了Subscriber
        Subscriber<Integer> subcriber = new Subscriber<Integer>() {
            Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
                //未指定線程注釋這段話汪拥,不會發(fā)送onNext事件,發(fā)完onSubscribe篙耗,直接發(fā)onError
                //如果指定線程注釋這段話迫筑,那么只會執(zhí)行onSubscribe
                s.request(2);//可看做是下游處理事件的能力,但是需要上游配合宗弯,如果上游不配合應(yīng)該沒有用
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                s.request(2);
                this.s = s;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
                //if (integer == 12)
                    //s.cancel();//切斷水管
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };
        flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
    }

    /**
     * 知識點一:
     * 想想看我們之前學(xué)習(xí)Observable的時候說到的如何解決上游發(fā)送事件太快的, 有一招叫從數(shù)量上取勝,
     * 同樣的Flowable中也有這種方法, 對應(yīng)的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST這兩種策略.

     * 從名字上就能猜到它倆是干啥的, Drop就是直接把存不下的事件丟棄,Latest就是只保留最新的事件, 來看看它們的實際效果吧.
     */
    public void changeFlowableCacheSize(){

        //上游變成了Flowable
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0;i < 1000; i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);//使用ERROR的時候overflow了脯燃,這次換成BUFFER,它沒有大小限制, 因此可以存放許許多多的事件.
        // 但是使用BUFFER無限循環(huán)的發(fā)送事件,會和Observable一樣發(fā)生OOM
        // 所以我們得從數(shù)量上取勝, 同樣的Flowable中也有這種方法, 對應(yīng)的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST這兩種策略.

        //下游變成了Subscriber
        Subscriber<Integer> subcriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };
        flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
    }

    public void rxJavaIntervalTaste(){
        //每隔一秒進行加1
        Flowable.interval(1,TimeUnit.SECONDS)
                .onBackpressureBuffer()//指定緩存的大小
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.e(TAG, "onSubscribe: ");
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "onNext: " + aLong);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e(TAG, "onError: " + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete: ");
                    }
                });
    }

    /**
     * 知識點一:
     * 在下游中調(diào)用Subscription.request(n)就可以告訴上游蒙保,下游能夠處理多少個事件辕棚,
     * 那么上游要根據(jù)下游的處理能力正確的去發(fā)送事件,那么上游是不是應(yīng)該知道下游的處理能力是多少啊追他,
     * 對吧坟募,不然岛蚤,一個巴掌拍不響啊邑狸,這種事情得你情我愿才行。

     * 那么上游從哪里得知下游的處理能力呢涤妒?我們來看看上游最重要的部分单雾,肯定就是FlowableEmitter了啊,
     * 我們就是通過它來發(fā)送事件的啊她紫,來看看它的源碼吧
     *
     * 知識點二:
     * 下游調(diào)用request(n) 告訴上游它的處理能力硅堆,上游每發(fā)送一個next事件之后,requested就減一贿讹,注意是next事件渐逃,
     * complete和error事件不會消耗requested,當(dāng)減到0時民褂,則代表下游沒有處理能力了茄菊,這個時候你如果繼續(xù)發(fā)送事件,就會調(diào)用onError事件了
     */
    public void rxJavaFlowableResponseFetch(){//同步的情況
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                e.onNext(15);
                e.onNext(17);
                e.onNext(19);
                e.onNext(21);
                e.onComplete();
            }
        },BackpressureStrategy.ERROR);

        Subscriber<Integer> subcriber = new Subscriber<Integer>() {
            Subscription sub;
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
                sub = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
                sub.request(1);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };

        flowable.subscribe(subcriber);
    }

    /**
     * 可以看到赊堪,當(dāng)上下游工作在不同的線程里時面殖,每一個線程里都有一個requested,而我們調(diào)用request(1000)時哭廉,
     * 實際上改變的是下游主線程中的requested脊僚,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的,
     * 這個調(diào)用會在合適的時候自動觸發(fā)遵绰。
     *
     * question:什么時候是何時的時候辽幌?
     * 剛才同步的時候我們說了增淹,上游每發(fā)送一個事件,requested的值便會減一乌企,對于異步來說同樣如此埠通,
     * 那有人肯定有疑問了,一開始上游的requested的值是128逛犹,那這128個事件發(fā)送完了不就不能繼續(xù)發(fā)送了嗎端辱?
     */
    public void rxJavaFlowableResponseFetchAync(){//異步的情況
        //上下游都不做操作的情況
        Flowable<Integer> flowableNothingDoing = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "subscribe: request count is " + e.requested());//128
            }
        },BackpressureStrategy.ERROR);

        Subscriber<Integer> subcriberNothingDoing = new Subscriber<Integer>() {
            Subscription sub;
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
                sub = s;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };

        flowableNothingDoing.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subcriberNothingDoing);

        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "subscribe: request count is " + e.requested());//128
                e.onNext(15);
                e.onNext(17);
                e.onNext(19);
                e.onNext(21);
                e.onComplete();
            }
        },BackpressureStrategy.ERROR);

        Subscriber<Integer> subcriber = new Subscriber<Integer>() {
            Subscription sub;
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
                sub = s;
                s.request(1000);
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
                sub.request(1);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };

        flowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subcriber);
    }

    /**
     * 可以看到,當(dāng)下游消費掉第96個事件之后虽画,上游又開始發(fā)事件了舞蔽,
     * 而且可以看到當(dāng)前上游的requested的值是96(打印出來的95是已經(jīng)發(fā)送了一個事件減一之后的值),
     * 最終發(fā)出了第223個事件之后又進入了等待區(qū)码撰,而223-127 正好等于 96渗柿。
     * 這是不是說明當(dāng)下游每消費96個事件便會自動觸發(fā)內(nèi)部的request()去設(shè)置上游的requested的值啊脖岛!沒錯朵栖,就是這樣,而這個新的值就是96柴梆。
     */
    public void lookWhatTimeHitRequestValue(){
        Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "subscribe: request count is " + e.requested());//128
                boolean flag;
                for (int i = 0; ; i++){
                    flag = false;
                    while(e.requested() == 0){
                        if (!flag) {
                            Log.e(TAG, "subscribe: Oop I can't emitter event");
                            flag =true;
                        }
                    }
                    e.onNext(i);
                    Log.e(TAG, "subscribe: emit " + i + " request is " + e.requested());
                }
            }
        },BackpressureStrategy.ERROR);

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.e(TAG, "onSubscribe: ");
                mSubscription = s;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        };

        flowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
    }

    public void showMessageInText(String appendStr) {
        showRxMessage.append(appendStr + "\n");
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末陨溅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子绍在,更是在濱河造成了極大的恐慌门扇,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件偿渡,死亡現(xiàn)場離奇詭異臼寄,居然都是意外死亡,警方通過查閱死者的電腦和手機溜宽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門吉拳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人适揉,你說我怎么就攤上這事留攒。” “怎么了涡扼?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵稼跳,是天一觀的道長。 經(jīng)常有香客問我吃沪,道長汤善,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮红淡,結(jié)果婚禮上不狮,老公的妹妹穿的比我還像新娘。我一直安慰自己在旱,他們只是感情好摇零,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著桶蝎,像睡著了一般驻仅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上登渣,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天噪服,我揣著相機與錄音,去河邊找鬼胜茧。 笑死粘优,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的呻顽。 我是一名探鬼主播雹顺,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼廊遍!你這毒婦竟也來了嬉愧?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤昧碉,失蹤者是張志新(化名)和其女友劉穎英染,沒想到半個月后揽惹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體被饿,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年搪搏,在試婚紗的時候發(fā)現(xiàn)自己被綠了狭握。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡疯溺,死狀恐怖论颅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情囱嫩,我是刑警寧澤恃疯,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站墨闲,受9級特大地震影響今妄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一盾鳞、第九天 我趴在偏房一處隱蔽的房頂上張望犬性。 院中可真熱鬧,春花似錦腾仅、人聲如沸乒裆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鹤耍。三九已至,卻和暖如春验辞,著一層夾襖步出監(jiān)牢的瞬間惰蜜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工受神, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留抛猖,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓鼻听,卻偏偏與公主長得像财著,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子撑碴,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容