RxJava操作符

本文參考
這可能是最好的RxJava 2.x 教程(完結(jié)版)

RxJava 2.x 擁有了新的特性,其依賴于4個基礎(chǔ)接口螺男,它們分別是

  • Publisher

  • Subscriber

  • Subscription

  • Processor

其中最核心的莫過于 Publisher 和 Subscriber华望。Publisher 可以發(fā)出一系列的事件丈秩,而 Subscriber 負責和處理這些事件。

其中用的比較多的自然是 Publisher 的 Flowable烁巫,它支持背壓署隘。關(guān)于背壓給個簡潔的定義就是:

背壓是指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下亚隙,一種告訴上游的被觀察者降低發(fā)送速度的策略磁餐。

簡而言之,背壓是流速控制的一種策略恃鞋。有興趣的可以看一下官方對于背壓的講解崖媚。

可以明顯地發(fā)現(xiàn),RxJava 2.x 最大的改動就是對于 backpressure 的處理恤浪,為此將原來的 Observable 拆分成了新的 Observable 和 Flowable畅哑,同時其他相關(guān)部分也同時進行了拆分,但令人慶幸的是水由,是它荠呐,是它,還是它砂客,還是我們最熟悉和最喜歡的 RxJava泥张。

觀察者模式

大家可能都知道, RxJava 以觀察者模式為骨架鞠值,在 2.0 中依舊如此媚创。

不過此次更新中,出現(xiàn)了兩種觀察者模式:

  • Observable ( 被觀察者 ) / Observer ( 觀察者 )

  • Flowable (被觀察者)/ Subscriber (觀察者)

image.png

在 RxJava 2.x 中彤恶,Observable 用于訂閱 Observer钞钙,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber 声离, 是支持背壓(Backpressure)的芒炼。

image.png

第一步:初始化 Observable

第二步:初始化 Observer

第三步:建立訂閱關(guān)系

  //        RxJava的鏈式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
            // 3. 創(chuàng)建觀察者 & 定義響應事件的行為
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
            // 默認最先調(diào)用復寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件" + value + "作出響應");

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        });


線程調(diào)度

subScribeOn

用于指定發(fā)送者發(fā)送消息的線程

observeOn

用于指定接受者接送消息的線程

線程切換需要注意的

RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應手,但其中也有些需要注意的地方术徊。

  • 簡單地說本刽,subscribeOn() 指定的就是發(fā)射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。

  • 多次指定發(fā)射事件的線程只有第一次指定的有效子寓,也就是說多次調(diào)用 subscribeOn() 只有第一次的有效暗挑,其余的會被忽略。

  • 但多次指定訂閱者接收線程是可以的斜友,也就是說每調(diào)用一次 observerOn()窿祥,下游的線程就會切換一次。

  Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread)蝙寨,Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

實例代碼中嗤瞎,分別用 Schedulers.newThread() 和 Schedulers.io() 對發(fā)射線程進行切換墙歪,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進行了接收線程的切換”雌妫可以看到輸出中發(fā)射線程僅僅響應了第一個 newThread虹菲,但每調(diào)用一次 observeOn() ,線程便會切換一次掉瞳,因此如果我們有類似的需求時毕源,便知道如何處理了。

RxJava 中陕习,已經(jīng)內(nèi)置了很多線程選項供我們選擇霎褐,例如有:

  • Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡,讀寫文件等io密集型的操作;

  • Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作该镣;

  • Schedulers.newThread() 代表一個常規(guī)的新線程冻璃;

  • AndroidSchedulers.mainThread() 代表Android的主線程

這些內(nèi)置的 Scheduler 已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應該使用內(nèi)置的這些選項损合,而 RxJava 內(nèi)部使用的是線程池來維護這些線程省艳,所以效率也比較高。

操作符

Create

create 操作符應該是最常見的操作符了嫁审,主要用于產(chǎn)生一個 Obserable 被觀察者對象跋炕,為了方便大家的認知,以后的教程中統(tǒng)一把被觀察者 Observable 稱為發(fā)射器(上游事件)律适,觀察者 Observer 稱為接收器(下游事件)辐烂。

