優(yōu)美的異步 --- RxAndroid

優(yōu)美的異步 --- RxAndroid

這里和大家一起分享一下一個著名的Android異步庫RxAndroid。它應(yīng)該是2016年最流行的開源庫之一椎工。RxAndroid起源于RxJava钙姊,是一個專門針對Android版本的Rxjava庫送浊。RxAndroid-Github 目前最新的版本是v2.0.x我們今天的分享也基于2.0版本的API博秫。

響應(yīng)式編程

什么是響應(yīng)式編程?和平常經(jīng)常聽說的面向?qū)ο缶幊毯秃瘮?shù)式編程一樣异希,響應(yīng)式編程(Reactive Programming)就是一個編程范式健盒,但是與其他編程范式不同的是它是基于數(shù)據(jù)流和變化傳播的。我們經(jīng)常在程序中這樣寫

A = B + C

A被賦值為B和C的值称簿。這時扣癣,如果我們改變B的值,A的值并不會隨之改變憨降。而如果我們運用一種機制父虑,當B或者C的值發(fā)現(xiàn)變化的時候,A的值也隨之改變授药,這樣就實現(xiàn)了響應(yīng)式士嚎。
而響應(yīng)式編程的提出,其目的就是簡化類似的操作悔叽,因此它在用戶界面編程領(lǐng)域以及基于實時系統(tǒng)的動畫方面都有廣泛的應(yīng)用莱衩。另一方面,在處理嵌套回調(diào)的異步事件娇澎,復雜的列表過濾和變換的時候也都有良好的表現(xiàn)笨蚁。
RxAndroid其實是一個響應(yīng)式編程思想的實現(xiàn)庫。也因為這樣的思想趟庄,是它在一些方面表現(xiàn)的異常優(yōu)秀括细。下面我將先用一個簡單的例子,讓大家直觀的感受一下的樣子戚啥。

網(wǎng)絡(luò)加載圖片顯示
Observable.just(getDrawableFromNet())
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Drawable>() {
            @Override
            public void accept(Drawable drawable) throws Exception {
                ((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
            }
        });

環(huán)境搭建

RxAndroid環(huán)境只需求要引入如下項目即可勒极,我們不但需要RxAndroid項目還需要RxJava項目。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'

基礎(chǔ)知識

RxAndroid的核心就是“異步”兩個字虑鼎,其最關(guān)鍵的東西就是三個:

Observable(被觀察者)
Observer(觀察者)
Subscriber (訂閱)
Observable可以理解為事件的發(fā)送者,就好像快遞的寄出者,而這些事件就好比快遞
Observer可以理解為事件的接收者炫彩,就好像快遞的接收者

Subscriber 綁定兩者

Observable可以發(fā)出一系列的 事件匾七,這里的事件可以是任何東西,例如網(wǎng)絡(luò)請求江兢、復雜計算處理昨忆、數(shù)據(jù)庫操作、文件操作等等杉允,事件執(zhí)行結(jié)束后交給 Observer回調(diào)處理邑贴。

那他們之間是如何進行聯(lián)系的呢?答案就是通過subscribe()方法叔磷。
下面我們通過一個HelloDemo來看看Observable與Observer進行關(guān)聯(lián)的典型方式拢驾,

  private void test_1() {
        Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onComplete();
                e.onNext("hello2");

            }
        });

        Observer<String> oser = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.w("kaelpu","onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.w("kaelpu","onNext = "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.w("kaelpu","onError" + e);
            }

            @Override
            public void onComplete() {
                Log.w("kaelpu","onComplete");
            }
        };

        Log.w("kaelpu","subscribe");
        oble.subscribe(oser);

    }

10-21 01:28:01.600 11386-11386/? W/kaelpu: subscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onSubscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onNext = hello
10-21 01:28:01.600 11386-11386/? W/kaelpu: onComplete

其實這段代碼干了三件事:

  1. 創(chuàng)建被觀察者對象oble
  2. 創(chuàng)建觀察者oser
  3. 連接觀察者和被觀察者

