Rxjava的使用<一>

原文地址:http://gank.io/post/560e15be2dca930e00da1083

前言

我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android 項(xiàng)目也在使用 RxJava ,并且使用的場景越來越多 。而最近這幾個月,我也發(fā)現(xiàn)國內(nèi)越來越多的人開始提及 RxJava 墓贿。有人說『RxJava 真是太好用了』,有人說『RxJava 真是太難用了』蜓氨,另外更多的人表示:我真的百度了也谷歌了募壕,但我還是想問: RxJava 到底是什么?

鑒于 RxJava 目前這種既火爆又神秘的現(xiàn)狀语盈,而我又在一年的使用過程中對 RxJava 有了一些理解舱馅,我決定寫下這篇文章來對 RxJava 做一個相對詳細(xì)的、針對 Android 開發(fā)者的介紹刀荒。

這篇文章的目的有兩個:

1代嗤、給對 RxJava 感興趣的人一些入門的指引

2、 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

在正文開始之前的最后缠借,放上GitHub鏈接和引入依賴的gradle
代碼:Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid

引入依賴:

compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'

(版本號是文章發(fā)布時的最新穩(wěn)定版)

RxJava 到底是什么

一個詞:異步干毅。
RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)泼返。這就是 RxJava 硝逢,概括得非常精準(zhǔn)。

然而,對于初學(xué)者來說渠鸽,這太難看懂了叫乌。因?yàn)樗且粋€『總結(jié)』,而初學(xué)者更需要一個『引言』徽缚。

其實(shí)憨奸, RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上凿试,它就是一個實(shí)現(xiàn)異步操作的庫排宰,而別的定語都是基于這之上的。

RxJava 好在哪

換句話說那婉,『同樣是做異步板甘,為什么人們用它,而不用現(xiàn)成的 AsyncTask / Handler / XXX / ... 详炬?』

一個詞:簡潔盐类。

異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性,因?yàn)樵谡{(diào)度過程比較復(fù)雜的情況下痕寓,異步代碼經(jīng)常會既難寫也難被讀懂傲醉。 Android 創(chuàng)造的AsyncTask
和Handler
蝇闭,其實(shí)都是為了讓異步代碼更加簡潔呻率。RxJava 的優(yōu)勢也是簡潔,但它的簡潔的與眾不同之處在于呻引,隨著程序邏輯變得越來越復(fù)雜礼仗,它依然能夠保持簡潔。

舉個例子

假設(shè)有這樣一個需求:界面上有一個自定義的視圖imageCollectorView逻悠,它的作用是顯示多張圖片元践,并能使用addImage(Bitmap)方法來任意增加顯示的圖片。現(xiàn)在需要程序?qū)⒁粋€給出的目錄數(shù)組File[] folders中每個目錄下的 png 圖片都加載出來并顯示在imageCollectorView中童谒。需要注意的是单旁,由于讀取圖片的這一過程較為耗時,需要放在后臺執(zhí)行饥伊,而圖片的顯示則必須在 UI 線程執(zhí)行象浑。常用的實(shí)現(xiàn)方式有多種,我這里貼出其中一種:

new Thread() {
    @Override public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
} .start();

而如果使用 RxJava 琅豆,實(shí)現(xiàn)方式是這樣的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

那位說話了:『你這代碼明明變多了坝洳颉!簡潔個毛懊R颉蚪拦!』大兄弟你消消氣,我說的是邏輯的簡潔,不是單純的代碼量少(邏輯簡潔才是提升讀寫代碼速度的必殺技對不驰贷?)盛嘿。觀察一下你會發(fā)現(xiàn), RxJava 的這個實(shí)現(xiàn)饱苟,是一條從上到下的鏈?zhǔn)秸{(diào)用孩擂,沒有任何嵌套,這在邏輯的簡潔性上是具有優(yōu)勢的箱熬。當(dāng)需求變得復(fù)雜時类垦,這種優(yōu)勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規(guī)方式要怎么辦城须?如果有更多這樣那樣的要求呢蚤认?再試想,在這一大堆需求實(shí)現(xiàn)完兩個月之后需要改功能糕伐,當(dāng)你翻回這里看到自己當(dāng)初寫下的那一片迷之縮進(jìn)砰琢,你能保證自己將迅速看懂,而不是對著代碼重新捋一遍思路良瞧?)陪汽。
另外,如果你的 IDE 是 Android Studio 褥蚯,其實(shí)每次打開某個 Java 文件的時候挚冤,你會看到被自動 Lambda 化的預(yù)覽,這將讓你更加清晰地看到程序邏輯:


Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

如果你習(xí)慣使用 Retrolambda 赞庶,你也可以直接把代碼寫成上面這種簡潔的形式训挡。而如果你看到這里還不知道什么是 Retrolambda ,我不建議你現(xiàn)在就去學(xué)習(xí)它歧强。原因有兩點(diǎn):

  • 1澜薄、Lambda 是把雙刃劍,它讓你的代碼簡潔的同時摊册,降低了代碼的可讀性肤京,因此同時學(xué)習(xí) RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術(shù)細(xì)節(jié);
  • 2茅特、Retrolambda 是 Java 6/7 對 Lambda 表達(dá)式的非官方兼容方案忘分,它的向后兼容性和穩(wěn)定性是無法保障的,因此對于企業(yè)項(xiàng)目温治,使用 Retrolambda 是有風(fēng)險的饭庞。所以,與很多 RxJava 的推廣者不同熬荆,我并不推薦在學(xué)習(xí) RxJava 的同時一起學(xué)習(xí) Retrolambda舟山。事實(shí)上,我個人雖然很欣賞 Retrolambda,但我從來不用它累盗。