image.png
 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext : value : " + integer + "\n" );
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作擦耀,讓Observer觀察者不再接收上游事件
                    mDisposable.dispose();
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

需要注意的幾點是:
  • 在發(fā)射事件中棉圈,我們在發(fā)射了數(shù)值 3 之后,直接調(diào)用了 e.onComlete()眷蜓,雖然無法接收事件分瘾,但發(fā)送事件還是繼續(xù)的。

  • 另外一個值得注意的點是,在 RxJava 2.x 中德召,可以看到發(fā)射事件方法相比 1.x 多了一個 throws Excetion白魂,意味著我們做一些特定操作再也不用 try-catch 了。

  • 并且 2.x 中有一個 Disposable 概念上岗,這個東西可以直接調(diào)用切斷福荸,可以看到,當它的 isDisposed() 返回為 false 的時候肴掷,接收器能正常接收事件敬锐,但當其為 true 的時候,接收器停止了接收呆瞻。所以可以通過此參數(shù)動態(tài)控制接收事件了台夺。

Map

Map 基本算是 RxJava 中一個最簡單的操作符了,熟悉 RxJava 1.x 的知道痴脾,它的作用是對上游發(fā)送的每一個事件應用一個函數(shù)颤介,使得每一個事件都按照指定的函數(shù)去變化,而在 2.x 中它的作用幾乎一致赞赖。

image.png

        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "this is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept : " + s +"\n" );
            }
        });

map 基本作用就是將一個 Observable 通過某種函數(shù)關(guān)系滚朵,轉(zhuǎn)換為另一種 Observable,上面例子中就是把我們的 Integer 數(shù)據(jù)變成了 String 類型前域。從Log日志顯而易見辕近。

Zip

zip 專用于合并事件,該合并不是連接(連接操作符后面會說)匿垄,合并事件專用,分別從兩個上游事件中各取出一個組合,一個事件只能被使用一次亏推,順序嚴格按照事件發(fā)送的順序,最終下游事件收到的是和上游事件最少的數(shù)目相同(必須兩兩配對,多余的舍棄)

    Observable.zip(getStringObservable(), getInterObservable(), new BiFunction<String, Integer, String>() {

            @Override
            public String apply(String s, Integer integer) throws Exception {
                return s+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });


    private Observable<String> getStringObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });

    }

    private Observable<Integer> getInterObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });

    }

運行結(jié)果.png
  • zip 組合事件的過程就是分別從發(fā)射器 A 和發(fā)射器 B 各取出一個事件來組合年堆,并且一個事件只能被使用一次吞杭,組合的順序是嚴格按照事件發(fā)送的順序來進行的,運行后可以看到变丧,1 永遠是和 A 結(jié)合的芽狗,2 永遠是和 B 結(jié)合的。

  • 最終接收器收到的事件數(shù)量是和發(fā)送器發(fā)送事件最少的那個發(fā)送器的發(fā)送事件數(shù)目相同痒蓬,運行后看到童擎,5 很孤單,沒有人愿意和它交往攻晒,孤獨終老的單身狗顾复。

Concat

對于單一的把兩個發(fā)射器連接成一個發(fā)射器,雖然 zip 不能完成鲁捏,但我們還是可以自力更生芯砸,官方提供的 concat 讓我們的問題得到了完美解決。

image.png
 //兩個發(fā)射器連接成一個發(fā)射器
        Observable.concat(Observable.just(1, 2, 3, 4), Observable.just("hello", 5, 6))
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object serializable) throws Exception {
                        Log.e(TAG, "concat : "+ serializable + "\n" );
                    }
                });

FlatMap

FlatMap將一個發(fā)送事件的上游Observable變換成多個發(fā)送事件的Observables,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里假丧,flatMap 并不能保證事件的順序

image.png
  Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 100);

                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MICROSECONDS);
