RxJava詳解(一)

RxJava詳解(一)

年初的時候就想學(xué)習(xí)下RxJava然后寫一些RxJava的教程驮樊,無奈發(fā)現(xiàn)已經(jīng)年底了墙牌,然而我還么有寫轩娶。今天有點時間疑务,特別是發(fā)布了RxJava 2.0后沾凄,我決定動筆開始梗醇。

現(xiàn)在RxJava變的越來越流行了,很多項目中都使用了它撒蟀。特別是大神JakeWharton等的加入叙谨,以及RxBinding、Retrofit保屯、RxLifecycle等眾多項目的手负,然開發(fā)越來越方便,但是上手比較難姑尺,不過一旦你入門后你就會發(fā)現(xiàn)真是太棒了竟终。

在介紹RxJava之前,感覺有必要說一下什么是函數(shù)響應(yīng)式編程(FRP)切蟋?

函數(shù)響應(yīng)式編程(FRP)為解決現(xiàn)代編程問題提供了全新的視角统捶。一旦理解它,可以極大地簡化你的項目柄粹,特別是處理嵌套回調(diào)的異步事件喘鸟,復(fù)雜的列表過濾和變換,或者時間相關(guān)問題驻右,而且RxJava是響應(yīng)式編程的一個具體實現(xiàn)什黑。

這里以一個真實的例子來開始講解函數(shù)響應(yīng)式編程怎么提高我們代碼的可讀性。我們的任務(wù)是通過查詢GitHubAPI旺入, 首先獲取用戶列表兑凿,然后請求每個用戶的詳細(xì)信息。這個過程包括兩個web服務(wù)端點:
https://api.github.com/users-獲取用戶列表茵瘾;https://api.github.com/users/{username}-獲取特定用戶的詳細(xì)信息礼华,例如https://api.github.com/users/mutexkid

普通情況下是這樣寫的:

//The "Nested Callbacks" Way
public void fetchUserDetails() {
    //first, request the users...
    mService.requestUsers(new Callback<GithubUsersResponse>() {
        @Override
        public void success(final GithubUsersResponse githubUsersResponse,
                            final Response response) {
            Timber.i(TAG, "Request Users request completed");
            final synchronized List<GithubUserDetail> githubUserDetails = new ArrayList<GithubUserDetail>();
            //next, loop over each item in the response
            for (GithubUserDetail githubUserDetail : githubUsersResponse) {
                //request a detail object for that user
                mService.requestUserDetails(githubUserDetail.mLogin,
                                            new Callback<GithubUserDetail>() {
                    @Override
                    public void success(GithubUserDetail githubUserDetail,
                                        Response response) {
                        Log.i("User Detail request completed for user : " + githubUserDetail.mLogin);
                        githubUserDetails.add(githubUserDetail);
                        if (githubUserDetails.size() == githubUsersResponse.mGithubUsers.size()) {
                            //we've downloaded'em all - notify all who are interested!
                            mBus.post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
                        }
                    }

                    @Override
                    public void failure(RetrofitError error) {
                        Log.e(TAG, "Request User Detail Failed!!!!", error);
                    }
                });
            }
        }

        @Override
        public void failure(RetrofitError error) {
            Log.e(TAG, "Request User Failed!!!!", error);
        }
    });
}

盡管這不是最差的代碼-至少它是異步的拗秘,因此在等待每個請求完成的時候不會阻塞-但由于代碼復(fù)雜(增加更多層次的回調(diào)代碼復(fù)雜度將呈指數(shù)級增長)因此遠非理想的代碼圣絮。
當(dāng)我們不可避免要修改代碼時(在前面的web service調(diào)用中,我們依賴前一次的回調(diào)狀態(tài)雕旨,因此它不適用于模塊化或者修改要傳遞給下一個回調(diào)的數(shù)據(jù))也遠非容易的工作扮匠。
我們親切的稱這種情況為“回調(diào)地獄”。

而通過RxJava的方式:

