在正文開始之前的最后熔萧,放上GitHub鏈接和引入依賴的gradle代碼: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本號是文章發(fā)布時的最新穩(wěn)定版)
一個詞:異步捣辆。
RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的匈挖、基于事件的程序的庫)畅姊。這就是 RxJava ,概括得非常精準萍丐。
然而柄冲,對于初學者來說,這太難看懂了。因為它是一個『總結』增淹,而初學者更需要一個『引言』椿访。
其實, RxJava 的本質可以壓縮為異步這一個詞虑润。說到根上赎离,它就是一個實現異步操作的庫,而別的定語都是基于這之上的端辱。
換句話說梁剔,『同樣是做異步,為什么人們用它舞蔽,而不用現成的 AsyncTask / Handler / XXX / ... 荣病?』
一個詞:簡潔。
異步操作很關鍵的一點是程序的簡潔性渗柿,因為在調度過程比較復雜的情況下个盆,異步代碼經常會既難寫也難被讀懂。 Android 創(chuàng)造的AsyncTask和Handler朵栖,其實都是為了讓異步代碼更加簡潔胳蛮。RxJava 的優(yōu)勢也是簡潔也切,但它的簡潔的與眾不同之處在于,隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔桐经。
假設有這樣一個需求:界面上有一個自定義的視圖imageCollectorView速和,它的作用是顯示多張圖片圆雁,并能使用addImage(Bitmap)方法來任意增加顯示的圖片〉氐恚現在需要程序將一個給出的目錄數組File[] folders中每個目錄下的 png 圖片都加載出來并顯示在imageCollectorView中。需要注意的是臼寄,由于讀取圖片的這一過程較為耗時霸奕,需要放在后臺執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行吉拳。常用的實現方式有多種质帅,我這里貼出其中一種:
new Thread() {
@Override
public voidrun() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
finalBitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public voidrun() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
而如果使用 RxJava ,實現方式是這樣的:
Observable.from(folders)
.flatMap(new Func1>() {
@Override
public Observablecall(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1() {
@Override
public Booleancall(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1() {
@Override
public Bitmapcall(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1() {
@Override
public voidcall(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
那位說話了:『你這代碼明明變多了傲粼堋煤惩!簡潔個毛啊稼跳!』大兄弟你消消氣盟庞,我說的是邏輯的簡潔吃沪,不是單純的代碼量少(邏輯簡潔才是提升讀寫代碼速度的必殺技對不汤善?)。觀察一下你會發(fā)現, RxJava 的這個實現红淡,是一條從上到下的鏈式調用不狮,沒有任何嵌套,這在邏輯的簡潔性上是具有優(yōu)勢的在旱。當需求變得復雜時摇零,這種優(yōu)勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規(guī)方式要怎么辦桶蝎?如果有更多這樣那樣的要求呢驻仅?再試想,在這一大堆需求實現完兩個月之后需要改功能登渣,當你翻回這里看到自己當初寫下的那一片迷之縮進噪服,你能保證自己將迅速看懂,而不是對著代碼重新捋一遍思路胜茧?)粘优。
另外,如果你的 IDE 是 Android Studio 呻顽,其實每次打開某個 Java 文件的時候雹顺,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程序邏輯:
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
如果你習慣使用 Retrolambda 廊遍,你也可以直接把代碼寫成上面這種簡潔的形式嬉愧。而如果你看到這里還不知道什么是 Retrolambda ,我不建議你現在就去學習它喉前。原因有兩點:1. Lambda 是把雙刃劍英染,它讓你的代碼簡潔的同時,降低了代碼的可讀性被饿,因此同時學習 RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術細節(jié)四康;2. Retrolambda 是 Java 6/7 對 Lambda 表達式的非官方兼容方案,它的向后兼容性和穩(wěn)定性是無法保障的狭握,因此對于企業(yè)項目闪金,使用 Retrolambda 是有風險的。所以论颅,與很多 RxJava 的推廣者不同哎垦,我并不推薦在學習 RxJava 的同時一起學習 Retrolambda。事實上恃疯,我個人雖然很欣賞 Retrolambda漏设,但我從來不用它。
在Flipboard 的 Android 代碼中今妄,有一段邏輯非常復雜郑口,包含了多次內存操作鸳碧、本地文件操作和網絡操作,對象分分合合犬性,線程間相互配合相互等待瞻离,一會兒排成人字,一會兒排成一字乒裆。如果使用常規(guī)的方法來實現套利,肯定是要寫得欲仙欲死,然而在使用 RxJava 的情況下鹤耍,依然只是一條鏈式調用就完成了肉迫。它很長,但很清晰稿黄。
所以昂拂, RxJava 好在哪?就好在簡潔抛猖,好在那把什么復雜邏輯都能穿成一條線的簡潔格侯。
這個我就做不到一個詞說明了……因為這一節(jié)的主要內容就是一步步地說明 RxJava 到底怎樣做到了異步,怎樣做到了簡潔财著。
1. 概念:擴展的觀察者模式
RxJava 的異步實現联四,是通過一種擴展的觀察者模式來實現的。
觀察者模式
先簡述一下觀察者模式撑教,已經熟悉的可以跳過這一段朝墩。
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應伟姐。舉個例子收苏,新聞里喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕愤兵。在這個例子里鹿霸,警察是觀察者,小偷是被觀察者秆乳,警察需要時刻盯著小偷的一舉一動懦鼠,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同屹堰,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài))肛冶,而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態(tài)扯键,你要在它變化的時候通知我睦袖。 Android 開發(fā)中一個比較典型的例子是點擊監(jiān)聽器OnClickListener。對設置OnClickListener來說荣刑,View是被觀察者馅笙,OnClickListener是觀察者伦乔,二者通過setOnClickListener()方法達成訂閱關系。訂閱之后用戶點擊按鈕的瞬間延蟹,Android Framework 就會將點擊事件發(fā)送給已經注冊的OnClickListener。采取這樣被動的觀察方式叶堆,既省去了反復檢索狀態(tài)的資源消耗阱飘,也能夠得到最高的反饋速度。當然虱颗,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者沥匈,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。
OnClickListener 的模式大致如下圖:
如圖所示忘渔,通過setOnClickListener()方法高帖,Button持有OnClickListener的引用(這一過程沒有在圖上畫出);當用戶點擊時畦粮,Button自動調用OnClickListener的onClick()方法散址。另外,如果把這張圖中的概念抽象出來(Button-> 被觀察者宣赔、OnClickListener-> 觀察者预麸、setOnClickListener()-> 訂閱,onClick()-> 事件)儒将,就由專用的觀察者模式(例如只用于監(jiān)聽控件點擊)轉變成了通用的觀察者模式吏祸。如下圖:
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式钩蚊。
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable(可觀察者贡翘,即被觀察者)、Observer(觀察者)砰逻、subscribe(訂閱)鸣驱、事件。Observable和Observer通過subscribe()方法實現訂閱關系蝠咆,從而Observable可以在需要的時候發(fā)出事件來通知Observer丐巫。
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件onNext()(相當于onClick()/onEvent())之外勺美,還定義了兩個特殊的事件:onCompleted()和onError()递胧。
onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理赡茸,還會把它們看做一個隊列缎脾。RxJava 規(guī)定,當不會再有新的onNext()發(fā)出時占卧,需要觸發(fā)onCompleted()方法作為標志遗菠。
onError(): 事件隊列異常联喘。在事件處理過程中出異常時,onError()會被觸發(fā)辙纬,同時隊列自動終止豁遭,不允許再有事件發(fā)出。
在一個正確運行的事件序列中,onCompleted()和onError()有且只有一個贺拣,并且是事件序列中的最后一個蓖谢。需要注意的是,onCompleted()和onError()二者也是互斥的譬涡,即在隊列中調用了其中一個闪幽,就不應該再調用另一個。
RxJava 的觀察者模式大致如下圖:
2. 基本實現
基于以上的概念涡匀, RxJava 的基本實現主要有三點:
1) 創(chuàng)建 Observer
Observer 即觀察者盯腌,它決定事件觸發(fā)的時候將有怎樣的行為。 RxJava 中的Observer接口的實現方式:
Observer observer = new Observer() {
@Override
public voidonNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public voidonCompleted() {
Log.d(tag, "Completed!");
}
@Override
public voidonError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了Observer接口之外陨瘩,RxJava 還內置了一個實現了Observer的抽象類:Subscriber腕够。Subscriber對Observer接口進行了一些擴展,但他們的基本使用方式是完全一樣的:
Subscriber subscriber = new Subscriber() {
@Override
public voidonNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public voidonCompleted() {
Log.d(tag, "Completed!");
}
@Override
public voidonError(Throwable e) {
Log.d(tag, "Error!");
}
};
不僅基本使用方式一樣舌劳,實質上燕少,在 RxJava 的 subscribe 過程中,Observer也總是會先被轉換成一個Subscriber再使用蒿囤。所以如果你只想使用基本功能客们,選擇Observer和Subscriber是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
onStart(): 這是Subscriber增加的方法材诽。它會在 subscribe 剛開始底挫,而事件還未發(fā)送之前被調用,可以用于做一些準備工作脸侥,例如數據的清零或重置建邓。這是一個可選方法,默認情況下它的實現為空睁枕。需要注意的是官边,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)外遇,onStart()就不適用了注簿,因為它總是在 subscribe 所發(fā)生的線程被調用,而不能指定線程跳仿。要在指定的線程來做準備工作诡渴,可以使用doOnSubscribe()方法,具體可以在后面的文中看到菲语。
unsubscribe(): 這是Subscriber所實現的另一個接口Subscription的方法妄辩,用于取消訂閱惑灵。在這個方法被調用后,Subscriber將不再接收事件眼耀。一般在這個方法調用前英支,可以使用isUnsubscribed()先判斷一下狀態(tài)。unsubscribe()這個方法很重要哮伟,因為在subscribe()之后干花,Observable會持有Subscriber的引用,這個引用如果不能及時被釋放澈吨,將有內存泄露的風險把敢。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如onPause()onStop()等方法中)調用unsubscribe()來解除引用關系寄摆,以避免內存泄露的發(fā)生谅辣。
2) 創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件婶恼。 RxJava 使用create()方法來創(chuàng)建一個 Observable 桑阶,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public voidcall(Subscriber subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到,這里傳入了一個OnSubscribe對象作為參數勾邦。OnSubscribe會被存儲在返回的Observable對象中蚣录,它的作用相當于一個計劃表,當Observable被訂閱的時候眷篇,OnSubscribe的call()方法會自動被調用萎河,事件序列就會依照設定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber將會被調用三次onNext()和一次onCompleted())蕉饼。這樣虐杯,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞昧港,即觀察者模式擎椰。
這個例子很簡單:事件的內容是字符串,而不是一些復雜的對象创肥;事件的內容是已經定好了的达舒,而不像有的觀察者模式一樣是待確定的(例如網絡請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發(fā)送出去叹侄,而不是夾雜一些確定或不確定的時間間隔或者經過某種觸發(fā)器來觸發(fā)的巩搏。總之趾代,這個例子看起來毫無實用價值塔猾。但這是為了便于說明,實質上只要你想稽坤,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫丈甸。至于具體怎么做糯俗,后面都會講到,但現在不行睦擂。只有把基礎原理先說明白了得湘,上層的運用才能更容易說清楚。
create()方法是 RxJava 最基本的創(chuàng)造事件序列的方法顿仇√哉基于這個方法, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列臼闻,例如:
just(T...): 將傳入的參數依次發(fā)送出來鸿吆。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])/from(Iterable): 將傳入的數組或Iterable拆分成具體對象后,依次發(fā)送出來述呐。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面just(T...)的例子和from(T[])的例子惩淳,都和之前的create(OnSubscribe)的例子是等價的。
3) Subscribe (訂閱)
創(chuàng)建了Observable和Observer之后乓搬,再用subscribe()方法將它們聯結起來思犁,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
有人可能會注意到进肯,subscribe()這個方法有點怪:它看起來是『observalbe訂閱了observer/subscriber』而不是『observer/subscriber訂閱了observalbe』激蹲,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關系。這讓人讀起來有點別扭江掩,不過如果把 API 設計成observer.subscribe(observable)/subscriber.subscribe(observable)学辱,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了环形,比較起來明顯是得不償失的策泣。
Observable.subscribe(Subscriber)的內部實現是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能斟赚、兼容性着降、擴展性有關的代碼剔除后的核心代碼。
// 如果需要看源碼拗军,可以去 RxJava 的 GitHub 倉庫下載任洞。
public Subscriptionsubscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber()做了3件事:
調用Subscriber.onStart()发侵。這個方法在前面已經介紹過交掏,是一個可選的準備方法。
調用Observable中的OnSubscribe.call(Subscriber)刃鳄。在這里盅弛,事件發(fā)送的邏輯開始運行。從這也可以看出,在 RxJava 中挪鹏,Observable并不是在創(chuàng)建的時候就立即開始發(fā)送事件见秽,而是在它被訂閱的時候,即當subscribe()方法執(zhí)行的時候讨盒。
將傳入的Subscriber作為Subscription返回解取。這是為了方便unsubscribe().
整個過程中對象間的關系如下圖:
或者可以看動圖:
除了subscribe(Observer)和subscribe(Subscriber),subscribe()還支持不完整定義的回調返顺,RxJava 會自動根據定義創(chuàng)建出Subscriber禀苦。形式如下:
Action1 onNextAction = new Action1() {
// onNext()
@Override
public voidcall(String s) {
Log.d(tag, s);
}
};
Action1 onErrorAction = new Action1() {
// onError()
@Override
public voidcall(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public voidcall() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 遂鹊,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 振乏,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()秉扑、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段代碼中出現的Action1和Action0慧邮。Action0是 RxJava 的一個接口,它只有一個方法call()邻储,這個方法是無參無返回值的赋咽;由于onCompleted()方法也是無參無返回值的旧噪,因此Action0可以被當成一個包裝對象吨娜,將onCompleted()的內容打包起來將自己作為一個參數傳入subscribe()以實現不完整定義的回調。這樣其實也可以看做將onCompleted()方法作為參數傳進了subscribe()淘钟,相當于其他某些語言中的『閉包』宦赠。Action1也是一個接口,它同樣只有一個方法call(T param)米母,這個方法也無返回值勾扭,但有一個參數;與Action0同理铁瞒,由于onNext(T obj)和onError(Throwable error)也是單參數無返回值的妙色,因此Action1可以將onNext(obj)和onError(error)打包起來傳入subscribe()以實現不完整定義的回調。事實上慧耍,雖然Action0和Action1在 API 中使用最廣泛身辨,但 RxJava 是提供了多個ActionX形式的接口 (例如Action2,Action3) 的,它們可以被用以包裝不同的無返回值的方法芍碧。
注:正如前面所提到的煌珊,Observer和Subscriber具有相同的角色,而且Observer在subscribe()過程中最終會被轉換成Subscriber對象泌豆,因此定庵,從這里開始,后面的描述我將用Subscriber來代替Observer,這樣更加嚴謹蔬浙。
4) 場景示例
下面舉兩個例子:
為了把原理用更清晰的方式表述出來猪落,本文中挑選的都是功能盡可能簡單的例子,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺畴博。當你看到這種情況许布,不要覺得是因為 RxJava 太啰嗦,而是因為在過早的時候舉出真實場景的例子并不利于原理的解析绎晃,因此我刻意挑選了簡單的情景蜜唾。
a. 打印字符串數組
將字符串數組names中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1() {
@Override
public voidcall(String name) {
Log.d(tag, name);
}
});
b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 iddrawableRes取得圖片,并顯示在ImageView中庶艾,并在出現異常的時候打印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe() {
@Override
public voidcall(Subscriber subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer() {
@Override
public voidonNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public voidonCompleted() {
}
@Override
public voidonError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
正如上面兩個例子這樣袁余,創(chuàng)建出Observable和Subscriber其做,再用subscribe()將它們串起來惫周,一次 RxJava 的基本使用就完成了。非常簡單拍摇。
然而煤裙,
在 RxJava 的默認規(guī)則中掩完,事件的發(fā)出和消費都是在同一個線程的。也就是說硼砰,如果只用上面的方法且蓬,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『后臺處理题翰,前臺回調』的異步機制恶阴,因此異步對于 RxJava 是至關重要的。而要實現異步豹障,則需要用到 RxJava 的另一個概念:Scheduler冯事。
3. 線程控制 —— Scheduler (一)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則血公,即:在哪個線程調用subscribe()昵仅,就在哪個線程生產事件;在哪個線程生產事件累魔,就在哪個線程消費事件摔笤。如果需要切換線程,就需要用到Scheduler(調度器)薛夜。
1) Scheduler 的 API (一)
在RxJava 中籍茧,Scheduler——調度器,相當于線程控制器梯澜,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程寞冯。RxJava 已經內置了幾個Scheduler渴析,它們已經適合大多數的使用場景:
Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程吮龄。這是默認的Scheduler俭茧。
Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作漓帚。
Schedulers.io(): I/O 操作(讀寫文件母债、讀寫數據庫、網絡信息交互等)所使用的Scheduler尝抖。行為模式和newThread()差不多毡们,區(qū)別在于io()的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程昧辽,因此多數情況下io()比newThread()更有效率衙熔。不要把計算工作放在io()中,可以避免創(chuàng)建不必要的線程搅荞。
Schedulers.computation(): 計算所使用的Scheduler红氯。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作咕痛,例如圖形的計算痢甘。這個Scheduler使用的固定的線程池,大小為 CPU 核數茉贡。不要把 I/O 操作放在computation()中塞栅,否則 I/O 操作的等待時間會浪費 CPU。
另外块仆, Android 還有一個專用的AndroidSchedulers.mainThread()构蹬,它指定的操作將在 Android 主線程運行王暗。
有了這幾個Scheduler悔据,就可以使用subscribeOn()和observeOn()兩個方法來對線程進行控制了。 *subscribeOn(): 指定subscribe()所發(fā)生的線程俗壹,即Observable.OnSubscribe被激活時所處的線程科汗。或者叫做事件產生的線程绷雏。 *observeOn(): 指定Subscriber所運行在的線程头滔。或者叫做事件消費的線程涎显。
文字敘述總歸難理解坤检,上代碼:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發(fā)生在主線程
.subscribe(new Action1() {
@Override
public voidcall(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面這段代碼中,由于subscribeOn(Schedulers.io())的指定期吓,被創(chuàng)建的事件的內容1早歇、2、3、4將會在 IO 線程發(fā)出箭跳;而由于observeOn(AndroidScheculers.mainThread()) 的指定晨另,因此subscriber數字的打印將發(fā)生在主線程 。事實上谱姓,這種在subscribe()之前寫上兩句subscribeOn(Scheduler.io())和observeOn(AndroidSchedulers.mainThread())的使用方式非常常見借尿,它適用于多數的 『后臺線程取數據,主線程顯示』的程序策略屉来。
而前面提到的由圖片 id 取得圖片并顯示的例子路翻,如果也加上這兩句:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe() {
@Override
public voidcall(Subscriber subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發(fā)生在主線程
.subscribe(new Observer() {
@Override
public voidonNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public voidonCompleted() {
}
@Override
public voidonError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
那么,加載圖片將會發(fā)生在 IO 線程茄靠,而設置圖片則被設定在了主線程帚桩。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間嘹黔,也不會造成絲毫界面的卡頓账嚎。
2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了儡蔓,怎么做到的郭蕉?而且subscribe()不是最外層直接調用的方法嗎,它竟然也能被指定線程喂江?)召锈。然而 Scheduler 的原理需要放在后面講,因為它的原理是以下一節(jié)《變換》的原理作為基礎的获询。
好吧這一節(jié)其實我屁也沒說涨岁,只是為了讓你安心,讓你知道我不是忘了講原理吉嚣,而是把它放在了更合適的地方梢薪。
4. 變換
終于要到牛逼的地方了,不管你激動不激動尝哆,反正我是激動了秉撇。
RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一秋泄,也是大多數人說『RxJava 真是太好用了』的最大原因琐馆。所謂變換,就是將事件序列中的對象或整個序列進行加工處理恒序,轉換成不同的事件或事件序列瘦麸。概念說著總是模糊難懂的,來看 API歧胁。
1) API
首先看一個map()的例子:
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1() {
@Override
public Bitmapcall(String filePath) { // 參數類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1() {
@Override
public voidcall(Bitmap bitmap) { // 參數類型 Bitmap
showBitmap(bitmap);
}
});
這里出現了一個叫做Func1的類滋饲。它和Action1非常相似彤敛,也是 RxJava 的一個接口,用于包裝含有一個參數的方法了赌。Func1和Action的區(qū)別在于墨榄,Func1包裝的是有返回值的方法。另外勿她,和ActionX一樣袄秩,FuncX也有多個,用于不同參數個數的方法逢并。FuncX和ActionX的區(qū)別在FuncX包裝的是有返回值的方法之剧。
可以看到,map()方法將參數中的String對象轉換成一個Bitmap對象后返回砍聊,而在經過map()方法后背稼,事件的參數類型也由String轉為了Bitmap。這種直接變換對象并返回的玻蝌,是最常見的也最容易理解的變換蟹肘。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象俯树,還可以針對整個事件隊列帘腹,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
map(): 事件對象的直接變換许饿,具體功能上面已經介紹過阳欲。它是 RxJava 最常用的變換。map()的示意圖:
flatMap(): 這是一個很有用但非常難理解的變換陋率,因此我決定花多些篇幅來介紹它球化。 首先假設這么一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字瓦糟。實現方式很簡單:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1() {
@Override
public Stringcall(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
很簡單筒愚。那么再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區(qū)別在于狸页,每個學生只有一個名字锨能,但卻有多個課程。)首先可以這樣實現:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
@Override
public voidonNext(Student student) {
List courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
依然很簡單芍耘。那么如果我不想在Subscriber中使用 for 循環(huán),而是希望Subscriber中直接傳入單個的Course對象呢(這對于代碼復用很重要)熄阻?用map()顯然是不行的斋竞,因為map()是一對一的轉化,而我現在的要求是一對多的轉化秃殉。那怎么才能把一個 Student 轉化成多個 Course 呢坝初?
這個時候浸剩,就需要用flatMap()了:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
@Override
public voidonNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1>() {
@Override
public Observablecall(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
從上面的代碼可以看出,flatMap()和map()有一個相同點:它也是把傳入的參數轉化之后返回另一個對象鳄袍。但需要注意绢要,和map()不同的是,flatMap()中返回的是個Observable對象拗小,并且這個Observable對象并不是被直接發(fā)送到了Subscriber的回調方法中重罪。flatMap()的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個Observable對象;2. 并不發(fā)送這個Observable, 而是將它激活哀九,于是它開始發(fā)送事件剿配;3. 每一個創(chuàng)建出來的Observable發(fā)送的事件,都被匯入同一個Observable阅束,而這個Observable負責將這些事件統一交給Subscriber的回調方法呼胚。這三個步驟,把事件拆成了兩級息裸,通過一組新創(chuàng)建的Observable將初始的對象『鋪平』之后通過統一路徑分發(fā)了下去蝇更。而這個『鋪平』就是flatMap()所謂的 flat。
flatMap()示意圖:
擴展:由于可以在嵌套的Observable中添加異步代碼呼盆,flatMap()也常用于嵌套的異步操作簿寂,例如嵌套的網絡請求。示例代碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable宿亡,在訂閱時請求 token常遂,并在響應后發(fā)送 token
.flatMap(new Func1>() {
@Override
public Observablecall(String token) {
// 返回 Observable,在訂閱時請求消息列表挽荠,并在響應后發(fā)送請求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1() {
@Override
public voidcall(Messages messages) {
// 處理顯示消息列表
showMessages(messages);
}
});
傳統的嵌套請求需要使用嵌套的 Callback 來實現克胳。而通過flatMap(),可以把嵌套的請求寫在一條鏈中圈匆,從而保持程序邏輯的清晰漠另。
throttleFirst(): 在每次事件觸發(fā)后的一定時間間隔內丟棄新的事件。常用作去抖動過濾跃赚,例如按鈕的點擊監(jiān)聽器:RxView.clickEvents(button) // RxBinding 代碼笆搓,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設置防抖間隔為 500ms .subscribe(subscriber);媽媽再也不怕我的用戶手抖點開兩個重復的界面啦。
此外纬傲, RxJava 還提供很多便捷的方法來實現事件序列的變換满败,這里就不一一舉例了。
2) 變換的原理:lift()
這些變換雖然功能各有不同叹括,但實質上都是針對事件序列的處理和再發(fā)送算墨。而在 RxJava 的內部,它們是基于同一個基礎的變換方法:lift(Operator)汁雷。首先看一下lift()的內部實現(僅核心代碼):
// 注意:這不是 lift() 的源碼净嘀,而是將源碼中與性能报咳、兼容性、擴展性有關的代碼剔除后的核心代碼挖藏。
// 如果需要看源碼暑刃,可以去 RxJava 的 GitHub 倉庫下載。
public Observablelift(Operator operator) {
return Observable.create(new OnSubscribe() {
@Override
public voidcall(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
這段代碼很有意思:它生成了一個新的Observable并返回膜眠,而且創(chuàng)建新Observable所用的參數OnSubscribe的回調方法call()中的實現竟然看起來和前面講過的Observable.subscribe()一樣岩臣!然而它們并不一樣喲~不一樣的地方關鍵就在于第二行onSubscribe.call(subscriber)中的onSubscribe所指代的對象不同(高能預警:接下來的幾句話可能會導致身體的嚴重不適)——
subscribe()中這句話的onSubscribe指的是Observable中的onSubscribe對象,這個沒有問題柴底,但是lift()之后的情況就復雜了點婿脸。
當含有l(wèi)ift()時:
1.lift()創(chuàng)建了一個Observable后,加上之前的原始Observable柄驻,已經有兩個Observable了狐树;
2.而同樣地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe鸿脓,也就有了兩個OnSubscribe抑钟;
3.當用戶調用經過lift()后的Observable的subscribe()的時候,使用的是lift()所返回的新的Observable野哭,于是它所觸發(fā)的onSubscribe.call(subscriber)在塔,也是用的新Observable中的新OnSubscribe,即在lift()中生成的那個OnSubscribe蛔溃;
4.而這個新OnSubscribe的call()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe篱蝇,在這個call()方法里贺待,新OnSubscribe利用operator.call(subscriber)生成了一個新的Subscriber(Operator就是在這里,通過自己的call()方法將新Subscriber和原始Subscriber進行關聯零截,并插入自己的『變換』代碼以實現變換)麸塞,然后利用這個新Subscriber向原始Observable進行訂閱。
這樣就實現了lift()過程涧衙,有點像一種代理機制哪工,通過事件攔截和處理實現事件序列的變換。
精簡掉細節(jié)的話弧哎,也可以這么說:在Observable執(zhí)行了lift(Operator)方法之后雁比,會返回一個新的Observable,這個新的Observable會像一個代理一樣傻铣,負責接收原始的Observable發(fā)出的事件章贞,并在處理后發(fā)送給Subscriber。
如果你更喜歡具象思維非洲,可以看圖:
或者可以看動圖:
兩次和多次的lift()同理鸭限,如下圖:
舉一個具體的Operator的實現。下面這是一個將事件中的Integer對象轉換成String的例子两踏,僅供參考:
observable.lift(new Observable.Operator() {
@Override
public Subscriber call(finalSubscriber subscriber) {
// 將事件序列中的 Integer 對象轉換為 String 對象
return new Subscriber() {
@Override
public voidonNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public voidonCompleted() {
subscriber.onCompleted();
}
@Override
public voidonError(Throwable e) {
subscriber.onError(e);
}
};
}
});
講述lift()的原理只是為了讓你更好地了解 RxJava 败京,從而可以更好地使用它。然而不管你是否理解了lift()的原理梦染,RxJava 都不建議開發(fā)者自定義Operator來直接使用lift()赡麦,而是建議盡量使用已有的lift()包裝方法(如map()flatMap()等)進行組合來實現需求,因為直接使用 lift() 非常容易發(fā)生一些難以發(fā)現的錯誤帕识。
3) compose: 對 Observable 整體的變換
除了lift()之外泛粹,Observable還有一個變換方法叫做compose(Transformer)。它和lift()的區(qū)別在于肮疗,lift()是針對事件項和事件序列的晶姊,而compose()是針對Observable自身進行變換。舉個例子伪货,假設在程序中有多個Observable们衙,并且他們都需要應用一組相同的lift()變換。你可以這么寫:
observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
你覺得這樣太不軟件工程了碱呼,于是你改成了這樣:
private ObservableliftAll(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
可讀性蒙挑、可維護性都提高了∮尥危可是Observable被一個方法包起來忆蚀,這種方式對于Observale的靈活性似乎還是增添了那么點限制。怎么辦姑裂?這個時候馋袜,就應該用compose()來解決了:
public class LiftAllTransformerimplementsObservable.Transformer {
@Override
public Observablecall(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣,使用compose()方法炭分,Observable可以利用傳入的Transformer對象的call方法直接對自身進行處理桃焕,也就不必被包在方法的里面了。
compose()的原理比較簡單捧毛,不附圖嘍观堂。
5. 線程控制:Scheduler (二)
除了靈活的變換,RxJava 另一個牛逼的地方呀忧,就是線程的自由控制师痕。
1) Scheduler 的 API (二)
前面講到了,可以利用subscribeOn()結合observeOn()來實現線程控制而账,讓事件的產生和消費發(fā)生在不同的線程胰坟。可是在了解了map()flatMap()等變換方法后泞辐,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程笔横?
答案是:能竞滓。因為observeOn()指定的是Subscriber的線程,而這個Subscriber并不是(嚴格說應該為『不一定是』吹缔,但這里不妨理解為『不是』)subscribe()參數中的Subscriber商佑,而是observeOn()執(zhí)行時的當前Observable所對應的Subscriber,即它的直接下級Subscriber厢塘。換句話說茶没,observeOn()指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求晚碾,只要在每個想要切換線程的位置調用一次observeOn()即可抓半。上代碼:
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程格嘁,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程笛求,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber);? // Android 主線程,由 observeOn() 指定
如上讥蔽,通過observeOn()的多次調用涣易,程序實現了線程的多次切換。
不過冶伞,不同于observeOn()新症,subscribeOn()的位置放在哪里都可以,但它是只能調用一次的响禽。
又有好事的(其實還是當初的我)問了:如果我非要調用多次subscribeOn()呢徒爹?會有什么效果?
這個問題先放著芋类,我們還是從 RxJava 線程控制的原理說起吧隆嗅。
2) Scheduler 的原理(二)
其實,subscribeOn()和observeOn()的內部實現侯繁,也是用的lift()胖喳。具體看圖(不同顏色的箭頭表示不同的線程):
subscribeOn()原理圖:
observeOn()原理圖:
從圖中可以看出,subscribeOn()和observeOn()都做了線程切換的工作(圖中的 "schedule..." 部位)贮竟。不同的是丽焊,subscribeOn()的線程切換發(fā)生在OnSubscribe中,即在它通知上一級OnSubscribe時咕别,這時事件還沒有開始發(fā)送技健,因此subscribeOn()的線程控制可以從事件發(fā)出的開端就造成影響;而observeOn()的線程切換則發(fā)生在它內建的Subscriber中惰拱,即發(fā)生在它即將給下一級Subscriber發(fā)送事件時雌贱,因此observeOn()控制的是它后面的線程。
最后,我用一張圖來解釋當多個subscribeOn()和observeOn()混合使用時欣孤,線程調度是怎么發(fā)生的(由于圖中對象較多馋没,相對于上面的圖對結構做了一些簡化調整):
圖中共有 5 處含有對事件的操作。由圖中可以看出导街,①和②兩處受第一個subscribeOn()影響披泪,運行在紅色線程纤子;③和④處受第一個observeOn()的影響搬瑰,運行在綠色線程;⑤處受第二個onserveOn()影響控硼,運行在紫色線程泽论;而第二個subscribeOn(),由于在通知過程中線程就被第一個subscribeOn()截斷卡乾,因此對整個流程并沒有任何影響翼悴。這里也就回答了前面的問題:當使用了多個subscribeOn()的時候,只有第一個subscribeOn()起作用幔妨。
3) 延伸:doOnSubscribe()
然而鹦赎,雖然超過一個的subscribeOn()對事件處理的流程沒有影響,但在流程之前卻是可以利用的误堡。
在前面講Subscriber的時候古话,提到過Subscriber的onStart()可以用作流程開始前的初始化。然而onStart()由于在subscribe()發(fā)生時就被調用了锁施,因此不能指定線程陪踩,而是只能執(zhí)行在subscribe()被調用時的線程。這就導致如果onStart()中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar悉抵,這必須在主線程執(zhí)行)肩狂,將會有線程非法的風險,因為有時你無法預測subscribe()將會在什么線程執(zhí)行姥饰。
而與Subscriber.onStart()相對應的傻谁,有一個方法Observable.doOnSubscribe()。它和Subscriber.onStart()同樣是在subscribe()調用后而且在事件發(fā)送前執(zhí)行列粪,但區(qū)別在于它可以指定線程审磁。默認情況下,doOnSubscribe()執(zhí)行在subscribe()發(fā)生的線程篱竭;而如果在doOnSubscribe()之后有subscribeOn()的話力图,它將執(zhí)行在離它最近的subscribeOn()所指定的線程。
示例代碼:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public voidcall() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上掺逼,在doOnSubscribe()的后面跟一個subscribeOn()吃媒,就能指定準備工作的線程了。
1. 與 Retrofit 的結合
Retrofit 是 Square 的一個著名的網絡請求庫。沒有用過 Retrofit 的可以選擇跳過這一小節(jié)也沒關系赘那,我舉的每種場景都只是個例子刑桑,而且例子之間并無前后關聯,只是個拋磚引玉的作用募舟,所以你跳過這里看別的場景也可以的祠斧。
Retrofit 除了提供了傳統的Callback形式的 API,還有 RxJava 版本的Observable形式 API拱礁。下面我用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統版本的區(qū)別琢锋。
以獲取一個User對象的接口作為例子。使用Retrofit 的傳統 API呢灶,你可以用這樣的方式來定義請求:
@GET("/user")
public void getUser(@Query("userId") String userId, Callback callback);
在程序的構建過程中吴超, Retrofit 會把自動把方法實現并生成代碼,然后開發(fā)者就可以利用下面的方法來獲取特定用戶并處理響應:
getUser(userId, new Callback() {
@Override
public voidsuccess(User user) {
userView.setUser(user);
}
@Override
public voidfailure(RetrofitError error) {
// Error handling
...
}
};
而使用 RxJava 形式的 API,定義同樣的請求是這樣的:
@GET("/user")
public Observable getUser(@Query("userId") String userId);
使用的時候是這樣的:
getUser(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public voidonNext(User user) {
userView.setUser(user);
}
@Override
public voidonCompleted() {
}
@Override
public voidonError(Throwable error) {
// Error handling
...
}
});
看到區(qū)別了嗎?
當 RxJava 形式的時候诬留,Retrofit 把請求封裝進Observable,在請求結束后調用onNext()或在請求失敗后調用onError()鸟悴。
對比來看,Callback形式和Observable形式長得不太一樣奖年,但本質都差不多细诸,而且在細節(jié)上Observable形式似乎還比Callback形式要差點。那 Retrofit 為什么還要提供 RxJava 的支持呢拾并?
因為它好用白嵫摺!從這個例子看不出來是因為這只是最簡單的情況嗅义。而一旦情景復雜起來屏歹,Callback形式馬上就會開始讓人頭疼。比如:
假設這么一種情況:你的程序取到的User并不應該直接顯示之碗,而是需要先與數據庫中的數據進行比對和修正后再顯示蝙眶。使用Callback方式大概可以這么寫:
getUser(userId, new Callback() {
@Override
public voidsuccess(User user) {
processUser(user); // 嘗試修正 User 數據
userView.setUser(user);
}
@Override
public voidfailure(RetrofitError error) {
// Error handling
...
}
};
有問題嗎?
很簡便褪那,但不要這樣做幽纷。為什么?因為這樣做會影響性能博敬。數據庫的操作很重友浸,一次讀寫操作花費 10~20ms 是很常見的,這樣的耗時很容易造成界面的卡頓偏窝。所以通常情況下收恢,如果可以的話一定要避免在主線程中處理數據庫武学。所以為了提升性能,這段代碼可以優(yōu)化一下:
getUser(userId, new Callback() {
@Override
public voidsuccess(User user) {
new Thread() {
@Override
public voidrun() {
processUser(user); // 嘗試修正 User 數據
runOnUiThread(new Runnable() { // 切回 UI 線程
@Override
public voidrun() {
userView.setUser(user);
}
});
}).start();
}
@Override
public voidfailure(RetrofitError error) {
// Error handling
...
}
};
性能問題解決伦意,但……這代碼實在是太亂了火窒,迷之縮進啊驮肉!雜亂的代碼往往不僅僅是美觀問題熏矿,因為代碼越亂往往就越難讀懂,而如果項目中充斥著雜亂的代碼离钝,無疑會降低代碼的可讀性票编,造成團隊開發(fā)效率的降低和出錯率的升高。
這時候奈辰,如果用 RxJava 的形式栏妖,就好辦多了。 RxJava 形式的代碼是這樣的:
getUser(userId)
.doOnNext(new Action1() {
@Override
public voidcall(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public voidonNext(User user) {
userView.setUser(user);
}
@Override
public voidonCompleted() {
}
@Override
public voidonError(Throwable error) {
// Error handling
...
}
});
后臺代碼和前臺代碼全都寫在一條鏈中奖恰,明顯清晰了很多。
再舉一個例子:假設/user接口并不能直接訪問宛裕,而需要填入一個在線獲取的token瑟啃,代碼應該怎么寫?
Callback方式揩尸,可以使用嵌套的Callback:
@GET("/token")
public voidgetToken(Callback callback);
@GET("/user")
public voidgetUser(@Query("token") String token, @Query("userId") String userId, Callback callback);
...
getToken(new Callback() {
@Override
public voidsuccess(String token) {
getUser(token, userId, new Callback() {
@Override
public voidsuccess(User user) {
userView.setUser(user);
}
@Override
public voidfailure(RetrofitError error) {
// Error handling
...
}
};
}
@Override
public voidfailure(RetrofitError error) {
// Error handling
...
}
});
倒是沒有什么性能問題蛹屿,可是迷之縮進毀一生,你懂我也懂岩榆,做過大項目的人應該更懂错负。
而使用 RxJava 的話,代碼是這樣的:
@GET("/token")
public ObservablegetToken();
@GET("/user")
public ObservablegetUser(@Query("token") String token, @Query("userId") String userId);
...
getToken()
.flatMap(new Func1>() {
@Override
public ObservableonNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public voidonNext(User user) {
userView.setUser(user);
}
@Override
public voidonCompleted() {
}
@Override
public voidonError(Throwable error) {
// Error handling
...
}
});
用一個flatMap()就搞定了邏輯勇边,依然是一條鏈犹撒。看著就很爽粒褒,是吧识颊?
2016/03/31 更新,加上我寫的一個 Sample 項目:
好奕坟,Retrofit 部分就到這里祥款。
2. RxBinding
RxBinding是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基于 RxJava 的 Binding API月杉。所謂 Binding刃跛,就是類似設置OnClickListener、設置TextWatcher這樣的注冊綁定對象的 API苛萎。
舉個設置點擊監(jiān)聽的例子桨昙。使用RxBinding跌帐,可以把事件監(jiān)聽用這樣的方法來設置:
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式來反饋點擊事件
.subscribe(new Action1() {
@Override
public voidcall(ViewClickEvent event) {
// Click handling
}
});
看起來除了形式變了沒什么區(qū)別,實質上也是這樣绊率。甚至如果你看一下它的源碼谨敛,你會發(fā)現它連實現都沒什么驚喜:它的內部是直接用一個包裹著的setOnClickListener()來實現的。然而滤否,僅僅這一個形式的改變脸狸,卻恰好就是RxBinding的目的:擴展性。通過RxBinding把點擊監(jiān)聽轉換成Observable之后藐俺,就有了對它進行擴展的可能炊甲。擴展的方式有很多,根據需求而定欲芹。一個例子是前面提到過的throttleFirst()卿啡,用于去抖動,也就是消除手抖導致的快速連環(huán)點擊:
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(clickAction);
如果想對RxBinding有更多了解菱父,可以去它的GitHub 項目下面看看颈娜。
3. 各種異步操作
前面舉的Retrofit和RxBinding的例子,是兩個可以提供現成的Observable的庫浙宜。而如果你有某些異步操作無法用這些庫來自動生成Observable官辽,也完全可以自己寫。例如數據庫的讀寫粟瞬、大圖片的載入同仆、文件壓縮/解壓等各種需要放在后臺工作的耗時操作,都可以用 RxJava 來實現裙品,有了之前幾章的例子俗批,這里應該不用再舉例了。
4. RxBus
RxBus 名字看起來像一個庫市怎,但它并不是一個庫岁忘,而是一種模式,它的思想是使用 RxJava 來實現了 EventBus 焰轻,而讓你不再需要使用Otto或者 GreenRobot 的EventBus臭觉。至于什么是 RxBus,可以看這篇文章辱志。順便說一句蝠筑,Flipboard 已經用 RxBus 替換掉了Otto,目前為止沒有不良反應揩懒。
對于 Android 開發(fā)者來說什乙, RxJava 是一個很難上手的庫,因為它對于 Android 開發(fā)者來說有太多陌生的概念了已球〕剂停可是它真的很牛逼辅愿。因此,我寫了這篇《給 Android 開發(fā)者的 RxJava 詳解》忆某,希望能給始終搞不明白什么是 RxJava 的人一些入門的指引点待,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。無論如何弃舒,只要能給各位同為 Android 工程師的你們提供一些幫助癞埠,這篇文章的目的就達到了。