//                return Observable.fromIterable(list);
            }
        }).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                Log.e(TAG, "flatMap : accept : " + s + "\n");

            }
        });

ConcatMap

ConcatMap將一個發(fā)送事件的上游Observable變換成多個發(fā)送事件Observables双揪,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里,flatMap 并不能保證事件的順序包帚,ConcatMap 可以保證事件發(fā)送的順序

    Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e(TAG, "flatMap : accept : " + o + "\n");
                    }
                });

distinct

去重操作符渔期,其實就是簡單的去重

image.png
   Observable.just(1, 2, 3, 1, 2, 34, 45, 55)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });

Filter

Filter 過濾操作符】拾睿可以接受一個參數(shù)疯趟,讓其過濾掉不符合我們條件的值

image.png
    Observable.just(1, 20, 65, -5, 19)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer >= 10;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "filter : " + integer + "\n");
            }
        });

buffer

buffer 操作符接受兩個參數(shù),buffer(count,skip)谋梭,作用是將 Observable 中的數(shù)據(jù)按 skip (步長) 分成最大不超過 count 的 buffer 迅办,然后生成一個 Observable

image.png

 Observable.just(1, 2, 3, 4, 5)
                .buffer(3,2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer value : " + integers.toString());
                        for (Integer i : integers) {
                            Log.e(TAG, i + "");
                        }
                        Log.e(TAG, "\n");
                    }
                });

timer

在Rxjava中timer操作符既可以延遲執(zhí)行一段邏輯,也可以間隔執(zhí)行一段邏輯【注意】但在RxJava2.x已經(jīng)過時了章蚣,現(xiàn)在用interval操作符來間隔執(zhí)行,但需要注意的是姨夹,timer 和 interval 均默認在新線程纤垂。

image.png

 Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "timer :" + aLong + " at " + DateUtil.getStringDate() + "\n");
                    }
                });

interval

interval 操作符用于間隔時間執(zhí)行某個操作,其接受三個參數(shù)磷账,分別是第一次發(fā)送延遲峭沦,間隔時間,時間單位逃糟。默認在新線程

image.png

    Disposable mDisposable;

   Log.e(TAG, "interval start : " + DateUtil.getStringDate() + "\n");
        mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())// 由于interval默認在新線程吼鱼,所以我們應該切回主線程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "interval :" + aLong + " at " + DateUtil.getStringDate() + "\n");
                    }
                });


    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }

doOnNext

它的作用是讓訂閱者在接收到數(shù)據(jù)之前干點有意思的事情。假如我們在獲取到數(shù)據(jù)之前想先保存一下它

    Observable.just(1, 2, 3, 4)
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "doOnNext :" + integer + "\n");
            }
        });

skip

skip 很有意思绰咽,其實作用就和字面意思一樣菇肃,接受一個 long 型參數(shù) count ,代表跳過 count 個數(shù)目開始接收取募。

image.png

     Observable.just(1, 2, 3, 4, 5).skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "skip : "+integer + "\n");
                    }
                });

take

take琐谤,接受一個 long 型參數(shù) count ,用于指定訂閱者最多收到多少數(shù)據(jù)

image.png
    Flowable.fromArray(1, 2, 3, 4, 5)
        .take(2)//最多接收多少個參數(shù)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: take : "+integer + "\n" );
            }
        });

just

就是一個簡單的發(fā)射器依次調(diào)用 onNext() 方法玩敏。

image.png
   Observable.just("1", "2",1,5,7)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Serializable>() {
            @Override
            public void accept(Serializable serializable) throws Exception {
                Log.e(TAG,"accept : onNext : " + serializable + "\n" );
            }
        });

Single

顧名思義斗忌,Single 只會接收一個參數(shù),而 SingleObserver 只會調(diào)用 onError() 或者 onSuccess()旺聚。

   Single.just(new Random().nextInt())
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(Integer value) {
                        Log.e(TAG, "single : onSuccess : "+value+"\n" );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
                    }
                });

debounce