被觀察者通過onNext函數(shù)給觀察者通知結(jié)果
被貫徹者onComplete函數(shù)通知觀察者執(zhí)行結(jié)束
連接觀察者和被觀察者我們使用subscribe函數(shù)

  • 通過打印的log我們可以看到觀察者函數(shù)調(diào)用情況,調(diào)用subscribe函數(shù)去綁定觀察者和被觀察者時候,觀察者的onSubscribe函數(shù)會被回調(diào)表示建立關(guān)聯(lián)改基。
  • 接著每當被觀察者調(diào)用onNext給觀察者發(fā)送數(shù)據(jù)時候繁疤,觀察者的onNext 會收到回調(diào),并且得到所發(fā)送的數(shù)據(jù)秕狰。
  • 當被觀察者調(diào)用onComplete函數(shù)時候,代表著完成稠腊,觀察者的onComplete回調(diào)會被觸發(fā),并且斷開了兩者的關(guān)聯(lián)鸣哀,這時被觀察者再發(fā)送數(shù)據(jù)架忌,觀察者也不會收到。

當然我們注意到觀察者還有一個onError函數(shù)沒有被觸發(fā)過我衬,那么該怎么觸發(fā)呢叹放,又代表著什么意思呢?我們來改變一下代碼:

    private String error() throws Exception {
        throw new Exception();
    }

添加一個函數(shù)低飒,名字隨便但是返回值是String许昨,這里我們叫做error函數(shù)。函數(shù)很簡單就是拋出一個異常褥赊。然后我們繼續(xù)修改被觀察者的代碼如下:

  Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onNext(error());
                e.onNext("hello1");
                e.onComplete();
                e.onNext("hello2");

            }
        });

其實我們就添加了兩行糕档,添加了一個e.onNext(error()) 并且在之后還添加了一個e.onNext("hello1") 運行一下我們看看

W/kaelpu: subscribe
W/kaelpu: onSubscribe
W/kaelpu: onNexthello
W/kaelpu: onErrorjava.lang.Exception

折斷l(xiāng)og說明三個問題:

  1. 被觀察者onNext中是可以運行函數(shù)的
  2. 如果運行的函數(shù)報錯,則會調(diào)用我們觀察者的onError函數(shù)
  3. 當調(diào)用onError函數(shù)時候拌喉,也會斷開關(guān)聯(lián)速那,被觀察者收不到后面的數(shù)據(jù),但是觀察者依然會繼續(xù)發(fā)送尿背。

最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然端仰。

關(guān)于onComplete和onError唯一并且互斥這一點, 是需要自行在代碼中進行控制, 如果你的代碼邏輯中違背了這個規(guī)則, 并不一定會導致程序崩潰. 比如發(fā)送多個onComplete是可以正常運行的, 依然是收到第一個onComplete就不再接收了, 但若是發(fā)送多個onError, 則收到第二個onError事件會導致程序會崩潰.當我們寫多個onComplete時,不會報錯田藐。

除了被觀察者能斷開關(guān)聯(lián)荔烧,觀察者也能主動斷開連接吱七,調(diào)用onSubscribe函數(shù)中傳入的對象Disposable的dispose()函數(shù)即可完成斷開連接,同樣關(guān)聯(lián)斷開后鹤竭,被觀察者依然會繼續(xù)發(fā)送數(shù)據(jù)

**講到這里第一感覺是不是踊餐?**

Paste_Image.png

就輸出個數(shù)字就這么麻煩,完全沒看出哪里方便了臀稚!別著急我剛開始看RxAndroid文章也是這樣的感覺吝岭,而且很多網(wǎng)上的文章都沒有解釋這個問題。所以看一會你就更暈了吧寺。別著急我一起深呼吸,來看看如何簡化操作

你可能覺得窜管,我就打印幾個數(shù),還要把Observable寫的那么麻煩稚机,能不能簡便一點呢幕帆?答案是肯定的,RxAndroid內(nèi)置了很多簡化創(chuàng)建Observable對象的函數(shù)抒钱,比如Observable.just就是用來創(chuàng)建只發(fā)出一個事件就結(jié)束的Observable對象蜓肆,上面創(chuàng)建Observable對象的代碼可以簡化為一行

        Observable<String> observable = Observable.just("hello");