public void rxFetchUserDetails() {
    //request the users
    mService.rxRequestUsers().concatMap(Observable::from)
    .concatMap((GithubUser githubUser) ->
                    //request the details for each user
                    mService.rxRequestUserDetails(githubUser.mLogin)
    )
    //accumulate them as a list
    .toList()
    //define which threads information will be passed on
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    //post them on an eventbus
    .subscribe(githubUserDetails -> {
        EventBus.getDefault().post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
    });
}

如你所見凡涩,使用函數(shù)響應(yīng)式編程模型我們完全擺脫了回調(diào)棒搜,并最終得到了更短小的程序。讓我們從函數(shù)響應(yīng)式編程的基本定義開始慢慢解釋到底發(fā)生了什么活箕,并逐漸理解上面的代碼力麸,這些代碼托管在GitHub上面。

從根本上講,函數(shù)響應(yīng)式編程是在觀察者模式的基礎(chǔ)上克蚂,增加對Observables發(fā)送的數(shù)據(jù)流進行操縱和變換的功能闺鲸。

RxJava簡介

在介紹RxJava之前先說一下RxRx的全稱是Reactive Extensions埃叭,直譯過來就是響應(yīng)式擴展摸恍。

rx_logo.png

Rx基于觀察者模式葫掉,它是一種編程模型存筏,目標(biāo)是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流穆咐。ReactiveX.io給的定義是益缎,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口谜慌,ReactiveX結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程的精華莺奔。Rx已經(jīng)滲透到了各個語言中欣范,有了Rx所以才有了RxJavaRx.NET令哟、RxJS恼琼、RxSwiftRx.rb屏富、RxPHP等等晴竞,

這里先列舉一下相關(guān)的官網(wǎng):

RxJavaGitHub上的介紹是:a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.
翻譯過來也就是一個基于事件和程序在Java VM上使用可觀測的序列來組成異步的庫。RxJava的本質(zhì)就是一個實現(xiàn)異步操作的庫狠半,它的優(yōu)勢就是簡潔噩死,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔神年。

其實一句話總結(jié)一下RxJava的作用就是:異步

這里可能會有人想不就是個異步嗎已维,至于辣么矯情么?用AsyncTaskHandler甚至自定義一個BigAsyncTask分分鐘搞定已日。

但是RxJava的好處是簡潔垛耳。異步操作很關(guān)鍵的一點是程序的簡潔性,因為在調(diào)度過程比較復(fù)雜的情況下飘千,異步代碼經(jīng)常會既難寫也難被讀懂堂鲜。 Android創(chuàng)造的AsyncTaskHandler其實都是為了讓異步代碼更加簡潔。雖然RxJava的優(yōu)勢也是簡潔护奈,但它的簡潔的與眾不同之處在于缔莲,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔霉旗。

擴展的觀察者模式

RxJava的異步實現(xiàn)酌予,是通過一種擴展的觀察者模式來實現(xiàn)的磺箕。

觀察者模式面向的需求是:A對象(觀察者)對B對象(被觀察者)的某種變化高度敏感,需要在B變化的一瞬間做出反應(yīng)抛虫。舉個例子,新聞里喜聞樂見的警察抓小偷简僧,警察需要在小偷伸手作案的時候?qū)嵤┳ゲ督ㄒT谶@個例子里,警察是觀察者岛马,小偷是被觀察者棉姐,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間啦逆。程序的觀察者模式和這種真正的『觀察』略有不同伞矩,觀察者不需要時刻盯著被觀察者(例如A不需要每過2ms就檢查一次B的狀態(tài)),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式夏志,告訴被觀察者:我需要你的某某狀態(tài)乃坤,你要在它變化的時候通知我。 Android開發(fā)中一個比較典型的例子是點擊監(jiān)聽器OnClickListener沟蔑。對設(shè)置OnClickListener來說湿诊,View是被觀察者,OnClickListener是觀察者瘦材,二者通過 setOnClickListener()方法達成訂閱關(guān)系厅须。訂閱之后用戶點擊按鈕的瞬間,Android Framework就會將點擊事件發(fā)送給已經(jīng)注冊的OnClickListener食棕。采取這樣被動的觀察方式朗和,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度簿晓。當(dāng)然眶拉,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務(wù)必通知我』抢蚀。