去除發(fā)送頻率過快的項 debounce(500, TimeUnit.MILLISECONDS) 去除發(fā)送間隔時間小于500毫秒的發(fā)射事件织阳、或者用于過濾掉發(fā)射速率過快的數(shù)據(jù)項

image.png

 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MICROSECONDS) //去除發(fā)送間隔小于500毫秒的發(fā)送事件
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "debounce :" + integer + "\n");
                    }
                });

Defer

簡單地時候就是每次訂閱都會創(chuàng)建一個新的 Observable,并且如果沒有被訂閱砰粹,就不會產(chǎn)生新的 Observable唧躲。

image.png

   Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(1, 2, 3, 4, 5);
            }
        });
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "defer : " + value + "\n");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "defer : onComplete\n");
            }
        });

last

last 操作符僅取出可觀察到的最后一個值,或者是滿足某些條件的最后一項。

image.png

    Observable.just(1, 2, 3,99,33,0)
        .last(5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("RxJavaAct", "last : " + integer + "\n");
            }
        });

merge

merge 合并惊窖,熟悉版本控制工具的你一定不會不知道 merge 命令刽宪,而在 Rx 操作符中,merge 的作用是把多個 Observable 結(jié)合起來界酒,接受可變參數(shù)圣拄,也支持迭代器集合。注意它和 concat 的區(qū)別在于毁欣,不用等到 發(fā)射器 A 發(fā)送完所有的事件再進行發(fā)射器 B 的發(fā)送

image.png
  Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5, 99))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("RxJavaAct", "accept : " + integer + "\n");
                    }
                });

reduce

reduce 操作符一次用一個方法處理一個值庇谆,可以有一個 seed 作為初始值。

image.png
   Observable.just(1, 2, 3)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    //我們中間采用 reduce 凭疮,支持一個 function 為兩數(shù)值相加
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.e("RxJavaAct", "BiFunction: apply : " + integer + "  +  " + integer2 + " = " + (integer + integer2) + "\n");

                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("RxJavaAct", "accept: reduce : " + integer + "\n");
            }
        });

scan

scan 操作符作用和上面的 reduce 一致饭耳,唯一區(qū)別是 reduce 是個只追求結(jié)果的壞人,而 scan 會始終如一地把每一個步驟都輸出执解。

image.png
   Observable.just(1, 2, 3)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.e("RxJavaAct", "BiFunction: apply : " + integer + "  +  " + integer2 + " = " + (integer + integer2) + "\n");

                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("RxJavaAct", "accept: reduce : " + integer + "\n");
            }
        });

window

按照時間劃分窗口寞肖,將數(shù)據(jù)發(fā)送給不同的Observable

image.png
  Log.e("RxJavaAct", "window\n");
        Observable.interval(1, TimeUnit.SECONDS)
                .take(15)
                .window(3, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Observable<Long>>() {
                    @Override
                    public void accept(Observable<Long> longObservable) throws Exception {
                        Log.e("RxJavaAct", "Sub Divide begin...\n");
                        longObservable.subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(new Consumer<Long>() {
                                    @Override
                                    public void accept(Long aLong) throws Exception {

                                        Log.e("RxJavaAct", "Next:" + aLong + "\n");
                                    }
                                });
                    }
                });

PublishSubject

onNext() 會通知每個觀察者,僅此而已

PublishSubject<Integer> publishSubject = PublishSubject.create();
        publishSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "First onComplete!\n");
            }
        });

        publishSubject.onNext(1);
        publishSubject.onNext(2);
        publishSubject.onNext(3);

        publishSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "Second onComplete!\n");
            }
        });

        publishSubject.onNext(4);
        publishSubject.onNext(5);
        publishSubject.onComplete();

AsyncSubject

在調(diào)用 onComplete() 之前衰腌,除了 subscribe() 其它的操作都會被緩存新蟆,
在調(diào)用 onComplete() 之后只有最后一個 onNext() 會生效

 AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

        asyncSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "First onComplete!\n");
            }
        });

        asyncSubject.onNext(1);
        asyncSubject.onNext(2);
        asyncSubject.onNext(3);

        asyncSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "Second onComplete!\n");
            }
        });

        asyncSubject.onNext(4);
        asyncSubject.onNext(5);
        asyncSubject.onComplete();