在Flipboard 的 Android 代碼中寒矿,有一段邏輯非常復(fù)雜,包含了多次內(nèi)存操作若债、本地文件操作和網(wǎng)絡(luò)操作符相,對象分分合合,線程間相互配合相互等待蠢琳,一會兒排成人字啊终,一會兒排成一字。如果使用常規(guī)的方法來實(shí)現(xiàn)傲须,肯定是要寫得欲仙欲死蓝牲,然而在使用 RxJava 的情況下,依然只是一條鏈?zhǔn)秸{(diào)用就完成了泰讽。它很長例衍,但很清晰。

所以已卸, RxJava 好在哪佛玄?就好在簡潔,好在那把什么復(fù)雜邏輯都能穿成一條線的簡潔累澡。

API 介紹和原理簡析

這個我就做不到一個詞說明了……因?yàn)檫@一節(jié)的主要內(nèi)容就是一步步地說明 RxJava 到底怎樣做到了異步梦抢,怎樣做到了簡潔。

1. 概念:擴(kuò)展的觀察者模式

RxJava 的異步實(shí)現(xiàn)永乌,是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的惑申。

觀察者模式具伍; 也可以參考這篇文章《設(shè)計模式總結(jié)筆記<四> 觀察者模式》

先簡述一下觀察者模式翅雏,已經(jīng)熟悉的可以跳過這一段。
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感人芽,需要在 B 變化的一瞬間做出反應(yīng)望几。舉個例子,新聞里喜聞樂見的警察抓小偷萤厅,警察需要在小偷伸手作案的時候?qū)嵤┳ゲ堕夏āT谶@個例子里,警察是觀察者惕味,小偷是被觀察者楼誓,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間名挥。程序的觀察者模式和這種真正的『觀察』略有不同疟羹,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態(tài)),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態(tài)榄融,你要在它變化的時候通知我参淫。 Android 開發(fā)中一個比較典型的例子是點(diǎn)擊監(jiān)聽器 OnClickListener 。對設(shè)置 OnClickListener 來說愧杯, View 是被觀察者涎才, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達(dá)成訂閱關(guān)系力九。訂閱之后用戶點(diǎn)擊按鈕的瞬間耍铜,Android Framework 就會將點(diǎn)擊事件發(fā)送給已經(jīng)注冊的 OnClickListener 。采取這樣被動的觀察方式跌前,既省去了反復(fù)檢索狀態(tài)的資源消耗业扒,也能夠得到最高的反饋速度。當(dāng)然舒萎,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者程储,而警察叔叔明顯無法要求小偷『你在作案的時候務(wù)必通知我』。
OnClickListener 的模式大致如下圖:

OnClickListener 觀察者模式

如圖所示臂寝,通過 setOnClickListener() 方法章鲤,Button 持有 OnClickListener 的引用(這一過程沒有在圖上畫出);當(dāng)用戶點(diǎn)擊時咆贬,Button 自動調(diào)用 OnClickListener 的 onClick() 方法败徊。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者掏缎、OnClickListener -> 觀察者皱蹦、setOnClickListener() -> 訂閱,onClick() -> 事件)眷蜈,就由專用的觀察者模式(例如只用于監(jiān)聽控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式沪哺。如下圖:

通用觀察者模式

而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式酌儒。

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者辜妓,即被觀察者)、 Observer (觀察者)忌怎、 subscribe (訂閱)籍滴、事件。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系榴啸,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer孽惰。

與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外鸥印,還定義了兩個特殊的事件:onCompleted() 和 onError()勋功。

  • onCompleted(): 事件隊(duì)列完結(jié)腥例。RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列酝润。RxJava 規(guī)定燎竖,當(dāng)不會再有新的 onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標(biāo)志要销。
  • onError(): 事件隊(duì)列異常构回。在事件處理過程中出異常時,onError() 會被觸發(fā)疏咐,同時隊(duì)列自動終止纤掸,不允許再有事件發(fā)出。
  • 在一個正確運(yùn)行的事件序列中, onCompleted() 和 onError() 有且只有一個浑塞,并且是事件序列中的最后一個借跪。需要注意的是,onCompleted() 和 onError() 二者也是互斥的酌壕,即在隊(duì)列中調(diào)用了其中一個掏愁,就不應(yīng)該再調(diào)用另一個。

RxJava 的觀察者模式大致如下圖:


RxJava 的觀察者模式
2. 基本實(shí)現(xiàn)

基于以上的概念卵牍, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):

  1. 創(chuàng)建 Observer

