RxJava 了解

轉(zhuǎn)一篇文章

原地址:http://gank.io/post/560e15be2dca930e00da1083

前言

我從去年開始使用 RxJava 滩愁,到現(xiàn)在一年多了腌歉。今年加入了 Flipboard 后乾忱,看到 Flipboard 的 Android 項(xiàng)目也在使用 RxJava 耗美,并且使用的場景越來越多 。而最近這幾個月弯汰,我也發(fā)現(xiàn)國內(nèi)越來越多的人開始提及 RxJava 祷愉。有人說『RxJava 真是太好用了』窗宦,有人說『RxJava 真是太難用了』,另外更多的人表示:我真的百度了也谷歌了二鳄,但我還是想問: RxJava 到底是什么赴涵?

鑒于 RxJava 目前這種既火爆又神秘的現(xiàn)狀,而我又在一年的使用過程中對 RxJava 有了一些理解订讼,我決定寫下這篇文章來對 RxJava 做一個相對詳細(xì)的髓窜、針對 Android 開發(fā)者的介紹。

這篇文章的目的有兩個: 1. 給對 RxJava 感興趣的人一些入門的指引 2. 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

RxJava 到底是什么

一個詞:異步欺殿。

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的寄纵、基于事件的程序的庫)。這就是 RxJava 脖苏,概括得非常精準(zhǔn)程拭。

然而,對于初學(xué)者來說棍潘,這太難看懂了恃鞋。因?yàn)樗且粋€『總結(jié)』,而初學(xué)者更需要一個『引言』亦歉。

其實(shí)恤浪, RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上鳍徽,它就是一個實(shí)現(xiàn)異步操作的庫,而別的定語都是基于這之上的敢课。

RxJava 好在哪

換句話說阶祭,『同樣是做異步,為什么人們用它直秆,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... 濒募?』

一個詞:簡潔

異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性圾结,因?yàn)樵谡{(diào)度過程比較復(fù)雜的情況下瑰剃,異步代碼經(jīng)常會既難寫也難被讀懂。 Android 創(chuàng)造的?AsyncTask?和Handler?筝野,其實(shí)都是為了讓異步代碼更加簡潔晌姚。RxJava 的優(yōu)勢也是簡潔粤剧,但它的簡潔的與眾不同之處在于,隨著程序邏輯變得越來越復(fù)雜挥唠,它依然能夠保持簡潔抵恋。

假設(shè)有這樣一個需求:界面上有一個自定義的視圖?imageCollectorView?,它的作用是顯示多張圖片宝磨,并能使用?addImage(Bitmap)?方法來任意增加顯示的圖片。現(xiàn)在需要程序?qū)⒁粋€給出的目錄數(shù)組?File[] folders?中每個目錄下的 png 圖片都加載出來并顯示在?imageCollectorView?中。需要注意的是懒棉,由于讀取圖片的這一過程較為耗時椅贱,需要放在后臺執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行窿祥。常用的實(shí)現(xiàn)方式有多種株憾,我這里貼出其中一種:

····

public class Test{

}

····


new Thread() {? ? @Override? ? public void run() {? ? ? ? super.run();? ? ? ? for (File folder : folders) {? ? ? ? ? ? File[] files = folder.listFiles();? ? ? ? ? ? for (File file : files) {? ? ? ? ? ? ? ? if (file.getName().endsWith(".png")) {? ? ? ? ? ? ? ? ? ? final Bitmap bitmap = getBitmapFromFile(file);? ? ? ? ? ? ? ? ? ? getActivity().runOnUiThread(new Runnable() {? ? ? ? ? ? ? ? ? ? ? ? @Override? ? ? ? ? ? ? ? ? ? ? ? public void run() {? ? ? ? ? ? ? ? ? ? ? ? ? ? imageCollectorView.addImage(bitmap);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? });? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }}.start();



而如果使用 RxJava ,實(shí)現(xiàn)方式是這樣的:

<code>

Observable.from(folders)? ? .flatMap(new Func1>() {? ? ? ? @Override? ? ? ? public Observable call(File file) {? ? ? ? ? ? return Observable.from(file.listFiles());? ? ? ? }? ? })? ? .filter(new Func1() {? ? ? ? @Override? ? ? ? public Boolean call(File file) {? ? ? ? ? ? return file.getName().endsWith(".png");? ? ? ? }? ? })? ? .map(new Func1() {? ? ? ? @Override? ? ? ? public Bitmap call(File file) {? ? ? ? ? ? return getBitmapFromFile(file);? ? ? ? }? ? })? ? .subscribeOn(Schedulers.io())? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(Bitmap bitmap) {? ? ? ? ? ? imageCollectorView.addImage(bitmap);? ? ? ? }? ? });

</code>

那位說話了:『你這代碼明明變多了氨诶摺号胚!簡潔個毛啊浸遗!』大兄弟你消消氣猫胁,我說的是邏輯的簡潔,不是單純的代碼量少(邏輯簡潔才是提升讀寫代碼速度的必殺技對不跛锌?)弃秆。觀察一下你會發(fā)現(xiàn), RxJava 的這個實(shí)現(xiàn)髓帽,是一條從上到下的鏈?zhǔn)秸{(diào)用菠赚,沒有任何嵌套,這在邏輯的簡潔性上是具有優(yōu)勢的郑藏。當(dāng)需求變得復(fù)雜時衡查,這種優(yōu)勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規(guī)方式要怎么辦必盖?如果有更多這樣那樣的要求呢拌牲?再試想,在這一大堆需求實(shí)現(xiàn)完兩個月之后需要改功能歌粥,當(dāng)你翻回這里看到自己當(dāng)初寫下的那一片迷之縮進(jìn)塌忽,你能保證自己將迅速看懂,而不是對著代碼重新捋一遍思路失驶?)土居。

另外,如果你的 IDE 是 Android Studio ,其實(shí)每次打開某個 Java 文件的時候擦耀,你會看到被自動 Lambda 化的預(yù)覽棉圈,這將讓你更加清晰地看到程序邏輯:

Observable.from(folders)? ? .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })? ? .filter((Func1) (file) -> { file.getName().endsWith(".png") })? ? .map((Func1) (file) -> { getBitmapFromFile(file) })? ? .subscribeOn(Schedulers.io())? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

如果你習(xí)慣使用 Retrolambda ,你也可以直接把代碼寫成上面這種簡潔的形式埂奈。而如果你看到這里還不知道什么是 Retrolambda 迄损,我不建議你現(xiàn)在就去學(xué)習(xí)它。原因有兩點(diǎn):1. Lambda 是把雙刃劍账磺,它讓你的代碼簡潔的同時芹敌,降低了代碼的可讀性,因此同時學(xué)習(xí) RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié)垮抗;2. Retrolambda 是 Java 6/7 對 Lambda 表達(dá)式的非官方兼容方案氏捞,它的向后兼容性和穩(wěn)定性是無法保障的,因此對于企業(yè)項(xiàng)目冒版,使用 Retrolambda 是有風(fēng)險(xiǎn)的液茎。所以,與很多 RxJava 的推廣者不同辞嗡,我并不推薦在學(xué)習(xí) RxJava 的同時一起學(xué)習(xí) Retrolambda捆等。事實(shí)上,我個人雖然很欣賞 Retrolambda续室,但我從來不用它栋烤。

在Flipboard 的 Android 代碼中,有一段邏輯非常復(fù)雜挺狰,包含了多次內(nèi)存操作明郭、本地文件操作和網(wǎng)絡(luò)操作,對象分分合合丰泊,線程間相互配合相互等待薯定,一會兒排成人字,一會兒排成一字瞳购。如果使用常規(guī)的方法來實(shí)現(xiàn)话侄,肯定是要寫得欲仙欲死,然而在使用 RxJava 的情況下学赛,依然只是一條鏈?zhǔn)秸{(diào)用就完成了年堆。它很長,但很清晰罢屈。

所以嘀韧, RxJava 好在哪篇亭?就好在簡潔缠捌,好在那把什么復(fù)雜邏輯都能穿成一條線的簡潔。

API 介紹和原理簡析

這個我就做不到一個詞說明了……因?yàn)檫@一節(jié)的主要內(nèi)容就是一步步地說明 RxJava 到底怎樣做到了異步,怎樣做到了簡潔曼月。

