Rxjava使用詳解附帽,強(qiáng)烈推薦

最近項(xiàng)目里面有用到Rxjava框架埠戳,感覺很強(qiáng)大的巨作,所以在網(wǎng)上搜了很多相關(guān)文章蕉扮,發(fā)現(xiàn)一片文章很不錯(cuò)整胃,今天把這篇文章分享給大家,感謝原作者的傾心貢獻(xiàn)喳钟,那么下面大家來看看這篇文章:(小編在這里祝大家周末愉快哦Fㄊ埂!奔则!)

前言

我從去年開始使用

RxJava 蛮寂,到現(xiàn)在一年多了。今年加入了 Flipboard 后易茬,看到 Flipboard 的 Android 項(xiàng)目也在使用 RxJava

酬蹋,并且使用的場(chǎng)景越來越多 。而最近這幾個(gè)月疾呻,我也發(fā)現(xiàn)國內(nèi)越來越多的人開始提及 RxJava 除嘹。有人說『RxJava

真是太好用了』,有人說『RxJava 真是太難用了』岸蜗,另外更多的人表示:我真的百度了也谷歌了尉咕,但我還是想問: RxJava 到底是什么?

鑒于 RxJava 目前這種既火爆又神秘的現(xiàn)狀璃岳,而我又在一年的使用過程中對(duì) RxJava 有了一些理解年缎,我決定寫下這篇文章來對(duì) RxJava 做一個(gè)相對(duì)詳細(xì)的、針對(duì) Android 開發(fā)者的介紹铃慷。

這篇文章的目的有兩個(gè): 1. 給對(duì) RxJava 感興趣的人一些入門的指引 2. 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

RxJava 到底是什么

RxJava 好在哪

API 介紹和原理簡析

1. 概念:擴(kuò)展的觀察者模式

觀察者模式

RxJava 的觀察者模式

2. 基本實(shí)現(xiàn)

1) 創(chuàng)建 Observer

2) 創(chuàng)建 Observable

3) Subscribe (訂閱)

4) 場(chǎng)景示例

a. 打印字符串?dāng)?shù)組

b. 由 id 取得圖片并顯示

3. 線程控制 —— Scheduler (一)

1) Scheduler 的 API (一)

2) Scheduler 的原理 (一)

4. 變換

1) API

2) 變換的原理:lift()

3) compose: 對(duì) Observable 整體的變換

5. 線程控制:Scheduler (二)

1) Scheduler 的 API (二)

2) Scheduler 的原理(二)

3) 延伸:doOnSubscribe()

RxJava 的適用場(chǎng)景和使用方式

1. 與 Retrofit 的結(jié)合

2. RxBinding

3. 各種異步操作

4. RxBus

最后

關(guān)于作者:

為什么寫這個(gè)单芜?

在正文開始之前的最后,放上GitHub鏈接和引入依賴的gradle代碼: Github:

https://github.com/ReactiveX/RxJava

https://github.com/ReactiveX/RxAndroid

引入依賴:

compile 'io.reactivex:rxjava:1.0.14'

compile 'io.reactivex:rxandroid:1.0.1'

(版本號(hào)是文章發(fā)布時(shí)的最新穩(wěn)定版)

另外犁柜,感謝 RxJava 核心成員流火楓林的技術(shù)支持和內(nèi)測(cè)讀者代碼家洲鸠、鮑永章drakeet馬琳扒腕、有時(shí)放縱绢淀、程序亦非猿大頭鬼瘾腰、XZoomEye皆的、席德雨TCahead蹋盆、Tiiime费薄、Ailurus宅學(xué)長栖雾、妖孽楞抡、大大大大大臣哥NicodeLee的幫助岩灭,以及周伯通招聘的贊助拌倍。

RxJava 到底是什么

一個(gè)詞:異步赂鲤。

RxJava

在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and

event-based programs using observable sequences for the Java VM"(一個(gè)在

Java VM 上使用可觀測(cè)的序列來組成異步的噪径、基于事件的程序的庫)。這就是 RxJava 数初,概括得非常精準(zhǔn)找爱。

然而,對(duì)于初學(xué)者來說泡孩,這太難看懂了车摄。因?yàn)樗且粋€(gè)『總結(jié)』,而初學(xué)者更需要一個(gè)『引言』仑鸥。

其實(shí)吮播, RxJava 的本質(zhì)可以壓縮為異步這一個(gè)詞。說到根上眼俊,它就是一個(gè)實(shí)現(xiàn)異步操作的庫意狠,而別的定語都是基于這之上的。

RxJava 好在哪

換句話說疮胖,『同樣是做異步环戈,為什么人們用它,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... 澎灸?』

一個(gè)詞:簡潔院塞。

異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性,因?yàn)樵谡{(diào)度過程比較復(fù)雜的情況下性昭,異步代碼經(jīng)常會(huì)既難寫也難被讀懂拦止。 Android 創(chuàng)造的AsyncTask和Handler,其實(shí)都是為了讓異步代碼更加簡潔糜颠。RxJava 的優(yōu)勢(shì)也是簡潔汹族,但它的簡潔的與眾不同之處在于艺玲,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔鞠抑。

假設(shè)有這樣一個(gè)需求:界面上有一個(gè)自定義的視圖imageCollectorView饭聚,它的作用是顯示多張圖片,并能使用addImage(Bitmap)方法來任意增加顯示的圖片「樽荆現(xiàn)在需要程序?qū)⒁粋€(gè)給出的目錄數(shù)組File[] folders中每個(gè)目錄下的 png 圖片都加載出來并顯示在imageCollectorView中秒梳。需要注意的是,由于讀取圖片的這一過程較為耗時(shí)箕速,需要放在后臺(tái)執(zhí)行酪碘,而圖片的顯示則必須在 UI 線程執(zhí)行。常用的實(shí)現(xiàn)方式有多種盐茎,我這里貼出其中一種:

newThread(){@Overridepublicvoidrun(){super.run();for(Filefolder:folders){File[]files=folder.listFiles();for(Filefile:files){if(file.getName().endsWith(".png")){finalBitmapbitmap=getBitmapFromFile(file);getActivity().runOnUiThread(newRunnable(){@Overridepublicvoidrun(){imageCollectorView.addImage(bitmap);}});}}}}}.start();

而如果使用 RxJava 兴垦,實(shí)現(xiàn)方式是這樣的:

Observable.from(folders).flatMap(newFunc1>(){@OverridepublicObservablecall(Filefile){returnObservable.from(file.listFiles());}}).filter(newFunc1(){@OverridepublicBooleancall(Filefile){returnfile.getName().endsWith(".png");}}).map(newFunc1(){@OverridepublicBitmapcall(Filefile){returngetBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(newAction1(){@Overridepublicvoidcall(Bitmapbitmap){imageCollectorView.addImage(bitmap);}});

那位說話了:『你這代碼明明變多了啊字柠!簡潔個(gè)毛疤皆健!』大兄弟你消消氣窑业,我說的是邏輯的簡潔钦幔,不是單純的代碼量少(邏輯簡潔才是提升讀寫代碼速度的必殺技對(duì)不酿矢?)婴程。觀察一下你會(huì)發(fā)現(xiàn),

RxJava

的這個(gè)實(shí)現(xiàn)汽绢,是一條從上到下的鏈?zhǔn)秸{(diào)用西潘,沒有任何嵌套卷玉,這在邏輯的簡潔性上是具有優(yōu)勢(shì)的。當(dāng)需求變得復(fù)雜時(shí)喷市,這種優(yōu)勢(shì)將更加明顯(試想如果還要求只選取前

10

張圖片相种,常規(guī)方式要怎么辦?如果有更多這樣那樣的要求呢东抹?再試想蚂子,在這一大堆需求實(shí)現(xiàn)完兩個(gè)月之后需要改功能,當(dāng)你翻回這里看到自己當(dāng)初寫下的那一片迷之縮進(jìn)缭黔,你能保證自己將迅速看懂食茎,而不是對(duì)著代碼重新捋一遍思路?)馏谨。

另外别渔,如果你的 IDE 是 Android Studio ,其實(shí)每次打開某個(gè) Java 文件的時(shí)候,你會(huì)看到被自動(dòng) Lambda 化的預(yù)覽哎媚,這將讓你更加清晰地看到程序邏輯:

Observable.from(folders).flatMap((Func1)(folder)->{Observable.from(file.listFiles())}).filter((Func1)(file)->{file.getName().endsWith(".png")}).map((Func1)(file)->{getBitmapFromFile(file)}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Action1)(bitmap)->{imageCollectorView.addImage(bitmap)});

如果你習(xí)慣使用

Retrolambda 喇伯,你也可以直接把代碼寫成上面這種簡潔的形式。而如果你看到這里還不知道什么是 Retrolambda

拨与,我不建議你現(xiàn)在就去學(xué)習(xí)它稻据。原因有兩點(diǎn):1. Lambda 是把雙刃劍,它讓你的代碼簡潔的同時(shí)买喧,降低了代碼的可讀性捻悯,因此同時(shí)學(xué)習(xí) RxJava 和

Retrolambda 可能會(huì)讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié);2. Retrolambda 是 Java 6/7 對(duì) Lambda

表達(dá)式的非官方兼容方案淤毛,它的向后兼容性和穩(wěn)定性是無法保障的今缚,因此對(duì)于企業(yè)項(xiàng)目,使用 Retrolambda 是有風(fēng)險(xiǎn)的低淡。所以姓言,與很多

RxJava 的推廣者不同,我并不推薦在學(xué)習(xí) RxJava 的同時(shí)一起學(xué)習(xí) Retrolambda蔗蹋。事實(shí)上何荚,我個(gè)人雖然很欣賞

Retrolambda,但我從來不用它纸颜。

在Flipboard

的 Android

代碼中兽泣,有一段邏輯非常復(fù)雜绎橘,包含了多次內(nèi)存操作胁孙、本地文件操作和網(wǎng)絡(luò)操作,對(duì)象分分合合称鳞,線程間相互配合相互等待涮较,一會(huì)兒排成人字,一會(huì)兒排成一字冈止。如果使用常規(guī)的方法來實(shí)現(xiàn)狂票,肯定是要寫得欲仙欲死,然而在使用

RxJava 的情況下熙暴,依然只是一條鏈?zhǔn)秸{(diào)用就完成了闺属。它很長,但很清晰周霉。

所以掂器, RxJava 好在哪?就好在簡潔俱箱,好在那把什么復(fù)雜邏輯都能穿成一條線的簡潔国瓮。

API 介紹和原理簡析

這個(gè)我就做不到一個(gè)詞說明了……因?yàn)檫@一節(jié)的主要內(nèi)容就是一步步地說明 RxJava 到底怎樣做到了異步,怎樣做到了簡潔。

1. 概念:擴(kuò)展的觀察者模式

RxJava 的異步實(shí)現(xiàn)乃摹,是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的禁漓。

觀察者模式

先簡述一下觀察者模式,已經(jīng)熟悉的可以跳過這一段孵睬。

觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感播歼,需要在 B 變化的一瞬間做出反應(yīng)。舉個(gè)例子掰读,新聞里喜聞樂見的警察抓小偷荚恶,警察需要在小偷伸手作案的時(shí)候?qū)嵤┳ゲ丁T谶@個(gè)例子里磷支,警察是觀察者谒撼,小偷是被觀察者,警察需要時(shí)刻盯著小偷的一舉一動(dòng)雾狈,才能保證不會(huì)漏過任何瞬間廓潜。程序的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時(shí)刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài))善榛,而是采用注冊(cè)(Register)或者稱為訂閱(Subscribe)的方式辩蛋,告訴被觀察者:我需要你的某某狀態(tài),你要在它變化的時(shí)候通知我移盆。 Android 開發(fā)中一個(gè)比較典型的例子是點(diǎn)擊監(jiān)聽器OnClickListener悼院。對(duì)設(shè)置OnClickListener來說,View是被觀察者咒循,OnClickListener是觀察者据途,二者通過setOnClickListener()方法達(dá)成訂閱關(guān)系。訂閱之后用戶點(diǎn)擊按鈕的瞬間叙甸,Android Framework 就會(huì)將點(diǎn)擊事件發(fā)送給已經(jīng)注冊(cè)的OnClickListener颖医。采取這樣被動(dòng)的觀察方式,既省去了反復(fù)檢索狀態(tài)的資源消耗裆蒸,也能夠得到最高的反饋速度熔萧。當(dāng)然,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者僚祷,而警察叔叔明顯無法要求小偷『你在作案的時(shí)候務(wù)必通知我』佛致。

OnClickListener 的模式大致如下圖:

如圖所示,通過setOnClickListener()方法辙谜,Button持有OnClickListener的引用(這一過程沒有在圖上畫出)俺榆;當(dāng)用戶點(diǎn)擊時(shí),Button自動(dòng)調(diào)用OnClickListener的onClick()方法筷弦。另外肋演,如果把這張圖中的概念抽象出來(Button-> 被觀察者抑诸、OnClickListener-> 觀察者、setOnClickListener()-> 訂閱爹殊,onClick()-> 事件)蜕乡,就由專用的觀察者模式(例如只用于監(jiān)聽控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。如下圖:

而 RxJava 作為一個(gè)工具庫梗夸,使用的就是通用形式的觀察者模式层玲。

RxJava 的觀察者模式

RxJava 有四個(gè)基本概念:Observable(可觀察者,即被觀察者)反症、Observer(觀察者)辛块、subscribe(訂閱)、事件铅碍。Observable和Observer通過subscribe()方法實(shí)現(xiàn)訂閱關(guān)系润绵,從而Observable可以在需要的時(shí)候發(fā)出事件來通知Observer。

與傳統(tǒng)觀察者模式不同胞谈, RxJava 的事件回調(diào)方法除了普通事件onNext()(相當(dāng)于onClick()/onEvent())之外尘盼,還定義了兩個(gè)特殊的事件:onCompleted()和onError()。

onCompleted(): 事件隊(duì)列完結(jié)烦绳。RxJava 不僅把每個(gè)事件單獨(dú)處理卿捎,還會(huì)把它們看做一個(gè)隊(duì)列。RxJava 規(guī)定径密,當(dāng)不會(huì)再有新的onNext()發(fā)出時(shí),需要觸發(fā)onCompleted()方法作為標(biāo)志享扔。

onError(): 事件隊(duì)列異常底桂。在事件處理過程中出異常時(shí),onError()會(huì)被觸發(fā)伪很,同時(shí)隊(duì)列自動(dòng)終止戚啥,不允許再有事件發(fā)出。

在一個(gè)正確運(yùn)行的事件序列中,onCompleted()和onError()有且只有一個(gè)锉试,并且是事件序列中的最后一個(gè)。需要注意的是览濒,onCompleted()和onError()二者也是互斥的呆盖,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)贷笛。

RxJava 的觀察者模式大致如下圖:

2. 基本實(shí)現(xiàn)

基于以上的概念应又, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):

1) 創(chuàng)建 Observer

Observer 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?RxJava 中的Observer接口的實(shí)現(xiàn)方式:

Observerobserver=newObserver(){@OverridepublicvoidonNext(Strings){Log.d(tag,"Item: "+s);}@OverridepublicvoidonCompleted(){Log.d(tag,"Completed!");}@OverridepublicvoidonError(Throwablee){Log.d(tag,"Error!");}};

除了Observer接口之外乏苦,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了Observer的抽象類:Subscriber株扛。Subscriber對(duì)Observer接口進(jìn)行了一些擴(kuò)展尤筐,但他們的基本使用方式是完全一樣的:

Subscribersubscriber=newSubscriber(){@OverridepublicvoidonNext(Strings){Log.d(tag,"Item: "+s);}@OverridepublicvoidonCompleted(){Log.d(tag,"Completed!");}@OverridepublicvoidonError(Throwablee){Log.d(tag,"Error!");}};

不僅基本使用方式一樣,實(shí)質(zhì)上洞就,在 RxJava 的 subscribe 過程中盆繁,Observer也總是會(huì)先被轉(zhuǎn)換成一個(gè)Subscriber再使用。所以如果你只想使用基本功能旬蟋,選擇Observer和Subscriber是完全一樣的油昂。它們的區(qū)別對(duì)于使用者來說主要有兩點(diǎn):

onStart(): 這是Subscriber增加的方法。它會(huì)在 subscribe 剛開始倾贰,而事件還未發(fā)送之前被調(diào)用冕碟,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置匆浙。這是一個(gè)可選方法安寺,默認(rèn)情況下它的實(shí)現(xiàn)為空。需要注意的是首尼,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框我衬,這必須在主線程執(zhí)行),onStart()就不適用了饰恕,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用挠羔,而不能指定線程。要在指定的線程來做準(zhǔn)備工作埋嵌,可以使用doOnSubscribe()方法破加,具體可以在后面的文中看到。

unsubscribe(): 這是Subscriber所實(shí)現(xiàn)的另一個(gè)接口Subscription的方法雹嗦,用于取消訂閱范舀。在這個(gè)方法被調(diào)用后,Subscriber將不再接收事件了罪。一般在這個(gè)方法調(diào)用前锭环,可以使用isUnsubscribed()先判斷一下狀態(tài)。unsubscribe()這個(gè)方法很重要泊藕,因?yàn)樵趕ubscribe()之后辅辩,Observable會(huì)持有Subscriber的引用,這個(gè)引用如果不能及時(shí)被釋放娃圆,將有內(nèi)存泄露的風(fēng)險(xiǎn)玫锋。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如onPause()onStop()等方法中)調(diào)用unsubscribe()來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生讼呢。

2) 創(chuàng)建 Observable

Observable 即被觀察者撩鹿,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用create()方法來創(chuàng)建一個(gè) Observable 悦屏,并為它定義事件觸發(fā)規(guī)則:

Observableobservable=Observable.create(newObservable.OnSubscribe(){@Overridepublicvoidcall(Subscribersubscriber){subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();}});

可以看到节沦,這里傳入了一個(gè)OnSubscribe對(duì)象作為參數(shù)键思。OnSubscribe會(huì)被存儲(chǔ)在返回的Observable對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表甫贯,當(dāng)Observable被訂閱的時(shí)候吼鳞,OnSubscribe的call()方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼获搏,就是觀察者Subscriber將會(huì)被調(diào)用三次onNext()和一次onCompleted())赖条。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法常熙,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞纬乍,即觀察者模式。

這個(gè)例子很簡單:事件的內(nèi)容是字符串裸卫,而不是一些復(fù)雜的對(duì)象仿贬;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請(qǐng)求的結(jié)果在請(qǐng)求返回之前是未知的)墓贿;所有事件在一瞬間被全部發(fā)送出去茧泪,而不是夾雜一些確定或不確定的時(shí)間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的×總之队伟,這個(gè)例子看起來毫無實(shí)用價(jià)值。但這是為了便于說明幽勒,實(shí)質(zhì)上只要你想嗜侮,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫。至于具體怎么做啥容,后面都會(huì)講到锈颗,但現(xiàn)在不行。只有把基礎(chǔ)原理先說明白了咪惠,上層的運(yùn)用才能更容易說清楚击吱。

create()方法是 RxJava 最基本的創(chuàng)造事件序列的方法∫C粒基于這個(gè)方法覆醇, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列,例如:

just(T...): 將傳入的參數(shù)依次發(fā)送出來渠鸽。

Observableobservable=Observable.just("Hello","Hi","Aloha");// 將會(huì)依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();

from(T[])/from(Iterable): 將傳入的數(shù)組或Iterable拆分成具體對(duì)象后叫乌,依次發(fā)送出來。

String[]words={"Hello","Hi","Aloha"};Observableobservable=Observable.from(words);// 將會(huì)依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();

上面just(T...)的例子和from(T[])的例子徽缚,都和之前的create(OnSubscribe)的例子是等價(jià)的。

3) Subscribe (訂閱)

創(chuàng)建了Observable和Observer之后革屠,再用subscribe()方法將它們聯(lián)結(jié)起來凿试,整條鏈子就可以工作了排宰。代碼形式很簡單:

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

有人可能會(huì)注意到,subscribe()這個(gè)方法有點(diǎn)怪:它看起來是『observalbe訂閱了observer/subscriber』而不是『observer/subscriber訂閱了observalbe』那婉,這看起來就像『雜志訂閱了讀者』一樣顛倒了對(duì)象關(guān)系板甘。這讓人讀起來有點(diǎn)別扭,不過如果把 API 設(shè)計(jì)成observer.subscribe(observable)/subscriber.subscribe(observable)详炬,雖然更加符合思維邏輯盐类,但對(duì)流式 API 的設(shè)計(jì)就造成影響了,比較起來明顯是得不償失的呛谜。

Observable.subscribe(Subscriber)的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼在跳,而是將源碼中與性能、兼容性隐岛、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼猫妙。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載聚凹。publicSubscriptionsubscribe(Subscribersubscriber){subscriber.onStart();onSubscribe.call(subscriber);returnsubscriber;}

可以看到割坠,subscriber()做了3件事:

調(diào)用Subscriber.onStart()。這個(gè)方法在前面已經(jīng)介紹過妒牙,是一個(gè)可選的準(zhǔn)備方法彼哼。

調(diào)用Observable中的OnSubscribe.call(Subscriber)。在這里湘今,事件發(fā)送的邏輯開始運(yùn)行敢朱。從這也可以看出,在 RxJava 中象浑,Observable并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件蔫饰,而是在它被訂閱的時(shí)候,即當(dāng)subscribe()方法執(zhí)行的時(shí)候愉豺。

將傳入的Subscriber作為Subscription返回篓吁。這是為了方便unsubscribe().

整個(gè)過程中對(duì)象間的關(guān)系如下圖:

或者可以看動(dòng)圖:

除了subscribe(Observer)和subscribe(Subscriber),subscribe()還支持不完整定義的回調(diào)蚪拦,RxJava 會(huì)自動(dòng)根據(jù)定義創(chuàng)建出Subscriber杖剪。形式如下:

Action1onNextAction=newAction1(){// onNext()@Overridepublicvoidcall(Strings){Log.d(tag,s);}};Action1onErrorAction=newAction1(){// onError()@Overridepublicvoidcall(Throwablethrowable){// Error handling}};Action0onCompletedAction=newAction0(){// onCompleted()@Overridepublicvoidcall(){Log.d(tag,"completed");}};// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()observable.subscribe(onNextAction);// 自動(dòng)創(chuàng)建 Subscriber 驰贷,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()observable.subscribe(onNextAction,onErrorAction);// 自動(dòng)創(chuàng)建 Subscriber 盛嘿,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()括袒、 onError() 和 onCompleted()observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

簡單解釋一下這段代碼中出現(xiàn)的Action1和Action0次兆。Action0是 RxJava 的一個(gè)接口,它只有一個(gè)方法call()锹锰,這個(gè)方法是無參無返回值的芥炭;由于onCompleted()方法也是無參無返回值的漓库,因此Action0可以被當(dāng)成一個(gè)包裝對(duì)象,將onCompleted()的內(nèi)容打包起來將自己作為一個(gè)參數(shù)傳入subscribe()以實(shí)現(xiàn)不完整定義的回調(diào)园蝠。這樣其實(shí)也可以看做將onCompleted()方法作為參數(shù)傳進(jìn)了subscribe()渺蒿,相當(dāng)于其他某些語言中的『閉包』。Action1也是一個(gè)接口彪薛,它同樣只有一個(gè)方法call(T param)茂装,這個(gè)方法也無返回值,但有一個(gè)參數(shù)善延;與Action0同理少态,由于onNext(T obj)和onError(Throwable error)也是單參數(shù)無返回值的,因此Action1可以將onNext(obj)和onError(error)打包起來傳入subscribe()以實(shí)現(xiàn)不完整定義的回調(diào)挚冤。事實(shí)上况增,雖然Action0和Action1在 API 中使用最廣泛,但 RxJava 是提供了多個(gè)ActionX形式的接口 (例如Action2,Action3) 的训挡,它們可以被用以包裝不同的無返回值的方法澳骤。

注:正如前面所提到的,Observer和Subscriber具有相同的角色澜薄,而且Observer在subscribe()過程中最終會(huì)被轉(zhuǎn)換成Subscriber對(duì)象为肮,因此,從這里開始肤京,后面的描述我將用Subscriber來代替Observer颊艳,這樣更加嚴(yán)謹(jǐn)。

4) 場(chǎng)景示例