BehaviorSubject

// BehaviorSubject 的最后一次 onNext() 操作會被緩存,
// 然后在 subscribe() 后立刻推給新注冊的 Observer

 BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();

        behaviorSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "First onComplete!\n");
            }
        });

        behaviorSubject.onNext(1);
        behaviorSubject.onNext(2);
        behaviorSubject.onNext(3);

        behaviorSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
            }

            @Override
            public void onComplete() {
                Log.e("RxJavaAct", "Second onComplete!\n");
            }
        });

        behaviorSubject.onNext(4);
        behaviorSubject.onNext(5);
        behaviorSubject.onComplete();

Completable

// 只關(guān)心結(jié)果右蕊,也就是說 Completable 是沒有 onNext 的琼稻,
// 要么成功要么出錯,不關(guān)心過程饶囚,在 subscribe 后的某個時間點返回結(jié)果

  Log.e("RxJavaAct", "Completable\n");

        Completable.timer(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.e("RxJavaAct", "onSubscribe : d :" + d.isDisposed() + "\n");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("RxJavaAct", "onComplete\n");
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e("RxJavaAct", "onError :" + e.getMessage() + "\n");
                    }
                });

Flowable

專用于解決被壓問題

  Flowable.just(1, 2, 3, 4)
                //seed 作為初始值
                .reduce(100, new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.e("RxJavaAct", "reduce :" + integer + "   +   " + integer2 + "  =  " + (integer + integer2) + "\n");
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("RxJavaAct", "Flowable :" + integer + "\n");

            }
        });

上邊這些操作符的使用場景

采用 OkHttp3 配合 map , doOnNext , 線程切換做簡單的網(wǎng)絡請求

  • 1帕翻、通過 Observable.create() 方法,調(diào)用 OkHttp 網(wǎng)絡請求;
  • 2萝风、通過 map 操作符結(jié)合 Gson , 將 Response 轉(zhuǎn)換為 bean 類;
  • 3嘀掸、通過 doOnNext() 方法,解析 bean 中的數(shù)據(jù)规惰,并進行數(shù)據(jù)庫存儲等操作;
  • 4横殴、調(diào)度線程,在子線程進行耗時操作任務卿拴,在主線程更新 UI;
  • 5衫仑、通過 subscribe(),根據(jù)請求成功或者失敗來更新 UI。
   Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> e) throws Exception {
                Request.Builder builder = new Request.Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
            @Override
            public MobileAddress apply(Response response) throws Exception {

                Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
                if (response.isSuccessful()) {
                    ResponseBody body = response.body();
                    if (body != null) {
                        Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body().toString());
                        return new Gson().fromJson(body.string(), MobileAddress.class);
                    }
                }
                return null;
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(MobileAddress mobileAddress) throws Exception {
                        Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext: 保存成功:" + mobileAddress.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<MobileAddress>() {
            @Override
            public void accept(MobileAddress mobileAddress) throws Exception {
                Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "成功:" + mobileAddress.toString() + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "失敹榛ā:" + throwable.getMessage() + "\n");
            }
        });