1. 概念:擴(kuò)展的觀察者模式

RxJava 的異步實(shí)現(xiàn)谊却,是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的。

觀察者模式

先簡述一下觀察者模式哑芹,已經(jīng)熟悉的可以跳過這一段炎辨。

觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應(yīng)聪姿。舉個例子碴萧,新聞里喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候?qū)嵤┳ゲ赌┕骸T谶@個例子里破喻,警察是觀察者,小偷是被觀察者盟榴,警察需要時刻盯著小偷的一舉一動曹质,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不同擎场,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài))羽德,而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態(tài)迅办,你要在它變化的時候通知我宅静。 Android 開發(fā)中一個比較典型的例子是點(diǎn)擊監(jiān)聽器?OnClickListener?。對設(shè)置?OnClickListener?來說礼饱,?View?是被觀察者坏为,?OnClickListener?是觀察者,二者通過?setOnClickListener()?方法達(dá)成訂閱關(guān)系镊绪。訂閱之后用戶點(diǎn)擊按鈕的瞬間匀伏,Android Framework 就會將點(diǎn)擊事件發(fā)送給已經(jīng)注冊的?OnClickListener?。采取這樣被動的觀察方式蝴韭,既省去了反復(fù)檢索狀態(tài)的資源消耗够颠,也能夠得到最高的反饋速度。當(dāng)然榄鉴,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者履磨,而警察叔叔明顯無法要求小偷『你在作案的時候務(wù)必通知我』。

OnClickListener 的模式大致如下圖:

如圖所示庆尘,通過?setOnClickListener()?方法剃诅,Button?持有?OnClickListener?的引用(這一過程沒有在圖上畫出);當(dāng)用戶點(diǎn)擊時驶忌,Button?自動調(diào)用?OnClickListener?的?onClick()?方法矛辕。另外,如果把這張圖中的概念抽象出來(Button?-> 被觀察者、OnClickListener?-> 觀察者聊品、setOnClickListener()?-> 訂閱飞蹂,onClick()?-> 事件),就由專用的觀察者模式(例如只用于監(jiān)聽控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式翻屈。如下圖:

而 RxJava 作為一個工具庫陈哑,使用的就是通用形式的觀察者模式。

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable?(可觀察者伸眶,即被觀察者)惊窖、?Observer?(觀察者)、?subscribe?(訂閱)厘贼、事件爬坑。Observable?和?Observer?通過?subscribe()?方法實(shí)現(xiàn)訂閱關(guān)系,從而?Observable?可以在需要的時候發(fā)出事件來通知?Observer涂臣。

與傳統(tǒng)觀察者模式不同盾计, RxJava 的事件回調(diào)方法除了普通事件?onNext()?(相當(dāng)于?onClick()?/?onEvent())之外,還定義了兩個特殊的事件:onCompleted()?和?onError()赁遗。

onCompleted(): 事件隊(duì)列完結(jié)署辉。RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列岩四。RxJava 規(guī)定哭尝,當(dāng)不會再有新的?onNext()?發(fā)出時,需要觸發(fā)?onCompleted()?方法作為標(biāo)志剖煌。

onError(): 事件隊(duì)列異常材鹦。在事件處理過程中出異常時,onError()?會被觸發(fā)耕姊,同時隊(duì)列自動終止桶唐,不允許再有事件發(fā)出。

在一個正確運(yùn)行的事件序列中,?onCompleted()?和?onError()?有且只有一個茉兰,并且是事件序列中的最后一個尤泽。需要注意的是,onCompleted()?和?onError()?二者也是互斥的规脸,即在隊(duì)列中調(diào)用了其中一個坯约,就不應(yīng)該再調(diào)用另一個。

RxJava 的觀察者模式大致如下圖:

2. 基本實(shí)現(xiàn)

基于以上的概念莫鸭, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):

1) 創(chuàng)建 Observer

Observer 即觀察者闹丐,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?RxJava 中的?Observer?接口的實(shí)現(xiàn)方式:

Observer observer = new Observer() {? ? @Override? ? public void onNext(String s) {? ? ? ? Log.d(tag, "Item: " + s);? ? }? ? @Override? ? public void onCompleted() {? ? ? ? Log.d(tag, "Completed!");? ? }? ? @Override? ? public void onError(Throwable e) {? ? ? ? Log.d(tag, "Error!");? ? }};

除了?Observer?接口之外,RxJava 還內(nèi)置了一個實(shí)現(xiàn)了?Observer?的抽象類:Subscriber被因。?Subscriber?對?Observer?接口進(jìn)行了一些擴(kuò)展卿拴,但他們的基本使用方式是完全一樣的:

Subscriber subscriber = new Subscriber() {? ? @Override? ? public void onNext(String s) {? ? ? ? Log.d(tag, "Item: " + s);? ? }? ? @Override? ? public void onCompleted() {? ? ? ? Log.d(tag, "Completed!");? ? }? ? @Override? ? public void onError(Throwable e) {? ? ? ? Log.d(tag, "Error!");? ? }};

不僅基本使用方式一樣滥玷,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中巍棱,Observer?也總是會先被轉(zhuǎn)換成一個?Subscriber?再使用。所以如果你只想使用基本功能蛋欣,選擇?Observer?和?Subscriber?是完全一樣的航徙。它們的區(qū)別對于使用者來說主要有兩點(diǎn):

onStart(): 這是?Subscriber?增加的方法。它會在 subscribe 剛開始陷虎,而事件還未發(fā)送之前被調(diào)用到踏,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置尚猿。這是一個可選方法窝稿,默認(rèn)情況下它的實(shí)現(xiàn)為空。需要注意的是凿掂,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框伴榔,這必須在主線程執(zhí)行),?onStart()?就不適用了庄萎,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用踪少,而不能指定線程。要在指定的線程來做準(zhǔn)備工作糠涛,可以使用?doOnSubscribe()?方法援奢,具體可以在后面的文中看到。

unsubscribe(): 這是?Subscriber?所實(shí)現(xiàn)的另一個接口?Subscription?的方法忍捡,用于取消訂閱集漾。在這個方法被調(diào)用后,Subscriber?將不再接收事件砸脊。一般在這個方法調(diào)用前具篇,可以使用?isUnsubscribed()?先判斷一下狀態(tài)。?unsubscribe()?這個方法很重要凌埂,因?yàn)樵?subscribe()?之后栽连,?Observable?會持有?Subscriber?的引用,這個引用如果不能及時被釋放侨舆,將有內(nèi)存泄露的風(fēng)險(xiǎn)秒紧。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如?onPause()?onStop()?等方法中)調(diào)用?unsubscribe()?來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生挨下。

2) 創(chuàng)建 Observable

Observable 即被觀察者熔恢,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用?create()?方法來創(chuàng)建一個 Observable 臭笆,并為它定義事件觸發(fā)規(guī)則:

Observable observable = Observable.create(new Observable.OnSubscribe() {? ? @Override? ? public void call(Subscriber subscriber) {? ? ? ? subscriber.onNext("Hello");? ? ? ? subscriber.onNext("Hi");? ? ? ? subscriber.onNext("Aloha");? ? ? ? subscriber.onCompleted();? ? }});