下面舉兩個(gè)例子:

為了把原理用更清晰的方式表述出來忘分,本文中挑選的都是功能盡可能簡單的例子棋枕,以至于有些示例代碼看起來會(huì)有『畫蛇添足』『明明不用

RxJava 可以更簡便地解決問題』的感覺。當(dāng)你看到這種情況妒峦,不要覺得是因?yàn)?RxJava

太啰嗦重斑,而是因?yàn)樵谶^早的時(shí)候舉出真實(shí)場(chǎng)景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景肯骇。

a. 打印字符串?dāng)?shù)組

將字符串?dāng)?shù)組names中的所有字符串依次打印出來:

String[]names=...;Observable.from(names).subscribe(newAction1(){@Overridepublicvoidcall(Stringname){Log.d(tag,name);}});

b. 由 id 取得圖片并顯示

由指定的一個(gè) drawable 文件 iddrawableRes取得圖片窥浪,并顯示在ImageView中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):

intdrawableRes=...;ImageViewimageView=...;Observable.create(newOnSubscribe(){@Overridepublicvoidcall(Subscribersubscriber){Drawabledrawable=getTheme().getDrawable(drawableRes));subscriber.onNext(drawable);subscriber.onCompleted();}}).subscribe(newObserver(){@OverridepublicvoidonNext(Drawabledrawable){imageView.setImageDrawable(drawable);}@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwablee){Toast.makeText(activity,"Error!",Toast.LENGTH_SHORT).show();}});

正如上面兩個(gè)例子這樣笛丙,創(chuàng)建出Observable和Subscriber漾脂,再用subscribe()將它們串起來,一次 RxJava 的基本使用就完成了胚鸯。非常簡單骨稿。

然而,

在 RxJava 的默認(rèn)規(guī)則中,事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的啊终。也就是說镜豹,如果只用上面的方法傲须,實(shí)現(xiàn)出來的只是一個(gè)同步的觀察者模式蓝牲。觀察者模式本身的目的就是『后臺(tái)處理,前臺(tái)回調(diào)』的異步機(jī)制泰讽,因此異步對(duì)于 RxJava 是至關(guān)重要的例衍。而要實(shí)現(xiàn)異步,則需要用到 RxJava 的另一個(gè)概念:Scheduler已卸。

3. 線程控制 —— Scheduler (一)

在不指定線程的情況下佛玄, RxJava 遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用subscribe()累澡,就在哪個(gè)線程生產(chǎn)事件梦抢;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件愧哟。如果需要切換線程奥吩,就需要用到Scheduler(調(diào)度器)。

1) Scheduler 的 API (一)

在RxJava 中蕊梧,Scheduler——調(diào)度器霞赫,相當(dāng)于線程控制器,RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程肥矢。RxJava 已經(jīng)內(nèi)置了幾個(gè)Scheduler端衰,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:

Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程甘改。這是默認(rèn)的Scheduler旅东。

Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作十艾。

Schedulers.io(): I/O 操作(讀寫文件抵代、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的Scheduler疟羹。行為模式和newThread()差不多主守,區(qū)別在于io()的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程榄融,因此多數(shù)情況下io()比newThread()更有效率参淫。不要把計(jì)算工作放在io()中,可以避免創(chuàng)建不必要的線程愧杯。

Schedulers.computation(): 計(jì)算所使用的Scheduler涎才。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算耍铜。這個(gè)Scheduler使用的固定的線程池,大小為 CPU 核數(shù)棕兼。不要把 I/O 操作放在computation()中陡舅,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU。

另外伴挚, Android 還有一個(gè)專用的AndroidSchedulers.mainThread()靶衍,它指定的操作將在 Android 主線程運(yùn)行。

有了這幾個(gè)Scheduler茎芋,就可以使用subscribeOn()和observeOn()兩個(gè)方法來對(duì)線程進(jìn)行控制了颅眶。 *subscribeOn(): 指定subscribe()所發(fā)生的線程,即Observable.OnSubscribe被激活時(shí)所處的線程田弥√涡铮或者叫做事件產(chǎn)生的線程。 *observeOn(): 指定Subscriber所運(yùn)行在的線程偷厦∩烫荆或者叫做事件消費(fèi)的線程。

文字?jǐn)⑹隹倸w難理解沪哺,上代碼:

Observable.just(1,2,3,4).subscribeOn(Schedulers.io())// 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(newAction1(){@Overridepublicvoidcall(Integernumber){Log.d(tag,"number:"+number);}});

上面這段代碼中沈自,由于subscribeOn(Schedulers.io())的指定,被創(chuàng)建的事件的內(nèi)容1辜妓、2枯途、3、4將會(huì)在 IO 線程發(fā)出籍滴;而由于observeOn(AndroidScheculers.mainThread()) 的指定酪夷,因此subscriber數(shù)字的打印將發(fā)生在主線程 。事實(shí)上孽惰,這種在subscribe()之前寫上兩句subscribeOn(Scheduler.io())和observeOn(AndroidSchedulers.mainThread())的使用方式非常常見晚岭,它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù),主線程顯示』的程序策略勋功。

而前面提到的由圖片 id 取得圖片并顯示的例子坦报,如果也加上這兩句:

intdrawableRes=...;ImageViewimageView=...;Observable.create(newOnSubscribe(){@Overridepublicvoidcall(Subscribersubscriber){Drawabledrawable=getTheme().getDrawable(drawableRes));subscriber.onNext(drawable);subscriber.onCompleted();}}).subscribeOn(Schedulers.io())// 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(newObserver(){@OverridepublicvoidonNext(Drawabledrawable){imageView.setImageDrawable(drawable);}@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwablee){Toast.makeText(activity,"Error!",Toast.LENGTH_SHORT).show();}});

那么,加載圖片將會(huì)發(fā)生在 IO 線程狂鞋,而設(shè)置圖片則被設(shè)定在了主線程片择。這就意味著,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時(shí)間骚揍,也不會(huì)造成絲毫界面的卡頓字管。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便啰挪,也很神奇(加了一句話就把線程切換了,怎么做到的嘲叔?而且subscribe()不是最外層直接調(diào)用的方法嗎亡呵,它竟然也能被指定線程?)硫戈。然而 Scheduler 的原理需要放在后面講锰什,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的。

好吧這一節(jié)其實(shí)我屁也沒說掏愁,只是為了讓你安心歇由,讓你知道我不是忘了講原理,而是把它放在了更合適的地方果港。

4. 變換

終于要到牛逼的地方了,不管你激動(dòng)不激動(dòng)糊昙,反正我是激動(dòng)了辛掠。

RxJava 提供了對(duì)事件序列進(jìn)行變換的支持,這是它的核心功能之一释牺,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因萝衩。所謂變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理没咙,轉(zhuǎn)換成不同的事件或事件序列猩谊。概念說著總是模糊難懂的,來看 API祭刚。