同樣對于Observer,這個例子中谋币,我們其實并不關(guān)心OnComplete和OnError仗扬,我們只需要在onNext的時候做一些處理,這時候就可以使用Consumer類蕾额。

        Observable<String> observable = Observable.just("hello");
        Consumer<String> consumer = new Consumer<String>() {
           @Override
           public void accept(String s) throws Exception {
               System.out.println(s);
           }
        };
        observable.subscribe(consumer);

其實在RxAndroid中早芭,我們可以為 Observer中的三種狀態(tài)根據(jù)自身需要分別創(chuàng)建一個回調(diào)動作,通過Action 來替代onComplete():诅蝶,通過Consumer來替代 onError(Throwable t)和onNext(T t)

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() {
        @Override
        public void run() throws Exception {
            Log.i("kaelpu", "complete");
        }
    };
    Consumer<String> onNextConsumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("kaelpu", s);
        }
    };
    Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i("kaelpu", "error");
        }
    };
    observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);

}

subscribe()有多個重載的方法:

 public final Disposable subscribe() {}
 public final Disposable subscribe(Consumer<? super T> onNext) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
 public final void subscribe(Observer<? super T> observer) {}

不帶任何參數(shù)的subscribe() 表示Observer不關(guān)心任何事件,Observable發(fā)送什么數(shù)據(jù)都隨你
帶有一個Consumer參數(shù)的方法表示Observer只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫

只要我們再本節(jié)中能明白觀察者和被觀察者之間是如何工作關(guān)聯(lián)的就可以


線程調(diào)度

關(guān)鍵的章節(jié)來了退个,看完上面的基礎(chǔ)知識,很多人都會感覺就一個發(fā)送调炬,一個接收语盈,不就是個觀察者模式嘛,感覺一點卵用都沒有缰泡,還寫這么多回調(diào)方法刀荒!完全沒有看出什么優(yōu)點。那么這一節(jié)就讓你看到RxAndroid真正厲害的地方棘钞。

正常情況下, Observer和Observable是工作在同一個線程中的, 也就是說Observable在哪個線程發(fā)事件, Observer就在哪個線程接收事件.
RxAndroid中, 當我們在主線程中去創(chuàng)建一個Observable來發(fā)送事件, 則這個Observable默認就在主線程發(fā)送事件.
當我們在主線程去創(chuàng)建一個Observer來接收事件, 則這個Observer默認就在主線程中接收事件缠借,但其實在現(xiàn)實工作中我們更多的是需要進行線程切換的,最常見的例子就是在子線程中請求網(wǎng)絡(luò)數(shù)據(jù)宜猜,在主線程中進行展示