可以看到叙淌,這里傳入了一個?OnSubscribe?對象作為參數(shù)秤掌。OnSubscribe?會被存儲在返回的?Observable?對象中,它的作用相當(dāng)于一個計(jì)劃表鹰霍,當(dāng)?Observable?被訂閱的時候闻鉴,OnSubscribe?的?call()?方法會自動被調(diào)用,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼茂洒,就是觀察者Subscriber?將會被調(diào)用三次?onNext()?和一次?onCompleted())孟岛。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法督勺,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞渠羞,即觀察者模式。

這個例子很簡單:事件的內(nèi)容是字符串智哀,而不是一些復(fù)雜的對象次询;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請求的結(jié)果在請求返回之前是未知的)瓷叫;所有事件在一瞬間被全部發(fā)送出去屯吊,而不是夾雜一些確定或不確定的時間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的∧〔ぃ總之雌芽,這個例子看起來毫無實(shí)用價(jià)值。但這是為了便于說明辨嗽,實(shí)質(zhì)上只要你想世落,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫。至于具體怎么做糟需,后面都會講到屉佳,但現(xiàn)在不行。只有把基礎(chǔ)原理先說明白了洲押,上層的運(yùn)用才能更容易說清楚武花。

create()?方法是 RxJava 最基本的創(chuàng)造事件序列的方法¤菊剩基于這個方法体箕, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列,例如:

just(T...): 將傳入的參數(shù)依次發(fā)送出來挑童。

Observable observable = Observable.just("Hello", "Hi", "Aloha");// 將會依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();

from(T[])?/?from(Iterable)?: 將傳入的數(shù)組或?Iterable?拆分成具體對象后累铅,依次發(fā)送出來。

String[] words = {"Hello", "Hi", "Aloha"};Observable observable = Observable.from(words);// 將會依次調(diào)用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();

上面?just(T...)?的例子和?from(T[])?的例子站叼,都和之前的?create(OnSubscribe)?的例子是等價(jià)的娃兽。

3) Subscribe (訂閱)

創(chuàng)建了?Observable?和?Observer?之后,再用?subscribe()?方法將它們聯(lián)結(jié)起來尽楔,整條鏈子就可以工作了投储。代碼形式很簡單:

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

有人可能會注意到第练,?subscribe()?這個方法有點(diǎn)怪:它看起來是『observalbe?訂閱了?observer?/?subscriber』而不是『observer?/?subscriber?訂閱了?observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系玛荞。這讓人讀起來有點(diǎn)別扭娇掏,不過如果把 API 設(shè)計(jì)成?observer.subscribe(observable)?/?subscriber.subscribe(observable)?,雖然更加符合思維邏輯勋眯,但對流式 API 的設(shè)計(jì)就造成影響了婴梧,比較起來明顯是得不償失的。

Observable.subscribe(Subscriber)?的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼凡恍,而是將源碼中與性能、兼容性怔球、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼嚼酝。// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載竟坛。public Subscription subscribe(Subscriber subscriber) {? ? subscriber.onStart();? ? onSubscribe.call(subscriber);? ? return subscriber;}

可以看到闽巩,subscriber()?做了3件事:

調(diào)用?Subscriber.onStart()?。這個方法在前面已經(jīng)介紹過担汤,是一個可選的準(zhǔn)備方法涎跨。

調(diào)用?Observable?中的?OnSubscribe.call(Subscriber)?。在這里崭歧,事件發(fā)送的邏輯開始運(yùn)行隅很。從這也可以看出,在 RxJava 中率碾,?Observable?并不是在創(chuàng)建的時候就立即開始發(fā)送事件叔营,而是在它被訂閱的時候,即當(dāng)?subscribe()?方法執(zhí)行的時候所宰。

將傳入的?Subscriber?作為?Subscription?返回绒尊。這是為了方便?unsubscribe().

整個過程中對象間的關(guān)系如下圖:

或者可以看動圖:

除了?subscribe(Observer)?和?subscribe(Subscriber)?,subscribe()?還支持不完整定義的回調(diào)仔粥,RxJava 會自動根據(jù)定義創(chuàng)建出?Subscriber?婴谱。形式如下:

Action1 onNextAction = new Action1() {? ? // onNext()? ? @Override? ? public void call(String s) {? ? ? ? Log.d(tag, s);? ? }};Action1 onErrorAction = new Action1() {? ? // onError()? ? @Override? ? public void call(Throwable throwable) {? ? ? ? // Error handling? ? }};Action0 onCompletedAction = new Action0() {? ? // onCompleted()? ? @Override? ? public void call() {? ? ? ? Log.d(tag, "completed");? ? }};// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()observable.subscribe(onNextAction);// 自動創(chuàng)建 Subscriber 躯泰,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自動創(chuàng)建 Subscriber 谭羔,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()麦向、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡單解釋一下這段代碼中出現(xiàn)的?Action1?和?Action0口糕。?Action0?是 RxJava 的一個接口,它只有一個方法?call()磕蛇,這個方法是無參無返回值的景描;由于?onCompleted()?方法也是無參無返回值的十办,因此?Action0?可以被當(dāng)成一個包裝對象,將?onCompleted()?的內(nèi)容打包起來將自己作為一個參數(shù)傳入?subscribe()?以實(shí)現(xiàn)不完整定義的回調(diào)超棺。這樣其實(shí)也可以看做將?onCompleted()?方法作為參數(shù)傳進(jìn)了?subscribe()向族,相當(dāng)于其他某些語言中的『閉包』。?Action1?也是一個接口棠绘,它同樣只有一個方法?call(T param)件相,這個方法也無返回值,但有一個參數(shù)氧苍;與?Action0?同理夜矗,由于?onNext(T obj)?和?onError(Throwable error)?也是單參數(shù)無返回值的,因此?Action1可以將?onNext(obj)?和?onError(error)?打包起來傳入?subscribe()?以實(shí)現(xiàn)不完整定義的回調(diào)让虐。事實(shí)上紊撕,雖然?Action0?和?Action1在 API 中使用最廣泛,但 RxJava 是提供了多個?ActionX?形式的接口 (例如?Action2,?Action3) 的赡突,它們可以被用以包裝不同的無返回值的方法对扶。

注:正如前面所提到的,Observer?和?Subscriber?具有相同的角色惭缰,而且?Observer?在?subscribe()?過程中最終會被轉(zhuǎn)換成?Subscriber對象浪南,因此,從這里開始漱受,后面的描述我將用?Subscriber?來代替?Observer?络凿,這樣更加嚴(yán)謹(jǐn)。

4) 場景示例

下面舉兩個例子:

為了把原理用更清晰的方式表述出來昂羡,本文中挑選的都是功能盡可能簡單的例子喷众,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當(dāng)你看到這種情況紧憾,不要覺得是因?yàn)?RxJava 太啰嗦到千,而是因?yàn)樵谶^早的時候舉出真實(shí)場景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景赴穗。

a. 打印字符串?dāng)?shù)組

將字符串?dāng)?shù)組?names?中的所有字符串依次打印出來:

String[] names = ...;Observable.from(names)? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(String name) {? ? ? ? ? ? Log.d(tag, name);? ? ? ? }? ? });

b. 由 id 取得圖片并顯示

由指定的一個 drawable 文件 id?drawableRes?取得圖片憔四,并顯示在?ImageView?中,并在出現(xiàn)異常的時候打印 Toast 報(bào)錯:

int drawableRes = ...;ImageView imageView = ...;Observable.create(new OnSubscribe() {? ? @Override? ? public void call(Subscriber subscriber) {? ? ? ? Drawable drawable = getTheme().getDrawable(drawableRes));? ? ? ? subscriber.onNext(drawable);? ? ? ? subscriber.onCompleted();? ? }}).subscribe(new Observer() {? ? @Override? ? public void onNext(Drawable drawable) {? ? ? ? imageView.setImageDrawable(drawable);? ? }? ? @Override? ? public void onCompleted() {? ? }? ? @Override? ? public void onError(Throwable e) {? ? ? ? Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();? ? }});