OnClickListener的模式大致如下圖:

btn_onclick.jpg

如圖所示镀层,通過setOnClickListener()方法,Button持有OnClickListener的引用(這一過程沒有在圖上畫出)皿曲;當(dāng)用戶點擊時唱逢,Button自動調(diào)用OnClickListeneronClick() 方法。另外屋休,如果把這張圖中的概念抽象出來(Button -> 被觀察者坞古、OnClickListener -> 觀察者、setOnClickListener() -> 訂閱劫樟,onClick() -> 事件)痪枫,就由專用的觀察者模式(例如只用于監(jiān)聽控件點擊)轉(zhuǎn)變成了通用的觀察者模式织堂。如下圖:

btn_rxjava_observable.jpg

RxJava作為一個工具庫,使用的就是通用形式的觀察者模式奶陈。

RxJava的觀察者模式

RxJava的基本概念:

  • Observable(可觀察者易阳,即被觀察者):產(chǎn)生事件,例如去飯店吃飯的顧客吃粒。
  • Observer(觀察者):接收事件潦俺,并給出響應(yīng)動作,例如去飯店吃飯的廚房徐勃,會接受事件事示,并給出相應(yīng)。
  • subscribe()(訂閱):連接被觀察者與觀察者僻肖,例如去飯店吃飯的服務(wù)員肖爵。
    ObservableObserver通過subscribe() 方法實現(xiàn)訂閱關(guān)系,從而Observable可以在需要的時候發(fā)出事件來通知Observer臀脏。
  • Event(事件):被觀察者與觀察者溝通的載體劝堪,例如顧客點的菜。

與傳統(tǒng)觀察者模式不同谁榜,RxJava的事件回調(diào)方法除了普通事件onNext()(相當(dāng)于onClick()/onEvent())之外幅聘,還定義了兩個特殊的事件:onCompleted()onError():
但是RxJava與傳統(tǒng)的觀察者設(shè)計模式有一點明顯不同,那就是如果一個Observerble沒有任何的的Subscriber窃植,那么這個Observable是不會發(fā)出任何事件的帝蒿。

  • onCompleted(): 事件隊列完結(jié)。
    RxJava不僅把每個事件單獨處理巷怜,還會把它們看做一個隊列葛超。RxJava規(guī)定,當(dāng)不會再有新的onNext()發(fā)出時延塑,需要觸發(fā)onCompleted() 方法作為標(biāo)志绣张。
  • onError(): 事件隊列異常。
    在事件處理過程中出異常時关带,onError()會被觸發(fā)侥涵,同時隊列自動終止,不允許再有事件發(fā)出宋雏。
  • 在一個正確運行的事件序列中, onCompleted()onError()有且只有一個芜飘,并且是事件序列中的最后一個。需要注意的是onCompleted()onError() 二者也是互斥的磨总,即在隊列中調(diào)用了其中一個嗦明,就不應(yīng)該再調(diào)用另一個。

RxJava的觀察者模式大致如下圖:

rxjava_observer_1.jpg

基本實現(xiàn)