要達到這個目的, 我們需要先改變Observable發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變Observer的線程, 讓它去主線程接收事件. 通過RxAndroid內(nèi)置的線程調(diào)度器可以很輕松的做到這一點. 接下來看一段代碼:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
            Log.d("kaelpu", "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
            Log.d("kaelpu", "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}

Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :main
onNext: 1

可以看到, observable發(fā)送事件的線程的確改變了, 是在一個叫 RxNewThreadScheduler-1的線程中發(fā)送的事件, 而consumer 仍然在主線程中接收事件, 這說明我們的目的達成了, 接下來看看是如何做到的.

這段代碼只不過是增加了兩行代碼:

.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

簡單的來說, subscribeOn() 指定的是Observable發(fā)送事件的線程, observeOn() 指定的是Observer接收事件的線程.
多次指定Observable的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
多次指定Observer的線程是可以的, 也就是說每調(diào)用一次observeOn() , Observer的線程就會切換一次.例如:

observable.subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(consumer);

Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :RxCachedThreadScheduler-2
onNext: 1

可以看到, Observable雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler線程中, 而Observer則跑到了RxCachedThreadScheduler 中, 這個CacheThread其實就是IO線程池中的一個.
在 RxAndroid 中泼返,提供了一個名為 Scheduler 的線程調(diào)度器,RxAndroid 內(nèi)部提供了4個調(diào)度器姨拥,分別是:

  • Schedulers.io(): I/O 操作(讀寫文件绅喉、數(shù)據(jù)庫渠鸽、網(wǎng)絡(luò)請求等),與newThread()差不多霹疫,區(qū)別在于io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池拱绑,可以重用空閑的線程,因此多數(shù)情況下 io() 效率比 newThread() 更高丽蝎。值得注意的是,在 io() 下膀藐,不要進行大量的計算屠阻,以免產(chǎn)生不必要的線程;
  • Schedulers.newThread(): 開啟新線程操作额各;
  • Schedulers.immediate(): 默認指定的線程国觉,也就是當前線程;
  • Schedulers.computation():計算所使用的調(diào)度器虾啦。這個計算指的是 CPU 密集型計算麻诀,即不會被 I/O等操作限制性能的操作,例如圖形的計算傲醉。這個 Scheduler 使用的固定的線程池蝇闭,大小為 CPU 核數(shù)。值得注意的是硬毕,不要把 I/O 操作放在 computation() 中呻引,否則 I/O 操作的等待時間會浪費 CPU;
  • AndroidSchedulers.mainThread(): Rxndroid 擴展的 Android 主線程吐咳;

這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項, 在RxAndroid內(nèi)部使用的是線程池來維護這些線程, 所有效率也比較高逻悠。

對于線程還需要注意

  • create() , just() , from() 等 --- 事件產(chǎn)生
  • map() , flapMap() , scan() , filter() 等 -- 事件加工
  • subscribe() -- 事件消費

事件產(chǎn)生:默認運行在當前線程,可以由 subscribeOn() 自定義線程
事件加工:默認跟事件產(chǎn)生的線程保持一致, 可由 observeOn() 自定義線程
事件消費:默認運行在當前線程韭脊,可以有observeOn() 自定義

好了說了這么多了,我們來寫個簡單的異步的例子童谒,看看實際效果。我們這個例子就以加載網(wǎng)絡(luò)圖片并顯示為例:
首先我們寫一個耗時函數(shù)沪羔,用來模擬圖片請求

    // 模擬網(wǎng)絡(luò)請求圖片
    private Drawable getDrawableFromUrl(String url){
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return getResources().getDrawable(R.drawable.baidu);
    }

代碼很簡答饥伊,就是線程sleep 6秒,然后返回一張圖片任内,如果運行在主線程那就會NAR撵渡,然后我么來用RxAndroid寫一下這個異步拉去圖片并顯是的操作!

Observable.just(getDrawableFromNet("http://www.baidu.com/icon.png"))
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Drawable>() {
            @Override
            public void accept(Drawable drawable) throws Exception {
                ((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
            }
        });

就這幾行代碼就搞定了死嗦!自己結(jié)合上面講的理解一下~這里就不做解釋了趋距!因為我們還有更重要的一個環(huán)節(jié),這個環(huán)節(jié)堪稱RxAndroid的精髓越除!

操作符的使用

在了解基本知識和線程調(diào)度后节腐,我們來學習一下RxAndroid各種神奇的操作符

Map
Map是RxAndroid中最簡單的一個變換操作符了, 它的作用就是對Observable發(fā)送的每一個事件應(yīng)用一個函數(shù), 使得每一個事件都按照指定的函數(shù)去變化外盯。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("kaelpu", s);
        }
    });

This is result 1
This is result 2
This is result 3

通過Map, 可以將Observable發(fā)來的事件轉(zhuǎn)換為任意的類型, 可以是一個Object, 也可以是一個集合,功能非常強大

例子:還是以圖片加載的例子翼雀,我們傳進來一個圖片的路徑饱苟,然后通過Map把drawble轉(zhuǎn)換成bitmap再發(fā)送給觀察者