1) API

首先看一個(gè)map()的例子:

Observable.just("images/logo.png")// 輸入類型 String.map(newFunc1(){@OverridepublicBitmapcall(StringfilePath){// 參數(shù)類型 StringreturngetBitmapFromPath(filePath);// 返回類型 Bitmap}}).subscribe(newAction1(){@Overridepublicvoidcall(Bitmapbitmap){// 參數(shù)類型 BitmapshowBitmap(bitmap);}});

這里出現(xiàn)了一個(gè)叫做Func1的類杂腰。它和Action1非常相似呈野,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法。Func1和Action的區(qū)別在于锨亏,F(xiàn)unc1包裝的是有返回值的方法。另外闯捎,和ActionX一樣习劫,F(xiàn)uncX也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法棒口。FuncX和ActionX的區(qū)別在FuncX包裝的是有返回值的方法寄月。

可以看到,map()方法將參數(shù)中的String對(duì)象轉(zhuǎn)換成一個(gè)Bitmap對(duì)象后返回无牵,而在經(jīng)過map()方法后漾肮,事件的參數(shù)類型也由String轉(zhuǎn)為了Bitmap。這種直接變換對(duì)象并返回的合敦,是最常見的也最容易理解的變換初橘。不過 RxJava 的變換遠(yuǎn)不止這樣,它不僅可以針對(duì)事件對(duì)象,還可以針對(duì)整個(gè)事件隊(duì)列保檐,這使得 RxJava 變得非常靈活耕蝉。我列舉幾個(gè)常用的變換:

map(): 事件對(duì)象的直接變換,具體功能上面已經(jīng)介紹過夜只。它是 RxJava 最常用的變換垒在。map()的示意圖:

flatMap(): 這是一個(gè)很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它扔亥。 首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』场躯,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡單:

Student[]students=...;Subscribersubscriber=newSubscriber(){@OverridepublicvoidonNext(Stringname){Log.d(tag,name);}...};Observable.from(students).map(newFunc1(){@OverridepublicStringcall(Studentstudent){returnstudent.getName();}}).subscribe(subscriber);

很簡單旅挤。那么再假設(shè):如果要打印出每個(gè)學(xué)生所需要修的所有課程的名稱呢踢关?(需求的區(qū)別在于,每個(gè)學(xué)生只有一個(gè)名字粘茄,但卻有多個(gè)課程签舞。)首先可以這樣實(shí)現(xiàn):

Student[]students=...;Subscribersubscriber=newSubscriber(){@OverridepublicvoidonNext(Studentstudent){Listcourses=student.getCourses();for(inti=0;i

依然很簡單。那么如果我不想在Subscriber中使用 for 循環(huán)柒瓣,而是希望Subscriber中直接傳入單個(gè)的Course對(duì)象呢(這對(duì)于代碼復(fù)用很重要)儒搭?用map()顯然是不行的,因?yàn)閙ap()是一對(duì)一的轉(zhuǎn)化芙贫,而我現(xiàn)在的要求是一對(duì)多的轉(zhuǎn)化搂鲫。那怎么才能把一個(gè) Student 轉(zhuǎn)化成多個(gè) Course 呢?

這個(gè)時(shí)候磺平,就需要用flatMap()了:

Student[]students=...;Subscribersubscriber=newSubscriber(){@OverridepublicvoidonNext(Coursecourse){Log.d(tag,course.getName());}...};Observable.from(students).flatMap(newFunc1>(){@OverridepublicObservablecall(Studentstudent){returnObservable.from(student.getCourses());}}).subscribe(subscriber);

從上面的代碼可以看出魂仍,flatMap()和map()有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象。但需要注意褪秀,和map()不同的是蓄诽,flatMap()中返回的是個(gè)Observable對(duì)象,并且這個(gè)Observable對(duì)象并不是被直接發(fā)送到了Subscriber的回調(diào)方法中媒吗。flatMap()的原理是這樣的:1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè)Observable對(duì)象仑氛;2. 并不發(fā)送這個(gè)Observable, 而是將它激活,于是它開始發(fā)送事件闸英;3. 每一個(gè)創(chuàng)建出來的Observable發(fā)送的事件锯岖,都被匯入同一個(gè)Observable,而這個(gè)Observable負(fù)責(zé)將這些事件統(tǒng)一交給Subscriber的回調(diào)方法甫何。這三個(gè)步驟出吹,把事件拆成了兩級(jí),通過一組新創(chuàng)建的Observable將初始的對(duì)象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去辙喂。而這個(gè)『鋪平』就是flatMap()所謂的 flat捶牢。

flatMap()示意圖:

擴(kuò)展:由于可以在嵌套的Observable中添加異步代碼鸠珠,flatMap()也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請(qǐng)求秋麸。示例代碼(Retrofit + RxJava):

networkClient.token()// 返回 Observable渐排,在訂閱時(shí)請(qǐng)求 token,并在響應(yīng)后發(fā)送 token.flatMap(newFunc1>(){@OverridepublicObservablecall(Stringtoken){// 返回 Observable灸蟆,在訂閱時(shí)請(qǐng)求消息列表驯耻,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表returnnetworkClient.messages();}}).subscribe(newAction1(){@Overridepublicvoidcall(Messagesmessages){// 處理顯示消息列表showMessages(messages);}});

傳統(tǒng)的嵌套請(qǐng)求需要使用嵌套的 Callback 來實(shí)現(xiàn)。而通過flatMap()炒考,可以把嵌套的請(qǐng)求寫在一條鏈中可缚,從而保持程序邏輯的清晰。

throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件斋枢。常用作去抖動(dòng)過濾帘靡,例如按鈕的點(diǎn)擊監(jiān)聽器:RxView.clickEvents(button)

// RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) //

設(shè)置防抖間隔為 500ms .subscribe(subscriber);媽媽再也不怕我的用戶手抖點(diǎn)開兩個(gè)重復(fù)的界面啦杏慰。

此外测柠, RxJava 還提供很多便捷的方法來實(shí)現(xiàn)事件序列的變換,這里就不一一舉例了缘滥。

2) 變換的原理:lift()

這些變換雖然功能各有不同,但實(shí)質(zhì)上都是針對(duì)事件序列的處理和再發(fā)送谒主。而在 RxJava 的內(nèi)部朝扼,它們是基于同一個(gè)基礎(chǔ)的變換方法:lift(Operator)。首先看一下lift()的內(nèi)部實(shí)現(xiàn)(僅核心代碼):

// 注意:這不是 lift() 的源碼霎肯,而是將源碼中與性能擎颖、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼观游。// 如果需要看源碼搂捧,可以去 RxJava 的 GitHub 倉庫下載。publicObservablelift(Operatoroperator){returnObservable.create(newOnSubscribe(){@Overridepublicvoidcall(Subscribersubscriber){SubscribernewSubscriber=operator.call(subscriber);newSubscriber.onStart();onSubscribe.call(newSubscriber);}});}

這段代碼很有意思:它生成了一個(gè)新的Observable并返回懂缕,而且創(chuàng)建新Observable所用的參數(shù)OnSubscribe的回調(diào)方法call()中的實(shí)現(xiàn)竟然看起來和前面講過的Observable.subscribe()一樣允跑!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行onSubscribe.call(subscriber)中的onSubscribe所指代的對(duì)象不同(高能預(yù)警:接下來的幾句話可能會(huì)導(dǎo)致身體的嚴(yán)重不適)——

subscribe()中這句話的onSubscribe指的是Observable中的onSubscribe對(duì)象,這個(gè)沒有問題搪柑,但是lift()之后的情況就復(fù)雜了點(diǎn)聋丝。

當(dāng)含有l(wèi)ift()時(shí):

1.lift()創(chuàng)建了一個(gè)Observable后,加上之前的原始Observable工碾,已經(jīng)有兩個(gè)Observable了弱睦;