正如上面兩個例子這樣般眉,創(chuàng)建出?Observable?和?Subscriber?了赵,再用?subscribe()?將它們串起來,一次 RxJava 的基本使用就完成了甸赃。非常簡單柿汛。

然而,

在 RxJava 的默認(rèn)規(guī)則中,事件的發(fā)出和消費(fèi)都是在同一個線程的络断。也就是說裁替,如果只用上面的方法,實(shí)現(xiàn)出來的只是一個同步的觀察者模式貌笨。觀察者模式本身的目的就是『后臺處理弱判,前臺回調(diào)』的異步機(jī)制,因此異步對于 RxJava 是至關(guān)重要的锥惋。而要實(shí)現(xiàn)異步昌腰,則需要用到 RxJava 的另一個概念:?Scheduler?。

3. 線程控制 —— Scheduler (一)

在不指定線程的情況下膀跌, RxJava 遵循的是線程不變的原則遭商,即:在哪個線程調(diào)用?subscribe(),就在哪個線程生產(chǎn)事件捅伤;在哪個線程生產(chǎn)事件劫流,就在哪個線程消費(fèi)事件。如果需要切換線程暑认,就需要用到?Scheduler?(調(diào)度器)困介。

1) Scheduler 的 API (一)

在RxJava 中大审,Scheduler?——調(diào)度器蘸际,相當(dāng)于線程控制器,RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程徒扶。RxJava 已經(jīng)內(nèi)置了幾個?Scheduler?粮彤,它們已經(jīng)適合大多數(shù)的使用場景:

Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程姜骡。這是默認(rèn)的?Scheduler导坟。

Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作圈澈。

Schedulers.io(): I/O 操作(讀寫文件惫周、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的?Scheduler康栈。行為模式和?newThread()?差不多递递,區(qū)別在于?io()?的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程啥么,因此多數(shù)情況下?io()?比?newThread()?更有效率登舞。不要把計(jì)算工作放在?io()?中,可以避免創(chuàng)建不必要的線程悬荣。

Schedulers.computation(): 計(jì)算所使用的?Scheduler菠秒。這個計(jì)算指的是 CPU 密集型計(jì)算,即不會被 I/O 等操作限制性能的操作氯迂,例如圖形的計(jì)算践叠。這個?Scheduler?使用的固定的線程池言缤,大小為 CPU 核數(shù)。不要把 I/O 操作放在?computation()?中酵熙,否則 I/O 操作的等待時間會浪費(fèi) CPU轧简。

另外, Android 還有一個專用的?AndroidSchedulers.mainThread()匾二,它指定的操作將在 Android 主線程運(yùn)行哮独。

有了這幾個?Scheduler?,就可以使用?subscribeOn()?和?observeOn()?兩個方法來對線程進(jìn)行控制了察藐。 *?subscribeOn(): 指定?subscribe()?所發(fā)生的線程皮璧,即?Observable.OnSubscribe?被激活時所處的線程》址桑或者叫做事件產(chǎn)生的線程悴务。 *?observeOn(): 指定?Subscriber?所運(yùn)行在的線程∑┟ǎ或者叫做事件消費(fèi)的線程讯檐。

文字?jǐn)⑹隹倸w難理解,上代碼:

Observable.just(1, 2, 3, 4)? ? .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程? ? .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(Integer number) {? ? ? ? ? ? Log.d(tag, "number:" + number);? ? ? ? }? ? });

上面這段代碼中染服,由于?subscribeOn(Schedulers.io())?的指定别洪,被創(chuàng)建的事件的內(nèi)容?1蒙袍、2舶得、3、4?將會在 IO 線程發(fā)出婉支;而由于?observeOn(AndroidScheculers.mainThread()) 的指定秉颗,因此?subscriber?數(shù)字的打印將發(fā)生在主線程 痢毒。事實(shí)上,這種在?subscribe()?之前寫上兩句?subscribeOn(Scheduler.io())?和?observeOn(AndroidSchedulers.mainThread())?的使用方式非常常見蚕甥,它適用于多數(shù)的 『后臺線程取數(shù)據(jù)哪替,主線程顯示』的程序策略。

而前面提到的由圖片 id 取得圖片并顯示的例子菇怀,如果也加上這兩句:

int drawableRes = ...;ImageView imageView = ...;Observable.create(new OnSubscribe() {? ? @Override? ? public void call(Subscriber subscriber) {? ? ? ? Drawable drawable = getTheme().getDrawable(drawableRes));? ? ? ? subscriber.onNext(drawable);? ? ? ? subscriber.onCompleted();? ? }}).subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程.subscribe(new Observer() {? ? @Override? ? public void onNext(Drawable drawable) {? ? ? ? imageView.setImageDrawable(drawable);? ? }? ? @Override? ? public void onCompleted() {? ? }? ? @Override? ? public void onError(Throwable e) {? ? ? ? Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();? ? }});

那么凭舶,加載圖片將會發(fā)生在 IO 線程,而設(shè)置圖片則被設(shè)定在了主線程敏释。這就意味著库快,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓钥顽。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便义屏,也很神奇(加了一句話就把線程切換了,怎么做到的?而且?subscribe()?不是最外層直接調(diào)用的方法嗎闽铐,它竟然也能被指定線程蝶怔?)。然而 Scheduler 的原理需要放在后面講兄墅,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的踢星。

好吧這一節(jié)其實(shí)我屁也沒說,只是為了讓你安心隙咸,讓你知道我不是忘了講原理沐悦,而是把它放在了更合適的地方。

4. 變換

終于要到牛逼的地方了五督,不管你激動不激動藏否,反正我是激動了。

RxJava 提供了對事件序列進(jìn)行變換的支持充包,這是它的核心功能之一副签,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換基矮,就是將事件序列中的對象或整個序列進(jìn)行加工處理淆储,轉(zhuǎn)換成不同的事件或事件序列。概念說著總是模糊難懂的家浇,來看 API本砰。

1) API

首先看一個?map()?的例子:

Observable.just("images/logo.png") // 輸入類型 String? ? .map(new Func1() {? ? ? ? @Override? ? ? ? public Bitmap call(String filePath) { // 參數(shù)類型 String? ? ? ? ? ? return getBitmapFromPath(filePath); // 返回類型 Bitmap? ? ? ? }? ? })? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap? ? ? ? ? ? showBitmap(bitmap);? ? ? ? }? ? });

這里出現(xiàn)了一個叫做?Func1?的類。它和?Action1?非常相似蓝谨,也是 RxJava 的一個接口灌具,用于包裝含有一個參數(shù)的方法青团。?Func1?和?Action?的區(qū)別在于譬巫,?Func1?包裝的是有返回值的方法。另外督笆,和?ActionX?一樣芦昔,?FuncX?也有多個,用于不同參數(shù)個數(shù)的方法娃肿。FuncX和?ActionX?的區(qū)別在?FuncX?包裝的是有返回值的方法咕缎。

可以看到,map()?方法將參數(shù)中的?String?對象轉(zhuǎn)換成一個?Bitmap?對象后返回料扰,而在經(jīng)過?map()?方法后凭豪,事件的參數(shù)類型也由?String轉(zhuǎn)為了?Bitmap。這種直接變換對象并返回的晒杈,是最常見的也最容易理解的變換嫂伞。不過 RxJava 的變換遠(yuǎn)不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊(duì)列帖努,這使得 RxJava 變得非常靈活撰豺。我列舉幾個常用的變換:

map(): 事件對象的直接變換,具體功能上面已經(jīng)介紹過拼余。它是 RxJava 最常用的變換污桦。?map()?的示意圖:

flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它匙监。 首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』凡橱,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡單:

Student[] students = ...;Subscriber subscriber = new Subscriber() {? ? @Override? ? public void onNext(String name) {? ? ? ? Log.d(tag, name);? ? }? ? ...};Observable.from(students)? ? .map(new Func1() {? ? ? ? @Override? ? ? ? public String call(Student student) {? ? ? ? ? ? return student.getName();? ? ? ? }? ? })? ? .subscribe(subscriber);

很簡單亭姥。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢梭纹?(需求的區(qū)別在于,每個學(xué)生只有一個名字致份,但卻有多個課程变抽。)首先可以這樣實(shí)現(xiàn):

Student[] students = ...;Subscriber subscriber = new Subscriber() {? ? @Override? ? public void onNext(Student student) {? ? ? ? List courses = student.getCourses();? ? ? ? for (int i = 0; i < courses.size(); i++) {? ? ? ? ? ? Course course = courses.get(i);? ? ? ? ? ? Log.d(tag, course.getName());? ? ? ? }? ? }? ? ...};Observable.from(students)? ? .subscribe(subscriber);

依然很簡單。那么如果我不想在?Subscriber?中使用 for 循環(huán)氮块,而是希望?Subscriber?中直接傳入單個的?Course?對象呢(這對于代碼復(fù)用很重要)绍载?用?map()?顯然是不行的,因?yàn)?map()?是一對一的轉(zhuǎn)化滔蝉,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化击儡。那怎么才能把一個 Student 轉(zhuǎn)化成多個 Course 呢?

這個時候蝠引,就需要用?flatMap()?了:

Student[] students = ...;Subscriber subscriber = new Subscriber() {? ? @Override? ? public void onNext(Course course) {? ? ? ? Log.d(tag, course.getName());? ? }? ? ...};Observable.from(students)? ? .flatMap(new Func1>() {? ? ? ? @Override? ? ? ? public Observable call(Student student) {? ? ? ? ? ? return Observable.from(student.getCourses());? ? ? ? }? ? })? ? .subscribe(subscriber);

從上面的代碼可以看出阳谍,?flatMap()?和?map()?有一個相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。但需要注意螃概,和?map()不同的是矫夯,?flatMap()?中返回的是個?Observable?對象,并且這個?Observable?對象并不是被直接發(fā)送到了?Subscriber?的回調(diào)方法中吊洼。?flatMap()?的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個?Observable?對象训貌;2. 并不發(fā)送這個?Observable, 而是將它激活,于是它開始發(fā)送事件冒窍;3. 每一個創(chuàng)建出來的?Observable?發(fā)送的事件递沪,都被匯入同一個?Observable?,而這個?Observable?負(fù)責(zé)將這些事件統(tǒng)一交給?Subscriber?的回調(diào)方法综液。這三個步驟款慨,把事件拆成了兩級,通過一組新創(chuàng)建的?Observable?將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去谬莹。而這個『鋪平』就是?flatMap()?所謂的 flat檩奠。

flatMap()?示意圖:

擴(kuò)展:由于可以在嵌套的?Observable?中添加異步代碼约素,?flatMap()?也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求笆凌。示例代碼(Retrofit + RxJava):

networkClient.token() // 返回 Observable圣猎,在訂閱時請求 token,并在響應(yīng)后發(fā)送 token? ? .flatMap(new Func1>() {? ? ? ? @Override? ? ? ? public Observable call(String token) {? ? ? ? ? ? // 返回 Observable乞而,在訂閱時請求消息列表送悔,并在響應(yīng)后發(fā)送請求到的消息列表? ? ? ? ? ? return networkClient.messages();? ? ? ? }? ? })? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(Messages messages) {? ? ? ? ? ? // 處理顯示消息列表? ? ? ? ? ? showMessages(messages);? ? ? ? }? ? });

傳統(tǒng)的嵌套請求需要使用嵌套的 Callback 來實(shí)現(xiàn)。而通過?flatMap()?爪模,可以把嵌套的請求寫在一條鏈中欠啤,從而保持程序邏輯的清晰。

throttleFirst(): 在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件屋灌。常用作去抖動過濾洁段,例如按鈕的點(diǎn)擊監(jiān)聽器:RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber);媽媽再也不怕我的用戶手抖點(diǎn)開兩個重復(fù)的界面啦共郭。

此外祠丝, RxJava 還提供很多便捷的方法來實(shí)現(xiàn)事件序列的變換,這里就不一一舉例了除嘹。

2) 變換的原理:lift()