Observable.just(getDrawableFromNet())
        .map(new Function<Drawable, Bitmap>() {
            @Override
            public Bitmap apply(@NonNull Drawable drawable) throws Exception {
                BitmapDrawable bt = (BitmapDrawable)drawable;
                return bt.getBitmap();
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Bitmap>() {
            @Override
            public void accept(Bitmap bitmap) throws Exception {

            }
        });

Observable –> map變換 –> Observable
url -> drawable -> bitmap

不用到處調(diào)代碼,直接一個鏈式操作... 是不是感覺很爽狼渊!

ZIP
Zip通過一個函數(shù)將多個Observable發(fā)送的事件結(jié)合到一起箱熬,然后發(fā)送這些組合到一起的事件. 它按照嚴格的順序應(yīng)用這個函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)狈邑。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    });

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "emitter A");
            emitter.onNext("A");
            Log.d(TAG, "emitter B");
            emitter.onNext("B");
            Log.d(TAG, "emitter C");
            emitter.onNext("C");
            Log.d(TAG, "emitter complete2");
            emitter.onComplete();
        }
    });

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer integer, String s) throws Exception {
            return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

我們分別創(chuàng)建了observable, 一個發(fā)送1,2,3,4,Complete, 另一個發(fā)送A,B,C,Complete, 接著用Zip把發(fā)出的事件組合, 來看看運行結(jié)果吧:

onSubscribe
emitter 1
emitter 2
emitter 3
emitter 4
emit complete1
emitter A
onNext: 1A
emitter B
onNext: 2B
emitter C
onNext: 3C
emitter complete2
onComplete

觀察發(fā)現(xiàn)observable1發(fā)送事件后城须,observable2才發(fā)送
這是因為我們兩個observable都是運行在同一個線程里, 同一個線程里執(zhí)行代碼肯定有先后順序呀.

from
在Rxndroid的from操作符到2.0已經(jīng)被拆分成了3個,fromArray, fromIterable, fromFuture接收一個集合作為輸入米苹,然后每次輸出一個元素給subscriber糕伐。

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

number:1
number:2
number:3
number:4
number:5

注意:如果from()里面執(zhí)行了耗時操作,即使使用了subscribeOn(Schedulers.io())蘸嘶,仍然是在主線程執(zhí)行良瞧,可能會造成界面卡頓甚至崩潰,所以耗時操作還是使用Observable.create(…);

filter
條件過濾训唱,去除不符合某些條件的事件褥蚯。舉個栗子:

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
       .filter(new Predicate<Integer>() {
           @Override
           public boolean test(Integer integer) throws Exception {
               // 偶數(shù)返回true,則表示剔除奇數(shù)雪情,留下偶數(shù)
               return integer % 2 == 0;

           }
       }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

number:2
number:4

take
最多保留的事件數(shù)遵岩。

 Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                Log.d(TAG,value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

1
2

可以發(fā)現(xiàn)我們發(fā)送了6個String,最后只打印了前兩個巡通,這就是take過濾掉的結(jié)果

doOnNext
如果你想在處理下一個事件之前做某些事尘执,就可以調(diào)用該方法

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        // 偶數(shù)返回true,則表示剔除奇數(shù)
        return integer % 2 == 0;
    }
})// 最多保留三個宴凉,也就是最后剩三個偶數(shù)
        .take(3).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // 在輸出偶數(shù)之前輸出它的hashCode
        Log.i(TAG, "hahcode = " + integer.hashCode() + "");
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer value) {
        Log.i(TAG, "number = " + value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

hahcode = 2
number = 2
hahcode = 4
number = 4
hahcode = 6
number = 6


針對Android的一些擴展

RxAndroid是RxJava的一個針對Android平臺的擴展誊锭。它包含了一些能夠簡化Android開發(fā)的工具。
首先弥锄,AndroidSchedulers提供了針對Android的線程系統(tǒng)的調(diào)度器丧靡。需要在UI線程中運行某些代碼?很簡單籽暇,只需要使用AndroidSchedulers.mainThread():

retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

接著要介紹的就是AndroidObservable温治,它提供了跟多的功能來配合Android的生命周期。bindActivity()和bindFragment()方法默認使用AndroidSchedulers.mainThread()來執(zhí)行觀察者代碼戒悠,這兩個方法會在Activity或者Fragment結(jié)束的時候通知被觀察者停止發(fā)出新的消息熬荆。

AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap);

我自己也很喜歡AndroidObservable.fromBroadcast()方法,它允許你創(chuàng)建一個類似BroadcastReceiver的Observable對象绸狐。下面的例子展示了如何在網(wǎng)絡(luò)變化的時候被通知到:

IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));