使用框架 rx2-Networking

  • 1文狱、通過 Rx2AndroidNetworking 的 get() 方法獲取 Observable 對象(已解析);
  • 2、調(diào)度線程缘挽,根據(jù)請求結(jié)果更新 UI瞄崇。
  • 3呻粹、 implementation 'com.amitshekhar.android:rx2-android-networking:1.0.0'
Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class)
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(MobileAddress mobileAddress) throws Exception {
                        Log.e(TAG, "doOnNext:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext:" + mobileAddress.toString() + "\n");
                    }
                }).map(new Function<MobileAddress, MobileAddress.ResultBean>() {

            @Override
            public MobileAddress.ResultBean apply(MobileAddress mobileAddress) throws Exception {
                Log.e(TAG, "\n" );
                Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                return mobileAddress.getResult();
            }
        }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<MobileAddress.ResultBean>() {
            @Override
            public void accept(MobileAddress.ResultBean resultBean) throws Exception {
                Log.e(TAG, "subscribe 成功:" + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "成功:" + resultBean.toString() + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                Log.e(TAG, "失敗:"+ throwable.getMessage()+"\n" );
            }
        });

zip操作符的使用場景

  • 結(jié)合多個接口的數(shù)據(jù)再更新 UI
  • zip 操作符可以把多個 Observable 的數(shù)據(jù)接口成一個數(shù)據(jù)源再發(fā)出去
  Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android", 1, 1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(MobileAddress mobileAddress, CategoryResult categoryResult) throws Exception {
                return "合并后的數(shù)據(jù)為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.toString();
            }
        }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: 成功:" + s+"\n");

            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e(TAG, "accept: 失斔昭小:" + throwable+"\n");
            }
        });

flatMap 使用場景

  • 多個網(wǎng)絡請求依次依賴
  • 1等浊、注冊用戶前先通過接口A獲取當前用戶是否已注冊,再通過接口B注冊;
  • 2摹蘑、注冊后自動登錄筹燕,先通過注冊接口注冊用戶信息,注冊成功后馬上調(diào)用登錄接口進行自動登錄衅鹿。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請求撒踪,并解析到FootList
                .subscribeOn(Schedulers.io())        // 在io線程進行網(wǎng)絡請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結(jié)果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據(jù)獲取食品列表的響應結(jié)果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                    }
                });

concat 使用場景

  • 先讀取緩存數(shù)據(jù)再讀取網(wǎng)絡請求
  • 實用場景中經(jīng)常會用到緩存數(shù)據(jù),以通過減少頻繁的網(wǎng)絡請求達到節(jié)約流量:
  • 1大渤、concat 可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射物;
  • 2制妄、并且只有前一個 Observable 終止(onComplete)才會訂閱下一個 Observable
 
//        先讀取緩存再讀取網(wǎng)絡
        Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create當前線程:" + Thread.currentThread().getName());
                FoodList data = CacheManager.getInstance().getFoodListJsonData();

//                在操作符concat 中,只有調(diào)用onComplete 之后才會執(zhí)行下一個Observable
                if (data != null) {//如果緩存數(shù)據(jù)不為空泵三,則直接讀取緩存數(shù)據(jù)耕捞,而不讀取網(wǎng)絡數(shù)據(jù)
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):");
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            Log.e(TAG, "正在 : 讀取緩存數(shù)據(jù):");
                        }
                    });
                    e.onNext(data);
                } else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            Log.e(TAG, "正在 : 讀取網(wǎng)絡數(shù)據(jù):");

                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡數(shù)據(jù):");
                    e.onComplete();
                }
            }
        });

//        請求網(wǎng)絡數(shù)據(jù)
        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 10 + "")
                .build()
                .getObjectObservable(FoodList.class);

//        Concat 先讀取緩存數(shù)據(jù)并展示UI再獲取網(wǎng)絡數(shù)據(jù)刷新UI
//        1、concat 可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射物;
//        2烫幕、并且只有前一個 Observable 終止(onComplete)才會訂閱下一個 Observable

//        兩個 Observable 的泛型應當保持一致
        Observable.concat(cache, network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(FoodList foodList) throws Exception {
                        Log.e(TAG, "subscribe 成功:" + Thread.currentThread().getName());
                        if (isFromNet) {
                            Log.e(TAG, "accept : 網(wǎng)絡獲取數(shù)據(jù)設(shè)置緩存: \n" + foodList.toString());
                            CacheManager.getInstance().setFoodListJsonData(foodList);
                        }
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + foodList.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:" + Thread.currentThread().getName());
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)失敯吵椤:" + throwable.getMessage());
                    }
                });