Observer 即觀察者果港,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤xJava 中的Observer接口的實(shí)現(xiàn)方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了Observer接口之外糊昙,RxJava 還內(nèi)置了一個實(shí)現(xiàn)了Observer的抽象類:Subscriber辛掠。Subscriber對Observer接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不僅基本使用方式一樣释牺,實(shí)質(zhì)上萝衩,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用没咙。所以如果你只想使用基本功能猩谊,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點(diǎn):

  • 1.onStart(): 這是Subscriber增加的方法镜撩。它會在 subscribe 剛開始预柒,而事件還未發(fā)送之前被調(diào)用队塘,可以用于做一些準(zhǔn)備工作袁梗,例如數(shù)據(jù)的清零或重置。這是一個可選方法憔古,默認(rèn)情況下它的實(shí)現(xiàn)為空遮怜。需要注意的是,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框鸿市,這必須在主線程執(zhí)行)锯梁,onStart()就不適用了即碗,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程陌凳。要在指定的線程來做準(zhǔn)備工作剥懒,可以使用doOnSubscribe()方法,具體可以在后面的文中看到合敦。
  • 2.unsubscribe(): 這是Subscriber所實(shí)現(xiàn)的另一個接口Subscription
    的方法初橘,用于取消訂閱。在這個方法被調(diào)用后充岛,Subscriber將不再接收事件保檐。一般在這個方法調(diào)用前,可以使用isUnsubscribed()先判斷一下狀態(tài)崔梗。unsubscribe()這個方法很重要夜只,因?yàn)樵趕ubscribe()
    之后,Observable會持有Subscriber的引用蒜魄,這個引用如果不能及時被釋放扔亥,將有內(nèi)存泄露的風(fēng)險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如onPause() onStop()等方法中)調(diào)用unsubscribe()來解除引用關(guān)系谈为,以避免內(nèi)存泄露的發(fā)生砸王。
  1. 創(chuàng)建 Observable
    Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件峦阁。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable 谦铃,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

可以看到,這里傳入了一個 OnSubscribe 對象作為參數(shù)榔昔。OnSubscribe 會被存儲在返回的 Observable 對象中驹闰,它的作用相當(dāng)于一個計劃表,當(dāng) Observable 被訂閱的時候撒会,OnSubscribe 的 call() 方法會自動被調(diào)用嘹朗,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())诵肛。這樣屹培,由被觀察者調(diào)用了觀察者的回調(diào)方法场刑,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞猜绣,即觀察者模式秸脱。

這個例子很簡單:事件的內(nèi)容是字符串癞己,而不是一些復(fù)雜的對象茅茂;事件的內(nèi)容是已經(jīng)定好了的呐能,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請求的結(jié)果在請求返回之前是未知的)往堡;所有事件在一瞬間被全部發(fā)送出去坝疼,而不是夾雜一些確定或不確定的時間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的乙埃≌⒂ⅲ總之锯岖,這個例子看起來毫無實(shí)用價值。但這是為了便于說明甫何,實(shí)質(zhì)上只要你想出吹,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫。至于具體怎么做辙喂,后面都會講到趋箩,但現(xiàn)在不行。只有把基礎(chǔ)原理先說明白了加派,上層的運(yùn)用才能更容易說清楚叫确。

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法∩纸酰基于這個方法竹勉, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列,例如:

  • just(T...): 將傳入的參數(shù)依次發(fā)送出來娄琉。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后次乓,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子孽水,都和之前的 create(OnSubscribe) 的例子是等價的票腰。

  1. Subscribe (訂閱)

創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來女气,整條鏈子就可以工作了杏慰。代碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

有人可能會注意到, subscribe() 這個方法有點(diǎn)怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』炼鞠,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系缘滥。這讓人讀起來有點(diǎn)別扭,不過如果把 API 設(shè)計成 observer.subscribe(observable) / subscriber.subscribe(observable) 谒主,雖然更加符合思維邏輯朝扼,但對流式 API 的設(shè)計就造成影響了,比較起來明顯是得不償失的霎肯。

Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼擎颖,而是將源碼中與性能、兼容性观游、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼搂捧。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載备典。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到异旧,subscriber() 做了3件事:

  • 1、調(diào)用Subscriber.onStart()提佣。這個方法在前面已經(jīng)介紹過吮蛹,是一個可選的準(zhǔn)備方法。
  • 2拌屏、調(diào)用Observable中的OnSubscribe.call(Subscriber)潮针。在這里,事件發(fā)送的邏輯開始運(yùn)行倚喂。從這也可以看出每篷,在 RxJava 中,Observable并不是在創(chuàng)建的時候就立即開始發(fā)送事件端圈,而是在它被訂閱的時候焦读,即當(dāng)subscribe()方法執(zhí)行的時候。
  • 3舱权、將傳入的Subscriber
    作為Subscription
    返回矗晃。這是為了方便unsubscribe()

整個過程中對象間的關(guān)系如下圖:

關(guān)系靜圖

或者可以看動圖:

關(guān)系靜圖

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào)宴倍,RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 张症。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 鸵贬,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber 俗他,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()阔逼、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0兆衅。 Action0 是 RxJava 的一個接口,它只有一個方法 call()嗜浮,這個方法是無參無返回值的涯保;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當(dāng)成一個包裝對象周伦,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)夕春。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe(),相當(dāng)于其他某些語言中的『閉包』专挪。 Action1 也是一個接口及志,它同樣只有一個方法 call(T param),這個方法也無返回值寨腔,但有一個參數(shù)速侈;與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的迫卢,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)倚搬。事實(shí)上,雖然 Action0 和 Action1 在 API 中使用最廣泛乾蛤,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的每界,它們可以被用以包裝不同的無返回值的方法捅僵。

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色眨层,而且 Observer 在 subscribe() 過程中最終會被轉(zhuǎn)換成 Subscriber 對象庙楚,因此,從這里開始趴樱,后面的描述我將用 Subscriber 來代替 Observer 馒闷,這樣更加嚴(yán)謹(jǐn)。

  1. 場景示例

