前言
我從去年開始使用 RxJava ,到現(xiàn)在一年多了昵骤。今年加入了 Flipboard 后,看到 Flipboard 的 Android 項(xiàng)目也在使用 RxJava ,并且使用的場景越來越多 鳄炉。而最近這幾個月,我也發(fā)現(xiàn)國內(nèi)越來越多的人開始提及 RxJava 搜骡。有人說『RxJava 真是太好用了』拂盯,有人說『RxJava 真是太難用了』,另外更多的人表示:我真的百度了也谷歌了记靡,但我還是想問: RxJava 到底是什么谈竿?
鑒于 RxJava 目前這種既火爆又神秘的現(xiàn)狀,而我又在一年的使用過程中對 RxJava 有了一些理解摸吠,我決定寫下這篇文章來對 RxJava 做一個相對詳細(xì)的空凸、針對 Android 開發(fā)者的介紹。
這篇文章的目的有兩個: 1. 給對 RxJava 感興趣的人一些入門的指引 2. 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析
RxJava 到底是什么
RxJava 好在哪
API 介紹和原理簡析1. 概念:擴(kuò)展的觀察者模式觀察者模式
RxJava 的觀察者模式
2. 基本實(shí)現(xiàn)1) 創(chuàng)建 Observer
2) 創(chuàng)建 Observable
3) Subscribe (訂閱)
4) 場景示例a. 打印字符串?dāng)?shù)組
b. 由 id 取得圖片并顯示
3. 線程控制 —— Scheduler (一)1) Scheduler 的 API (一)
2) Scheduler 的原理 (一)
4. 變換1) API
2) 變換的原理:lift()
3) compose: 對 Observable 整體的變換
5. 線程控制:Scheduler (二)1) Scheduler 的 API (二)
2) Scheduler 的原理(二)
3) 延伸:doOnSubscribe()
RxJava 的適用場景和使用方式1. 與 Retrofit 的結(jié)合
2. RxBinding
3. 各種異步操作
4. RxBus
在正文開始之前的最后劫恒,放上 GitHub
鏈接和引入依賴的 gradle
代碼: Github: https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid 引入依賴: compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本號是文章發(fā)布時的最新穩(wěn)定版)
另外,感謝 RxJava 核心成員流火楓林的技術(shù)支持和內(nèi)測讀者代碼家轿腺、鮑永章两嘴、drakeet、馬琳族壳、有時放縱憔辫、程序亦非猿、大頭鬼仿荆、XZoomEye贰您、席德雨、TCahead拢操、Tiiime锦亦、Ailurus、宅學(xué)長令境、妖孽杠园、大大大大大臣哥、NicodeLee的幫助舔庶,以及周伯通招聘的贊助抛蚁。
RxJava 到底是什么
一個詞:異步陈醒。
RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)瞧甩。這就是 RxJava 钉跷,概括得非常精準(zhǔn)。
然而肚逸,對于初學(xué)者來說爷辙,這太難看懂了。因?yàn)樗且粋€『總結(jié)』朦促,而初學(xué)者更需要一個『引言』膝晾。
其實(shí), RxJava 的本質(zhì)可以壓縮為異步這一個詞思灰。說到根上玷犹,它就是一個實(shí)現(xiàn)異步操作的庫混滔,而別的定語都是基于這之上的洒疚。
RxJava 好在哪
換句話說,『同樣是做異步坯屿,為什么人們用它油湖,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... ?』
一個詞:簡潔领跛。
異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性乏德,因?yàn)樵谡{(diào)度過程比較復(fù)雜的情況下,異步代碼經(jīng)常會既難寫也難被讀懂吠昭。 Android 創(chuàng)造的AsyncTask
和Handler
喊括,其實(shí)都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔矢棚,但它的簡潔的與眾不同之處在于郑什,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔蒲肋。
假設(shè)有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView
蘑拯,它的作用是顯示多張圖片,并能使用 addImage(Bitmap)
方法來任意增加顯示的圖片《嫡常現(xiàn)在需要程序?qū)⒁粋€給出的目錄數(shù)組 File[] folders
中每個目錄下的 png 圖片都加載出來并顯示在imageCollectorView
中申窘。需要注意的是,由于讀取圖片的這一過程較為耗時孔轴,需要放在后臺執(zhí)行剃法,而圖片的顯示則必須在 UI 線程執(zhí)行。常用的實(shí)現(xiàn)方式有多種路鹰,我這里貼出其中一種:
new Thread() {
@Override public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
而如果使用 RxJava 玄窝,實(shí)現(xiàn)方式是這樣的:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override public Boolean call(File file) {
return file.getName().endsWith(".png"); } }
)
.map(new Func1<File, Bitmap>() {
@Override public Bitmap call(File file) {
return getBitmapFromFile(file); } }
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
那位說話了:『你這代碼明明變多了扒K隆!簡潔個毛岸髦帽氓!』大兄弟你消消氣,我說的是邏輯的簡潔俩块,不是單純的代碼量少(邏輯簡潔才是提升讀寫代碼速度的必殺技對不黎休?)。觀察一下你會發(fā)現(xiàn)玉凯, RxJava 的這個實(shí)現(xiàn)势腮,是一條從上到下的鏈?zhǔn)秸{(diào)用,沒有任何嵌套漫仆,這在邏輯的簡潔性上是具有優(yōu)勢的捎拯。當(dāng)需求變得復(fù)雜時,這種優(yōu)勢將更加明顯(試想如果還要求只選取前 10 張圖片盲厌,常規(guī)方式要怎么辦署照?如果有更多這樣那樣的要求呢?再試想吗浩,在這一大堆需求實(shí)現(xiàn)完兩個月之后需要改功能建芙,當(dāng)你翻回這里看到自己當(dāng)初寫下的那一片迷之縮進(jìn),你能保證自己將迅速看懂懂扼,而不是對著代碼重新捋一遍思路禁荸?)。
另外阀湿,如果你的 IDE 是 Android Studio 赶熟,其實(shí)每次打開某個 Java 文件的時候,你會看到被自動 Lambda 化的預(yù)覽陷嘴,這將讓你更加清晰地看到程序邏輯:
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> {imageCollectorView.addImage(bitmap) });
如果你習(xí)慣使用 Retrolambda 映砖,你也可以直接把代碼寫成上面這種簡潔的形式。而如果你看到這里還不知道什么是 Retrolambda 罩旋,我不建議你現(xiàn)在就去學(xué)習(xí)它啊央。原因有兩點(diǎn):1. Lambda 是把雙刃劍,它讓你的代碼簡潔的同時涨醋,降低了代碼的可讀性瓜饥,因此同時學(xué)習(xí) RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié);2. Retrolambda 是 Java 6/7 對 Lambda 表達(dá)式的非官方兼容方案浴骂,它的向后兼容性和穩(wěn)定性是無法保障的乓土,因此對于企業(yè)項(xiàng)目,使用 Retrolambda 是有風(fēng)險的。所以趣苏,與很多 RxJava 的推廣者不同狡相,我并不推薦在學(xué)習(xí) RxJava 的同時一起學(xué)習(xí) Retrolambda。事實(shí)上食磕,我個人雖然很欣賞 Retrolambda尽棕,但我從來不用它。
在Flipboard 的 Android 代碼中彬伦,有一段邏輯非常復(fù)雜滔悉,包含了多次內(nèi)存操作、本地文件操作和網(wǎng)絡(luò)操作单绑,對象分分合合回官,線程間相互配合相互等待,一會兒排成人字搂橙,一會兒排成一字歉提。如果使用常規(guī)的方法來實(shí)現(xiàn),肯定是要寫得欲仙欲死区转,然而在使用 RxJava 的情況下苔巨,依然只是一條鏈?zhǔn)秸{(diào)用就完成了。它很長蜗帜,但很清晰恋拷。
所以资厉, RxJava 好在哪厅缺?就好在簡潔,好在那把什么復(fù)雜邏輯都能穿成一條線的簡潔宴偿。
API 介紹和原理簡析
這個我就做不到一個詞說明了……因?yàn)檫@一節(jié)的主要內(nèi)容就是一步步地說明 RxJava 到底怎樣做到了異步湘捎,怎樣做到了簡潔。
1. 概念:擴(kuò)展的觀察者模式
RxJava 的異步實(shí)現(xiàn)窄刘,是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的窥妇。
觀察者模式
先簡述一下觀察者模式,已經(jīng)熟悉的可以跳過這一段娩践。
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感活翩,需要在 B 變化的一瞬間做出反應(yīng)。舉個例子翻伺,新聞里喜聞樂見的警察抓小偷材泄,警察需要在小偷伸手作案的時候?qū)嵤┳ゲ丁T谶@個例子里吨岭,警察是觀察者拉宗,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間旦事。程序的觀察者模式和這種真正的『觀察』略有不同魁巩,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài)),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式姐浮,告訴被觀察者:我需要你的某某狀態(tài)谷遂,你要在它變化的時候通知我。 Android 開發(fā)中一個比較典型的例子是點(diǎn)擊監(jiān)聽器 OnClickListener卖鲤。對設(shè)置OnClickListener來說埋凯, View是被觀察者, OnClickListener是觀察者扫尖,二者通過 setOnClickListener()方法達(dá)成訂閱關(guān)系白对。訂閱之后用戶點(diǎn)擊按鈕的瞬間,Android Framework 就會將點(diǎn)擊事件發(fā)送給已經(jīng)注冊的 OnClickListener换怖。采取這樣被動的觀察方式甩恼,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度沉颂。當(dāng)然条摸,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務(wù)必通知我』铸屉。
OnClickListener 的模式大致如下圖:
如圖所示钉蒲,通過 setOnClickListener()
方法,Button
持有 OnClickListener
的引用(這一過程沒有在圖上畫出)彻坛;當(dāng)用戶點(diǎn)擊時顷啼,Button
自動調(diào)用 OnClickListener
的 onClick()
方法。另外昌屉,如果把這張圖中的概念抽象出來(Button
-> 被觀察者钙蒙、OnClickListener
-> 觀察者、setOnClickListener()
-> 訂閱间驮,onClick()
-> 事件)躬厌,就由專用的觀察者模式(例如只用于監(jiān)聽控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。如下圖:
而 RxJava 作為一個工具庫竞帽,使用的就是通用形式的觀察者模式扛施。
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable(可觀察者,即被觀察者)屹篓、 Observer(觀察者)疙渣、 subscribe(訂閱)、事件Observable和Observer通過 subscribe()方法實(shí)現(xiàn)訂閱關(guān)系抱虐,從而 Observable可以在需要的時候發(fā)出事件來通知 Observer昌阿。與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext()(相當(dāng)于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted()和 onError()懦冰。
onCompleted(): 事件隊(duì)列完結(jié)灶轰。RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列刷钢。RxJava 規(guī)定笋颤,當(dāng)不會再有新的onNext()發(fā)出時,需要觸發(fā) onCompleted()方法作為標(biāo)志内地。
onError(): 事件隊(duì)列異常伴澄。在事件處理過程中出異常時,onError()會被觸發(fā)阱缓,同時隊(duì)列自動終止非凌,不允許再有事件發(fā)出。
在一個正確運(yùn)行的事件序列中, onCompleted()和 onError()有且只有一個荆针,并且是事件序列中的最后一個敞嗡。需要注意的是,onCompleted()和 onError()二者也是互斥的航背,即在隊(duì)列中調(diào)用了其中一個喉悴,就不應(yīng)該再調(diào)用另一個。
RxJava 的觀察者模式大致如下圖:
2. 基本實(shí)現(xiàn)
基于以上的概念玖媚, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
1) 創(chuàng)建 Observer
Observer 即觀察者箕肃,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer接口的實(shí)現(xiàn)方式:
Observer<String> observer = new Observer<String>() {
@Override public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer接口之外,RxJava 還內(nèi)置了一個實(shí)現(xiàn)了 Observer的抽象類:Subscriber今魔。Subscriber對 Observer接口進(jìn)行了一些擴(kuò)展勺像,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
不僅基本使用方式一樣,實(shí)質(zhì)上涡贱,在 RxJava 的 subscribe 過程中咏删,Observer也總是會先被轉(zhuǎn)換成一個 Subscriber再使用惹想。所以如果你只想使用基本功能问词,選擇 Observer 和 Subscriber是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點(diǎn):
onStart()
: 這是 Subscriber增加的方法嘀粱。它會在 subscribe 剛開始激挪,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作锋叨,例如數(shù)據(jù)的清零或重置垄分。這是一個可選方法,默認(rèn)情況下它的實(shí)現(xiàn)為空娃磺。需要注意的是薄湿,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框,這必須在主線程執(zhí)行), onStart() 就不適用了豺瘤,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用吆倦,而不能指定線程。要在指定的線程來做準(zhǔn)備工作坐求,可以使用 doOnSubscribe() 方法蚕泽,具體可以在后面的文中看到。unsubscribe()
: 這是 Subscriber所實(shí)現(xiàn)的另一個接口 Subscription的方法桥嗤,用于取消訂閱须妻。在這個方法被調(diào)用后,Subscriber將不再接收事件泛领。一般在這個方法調(diào)用前荒吏,可以使用 isUnsubscribed()先判斷一下狀態(tài)。 unsubscribe()這個方法很重要渊鞋,因?yàn)樵趕ubscribe()之后司倚, Observable會持有 Subscriber的引用,這個引用如果不能及時被釋放篓像,將有內(nèi)存泄露的風(fēng)險动知。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop()等方法中)調(diào)用 unsubscribe()來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生员辩。
2) 創(chuàng)建 Observable
Observable 即被觀察者盒粮,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create()方法來創(chuàng)建一個 Observable 奠滑,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到丹皱,這里傳入了一個 OnSubscribe對象作為參數(shù)。OnSubscribe會被存儲在返回的 Observable對象中宋税,它的作用相當(dāng)于一個計(jì)劃表摊崭,當(dāng) Observable被訂閱的時候,OnSubscribe的 call()方法會自動被調(diào)用杰赛,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼呢簸,就是觀察者Subscriber將會被調(diào)用三次 onNext()和一次 onCompleted())。這樣乏屯,由被觀察者調(diào)用了觀察者的回調(diào)方法根时,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式辰晕。
這個例子很簡單:事件的內(nèi)容是字符串蛤迎,而不是一些復(fù)雜的對象褒链;事件的內(nèi)容是已經(jīng)定好了的贷岸,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請求的結(jié)果在請求返回之前是未知的)幌甘;所有事件在一瞬間被全部發(fā)送出去,而不是夾雜一些確定或不確定的時間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的刻恭”补遥總之抒痒,這個例子看起來毫無實(shí)用價值痴奏。但這是為了便于說明,實(shí)質(zhì)上只要你想胸遇,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫荧呐。至于具體怎么做,后面都會講到纸镊,但現(xiàn)在不行倍阐。只有把基礎(chǔ)原理先說明白了,上層的運(yùn)用才能更容易說清楚逗威。
create()方法是 RxJava 最基本的創(chuàng)造事件序列的方法峰搪。基于這個方法凯旭, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列概耻,例如:
-
just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha"); // 將會依次調(diào)用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();
-
from(T[])/ from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable拆分成具體對象后罐呼,依次發(fā)送出來鞠柄。
String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // 將會依次調(diào)用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();
上面 just(T...)的例子和 from(T[])的例子,都和之前的 create(OnSubscribe)的例子是等價的嫉柴。
3) Subscribe (訂閱)
創(chuàng)建了 Observable和 Observer之后厌杜,再用 subscribe()方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了计螺。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
有人可能會注意到夯尽,subscribe()這個方法有點(diǎn)怪:它看起來是『observalbe訂閱了 observer/ subscriber』而不是『observer /subscriber訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系登馒。這讓人讀起來有點(diǎn)別扭匙握,不過如果把 API 設(shè)計(jì)成observer.subscribe(observable)/ subscriber.subscribe(observable)
,雖然更加符合思維邏輯陈轿,但對流式 API 的設(shè)計(jì)就造成影響了圈纺,比較起來明顯是得不償失的。
Observable.subscribe(Subscriber)
的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼济欢,而是將源碼中與性能赠堵、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼法褥。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載酬屉。public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber;}
可以看到半等,subscriber()做了3件事:
- 調(diào)用 Subscriber.onStart() 揍愁。這個方法在前面已經(jīng)介紹過,是一個可選的準(zhǔn)備方法杀饵。
- 調(diào)用 Observable中的 OnSubscribe.call(Subscriber)莽囤。在這里,事件發(fā)送的邏輯開始運(yùn)行切距。從這也可以看出朽缎,在 RxJava 中,Observable并不是在創(chuàng)建的時候就立即開始發(fā)送事件谜悟,而是在它被訂閱的時候话肖,即當(dāng) subscribe()方法執(zhí)行的時候。
- 將傳入的 Subscriber作為 Subscription返回葡幸。這是為了方便 unsubscribe()
.
整個過程中對象間的關(guān)系如下圖:
或者可以看動圖:
除了 subscribe(Observer)和 subscribe(Subscriber)最筒,subscribe()還支持不完整定義的回調(diào),RxJava 會自動根據(jù)定義創(chuàng)建出Subscriber蔚叨。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber 床蜘,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 蔑水,并使用 onNextAction邢锯、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段代碼中出現(xiàn)的 Action1和 Action0搀别。Action0是 RxJava 的一個接口弹囚,它只有一個方法 call()
,這個方法是無參無返回值的领曼;由于 onCompleted()方法也是無參無返回值的鸥鹉,因此 Action0可以被當(dāng)成一個包裝對象,將 onCompleted()的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe()以實(shí)現(xiàn)不完整定義的回調(diào)庶骄。這樣其實(shí)也可以看做將 onCompleted()方法作為參數(shù)傳進(jìn)了subscribe()毁渗,相當(dāng)于其他某些語言中的『閉包』。 Action1也是一個接口单刁,它同樣只有一個方法 call(T param)灸异,這個方法也無返回值,但有一個參數(shù)羔飞;與 Action0同理肺樟,由于 onNext(T obj)和 onError(Throwable error)也是單參數(shù)無返回值的,因此 Action1可以將onNext(obj)和 onError(error)打包起來傳入 subscribe()以實(shí)現(xiàn)不完整定義的回調(diào)逻淌。事實(shí)上么伯,雖然 Action0和 Action1在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX形式的接口 (例如 Action2, Action3) 的卡儒,它們可以被用以包裝不同的無返回值的方法田柔。
注:正如前面所提到的俐巴,Observer和 Subscriber具有相同的角色,而且 Observer在 subscribe()過程中最終會被轉(zhuǎn)換成 Subscriber對象硬爆,因此欣舵,從這里開始,后面的描述我將用 Subscriber來代替 Observer缀磕,這樣更加嚴(yán)謹(jǐn)缘圈。
4) 場景示例
下面舉兩個例子:
為了把原理用更清晰的方式表述出來,本文中挑選的都是功能盡可能簡單的例子袜蚕,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺糟把。當(dāng)你看到這種情況,不要覺得是因?yàn)?RxJava 太啰嗦廷没,而是因?yàn)樵谶^早的時候舉出真實(shí)場景的例子并不利于原理的解析糊饱,因此我刻意挑選了簡單的情景。
a. 打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names
中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override public void call(String name) {
Log.d(tag, name);
}
});
b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes取得圖片颠黎,并顯示在 ImageView中另锋,并在出現(xiàn)異常的時候打印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable); subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
正如上面兩個例子這樣,創(chuàng)建出 Observable和 Subscriber狭归,再用 subscribe()將它們串起來夭坪,一次 RxJava 的基本使用就完成了。非常簡單过椎。
然而室梅,
在 RxJava 的默認(rèn)規(guī)則中,事件的發(fā)出和消費(fèi)都是在同一個線程的疚宇。也就是說亡鼠,如果只用上面的方法,實(shí)現(xiàn)出來的只是一個同步的觀察者模式敷待。觀察者模式本身的目的就是『后臺處理间涵,前臺回調(diào)』的異步機(jī)制,因此異步對于 RxJava 是至關(guān)重要的榜揖。而要實(shí)現(xiàn)異步勾哩,則需要用到 RxJava 的另一個概念: Scheduler
。
3. 線程控制 —— Scheduler (一)
在不指定線程的情況下举哟, RxJava 遵循的是線程不變的原則思劳,即:在哪個線程調(diào)用 subscribe()
,就在哪個線程生產(chǎn)事件妨猩;在哪個線程生產(chǎn)事件潜叛,就在哪個線程消費(fèi)事件。如果需要切換線程册赛,就需要用到 Scheduler
(調(diào)度器)钠导。
1) Scheduler 的 API (一)
在RxJava 中震嫉,Scheduler——調(diào)度器森瘪,相當(dāng)于線程控制器牡属,RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler扼睬,它們已經(jīng)適合大多數(shù)的使用場景:
Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行逮栅,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler窗宇。
Schedulers.newThread(): 總是啟用新線程措伐,并在新線程執(zhí)行操作。
Schedulers.io(): I/O 操作(讀寫文件军俊、讀寫數(shù)據(jù)庫侥加、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和newThread()差不多粪躬,區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線程池担败,可以重用空閑的線程,因此多數(shù)情況下 io()比 newThread()更有效率镰官。不要把計(jì)算工作放在 io()中提前,可以避免創(chuàng)建不必要的線程。
Schedulers.computation(): 計(jì)算所使用的 Scheduler泳唠。這個計(jì)算指的是 CPU 密集型計(jì)算狈网,即不會被 I/O 等操作限制性能的操作,例如圖形的計(jì)算笨腥。這個 Scheduler使用的固定的線程池拓哺,大小為 CPU 核數(shù)。不要把 I/O 操作放在computation()中脖母,否則 I/O 操作的等待時間會浪費(fèi)CPU士鸥。另外,Android 還有一個專用的AndroidSchedulers.mainThread()镶奉,它指定的操作將在 Android 主線程運(yùn)行础淤。
有了這幾個 Scheduler,就可以使用 subscribeOn() 和 observeOn()兩個方法來對線程進(jìn)行控制了哨苛。 * subscribeOn()
: 指定subscribe()
所發(fā)生的線程鸽凶,即 Observable.OnSubscribe
被激活時所處的線程〗ㄇ停或者叫做事件產(chǎn)生的線程玻侥。 * observeOn()
: 指定Subscriber
所運(yùn)行在的線程∫谡簦或者叫做事件消費(fèi)的線程凑兰。
文字?jǐn)⑹隹倸w難理解掌桩,上代碼:
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
上面這段代碼中,由于 subscribeOn(Schedulers.io())
的指定姑食,被創(chuàng)建的事件的內(nèi)容 1
波岛、2
、3
音半、4
將會在 IO 線程發(fā)出则拷;而由于observeOn(AndroidScheculers.mainThread()
) 的指定,因此 subscriber
數(shù)字的打印將發(fā)生在主線程 曹鸠。事實(shí)上煌茬,這種在 subscribe()
之前寫上兩句 subscribeOn(Scheduler.io())
和 observeOn(AndroidSchedulers.mainThread())
的使用方式非常常見,它適用于多數(shù)的 『后臺線程取數(shù)據(jù)彻桃,主線程顯示』的程序策略坛善。
而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:
int drawableRes = ...;ImageView imageView = ...;Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); }}).subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); }});
那么邻眷,加載圖片將會發(fā)生在 IO 線程眠屎,而設(shè)置圖片則被設(shè)定在了主線程。這就意味著耗溜,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時間组力,也不會造成絲毫界面的卡頓。
- Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便抖拴,也很神奇(加了一句話就把線程切換了燎字,怎么做到的?而且 subscribe()
不是最外層直接調(diào)用的方法嗎阿宅,它竟然也能被指定線程候衍?)。然而 Scheduler 的原理需要放在后面講洒放,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的蛉鹿。
好吧這一節(jié)其實(shí)我屁也沒說,只是為了讓你安心往湿,讓你知道我不是忘了講原理妖异,而是把它放在了更合適的地方。
- 變換
終于要到牛逼的地方了领追,不管你激動不激動他膳,反正我是激動了。
RxJava 提供了對事件序列進(jìn)行變換的支持绒窑,這是它的核心功能之一棕孙,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進(jìn)行加工處理蟀俊,轉(zhuǎn)換成不同的事件或事件序列钦铺。概念說著總是模糊難懂的,來看 API肢预。
- API
首先看一個 map()
的例子:
Observable.just("images/logo.png") // 輸入類型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 參數(shù)類型 String return getBitmapFromPath(filePath); // 返回類型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap showBitmap(bitmap); } });
這里出現(xiàn)了一個叫做 Func1
的類矛洞。它和 Action1
非常相似,也是 RxJava 的一個接口误甚,用于包裝含有一個參數(shù)的方法缚甩。 Func1
和 Action
的區(qū)別在于谱净, Func1
包裝的是有返回值的方法窑邦。另外,和 ActionX
一樣壕探, FuncX
也有多個冈钦,用于不同參數(shù)個數(shù)的方法。FuncX
和ActionX
的區(qū)別在 FuncX
包裝的是有返回值的方法李请。
可以看到瞧筛,map()
方法將參數(shù)中的 String
對象轉(zhuǎn)換成一個 Bitmap
對象后返回,而在經(jīng)過 map()
方法后导盅,事件的參數(shù)類型也由 String
轉(zhuǎn)為了 Bitmap
较幌。這種直接變換對象并返回的,是最常見的也最容易理解的變換白翻。不過 RxJava 的變換遠(yuǎn)不止這樣乍炉,它不僅可以針對事件對象,還可以針對整個事件隊(duì)列滤馍,這使得 RxJava 變得非常靈活岛琼。我列舉幾個常用的變換:
map()
: 事件對象的直接變換,具體功能上面已經(jīng)介紹過巢株。它是 RxJava 最常用的變換槐瑞。 map()
flatMap()
: 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它阁苞。 首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』困檩,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡單:
Student[] students = ...;Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ...};Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber);
很簡單那槽。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢悼沿?(需求的區(qū)別在于,每個學(xué)生只有一個名字倦炒,但卻有多個課程显沈。)首先可以這樣實(shí)現(xiàn):
Student[] students = ...;Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ...};Observable.from(students) .subscribe(subscriber);
依然很簡單。那么如果我不想在 Subscriber
中使用 for 循環(huán),而是希望 Subscriber
中直接傳入單個的 Course
對象呢(這對于代碼復(fù)用很重要)拉讯?用 map()
顯然是不行的涤浇,因?yàn)?map()
是一對一的轉(zhuǎn)化,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化魔慷。那怎么才能把一個 Student 轉(zhuǎn)化成多個 Course 呢只锭?
這個時候,就需要用 flatMap()
了:
Student[] students = ...;Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ...};Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber);
從上面的代碼可以看出院尔, flatMap()
和 map()
有一個相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象蜻展。但需要注意,和 map()
不同的是邀摆, flatMap()
中返回的是個 Observable
對象纵顾,并且這個 Observable
對象并不是被直接發(fā)送到了 Subscriber
的回調(diào)方法中。flatMap()
的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable
對象栋盹;2. 并不發(fā)送這個 Observable
, 而是將它激活施逾,于是它開始發(fā)送事件;3. 每一個創(chuàng)建出來的 Observable
發(fā)送的事件例获,都被匯入同一個 Observable
汉额,而這個 Observable
負(fù)責(zé)將這些事件統(tǒng)一交給Subscriber
的回調(diào)方法。這三個步驟榨汤,把事件拆成了兩級蠕搜,通過一組新創(chuàng)建的 Observable
將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap()
所謂的 flat收壕。
flatMap()
示意圖:
擴(kuò)展:由于可以在嵌套的 Observable
中添加異步代碼妓灌, flatMap()
也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求啼器。示例代碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>旬渠,在訂閱時請求 token,并在響應(yīng)后發(fā)送 token .flatMap(new Func1<String, Observable<Messages>>() { @Override public Observable<Messages> call(String token) { // 返回 Observable<Messages>端壳,在訂閱時請求消息列表告丢,并在響應(yīng)后發(fā)送請求到的消息列表 return networkClient.messages(); } }) .subscribe(new Action1<Messages>() { @Override public void call(Messages messages) { // 處理顯示消息列表 showMessages(messages); } });
傳統(tǒng)的嵌套請求需要使用嵌套的 Callback 來實(shí)現(xiàn)。而通過 flatMap()
损谦,可以把嵌套的請求寫在一條鏈中岖免,從而保持程序邏輯的清晰。
throttleFirst()
: 在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件照捡。常用作去抖動過濾颅湘,例如按鈕的點(diǎn)擊監(jiān)聽器:RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber);
媽媽再也不怕我的用戶手抖點(diǎn)開兩個重復(fù)的界面啦栗精。
此外闯参, RxJava 還提供很多便捷的方法來實(shí)現(xiàn)事件序列的變換瞻鹏,這里就不一一舉例了。
- 變換的原理:lift()
這些變換雖然功能各有不同鹿寨,但實(shí)質(zhì)上都是針對事件序列的處理和再發(fā)送新博。而在 RxJava 的內(nèi)部,它們是基于同一個基礎(chǔ)的變換方法:lift(Operator)
脚草。首先看一下 lift()
的內(nèi)部實(shí)現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源碼赫悄,而是將源碼中與性能、兼容性馏慨、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼埂淮。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載写隶。public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } });}
這段代碼很有意思:它生成了一個新的 Observable
并返回倔撞,而且創(chuàng)建新 Observable
所用的參數(shù) OnSubscribe
的回調(diào)方法 call()
中的實(shí)現(xiàn)竟然看起來和前面講過的 Observable.subscribe()
一樣!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行onSubscribe.call(subscriber)
中的 onSubscribe
所指代的對象不同(高能預(yù)警:接下來的幾句話可能會導(dǎo)致身體的嚴(yán)重不適)——
subscribe()
中這句話的 onSubscribe
指的是 Observable
中的 onSubscribe
對象樟澜,這個沒有問題误窖,但是 lift()
之后的情況就復(fù)雜了點(diǎn)。
當(dāng)含有 lift()
時: 1.lift()
創(chuàng)建了一個 Observable
后秩贰,加上之前的原始 Observable
,已經(jīng)有兩個 Observable
了柔吼; 2.而同樣地毒费,新 Observable
里的新 OnSubscribe
加上之前的原始 Observable
中的原始 OnSubscribe
,也就有了兩個 OnSubscribe
愈魏; 3.當(dāng)用戶調(diào)用經(jīng)過 lift()
后的 Observable
的 subscribe()
的時候觅玻,使用的是 lift()
所返回的新的 Observable
,于是它所觸發(fā)的onSubscribe.call(subscriber)
培漏,也是用的新 Observable
中的新 OnSubscribe
溪厘,即在 lift()
中生成的那個 OnSubscribe
; 4.而這個新 OnSubscribe
的 call()
方法中的 onSubscribe
牌柄,就是指的原始 Observable
中的原始 OnSubscribe
畸悬,在這個 call()
方法里,新 OnSubscribe
利用 operator.call(subscriber)
生成了一個新的 Subscriber
(Operator
就是在這里珊佣,通過自己的 call()
方法將新 Subscriber
和原始 Subscriber
進(jìn)行關(guān)聯(lián)蹋宦,并插入自己的『變換』代碼以實(shí)現(xiàn)變換),然后利用這個新 Subscriber
向原始Observable
進(jìn)行訂閱咒锻。 這樣就實(shí)現(xiàn)了 lift()
過程冷冗,有點(diǎn)像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換惑艇。
精簡掉細(xì)節(jié)的話蒿辙,也可以這么說:在 Observable
執(zhí)行了 lift(Operator)
方法之后,會返回一個新的 Observable
,這個新的 Observable
會像一個代理一樣思灌,負(fù)責(zé)接收原始的 Observable
發(fā)出的事件碰镜,并在處理后發(fā)送給 Subscriber
。
如果你更喜歡具象思維习瑰,可以看圖:
或者可以看動圖:
兩次和多次的 lift()
同理绪颖,如下圖:
舉一個具體的 Operator
的實(shí)現(xiàn)。下面這是一個將事件中的 Integer
對象轉(zhuǎn)換成 String
的例子甜奄,僅供參考:
observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 將事件序列中的 Integer 對象轉(zhuǎn)換為 String 對象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; }});
講述 lift()
的原理只是為了讓你更好地了解 RxJava 柠横,從而可以更好地使用它。然而不管你是否理解了 lift()
的原理课兄,RxJava 都不建議開發(fā)者自定義 Operator
來直接使用 lift()
牍氛,而是建議盡量使用已有的 lift()
包裝方法(如 map()
flatMap()
等)進(jìn)行組合來實(shí)現(xiàn)需求,因?yàn)橹苯邮褂?lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯誤烟阐。
- compose: 對 Observable 整體的變換
除了 lift()
之外搬俊, Observable
還有一個變換方法叫做 compose(Transformer)
。它和 lift()
的區(qū)別在于蜒茄, lift()
是針對事件項(xiàng)和事件序列的唉擂,而 compose()
是針對 Observable
自身進(jìn)行變換。舉個例子檀葛,假設(shè)在程序中有多個 Observable
玩祟,并且他們都需要應(yīng)用一組相同的 lift()
變換。你可以這么寫:
observable1 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1);observable2 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber2);observable3 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber3);observable4 .lift1() .lift2() .lift3() .lift4() .subscribe(subscriber1);
你覺得這樣太不軟件工程了屿聋,于是你改成了這樣:
private Observable liftAll(Observable observable) { return observable .lift1() .lift2() .lift3() .lift4();}...liftAll(observable1).subscribe(subscriber1);liftAll(observable2).subscribe(subscriber2);liftAll(observable3).subscribe(subscriber3);liftAll(observable4).subscribe(subscriber4);
可讀性空扎、可維護(hù)性都提高了∪蠹ィ可是 Observable
被一個方法包起來转锈,這種方式對于 Observale
的靈活性似乎還是增添了那么點(diǎn)限制。怎么辦楚殿?這個時候撮慨,就應(yīng)該用 compose()
來解決了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable .lift1() .lift2() .lift3() .lift4(); }}...Transformer liftAll = new LiftAllTransformer();observable1.compose(liftAll).subscribe(subscriber1);observable2.compose(liftAll).subscribe(subscriber2);observable3.compose(liftAll).subscribe(subscriber3);observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣,使用 compose()
方法勒魔,Observable
可以利用傳入的 Transformer
對象的 call
方法直接對自身進(jìn)行處理甫煞,也就不必被包在方法的里面了。
compose()
的原理比較簡單冠绢,不附圖嘍陡鹃。
- 線程控制:Scheduler (二)
除了靈活的變換猖毫,RxJava 另一個牛逼的地方萝毛,就是線程的自由控制。
- Scheduler 的 API (二)
前面講到了喊式,可以利用 subscribeOn()
結(jié)合 observeOn()
來實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程萧朝〔砹簦可是在了解了map()
flatMap()
等變換方法后,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程检柬?
答案是:能献联。因?yàn)?observeOn()
指定的是 Subscriber
的線程,而這個 Subscriber
并不是(嚴(yán)格說應(yīng)該為『不一定是』何址,但這里不妨理解為『不是』)subscribe()
參數(shù)中的 Subscriber
里逆,而是 observeOn()
執(zhí)行時的當(dāng)前 Observable
所對應(yīng)的 Subscriber
,即它的直接下級 Subscriber
用爪。換句話說原押,observeOn()
指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求偎血,只要在每個想要切換線程的位置調(diào)用一次 observeOn()
即可诸衔。上代碼:
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新線程颇玷,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 線程笨农,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主線程,由 observeOn() 指定
如上亚隙,通過 observeOn()
的多次調(diào)用磁餐,程序?qū)崿F(xiàn)了線程的多次切換。
不過阿弃,不同于 observeOn()
, subscribeOn()
的位置放在哪里都可以羞延,但它是只能調(diào)用一次的渣淳。
又有好事的(其實(shí)還是當(dāng)初的我)問了:如果我非要調(diào)用多次 subscribeOn()
呢?會有什么效果伴箩?
這個問題先放著入愧,我們還是從 RxJava 線程控制的原理說起吧。
-
Scheduler 的原理(二)
其實(shí)嗤谚, subscribeOn()
和 observeOn()
的內(nèi)部實(shí)現(xiàn)棺蛛,也是用的 lift()
。具體看圖(不同顏色的箭頭表示不同的線程):
subscribeOn()
原理圖:
observeOn()
原理圖:
從圖中可以看出巩步,subscribeOn()
和 observeOn()
都做了線程切換的工作(圖中的 "schedule..." 部位)旁赊。不同的是, subscribeOn()
的線程切換發(fā)生在 OnSubscribe
中椅野,即在它通知上一級 OnSubscribe
時终畅,這時事件還沒有開始發(fā)送籍胯,因此 subscribeOn()
的線程控制可以從事件發(fā)出的開端就造成影響;而 observeOn()
的線程切換則發(fā)生在它內(nèi)建的 Subscriber
中离福,即發(fā)生在它即將給下一級 Subscriber
發(fā)送事件時杖狼,因此 observeOn()
控制的是它后面的線程。
最后妖爷,我用一張圖來解釋當(dāng)多個 subscribeOn()
和 observeOn()
混合使用時蝶涩,線程調(diào)度是怎么發(fā)生的(由于圖中對象較多,相對于上面的圖對結(jié)構(gòu)做了一些簡化調(diào)整):
圖中共有 5 處含有對事件的操作絮识。由圖中可以看出绿聘,①和②兩處受第一個 subscribeOn()
影響,運(yùn)行在紅色線程笋除;③和④處受第一個observeOn()
的影響斜友,運(yùn)行在綠色線程;⑤處受第二個 onserveOn()
影響垃它,運(yùn)行在紫色線程鲜屏;而第二個 subscribeOn()
,由于在通知過程中線程就被第一個 subscribeOn()
截斷国拇,因此對整個流程并沒有任何影響洛史。這里也就回答了前面的問題:當(dāng)使用了多個 subscribeOn()
的時候,只有第一個 subscribeOn()
起作用酱吝。 - 延伸:doOnSubscribe()
然而也殖,雖然超過一個的 subscribeOn()
對事件處理的流程沒有影響,但在流程之前卻是可以利用的务热。
在前面講 Subscriber
的時候忆嗜,提到過 Subscriber
的 onStart()
可以用作流程開始前的初始化。然而 onStart()
由于在 subscribe()
發(fā)生時就被調(diào)用了崎岂,因此不能指定線程捆毫,而是只能執(zhí)行在 subscribe()
被調(diào)用時的線程。這就導(dǎo)致如果 onStart()
中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar冲甘,這必須在主線程執(zhí)行)绩卤,將會有線程非法的風(fēng)險,因?yàn)橛袝r你無法預(yù)測 subscribe()
將會在什么線程執(zhí)行江醇。
而與 Subscriber.onStart()
相對應(yīng)的濒憋,有一個方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同樣是在 subscribe()
調(diào)用后而且在事件發(fā)送前執(zhí)行陶夜,但區(qū)別在于它可以指定線程凛驮。默認(rèn)情況下, doOnSubscribe()
執(zhí)行在 subscribe()
發(fā)生的線程律适;而如果在 doOnSubscribe()
之后有 subscribeOn()
的話辐烂,它將執(zhí)行在離它最近的 subscribeOn()
所指定的線程遏插。
示例代碼:
Observable.create(onSubscribe) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一個 subscribeOn()
纠修,就能指定準(zhǔn)備工作的線程了胳嘲。
RxJava 的適用場景和使用方式
- 與 Retrofit 的結(jié)合
Retrofit 是 Square 的一個著名的網(wǎng)絡(luò)請求庫。沒有用過 Retrofit 的可以選擇跳過這一小節(jié)也沒關(guān)系扣草,我舉的每種場景都只是個例子了牛,而且例子之間并無前后關(guān)聯(lián),只是個拋磚引玉的作用辰妙,所以你跳過這里看別的場景也可以的鹰祸。
Retrofit 除了提供了傳統(tǒng)的 Callback
形式的 API,還有 RxJava 版本的 Observable
形式 API密浑。下面我用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本的區(qū)別蛙婴。
以獲取一個 User
對象的接口作為例子。使用Retrofit 的傳統(tǒng) API尔破,你可以用這樣的方式來定義請求:
@GET("/user")public void getUser(@Query("userId") String userId, Callback<User> callback);
在程序的構(gòu)建過程中街图, Retrofit 會把自動把方法實(shí)現(xiàn)并生成代碼,然后開發(fā)者就可以利用下面的方法來獲取特定用戶并處理響應(yīng):
getUser(userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }};
而使用 RxJava 形式的 API懒构,定義同樣的請求是這樣的:
@GET("/user")public Observable<User> getUser(@Query("userId") String userId);
使用的時候是這樣的:
getUser(userId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } });
看到區(qū)別了嗎餐济?
當(dāng) RxJava 形式的時候,Retrofit 把請求封裝進(jìn) Observable
胆剧,在請求結(jié)束后調(diào)用 onNext()
或在請求失敗后調(diào)用 onError()
絮姆。
對比來看, Callback
形式和 Observable
形式長得不太一樣秩霍,但本質(zhì)都差不多篙悯,而且在細(xì)節(jié)上 Observable
形式似乎還比 Callback
形式要差點(diǎn)。那 Retrofit 為什么還要提供 RxJava 的支持呢铃绒?
因?yàn)樗糜冒辕近。倪@個例子看不出來是因?yàn)檫@只是最簡單的情況匿垄。而一旦情景復(fù)雜起來, Callback
形式馬上就會開始讓人頭疼归粉。比如:
假設(shè)這么一種情況:你的程序取到的 User
并不應(yīng)該直接顯示椿疗,而是需要先與數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行比對和修正后再顯示。使用 Callback
方式大概可以這么寫:
getUser(userId, new Callback<User>() { @Override public void success(User user) { processUser(user); // 嘗試修正 User 數(shù)據(jù) userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }};
有問題嗎糠悼?
很簡便届榄,但不要這樣做。為什么倔喂?因?yàn)檫@樣做會影響性能铝条。數(shù)據(jù)庫的操作很重靖苇,一次讀寫操作花費(fèi) 10~20ms 是很常見的,這樣的耗時很容易造成界面的卡頓班缰。所以通常情況下贤壁,如果可以的話一定要避免在主線程中處理數(shù)據(jù)庫。所以為了提升性能埠忘,這段代碼可以優(yōu)化一下:
getUser(userId, new Callback<User>() { @Override public void success(User user) { new Thread() { @Override public void run() { processUser(user); // 嘗試修正 User 數(shù)據(jù) runOnUiThread(new Runnable() { // 切回 UI 線程 @Override public void run() { userView.setUser(user); } }); }).start(); } @Override public void failure(RetrofitError error) { // Error handling ... }};
性能問題解決脾拆,但……這代碼實(shí)在是太亂了,迷之縮進(jìn)坝ǘ省名船!雜亂的代碼往往不僅僅是美觀問題,因?yàn)榇a越亂往往就越難讀懂旨怠,而如果項(xiàng)目中充斥著雜亂的代碼渠驼,無疑會降低代碼的可讀性,造成團(tuán)隊(duì)開發(fā)效率的降低和出錯率的升高鉴腻。
這時候迷扇,如果用 RxJava 的形式,就好辦多了拘哨。 RxJava 形式的代碼是這樣的:
getUser(userId) .doOnNext(new Action1<User>() { @Override public void call(User user) { processUser(user); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } });
后臺代碼和前臺代碼全都寫在一條鏈中谋梭,明顯清晰了很多。
再舉一個例子:假設(shè) /user
接口并不能直接訪問倦青,而需要填入一個在線獲取的 token
瓮床,代碼應(yīng)該怎么寫?
Callback
方式产镐,可以使用嵌套的 Callback
:
@GET("/token")public void getToken(Callback<String> callback);@GET("/user")public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);...getToken(new Callback<String>() { @Override public void success(String token) { getUser(token, userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... } }; } @Override public void failure(RetrofitError error) { // Error handling ... }});
倒是沒有什么性能問題隘庄,可是迷之縮進(jìn)毀一生,你懂我也懂癣亚,做過大項(xiàng)目的人應(yīng)該更懂丑掺。
而使用 RxJava 的話,代碼是這樣的:
@GET("/token")public Observable<String> getToken();@GET("/user")public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);...getToken() .flatMap(new Func1<String, Observable<User>>() { @Override public Observable<User> onNext(String token) { return getUser(token, userId); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onNext(User user) { userView.setUser(user); } @Override public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... } });
用一個 flatMap()
就搞定了邏輯述雾,依然是一條鏈街州。看著就很爽玻孟,是吧唆缴?
2016/03/31 更新,加上我寫的一個 Sample 項(xiàng)目: rengwuxian RxJava Samples
好黍翎,Retrofit 部分就到這里面徽。
- RxBinding
RxBinding 是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基于 RxJava 的 Binding API。所謂 Binding趟紊,就是類似設(shè)置OnClickListener
氮双、設(shè)置 TextWatcher
這樣的注冊綁定對象的 API。
舉個設(shè)置點(diǎn)擊監(jiān)聽的例子霎匈。使用 RxBinding
戴差,可以把事件監(jiān)聽用這樣的方法來設(shè)置:
Button button = ...;RxView.clickEvents(button) // 以 Observable 形式來反饋點(diǎn)擊事件 .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { // Click handling } });
看起來除了形式變了沒什么區(qū)別,實(shí)質(zhì)上也是這樣唧躲。甚至如果你看一下它的源碼造挽,你會發(fā)現(xiàn)它連實(shí)現(xiàn)都沒什么驚喜:它的內(nèi)部是直接用一個包裹著的 setOnClickListener()
來實(shí)現(xiàn)的。然而弄痹,僅僅這一個形式的改變饭入,卻恰好就是 RxBinding
的目的:擴(kuò)展性。通過 RxBinding
把點(diǎn)擊監(jiān)聽轉(zhuǎn)換成 Observable
之后肛真,就有了對它進(jìn)行擴(kuò)展的可能谐丢。擴(kuò)展的方式有很多,根據(jù)需求而定蚓让。一個例子是前面提到過的throttleFirst()
乾忱,用于去抖動,也就是消除手抖導(dǎo)致的快速連環(huán)點(diǎn)擊:
RxView.clickEvents(button) .throttleFirst(500, TimeUnit.MILLISECONDS) .subscribe(clickAction);
如果想對 RxBinding
有更多了解历极,可以去它的 GitHub 項(xiàng)目 下面看看窄瘟。
- 各種異步操作
前面舉的 Retrofit
和 RxBinding
的例子,是兩個可以提供現(xiàn)成的 Observable
的庫趟卸。而如果你有某些異步操作無法用這些庫來自動生成Observable
蹄葱,也完全可以自己寫。例如數(shù)據(jù)庫的讀寫锄列、大圖片的載入图云、文件壓縮/解壓等各種需要放在后臺工作的耗時操作,都可以用 RxJava 來實(shí)現(xiàn)邻邮,有了之前幾章的例子竣况,這里應(yīng)該不用再舉例了。 - RxBus
RxBus 名字看起來像一個庫筒严,但它并不是一個庫丹泉,而是一種模式,它的思想是使用 RxJava 來實(shí)現(xiàn)了 EventBus 鸭蛙,而讓你不再需要使用Otto
或者 GreenRobot 的 EventBus
嘀掸。至于什么是 RxBus,可以看這篇文章规惰。順便說一句,F(xiàn)lipboard 已經(jīng)用 RxBus 替換掉了 Otto
泉蝌,目前為止沒有不良反應(yīng)歇万。
最后
對于 Android 開發(fā)者來說揩晴, RxJava 是一個很難上手的庫,因?yàn)樗鼘τ?Android 開發(fā)者來說有太多陌生的概念了贪磺×蚶迹可是它真的很牛逼。因此寒锚,我寫了這篇《給 Android 開發(fā)者的 RxJava 詳解》劫映,希望能給始終搞不明白什么是 RxJava 的人一些入門的指引,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析刹前。無論如何泳赋,只要能給各位同為 Android 工程師的你們提供一些幫助,這篇文章的目的就達(dá)到了喇喉。
再次感謝對這篇文章的產(chǎn)出提供支持的各位: 技術(shù)支持:流火楓林 內(nèi)測讀者:代碼家祖今、鮑永章、drakeet拣技、馬琳千诬、有時放縱、程序亦非猿膏斤、大頭鬼徐绑、XZoomEye、席德雨莫辨、TCahead傲茄、Tiiime、Ailurus衔掸、宅學(xué)長烫幕、妖孽、大大大大大臣哥敞映、NicodeLee 贊助方:周伯通招聘 是他們讓我的文章能夠以不那么丑陋的樣子出現(xiàn)在大家面前较曼。
關(guān)于作者
朱凱(扔物線),Flipboard 北京 Android 工程師振愿。
微博:扔物線
GitHub:rengwuxian
為什么寫這個捷犹?
與兩三年前的境況不同,中國現(xiàn)在已經(jīng)不缺初級 Android 工程師冕末,但中級和高級工程師嚴(yán)重供不應(yīng)求萍歉。因此我決定從今天開始不定期地發(fā)布我的技術(shù)分享,只希望能夠和大家共同提升档桃,通過我們的成長來解決一點(diǎn)點(diǎn)國內(nèi)互聯(lián)網(wǎng)公司人才稀缺的困境枪孩,也提升各位技術(shù)黨的收入。所以,不僅要寫這篇蔑舞,我還會寫更多拒担。至于內(nèi)容的定位,我計(jì)劃只定位真正的干貨攻询,一些邊邊角角的小技巧和炫酷的黑科技應(yīng)該都不會寫从撼,總之希望每篇文章都能幫讀者提升真正的實(shí)力。