基于上面的概念蚪燕, RxJava的基本實現(xiàn)主要有三點:

  • 創(chuàng)建Observable
    Observable即被觀察者娶牌,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件奔浅。 RxJava使用Observable.create()方法來創(chuàng)建一個Observable,并為它定義事件觸發(fā)規(guī)則诗良。

  • 創(chuàng)建Observer即觀察者汹桦,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?br> RxJava中的Observer接口的實現(xiàn)方式:

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    除了Observer接口之外,RxJava還內(nèi)置了一個實現(xiàn)了Observer的抽象類:Subscriber鉴裹。SubscriberObserver接口進行了一些擴展营勤,但他們的基本使用方式是完全一樣的。

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    不僅基本使用方式一樣壹罚,實質(zhì)上,在RxJavasubscribe()過程中寿羞,Observer也總是會先被轉(zhuǎn)換成一個Subscriber再使用猖凛。所以如果你只想使用基本功能,選擇ObserverSubscriber是完全一樣的绪穆。它們的區(qū)別對于使用者來說主要有兩點:

    • onStart(): 這是Subscriber增加的方法辨泳。它會在subscribe()剛開始而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作玖院,例如數(shù)據(jù)的清零或重置菠红。這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空难菌。需要注意的是试溯,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)郊酒, onStart()就不適用了遇绞,因為它總是在subscribe() 所發(fā)生的線程被調(diào)用,而不能指定線程燎窘。要在指定的線程來做準(zhǔn)備工作摹闽,可以使用doOnSubscribe()方法,具體可以在后面的文中看到褐健。
    • unsubscribe(): 這是Subscriber所實現(xiàn)的另一個接口Subscription的方法付鹿,用于取消訂閱。在這個方法被調(diào)用后蚜迅,Subscriber 將不再接收事件舵匾。一般在這個方法調(diào)用前,可以使用isUnsubscribed()先判斷一下狀態(tài)慢叨。 unsubscribe()這個方法很重要纽匙,因為在subscribe()之后,Observable會持有 Subscriber的引用,這個引用如果不能及時被釋放拍谐,將有內(nèi)存泄露的風(fēng)險烛缔。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如onPause()馏段、onStop()等方法中)調(diào)用unsubscribe()來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生践瓷。
      所以后續(xù)講解時我們有時會用Subscriber來代替Observer院喜。
  • 調(diào)用subscribe()方法(訂閱)

    創(chuàng)建了一個ObservableObserver之后,再用subscribe()方法將它們聯(lián)結(jié)起來晕翠,整條鏈子就可以工作了喷舀。代碼形式很簡單:

    observable.subscribe(observer);  
    // 或者:
    observable.subscribe(subscriber);
    

    有人可能會注意到,subscribe()這個方法有點怪:它看起來是observalbe訂閱了observer/subscriber而不是observer/subscriber訂閱了observalbe淋肾,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系硫麻。這讓人讀起來有點別扭,不過如果把API設(shè)計成observer.subscribe(observable)/subscriber.subscribe(observable) 樊卓,雖然更加符合思維邏輯拿愧,但對流式API的設(shè)計就造成影響了,比較起來明顯是得不償失的碌尔。

RxJava入門示例

一個Observable可以發(fā)出零個或者多個事件浇辜,知道結(jié)束或者出錯。每發(fā)出一個事件唾戚,就會調(diào)用它的SubscriberonNext方法柳洋,最后調(diào)用Subscriber.onComplete()或者Subscriber.onError()結(jié)束。

Hello World

compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.2.3'
// 創(chuàng)建被觀察者叹坦、數(shù)據(jù)源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // 可以看到熊镣,這里傳入了一個 OnSubscribe 對象作為參數(shù)。OnSubscribe 會被存儲在返回的 Observable 對象中立由,它的作用相當(dāng)于一個計劃表轧钓,當(dāng) Observable      
        //被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用锐膜,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼毕箍,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 
        // onCompleted())。這樣道盏,由被觀察者調(diào)用了觀察者的回調(diào)方法而柑,就實現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式荷逞。
        Log.i("@@@", "call");
        subscriber.onNext("Hello ");
        subscriber.onNext("World !");
        subscriber.onCompleted();
    }
});
// 創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i("@@@", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("@@@", "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i("@@@", "onNext : " + s);
    }
};
// 關(guān)聯(lián)或者叫訂閱更合適媒咳。
observable.subscribe(subscriber);

一旦subscriber訂閱了observableobservable就會調(diào)用subscriber對象的onNextonComplete方法种远,subscriber就會打印出Hello World.

Observable.subscribe(Subscriber)的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是`subscribe()`的源碼涩澡,而是將源碼中與性能、兼容性坠敷、擴展性有關(guān)的代碼剔除后的核心代碼妙同。
public Subscription subscribe(Subscriber subscriber) {
   subscriber.onStart();
   onSubscribe.call(subscriber);
   return subscriber;
}