下面舉兩個例子:

為了把原理用更清晰的方式表述出來叁征,本文中挑選的都是功能盡可能簡單的例子纳账,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當(dāng)你看到這種情況捺疼,不要覺得是因?yàn)?RxJava 太啰嗦疏虫,而是因?yàn)樵谶^早的時候舉出真實(shí)場景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景帅涂。

a. 打印字符串?dāng)?shù)組

將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });

b. 由 id 取得圖片并顯示

由指定的一個 drawable 文件 id drawableRes 取得圖片议薪,并顯示在 ImageView 中,并在出現(xiàn)異常的時候打印 Toast 報錯:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

正如上面兩個例子這樣媳友,創(chuàng)建出Observable和Subscriber
斯议,再用subscribe()將它們串起來,一次 RxJava 的基本使用就完成了醇锚。非常簡單哼御。
然而,


這并沒有什么diao用

在 RxJava 的默認(rèn)規(guī)則中焊唬,事件的發(fā)出和消費(fèi)都是在同一個線程的恋昼。也就是說,如果只用上面的方法赶促,實(shí)現(xiàn)出來的只是一個同步的觀察者模式液肌。觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機(jī)制鸥滨,因此異步對于 RxJava 是至關(guān)重要的嗦哆。而要實(shí)現(xiàn)異步,則需要用到 RxJava 的另一個概念:Scheduler婿滓。

3.線程控制 —— Scheduler (一)

在不指定線程的情況下老速, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe()凸主,就在哪個線程生產(chǎn)事件橘券;在哪個線程生產(chǎn)事件,就在哪個線程消費(fèi)事件。如果需要切換線程旁舰,就需要用到 Scheduler (調(diào)度器)锋华。

  1. Scheduler 的 API (一)
    在RxJava 中,Scheduler ——調(diào)度器鬓梅,相當(dāng)于線程控制器供置,RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程谨湘。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler 绽快,它們已經(jīng)適合大多數(shù)的使用場景:
  • Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程紧阔。這是默認(rèn)的 Scheduler坊罢。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作擅耽。
  • Schedulers.io(): I/O 操作(讀寫文件活孩、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler乖仇。行為模式和 newThread() 差不多憾儒,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程乃沙,因此多數(shù)情況下 io() 比 newThread() 更有效率起趾。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程警儒。
  • Schedulers.computation(): 計算所使用的 Scheduler训裆。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作蜀铲,例如圖形的計算边琉。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)记劝。不要把 I/O 操作放在 computation() 中变姨,否則 I/O 操作的等待時間會浪費(fèi) CPU。
  • 另外厌丑, Android 還有一個專用的 AndroidSchedulers.mainThread()定欧,它指定的操作將在 Android 主線程運(yùn)行。

有了這幾個 Scheduler 蹄衷,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進(jìn)行控制了忧额。 * subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程愧口∧婪或者叫做事件產(chǎn)生的線程。 * observeOn(): 指定 Subscriber 所運(yùn)行在的線程⊥邢或者叫做事件消費(fèi)的線程巩检。

文字?jǐn)⑹隹倸w難理解,上代碼:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面這段代碼中示启,由于 subscribeOn(Schedulers.io()) 的指定兢哭,被創(chuàng)建的事件的內(nèi)容 1、2夫嗓、3迟螺、4 將會在 IO 線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定舍咖,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 矩父。事實(shí)上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見排霉,它適用于多數(shù)的 『后臺線程取數(shù)據(jù)窍株,主線程顯示』的程序策略。

而前面提到的由圖片 id 取得圖片并顯示的例子攻柠,如果也加上這兩句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那么球订,加載圖片將會發(fā)生在 IO 線程,而設(shè)置圖片則被設(shè)定在了主線程瑰钮。這就意味著冒滩,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓飞涂。

  1. Scheduler 的原理 (一)
    RxJava 的 Scheduler API 很方便旦部,也很神奇(加了一句話就把線程切換了,怎么做到的较店?而且 subscribe() 不是最外層直接調(diào)用的方法嗎士八,它竟然也能被指定線程?)梁呈。然而 Scheduler 的原理需要放在后面講婚度,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的。

好吧這一節(jié)其實(shí)我屁也沒說官卡,只是為了讓你安心蝗茁,讓你知道我不是忘了講原理,而是把它放在了更合適的地方寻咒。

4. 變換

終于要到牛逼的地方了哮翘,不管你激動不激動,反正我是激動了毛秘。

RxJava 提供了對事件序列進(jìn)行變換的支持饭寺,這是它的核心功能之一阻课,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換艰匙,就是將事件序列中的對象或整個序列進(jìn)行加工處理限煞,轉(zhuǎn)換成不同的事件或事件序列。概念說著總是模糊難懂的员凝,來看 API署驻。

  1. API
    首先看一個map()的例子:
Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數(shù)類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
            showBitmap(bitmap);
        }
    });

這里出現(xiàn)了一個叫做 Func1 的類。它和 Action1 非常相似健霹,也是 RxJava 的一個接口旺上,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于骤公, Func1 包裝的是有返回值的方法抚官。另外扬跋,和 ActionX 一樣阶捆, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法钦听。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法洒试。