這些變換雖然功能各有不同写半,但實(shí)質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部尉咕,它們是基于同一個基礎(chǔ)的變換方法:?lift(Operator)叠蝇。首先看一下?lift()?的內(nèi)部實(shí)現(xiàn)(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能年缎、兼容性悔捶、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。// 如果需要看源碼单芜,可以去 RxJava 的 GitHub 倉庫下載蜕该。public Observable lift(Operator operator) {? ? return Observable.create(new OnSubscribe() {? ? ? ? @Override? ? ? ? public void call(Subscriber subscriber) {? ? ? ? ? ? Subscriber newSubscriber = operator.call(subscriber);? ? ? ? ? ? newSubscriber.onStart();? ? ? ? ? ? onSubscribe.call(newSubscriber);? ? ? ? }? ? });}

這段代碼很有意思:它生成了一個新的?Observable?并返回,而且創(chuàng)建新?Observable?所用的參數(shù)?OnSubscribe?的回調(diào)方法?call()?中的實(shí)現(xiàn)竟然看起來和前面講過的?Observable.subscribe()?一樣缓溅!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行?onSubscribe.call(subscriber)?中的?onSubscribe?所指代的對象不同(高能預(yù)警:接下來的幾句話可能會導(dǎo)致身體的嚴(yán)重不適)——

subscribe()?中這句話的?onSubscribe?指的是?Observable?中的?onSubscribe?對象蛇损,這個沒有問題赁温,但是?lift()?之后的情況就復(fù)雜了點(diǎn)坛怪。

當(dāng)含有?lift()?時:?

1.lift()?創(chuàng)建了一個?Observable?后,加上之前的原始?Observable股囊,已經(jīng)有兩個?Observable?了袜匿;?

2.而同樣地,新?Observable?里的新?OnSubscribe?加上之前的原始?Observable?中的原始?OnSubscribe稚疹,也就有了兩個?OnSubscribe居灯;?

3.當(dāng)用戶調(diào)用經(jīng)過?lift()?后的?Observable?的?subscribe()?的時候祭务,使用的是?lift()?所返回的新的?Observable?,于是它所觸發(fā)的?onSubscribe.call(subscriber)怪嫌,也是用的新?Observable?中的新?OnSubscribe义锥,即在?lift()?中生成的那個?OnSubscribe;?

4.而這個新?OnSubscribe?的?call()?方法中的?onSubscribe?岩灭,就是指的原始?Observable?中的原始?OnSubscribe?拌倍,在這個?call()方法里,新?OnSubscribe?利用?operator.call(subscriber)?生成了一個新的?Subscriber(Operator?就是在這里噪径,通過自己的?call()?方法將新?Subscriber?和原始?Subscriber?進(jìn)行關(guān)聯(lián)柱恤,并插入自己的『變換』代碼以實(shí)現(xiàn)變換),然后利用這個新?Subscriber?向原始?Observable?進(jìn)行訂閱找爱。?

這樣就實(shí)現(xiàn)了?lift()?過程梗顺,有點(diǎn)像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換车摄。

精簡掉細(xì)節(jié)的話寺谤,也可以這么說:在?Observable?執(zhí)行了?lift(Operator)?方法之后,會返回一個新的?Observable吮播,這個新的?Observable?會像一個代理一樣矗漾,負(fù)責(zé)接收原始的?Observable?發(fā)出的事件,并在處理后發(fā)送給?Subscriber薄料。

如果你更喜歡具象思維敞贡,可以看圖:

或者可以看動圖:

兩次和多次的?lift()?同理,如下圖:

舉一個具體的?Operator?的實(shí)現(xiàn)摄职。下面這是一個將事件中的?Integer?對象轉(zhuǎn)換成?String?的例子誊役,僅供參考:

observable.lift(new Observable.Operator() {? ? @Override? ? public Subscriber call(final Subscriber subscriber) {? ? ? ? // 將事件序列中的 Integer 對象轉(zhuǎn)換為 String 對象? ? ? ? return new Subscriber() {? ? ? ? ? ? @Override? ? ? ? ? ? public void onNext(Integer integer) {? ? ? ? ? ? ? ? subscriber.onNext("" + integer);? ? ? ? ? ? }? ? ? ? ? ? @Override? ? ? ? ? ? public void onCompleted() {? ? ? ? ? ? ? ? subscriber.onCompleted();? ? ? ? ? ? }? ? ? ? ? ? @Override? ? ? ? ? ? public void onError(Throwable e) {? ? ? ? ? ? ? ? subscriber.onError(e);? ? ? ? ? ? }? ? ? ? };? ? }});

講述?lift()?的原理只是為了讓你更好地了解 RxJava ,從而可以更好地使用它谷市。然而不管你是否理解了?lift()?的原理蛔垢,RxJava 都不建議開發(fā)者自定義?Operator?來直接使用?lift(),而是建議盡量使用已有的?lift()?包裝方法(如?map()?flatMap()?等)進(jìn)行組合來實(shí)現(xiàn)需求迫悠,因?yàn)橹苯邮褂?lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯誤鹏漆。

3) compose: 對 Observable 整體的變換

除了?lift()?之外,?Observable?還有一個變換方法叫做?compose(Transformer)创泄。它和?lift()?的區(qū)別在于艺玲,?lift()?是針對事件項(xiàng)和事件序列的,而?compose()?是針對?Observable?自身進(jìn)行變換鞠抑。舉個例子饭聚,假設(shè)在程序中有多個?Observable?,并且他們都需要應(yīng)用一組相同的?lift()?變換搁拙。你可以這么寫:

observable1

? ? .lift1()? ? .lift2()? ? .lift3()? ? .lift4()? ? .subscribe(subscriber1);observable2

? ? .lift1()? ? .lift2()? ? .lift3()? ? .lift4()? ? .subscribe(subscriber2);observable3

? ? .lift1()? ? .lift2()? ? .lift3()? ? .lift4()? ? .subscribe(subscriber3);observable4

? ? .lift1()? ? .lift2()? ? .lift3()? ? .lift4()? ? .subscribe(subscriber1);

你覺得這樣太不軟件工程了秒梳,于是你改成了這樣:

private Observable liftAll(Observable observable) {? ? return observable

? ? ? ? .lift1()? ? ? ? .lift2()? ? ? ? .lift3()? ? ? ? .lift4();}...liftAll(observable1).subscribe(subscriber1);liftAll(observable2).subscribe(subscriber2);liftAll(observable3).subscribe(subscriber3);liftAll(observable4).subscribe(subscriber4);

可讀性法绵、可維護(hù)性都提高了±业猓可是?Observable?被一個方法包起來朋譬,這種方式對于?Observale?的靈活性似乎還是增添了那么點(diǎn)限制。怎么辦兴垦?這個時候此熬,就應(yīng)該用?compose()?來解決了:

public class LiftAllTransformer implements Observable.Transformer {? ? @Override? ? public Observable call(Observable observable) {? ? ? ? return observable

? ? ? ? ? ? .lift1()? ? ? ? ? ? .lift2()? ? ? ? ? ? .lift3()? ? ? ? ? ? .lift4();? ? }}...Transformer liftAll = new LiftAllTransformer();observable1.compose(liftAll).subscribe(subscriber1);observable2.compose(liftAll).subscribe(subscriber2);observable3.compose(liftAll).subscribe(subscriber3);observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用?compose()?方法滑进,Observable?可以利用傳入的?Transformer?對象的?call?方法直接對自身進(jìn)行處理犀忱,也就不必被包在方法的里面了。

compose()?的原理比較簡單扶关,不附圖嘍阴汇。

5. 線程控制:Scheduler (二)

除了靈活的變換,RxJava 另一個牛逼的地方节槐,就是線程的自由控制搀庶。

1) Scheduler 的 API (二)

前面講到了,可以利用?subscribeOn()?結(jié)合?observeOn()?來實(shí)現(xiàn)線程控制铜异,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程哥倔。可是在了解了?map()?flatMap()?等變換方法后揍庄,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程咆蒿?

答案是:能。因?yàn)?observeOn()?指定的是?Subscriber?的線程蚂子,而這個?Subscriber?并不是(嚴(yán)格說應(yīng)該為『不一定是』沃测,但這里不妨理解為『不是』)subscribe()?參數(shù)中的?Subscriber?,而是?observeOn()?執(zhí)行時的當(dāng)前?Observable?所對應(yīng)的?Subscriber?食茎,即它的直接下級?Subscriber?蒂破。換句話說,observeOn()?指定的是它之后的操作所在的線程别渔。因此如果有多次切換線程的需求附迷,只要在每個想要切換線程的位置調(diào)用一次?observeOn()?即可。上代碼:

Observable.just(1, 2, 3, 4) // IO 線程哎媚,由 subscribeOn() 指定? ? .subscribeOn(Schedulers.io())? ? .observeOn(Schedulers.newThread())? ? .map(mapOperator) // 新線程喇伯,由 observeOn() 指定? ? .observeOn(Schedulers.io())? ? .map(mapOperator2) // IO 線程,由 observeOn() 指定? ? .observeOn(AndroidSchedulers.mainThread)

? ? .subscribe(subscriber);? // Android 主線程抄伍,由 observeOn() 指定

如上艘刚,通過?observeOn()?的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換截珍。

不過攀甚,不同于?observeOn()?,?subscribeOn()?的位置放在哪里都可以岗喉,但它是只能調(diào)用一次的秋度。

又有好事的(其實(shí)還是當(dāng)初的我)問了:如果我非要調(diào)用多次?subscribeOn()?呢?會有什么效果钱床?

這個問題先放著荚斯,我們還是從 RxJava 線程控制的原理說起吧。

2) Scheduler 的原理(二)

其實(shí)查牌,?subscribeOn()?和?observeOn()?的內(nèi)部實(shí)現(xiàn)事期,也是用的?lift()。具體看圖(不同顏色的箭頭表示不同的線程):

subscribeOn()?原理圖:

observeOn()?原理圖:

從圖中可以看出纸颜,subscribeOn()?和?observeOn()?都做了線程切換的工作(圖中的 "schedule..." 部位)兽泣。不同的是,?subscribeOn()的線程切換發(fā)生在?OnSubscribe?中胁孙,即在它通知上一級?OnSubscribe?時唠倦,這時事件還沒有開始發(fā)送,因此?subscribeOn()?的線程控制可以從事件發(fā)出的開端就造成影響涮较;而?observeOn()?的線程切換則發(fā)生在它內(nèi)建的?Subscriber?中稠鼻,即發(fā)生在它即將給下一級?Subscriber?發(fā)送事件時,因此?observeOn()?控制的是它后面的線程狂票。

最后候齿,我用一張圖來解釋當(dāng)多個?subscribeOn()?和?observeOn()?混合使用時,線程調(diào)度是怎么發(fā)生的(由于圖中對象較多闺属,相對于上面的圖對結(jié)構(gòu)做了一些簡化調(diào)整):

圖中共有 5 處含有對事件的操作毛肋。由圖中可以看出,①和②兩處受第一個?subscribeOn()?影響屋剑,運(yùn)行在紅色線程润匙;③和④處受第一個?observeOn()?的影響,運(yùn)行在綠色線程唉匾;⑤處受第二個?onserveOn()?影響孕讳,運(yùn)行在紫色線程;而第二個?subscribeOn()?巍膘,由于在通知過程中線程就被第一個?subscribeOn()?截?cái)喑Р疲虼藢φ麄€流程并沒有任何影響。這里也就回答了前面的問題:當(dāng)使用了多個?subscribeOn()?的時候峡懈,只有第一個?subscribeOn()?起作用璃饱。