可以看到subscriber()做了3件事:

  • 調(diào)用Subscriber.onStart()射富。這個方法在前面已經(jīng)介紹過,是一個可選的準(zhǔn)備方法粥帚。
  • 調(diào)用Observable中的onSubscribe.call(Subscriber)胰耗。在這里,事件發(fā)送的邏輯開始運行芒涡。從這也可以看出柴灯,在RxJava中,Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件费尽,而是在它被訂閱的時候赠群,即當(dāng)subscribe()方法執(zhí)行的時候。
  • 將傳入的Subscriber作為Subscription返回旱幼。這是為了方便unsubscribe().

整個過程中對象間的關(guān)系如下圖:

rxjava_observable_list.gif

講到這里很多人肯定會罵傻X,這TM簡潔你妹啊...乎串,這里只是個入門Hello World,真正的簡潔等你看完全部介紹后就明白了速警。

RxJava內(nèi)置了很多簡化創(chuàng)建Observable對象的函數(shù),比如Observable.just()就是用來創(chuàng)建只發(fā)出一個事件就結(jié)束的Observable對象鸯两,上面創(chuàng)建Observable對象的代碼可以簡化為一行

Observable<String> observable = Observable.just("Hello ", "World !");

接下來看看如何簡化Subscriber闷旧,上面的例子中,我們其實并不關(guān)心onComplete()onError钧唐,我們只需要在onNext的時候做一些處理忙灼,這時候就可以使用Action1類。

Action1<String> action1 = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("@@@", "call : " + s);
    }
};

Observable.subscribe()方法有一個重載版本钝侠,接受三個Action1類型的參數(shù)

rxjava_subscribe_params.png

所以上面的代碼最終可以寫成這樣:

Observable.just("Hello ", "World !").subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("@@@", "call : " + s);
    }
});

這里順便多提一些subscribe()的多個Action參數(shù):

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡單解釋一下這段代碼中出現(xiàn)的Action1Action0该园。Action0RxJava的一個接口,它只有一個方法call()帅韧,這個方法是無參無返回值的里初;由于onCompleted() 方法也是無參無返回值的,因此Action0可以被當(dāng)成一個包裝對象忽舟,將onCompleted()的內(nèi)容打包起來將自己作為一個參數(shù)傳入subscribe()以實現(xiàn)不完整定義的回調(diào)双妨。這樣其實也可以看做將 onCompleted()方法作為參數(shù)傳進了subscribe(),相當(dāng)于其他某些語言中的『閉包』叮阅。Action1也是一個接口刁品,它同樣只有一個方法call(T param),這個方法也無返回值浩姥,但有一個參數(shù)挑随;與Action0同理,由于onNext(T obj)onError(Throwable error)也是單參數(shù)無返回值的勒叠,因此Action1可以將onNext(obj)onError(error)打包起來傳入subscribe()以實現(xiàn)不完整定義的回調(diào)兜挨。事實上膏孟,雖然Action0Action1API中使用最廣泛,但RxJava是提供了多個ActionX形式的接口(例如Action2, Action3)的暑劝,它們可以被用以包裝不同的無返回值的方法骆莹。

假設(shè)我們的Observable是第三方提供的,它提供了大量的用戶數(shù)據(jù)給我們担猛,而我們需要從用戶數(shù)據(jù)中篩選部分有用的信息幕垦,那我們該怎么辦呢?
Observable中去修改肯定是不現(xiàn)實的傅联?那從Subscriber中進行修改呢先改? 這樣好像是可以完成的。但是這種方式并不好蒸走,因為我們希望Subscriber越輕量越好仇奶,因為很有可能我們需要
在主線程中去執(zhí)行Subscriber。另外比驻,根據(jù)響應(yīng)式函數(shù)編程的概念该溯,Subscribers更應(yīng)該做的事情是響應(yīng),響應(yīng)Observable發(fā)出的事件别惦,而不是去修改狈茉。
那該怎么辦呢? 這就要用到下面的部分要講的操作符掸掸。

接口變化

