在正文開(kāi)始之前的最后,放上 GitHub 鏈接和引入依賴(lài)的 gradle 代碼: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴(lài):
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
RxJava 到底是什么
一個(gè)詞:異步。
RxJava 在 GitHub 主頁(yè)上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的派诬、基于事件的程序的庫(kù))。這就是 RxJava 父叙,概括得非常精準(zhǔn)碎浇。
然而腾它,對(duì)于初學(xué)者來(lái)說(shuō)力惯,這太難看懂了碗誉。因?yàn)樗且粋€(gè)『總結(jié)』,而初學(xué)者更需要一個(gè)『引言』父晶。
其實(shí)诗充, RxJava 的本質(zhì)可以壓縮為異步這一個(gè)詞。說(shuō)到根上诱建,它就是一個(gè)實(shí)現(xiàn)異步操作的庫(kù),而別的定語(yǔ)都是基于這之上的碟绑。
RxJava 好在哪
換句話說(shuō)俺猿,『同樣是做異步,為什么人們用它格仲,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... 押袍?』
一個(gè)詞:簡(jiǎn)潔。
異步操作很關(guān)鍵的一點(diǎn)是程序的簡(jiǎn)潔性凯肋,因?yàn)樵谡{(diào)度過(guò)程比較復(fù)雜的情況下谊惭,異步代碼經(jīng)常會(huì)既難寫(xiě)也難被讀懂。 Android 創(chuàng)造的 AsyncTask 和Handler 侮东,其實(shí)都是為了讓異步代碼更加簡(jiǎn)潔圈盔。RxJava 的優(yōu)勢(shì)也是簡(jiǎn)潔,但它的簡(jiǎn)潔的與眾不同之處在于悄雅,隨著程序邏輯變得越來(lái)越復(fù)雜驱敲,它依然能夠保持簡(jiǎn)潔。
假設(shè)有這樣一個(gè)需求:界面上有一個(gè)自定義的視圖imageCollectorView宽闲,它的作用是顯示多張圖片众眨,并能使用addImage(Bitmap)方法來(lái)任意增加顯示的圖片。現(xiàn)在需要程序?qū)⒁粋€(gè)給出的目錄數(shù)組File[] folders中每個(gè)目錄下的 png 圖片都加載出來(lái)并顯示在imageCollectorView中容诬。需要注意的是娩梨,由于讀取圖片的這一過(guò)程較為耗時(shí),需要放在后臺(tái)執(zhí)行览徒,而圖片的顯示則必須在 UI 線程執(zhí)行狈定。常用的實(shí)現(xiàn)方式有多種,我這里貼出其中一種:
newThread() {@Overridepublicvoidrun(){super.run();for(File folder : folders) {? ? ? ? ? ? File[] files = folder.listFiles();for(File file : files) {if(file.getName().endsWith(".png")) {finalBitmap bitmap = getBitmapFromFile(file);? ? ? ? ? ? ? ? ? ? getActivity().runOnUiThread(newRunnable() {@Overridepublicvoidrun(){? ? ? ? ? ? ? ? ? ? ? ? ? ? imageCollectorView.addImage(bitmap);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? });? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }}.start();
而如果使用 RxJava 习蓬,實(shí)現(xiàn)方式是這樣的:
Observable.from(folders)? ? .flatMap(newFunc1>() {@OverridepublicObservablecall(File file){returnObservable.from(file.listFiles());? ? ? ? }? ? })? ? .filter(newFunc1() {@OverridepublicBooleancall(File file){returnfile.getName().endsWith(".png");? ? ? ? }? ? })? ? .map(newFunc1() {@OverridepublicBitmapcall(File file){returngetBitmapFromFile(file);? ? ? ? }? ? })? ? .subscribeOn(Schedulers.io())? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(newAction1() {@Overridepublicvoidcall(Bitmap bitmap){? ? ? ? ? ? imageCollectorView.addImage(bitmap);? ? ? ? }? ? });
那位說(shuō)話了:『你這代碼明明變多了暗г厘托!簡(jiǎn)潔個(gè)毛啊稿湿!』大兄弟你消消氣铅匹,我說(shuō)的是邏輯的簡(jiǎn)潔,不是單純的代碼量少(邏輯簡(jiǎn)潔才是提升讀寫(xiě)代碼速度的必殺技對(duì)不饺藤?)包斑。觀察一下你會(huì)發(fā)現(xiàn), RxJava 的這個(gè)實(shí)現(xiàn)涕俗,是一條從上到下的鏈?zhǔn)秸{(diào)用罗丰,沒(méi)有任何嵌套,這在邏輯的簡(jiǎn)潔性上是具有優(yōu)勢(shì)的再姑。當(dāng)需求變得復(fù)雜時(shí)萌抵,這種優(yōu)勢(shì)將更加明顯(試想如果還要求只選取前 10 張圖片,常規(guī)方式要怎么辦元镀?如果有更多這樣那樣的要求呢绍填?再試想,在這一大堆需求實(shí)現(xiàn)完兩個(gè)月之后需要改功能栖疑,當(dāng)你翻回這里看到自己當(dāng)初寫(xiě)下的那一片迷之縮進(jìn)讨永,你能保證自己將迅速看懂,而不是對(duì)著代碼重新捋一遍思路遇革?)卿闹。
另外,如果你的 IDE 是 Android Studio 萝快,其實(shí)每次打開(kāi)某個(gè) Java 文件的時(shí)候锻霎,你會(huì)看到被自動(dòng) Lambda 化的預(yù)覽,這將讓你更加清晰地看到程序邏輯:
Observable.from(folders)? ? .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })? ? .filter((Func1) (file) -> { file.getName().endsWith(".png") })? ? .map((Func1) (file) -> { getBitmapFromFile(file) })? ? .subscribeOn(Schedulers.io())? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
如果你習(xí)慣使用 Retrolambda 揪漩,你也可以直接把代碼寫(xiě)成上面這種簡(jiǎn)潔的形式量窘。而如果你看到這里還不知道什么是 Retrolambda ,我不建議你現(xiàn)在就去學(xué)習(xí)它氢拥。原因有兩點(diǎn):1. Lambda 是把雙刃劍蚌铜,它讓你的代碼簡(jiǎn)潔的同時(shí),降低了代碼的可讀性嫩海,因此同時(shí)學(xué)習(xí) RxJava 和 Retrolambda 可能會(huì)讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié)冬殃;2. Retrolambda 是 Java 6/7 對(duì) Lambda 表達(dá)式的非官方兼容方案,它的向后兼容性和穩(wěn)定性是無(wú)法保障的叁怪,因此對(duì)于企業(yè)項(xiàng)目审葬,使用 Retrolambda 是有風(fēng)險(xiǎn)的。所以,與很多 RxJava 的推廣者不同涣觉,我并不推薦在學(xué)習(xí) RxJava 的同時(shí)一起學(xué)習(xí) Retrolambda痴荐。
API 介紹和原理簡(jiǎn)析
這個(gè)我就做不到一個(gè)詞說(shuō)明了……因?yàn)檫@一節(jié)的主要內(nèi)容就是一步步地說(shuō)明 RxJava 到底怎樣做到了異步,怎樣做到了簡(jiǎn)潔官册。
1. 概念:擴(kuò)展的觀察者模式
RxJava 的異步實(shí)現(xiàn)生兆,是通過(guò)一種擴(kuò)展的觀察者模式來(lái)實(shí)現(xiàn)的。
觀察者模式
先簡(jiǎn)述一下觀察者模式膝宁,已經(jīng)熟悉的可以跳過(guò)這一段鸦难。
觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應(yīng)员淫。舉個(gè)例子合蔽,新聞里喜聞樂(lè)見(jiàn)的警察抓小偷,警察需要在小偷伸手作案的時(shí)候?qū)嵤┳ゲ督榉怠T谶@個(gè)例子里拴事,警察是觀察者,小偷是被觀察者圣蝎,警察需要時(shí)刻盯著小偷的一舉一動(dòng)刃宵,才能保證不會(huì)漏過(guò)任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同捅彻,觀察者不需要時(shí)刻盯著被觀察者(例如 A 不需要每過(guò) 2ms 就檢查一次 B 的狀態(tài)),而是采用注冊(cè)(Register)或者稱(chēng)為訂閱(Subscribe)的方式鞍陨,告訴被觀察者:我需要你的某某狀態(tài)步淹,你要在它變化的時(shí)候通知我。
Android 開(kāi)發(fā)中一個(gè)比較典型的例子是點(diǎn)擊監(jiān)聽(tīng)器 OnClickListener 诚撵。對(duì)設(shè)置 OnClickListener 來(lái)說(shuō)缭裆, View 是被觀察者, OnClickListener 是觀察者寿烟,二者通過(guò) setOnClickListener() 方法達(dá)成訂閱關(guān)系澈驼。訂閱之后用戶點(diǎn)擊按鈕的瞬間,Android Framework 就會(huì)將點(diǎn)擊事件發(fā)送給已經(jīng)注冊(cè)的 OnClickListener 筛武。
采取這樣被動(dòng)的觀察方式缝其,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度徘六。當(dāng)然内边,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無(wú)法要求小偷『你在作案的時(shí)候務(wù)必通知我』待锈。
OnClickListener 的模式大致如下圖:
OnClickListener 觀察者模式
如圖所示漠其,通過(guò) setOnClickListener()方法,Button持有 OnClickListener的引用(這一過(guò)程沒(méi)有在圖上畫(huà)出);當(dāng)用戶點(diǎn)擊時(shí)和屎,Button自動(dòng)調(diào)用 OnClickListener的 onClick()方法拴驮。另外,如果把這張圖中的概念抽象出來(lái)
Button-> 被觀察者柴信、OnClickListener-> 觀察套啤、setOnClickListener()
-> 訂閱,onClick()-> 事件)
就由專(zhuān)用的觀察者模式(例如只用于監(jiān)聽(tīng)控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式颠印。如下圖:而 RxJava 作為一個(gè)工具庫(kù)纲岭,使用的就是通用形式的觀察者模式。
通用觀察者模式
RxJava 的觀察者模式
RxJava 有四個(gè)基本概念:Observable (可觀察者线罕,即被觀察者)止潮、Observer (觀察者)、subscribe (訂閱)钞楼、事件喇闸。
Observable 和 Observer 通過(guò) subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來(lái)通知 Observer询件。
與傳統(tǒng)觀察者模式不同燃乍, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外,還定義了兩個(gè)特殊的事件:onCompleted() 和 onError()宛琅。
onCompleted(): 事件隊(duì)列完結(jié)刻蟹。RxJava 不僅把每個(gè)事件單獨(dú)處理,還會(huì)把它們看做一個(gè)隊(duì)列嘿辟。RxJava 規(guī)定舆瘪,當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),需要觸發(fā) onCompleted() 方法作為標(biāo)志红伦。
onError(): 事件隊(duì)列異常英古。在事件處理過(guò)程中出異常時(shí),onError() 會(huì)被觸發(fā)昙读,同時(shí)隊(duì)列自動(dòng)終止召调,不允許再有事件發(fā)出。
在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError() 有且只有一個(gè)蛮浑,并且是事件序列中的最后一個(gè)唠叛。需要注意的是,onCompleted() 和 onError() 二者也是互斥的沮稚,即在隊(duì)列中調(diào)用了其中一個(gè)玻墅,就不應(yīng)該再調(diào)用另一個(gè)。
RxJava 的觀察者模式大致如下圖:
RxJava 的觀察者模式
2. 基本實(shí)現(xiàn)
基于以上的概念壮虫, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
1) 創(chuàng)建 Observer
Observer即觀察者澳厢,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer 接口的實(shí)現(xiàn)方式:
Observer observer =newObserver() {@OverridepublicvoidonNext(String s){? ? ? ? Log.d(tag,"Item: "+ s);? ? }@OverridepublicvoidonCompleted(){? ? ? ? Log.d(tag,"Completed!");? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? Log.d(tag,"Error!");? ? }};
除了 Observer 接口之外环础,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類(lèi):Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展剩拢,但他們的基本使用方式是完全一樣的:
Subscriber subscriber =newSubscriber() {@OverridepublicvoidonNext(String s){? ? ? ? Log.d(tag,"Item: "+ s);? ? }@OverridepublicvoidonCompleted(){? ? ? ? Log.d(tag,"Completed!");? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? Log.d(tag,"Error!");? ? }};
不僅基本使用方式一樣线得,實(shí)質(zhì)上,在 RxJava 的 subscribe 過(guò)程中徐伐,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用贯钩。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的办素。它們的區(qū)別對(duì)于使用者來(lái)說(shuō)主要有兩點(diǎn):
onStart(): 這是 Subscriber 增加的方法角雷。它會(huì)在 subscribe 剛開(kāi)始,而事件還未發(fā)送之前被調(diào)用性穿,可以用于做一些準(zhǔn)備工作勺三,例如數(shù)據(jù)的清零或重置。這是一個(gè)可選方法需曾,默認(rèn)情況下它的實(shí)現(xiàn)為空吗坚。需要注意的是,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框呆万,這必須在主線程執(zhí)行)商源, onStart() 就不適用了,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用谋减,而不能指定線程牡彻。要在指定的線程來(lái)做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法出爹,具體可以在后面的文中看到庄吼。
unsubscribe(): 這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱以政。在這個(gè)方法被調(diào)用后霸褒,Subscriber 將不再接收事件伴找。一般在這個(gè)方法調(diào)用前盈蛮,可以使用 isUnsubscribed() 先判斷一下?tīng)顟B(tài)。 unsubscribe() 這個(gè)方法很重要技矮,因?yàn)樵?subscribe() 之后抖誉, Observable 會(huì)持有 Subscriber 的引用,這個(gè)引用如果不能及時(shí)被釋放衰倦,將有內(nèi)存泄露的風(fēng)險(xiǎn)袒炉。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生樊零。
2) 創(chuàng)建 Observable
Observable即被觀察者我磁,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件孽文。 RxJava 使用create()方法來(lái)創(chuàng)建一個(gè) Observable ,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? subscriber.onNext("Hello");? ? ? ? subscriber.onNext("Hi");? ? ? ? subscriber.onNext("Aloha");? ? ? ? subscriber.onCompleted();? ? }});
可以看到夺艰,這里傳入了一個(gè) OnSubscribe 對(duì)象作為參數(shù)芋哭。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表郁副,當(dāng)Observable 被訂閱的時(shí)候减牺,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼存谎,就是觀察者Subscriber 將會(huì)被調(diào)用三次 onNext() 和一次 onCompleted())拔疚。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法既荚,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞稚失,即觀察者模式。
這個(gè)例子很簡(jiǎn)單:事件的內(nèi)容是字符串固以,而不是一些復(fù)雜的對(duì)象墩虹;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請(qǐng)求的結(jié)果在請(qǐng)求返回之前是未知的)憨琳;所有事件在一瞬間被全部發(fā)送出去诫钓,而不是夾雜一些確定或不確定的時(shí)間間隔或者經(jīng)過(guò)某種觸發(fā)器來(lái)觸發(fā)的「菝總之菌湃,這個(gè)例子看起來(lái)毫無(wú)實(shí)用價(jià)值。但這是為了便于說(shuō)明遍略,實(shí)質(zhì)上只要你想惧所,各種各樣的事件發(fā)送規(guī)則你都可以自己來(lái)寫(xiě)。至于具體怎么做绪杏,后面都會(huì)講到下愈,但現(xiàn)在不行。只有把基礎(chǔ)原理先說(shuō)明白了蕾久,上層的運(yùn)用才能更容易說(shuō)清楚势似。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法∩基于這個(gè)方法履因, RxJava 還提供了一些方法用來(lái)快捷創(chuàng)建事件隊(duì)列,例如:
just(T...): 將傳入的參數(shù)依次發(fā)送出來(lái)盹愚。
Observable observable = Observable.just("Hello","Hi","Aloha");// 將會(huì)依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
from(T[]) / from(Iterable): 將傳入的數(shù)組或 Iterable 拆分成具體對(duì)象后栅迄,依次發(fā)送出來(lái)。
String[] words = {"Hello","Hi","Aloha"};Observable observable = Observable.from(words);// 將會(huì)依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
3) Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后皆怕,再用 subscribe() 方法將它們聯(lián)結(jié)起來(lái)毅舆,整條鏈子就可以工作了西篓。代碼形式很簡(jiǎn)單:
observable.subscribe(observer);// 或者:observable.subscribe(subscriber);
有人可能會(huì)注意到, subscribe() 這個(gè)方法有點(diǎn)怪:它看起來(lái)是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』憋活,這看起來(lái)就像『雜志訂閱了讀者』一樣顛倒了對(duì)象關(guān)系污淋。這讓人讀起來(lái)有點(diǎn)別扭,不過(guò)如果把 API 設(shè)計(jì)成 observer.subscribe(observable) / subscriber.subscribe(observable) 余掖,雖然更加符合思維邏輯寸爆,但對(duì)流式 API 的設(shè)計(jì)就造成影響了,比較起來(lái)明顯是得不償失的盐欺。
Observable.subscribe(Subscriber)的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼赁豆,而是將源碼中與性能、兼容性冗美、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼魔种。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載粉洼。publicSubscriptionsubscribe(Subscriber subscriber){? ? subscriber.onStart();? ? onSubscribe.call(subscriber);returnsubscriber;}
可以看到节预,subscriber() 做了3件事:
調(diào)用Subscriber.onStart()。這個(gè)方法在前面已經(jīng)介紹過(guò)属韧,是一個(gè)可選的準(zhǔn)備方法安拟。
調(diào)用 Observable 中的OnSubscribe.call(Subscriber)。在這里宵喂,事件發(fā)送的邏輯開(kāi)始運(yùn)行糠赦。從這也可以看出,在 RxJava 中锅棕,Observable 并不是在創(chuàng)建的時(shí)候就立即開(kāi)始發(fā)送事件拙泽,而是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候裸燎。
將傳入的 Subscriber 作為 Subscription 返回顾瞻。這是為了方便unsubscribe().
整個(gè)過(guò)程中對(duì)象間的關(guān)系如下圖:
關(guān)系靜圖
除了subscribe(Observer)和subscribe(Subscriber),subscribe() 還支持不完整定義的回調(diào)德绿,RxJava 會(huì)自動(dòng)根據(jù)定義創(chuàng)建出 Subscriber 荷荤。形式如下:
Action1 onNextAction =newAction1() {// onNext()@Overridepublicvoidcall(String s){? ? ? ? Log.d(tag, s);? ? }};Action1 onErrorAction =newAction1() {// onError()@Overridepublicvoidcall(Throwable throwable){// Error handling}};Action0 onCompletedAction =newAction0() {// onCompleted()@Overridepublicvoidcall(){? ? ? ? Log.d(tag,"completed");? ? }};// 自動(dòng)創(chuàng)建 Subscriber 惯驼,并使用 onNextAction 來(lái)定義 onNext()observable.subscribe(onNextAction);// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來(lái)定義 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自動(dòng)創(chuàng)建 Subscriber 幅疼,并使用 onNextAction烙无、 onErrorAction 和 onCompletedAction 來(lái)定義 onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡(jiǎn)單解釋一下這段代碼中出現(xiàn)的Action1 和 Action0埂息。
Action0 是 RxJava 的一個(gè)接口,它只有一個(gè)方法 call(),這個(gè)方法是無(wú)參無(wú)返回值的几蜻;由于 onCompleted() 方法也是無(wú)參無(wú)返回值的喇潘,因此 Action0 可以被當(dāng)成一個(gè)包裝對(duì)象,將 onCompleted() 的內(nèi)容打包起來(lái)將自己作為一個(gè)參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)梭稚。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe()颖低,相當(dāng)于其他某些語(yǔ)言中的『閉包』。
Action1 也是一個(gè)接口弧烤,它同樣只有一個(gè)方法 call(T param)忱屑,這個(gè)方法也無(wú)返回值,但有一個(gè)參數(shù)暇昂;與 Action0 同理莺戒,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無(wú)返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來(lái)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)急波。事實(shí)上从铲,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個(gè) ActionX 形式的接口 (例如 Action2, Action3) 的澄暮,它們可以被用以包裝不同的無(wú)返回值的方法名段。
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色泣懊,而且 Observer 在 subscribe() 過(guò)程中最終會(huì)被轉(zhuǎn)換成 Subscriber 對(duì)象伸辟,因此,從這里開(kāi)始馍刮,后面的描述我將用 Subscriber 來(lái)代替 Observer 自娩,這樣更加嚴(yán)謹(jǐn)。
4) 場(chǎng)景示例
下面舉兩個(gè)例子:
為了把原理用更清晰的方式表述出來(lái)渠退,本文中挑選的都是功能盡可能簡(jiǎn)單的例子忙迁,以至于有些示例代碼看起來(lái)會(huì)有『畫(huà)蛇添足』『明明不用 RxJava 可以更簡(jiǎn)便地解決問(wèn)題』的感覺(jué)。當(dāng)你看到這種情況碎乃,不要覺(jué)得是因?yàn)?RxJava 太啰嗦姊扔,而是因?yàn)樵谶^(guò)早的時(shí)候舉出真實(shí)場(chǎng)景的例子并不利于原理的解析,因此我刻意挑選了簡(jiǎn)單的情景梅誓。
a. 打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來(lái):
String[] names = ...;Observable.from(names)? ? .subscribe(newAction1() {@Overridepublicvoidcall(String name){? ? ? ? ? ? Log.d(tag, name);? ? ? ? }? ? });
b. 由 id 取得圖片并顯示
由指定的一個(gè) drawable 文件 id drawableRes 取得圖片恰梢,并顯示在 ImageView 中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):
intdrawableRes = ...;ImageView imageView = ...;Observable.create(newOnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? Drawable drawable = getTheme().getDrawable(drawableRes));? ? ? ? subscriber.onNext(drawable);? ? ? ? subscriber.onCompleted();? ? }}).subscribe(newObserver() {@OverridepublicvoidonNext(Drawable drawable){? ? ? ? imageView.setImageDrawable(drawable);? ? }@OverridepublicvoidonCompleted(){? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? Toast.makeText(activity,"Error!", Toast.LENGTH_SHORT).show();? ? }});
正如上面兩個(gè)例子這樣梗掰,創(chuàng)建出 Observable 和 Subscriber 嵌言,再用 subscribe() 將它們串起來(lái),一次 RxJava 的基本使用就完成了及穗。非常簡(jiǎn)單摧茴。
在 RxJava 的默認(rèn)規(guī)則中,事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的埂陆。也就是說(shuō)苛白,如果只用上面的方法娃豹,實(shí)現(xiàn)出來(lái)的只是一個(gè)同步的觀察者模式。觀察者模式本身的目的就是『后臺(tái)處理购裙,前臺(tái)回調(diào)』的異步機(jī)制懂版,因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步躏率,則需要用到 RxJava 的另一個(gè)概念:Scheduler躯畴。
3. 線程控制 —— Scheduler (一)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則薇芝,即:在哪個(gè)線程調(diào)用 subscribe()私股,就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件恩掷,就在哪個(gè)線程消費(fèi)事件倡鲸。如果需要切換線程,就需要用到Scheduler(調(diào)度器)黄娘。
Scheduler 的 API (一)
在RxJava 中峭状,Scheduler ——調(diào)度器,相當(dāng)于線程控制器逼争,RxJava 通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程优床。RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler ,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:
Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行誓焦,相當(dāng)于不指定線程胆敞。這是默認(rèn)的 Scheduler。
Schedulers.newThread(): 總是啟用新線程杂伟,并在新線程執(zhí)行操作移层。
Schedulers.io(): I/O 操作(讀寫(xiě)文件、讀寫(xiě)數(shù)據(jù)庫(kù)赫粥、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler观话。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池越平,可以重用空閑的線程频蛔,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計(jì)算工作放在 io() 中秦叛,可以避免創(chuàng)建不必要的線程晦溪。
Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算挣跋,即不會(huì)被 I/O 等操作限制性能的操作三圆,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)嫌术。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU牌借。
另外度气, Android 還有一個(gè)專(zhuān)用的AndroidSchedulers.mainThread(),它指定的操作將在 Android主線程運(yùn)行膨报。
有了這幾個(gè) Scheduler 磷籍,就可以使用subscribeOn()和observeOn()兩個(gè)方法來(lái)對(duì)線程進(jìn)行控制了。subscribeOn(): 指定 subscribe() 所發(fā)生的線程现柠,即 Observable.OnSubscribe 被激活時(shí)所處的線程院领。或者叫做事件產(chǎn)生的線程够吩。 observeOn(): 指定 Subscriber 所運(yùn)行在的線程比然。或者叫做事件消費(fèi)的線程周循。
文字?jǐn)⑹隹倸w難理解强法,上代碼:
Observable.just(1,2,3,4)? ? .subscribeOn(Schedulers.io())// 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(newAction1() {@Overridepublicvoidcall(Integer number){? ? ? ? ? ? Log.d(tag,"number:"+ number);? ? ? ? }? ? });
上面這段代碼中,由于subscribeOn(Schedulers.io())的指定湾笛,被創(chuàng)建的事件的內(nèi)容 1饮怯、2、3嚎研、4 將會(huì)在 IO 線程發(fā)出蓖墅;而由于observeOn(AndroidScheculers.mainThread())的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 临扮。
事實(shí)上论矾,這種在 subscribe() 之前寫(xiě)上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見(jiàn),它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù)杆勇,主線程顯示』的程序策略拇囊。
而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:
intdrawableRes = ...;ImageView imageView = ...;Observable.create(newOnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? Drawable drawable = getTheme().getDrawable(drawableRes));? ? ? ? subscriber.onNext(drawable);? ? ? ? subscriber.onCompleted();? ? }}).subscribeOn(Schedulers.io())// 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(newObserver() {@OverridepublicvoidonNext(Drawable drawable){? ? ? ? imageView.setImageDrawable(drawable);? ? }@OverridepublicvoidonCompleted(){? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? Toast.makeText(activity,"Error!", Toast.LENGTH_SHORT).show();? ? }});
那么靶橱,加載圖片將會(huì)發(fā)生在 IO 線程寥袭,而設(shè)置圖片則被設(shè)定在了主線程。這就意味著关霸,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時(shí)間传黄,也不會(huì)造成絲毫界面的卡頓。
Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便队寇,也很神奇(加了一句話就把線程切換了膘掰,怎么做到的?而且 subscribe() 不是最外層直接調(diào)用的方法嗎,它竟然也能被指定線程识埋?)凡伊。然而 Scheduler 的原理需要放在后面講,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的窒舟。
4. 變換
RxJava 提供了對(duì)事件序列進(jìn)行變換的支持系忙,這是它的核心功能之一,也是大多數(shù)人說(shuō)『RxJava 真是太好用了』的最大原因惠豺。所謂變換银还,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列洁墙。概念說(shuō)著總是模糊難懂的蛹疯,來(lái)看 API。
API
首先看一個(gè) map() 的例子:
Observable.just("images/logo.png")// 輸入類(lèi)型 String.map(newFunc1() {@OverridepublicBitmapcall(String filePath){// 參數(shù)類(lèi)型 StringreturngetBitmapFromPath(filePath);// 返回類(lèi)型 Bitmap}? ? })? ? .subscribe(newAction1() {@Overridepublicvoidcall(Bitmap bitmap){// 參數(shù)類(lèi)型 BitmapshowBitmap(bitmap);? ? ? ? }? ? });
這里出現(xiàn)了一個(gè)叫做Func1的類(lèi)热监。它和 Action1 非常相似捺弦,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法孝扛。 Func1 和 Action 的區(qū)別在于羹呵,F(xiàn)unc1 包裝的是有返回值的方法。另外疗琉,和 ActionX 一樣冈欢, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法盈简。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法凑耻。
可以看到,map() 方法將參數(shù)中的 String 對(duì)象轉(zhuǎn)換成一個(gè) Bitmap 對(duì)象后返回柠贤,而在經(jīng)過(guò) map() 方法后香浩,事件的參數(shù)類(lèi)型也由 String 轉(zhuǎn)為了 Bitmap。這種直接變換對(duì)象并返回的臼勉,是最常見(jiàn)的也最容易理解的變換邻吭。不過(guò) RxJava 的變換遠(yuǎn)不止這樣,它不僅可以針對(duì)事件對(duì)象宴霸,還可以針對(duì)整個(gè)事件隊(duì)列囱晴,這使得 RxJava 變得非常靈活。我列舉幾個(gè)常用的變換:
map(): 事件對(duì)象的直接變換瓢谢,具體功能上面已經(jīng)介紹過(guò)畸写。它是 RxJava 最常用的變換。 map() 的示意圖:
map() 示意圖
flatMap(): 這是一個(gè)很有用但非常難理解的變換氓扛,因此我決定花多些篇幅來(lái)介紹它枯芬。 首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡(jiǎn)單:
Student[] students = ...;Subscriber subscriber =newSubscriber() {@OverridepublicvoidonNext(String name){? ? ? ? Log.d(tag, name);? ? }? ? ...};Observable.from(students)? ? .map(newFunc1() {@OverridepublicStringcall(Student student){returnstudent.getName();? ? ? ? }? ? })? ? .subscribe(subscriber);
很簡(jiǎn)單千所。那么再假設(shè):如果要打印出每個(gè)學(xué)生所需要修的所有課程的名稱(chēng)呢狂魔?(需求的區(qū)別在于,每個(gè)學(xué)生只有一個(gè)名字淫痰,但卻有多個(gè)課程最楷。)首先可以這樣實(shí)現(xiàn):
Student[] students = ...;Subscriber subscriber =newSubscriber() {@OverridepublicvoidonNext(Student student){? ? ? ? List courses = student.getCourses();for(inti =0; i < courses.size(); i++) {? ? ? ? ? ? Course course = courses.get(i);? ? ? ? ? ? Log.d(tag, course.getName());? ? ? ? }? ? }? ? ...};Observable.from(students)? ? .subscribe(subscriber);
這個(gè)時(shí)候,就需要用 flatMap() 了:
Student[] students = ...;Subscriber subscriber =newSubscriber() {@OverridepublicvoidonNext(Course course){? ? ? ? Log.d(tag, course.getName());? ? }? ? ...};Observable.from(students)? ? .flatMap(newFunc1>() {@OverridepublicObservablecall(Student student){returnObservable.from(student.getCourses());? ? ? ? }? ? })? ? .subscribe(subscriber);
從上面的代碼可以看出黑界, flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象管嬉。但需要注意皂林,和 map() 不同的是朗鸠,flatMap() 中返回的是個(gè) Observable 對(duì)象,并且這個(gè) Observable 對(duì)象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中础倍。
flatMap() 的原理是這樣的:
1.使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象烛占;
2.并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開(kāi)始發(fā)送事件沟启;
3.每一個(gè)創(chuàng)建出來(lái)的 Observable 發(fā)送的事件忆家,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法德迹。
這三個(gè)步驟芽卿,把事件拆成了兩級(jí),通過(guò)一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過(guò)統(tǒng)一路徑分發(fā)了下去胳搞。而這個(gè)『鋪平』就是 flatMap() 所謂的 flat卸例。
flatMap() 示意圖:
flatMap() 示意圖
擴(kuò)展:由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作肌毅,例如嵌套的網(wǎng)絡(luò)請(qǐng)求筷转。示例代碼(Retrofit + RxJava):
networkClient.token()// 返回 Observable,在訂閱時(shí)請(qǐng)求 token悬而,并在響應(yīng)后發(fā)送 token.flatMap(newFunc1>() {@OverridepublicObservablecall(String token){// 返回 Observable呜舒,在訂閱時(shí)請(qǐng)求消息列表,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表returnnetworkClient.messages();? ? ? ? }? ? })? ? .subscribe(newAction1() {@Overridepublicvoidcall(Messages messages){// 處理顯示消息列表showMessages(messages);? ? ? ? }? ? });
傳統(tǒng)的嵌套請(qǐng)求需要使用嵌套的 Callback 來(lái)實(shí)現(xiàn)笨奠。而通過(guò) flatMap() 袭蝗,可以把嵌套的請(qǐng)求寫(xiě)在一條鏈中,從而保持程序邏輯的清晰般婆。
throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件呻袭。常用作去抖動(dòng)過(guò)濾,例如按鈕的點(diǎn)擊監(jiān)聽(tīng)器: RxView.clickEvents(button) // RxBinding 代碼腺兴,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點(diǎn)開(kāi)兩個(gè)重復(fù)的界面啦左电。
此外, RxJava 還提供很多便捷的方法來(lái)實(shí)現(xiàn)事件序列的變換,這里就不一一舉例了篓足。
變換的原理:lift()
這些變換雖然功能各有不同段誊,但實(shí)質(zhì)上都是針對(duì)事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部栈拖,它們是基于同一個(gè)基礎(chǔ)的變換方法: lift(Operator)连舍。首先看一下 lift() 的內(nèi)部實(shí)現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源碼,而是將源碼中與性能涩哟、兼容性索赏、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。// 如果需要看源碼贴彼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載潜腻。publicObservablelift(Operator operator){returnObservable.create(newOnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? ? ? Subscriber newSubscriber = operator.call(subscriber);? ? ? ? ? ? newSubscriber.onStart();? ? ? ? ? ? onSubscribe.call(newSubscriber);? ? ? ? }? ? });}
這段代碼很有意思:它生成了一個(gè)新的 Observable 并返回,而且創(chuàng)建新 Observable 所用的參數(shù) OnSubscribe 的回調(diào)方法 call() 中的實(shí)現(xiàn)竟然看起來(lái)和前面講過(guò)的 Observable.subscribe() 一樣器仗!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對(duì)象不同(高能預(yù)警:接下來(lái)的幾句話可能會(huì)導(dǎo)致身體的嚴(yán)重不適)——
subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對(duì)象融涣,這個(gè)沒(méi)有問(wèn)題,但是 lift() 之后的情況就復(fù)雜了點(diǎn)精钮。
當(dāng)含有 lift() 時(shí):
1.lift() 創(chuàng)建了一個(gè) Observable 后威鹿,加上之前的原始 Observable,已經(jīng)有兩個(gè) Observable 了轨香;
2.而同樣地忽你,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個(gè) OnSubscribe臂容;
3.當(dāng)用戶調(diào)用經(jīng)過(guò) lift() 后的 Observable 的 subscribe() 的時(shí)候科雳,使用的是 lift() 所返回的新的 Observable ,于是它所觸發(fā)的 onSubscribe.call(subscriber)策橘,也是用的新 Observable 中的新 OnSubscribe炸渡,即在 lift() 中生成的那個(gè) OnSubscribe;
4.而這個(gè)新 OnSubscribe 的 call() 方法中的 onSubscribe 丽已,就是指的原始 Observable 中的原始 OnSubscribe 蚌堵,在這個(gè) call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個(gè)新的 Subscriber(Operator 就是在這里沛婴,通過(guò)自己的 call() 方法將新 Subscriber 和原始 Subscriber 進(jìn)行關(guān)聯(lián)吼畏,并插入自己的『變換』代碼以實(shí)現(xiàn)變換),然后利用這個(gè)新 Subscriber 向原始 Observable 進(jìn)行訂閱嘁灯。
這樣就實(shí)現(xiàn)了 lift() 過(guò)程泻蚊,有點(diǎn)像一種代理機(jī)制,通過(guò)事件攔截和處理實(shí)現(xiàn)事件序列的變換丑婿。
精簡(jiǎn)掉細(xì)節(jié)的話性雄,也可以這么說(shuō):在 Observable 執(zhí)行了 lift(Operator) 方法之后没卸,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣秒旋,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件约计,并在處理后發(fā)送給 Subscriber。
如果你更喜歡具象思維迁筛,可以看圖:
need-to-insert-img
lift() 原理圖
舉一個(gè)具體的 Operator 的實(shí)現(xiàn)煤蚌。下面這是一個(gè)將事件中的 Integer 對(duì)象轉(zhuǎn)換成 String 的例子,僅供參考:
observable.lift(newObservable.Operator() {@OverridepublicSubscriber call(finalSubscriber subscriber) {// 將事件序列中的 Integer 對(duì)象轉(zhuǎn)換為 String 對(duì)象returnnewSubscriber() {@OverridepublicvoidonNext(Integer integer){? ? ? ? ? ? ? ? subscriber.onNext(""+ integer);? ? ? ? ? ? }@OverridepublicvoidonCompleted(){? ? ? ? ? ? ? ? subscriber.onCompleted();? ? ? ? ? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? ? ? ? ? subscriber.onError(e);? ? ? ? ? ? }? ? ? ? };? ? }});
講述 lift() 的原理只是為了讓你更好地了解 RxJava 细卧,從而可以更好地使用它尉桩。然而不管你是否理解了 lift() 的原理,RxJava 都不建議開(kāi)發(fā)者自定義 Operator 來(lái)直接使用 lift()贪庙,而是建議盡量使用已有的 lift() 包裝方法(如 map() flatMap() 等)進(jìn)行組合來(lái)實(shí)現(xiàn)需求蜘犁,因?yàn)橹苯邮褂?lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯(cuò)誤。
compose: 對(duì) Observable 整體的變換
除了 lift() 之外插勤, Observable 還有一個(gè)變換方法叫做 compose(Transformer)沽瘦。它和 lift() 的區(qū)別在于革骨, lift() 是針對(duì)事件項(xiàng)和事件序列的农尖,而 compose() 是針對(duì) Observable 自身進(jìn)行變換。舉個(gè)例子良哲,假設(shè)在程序中有多個(gè) Observable 盛卡,并且他們都需要應(yīng)用一組相同的 lift() 變換。你可以這么寫(xiě):
publicclassLiftAllTransformerimplementsObservable.Transformer{@OverridepublicObservablecall(Observable observable){returnobservable? ? ? ? ? ? .lift1()? ? ? ? ? ? .lift2()? ? ? ? ? ? .lift3()? ? ? ? ? ? .lift4();? ? }}...Transformer liftAll =newLiftAllTransformer();observable1.compose(liftAll).subscribe(subscriber1);observable2.compose(liftAll).subscribe(subscriber2);observable3.compose(liftAll).subscribe(subscriber3);observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣筑凫,使用 compose() 方法滑沧,Observable 可以利用傳入的 Transformer 對(duì)象的 call 方法直接對(duì)自身進(jìn)行處理,也就不必被包在方法的里面了巍实。
compose() 的原理比較簡(jiǎn)單滓技,不附圖嘍。
5. 線程控制:Scheduler (二)
除了靈活的變換棚潦,RxJava 另一個(gè)牛逼的地方令漂,就是線程的自由控制。
Scheduler 的 API (二)
前面講到了丸边,可以利用subscribeOn()結(jié)合observeOn()來(lái)實(shí)現(xiàn)線程控制叠必,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。
可是在了解了 map() flatMap() 等變換方法后妹窖,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時(shí)的我)就問(wèn)了:能不能多切換幾次線程纬朝?
答案是:能。
因?yàn)閛bserveOn()指定的是 Subscriber的線程骄呼,而這個(gè) Subscriber并不是(嚴(yán)格說(shuō)應(yīng)該為『不一定是』共苛,但這里不妨理解為『不是』subscribe()參數(shù)中的 Subscriber判没,而是 observeOn()執(zhí)行時(shí)的當(dāng)前 Observable所對(duì)應(yīng)的 Subscriber,即它的直接下級(jí) Subscriber隅茎。換句話說(shuō)哆致,observeOn()指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求患膛,只要在每個(gè)想要切換線程的位置調(diào)用一次 observeOn()即可摊阀。上代碼:
Observable.just(1,2,3,4)// IO 線程,由 subscribeOn() 指定.subscribeOn(Schedulers.io())? ? .observeOn(Schedulers.newThread())? ? .map(mapOperator)// 新線程踪蹬,由 observeOn() 指定.observeOn(Schedulers.io())? ? .map(mapOperator2)// IO 線程胞此,由 observeOn() 指定.observeOn(AndroidSchedulers.mainThread)? ? .subscribe(subscriber);// Android 主線程,由 observeOn() 指定
如上跃捣,通過(guò) observeOn()的多次調(diào)用漱牵,程序?qū)崿F(xiàn)了線程的多次切換。不過(guò)疚漆,不同于observeOn()酣胀,subscribeOn()的位置放在哪里都可以,但它是只能調(diào)用一次的娶聘。又有好事的(其實(shí)還是當(dāng)初的我)問(wèn)了:如果我非要調(diào)用多次 subscribeOn()呢闻镶?會(huì)有什么效果?
這個(gè)問(wèn)題先放著丸升,我們還是從 RxJava 線程控制的原理說(shuō)起吧铆农。
Scheduler 的原理(二)
其實(shí), subscribeOn()和 observeOn()的內(nèi)部實(shí)現(xiàn)狡耻,也是用的 lift()墩剖。
具體看圖(不同顏色的箭頭表示不同的線程):subscribeOn() 原理圖:
need-to-insert-img
subscribeOn() 原理
observeOn() 原理圖:
need-to-insert-img
observeOn() 原理
從圖中可以看出
subscribeOn()和observeOn()都做了線程切換的工作(圖中的 "schedule..." 部位)。
不同的是夷狰,subscribeOn()的線程切換發(fā)生在 OnSubscribe中岭皂,即在它通知上一級(jí) OnSubscribe時(shí),這時(shí)事件還沒(méi)有開(kāi)始發(fā)送沼头,因此 subscribeOn()的線程控制可以從事件發(fā)出的開(kāi)端就造成影響爷绘;
而observeOn()的線程切換則發(fā)生在它內(nèi)建的 Subscriber中,即發(fā)生在它即將給下一級(jí) Subscriber發(fā)送事件時(shí)瘫证,因此 observeOn()控制的是它后面的線程揉阎。
最后,我用一張圖來(lái)解釋當(dāng)多個(gè) subscribeOn()和 observeOn()混合使用時(shí)背捌,線程調(diào)度是怎么發(fā)生的(由于圖中對(duì)象較多毙籽,相對(duì)于上面的圖對(duì)結(jié)構(gòu)做了一些簡(jiǎn)化調(diào)整):
need-to-insert-img
線程控制綜合調(diào)用
圖中共有 5 處含有對(duì)事件的操作。由圖中可以看出毡庆,
①和②兩處受第一個(gè) subscribeOn()影響坑赡,運(yùn)行在紅色線程烙如;
③和④處受第一個(gè) observeOn()的影響,運(yùn)行在綠色線程毅否;
⑤處受第二個(gè) onserveOn()影響亚铁,運(yùn)行在紫色線程螟加;而第二個(gè) subscribeOn() 徘溢,由于在通知過(guò)程中線程就被第一個(gè) subscribeOn()截?cái)啵虼藢?duì)整個(gè)流程并沒(méi)有任何影響捆探。這里也就回答了前面的問(wèn)題:當(dāng)使用了多個(gè) subscribeOn()
的時(shí)候然爆,只有第一個(gè) subscribeOn() 起作用。
延伸:doOnSubscribe()
然而黍图,雖然超過(guò)一個(gè)的 subscribeOn() 對(duì)事件處理的流程沒(méi)有影響曾雕,但在流程之前卻是可以利用的。
在前面講 Subscriber 的時(shí)候助被,提到過(guò) Subscriber 的onStart()可以用作流程開(kāi)始前的初始化剖张。然而 onStart() 由于在 subscribe() 發(fā)生時(shí)就被調(diào)用了,因此不能指定線程揩环,而是只能執(zhí)行在 subscribe() 被調(diào)用時(shí)的線程搔弄。這就導(dǎo)致如果 onStart() 中含有對(duì)線程有要求的代碼(例如在界面上顯示一個(gè) ProgressBar,這必須在主線程執(zhí)行)检盼,將會(huì)有線程非法的風(fēng)險(xiǎn)肯污,因?yàn)橛袝r(shí)你無(wú)法預(yù)測(cè) subscribe() 將會(huì)在什么線程執(zhí)行翘单。
而與 Subscriber.onStart() 相對(duì)應(yīng)的吨枉,有一個(gè)方法Observable.doOnSubscribe()。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行哄芜,但區(qū)別在于它可以指定線程貌亭。默認(rèn)情況下,doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程认臊;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話圃庭,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
示例代碼:
Observable.create(onSubscribe)? ? .subscribeOn(Schedulers.io())? ? .doOnSubscribe(newAction0() {@Overridepublicvoidcall(){? ? ? ? ? ? progressBar.setVisibility(View.VISIBLE);// 需要在主線程執(zhí)行}? ? })? ? .subscribeOn(AndroidSchedulers.mainThread())// 指定主線程.observeOn(AndroidSchedulers.mainThread())? ? .subscribe(subscriber);
如上失晴,在 doOnSubscribe()的后面跟一個(gè) subscribeOn() 剧腻,就能指定準(zhǔn)備工作的線程了。
RxJava 的適用場(chǎng)景和使用方式
與 Retrofit 的結(jié)合
Retrofit是 Square 的一個(gè)著名的網(wǎng)絡(luò)請(qǐng)求庫(kù)涂屁。沒(méi)有用過(guò) Retrofit 的可以選擇跳過(guò)這一小節(jié)也沒(méi)關(guān)系书在,我舉的每種場(chǎng)景都只是個(gè)例子,而且例子之間并無(wú)前后關(guān)聯(lián)拆又,只是個(gè)拋磚引玉的作用儒旬,所以你跳過(guò)這里看別的場(chǎng)景也可以的栏账。
Retrofit 除了提供了傳統(tǒng)的Callback形式的 API,還有 RxJava 版本的Observable形式 API栈源。下面我用對(duì)比的方式來(lái)介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本的區(qū)別挡爵。
以獲取一個(gè) User對(duì)象的接口作為例子。使用Retrofit 的傳統(tǒng) API甚垦,你可以用這樣的方式來(lái)定義請(qǐng)求:
@GET("/user")publicvoidgetUser(@Query("userId")String userId, Callback callback);
在程序的構(gòu)建過(guò)程中茶鹃, Retrofit 會(huì)把自動(dòng)把方法實(shí)現(xiàn)并生成代碼,然后開(kāi)發(fā)者就可以利用下面的方法來(lái)獲取特定用戶并處理響應(yīng):
getUser(userId,newCallback() {@Overridepublicvoidsuccess(User user){? ? ? ? userView.setUser(user);? ? }@Overridepublicvoidfailure(RetrofitError error){// Error handling...? ? }};
而使用 RxJava 形式的 API艰亮,定義同樣的請(qǐng)求是這樣的:
@GET("/user")publicObservablegetUser(@Query("userId")String userId);
使用的時(shí)候是這樣的:
getUser(userId)? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(newObserver() {@OverridepublicvoidonNext(User user){? ? ? ? ? ? userView.setUser(user);? ? ? ? }@OverridepublicvoidonCompleted(){? ? ? ? }@OverridepublicvoidonError(Throwable error){// Error handling...? ? ? ? }? ? });
看到區(qū)別了嗎前计?
當(dāng) RxJava 形式的時(shí)候,Retrofit 把請(qǐng)求封裝進(jìn)Observable垃杖,在請(qǐng)求結(jié)束后調(diào)用onNext()或在請(qǐng)求失敗后調(diào)用onError()男杈。
對(duì)比來(lái)看, Callback形式和 Observable形式長(zhǎng)得不太一樣调俘,但本質(zhì)都差不多伶棒,而且在細(xì)節(jié)上 Observable形式似乎還比 Callback形式要差點(diǎn)。那 Retrofit 為什么還要提供 RxJava 的支持呢彩库?
因?yàn)樗糜冒肤无。倪@個(gè)例子看不出來(lái)是因?yàn)檫@只是最簡(jiǎn)單的情況骇钦。而一旦情景復(fù)雜起來(lái)宛渐, Callback形式馬上就會(huì)開(kāi)始讓人頭疼。比如:假設(shè)這么一種情況:你的程序取到的 User 并不應(yīng)該直接顯示眯搭,而是需要先與數(shù)據(jù)庫(kù)中的數(shù)據(jù)進(jìn)行比對(duì)和修正后再顯示窥翩。使用 Callback方式大概可以這么寫(xiě):
getUser(userId,newCallback() {@Overridepublicvoidsuccess(User user){? ? ? ? processUser(user);// 嘗試修正 User 數(shù)據(jù)userView.setUser(user);? ? }@Overridepublicvoidfailure(RetrofitError error){// Error handling...? ? }};
有問(wèn)題嗎?
很簡(jiǎn)便鳞仙,但不要這樣做寇蚊。為什么?因?yàn)檫@樣做會(huì)影響性能棍好。數(shù)據(jù)庫(kù)的操作很重仗岸,一次讀寫(xiě)操作花費(fèi) 10~20ms 是很常見(jiàn)的,這樣的耗時(shí)很容易造成界面的卡頓借笙。所以通常情況下扒怖,如果可以的話一定要避免在主線程中處理數(shù)據(jù)庫(kù)。所以為了提升性能业稼,這段代碼可以?xún)?yōu)化一下:
getUser(userId,newCallback() {@Overridepublicvoidsuccess(User user){newThread() {@Overridepublicvoidrun(){? ? ? ? ? ? ? ? processUser(user);// 嘗試修正 User 數(shù)據(jù)runOnUiThread(newRunnable() {// 切回 UI 線程@Overridepublicvoidrun(){? ? ? ? ? ? ? ? ? ? ? ? userView.setUser(user);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });? ? ? ? ? ? }).start();? ? }@Overridepublicvoidfailure(RetrofitError error){// Error handling...? ? }};
性能問(wèn)題解決盗痒,但……這代碼實(shí)在是太亂了,迷之縮進(jìn)芭渭伞积糯!雜亂的代碼往往不僅僅是美觀問(wèn)題掂墓,因?yàn)榇a越亂往往就越難讀懂,而如果項(xiàng)目中充斥著雜亂的代碼看成,無(wú)疑會(huì)降低代碼的可讀性君编,造成團(tuán)隊(duì)開(kāi)發(fā)效率的降低和出錯(cuò)率的升高。
這時(shí)候川慌,如果用 RxJava 的形式吃嘿,就好辦多了。 RxJava 形式的代碼是這樣的:
getUser(userId)? ? .doOnNext(newAction1() {@Overridepublicvoidcall(User user){? ? ? ? ? ? processUser(user);? ? ? ? })? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(newObserver() {@OverridepublicvoidonNext(User user){? ? ? ? ? ? userView.setUser(user);? ? ? ? }@OverridepublicvoidonCompleted(){? ? ? ? }@OverridepublicvoidonError(Throwable error){// Error handling...? ? ? ? }? ? });
后臺(tái)代碼和前臺(tái)代碼全都寫(xiě)在一條鏈中梦重,明顯清晰了很多兑燥。
再舉一個(gè)例子:假設(shè) /user接口并不能直接訪問(wèn),而需要填入一個(gè)在線獲取的 token琴拧,代碼應(yīng)該怎么寫(xiě)降瞳?
Callback方式,可以使用嵌套的 Callback:
@GET("/token")publicvoidgetToken(Callback callback);@GET("/user")publicvoidgetUser(@Query("token")String token, @Query("userId")String userId, Callback callback);...getToken(newCallback() {@Overridepublicvoidsuccess(String token){? ? ? ? getUser(token, userId,newCallback() {@Overridepublicvoidsuccess(User user){? ? ? ? ? ? ? ? userView.setUser(user);? ? ? ? ? ? }@Overridepublicvoidfailure(RetrofitError error){// Error handling...? ? ? ? ? ? }? ? ? ? };? ? }@Overridepublicvoidfailure(RetrofitError error){// Error handling...? ? }});
倒是沒(méi)有什么性能問(wèn)題蚓胸,可是迷之縮進(jìn)毀一生挣饥,你懂我也懂,做過(guò)大項(xiàng)目的人應(yīng)該更懂沛膳。
而使用 RxJava 的話扔枫,代碼是這樣的:
@GET("/token")publicObservablegetToken();@GET("/user")publicObservablegetUser(@Query("token")String token, @Query("userId")String userId);...getToken()? ? .flatMap(newFunc1>() {@OverridepublicObservableonNext(String token){returngetUser(token, userId);? ? ? ? })? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(newObserver() {@OverridepublicvoidonNext(User user){? ? ? ? ? ? userView.setUser(user);? ? ? ? }@OverridepublicvoidonCompleted(){? ? ? ? }@OverridepublicvoidonError(Throwable error){// Error handling...? ? ? ? }? ? });
用一個(gè) flatMap()
就搞定了邏輯,依然是一條鏈锹安《碳觯看著就很爽,是吧叹哭?
2.RxBinding
RxBinding是 Jake Wharton 的一個(gè)開(kāi)源庫(kù)忍宋,它提供了一套在 Android 平臺(tái)上的基于 RxJava 的 Binding API。所謂 Binding话速,就是類(lèi)似設(shè)置 OnClickListener讶踪、設(shè)置 TextWatcher這樣的注冊(cè)綁定對(duì)象的 API。舉個(gè)設(shè)置點(diǎn)擊監(jiān)聽(tīng)的例子泊交。使用 RxBinding,可以把事件監(jiān)聽(tīng)用這樣的方法來(lái)設(shè)置:
Button button = ...;RxView.clickEvents(button)// 以 Observable 形式來(lái)反饋點(diǎn)擊事件.subscribe(newAction1() {@Overridepublicvoidcall(ViewClickEvent event){// Click handling}? ? });
看起來(lái)除了形式變了沒(méi)什么區(qū)別柱查,實(shí)質(zhì)上也是這樣廓俭。甚至如果你看一下它的源碼,你會(huì)發(fā)現(xiàn)它連實(shí)現(xiàn)都沒(méi)什么驚喜:它的內(nèi)部是直接用一個(gè)包裹著的 setOnClickListener()來(lái)實(shí)現(xiàn)的唉工。然而研乒,僅僅這一個(gè)形式的改變,卻恰好
就是 RxBinding 的目的:擴(kuò)展性淋硝。
通過(guò)RxBinding把點(diǎn)擊監(jiān)聽(tīng)轉(zhuǎn)換成Observable之后雹熬,就有了對(duì)它進(jìn)行擴(kuò)展的可能宽菜。擴(kuò)展的方式有很多,根據(jù)需求而定竿报。一個(gè)例子是前面提到過(guò)的 throttleFirst() 铅乡,用于去抖動(dòng),也就是消除手抖導(dǎo)致的快速連環(huán)點(diǎn)擊:
RxView.clickEvents(button).throttleFirst(500,TimeUnit.MILLISECONDS).subscribe(clickAction);
如果想對(duì) RxBinding
有更多了解烈菌,可以去它的GitHub 項(xiàng)目下面看看阵幸。
各種異步操作
前面舉的 Retrofit和 RxBinding的例子,是兩個(gè)可以提供現(xiàn)的 Observable的庫(kù)芽世。而如果你有某些異步操作無(wú)法用這些庫(kù)來(lái)自動(dòng)生成 Observable挚赊,也完全可以自己寫(xiě)。例如數(shù)據(jù)庫(kù)的讀寫(xiě)济瓢、大圖片的載入荠割、文件壓縮/解壓等各種需要放在后臺(tái)工作的耗時(shí)操作,都可以用 RxJava 來(lái)實(shí)現(xiàn)旺矾,有了之前幾章的例子涨共,這里應(yīng)該不用再舉例了。
RxBus
RxBus 名字看起來(lái)像一個(gè)庫(kù)宠漩,但它并不是一個(gè)庫(kù)举反,而是一種模式,它的思想是使用 RxJava 來(lái)實(shí)現(xiàn)了 EventBus 扒吁,而讓你不再需要使用 Otto或者 GreenRobot 的 EventBus火鼻。至于什么是 RxBus,可以看這篇文章雕崩。順便說(shuō)一句魁索,F(xiàn)lipboard 已經(jīng)用 RxBus 替換掉了 Otto,目前為止沒(méi)有不良反應(yīng)盼铁。
最后
對(duì)于 Android 開(kāi)發(fā)者來(lái)說(shuō)粗蔚, RxJava 是一個(gè)很難上手的庫(kù),因?yàn)樗鼘?duì)于 Android 開(kāi)發(fā)者來(lái)說(shuō)有太多陌生的概念了饶火∨艨兀可是它真的很牛逼。因此肤寝,我寫(xiě)了這篇《給 Android 開(kāi)發(fā)者的 RxJava 詳解》当辐,希望能給始終搞不明白什么是 RxJava 的人一些入門(mén)的指引,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析鲤看。無(wú)論如何缘揪,只要能給各位同為 Android 工程師的你們提供一些幫助,這篇文章的目的就達(dá)到了。