可以看到,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個 Bitmap 對象后返回朴上,而在經(jīng)過 map() 方法后垒棋,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Bitmap。這種直接變換對象并返回的痪宰,是最常見的也最容易理解的變換叼架。不過 RxJava 的變換遠(yuǎn)不止這樣,它不僅可以針對事件對象衣撬,還可以針對整個事件隊(duì)列乖订,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:

map(): 事件對象的直接變換具练,具體功能上面已經(jīng)介紹過乍构。它是 RxJava 最常用的變換。 map() 的示意圖:


map() 示意圖

flatMap(): 這是一個很有用但非常難理解的變換扛点,因此我決定花多些篇幅來介紹它哥遮。 首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字陵究。實(shí)現(xiàn)方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很簡單眠饮。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于铜邮,每個學(xué)生只有一個名字仪召,但卻有多個課程。)首先可以這樣實(shí)現(xiàn):

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很簡單。那么如果我不想在 Subscriber 中使用 for 循環(huán)返咱,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復(fù)用很重要)钥庇?用 map() 顯然是不行的,因?yàn)?map() 是一對一的轉(zhuǎn)化咖摹,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化评姨。那怎么才能把一個 Student 轉(zhuǎn)化成多個 Course 呢?

這個時候萤晴,就需要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

從上面的代碼可以看出吐句, flatMap() 和 map() 有一個相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。但需要注意店读,和 map() 不同的是嗦枢, flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中屯断。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象文虏;2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件殖演;3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件氧秘,都被匯入同一個 Observable ,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法趴久。這三個步驟丸相,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去彼棍。而這個『鋪平』就是 flatMap() 所謂的 flat灭忠。

flatMap() 示意圖:

flatMap() 示意圖

擴(kuò)展:由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作座硕,例如嵌套的網(wǎng)絡(luò)請求弛作。示例代碼(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在訂閱時請求 token坎吻,并在響應(yīng)后發(fā)送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>缆蝉,在訂閱時請求消息列表,并在響應(yīng)后發(fā)送請求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 處理顯示消息列表
            showMessages(messages);
        }
    });

傳統(tǒng)的嵌套請求需要使用嵌套的 Callback 來實(shí)現(xiàn)瘦真。而通過flatMap()
刊头,可以把嵌套的請求寫在一條鏈中,從而保持程序邏輯的清晰诸尽。

  • throttleFirst(): 在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件原杂。常用作去抖動過濾,例如按鈕的點(diǎn)擊監(jiān)聽器: RxView.clickEvents(button) // RxBinding 代碼您机,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點(diǎn)開兩個重復(fù)的界面啦穿肄。

此外年局, RxJava 還提供很多便捷的方法來實(shí)現(xiàn)事件序列的變換,這里就不一一舉例了咸产。

  1. 變換的原理:lift()
    這些變換雖然功能各有不同矢否,但實(shí)質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部脑溢,它們是基于同一個基礎(chǔ)的變換方法: lift(Operator)僵朗。首先看一下 lift() 的內(nèi)部實(shí)現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源碼,而是將源碼中與性能屑彻、兼容性验庙、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼社牲,可以去 RxJava 的 GitHub 倉庫下載粪薛。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

這段代碼很有意思:它生成了一個新的 Observable 并返回,而且創(chuàng)建新 Observable 所用的參數(shù) OnSubscribe 的回調(diào)方法 call() 中的實(shí)現(xiàn)竟然看起來和前面講過的 Observable.subscribe() 一樣搏恤!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對象不同(高能預(yù)警:接下來的幾句話可能會導(dǎo)致身體的嚴(yán)重不適)——

  • subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象违寿,這個沒有問題,但是 lift() 之后的情況就復(fù)雜了點(diǎn)挑社。
  • 當(dāng)含有 lift() 時:
    1.lift() 創(chuàng)建了一個 Observable 后陨界,加上之前的原始 Observable,已經(jīng)有兩個 Observable 了痛阻;
    2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe腮敌,也就有了兩個 OnSubscribe阱当;
    3.當(dāng)用戶調(diào)用經(jīng)過 lift() 后的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable 糜工,于是它所觸發(fā)的 onSubscribe.call(subscriber)弊添,也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe捌木;
    4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe 油坝,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call() 方法里刨裆,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這里澈圈,通過自己的 call() 方法將新 Subscriber 和原始 Subscriber 進(jìn)行關(guān)聯(lián),并插入自己的『變換』代碼以實(shí)現(xiàn)變換)帆啃,然后利用這個新 Subscriber 向原始 Observable 進(jìn)行訂閱瞬女。
    這樣就實(shí)現(xiàn)了 lift() 過程,有點(diǎn)像一種代理機(jī)制努潘,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換诽偷。

精簡掉細(xì)節(jié)的話坤学,也可以這么說:在 Observable 執(zhí)行了 lift(Operator) 方法之后,會返回一個新的 Observable报慕,這個新的 Observable 會像一個代理一樣深浮,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber眠冈。

如果你更喜歡具象思維略号,可以看圖:

lift() 原理圖

或者可以看動圖:


lift 原理動圖

兩次和多次的lift()同理,如下圖:


兩次 lift

舉一個具體的 Operator 的實(shí)現(xiàn)洋闽。下面這是一個將事件中的 Integer 對象轉(zhuǎn)換成 String 的例子玄柠,僅供參考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 將事件序列中的 Integer 對象轉(zhuǎn)換為 String 對象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