RxJava 2.x擁有了新的特性氯庆,其依賴于4個基礎(chǔ)接口,它們分別是:

  • Publisher
  • Subscriber
  • Subscription
  • Processor
    其中最核心的莫過于PublisherSubscriber扰付。Publisher可以發(fā)出一系列的事件堤撵,而Subscriber負(fù)責(zé)和處理這些事件。

其中用的比較多的自然是PublisherFlowable羽莺,它支持背壓(backpressure)实昨。關(guān)于背壓給個簡潔的定義就是:

背壓是指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下盐固,一種告訴上游的被觀察者降低發(fā)送速度的策略屠橄。

簡而言之,背壓是流速控制的一種策略闰挡。
其實RxJava 2.x最大的改動就是對于backpressure的處理锐墙,為此將原來的Observable拆分成了新的ObservableFlowable,同時其他相關(guān)部分也同時進行了拆分长酗。

rxjava1vs2.png

RxJava 2.x中溪北,Observable用于訂閱Observer,不再支持背壓(1.x中可以使用背壓策略),而Flowable用于訂閱Subscriber之拨,是支持背壓的茉继。

操作符(Operators)

RxJava提供了對事件序列進行變換的支持,這是它的核心功能之一.所謂變換蚀乔,就是將事件序列中的對象或整個序列進行加工處理烁竭,轉(zhuǎn)換成不同的事件或事件序列。
操作符就是為了解決對Observable對象的變換的問題吉挣,操作符用于在Observable和最終的Subscriber之間修改Observable發(fā)出的事件派撕。RxJava提供了很多很有用的操作符。
比如map操作符睬魂,就是用來把把一個事件轉(zhuǎn)換為另一個事件的终吼。

map

Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.

Observable<String> just = Observable.just("Hello ", "World !");
Observable<String> map = just.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "@@@";
    }
});
map.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("@@@", s);
    }
});

上面的代碼打印出的結(jié)果是:

12-12 15:51:22.184 472-472/com.charon.rxjavastudydemo I/@@@: Hello @@@
12-12 15:51:22.184 472-472/com.charon.rxjavastudydemo I/@@@: World !@@@

map()操作符就是用于變換Observable對象的,map操作符返回一個Observable對象氯哮,這樣就可以實現(xiàn)鏈?zhǔn)秸{(diào)用际跪,在一個Observable對象上多次使用map操作符,最終將最簡潔的數(shù)據(jù)傳遞給Subscriber對象喉钢。

map操作符更有趣的一點是它不必返回Observable對象返回的類型姆打,你可以使用map操作符返回一個發(fā)出新的數(shù)據(jù)類型的Observable對象。
比如上面的例子中肠虽,Subscriber并不關(guān)心返回的字符串穴肘,而是想要字符串的hash值。

Observable<String> just = Observable.just("Hello ", "World !");
Observable<Integer> map = just.map(new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return s.hashCode();
    }
});
map.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i("@@@", "" + integer);
    }
});

上面部分的打印結(jié)果是:

12-12 15:54:35.515 8521-8521/com.charon.rxjavastudydemo I/@@@: -2137068114
12-12 15:54:35.516 8521-8521/com.charon.rxjavastudydemo I/@@@: -1105126669

map()的示意圖:

rxjava_map.jpg

通過上面的部分我們可以得知:

  • ObservableSubscriber可以做任何事情
    Observable可以是一個數(shù)據(jù)庫查詢舔痕,Subscriber用來顯示查詢結(jié)果;Observable可以是屏幕上的點擊事件豹缀,Subscriber用來響應(yīng)點擊事件伯复;Observable可以是一個網(wǎng)絡(luò)請求,Subscriber用來顯示請求結(jié)果邢笙。

  • ObservableSubscriber是獨立于中間的變換過程的啸如。
    ObservableSubscriber中間 可以增減任何數(shù)量的map。整個系統(tǒng)是高度可組合的氮惯,操作數(shù)據(jù)是一個很簡單的過程叮雳。

flatmap

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