最后要介紹的是ViewObservable,使用它可以給View添加了一些綁定卤恳。如果你想在每次點擊view的時候都收到一個事件累盗,可以使用ViewObservable.clicks(),或者你想監(jiān)聽TextView的內(nèi)容變化突琳,可以使用ViewObservable.text()

ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

RxAndroid的一些使用場景

這里總結(jié)了一些很合適使用RxAndroid的場景若债,供大家打開腦洞~分享時候有時間給大家看看demo

  1. 界面需要等到多個接口并發(fā)取完數(shù)據(jù),再更新
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("haha");
        }
    }).subscribeOn(Schedulers.newThread());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("hehe");
        }
    }).subscribeOn(Schedulers.newThread());


    Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String value) {
                    Log.d(TAG,value);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
  1. 界面按鈕需要防止連續(xù)點擊的情況
RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG, "do clicked!");
            }
        });
  1. 響應(yīng)式的界面 比如勾選了某個checkbox拆融,自動更新對應(yīng)的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(context);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<String> username = rxPreferences.getString("username");
Preference<Boolean> showWhatsNew = rxPreferences.getBoolean("show-whats-new", true);

username.asObservable().subscribe(new Action1<String>() {
  @Override public void call(String username) {
    Log.d(TAG, "Username: " + username);  讀取到當前值
  }
}

RxCompoundButton.checks(showWhatsNewView)
    .subscribe(showWhatsNew.asAction());

最后的話

通過本篇文章蠢琳,大家應(yīng)該對RxAndroid有個大體的認識了,也應(yīng)該體會到它在異步操作镜豹,代碼鏈式書寫等方面的優(yōu)勢了挪凑。需要注意的是由于RxJava存在理解的門檻,貿(mào)然引入項目要確保協(xié)同開發(fā)的人員也都對Rxjava有所了解~


[參考資料]

  1. 響應(yīng)式編程簡介
  2. 深入淺出RxJava
  3. RxPreferences 簡單整理
  4. RxBinding安卓UI響應(yīng)式編程
  5. 給 Android 開發(fā)者的 RxJava 詳解
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逛艰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子搞旭,更是在濱河造成了極大的恐慌散怖,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肄渗,死亡現(xiàn)場離奇詭異镇眷,居然都是意外死亡,警方通過查閱死者的電腦和手機翎嫡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門欠动,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惑申,你說我怎么就攤上這事具伍。” “怎么了圈驼?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵人芽,是天一觀的道長。 經(jīng)常有香客問我绩脆,道長萤厅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任靴迫,我火速辦了婚禮惕味,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘玉锌。我一直安慰自己名挥,他們只是感情好,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布芬沉。 她就那樣靜靜地躺著躺同,像睡著了一般阁猜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蹋艺,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天剃袍,我揣著相機與錄音,去河邊找鬼捎谨。 笑死民效,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的涛救。 我是一名探鬼主播畏邢,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼检吆!你這毒婦竟也來了舒萎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蹭沛,失蹤者是張志新(化名)和其女友劉穎臂寝,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體摊灭,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡咆贬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了帚呼。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片掏缎。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖煤杀,靈堂內(nèi)的尸體忽然破棺而出眷蜈,到底是詐尸還是另有隱情,我是刑警寧澤怜珍,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布端蛆,位于F島的核電站,受9級特大地震影響酥泛,放射性物質(zhì)發(fā)生泄漏今豆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一柔袁、第九天 我趴在偏房一處隱蔽的房頂上張望呆躲。 院中可真熱鬧,春花似錦捶索、人聲如沸插掂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辅甥。三九已至酝润,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間璃弄,已是汗流浹背要销。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留夏块,地道東北人疏咐。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像脐供,于是被迫代替她去往敵國和親侥涵。 傳聞我的和親對象是個殘疾皇子负乡,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容