講述 lift() 的原理只是為了讓你更好地了解 RxJava ,從而可以更好地使用它诫舅。然而不管你是否理解了 lift() 的原理羽利,RxJava 都不建議開發(fā)者自定義 Operator 來直接使用 lift(),而是建議盡量使用已有的 lift() 包裝方法(如 map() flatMap() 等)進(jìn)行組合來實(shí)現(xiàn)需求刊懈,因?yàn)橹苯邮褂?lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯誤这弧。

  1. compose: 對 Observable 整體的變換

除了 lift() 之外, Observable 還有一個變換方法叫做 compose(Transformer)虚汛。它和 lift() 的區(qū)別在于匾浪, lift() 是針對事件項(xiàng)和事件序列的,而 compose() 是針對 Observable 自身進(jìn)行變換卷哩。舉個例子蛋辈,假設(shè)在程序中有多個 Observable ,并且他們都需要應(yīng)用一組相同的 lift() 變換将谊。你可以這么寫:

observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

你覺得這樣太不軟件工程了冷溶,于是你改成了這樣:

private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可讀性、可維護(hù)性都提高了尊浓〕哑担可是 Observable 被一個方法包起來,這種方式對于 Observale 的靈活性似乎還是增添了那么點(diǎn)限制栋齿。怎么辦苗胀?這個時候,就應(yīng)該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣瓦堵,使用 compose() 方法基协,Observable 可以利用傳入的 Transformer 對象的 call 方法直接對自身進(jìn)行處理,也就不必被包在方法的里面了谷丸。

compose() 的原理比較簡單堡掏,不附圖嘍

5. 線程控制:Scheduler (二)

除了靈活的變換,RxJava 另一個牛逼的地方刨疼,就是線程的自由控制泉唁。

  1. Scheduler 的 API (二)

前面講到了鹅龄,可以利用 subscribeOn() 結(jié)合 observeOn() 來實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程亭畜“缧荩可是在了解了 map() flatMap() 等變換方法后,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程拴鸵?

答案是:能玷坠。因?yàn)?observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 并不是(嚴(yán)格說應(yīng)該為『不一定是』劲藐,但這里不妨理解為『不是』)subscribe() 參數(shù)中的 Subscriber 八堡,而是 observeOn() 執(zhí)行時的當(dāng)前 Observable 所對應(yīng)的 Subscriber ,即它的直接下級 Subscriber 聘芜。換句話說宰译,observeOn() 指定的是它之后的操作所在的線程伤靠。因此如果有多次切換線程的需求础嫡,只要在每個想要切換線程的位置調(diào)用一次 observeOn() 即可君账。上代碼:

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程瞎饲,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程口叙,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

如上嗅战,通過 observeOn() 的多次調(diào)用妄田,程序?qū)崿F(xiàn)了線程的多次切換。

不過仗哨,不同于 observeOn() 形庭, subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的厌漂。

又有好事的(其實(shí)還是當(dāng)初的我)問了:如果我非要調(diào)用多次 subscribeOn() 呢?會有什么效果斟珊?

這個問題先放著苇倡,我們還是從 RxJava 線程控制的原理說起吧。

  1. Scheduler 的原理(二)

其實(shí)囤踩, subscribeOn() 和 observeOn() 的內(nèi)部實(shí)現(xiàn)旨椒,也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的線程):

subscribeOn() 原理圖:

subscribeOn() 原理

observeOn()原理圖:

Paste_Image.png

從圖中可以看出堵漱,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 "schedule..." 部位)综慎。不同的是, subscribeOn() 的線程切換發(fā)生在 OnSubscribe 中勤庐,即在它通知上一級 OnSubscribe 時示惊,這時事件還沒有開始發(fā)送好港,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響;而 observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber 中米罚,即發(fā)生在它即將給下一級 Subscriber 發(fā)送事件時钧汹,因此 observeOn() 控制的是它后面的線程。

最后录择,我用一張圖來解釋當(dāng)多個 subscribeOn() 和 observeOn() 混合使用時拔莱,線程調(diào)度是怎么發(fā)生的(由于圖中對象較多,相對于上面的圖對結(jié)構(gòu)做了一些簡化調(diào)整):


線程控制綜合調(diào)用

圖中共有 5 處含有對事件的操作隘竭。由圖中可以看出塘秦,①和②兩處受第一個 subscribeOn() 影響,運(yùn)行在紅色線程动看;③和④處受第一個 observeOn() 的影響尊剔,運(yùn)行在綠色線程;⑤處受第二個 onserveOn() 影響弧圆,運(yùn)行在紫色線程赋兵;而第二個 subscribeOn() ,由于在通知過程中線程就被第一個 subscribeOn() 截斷搔预,因此對整個流程并沒有任何影響霹期。這里也就回答了前面的問題:當(dāng)使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起作用拯田。

  1. 延伸:doOnSubscribe()
    然而历造,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的船庇。

在前面講 Subscriber 的時候吭产,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發(fā)生時就被調(diào)用了鸭轮,因此不能指定線程臣淤,而是只能執(zhí)行在 subscribe() 被調(diào)用時的線程。這就導(dǎo)致如果 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar窃爷,這必須在主線程執(zhí)行)邑蒋,將會有線程非法的風(fēng)險,因?yàn)橛袝r你無法預(yù)測 subscribe() 將會在什么線程執(zhí)行按厘。

