原文地址
RxAndroid Tutorial
響應(yīng)式編程(Reactive programming)不是一種API,而是一種新的非常有用的范式包各,而RxJava就是一套基于此思想的框架噪服,在Android開發(fā)中我們通過這個框架就能探索響應(yīng)式的世界埠帕,同時結(jié)合另一個庫话侄,RxAndroid,這是一個擴展庫年堆,更好的兼容了Android特性盏浇,比如主線程变丧,UI事件等。
在這篇指南中绢掰,你將會學(xué)習(xí)到以下這些內(nèi)容:
- 什么是響應(yīng)式編程
- 什么是observable
- 如何將異步事件比如按鈕點擊或者EditText字符變化轉(zhuǎn)換成observables
- observable變換
- observable 過濾攔截
- 如何指定鏈式中的代碼執(zhí)行線程
- 如何合并多個observables
前言
從 the starter project for this tutorial 可以下載這篇文章中項目的所有代碼滴劲, 可以直接在Android Studio中打開班挖。
大部分的代碼都在 CheeseActivity.java
這個類里面,繼承于 BaseSearchActivity
给梅;里面有一些基礎(chǔ)方法:
showProgressBar(): 顯示一個進度條
hideProgressBar(): 隱藏一個進度條
showResult(List<String> result): 顯示一個列表數(shù)據(jù)
mCheeseSearchEngine: CheeseSearchEngine類的一個對象动羽,內(nèi)部有一個search方法运吓,接收一個數(shù)據(jù)查詢并返回一個匹配的列表list擎场。
直接運行的話迅办,跑出來是這樣子站欺,就是一個查詢的界面:
什么是響應(yīng)式編程
在創(chuàng)建第一個observable之前纤垂,先看一下響應(yīng)式編程的理論 :]
一般的程序是這樣的峭沦,表達式只會計算一次吼鱼,然后把賦值給變量
int a = 2;
int b = 3;
int c = a * b; // c is 6
a = 10;
// c is still 6
在a重新賦值后菇肃,前面的c并不會變化取募,而響應(yīng)式編程會對值的變化做出響應(yīng)玩敏。
有時候很有可能你已經(jīng)做過一些響應(yīng)式編程旺聚,但是并沒有意識到這一點翻屈。
比如Excel中的表格,我們可以在表格里面填上一些值,同時將某個格子的值設(shè)為一個表達式厘贼,就像下面這樣
設(shè)置這個表格里面 B1區(qū)域的值為2嘴秸,B2區(qū)域的值為3岳掐,B3是一個表達式串述,B3 = B1* B2寞肖,當(dāng)其中一個值改變的時候,這個觀察者B3也會變化右蕊,如圖把B1改成10饶囚,B3就會自動計算成30坯约。
RxJava Observable
RxJava使用的是觀察者模式闹丐,其中有兩個關(guān)鍵的接口:Observable 和 Observer被因,當(dāng)Observable(被觀察的對象)狀態(tài)改變梨与,所有subscribed(訂閱)的Observer(觀察者)會收到一個通知粥鞋。
在Observable的接口中有一個方法 subscribe()
呻粹,這樣Observer 可以調(diào)用來進行訂閱等浊。
同樣,在Observer 接口中有三個方法轧飞,會被Observable 回調(diào):
- onNext(T value) 提供了一個 T 類型的item給Observer
- onComplete() 在Observable發(fā)送items結(jié)束后通知Observer
- onError(Throwable e) 當(dāng)Observable發(fā)生錯誤時通知Observer
作為一個表現(xiàn)良好的Observable,發(fā)射0到多個數(shù)據(jù)時后面都會跟上一個completion 或是error的回調(diào)掸绞。
聽起來有點復(fù)雜切黔,但是一些例子可以很清晰的解釋具篇。
一個網(wǎng)絡(luò)請求observable 通常只發(fā)射一個數(shù)據(jù)并且立刻completes驱显。
每一個圓代表了從observable 發(fā)射出去的item數(shù)據(jù)伏恐,黑色的block代表了結(jié)束或是錯誤栓霜。
一個鼠標的移動observable 將會不斷的發(fā)送鼠標當(dāng)前坐標胳蛮,并且從不會結(jié)束仅炊。
在一個observable 已經(jīng)結(jié)束后不能再發(fā)射新的item數(shù)據(jù)抚垄,下面這個就是一個不好的示范呆馁,違反了Observable 的準則
在已經(jīng)發(fā)信號結(jié)束后依然發(fā)射了一個item智哀。
怎么創(chuàng)建一個Observable
你可以直接通過 Observable.create()
創(chuàng)建一個Observable
Observable<T> create(ObservableOnSubscribe<T> source)
看起來十分的簡潔瓷叫,但是這段代碼是什么意思呢摹菠?這個 “source” 又是什么次氨? 想要理解這個煮寡,只需要知道 ObservableOnSubscribe
是什么。 這是一個接口薇组,其中有一個方法:
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> emitter) throws Exception;
}
這個你創(chuàng)建Observable 時的一個“source” 需要暴露一個 subscribe()
方法,從這里又引出來另一個 emitter(發(fā)射器)炭菌,那么什么又是emitter逛漫?
RxJava中的 Emitter
接口和 Observer 比較相似酌毡,都有以下方法
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
ObservableEmitter
提供了一個方法用來取消訂閱阔馋,用一個實際場景來形容一下呕寝。想象一個水龍頭和水流下梢,這個管道就相當(dāng)于Observable孽江,從里面能放出水岗屏,ObservableEmitter 就相當(dāng)于是水龍頭这刷,控制開關(guān),而水龍頭連接到管道就是 Observable.create()似袁。
舉個例子免得前面描述太過于抽象昙衅,先來看看第一個例子
觀察按鈕點擊事件
在 CheeseActivity
類中有這么一段代碼
// 1
private Observable<String> createButtonClickObservable() {
// 2
return Observable.create(new ObservableOnSubscribe<String>() {
// 3
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
// 4
mSearchButton.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
// 5
emitter.onNext(mQueryEditText.getText().toString());
}
});
// 6
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
// 7
mSearchButton.setOnClickListener(null);
}
});
}
});
}
上面這段代碼做了以下幾件事情
- 定義了一個方法會返回一個Observable著瓶,泛型是String類型蟹但。
- 通過
Observable.create()
創(chuàng)建了一個observable 谭羔,并提供了一個ObservableOnSubscribe瘟裸。 - 在參數(shù)的內(nèi)部類中覆寫了
subscribe()
方法话告。 - 給搜索按鈕mSearchButton添加了一個點擊事件沙郭。
- 當(dāng)點擊事件觸發(fā)時病线,調(diào)用emitter 的onNext 方法送挑,并傳遞了當(dāng)前mQueryEditText的值惕耕。
- 在Java中保持引用容易造成內(nèi)存泄漏司澎,在不再需要的時候及時移除listeners是一個好習(xí)慣挤安,那么這里怎么移除呢漱受?ObservableEmitter 有一個
setCancellable()
方法昂羡。通過重寫cancel()方法虐先,然后當(dāng)Observable 被處理的時候這個實現(xiàn)會被回調(diào)蛹批,比如已經(jīng)結(jié)束或者是所有的觀察者都解除了訂閱。 - 通過setOnClickListener(null) 來移除監(jiān)聽差导。
現(xiàn)在被觀察者Observable 已經(jīng)有了设褐,還需要觀察者來進行訂閱助析,在此之前外冀,我們先看看另一個接口雪隧, Consumer
膀跌,它可以十分簡單的從emitter 接收到數(shù)據(jù)捅伤。
public interface Consumer<T> {
void accept(T t) throws Exception;
}
如果僅是想要簡單的訂閱一下Observable丛忆,這個接口是很方便的熄诡。
Observable 的接口方法 subscribe() 可以接收很多類型的參數(shù)诗力,你可以訂閱一個全參數(shù)的版本,只要你實現(xiàn)其中所有的方法就可以菜拓。如果只是想要接收一下發(fā)射的數(shù)據(jù),可以使用單一的 Consumer 的版本贱鄙,這樣只需要實現(xiàn)一個方法姨谷,而且也是 onNext
疙剑。
我們可以直接在Activity的OnStart方法中來實現(xiàn)這個
@Override
protected void onStart() {
super.onStart();
// 1
Observable<String> searchTextObservable = createButtonClickObservable();
searchTextObservable
// 2
.subscribe(new Consumer<String>() {
//3
@Override
public void accept(String query) throws Exception {
// 4
showResult(mCheeseSearchEngine.search(query));
}
});
}
其中Consumer需要導(dǎo)的包是
import io.reactivex.functions.Consumer;
依次解釋一下上面每一步
- 創(chuàng)建一個Observable 基于前面寫的事件監(jiān)聽代碼
- 通過subscribe方法來訂閱這個Observable 言缤,并提供一個單一的 Consumer
- 重寫Consumer 方法管挟,這會在按鈕點擊的時候接收到發(fā)射出來的EditText的值
- 搜索并展示結(jié)果
這樣一個簡單的實現(xiàn)也寫完了守谓,運行一下APP斋荞,跑出來的結(jié)果就像下面這樣
RxJava線程模型
雖然已經(jīng)像模像樣的寫了一個小程序平酿,但其實存在一些問題蜈彼。當(dāng)按鈕按下去后這個UI線程實際上被阻塞住了
如果在控制臺可能可以看到這樣的提示
> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!
The application may be doing too much work on its main thread.
這是由于search 發(fā)生在主線程棍辕,如果是一個網(wǎng)絡(luò)請求的話痢毒,Android會直接crash哪替,拋出一個NetworkOnMainThreadException 的異常凭舶。如果不指定線程帅霜,那么RxJava的操作會一直在一個線程上身冀。
通過 subscribeOn
和 observeOn
兩個操作符能改變線程的執(zhí)行狀態(tài)搂根。
subscribeOn
在操作鏈上最好只調(diào)用一次剩愧,如果多次調(diào)用仁卷,依然只有第一次生效
subscribeOn
用來指定 observable 在哪個線程上創(chuàng)建執(zhí)行操作,如果想要通過observables 發(fā)射事件給Android的View丰介,那么需要保證訂閱者在Android的UI線程上執(zhí)行操作基矮。
另一方面家浇, observeOn
可以在鏈上調(diào)用多次钢悲,它主要是用來指定下一個操作在哪一個線程上執(zhí)行莺琳,來個例子:
myObservable // observable will be subscribed on i/o thread
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(/* this will be called on main thread... */)
.doOnNext(/* ...and everything below until next observeOn */)
.observeOn(Schedulers.io())
.subscribe(/* this will be called on i/o thread */);
主要用到三種schedulers:
Schedulers.io(): 適合I/O類型的操作珍手,比如網(wǎng)絡(luò)請求辞做,磁盤操作秤茅。
Schedulers.computation(): 適合計算任務(wù)课幕,比如事件循環(huán)或者回調(diào)處理乍惊。
AndroidSchedulers.mainThread() : 回調(diào)主線程污桦,比如UI操作凡橱。
Map 操作符
map操作符通過運用一個方法把從一個observable 發(fā)射的數(shù)據(jù)再返回成另一個observable給那些調(diào)用的稼钩。
比如你有一個observable稱之為numbers,并且會發(fā)射一系列的值达罗,如下所示
通過map操作符的apply方法
numbers.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer number) throws Exception {
return number * number;
}
}
然后結(jié)果就像下面這樣
再來個實例坝撑,我們用這個操作符能夠把前面的代碼拆分一下
@Override
protected void onStart() {
super.onStart();
Observable<String> searchTextObservable = createButtonClickObservable();
searchTextObservable
// 1
.observeOn(Schedulers.io())
// 2
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(String query) {
return mCheeseSearchEngine.search(query);
}
})
// 3
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> result) {
showResult(result);
}
});
}
簡述一下代碼,首先粮揉,指定下一次操作在I/O線程上巡李,然后通過給的String,執(zhí)行search返回一個結(jié)果列表侨拦,
再將線程從I/O上變更為主線程,showResult
辐宾,展示返回的數(shù)據(jù)狱从。
通過doOnNext顯示進度條
為了用戶體驗膨蛮,我們需要一個進度條
這里可以引入 doOnNext
操作符,doOnNext
有一個 Consumer
季研,并且在每次observable 發(fā)射數(shù)據(jù)的時候都會被調(diào)用敞葛,再改一下前面的代碼
@Override
protected void onStart() {
super.onStart();
Observable<String> searchTextObservable = createButtonClickObservable();
searchTextObservable
// 1
.observeOn(AndroidSchedulers.mainThread())
// 2
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) {
showProgressBar();
}
})
.observeOn(Schedulers.io())
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(String query) {
return mCheeseSearchEngine.search(query);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> result) {
// 3
hideProgressBar();
showResult(result);
}
});
}
每次在點擊按鈕的時候就能收到一個事件
首先把線程切換到主線程,然后在 doOnNext
里面來顯示進度條与涡,再把線程切換到子線程惹谐,來進行請求數(shù)據(jù),最后在切換回來關(guān)閉進度條递沪,展示數(shù)據(jù)豺鼻。RxJava非常適合這種需求,代碼也很清晰款慨。
把這個例子跑起來的效果就像下面這樣,點擊的時候就顯示進度條:
觀察EditText變化
除了通過點擊按鈕來搜索谬莹,更好的方式就是根據(jù)EditText的text內(nèi)容變化自動的搜索檩奠。
首先,就需要對EditText的內(nèi)容變化進行訂閱觀察附帽,先看代碼實例:
//1
private Observable<String> createTextChangeObservable() {
//2
Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
//3
final TextWatcher watcher = new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {}
@Override
public void afterTextChanged(Editable s) {}
//4
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
emitter.onNext(s.toString());
}
};
//5
mQueryEditText.addTextChangedListener(watcher);
//6
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
mQueryEditText.removeTextChangedListener(watcher);
}
});
}
});
// 7
return textChangeObservable;
}
分析一下上面這幾步代碼:
- 定義一個方法返回一個EditText變化的observable
- 通過
Observable.create
創(chuàng)建一個textChangeObservable 埠戳,傳入一個ObservableOnSubscribe 對象 - 在subscribe 方法中,創(chuàng)建一個TextWatcher蕉扮,這是用來監(jiān)聽值變化的
- 這里不用管
beforeTextChanged()
和afterTextChanged()
整胃,在onTextChanged 里面,把這個數(shù)據(jù)通過emitter.onNext 發(fā)射出去喳钟,這樣訂閱的觀察者就能接收到 - 通過addTextChangedListener將Edittext綁定上這個watcher監(jiān)聽
- 最后在emitter的setCancellable中去移除這個監(jiān)聽屁使,防止內(nèi)存泄漏
實現(xiàn)了這個Observable后就可以把前面的給替換掉
Observable<String> searchTextObservable = createTextChangeObservable();
再跑一次程序,就可以邊輸入邊搜索了
內(nèi)容長度攔截過濾
現(xiàn)在可能有一個需求是在輸入長度比較短的時候不進行搜索奔则,達到一定字符后才搜索蛮寂,RxJava引入了一個 filter
操作符。
filter只會通過那些滿足條件的item易茬,filter通過一個 Predicate
酬蹋,這個接口內(nèi)部有一個 test
方法用來決定是否滿足條件,最后會返回一個boolean 值抽莱。
這里范抓,Predicate 拿到的是一個輸入字符String,如果長度大于或等于2食铐,就返回true匕垫,表示滿足條件。
return textChangeObservable
.filter(new Predicate<String>() {
@Override
public boolean test(String query) throws Exception {
return query.length() >= 2;
}
});
注意Predicate需要導(dǎo)的包是:
import io.reactivex.functions.Predicate;
再前面創(chuàng)建Observable的代碼后面加一個 filter
后璃岳,當(dāng)query的長度不足2時年缎,那這個值就不會被發(fā)射出去悔捶,然后訂閱的就收不到這個消息。
跑起來就像這樣单芜,只輸一個數(shù)蜕该,返回false,不會觸發(fā)搜索洲鸠。
再輸一個字符就通過了filter的過濾堂淡。
Debounce 操作符
有時我們對于EditText內(nèi)容頻繁變化的場景并不想每次變化都去新發(fā)送一個請求,所以扒腕,這里又引入了一個新的操作符 debounce
绢淀,意思就是防抖動,這個和filter比較類似瘾腰,也是一種攔截的策略皆的。
這個操作符是根據(jù)item被發(fā)射的時間來進行過濾。每次在一個item被發(fā)射后蹋盆,debounce 會等待一段指定長度的時間费薄,然后才去發(fā)射下一個item。
如果在這段時間內(nèi)都沒有一個item發(fā)生栖雾,那么上一個最后的item會被發(fā)射出去楞抡,這樣能保證起碼有一個item能被發(fā)射成功。
從圖里看到析藕,2召廷,3,4账胧,5觸發(fā)的時間非常的接近竞慢,所以這一段時間內(nèi)前三個都被過濾了,只留下了5找爱。
在前面的
createTextChangeObservable()
中梗顺,我們再添加一個 debounce
操作符在 filter
的后面
return textChangeObservable
.filter(new Predicate<String>() {
@Override
public boolean test(String query) throws Exception {
return query.length() >= 2;
}
}).debounce(1000, TimeUnit.MILLISECONDS); // add this line
再跑一下APP,可以看到中間階段直接省略了车摄,最后搜索了一下結(jié)果值
Merge 操作符
一開始我們實現(xiàn)了一個observable 是監(jiān)聽點擊按鈕的事件寺谤,然后又實現(xiàn)了一個observable 是監(jiān)聽EditText的內(nèi)容變化,那么怎么把這兩個合二為一呢吮播。
RxJava提供了很多的操作符來聯(lián)合observables变屁,但是其中最有用和簡單的就是 merge
。
merge
可以將兩個或更多的observable 聯(lián)合起來意狠,合成一個單一的observable粟关。
這里我們把前面兩個observable 綁定起來
Observable<String> buttonClickStream = createButtonClickObservable();
Observable<String> textChangeStream = createTextChangeObservable();
Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);
現(xiàn)在的效果就是前面的兩種效果的結(jié)合體,無論是自動搜索還是手動搜索都是可以觸發(fā)的环戈。
RxJava和Activity/Fragment生命周期
前面我們實現(xiàn)過 setCancellable
方法闷板,這個方法會在解除訂閱的時候回調(diào)澎灸。
Observable.subscribe()
會返回一個Disposable,Disposable是一個接口遮晚,其中有兩個方法:
public interface Disposable {
void dispose(); // ends a subscription
boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}
我們先在 CheeseActivity
中定義一個Disposable
private Disposable mDisposable;
在 onStart()
中性昭,把 subscribe()
的返回值賦給mDisposable
mDisposable = searchTextObservable // change this line
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) {
showProgressBar();
}
})
.observeOn(Schedulers.io())
.map(new Function<String, List<String>>() {
@Override
public List<String> apply(String query) {
return mCheeseSearchEngine.search(query);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> result) {
hideProgressBar();
showResult(result);
}
});
最后我們就能在 onStop()
中去解除這個訂閱,代碼如下:
@Override
protected void onStop() {
super.onStop();
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
這樣就解除了訂閱县遣。
后記
你可以下載這篇文章中的代碼程序糜颠,下載地址
當(dāng)然這篇文章只是講到了RxJava世界的一小點,比如萧求,JakeWharton大神的庫 RxBinding 其兴,這個庫里面包括大量的Android View的API,你可以通過調(diào)用 RxView.clicks(viewVariable)
來創(chuàng)建一個點擊事件observable 夸政。
除此之外元旬,學(xué)習(xí)更多有關(guān)RxJava的知識,可以看 官方文檔守问。