flatMap()是一個很有用但非常難理解的變換,首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』妇汗,現(xiàn)在需要打印出一組學(xué)生的名字帘不。實現(xiàn)方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很簡單。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于杨箭,每個學(xué)生只有一個名字寞焙,但卻有多個課程)首先可以這樣實現(xiàn):

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很簡單。那么如果我不想在Subscriber中使用for循環(huán),而是希望Subscriber中直接傳入單個的Course對象呢(這對于代碼復(fù)用很重要),用map()顯然是不行的捣郊,因為map() 是一對一的轉(zhuǎn)化辽狈,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化。那怎么才能把一個Student轉(zhuǎn)化成多個Course呢呛牲?

這個時候刮萌,就需要用flatMap()了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

mapflatmap在功能上是一致的,它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。區(qū)別在于flatmap是通過中間Observable來進行娘扩,而map是直接執(zhí)行.flatMap()中返回的是個 Observable對象着茸,并且這個Observable對象并不是被直接發(fā)送到了Subscriber的回調(diào)方法中。

flatMap()的原理是這樣的:

  • 使用傳入的事件對象創(chuàng)建一個Observable對象
  • 并不發(fā)送這個Observable而是將它激活畜侦,于是它開始發(fā)送事件
  • 每一個創(chuàng)建出來的Observable發(fā)送的事件元扔,都被匯入同一個Observable,而這個Observable負(fù)責(zé)將這些事件統(tǒng)一交給Subscriber 的回調(diào)方法。

這三個步驟旋膳,把事件拆成了兩級澎语,通過一組新創(chuàng)建的Observable將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是flatMap()所謂的flat验懊。

flatMap()就是根據(jù)你的規(guī)則擅羞,將Observable轉(zhuǎn)換之后再發(fā)射出去,注意最后的順序很可能是錯亂的义图,如果要保證順序的一致性减俏,要使用concatMap()
由于可以在嵌套的Observable中添加異步代碼flatMap()也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求(Retrofit + RxJava)

flatMap()示意圖:

rxjava_flatmap.jpg

throttleFirst()

在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件碱工。常用作去抖動過濾娃承。
例如按鈕的點擊監(jiān)聽器:

RxView.clickEvents(button); // RxBinding 代碼`
    .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms 
    .subscribe(subscriber); // 媽媽再也不怕我的用戶手抖點開兩個重復(fù)的界面啦。

from

convert various other objects and data types into Observables

from()接收一個集合作為輸入怕篷,然后每次輸出一個元素給subscriber.

List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
Observable.from(s).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("@@@", s);
    }
});

filter

返回滿足過濾條件的數(shù)據(jù)历筝。

Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 5;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.i("@@@", "integer=" + integer); //1,2,3,4
                    }
                });

timer

Timer會在指定時間后發(fā)射一個數(shù)字0,該操作符運行在Computation Scheduler廊谓。

Observable.timer(3, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.i("@@@", "aLong=" + aLong); // 延時3s
                    }
                });

interval

創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable.
interval默認(rèn)在computation調(diào)度器上執(zhí)行梳猪。

Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.i("@@@", "aLong=" + aLong); //從0遞增,間隔1s 0,1,2,3....
            }
        });

Repeat

重復(fù)執(zhí)行

doOnNext

其實覺得doOnNext應(yīng)該不算一個操作符蒸痹,但考慮到其常用性春弥,我們還是咬咬牙將它放在了這里。它的作用是讓訂閱者在接收到數(shù)據(jù)之前干點有意思的事情叠荠。假如我們在獲取到數(shù)據(jù)之前想先保存一下它匿沛,無疑我們可以這樣實現(xiàn)。

distinct

這個操作符非常的簡單榛鼎、通俗俺祠、易懂公给,就是簡單的去重

take

take,接受一個long型參數(shù)count蜘渣,代表至多接收count個數(shù)據(jù)淌铐。

等等...就不繼續(xù)介紹了,到時候查下文檔就好了蔫缸。

是不是感覺沒什么卵用腿准,也稀里糊涂的,下面用一個網(wǎng)絡(luò)請求的例子:

很多時候我們在使用RxJava的時候總是和Retrofit進行結(jié)合使用拾碌,而為了方便演示吐葱,這里我們就暫且采用OkHttp3進行演示,配合map校翔,doOnNext弟跑,線程切換進行簡單的網(wǎng)絡(luò)請求:

  • 通過Observable.create()方法,調(diào)用OkHttp網(wǎng)絡(luò)請求防症;
  • 通過map操作符集合gson孟辑,將Response轉(zhuǎn)換為bean類;
  • 通過doOnNext()方法蔫敲,解析bean中的數(shù)據(jù)饲嗽,并進行數(shù)據(jù)庫存儲等操作;
  • 調(diào)度線程奈嘿,在子線程中進行耗時操作任務(wù)貌虾,在主線程中更新UI
  • 通過subscribe()裙犹,根據(jù)請求成功或者失敗來更新UI尽狠。
Observable.create(new ObservableOnSubscribe<Response>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
        Builder builder = new Builder()
                .url(mUrl)
                .get();
        Request request = builder.build();
        Call call = new OkHttpClient().newCall(request);
        Response response = call.execute();
        e.onNext(response);
    }
}).map(new Function<Response, MobileAddress>() {
    @Override
    public MobileAddress apply(@NonNull Response response) throws Exception {

        Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
        if (response.isSuccessful()) {
            ResponseBody body = response.body();
            if (body != null) {
                Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
                return new Gson().fromJson(body.string(), MobileAddress.class);
            }
        }
        return null;
    }
}).observeOn(AndroidSchedulers.mainThread())
    .doOnNext(new Consumer<MobileAddress>() {
        @Override
        public void accept(@NonNull MobileAddress s) throws Exception {
            Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
            mRxOperatorsText.append("\ndoOnNext 線程:" + Thread.currentThread().getName() + "\n");
            Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
            mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");

        }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<MobileAddress>() {
            @Override
            public void accept(@NonNull MobileAddress data) throws Exception {
                Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "成功:" + data.toString() + "\n");
                mRxOperatorsText.append("成功:" + data.toString() + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");

                Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                mRxOperatorsText.append("失斠镀浴:" + throwable.getMessage() + "\n");
            }
        });

更多內(nèi)容請看下一篇文章RxJava詳解(二)

參考:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末袄膏,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子盗似,更是在濱河造成了極大的恐慌,老刑警劉巖平项,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赫舒,死亡現(xiàn)場離奇詭異,居然都是意外死亡闽瓢,警方通過查閱死者的電腦和手機接癌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扣讼,“玉大人缺猛,你說我怎么就攤上這事。” “怎么了荔燎?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵耻姥,是天一觀的道長。 經(jīng)常有香客問我有咨,道長琐簇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任座享,我火速辦了婚禮婉商,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘渣叛。我一直安慰自己丈秩,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布淳衙。 她就那樣靜靜地躺著蘑秽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪滤祖。 梳的紋絲不亂的頭發(fā)上筷狼,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機與錄音匠童,去河邊找鬼埂材。 笑死,一個胖子當(dāng)著我的面吹牛汤求,可吹牛的內(nèi)容都是我干的俏险。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼扬绪,長吁一口氣:“原來是場噩夢啊……” “哼竖独!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起挤牛,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤莹痢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后墓赴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竞膳,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年诫硕,在試婚紗的時候發(fā)現(xiàn)自己被綠了坦辟。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片暴区。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚣抗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出术徊,到底是詐尸還是另有隱情,我是刑警寧澤挪蹭,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布亭饵,位于F島的核電站,受9級特大地震影響嚣潜,放射性物質(zhì)發(fā)生泄漏冬骚。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一懂算、第九天 我趴在偏房一處隱蔽的房頂上張望只冻。 院中可真熱鬧,春花似錦计技、人聲如沸喜德。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舍悯。三九已至,卻和暖如春睡雇,著一層夾襖步出監(jiān)牢的瞬間萌衬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工它抱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留秕豫,地道東北人。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓观蓄,卻偏偏與公主長得像混移,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子侮穿,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容