而與 Subscriber.onStart() 相對應(yīng)的医吊,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行逮京,但區(qū)別在于它可以指定線程卿堂。默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程懒棉;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話草描,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程览绿。

示例代碼:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一個 subscribeOn() 陶珠,就能指定準(zhǔn)備工作的線程了挟裂。

RxJava 的適用場景和使用方式

1. 與 Retrofit 的結(jié)合

Retrofit 是 Square 的一個著名的網(wǎng)絡(luò)請求庫。沒有用過 Retrofit 的可以選擇跳過這一小節(jié)也沒關(guān)系揍诽,我舉的每種場景都只是個例子诀蓉,而且例子之間并無前后關(guān)聯(lián),只是個拋磚引玉的作用暑脆,所以你跳過這里看別的場景也可以的渠啤。

Retrofit 除了提供了傳統(tǒng)的 Callback 形式的 API,還有 RxJava 版本的 Observable 形式 API添吗。下面我用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統(tǒng)版本的區(qū)別沥曹。

以獲取一個 User 對象的接口作為例子。使用Retrofit 的傳統(tǒng) API碟联,你可以用這樣的方式來定義請求:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的構(gòu)建過程中妓美, Retrofit 會把自動把方法實(shí)現(xiàn)并生成代碼,然后開發(fā)者就可以利用下面的方法來獲取特定用戶并處理響應(yīng):

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

而使用 RxJava 形式的 API鲤孵,定義同樣的請求是這樣的:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);

使用的時候是這樣的:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

看到區(qū)別了嗎壶栋?

當(dāng) RxJava 形式的時候,Retrofit 把請求封裝進(jìn) Observable 普监,在請求結(jié)束后調(diào)用 onNext() 或在請求失敗后調(diào)用 onError()贵试。

對比來看, Callback 形式和 Observable 形式長得不太一樣凯正,但本質(zhì)都差不多毙玻,而且在細(xì)節(jié)上 Observable 形式似乎還比 Callback 形式要差點(diǎn)。那 Retrofit 為什么還要提供 RxJava 的支持呢廊散?

因?yàn)樗糜冒桑滩。倪@個例子看不出來是因?yàn)檫@只是最簡單的情況允睹。而一旦情景復(fù)雜起來施符, Callback 形式馬上就會開始讓人頭疼。比如:

假設(shè)這么一種情況:你的程序取到的 User 并不應(yīng)該直接顯示擂找,而是需要先與數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行比對和修正后再顯示。使用 Callback 方式大概可以這么寫:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        processUser(user); // 嘗試修正 User 數(shù)據(jù)
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

有問題嗎浩销?

