優(yōu)美的異步 --- RxAndroid
這里和大家一起分享一下一個著名的Android異步庫RxAndroid。它應(yīng)該是2016年最流行的開源庫之一椎工。RxAndroid起源于RxJava钙姊,是一個專門針對Android版本的Rxjava庫送浊。RxAndroid-Github 目前最新的版本是v2.0.x我們今天的分享也基于2.0版本的API博秫。
響應(yīng)式編程
什么是響應(yīng)式編程?和平常經(jīng)常聽說的面向?qū)ο缶幊毯秃瘮?shù)式編程一樣异希,響應(yīng)式編程(Reactive Programming)就是一個編程范式健盒,但是與其他編程范式不同的是它是基于數(shù)據(jù)流和變化傳播的。我們經(jīng)常在程序中這樣寫
A = B + C
A被賦值為B和C的值称簿。這時扣癣,如果我們改變B的值,A的值并不會隨之改變憨降。而如果我們運用一種機制父虑,當B或者C的值發(fā)現(xiàn)變化的時候,A的值也隨之改變授药,這樣就實現(xiàn)了響應(yīng)式士嚎。
而響應(yīng)式編程的提出,其目的就是簡化類似的操作悔叽,因此它在用戶界面編程領(lǐng)域以及基于實時系統(tǒng)的動畫方面都有廣泛的應(yīng)用莱衩。另一方面,在處理嵌套回調(diào)的異步事件娇澎,復雜的列表過濾和變換的時候也都有良好的表現(xiàn)笨蚁。
RxAndroid其實是一個響應(yīng)式編程思想的實現(xiàn)庫。也因為這樣的思想趟庄,是它在一些方面表現(xiàn)的異常優(yōu)秀括细。下面我將先用一個簡單的例子,讓大家直觀的感受一下的樣子戚啥。
網(wǎng)絡(luò)加載圖片顯示
Observable.just(getDrawableFromNet())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
}
});
環(huán)境搭建
RxAndroid環(huán)境只需求要引入如下項目即可勒极,我們不但需要RxAndroid項目還需要RxJava項目。
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
基礎(chǔ)知識
RxAndroid的核心就是“異步”兩個字虑鼎,其最關(guān)鍵的東西就是三個:
Observable(被觀察者)
Observer(觀察者)
Subscriber (訂閱)
Observable可以理解為事件的發(fā)送者,就好像快遞的寄出者,而這些事件就好比快遞
Observer可以理解為事件的接收者炫彩,就好像快遞的接收者
Subscriber 綁定兩者
Observable可以發(fā)出一系列的 事件匾七,這里的事件可以是任何東西,例如網(wǎng)絡(luò)請求江兢、復雜計算處理昨忆、數(shù)據(jù)庫操作、文件操作等等杉允,事件執(zhí)行結(jié)束后交給 Observer回調(diào)處理邑贴。
那他們之間是如何進行聯(lián)系的呢?答案就是通過subscribe()方法叔磷。
下面我們通過一個HelloDemo來看看Observable與Observer進行關(guān)聯(lián)的典型方式拢驾,
private void test_1() {
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onComplete();
e.onNext("hello2");
}
});
Observer<String> oser = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.w("kaelpu","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.w("kaelpu","onNext = "+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.w("kaelpu","onError" + e);
}
@Override
public void onComplete() {
Log.w("kaelpu","onComplete");
}
};
Log.w("kaelpu","subscribe");
oble.subscribe(oser);
}
10-21 01:28:01.600 11386-11386/? W/kaelpu: subscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onSubscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onNext = hello
10-21 01:28:01.600 11386-11386/? W/kaelpu: onComplete
其實這段代碼干了三件事:
- 創(chuàng)建被觀察者對象oble
- 創(chuàng)建觀察者oser
- 連接觀察者和被觀察者
被觀察者通過onNext函數(shù)給觀察者通知結(jié)果
被貫徹者onComplete函數(shù)通知觀察者執(zhí)行結(jié)束
連接觀察者和被觀察者我們使用subscribe函數(shù)
- 通過打印的log我們可以看到觀察者函數(shù)調(diào)用情況,調(diào)用subscribe函數(shù)去綁定觀察者和被觀察者時候,觀察者的onSubscribe函數(shù)會被回調(diào)表示建立關(guān)聯(lián)改基。
- 接著每當被觀察者調(diào)用onNext給觀察者發(fā)送數(shù)據(jù)時候繁疤,觀察者的onNext 會收到回調(diào),并且得到所發(fā)送的數(shù)據(jù)秕狰。
- 當被觀察者調(diào)用onComplete函數(shù)時候,代表著完成稠腊,觀察者的onComplete回調(diào)會被觸發(fā),并且斷開了兩者的關(guān)聯(lián)鸣哀,這時被觀察者再發(fā)送數(shù)據(jù)架忌,觀察者也不會收到。
當然我們注意到觀察者還有一個onError函數(shù)沒有被觸發(fā)過我衬,那么該怎么觸發(fā)呢叹放,又代表著什么意思呢?我們來改變一下代碼:
private String error() throws Exception {
throw new Exception();
}
添加一個函數(shù)低飒,名字隨便但是返回值是String许昨,這里我們叫做error函數(shù)。函數(shù)很簡單就是拋出一個異常褥赊。然后我們繼續(xù)修改被觀察者的代碼如下:
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onNext(error());
e.onNext("hello1");
e.onComplete();
e.onNext("hello2");
}
});
其實我們就添加了兩行糕档,添加了一個e.onNext(error()) 并且在之后還添加了一個e.onNext("hello1") 運行一下我們看看
W/kaelpu: subscribe
W/kaelpu: onSubscribe
W/kaelpu: onNexthello
W/kaelpu: onErrorjava.lang.Exception
折斷l(xiāng)og說明三個問題:
- 被觀察者onNext中是可以運行函數(shù)的
- 如果運行的函數(shù)報錯,則會調(diào)用我們觀察者的onError函數(shù)
- 當調(diào)用onError函數(shù)時候拌喉,也會斷開關(guān)聯(lián)速那,被觀察者收不到后面的數(shù)據(jù),但是觀察者依然會繼續(xù)發(fā)送尿背。
最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然端仰。
關(guān)于onComplete和onError唯一并且互斥這一點, 是需要自行在代碼中進行控制, 如果你的代碼邏輯中違背了這個規(guī)則, 并不一定會導致程序崩潰. 比如發(fā)送多個onComplete是可以正常運行的, 依然是收到第一個onComplete就不再接收了, 但若是發(fā)送多個onError, 則收到第二個onError事件會導致程序會崩潰.當我們寫多個onComplete時,不會報錯田藐。
除了被觀察者能斷開關(guān)聯(lián)荔烧,觀察者也能主動斷開連接吱七,調(diào)用onSubscribe函數(shù)中傳入的對象Disposable的dispose()函數(shù)即可完成斷開連接,同樣關(guān)聯(lián)斷開后鹤竭,被觀察者依然會繼續(xù)發(fā)送數(shù)據(jù)
**講到這里第一感覺是不是踊餐?**
就輸出個數(shù)字就這么麻煩,完全沒看出哪里方便了臀稚!別著急我剛開始看RxAndroid文章也是這樣的感覺吝岭,而且很多網(wǎng)上的文章都沒有解釋這個問題。所以看一會你就更暈了吧寺。別著急我一起深呼吸,來看看如何簡化操作
你可能覺得窜管,我就打印幾個數(shù),還要把Observable寫的那么麻煩稚机,能不能簡便一點呢幕帆?答案是肯定的,RxAndroid內(nèi)置了很多簡化創(chuàng)建Observable對象的函數(shù)抒钱,比如Observable.just就是用來創(chuàng)建只發(fā)出一個事件就結(jié)束的Observable對象蜓肆,上面創(chuàng)建Observable對象的代碼可以簡化為一行
Observable<String> observable = Observable.just("hello");
同樣對于Observer,這個例子中谋币,我們其實并不關(guān)心OnComplete和OnError仗扬,我們只需要在onNext的時候做一些處理,這時候就可以使用Consumer類蕾额。
Observable<String> observable = Observable.just("hello");
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);
其實在RxAndroid中早芭,我們可以為 Observer中的三種狀態(tài)根據(jù)自身需要分別創(chuàng)建一個回調(diào)動作,通過Action 來替代onComplete():诅蝶,通過Consumer來替代 onError(Throwable t)和onNext(T t)
Observable<String> observable = Observable.just("hello");
Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
Log.i("kaelpu", "complete");
}
};
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("kaelpu", s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i("kaelpu", "error");
}
};
observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);
}
subscribe()有多個重載的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
不帶任何參數(shù)的subscribe() 表示Observer不關(guān)心任何事件,Observable發(fā)送什么數(shù)據(jù)都隨你
帶有一個Consumer參數(shù)的方法表示Observer只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫
只要我們再本節(jié)中能明白觀察者和被觀察者之間是如何工作關(guān)聯(lián)的就可以
線程調(diào)度
關(guān)鍵的章節(jié)來了退个,看完上面的基礎(chǔ)知識,很多人都會感覺就一個發(fā)送调炬,一個接收语盈,不就是個觀察者模式嘛,感覺一點卵用都沒有缰泡,還寫這么多回調(diào)方法刀荒!完全沒有看出什么優(yōu)點。那么這一節(jié)就讓你看到RxAndroid真正厲害的地方棘钞。
正常情況下, Observer和Observable是工作在同一個線程中的, 也就是說Observable在哪個線程發(fā)事件, Observer就在哪個線程接收事件.
RxAndroid中, 當我們在主線程中去創(chuàng)建一個Observable來發(fā)送事件, 則這個Observable默認就在主線程發(fā)送事件.
當我們在主線程去創(chuàng)建一個Observer來接收事件, 則這個Observer默認就在主線程中接收事件缠借,但其實在現(xiàn)實工作中我們更多的是需要進行線程切換的,最常見的例子就是在子線程中請求網(wǎng)絡(luò)數(shù)據(jù)宜猜,在主線程中進行展示
要達到這個目的, 我們需要先改變Observable發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變Observer的線程, 讓它去主線程接收事件. 通過RxAndroid內(nèi)置的線程調(diào)度器可以很輕松的做到這一點. 接下來看一段代碼:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
Log.d("kaelpu", "emitter 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
Log.d("kaelpu", "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :main
onNext: 1
可以看到, observable發(fā)送事件的線程的確改變了, 是在一個叫 RxNewThreadScheduler-1的線程中發(fā)送的事件, 而consumer 仍然在主線程中接收事件, 這說明我們的目的達成了, 接下來看看是如何做到的.
這段代碼只不過是增加了兩行代碼:
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
簡單的來說, subscribeOn() 指定的是Observable發(fā)送事件的線程, observeOn() 指定的是Observer接收事件的線程.
多次指定Observable的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
多次指定Observer的線程是可以的, 也就是說每調(diào)用一次observeOn() , Observer的線程就會切換一次.例如:
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :RxCachedThreadScheduler-2
onNext: 1
可以看到, Observable雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler線程中, 而Observer則跑到了RxCachedThreadScheduler 中, 這個CacheThread其實就是IO線程池中的一個.
在 RxAndroid 中泼返,提供了一個名為 Scheduler 的線程調(diào)度器,RxAndroid 內(nèi)部提供了4個調(diào)度器姨拥,分別是:
- Schedulers.io(): I/O 操作(讀寫文件绅喉、數(shù)據(jù)庫渠鸽、網(wǎng)絡(luò)請求等),與newThread()差不多霹疫,區(qū)別在于io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池拱绑,可以重用空閑的線程,因此多數(shù)情況下 io() 效率比 newThread() 更高丽蝎。值得注意的是,在 io() 下膀藐,不要進行大量的計算屠阻,以免產(chǎn)生不必要的線程;
- Schedulers.newThread(): 開啟新線程操作额各;
- Schedulers.immediate(): 默認指定的線程国觉,也就是當前線程;
- Schedulers.computation():計算所使用的調(diào)度器虾啦。這個計算指的是 CPU 密集型計算麻诀,即不會被 I/O等操作限制性能的操作,例如圖形的計算傲醉。這個 Scheduler 使用的固定的線程池蝇闭,大小為 CPU 核數(shù)。值得注意的是硬毕,不要把 I/O 操作放在 computation() 中呻引,否則 I/O 操作的等待時間會浪費 CPU;
- AndroidSchedulers.mainThread(): Rxndroid 擴展的 Android 主線程吐咳;
這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項, 在RxAndroid內(nèi)部使用的是線程池來維護這些線程, 所有效率也比較高逻悠。
對于線程還需要注意
- create() , just() , from() 等 --- 事件產(chǎn)生
- map() , flapMap() , scan() , filter() 等 -- 事件加工
- subscribe() -- 事件消費
事件產(chǎn)生:默認運行在當前線程,可以由 subscribeOn() 自定義線程
事件加工:默認跟事件產(chǎn)生的線程保持一致, 可由 observeOn() 自定義線程
事件消費:默認運行在當前線程韭脊,可以有observeOn() 自定義
好了說了這么多了,我們來寫個簡單的異步的例子童谒,看看實際效果。我們這個例子就以加載網(wǎng)絡(luò)圖片并顯示為例:
首先我們寫一個耗時函數(shù)沪羔,用來模擬圖片請求
// 模擬網(wǎng)絡(luò)請求圖片
private Drawable getDrawableFromUrl(String url){
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getResources().getDrawable(R.drawable.baidu);
}
代碼很簡答饥伊,就是線程sleep 6秒,然后返回一張圖片任内,如果運行在主線程那就會NAR撵渡,然后我么來用RxAndroid寫一下這個異步拉去圖片并顯是的操作!
Observable.just(getDrawableFromNet("http://www.baidu.com/icon.png"))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
}
});
就這幾行代碼就搞定了死嗦!自己結(jié)合上面講的理解一下~這里就不做解釋了趋距!因為我們還有更重要的一個環(huán)節(jié),這個環(huán)節(jié)堪稱RxAndroid的精髓越除!
操作符的使用
在了解基本知識和線程調(diào)度后节腐,我們來學習一下RxAndroid各種神奇的操作符
Map
Map是RxAndroid中最簡單的一個變換操作符了, 它的作用就是對Observable發(fā)送的每一個事件應(yīng)用一個函數(shù), 使得每一個事件都按照指定的函數(shù)去變化外盯。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("kaelpu", s);
}
});
This is result 1
This is result 2
This is result 3
通過Map, 可以將Observable發(fā)來的事件轉(zhuǎn)換為任意的類型, 可以是一個Object, 也可以是一個集合,功能非常強大
例子:還是以圖片加載的例子翼雀,我們傳進來一個圖片的路徑饱苟,然后通過Map把drawble轉(zhuǎn)換成bitmap再發(fā)送給觀察者
Observable.just(getDrawableFromNet())
.map(new Function<Drawable, Bitmap>() {
@Override
public Bitmap apply(@NonNull Drawable drawable) throws Exception {
BitmapDrawable bt = (BitmapDrawable)drawable;
return bt.getBitmap();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
}
});
Observable –> map變換 –> Observable
url -> drawable -> bitmap
不用到處調(diào)代碼,直接一個鏈式操作... 是不是感覺很爽狼渊!
ZIP
Zip通過一個函數(shù)將多個Observable發(fā)送的事件結(jié)合到一起箱熬,然后發(fā)送這些組合到一起的事件. 它按照嚴格的順序應(yīng)用這個函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)狈邑。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emitter A");
emitter.onNext("A");
Log.d(TAG, "emitter B");
emitter.onNext("B");
Log.d(TAG, "emitter C");
emitter.onNext("C");
Log.d(TAG, "emitter complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我們分別創(chuàng)建了observable, 一個發(fā)送1,2,3,4,Complete, 另一個發(fā)送A,B,C,Complete, 接著用Zip把發(fā)出的事件組合, 來看看運行結(jié)果吧:
onSubscribe
emitter 1
emitter 2
emitter 3
emitter 4
emit complete1
emitter A
onNext: 1A
emitter B
onNext: 2B
emitter C
onNext: 3C
emitter complete2
onComplete
觀察發(fā)現(xiàn)observable1發(fā)送事件后城须,observable2才發(fā)送
這是因為我們兩個observable都是運行在同一個線程里, 同一個線程里執(zhí)行代碼肯定有先后順序呀.
from
在Rxndroid的from操作符到2.0已經(jīng)被拆分成了3個,fromArray, fromIterable, fromFuture接收一個集合作為輸入米苹,然后每次輸出一個元素給subscriber糕伐。
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
number:1
number:2
number:3
number:4
number:5
注意:如果from()里面執(zhí)行了耗時操作,即使使用了subscribeOn(Schedulers.io())蘸嘶,仍然是在主線程執(zhí)行良瞧,可能會造成界面卡頓甚至崩潰,所以耗時操作還是使用Observable.create(…);
filter
條件過濾训唱,去除不符合某些條件的事件褥蚯。舉個栗子:
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶數(shù)返回true,則表示剔除奇數(shù)雪情,留下偶數(shù)
return integer % 2 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
number:2
number:4
take
最多保留的事件數(shù)遵岩。
Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1
2
可以發(fā)現(xiàn)我們發(fā)送了6個String,最后只打印了前兩個巡通,這就是take過濾掉的結(jié)果
doOnNext
如果你想在處理下一個事件之前做某些事尘执,就可以調(diào)用該方法
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶數(shù)返回true,則表示剔除奇數(shù)
return integer % 2 == 0;
}
})// 最多保留三個宴凉,也就是最后剩三個偶數(shù)
.take(3).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 在輸出偶數(shù)之前輸出它的hashCode
Log.i(TAG, "hahcode = " + integer.hashCode() + "");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.i(TAG, "number = " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
hahcode = 2
number = 2
hahcode = 4
number = 4
hahcode = 6
number = 6
針對Android的一些擴展
RxAndroid是RxJava的一個針對Android平臺的擴展誊锭。它包含了一些能夠簡化Android開發(fā)的工具。
首先弥锄,AndroidSchedulers提供了針對Android的線程系統(tǒng)的調(diào)度器丧靡。需要在UI線程中運行某些代碼?很簡單籽暇,只需要使用AndroidSchedulers.mainThread():
retrofitService.getImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
接著要介紹的就是AndroidObservable温治,它提供了跟多的功能來配合Android的生命周期。bindActivity()和bindFragment()方法默認使用AndroidSchedulers.mainThread()來執(zhí)行觀察者代碼戒悠,這兩個方法會在Activity或者Fragment結(jié)束的時候通知被觀察者停止發(fā)出新的消息熬荆。
AndroidObservable.bindActivity(this, retrofitService.getImage(url))
.subscribeOn(Schedulers.io())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap);
我自己也很喜歡AndroidObservable.fromBroadcast()方法,它允許你創(chuàng)建一個類似BroadcastReceiver的Observable對象绸狐。下面的例子展示了如何在網(wǎng)絡(luò)變化的時候被通知到:
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
.subscribe(intent -> handleConnectivityChange(intent));
最后要介紹的是ViewObservable,使用它可以給View添加了一些綁定卤恳。如果你想在每次點擊view的時候都收到一個事件累盗,可以使用ViewObservable.clicks(),或者你想監(jiān)聽TextView的內(nèi)容變化突琳,可以使用ViewObservable.text()
ViewObservable.clicks(mCardNameEditText, false)
.subscribe(view -> handleClick(view));
RxAndroid的一些使用場景
這里總結(jié)了一些很合適使用RxAndroid的場景若债,供大家打開腦洞~分享時候有時間給大家看看demo
- 界面需要等到多個接口并發(fā)取完數(shù)據(jù),再更新
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("haha");
}
}).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hehe");
}
}).subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- 界面按鈕需要防止連續(xù)點擊的情況
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "do clicked!");
}
});
- 響應(yīng)式的界面 比如勾選了某個checkbox拆融,自動更新對應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(context);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<String> username = rxPreferences.getString("username");
Preference<Boolean> showWhatsNew = rxPreferences.getBoolean("show-whats-new", true);
username.asObservable().subscribe(new Action1<String>() {
@Override public void call(String username) {
Log.d(TAG, "Username: " + username); 讀取到當前值
}
}
RxCompoundButton.checks(showWhatsNewView)
.subscribe(showWhatsNew.asAction());
最后的話
通過本篇文章蠢琳,大家應(yīng)該對RxAndroid有個大體的認識了,也應(yīng)該體會到它在異步操作镜豹,代碼鏈式書寫等方面的優(yōu)勢了挪凑。需要注意的是由于RxJava存在理解的門檻,貿(mào)然引入項目要確保協(xié)同開發(fā)的人員也都對Rxjava有所了解~
[參考資料]