2.而同樣地,新Observable里的新OnSubscribe加上之前的原始Observable中的原始OnSubscribe渊额,也就有了兩個(gè)OnSubscribe况木;

3.當(dāng)用戶調(diào)用經(jīng)過lift()后的Observable的subscribe()的時(shí)候垒拢,使用的是lift()所返回的新的Observable,于是它所觸發(fā)的onSubscribe.call(subscriber)火惊,也是用的新Observable中的新OnSubscribe求类,即在lift()中生成的那個(gè)OnSubscribe;

4.而這個(gè)新OnSubscribe的call()方法中的onSubscribe矗晃,就是指的原始Observable中的原始OnSubscribe仑嗅,在這個(gè)call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一個(gè)新的Subscriber(Operator就是在這里张症,通過自己的call()方法將新Subscriber和原始Subscriber進(jìn)行關(guān)聯(lián)仓技,并插入自己的『變換』代碼以實(shí)現(xiàn)變換),然后利用這個(gè)新Subscriber向原始Observable進(jìn)行訂閱俗他。

這樣就實(shí)現(xiàn)了lift()過程脖捻,有點(diǎn)像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換兆衅。

精簡掉細(xì)節(jié)的話地沮,也可以這么說:在Observable執(zhí)行了lift(Operator)方法之后,會(huì)返回一個(gè)新的Observable羡亩,這個(gè)新的Observable會(huì)像一個(gè)代理一樣摩疑,負(fù)責(zé)接收原始的Observable發(fā)出的事件,并在處理后發(fā)送給Subscriber畏铆。

如果你更喜歡具象思維雷袋,可以看圖:

或者可以看動(dòng)圖:

兩次和多次的lift()同理,如下圖:

舉一個(gè)具體的Operator的實(shí)現(xiàn)辞居。下面這是一個(gè)將事件中的Integer對(duì)象轉(zhuǎn)換成String的例子楷怒,僅供參考:

observable.lift(newObservable.Operator(){@OverridepublicSubscribercall(finalSubscribersubscriber){// 將事件序列中的 Integer 對(duì)象轉(zhuǎn)換為 String 對(duì)象returnnewSubscriber(){@OverridepublicvoidonNext(Integerinteger){subscriber.onNext(""+integer);}@OverridepublicvoidonCompleted(){subscriber.onCompleted();}@OverridepublicvoidonError(Throwablee){subscriber.onError(e);}};}});

講述lift()的原理只是為了讓你更好地了解 RxJava ,從而可以更好地使用它瓦灶。然而不管你是否理解了lift()的原理鸠删,RxJava 都不建議開發(fā)者自定義Operator來直接使用lift(),而是建議盡量使用已有的lift()包裝方法(如map()flatMap()等)進(jìn)行組合來實(shí)現(xiàn)需求贼陶,因?yàn)橹苯邮褂?lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯(cuò)誤刃泡。

3) compose: 對(duì) Observable 整體的變換

除了lift()之外,Observable還有一個(gè)變換方法叫做compose(Transformer)每界。它和lift()的區(qū)別在于捅僵,lift()是針對(duì)事件項(xiàng)和事件序列的,而compose()是針對(duì)Observable自身進(jìn)行變換眨层。舉個(gè)例子庙楚,假設(shè)在程序中有多個(gè)Observable,并且他們都需要應(yīng)用一組相同的lift()變換趴樱。你可以這么寫:

observable1.lift1().lift2().lift3().lift4().subscribe(subscriber1);observable2.lift1().lift2().lift3().lift4().subscribe(subscriber2);observable3.lift1().lift2().lift3().lift4().subscribe(subscriber3);observable4.lift1().lift2().lift3().lift4().subscribe(subscriber1);

你覺得這樣太不軟件工程了馒闷,于是你改成了這樣:

privateObservableliftAll(Observableobservable){returnobservable.lift1().lift2().lift3().lift4();}...liftAll(observable1).subscribe(subscriber1);liftAll(observable2).subscribe(subscriber2);liftAll(observable3).subscribe(subscriber3);liftAll(observable4).subscribe(subscriber4);

可讀性酪捡、可維護(hù)性都提高了∧烧耍可是Observable被一個(gè)方法包起來逛薇,這種方式對(duì)于Observale的靈活性似乎還是增添了那么點(diǎn)限制呢袱。怎么辦?這個(gè)時(shí)候棒旗,就應(yīng)該用compose()來解決了:

publicclassLiftAllTransformerimplementsObservable.Transformer{@OverridepublicObservablecall(Observableobservable){returnobservable.lift1().lift2().lift3().lift4();}}...TransformerliftAll=newLiftAllTransformer();observable1.compose(liftAll).subscribe(subscriber1);observable2.compose(liftAll).subscribe(subscriber2);observable3.compose(liftAll).subscribe(subscriber3);observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用compose()方法卿吐,Observable可以利用傳入的Transformer對(duì)象的call方法直接對(duì)自身進(jìn)行處理纳猫,也就不必被包在方法的里面了。

compose()的原理比較簡單诗舰,不附圖嘍。

5. 線程控制:Scheduler (二)

除了靈活的變換渔呵,RxJava 另一個(gè)牛逼的地方双饥,就是線程的自由控制弟断。

1) Scheduler 的 API (二)

前面講到了咏花,可以利用subscribeOn()結(jié)合observeOn()來實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程夫嗓〕俾荩可是在了解了map()flatMap()等變換方法后冲秽,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時(shí)的我)就問了:能不能多切換幾次線程?

答案是:能矩父。因?yàn)閛bserveOn()指定的是Subscriber的線程锉桑,而這個(gè)Subscriber并不是(嚴(yán)格說應(yīng)該為『不一定是』,但這里不妨理解為『不是』)subscribe()參數(shù)中的Subscriber窍株,而是observeOn()執(zhí)行時(shí)的當(dāng)前Observable所對(duì)應(yīng)的Subscriber民轴,即它的直接下級(jí)Subscriber。換句話說球订,observeOn()指定的是它之后的操作所在的線程后裸。因此如果有多次切換線程的需求,只要在每個(gè)想要切換線程的位置調(diào)用一次observeOn()即可冒滩。上代碼:

Observable.just(1,2,3,4)// IO 線程微驶,由 subscribeOn() 指定.subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(mapOperator)// 新線程,由 observeOn() 指定.observeOn(Schedulers.io()).map(mapOperator2)// IO 線程开睡,由 observeOn() 指定.observeOn(AndroidSchedulers.mainThread).subscribe(subscriber);// Android 主線程因苹,由 observeOn() 指定

如上,通過observeOn()的多次調(diào)用篇恒,程序?qū)崿F(xiàn)了線程的多次切換扶檐。

不過,不同于observeOn()胁艰,subscribeOn()的位置放在哪里都可以款筑,但它是只能調(diào)用一次的。

又有好事的(其實(shí)還是當(dāng)初的我)問了:如果我非要調(diào)用多次subscribeOn()呢腾么?會(huì)有什么效果奈梳?

這個(gè)問題先放著,我們還是從 RxJava 線程控制的原理說起吧解虱。

2) Scheduler 的原理(二)

其實(shí)颈嚼,subscribeOn()和observeOn()的內(nèi)部實(shí)現(xiàn),也是用的lift()饭寺。具體看圖(不同顏色的箭頭表示不同的線程):

subscribeOn()原理圖:

observeOn()原理圖:

從圖中可以看出,subscribeOn()和observeOn()都做了線程切換的工作(圖中的 "schedule..." 部位)叫挟。不同的是艰匙,subscribeOn()的線程切換發(fā)生在OnSubscribe中,即在它通知上一級(jí)OnSubscribe時(shí)抹恳,這時(shí)事件還沒有開始發(fā)送员凝,因此subscribeOn()的線程控制可以從事件發(fā)出的開端就造成影響;而observeOn()的線程切換則發(fā)生在它內(nèi)建的Subscriber中奋献,即發(fā)生在它即將給下一級(jí)Subscriber發(fā)送事件時(shí)健霹,因此observeOn()控制的是它后面的線程旺上。