很簡便贯涎,但不要這樣做。為什么慢洋?因?yàn)檫@樣做會影響性能塘雳。數(shù)據(jù)庫的操作很重陆盘,一次讀寫操作花費(fèi) 10~20ms 是很常見的,這樣的耗時很容易造成界面的卡頓败明。所以通常情況下隘马,如果可以的話一定要避免在主線程中處理數(shù)據(jù)庫。所以為了提升性能妻顶,這段代碼可以優(yōu)化一下:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 嘗試修正 User 數(shù)據(jù)
                runOnUiThread(new Runnable() { // 切回 UI 線程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

性能問題解決酸员,但……這代碼實(shí)在是太亂了,迷之縮進(jìn)盎渲觥幔嗦!雜亂的代碼往往不僅僅是美觀問題,因?yàn)榇a越亂往往就越難讀懂沥潭,而如果項(xiàng)目中充斥著雜亂的代碼邀泉,無疑會降低代碼的可讀性,造成團(tuán)隊(duì)開發(fā)效率的降低和出錯率的升高钝鸽。

這時候汇恤,如果用 RxJava 的形式,就好辦多了拔恰。 RxJava 形式的代碼是這樣的:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

后臺代碼和前臺代碼全都寫在一條鏈中因谎,明顯清晰了很多。

再舉一個例子:假設(shè) /user 接口并不能直接訪問仁连,而需要填入一個在線獲取的 token 蓝角,代碼應(yīng)該怎么寫?

Callback 方式饭冬,可以使用嵌套的 Callback:

@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
    @Override
    public void success(String token) {
        getUser(token, userId, new Callback<User>() {
            @Override
            public void success(User user) {
                userView.setUser(user);
            }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

倒是沒有什么性能問題使鹅,可是迷之縮進(jìn)毀一生,你懂我也懂昌抠,做過大項(xiàng)目的人應(yīng)該更懂患朱。

而使用 RxJava 的話,代碼是這樣的:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> onNext(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

用一個 flatMap()
就搞定了邏輯炊苫,依然是一條鏈裁厅。看著就很爽侨艾,是吧执虹?
2016/03/31 更新,加上我寫的一個 Sample 項(xiàng)目: rengwuxian RxJava Samples
好唠梨,Retrofit 部分就到這里袋励。

2. RxBinding

RxBinding 是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基于 RxJava 的 Binding API。所謂 Binding茬故,就是類似設(shè)置OnClickListener
盖灸、設(shè)置 TextWatcher
這樣的注冊綁定對象的 API。
舉個設(shè)置點(diǎn)擊監(jiān)聽的例子磺芭。使用 RxBinding
赁炎,可以把事件監(jiān)聽用這樣的方法來設(shè)置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式來反饋點(diǎn)擊事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });

看起來除了形式變了沒什么區(qū)別,實(shí)質(zhì)上也是這樣钾腺。甚至如果你看一下它的源碼徙垫,你會發(fā)現(xiàn)它連實(shí)現(xiàn)都沒什么驚喜:它的內(nèi)部是直接用一個包裹著的 setOnClickListener() 來實(shí)現(xiàn)的。然而垮庐,僅僅這一個形式的改變松邪,卻恰好就是 RxBinding 的目的:擴(kuò)展性。通過 RxBinding 把點(diǎn)擊監(jiān)聽轉(zhuǎn)換成 Observable 之后哨查,就有了對它進(jìn)行擴(kuò)展的可能逗抑。擴(kuò)展的方式有很多,根據(jù)需求而定寒亥。一個例子是前面提到過的 throttleFirst() 邮府,用于去抖動,也就是消除手抖導(dǎo)致的快速連環(huán)點(diǎn)擊:

RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);

如果想對 RxBinding
有更多了解溉奕,可以去它的 GitHub 項(xiàng)目 下面看看褂傀。

3. 各種異步操作

前面舉的 Retrofit 和 RxBinding 的例子,是兩個可以提供現(xiàn)成的 Observable 的庫加勤。而如果你有某些異步操作無法用這些庫來自動生成 Observable仙辟,也完全可以自己寫。例如數(shù)據(jù)庫的讀寫鳄梅、大圖片的載入叠国、文件壓縮/解壓等各種需要放在后臺工作的耗時操作,都可以用 RxJava 來實(shí)現(xiàn)戴尸,有了之前幾章的例子粟焊,這里應(yīng)該不用再舉例了。

4. RxBus

RxBus 名字看起來像一個庫孙蒙,但它并不是一個庫项棠,而是一種模式,它的思想是使用 RxJava 來實(shí)現(xiàn)了 EventBus 挎峦,而讓你不再需要使用Otto
或者 GreenRobot 的 EventBus
香追。至于什么是 RxBus,可以看這篇文章坦胶。順便說一句翅阵,F(xiàn)lipboard 已經(jīng)用 RxBus 替換掉了 Otto
歪玲,目前為止沒有不良反應(yīng)。

最后

對于 Android 開發(fā)者來說掷匠, RxJava 是一個很難上手的庫,因?yàn)樗鼘τ?Android 開發(fā)者來說有太多陌生的概念了岖圈《镉铮可是它真的很牛逼。因此蜂科,我寫了這篇《給 Android 開發(fā)者的 RxJava 詳解》顽决,希望能給始終搞不明白什么是 RxJava 的人一些入門的指引,或者能讓正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析导匣。無論如何才菠,只要能給各位同為 Android 工程師的你們提供一些幫助,這篇文章的目的就達(dá)到了贡定。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末赋访,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子缓待,更是在濱河造成了極大的恐慌蚓耽,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件旋炒,死亡現(xiàn)場離奇詭異步悠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)瘫镇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門鼎兽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铣除,你說我怎么就攤上這事谚咬。” “怎么了通孽?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵序宦,是天一觀的道長。 經(jīng)常有香客問我背苦,道長互捌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任行剂,我火速辦了婚禮秕噪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘厚宰。我一直安慰自己腌巾,他們只是感情好遂填,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著澈蝙,像睡著了一般吓坚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上灯荧,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天礁击,我揣著相機(jī)與錄音,去河邊找鬼逗载。 笑死哆窿,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的厉斟。 我是一名探鬼主播挚躯,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼擦秽!你這毒婦竟也來了码荔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤号涯,失蹤者是張志新(化名)和其女友劉穎目胡,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體链快,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡誉己,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了域蜗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巨双。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖霉祸,靈堂內(nèi)的尸體忽然破棺而出筑累,到底是詐尸還是另有隱情,我是刑警寧澤丝蹭,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布慢宗,位于F島的核電站,受9級特大地震影響奔穿,放射性物質(zhì)發(fā)生泄漏镜沽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一贱田、第九天 我趴在偏房一處隱蔽的房頂上張望缅茉。 院中可真熱鬧,春花似錦男摧、人聲如沸蔬墩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拇颅。三九已至奏司,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔬蕊,已是汗流浹背结澄。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留岸夯,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓们妥,卻偏偏與公主長得像猜扮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子监婶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容

  • 在正文開始之前的最后旅赢,放上GitHub鏈接和引入依賴的gradle代碼: Github: https://gith...
    蘇蘇說zz閱讀 673評論 0 2
  • 來自于:CSDNblog.csdn.net/caihongdao123/article/details/51897...
    于加澤閱讀 1,256評論 0 5
  • 最近很長一段時間煮盼,我失眠了!是工作壓力太大導(dǎo)致的結(jié)果带污。我的生意做的并不是很大僵控,但還是要負(fù)責(zé)任的!上游的廠家和供應(yīng)商...
    一埝閱讀 195評論 0 1
  • 大能容天下鱼冀,小不納微塵报破。 堅(jiān)硬如磐石,柔軟若絲絹千绪。 一笑三冬暖充易,一怒十歲寒。 一諾抵千金荸型,變幻瞬息間盹靴。 莫說參不透...
    苔花也會綻放閱讀 325評論 3 4
  • 【幸福少年鄂佳云飛小學(xué)生鄭州堅(jiān)持分享的64天2017年11月5日】 今天媽媽晚上做了好...
    大同行者閱讀 298評論 0 0