3) 延伸:doOnSubscribe()

然而,雖然超過一個的?subscribeOn()?對事件處理的流程沒有影響肪康,但在流程之前卻是可以利用的荚恶。

在前面講?Subscriber?的時候撩穿,提到過?Subscriber?的?onStart()?可以用作流程開始前的初始化。然而?onStart()?由于在?subscribe()?發(fā)生時就被調(diào)用了谒撼,因此不能指定線程食寡,而是只能執(zhí)行在?subscribe()?被調(diào)用時的線程。這就導(dǎo)致如果?onStart()?中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar廓潜,這必須在主線程執(zhí)行)抵皱,將會有線程非法的風(fēng)險(xiǎn),因?yàn)橛袝r你無法預(yù)測?subscribe()?將會在什么線程執(zhí)行辩蛋。

而與?Subscriber.onStart()?相對應(yīng)的呻畸,有一個方法?Observable.doOnSubscribe()?。它和?Subscriber.onStart()?同樣是在?subscribe()?調(diào)用后而且在事件發(fā)送前執(zhí)行悼院,但區(qū)別在于它可以指定線程伤为。默認(rèn)情況下,?doOnSubscribe()?執(zhí)行在?subscribe()?發(fā)生的線程樱蛤;而如果在?doOnSubscribe()?之后有?subscribeOn()?的話钮呀,它將執(zhí)行在離它最近的?subscribeOn()?所指定的線程。

示例代碼:

Observable.create(onSubscribe)? ? .subscribeOn(Schedulers.io())? ? .doOnSubscribe(new Action0() {? ? ? ? @Override? ? ? ? public void call() {? ? ? ? ? ? progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行? ? ? ? }? ? })? ? .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(subscriber);

如上昨凡,在?doOnSubscribe()的后面跟一個?subscribeOn()?爽醋,就能指定準(zhǔn)備工作的線程了。

RxJava 的適用場景和使用方式

1. 與 Retrofit 的結(jié)合

Retrofit 是 Square 的一個著名的網(wǎng)絡(luò)請求庫便脊。沒有用過 Retrofit 的可以選擇跳過這一小節(jié)也沒關(guān)系蚂四,我舉的每種場景都只是個例子,而且例子之間并無前后關(guān)聯(lián)哪痰,只是個拋磚引玉的作用遂赠,所以你跳過這里看別的場景也可以的。

Retrofit 除了提供了傳統(tǒng)的?Callback?形式的 API晌杰,還有 RxJava 版本的?Observable?形式 API跷睦。下面我用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本的區(qū)別。

以獲取一個?User?對象的接口作為例子肋演。使用Retrofit 的傳統(tǒng) API抑诸,你可以用這樣的方式來定義請求:

@GET("/user")public void getUser(@Query("userId") String userId, Callback callback);

在程序的構(gòu)建過程中, Retrofit 會把自動把方法實(shí)現(xiàn)并生成代碼爹殊,然后開發(fā)者就可以利用下面的方法來獲取特定用戶并處理響應(yīng):