debounce 使用場景

  • 減少頻繁的網(wǎng)絡請求
  • 設(shè)想情景:輸入框數(shù)據(jù)變化或者點擊一次按鈕時就要進行網(wǎng)絡請求,這樣會產(chǎn)生大量的網(wǎng)絡請求纬霞,而實際上又不需要
  • 這時候可以通過 debounce 過濾掉發(fā)射頻率過快的請求。

        btn_debounce = findViewById(R.id.btn_debounce);

//        減少頻繁的網(wǎng)絡請求
        RxView.clicks(btn_debounce)
                .debounce(2, TimeUnit.SECONDS) // 過濾掉發(fā)射頻率小于2秒的發(fā)射事件
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        clickBtn();
                    }
                });


    private void clickBtn() {
        Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 2 + "") // 只獲取兩條數(shù)據(jù)
                .build()
                .getObjectObservable(FoodList.class)
                .subscribeOn(Schedulers.io())  // 在 io 線程進行網(wǎng)絡請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線程進行更新UI等操作
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        Log.e(TAG, "accept: 獲取數(shù)據(jù)成功:" + foodList.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 獲取數(shù)據(jù)失斍浴:" + throwable.getMessage() + "\n");
                    }
                });
    }

interval 使用場景

  • 間隔任務實現(xiàn)心跳
  • 可能我們會遇上各種即時通訊诗芜,如果是自己家開發(fā)的 IM 即時通訊,我相信在移動端一定少不了心跳包的管理
  • 而我們 RxJava 2.x 的 interval 操作符棒我們解決了這個問題埃疫。

    private Disposable mDisposable;

//        間隔任務實現(xiàn)心跳
        mDisposable = Flowable
                .interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : " + aLong);
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設(shè)置文本 :" + aLong);
                    }
                });
 @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null) {
            mDisposable.dispose();
        }
    }


線程調(diào)度


        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread)伏恐,Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

或許上邊的代碼接口有些調(diào)用不了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末栓霜,一起剝皮案震驚了整個濱河市翠桦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌胳蛮,老刑警劉巖销凑,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異仅炊,居然都是意外死亡斗幼,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進店門抚垄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蜕窿,“玉大人谋逻,你說我怎么就攤上這事⊥┚” “怎么了毁兆?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長阴挣。 經(jīng)常有香客問我气堕,道長,這世上最難降的妖魔是什么屯吊? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任送巡,我火速辦了婚禮,結(jié)果婚禮上盒卸,老公的妹妹穿的比我還像新娘骗爆。我一直安慰自己,他們只是感情好蔽介,可當我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布摘投。 她就那樣靜靜地躺著,像睡著了一般虹蓄。 火紅的嫁衣襯著肌膚如雪犀呼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天薇组,我揣著相機與錄音外臂,去河邊找鬼。 笑死律胀,一個胖子當著我的面吹牛宋光,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播炭菌,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼罪佳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了黑低?” 一聲冷哼從身側(cè)響起赘艳,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎克握,沒想到半個月后蕾管,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡菩暗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年娇掏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勋眯。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡婴梧,死狀恐怖下梢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情塞蹭,我是刑警寧澤孽江,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站番电,受9級特大地震影響岗屏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜漱办,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一这刷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧娩井,春花似錦暇屋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至扬霜,卻和暖如春定鸟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背著瓶。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工联予, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人材原。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓沸久,卻偏偏與公主長得像,于是被迫代替她去往敵國和親华糖。 傳聞我的和親對象是個殘疾皇子麦向,可洞房花燭夜當晚...
    茶點故事閱讀 45,685評論 2 360

推薦閱讀更多精彩內(nèi)容

  • 一瘟裸、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性客叉,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    BrotherChen閱讀 1,624評論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性话告,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,112評論 0 21
  • 一兼搏、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 637評論 0 1
  • RxJava操作符圖譜 創(chuàng)建操作符 create 完整創(chuàng)建1個被觀察者對象(Observable) just 快速...
    yswheye閱讀 9,631評論 1 15
  • 注:只包含標準包中的操作符格侯,用于個人學習及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 940評論 0 3