最后,我用一張圖來解釋當(dāng)多個(gè)subscribeOn()和observeOn()混合使用時(shí)糖埋,線程調(diào)度是怎么發(fā)生的(由于圖中對(duì)象較多宣吱,相對(duì)于上面的圖對(duì)結(jié)構(gòu)做了一些簡化調(diào)整):

圖中共有 5 處含有對(duì)事件的操作。由圖中可以看出瞳别,①和②兩處受第一個(gè)subscribeOn()影響征候,運(yùn)行在紅色線程;③和④處受第一個(gè)observeOn()的影響祟敛,運(yùn)行在綠色線程疤坝;⑤處受第二個(gè)onserveOn()影響,運(yùn)行在紫色線程馆铁;而第二個(gè)subscribeOn()跑揉,由于在通知過程中線程就被第一個(gè)subscribeOn()截?cái)啵虼藢?duì)整個(gè)流程并沒有任何影響埠巨。這里也就回答了前面的問題:當(dāng)使用了多個(gè)subscribeOn()的時(shí)候历谍,只有第一個(gè)subscribeOn()起作用。

3) 延伸:doOnSubscribe()

然而乖订,雖然超過一個(gè)的subscribeOn()對(duì)事件處理的流程沒有影響扮饶,但在流程之前卻是可以利用的。

在前面講Subscriber的時(shí)候乍构,提到過Subscriber的onStart()可以用作流程開始前的初始化甜无。然而onStart()由于在subscribe()發(fā)生時(shí)就被調(diào)用了,因此不能指定線程哥遮,而是只能執(zhí)行在subscribe()被調(diào)用時(shí)的線程岂丘。這就導(dǎo)致如果onStart()中含有對(duì)線程有要求的代碼(例如在界面上顯示一個(gè) ProgressBar,這必須在主線程執(zhí)行)眠饮,將會(huì)有線程非法的風(fēng)險(xiǎn)奥帘,因?yàn)橛袝r(shí)你無法預(yù)測(cè)subscribe()將會(huì)在什么線程執(zhí)行。

而與Subscriber.onStart()相對(duì)應(yīng)的仪召,有一個(gè)方法Observable.doOnSubscribe()寨蹋。它和Subscriber.onStart()同樣是在subscribe()調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程扔茅。默認(rèn)情況下已旧,doOnSubscribe()執(zhí)行在subscribe()發(fā)生的線程;而如果在doOnSubscribe()之后有subscribeOn()的話召娜,它將執(zhí)行在離它最近的subscribeOn()所指定的線程运褪。

示例代碼:

Observable.create(onSubscribe).subscribeOn(Schedulers.io()).doOnSubscribe(newAction0(){@Overridepublicvoidcall(){progressBar.setVisibility(View.VISIBLE);// 需要在主線程執(zhí)行}}).subscribeOn(AndroidSchedulers.mainThread())// 指定主線程.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

如上,在doOnSubscribe()的后面跟一個(gè)subscribeOn(),就能指定準(zhǔn)備工作的線程了秸讹。

RxJava 的適用場(chǎng)景和使用方式

1. 與 Retrofit 的結(jié)合

Retrofit 是 Square 的一個(gè)著名的網(wǎng)絡(luò)請(qǐng)求庫檀咙。沒有用過 Retrofit 的可以選擇跳過這一小節(jié)也沒關(guān)系,我舉的每種場(chǎng)景都只是個(gè)例子璃诀,而且例子之間并無前后關(guān)聯(lián)弧可,只是個(gè)拋磚引玉的作用,所以你跳過這里看別的場(chǎng)景也可以的文虏。

Retrofit 除了提供了傳統(tǒng)的Callback形式的 API侣诺,還有 RxJava 版本的Observable形式 API。下面我用對(duì)比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本的區(qū)別氧秘。

以獲取一個(gè)User對(duì)象的接口作為例子年鸳。使用Retrofit 的傳統(tǒng) API,你可以用這樣的方式來定義請(qǐng)求:

@GET("/user")publicvoidgetUser(@Query("userId")StringuserId,Callbackcallback);

在程序的構(gòu)建過程中丸相, Retrofit 會(huì)把自動(dòng)把方法實(shí)現(xiàn)并生成代碼搔确,然后開發(fā)者就可以利用下面的方法來獲取特定用戶并處理響應(yīng):

getUser(userId,newCallback(){@Overridepublicvoidsuccess(Useruser){userView.setUser(user);}@Overridepublicvoidfailure(RetrofitErrorerror){// Error handling...}};

而使用 RxJava 形式的 API,定義同樣的請(qǐng)求是這樣的:

@GET("/user")publicObservablegetUser(@Query("userId")StringuserId);

使用的時(shí)候是這樣的:

getUser(userId).observeOn(AndroidSchedulers.mainThread()).subscribe(newObserver(){@OverridepublicvoidonNext(Useruser){userView.setUser(user);}@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwableerror){// Error handling...}});

看到區(qū)別了嗎灭忠?

當(dāng) RxJava 形式的時(shí)候膳算,Retrofit 把請(qǐng)求封裝進(jìn)Observable,在請(qǐng)求結(jié)束后調(diào)用onNext()或在請(qǐng)求失敗后調(diào)用onError()弛作。

對(duì)比來看涕蜂,Callback形式和Observable形式長得不太一樣,但本質(zhì)都差不多映琳,而且在細(xì)節(jié)上Observable形式似乎還比Callback形式要差點(diǎn)机隙。那 Retrofit 為什么還要提供 RxJava 的支持呢?

因?yàn)樗糜冒萨西∮新梗倪@個(gè)例子看不出來是因?yàn)檫@只是最簡單的情況。而一旦情景復(fù)雜起來谎脯,Callback形式馬上就會(huì)開始讓人頭疼葱跋。比如:

假設(shè)這么一種情況:你的程序取到的User并不應(yīng)該直接顯示,而是需要先與數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行比對(duì)和修正后再顯示源梭。使用Callback方式大概可以這么寫:

getUser(userId,newCallback(){@Overridepublicvoidsuccess(Useruser){processUser(user);// 嘗試修正 User 數(shù)據(jù)userView.setUser(user);}@Overridepublicvoidfailure(RetrofitErrorerror){// Error handling...}};

有問題嗎娱俺?

很簡便,但不要這樣做废麻。為什么矢否?因?yàn)檫@樣做會(huì)影響性能。數(shù)據(jù)庫的操作很重脑溢,一次讀寫操作花費(fèi) 10~20ms 是很常見的,這樣的耗時(shí)很容易造成界面的卡頓。所以通常情況下屑彻,如果可以的話一定要避免在主線程中處理數(shù)據(jù)庫验庙。所以為了提升性能,這段代碼可以優(yōu)化一下:

getUser(userId,newCallback(){@Overridepublicvoidsuccess(Useruser){newThread(){@Overridepublicvoidrun(){processUser(user);// 嘗試修正 User 數(shù)據(jù)runOnUiThread(newRunnable(){// 切回 UI 線程@Overridepublicvoidrun(){userView.setUser(user);}});}).start();}@Overridepublicvoidfailure(RetrofitErrorerror){// Error handling...}};

性能問題解決社牲,但……這代碼實(shí)在是太亂了粪薛,迷之縮進(jìn)啊搏恤!雜亂的代碼往往不僅僅是美觀問題违寿,因?yàn)榇a越亂往往就越難讀懂,而如果項(xiàng)目中充斥著雜亂的代碼熟空,無疑會(huì)降低代碼的可讀性藤巢,造成團(tuán)隊(duì)開發(fā)效率的降低和出錯(cuò)率的升高。

這時(shí)候息罗,如果用 RxJava 的形式掂咒,就好辦多了。 RxJava 形式的代碼是這樣的:

getUser(userId).doOnNext(newAction1(){@Overridepublicvoidcall(Useruser){processUser(user);}).observeOn(AndroidSchedulers.mainThread()).subscribe(newObserver(){@OverridepublicvoidonNext(Useruser){userView.setUser(user);}@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwableerror){// Error handling...}});

后臺(tái)代碼和前臺(tái)代碼全都寫在一條鏈中迈喉,明顯清晰了很多绍刮。

再舉一個(gè)例子:假設(shè)/user接口并不能直接訪問,而需要填入一個(gè)在線獲取的token挨摸,代碼應(yīng)該怎么寫孩革?

Callback方式,可以使用嵌套的Callback:

@GET("/token")publicvoidgetToken(Callbackcallback);@GET("/user")publicvoidgetUser(@Query("token")Stringtoken,@Query("userId")StringuserId,Callbackcallback);...getToken(newCallback(){@Overridepublicvoidsuccess(Stringtoken){getUser(token,userId,newCallback(){@Overridepublicvoidsuccess(Useruser){userView.setUser(user);}@Overridepublicvoidfailure(RetrofitErrorerror){// Error handling...}};}@Overridepublicvoidfailure(RetrofitErrorerror){// Error handling...}});

倒是沒有什么性能問題得运,可是迷之縮進(jìn)毀一生膝蜈,你懂我也懂,做過大項(xiàng)目的人應(yīng)該更懂澈圈。

而使用 RxJava 的話彬檀,代碼是這樣的:

@GET("/token")publicObservablegetToken();@GET("/user")publicObservablegetUser(@Query("token")Stringtoken,@Query("userId")StringuserId);...getToken().flatMap(newFunc1>(){@OverridepublicObservableonNext(Stringtoken){returngetUser(token,userId);}).observeOn(AndroidSchedulers.mainThread()).subscribe(newObserver(){@OverridepublicvoidonNext(Useruser){userView.setUser(user);}@OverridepublicvoidonCompleted(){}@OverridepublicvoidonError(Throwableerror){// Error handling...}});

用一個(gè)flatMap()就搞定了邏輯,依然是一條鏈瞬女∏系郏看著就很爽,是吧诽偷?

2016/03/31 更新坤学,加上我寫的一個(gè) Sample 項(xiàng)目:

rengwuxian RxJava Samples

好,Retrofit 部分就到這里报慕。

2. RxBinding

RxBinding是 Jake Wharton 的一個(gè)開源庫深浮,它提供了一套在 Android 平臺(tái)上的基于 RxJava 的 Binding API。所謂 Binding眠冈,就是類似設(shè)置OnClickListener飞苇、設(shè)置TextWatcher這樣的注冊(cè)綁定對(duì)象的 API菌瘫。

舉個(gè)設(shè)置點(diǎn)擊監(jiān)聽的例子。使用RxBinding布卡,可以把事件監(jiān)聽用這樣的方法來設(shè)置:

Buttonbutton=...;RxView.clickEvents(button)// 以 Observable 形式來反饋點(diǎn)擊事件.subscribe(newAction1(){@Overridepublicvoidcall(ViewClickEventevent){// Click handling}});

看起來除了形式變了沒什么區(qū)別雨让,實(shí)質(zhì)上也是這樣。甚至如果你看一下它的源碼忿等,你會(huì)發(fā)現(xiàn)它連實(shí)現(xiàn)都沒什么驚喜:它的內(nèi)部是直接用一個(gè)包裹著的setOnClickListener()來實(shí)現(xiàn)的栖忠。然而,僅僅這一個(gè)形式的改變贸街,卻恰好就是RxBinding的目的:擴(kuò)展性庵寞。通過RxBinding把點(diǎn)擊監(jiān)聽轉(zhuǎn)換成Observable之后,就有了對(duì)它進(jìn)行擴(kuò)展的可能薛匪。擴(kuò)展的方式有很多捐川,根據(jù)需求而定。一個(gè)例子是前面提到過的throttleFirst()蛋辈,用于去抖動(dòng)属拾,也就是消除手抖導(dǎo)致的快速連環(huán)點(diǎn)擊:

RxView.clickEvents(button).throttleFirst(500,TimeUnit.MILLISECONDS).subscribe(clickAction);

如果想對(duì)RxBinding有更多了解,可以去它的GitHub 項(xiàng)目下面看看冷溶。

3. 各種異步操作

前面舉的Retrofit和RxBinding的例子渐白,是兩個(gè)可以提供現(xiàn)成的Observable的庫。而如果你有某些異步操作無法用這些庫來自動(dòng)生成Observable逞频,也完全可以自己寫纯衍。例如數(shù)據(jù)庫的讀寫、大圖片的載入苗胀、文件壓縮/解壓等各種需要放在后臺(tái)工作的耗時(shí)操作襟诸,都可以用 RxJava 來實(shí)現(xiàn),有了之前幾章的例子基协,這里應(yīng)該不用再舉例了歌亲。

4. RxBus

RxBus 名字看起來像一個(gè)庫,但它并不是一個(gè)庫澜驮,而是一種模式陷揪,它的思想是使用 RxJava 來實(shí)現(xiàn)了 EventBus ,而讓你不再需要使用Otto或者 GreenRobot 的EventBus杂穷。至于什么是 RxBus悍缠,可以看這篇文章。順便說一句耐量,F(xiàn)lipboard 已經(jīng)用 RxBus 替換掉了Otto飞蚓,目前為止沒有不良反應(yīng)。

最后

對(duì)于

Android 開發(fā)者來說廊蜒, RxJava 是一個(gè)很難上手的庫趴拧,因?yàn)樗鼘?duì)于 Android

開發(fā)者來說有太多陌生的概念了溅漾。可是它真的很牛逼著榴。因此樟凄,我寫了這篇《給 Android 開發(fā)者的 RxJava 詳解》,希望能給始終搞不明白什么是

RxJava 的人一些入門的指引兄渺,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。無論如何汰现,只要能給各位同為

Android 工程師的你們提供一些幫助挂谍,這篇文章的目的就達(dá)到了。

原作者博客地址:http://gank.io/post/560e15be2dca930e00da1083

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瞎饲,一起剝皮案震驚了整個(gè)濱河市口叙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嗅战,老刑警劉巖妄田,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異驮捍,居然都是意外死亡疟呐,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門东且,熙熙樓的掌柜王于貴愁眉苦臉地迎上來启具,“玉大人,你說我怎么就攤上這事珊泳÷撤耄” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵色查,是天一觀的道長薯演。 經(jīng)常有香客問我,道長秧了,這世上最難降的妖魔是什么跨扮? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮示惊,結(jié)果婚禮上好港,老公的妹妹穿的比我還像新娘。我一直安慰自己米罚,他們只是感情好钧汹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著录择,像睡著了一般拔莱。 火紅的嫁衣襯著肌膚如雪碗降。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天塘秦,我揣著相機(jī)與錄音讼渊,去河邊找鬼。 笑死尊剔,一個(gè)胖子當(dāng)著我的面吹牛爪幻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播须误,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼挨稿,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了京痢?” 一聲冷哼從身側(cè)響起奶甘,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎祭椰,沒想到半個(gè)月后臭家,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡方淤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年钉赁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片臣淤。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡橄霉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出邑蒋,到底是詐尸還是另有隱情姓蜂,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布医吊,位于F島的核電站钱慢,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏卿堂。R本人自食惡果不足惜束莫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望草描。 院中可真熱鬧览绿,春花似錦、人聲如沸穗慕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逛绵。三九已至怀各,卻和暖如春倔韭,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瓢对。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國打工寿酌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人硕蛹。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓醇疼,卻偏偏與公主長得像,于是被迫代替她去往敵國和親法焰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子僵腺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容