getUser(userId, new Callback() {? ? @Override? ? public void success(User user) {? ? ? ? userView.setUser(user);? ? }? ? @Override? ? public void failure(RetrofitError error) {? ? ? ? // Error handling? ? ? ? ...? ? }};

而使用 RxJava 形式的 API蜕乡,定義同樣的請求是這樣的:

@GET("/user")public Observable getUser(@Query("userId") String userId);

使用的時候是這樣的:

getUser(userId)? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(new Observer() {? ? ? ? @Override? ? ? ? public void onNext(User user) {? ? ? ? ? ? userView.setUser(user);? ? ? ? }? ? ? ? @Override? ? ? ? public void onCompleted() {? ? ? ? }? ? ? ? @Override? ? ? ? public void onError(Throwable error) {? ? ? ? ? ? // Error handling? ? ? ? ? ? ...? ? ? ? }? ? });

看到區(qū)別了嗎?

當(dāng) RxJava 形式的時候梗夸,Retrofit 把請求封裝進(jìn)?Observable?层玲,在請求結(jié)束后調(diào)用?onNext()?或在請求失敗后調(diào)用?onError()。

對比來看,?Callback?形式和?Observable?形式長得不太一樣辛块,但本質(zhì)都差不多畔派,而且在細(xì)節(jié)上?Observable?形式似乎還比?Callback形式要差點(diǎn)。那 Retrofit 為什么還要提供 RxJava 的支持呢憨降?

因?yàn)樗糜冒父虑「眯铮倪@個例子看不出來是因?yàn)檫@只是最簡單的情況授药。而一旦情景復(fù)雜起來,?Callback?形式馬上就會開始讓人頭疼呜魄。比如:

假設(shè)這么一種情況:你的程序取到的?User?并不應(yīng)該直接顯示悔叽,而是需要先與數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行比對和修正后再顯示。使用?Callback方式大概可以這么寫:

getUser(userId, new Callback() {? ? @Override? ? public void success(User user) {? ? ? ? processUser(user); // 嘗試修正 User 數(shù)據(jù)? ? ? ? userView.setUser(user);? ? }? ? @Override? ? public void failure(RetrofitError error) {? ? ? ? // Error handling? ? ? ? ...? ? }};

有問題嗎爵嗅?

很簡便娇澎,但不要這樣做。為什么睹晒?因?yàn)檫@樣做會影響性能趟庄。數(shù)據(jù)庫的操作很重,一次讀寫操作花費(fèi) 10~20ms 是很常見的伪很,這樣的耗時很容易造成界面的卡頓戚啥。所以通常情況下,如果可以的話一定要避免在主線程中處理數(shù)據(jù)庫锉试。所以為了提升性能猫十,這段代碼可以優(yōu)化一下:

getUser(userId, new Callback() {? ? @Override? ? public void success(User user) {? ? ? ? new Thread() {? ? ? ? ? ? @Override? ? ? ? ? ? public void run() {? ? ? ? ? ? ? ? processUser(user); // 嘗試修正 User 數(shù)據(jù)? ? ? ? ? ? ? ? runOnUiThread(new Runnable() { // 切回 UI 線程? ? ? ? ? ? ? ? ? ? @Override? ? ? ? ? ? ? ? ? ? public void run() {? ? ? ? ? ? ? ? ? ? ? ? userView.setUser(user);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });? ? ? ? ? ? }).start();? ? }? ? @Override? ? public void failure(RetrofitError error) {? ? ? ? // Error handling? ? ? ? ...? ? }};

性能問題解決,但……這代碼實(shí)在是太亂了呆盖,迷之縮進(jìn)巴显啤!雜亂的代碼往往不僅僅是美觀問題应又,因?yàn)榇a越亂往往就越難讀懂宙项,而如果項(xiàng)目中充斥著雜亂的代碼,無疑會降低代碼的可讀性株扛,造成團(tuán)隊(duì)開發(fā)效率的降低和出錯率的升高尤筐。

這時候,如果用 RxJava 的形式席里,就好辦多了叔磷。 RxJava 形式的代碼是這樣的:

getUser(userId)? ? .doOnNext(new Action1() {? ? ? ? @Override? ? ? ? public void call(User user) {? ? ? ? ? ? processUser(user);? ? ? ? })? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(new Observer() {? ? ? ? @Override? ? ? ? public void onNext(User user) {? ? ? ? ? ? userView.setUser(user);? ? ? ? }? ? ? ? @Override? ? ? ? public void onCompleted() {? ? ? ? }? ? ? ? @Override? ? ? ? public void onError(Throwable error) {? ? ? ? ? ? // Error handling? ? ? ? ? ? ...? ? ? ? }? ? });

后臺代碼和前臺代碼全都寫在一條鏈中,明顯清晰了很多奖磁。

再舉一個例子:假設(shè)?/user?接口并不能直接訪問改基,而需要填入一個在線獲取的?token?,代碼應(yīng)該怎么寫?

Callback?方式秕狰,可以使用嵌套的?Callback:

@GET("/token")public void getToken(Callback callback);@GET("/user")public void getUser(@Query("token") String token, @Query("userId") String userId, Callback callback);...getToken(new Callback() {? ? @Override? ? public void success(String token) {? ? ? ? getUser(token, userId, new Callback() {? ? ? ? ? ? @Override? ? ? ? ? ? public void success(User user) {? ? ? ? ? ? ? ? userView.setUser(user);? ? ? ? ? ? }? ? ? ? ? ? @Override? ? ? ? ? ? public void failure(RetrofitError error) {? ? ? ? ? ? ? ? // Error handling? ? ? ? ? ? ? ? ...? ? ? ? ? ? }? ? ? ? };? ? }? ? @Override? ? public void failure(RetrofitError error) {? ? ? ? // Error handling? ? ? ? ...? ? }});

倒是沒有什么性能問題稠腊,可是迷之縮進(jìn)毀一生,你懂我也懂鸣哀,做過大項(xiàng)目的人應(yīng)該更懂架忌。

而使用 RxJava 的話,代碼是這樣的:

@GET("/token")public Observable getToken();@GET("/user")public Observable getUser(@Query("token") String token, @Query("userId") String userId);...getToken()? ? .flatMap(new Func1>() {? ? ? ? @Override? ? ? ? public Observable onNext(String token) {? ? ? ? ? ? return getUser(token, userId);? ? ? ? })? ? .observeOn(AndroidSchedulers.mainThread())? ? .subscribe(new Observer() {? ? ? ? @Override? ? ? ? public void onNext(User user) {? ? ? ? ? ? userView.setUser(user);? ? ? ? }? ? ? ? @Override? ? ? ? public void onCompleted() {? ? ? ? }? ? ? ? @Override? ? ? ? public void onError(Throwable error) {? ? ? ? ? ? // Error handling? ? ? ? ? ? ...? ? ? ? }? ? });

用一個?flatMap()?就搞定了邏輯我衬,依然是一條鏈叹放。看著就很爽挠羔,是吧井仰?

2016/03/31 更新,加上我寫的一個 Sample 項(xiàng)目:?

rengwuxian RxJava Samples

好破加,Retrofit 部分就到這里俱恶。

2. RxBinding

RxBinding?是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基于 RxJava 的 Binding API范舀。所謂 Binding合是,就是類似設(shè)置?OnClickListener?、設(shè)置?TextWatcher?這樣的注冊綁定對象的 API锭环。

舉個設(shè)置點(diǎn)擊監(jiān)聽的例子聪全。使用?RxBinding?,可以把事件監(jiān)聽用這樣的方法來設(shè)置:

Button button = ...;RxView.clickEvents(button) // 以 Observable 形式來反饋點(diǎn)擊事件? ? .subscribe(new Action1() {? ? ? ? @Override? ? ? ? public void call(ViewClickEvent event) {? ? ? ? ? ? // Click handling? ? ? ? }? ? });

看起來除了形式變了沒什么區(qū)別田藐,實(shí)質(zhì)上也是這樣荔烧。甚至如果你看一下它的源碼,你會發(fā)現(xiàn)它連實(shí)現(xiàn)都沒什么驚喜:它的內(nèi)部是直接用一個包裹著的?setOnClickListener()?來實(shí)現(xiàn)的汽久。然而鹤竭,僅僅這一個形式的改變,卻恰好就是?RxBinding?的目的:擴(kuò)展性景醇。通過?RxBinding?把點(diǎn)擊監(jiān)聽轉(zhuǎn)換成?Observable?之后臀稚,就有了對它進(jìn)行擴(kuò)展的可能。擴(kuò)展的方式有很多三痰,根據(jù)需求而定吧寺。一個例子是前面提到過的?throttleFirst()?,用于去抖動散劫,也就是消除手抖導(dǎo)致的快速連環(huán)點(diǎn)擊:

RxView.clickEvents(button)? ? .throttleFirst(500, TimeUnit.MILLISECONDS)? ? .subscribe(clickAction);

如果想對?RxBinding?有更多了解稚机,可以去它的?GitHub 項(xiàng)目?下面看看。

3. 各種異步操作

前面舉的?Retrofit?和?RxBinding?的例子获搏,是兩個可以提供現(xiàn)成的?Observable?的庫赖条。而如果你有某些異步操作無法用這些庫來自動生成?Observable,也完全可以自己寫。例如數(shù)據(jù)庫的讀寫纬乍、大圖片的載入碱茁、文件壓縮/解壓等各種需要放在后臺工作的耗時操作,都可以用 RxJava 來實(shí)現(xiàn)仿贬,有了之前幾章的例子纽竣,這里應(yīng)該不用再舉例了。

4. RxBus

RxBus 名字看起來像一個庫茧泪,但它并不是一個庫蜓氨,而是一種模式,它的思想是使用 RxJava 來實(shí)現(xiàn)了 EventBus 调炬,而讓你不再需要使用?Otto?或者 GreenRobot 的?EventBus语盈。至于什么是 RxBus舱馅,可以看這篇文章缰泡。順便說一句,F(xiàn)lipboard 已經(jīng)用 RxBus 替換掉了?Otto?代嗤,目前為止沒有不良反應(yīng)棘钞。

最后

對于 Android 開發(fā)者來說, RxJava 是一個很難上手的庫干毅,因?yàn)樗鼘τ?Android 開發(fā)者來說有太多陌生的概念了宜猜。可是它真的很牛逼硝逢。因此姨拥,我寫了這篇《給 Android 開發(fā)者的 RxJava 詳解》,希望能給始終搞不明白什么是 RxJava 的人一些入門的指引渠鸽,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析叫乌。無論如何,只要能給各位同為 Android 工程師的你們提供一些幫助徽缚,這篇文章的目的就達(dá)到了憨奸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市凿试,隨后出現(xiàn)的幾起案子排宰,更是在濱河造成了極大的恐慌,老刑警劉巖那婉,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件板甘,死亡現(xiàn)場離奇詭異,居然都是意外死亡详炬,警方通過查閱死者的電腦和手機(jī)盐类,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人傲醉,你說我怎么就攤上這事蝇闭。” “怎么了硬毕?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵呻引,是天一觀的道長。 經(jīng)常有香客問我吐咳,道長逻悠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任韭脊,我火速辦了婚禮童谒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沪羔。我一直安慰自己饥伊,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布蔫饰。 她就那樣靜靜地躺著琅豆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪篓吁。 梳的紋絲不亂的頭發(fā)上茫因,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天,我揣著相機(jī)與錄音杖剪,去河邊找鬼冻押。 笑死,一個胖子當(dāng)著我的面吹牛盛嘿,可吹牛的內(nèi)容都是我干的洛巢。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼孩擂,長吁一口氣:“原來是場噩夢啊……” “哼狼渊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起类垦,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤呈枉,失蹤者是張志新(化名)和其女友劉穎褐健,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡僵娃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年饲宿,在試婚紗的時候發(fā)現(xiàn)自己被綠了胡桨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片履植。...
    茶點(diǎn)故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡良瞧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出训唱,到底是詐尸還是另有隱情褥蚯,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布况增,位于F島的核電站赞庶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏澳骤。R本人自食惡果不足惜歧强,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望为肮。 院中可真熱鬧摊册,春花似錦、人聲如沸颊艳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽籽暇。三九已至温治,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間戒悠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工舟山, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留绸狐,地道東北人。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓累盗,卻偏偏與公主長得像寒矿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子若债,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容