RxJava詳解(一)
年初的時候就想學(xué)習(xí)下RxJava
然后寫一些RxJava
的教程驮樊,無奈發(fā)現(xiàn)已經(jīng)年底了墙牌,然而我還么有寫轩娶。今天有點時間疑务,特別是發(fā)布了RxJava 2.0
后沾凄,我決定動筆開始梗醇。
現(xiàn)在RxJava
變的越來越流行了,很多項目中都使用了它撒蟀。特別是大神JakeWharton
等的加入叙谨,以及RxBinding、Retrofit保屯、RxLifecycle
等眾多項目的手负,然開發(fā)越來越方便,但是上手比較難姑尺,不過一旦你入門后你就會發(fā)現(xiàn)真是太棒了竟终。
在介紹RxJava
之前,感覺有必要說一下什么是函數(shù)響應(yīng)式編程(FRP
)切蟋?
函數(shù)響應(yīng)式編程(FRP
)為解決現(xiàn)代編程問題提供了全新的視角统捶。一旦理解它,可以極大地簡化你的項目柄粹,特別是處理嵌套回調(diào)的異步事件喘鸟,復(fù)雜的列表過濾和變換,或者時間相關(guān)問題驻右,而且RxJava
是響應(yīng)式編程的一個具體實現(xiàn)什黑。
這里以一個真實的例子來開始講解函數(shù)響應(yīng)式編程怎么提高我們代碼的可讀性。我們的任務(wù)是通過查詢GitHub
的API
旺入, 首先獲取用戶列表兑凿,然后請求每個用戶的詳細(xì)信息。這個過程包括兩個web
服務(wù)端點:
https://api.github.com/users
-獲取用戶列表茵瘾;https://api.github.com/users/{username}
-獲取特定用戶的詳細(xì)信息礼华,例如https://api.github.com/users/mutexkid
。
普通情況下是這樣寫的:
//The "Nested Callbacks" Way
public void fetchUserDetails() {
//first, request the users...
mService.requestUsers(new Callback<GithubUsersResponse>() {
@Override
public void success(final GithubUsersResponse githubUsersResponse,
final Response response) {
Timber.i(TAG, "Request Users request completed");
final synchronized List<GithubUserDetail> githubUserDetails = new ArrayList<GithubUserDetail>();
//next, loop over each item in the response
for (GithubUserDetail githubUserDetail : githubUsersResponse) {
//request a detail object for that user
mService.requestUserDetails(githubUserDetail.mLogin,
new Callback<GithubUserDetail>() {
@Override
public void success(GithubUserDetail githubUserDetail,
Response response) {
Log.i("User Detail request completed for user : " + githubUserDetail.mLogin);
githubUserDetails.add(githubUserDetail);
if (githubUserDetails.size() == githubUsersResponse.mGithubUsers.size()) {
//we've downloaded'em all - notify all who are interested!
mBus.post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
}
}
@Override
public void failure(RetrofitError error) {
Log.e(TAG, "Request User Detail Failed!!!!", error);
}
});
}
}
@Override
public void failure(RetrofitError error) {
Log.e(TAG, "Request User Failed!!!!", error);
}
});
}
盡管這不是最差的代碼-至少它是異步的拗秘,因此在等待每個請求完成的時候不會阻塞-但由于代碼復(fù)雜(增加更多層次的回調(diào)代碼復(fù)雜度將呈指數(shù)級增長)因此遠非理想的代碼圣絮。
當(dāng)我們不可避免要修改代碼時(在前面的web service
調(diào)用中,我們依賴前一次的回調(diào)狀態(tài)雕旨,因此它不適用于模塊化或者修改要傳遞給下一個回調(diào)的數(shù)據(jù))也遠非容易的工作扮匠。
我們親切的稱這種情況為“回調(diào)地獄”。
而通過RxJava
的方式:
public void rxFetchUserDetails() {
//request the users
mService.rxRequestUsers().concatMap(Observable::from)
.concatMap((GithubUser githubUser) ->
//request the details for each user
mService.rxRequestUserDetails(githubUser.mLogin)
)
//accumulate them as a list
.toList()
//define which threads information will be passed on
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
//post them on an eventbus
.subscribe(githubUserDetails -> {
EventBus.getDefault().post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
});
}
如你所見凡涩,使用函數(shù)響應(yīng)式編程模型我們完全擺脫了回調(diào)棒搜,并最終得到了更短小的程序。讓我們從函數(shù)響應(yīng)式編程的基本定義開始慢慢解釋到底發(fā)生了什么活箕,并逐漸理解上面的代碼力麸,這些代碼托管在GitHub
上面。
從根本上講,函數(shù)響應(yīng)式編程是在觀察者模式的基礎(chǔ)上克蚂,增加對Observables
發(fā)送的數(shù)據(jù)流進行操縱和變換的功能闺鲸。
RxJava
簡介
在介紹RxJava
之前先說一下Rx
。Rx
的全稱是Reactive Extensions
埃叭,直譯過來就是響應(yīng)式擴展摸恍。
Rx
基于觀察者模式葫掉,它是一種編程模型存筏,目標(biāo)是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流穆咐。ReactiveX.io
給的定義是益缎,Rx
是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口谜慌,ReactiveX
結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程的精華莺奔。Rx
已經(jīng)滲透到了各個語言中欣范,有了Rx
所以才有了RxJava
、Rx.NET
令哟、RxJS
恼琼、RxSwift
、Rx.rb
屏富、RxPHP
等等晴竞,
這里先列舉一下相關(guān)的官網(wǎng):
RxJava
在GitHub
上的介紹是:a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.
翻譯過來也就是一個基于事件和程序在Java VM
上使用可觀測的序列來組成異步的庫。RxJava
的本質(zhì)就是一個實現(xiàn)異步操作的庫狠半,它的優(yōu)勢就是簡潔噩死,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔神年。
其實一句話總結(jié)一下RxJava
的作用就是:異步
這里可能會有人想不就是個異步嗎已维,至于辣么矯情么?用AsyncTask
、Handler
甚至自定義一個BigAsyncTask
分分鐘搞定已日。
但是RxJava
的好處是簡潔垛耳。異步操作很關(guān)鍵的一點是程序的簡潔性,因為在調(diào)度過程比較復(fù)雜的情況下飘千,異步代碼經(jīng)常會既難寫也難被讀懂堂鲜。 Android
創(chuàng)造的AsyncTask
和Handler
其實都是為了讓異步代碼更加簡潔。雖然RxJava
的優(yōu)勢也是簡潔护奈,但它的簡潔的與眾不同之處在于缔莲,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔霉旗。
擴展的觀察者模式
RxJava
的異步實現(xiàn)酌予,是通過一種擴展的觀察者模式來實現(xiàn)的磺箕。
觀察者模式面向的需求是:A
對象(觀察者)對B
對象(被觀察者)的某種變化高度敏感,需要在B
變化的一瞬間做出反應(yīng)抛虫。舉個例子,新聞里喜聞樂見的警察抓小偷简僧,警察需要在小偷伸手作案的時候?qū)嵤┳ゲ督ㄒT谶@個例子里,警察是觀察者岛马,小偷是被觀察者棉姐,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間啦逆。程序的觀察者模式和這種真正的『觀察』略有不同伞矩,觀察者不需要時刻盯著被觀察者(例如A
不需要每過2ms
就檢查一次B
的狀態(tài)),而是采用注冊(Register
)或者稱為訂閱(Subscribe
)的方式夏志,告訴被觀察者:我需要你的某某狀態(tài)乃坤,你要在它變化的時候通知我。 Android
開發(fā)中一個比較典型的例子是點擊監(jiān)聽器OnClickListener
沟蔑。對設(shè)置OnClickListener
來說湿诊,View
是被觀察者,OnClickListener
是觀察者瘦材,二者通過 setOnClickListener()
方法達成訂閱關(guān)系厅须。訂閱之后用戶點擊按鈕的瞬間,Android Framework
就會將點擊事件發(fā)送給已經(jīng)注冊的OnClickListener
食棕。采取這樣被動的觀察方式朗和,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度簿晓。當(dāng)然眶拉,這也得益于我們可以隨意定制自己程序中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務(wù)必通知我』抢蚀。
OnClickListener
的模式大致如下圖:
如圖所示镀层,通過setOnClickListener()
方法,Button
持有OnClickListener
的引用(這一過程沒有在圖上畫出)皿曲;當(dāng)用戶點擊時唱逢,Button
自動調(diào)用OnClickListener
的onClick()
方法。另外屋休,如果把這張圖中的概念抽象出來(Button
-> 被觀察者坞古、OnClickListener
-> 觀察者、setOnClickListener()
-> 訂閱劫樟,onClick()
-> 事件)痪枫,就由專用的觀察者模式(例如只用于監(jiān)聽控件點擊)轉(zhuǎn)變成了通用的觀察者模式织堂。如下圖:
而RxJava
作為一個工具庫,使用的就是通用形式的觀察者模式奶陈。
RxJava
的觀察者模式
RxJava
的基本概念:
-
Observable
(可觀察者易阳,即被觀察者):產(chǎn)生事件,例如去飯店吃飯的顧客吃粒。 -
Observer
(觀察者):接收事件潦俺,并給出響應(yīng)動作,例如去飯店吃飯的廚房徐勃,會接受事件事示,并給出相應(yīng)。 -
subscribe()
(訂閱):連接被觀察者與觀察者僻肖,例如去飯店吃飯的服務(wù)員肖爵。
Observable
和Observer
通過subscribe()
方法實現(xiàn)訂閱關(guān)系,從而Observable
可以在需要的時候發(fā)出事件來通知Observer
臀脏。 -
Event
(事件):被觀察者與觀察者溝通的載體劝堪,例如顧客點的菜。
與傳統(tǒng)觀察者模式不同谁榜,RxJava
的事件回調(diào)方法除了普通事件onNext()
(相當(dāng)于onClick()
/onEvent()
)之外幅聘,還定義了兩個特殊的事件:onCompleted()
和onError()
:
但是RxJava
與傳統(tǒng)的觀察者設(shè)計模式有一點明顯不同,那就是如果一個Observerble
沒有任何的的Subscriber
窃植,那么這個Observable
是不會發(fā)出任何事件的帝蒿。
-
onCompleted()
: 事件隊列完結(jié)。
RxJava
不僅把每個事件單獨處理巷怜,還會把它們看做一個隊列葛超。RxJava
規(guī)定,當(dāng)不會再有新的onNext()
發(fā)出時延塑,需要觸發(fā)onCompleted()
方法作為標(biāo)志绣张。 -
onError()
: 事件隊列異常。
在事件處理過程中出異常時关带,onError()
會被觸發(fā)侥涵,同時隊列自動終止,不允許再有事件發(fā)出宋雏。 - 在一個正確運行的事件序列中,
onCompleted()
和onError()
有且只有一個芜飘,并且是事件序列中的最后一個。需要注意的是onCompleted()
和onError()
二者也是互斥的磨总,即在隊列中調(diào)用了其中一個嗦明,就不應(yīng)該再調(diào)用另一個。
RxJava
的觀察者模式大致如下圖:
基本實現(xiàn)
基于上面的概念蚪燕, RxJava
的基本實現(xiàn)主要有三點:
創(chuàng)建
Observable
Observable
即被觀察者娶牌,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件奔浅。RxJava
使用Observable.create()
方法來創(chuàng)建一個Observable
,并為它定義事件觸發(fā)規(guī)則诗良。-
創(chuàng)建
Observer
即觀察者汹桦,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?br>RxJava
中的Observer
接口的實現(xiàn)方式:Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
除了
Observer
接口之外,RxJava
還內(nèi)置了一個實現(xiàn)了Observer
的抽象類:Subscriber
鉴裹。Subscriber
對Observer
接口進行了一些擴展营勤,但他們的基本使用方式是完全一樣的。Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
不僅基本使用方式一樣壹罚,實質(zhì)上,在
RxJava
的subscribe()
過程中寿羞,Observer
也總是會先被轉(zhuǎn)換成一個Subscriber
再使用猖凛。所以如果你只想使用基本功能,選擇Observer
和Subscriber
是完全一樣的绪穆。它們的區(qū)別對于使用者來說主要有兩點:-
onStart()
: 這是Subscriber
增加的方法辨泳。它會在subscribe()
剛開始而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作玖院,例如數(shù)據(jù)的清零或重置菠红。這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空难菌。需要注意的是试溯,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)郊酒,onStart()
就不適用了遇绞,因為它總是在subscribe()
所發(fā)生的線程被調(diào)用,而不能指定線程燎窘。要在指定的線程來做準(zhǔn)備工作摹闽,可以使用doOnSubscribe()
方法,具體可以在后面的文中看到褐健。 -
unsubscribe
(): 這是Subscriber
所實現(xiàn)的另一個接口Subscription
的方法付鹿,用于取消訂閱。在這個方法被調(diào)用后蚜迅,Subscriber
將不再接收事件舵匾。一般在這個方法調(diào)用前,可以使用isUnsubscribed()
先判斷一下狀態(tài)慢叨。unsubscribe()
這個方法很重要纽匙,因為在subscribe()
之后,Observable
會持有Subscriber
的引用,這個引用如果不能及時被釋放拍谐,將有內(nèi)存泄露的風(fēng)險烛缔。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如onPause()馏段、onStop()
等方法中)調(diào)用unsubscribe()
來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生践瓷。
所以后續(xù)講解時我們有時會用Subscriber
來代替Observer
院喜。
-
-
調(diào)用
subscribe()
方法(訂閱)創(chuàng)建了一個
Observable
和Observer
之后,再用subscribe()
方法將它們聯(lián)結(jié)起來晕翠,整條鏈子就可以工作了喷舀。代碼形式很簡單:observable.subscribe(observer); // 或者: observable.subscribe(subscriber);
有人可能會注意到,
subscribe()
這個方法有點怪:它看起來是observalbe
訂閱了observer/subscriber
而不是observer/subscriber
訂閱了observalbe
淋肾,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系硫麻。這讓人讀起來有點別扭,不過如果把API
設(shè)計成observer.subscribe(observable)/subscriber.subscribe(observable)
樊卓,雖然更加符合思維邏輯拿愧,但對流式API
的設(shè)計就造成影響了,比較起來明顯是得不償失的碌尔。
RxJava
入門示例
一個Observable
可以發(fā)出零個或者多個事件浇辜,知道結(jié)束或者出錯。每發(fā)出一個事件唾戚,就會調(diào)用它的Subscriber
的onNext
方法柳洋,最后調(diào)用Subscriber.onComplete()
或者Subscriber.onError()
結(jié)束。
Hello World
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.2.3'
// 創(chuàng)建被觀察者叹坦、數(shù)據(jù)源
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
// 可以看到熊镣,這里傳入了一個 OnSubscribe 對象作為參數(shù)。OnSubscribe 會被存儲在返回的 Observable 對象中立由,它的作用相當(dāng)于一個計劃表轧钓,當(dāng) Observable
//被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用锐膜,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼毕箍,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次
// onCompleted())。這樣道盏,由被觀察者調(diào)用了觀察者的回調(diào)方法而柑,就實現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式荷逞。
Log.i("@@@", "call");
subscriber.onNext("Hello ");
subscriber.onNext("World !");
subscriber.onCompleted();
}
});
// 創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("@@@", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("@@@", "onError");
}
@Override
public void onNext(String s) {
Log.i("@@@", "onNext : " + s);
}
};
// 關(guān)聯(lián)或者叫訂閱更合適媒咳。
observable.subscribe(subscriber);
一旦subscriber
訂閱了observable
,observable
就會調(diào)用subscriber
對象的onNext
和onComplete
方法种远,subscriber
就會打印出Hello World
.
Observable.subscribe(Subscriber)
的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是`subscribe()`的源碼涩澡,而是將源碼中與性能、兼容性坠敷、擴展性有關(guān)的代碼剔除后的核心代碼妙同。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到subscriber()
做了3件事:
- 調(diào)用
Subscriber.onStart()
射富。這個方法在前面已經(jīng)介紹過,是一個可選的準(zhǔn)備方法粥帚。 - 調(diào)用
Observable
中的onSubscribe.call(Subscriber)
胰耗。在這里,事件發(fā)送的邏輯開始運行芒涡。從這也可以看出柴灯,在RxJava
中,Observable
并不是在創(chuàng)建的時候就立即開始發(fā)送事件费尽,而是在它被訂閱的時候赠群,即當(dāng)subscribe()
方法執(zhí)行的時候。 - 將傳入的
Subscriber
作為Subscription
返回旱幼。這是為了方便unsubscribe()
.
整個過程中對象間的關(guān)系如下圖:
講到這里很多人肯定會罵傻X
,這TM簡潔你妹啊...乎串,這里只是個入門Hello World
,真正的簡潔等你看完全部介紹后就明白了速警。
RxJava
內(nèi)置了很多簡化創(chuàng)建Observable
對象的函數(shù),比如Observable.just()
就是用來創(chuàng)建只發(fā)出一個事件就結(jié)束的Observable
對象鸯两,上面創(chuàng)建Observable
對象的代碼可以簡化為一行
Observable<String> observable = Observable.just("Hello ", "World !");
接下來看看如何簡化Subscriber
闷旧,上面的例子中,我們其實并不關(guān)心onComplete()
和onError
钧唐,我們只需要在onNext
的時候做一些處理忙灼,這時候就可以使用Action1
類。
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
Log.i("@@@", "call : " + s);
}
};
Observable.subscribe()
方法有一個重載版本钝侠,接受三個Action1
類型的參數(shù)
所以上面的代碼最終可以寫成這樣:
Observable.just("Hello ", "World !").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("@@@", "call : " + s);
}
});
這里順便多提一些subscribe()
的多個Action
參數(shù):
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段代碼中出現(xiàn)的Action1
和Action0
该园。Action0
是RxJava
的一個接口,它只有一個方法call()
帅韧,這個方法是無參無返回值的里初;由于onCompleted()
方法也是無參無返回值的,因此Action0
可以被當(dāng)成一個包裝對象忽舟,將onCompleted()
的內(nèi)容打包起來將自己作為一個參數(shù)傳入subscribe()
以實現(xiàn)不完整定義的回調(diào)双妨。這樣其實也可以看做將 onCompleted()
方法作為參數(shù)傳進了subscribe()
,相當(dāng)于其他某些語言中的『閉包』叮阅。Action1
也是一個接口刁品,它同樣只有一個方法call(T param)
,這個方法也無返回值浩姥,但有一個參數(shù)挑随;與Action0
同理,由于onNext(T obj)
和onError(Throwable error)
也是單參數(shù)無返回值的勒叠,因此Action1
可以將onNext(obj)
和onError(error)
打包起來傳入subscribe()
以實現(xiàn)不完整定義的回調(diào)兜挨。事實上膏孟,雖然Action0
和Action1
在API
中使用最廣泛,但RxJava
是提供了多個ActionX
形式的接口(例如Action2
, Action3
)的暑劝,它們可以被用以包裝不同的無返回值的方法骆莹。
假設(shè)我們的Observable
是第三方提供的,它提供了大量的用戶數(shù)據(jù)給我們担猛,而我們需要從用戶數(shù)據(jù)中篩選部分有用的信息幕垦,那我們該怎么辦呢?
從Observable
中去修改肯定是不現(xiàn)實的傅联?那從Subscriber
中進行修改呢先改? 這樣好像是可以完成的。但是這種方式并不好蒸走,因為我們希望Subscriber
越輕量越好仇奶,因為很有可能我們需要
在主線程中去執(zhí)行Subscriber
。另外比驻,根據(jù)響應(yīng)式函數(shù)編程的概念该溯,Subscribers
更應(yīng)該做的事情是響應(yīng)
,響應(yīng)Observable
發(fā)出的事件别惦,而不是去修改狈茉。
那該怎么辦呢? 這就要用到下面的部分要講的操作符掸掸。
接口變化
RxJava 2.x
擁有了新的特性氯庆,其依賴于4個基礎(chǔ)接口,它們分別是:
Publisher
Subscriber
Subscription
-
Processor
其中最核心的莫過于Publisher
和Subscriber
扰付。Publisher
可以發(fā)出一系列的事件堤撵,而Subscriber
負(fù)責(zé)和處理這些事件。
其中用的比較多的自然是Publisher
的Flowable
羽莺,它支持背壓(backpressure
)实昨。關(guān)于背壓給個簡潔的定義就是:
背壓是指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下盐固,一種告訴上游的被觀察者降低發(fā)送速度的策略屠橄。
簡而言之,背壓是流速控制的一種策略闰挡。
其實RxJava 2.x
最大的改動就是對于backpressure
的處理锐墙,為此將原來的Observable
拆分成了新的Observable
和Flowable
,同時其他相關(guān)部分也同時進行了拆分长酗。
在RxJava 2.x
中溪北,Observable
用于訂閱Observer
,不再支持背壓(1.x
中可以使用背壓策略),而Flowable
用于訂閱Subscriber
之拨,是支持背壓的茉继。
操作符(Operators
)
RxJava
提供了對事件序列進行變換的支持,這是它的核心功能之一.所謂變換蚀乔,就是將事件序列中的對象或整個序列進行加工處理烁竭,轉(zhuǎn)換成不同的事件或事件序列。
操作符就是為了解決對Observable
對象的變換的問題吉挣,操作符用于在Observable
和最終的Subscriber
之間修改Observable
發(fā)出的事件派撕。RxJava
提供了很多很有用的操作符。
比如map
操作符睬魂,就是用來把把一個事件轉(zhuǎn)換為另一個事件的终吼。
map
Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.
Observable<String> just = Observable.just("Hello ", "World !");
Observable<String> map = just.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "@@@";
}
});
map.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("@@@", s);
}
});
上面的代碼打印出的結(jié)果是:
12-12 15:51:22.184 472-472/com.charon.rxjavastudydemo I/@@@: Hello @@@
12-12 15:51:22.184 472-472/com.charon.rxjavastudydemo I/@@@: World !@@@
map()
操作符就是用于變換Observable
對象的,map
操作符返回一個Observable
對象氯哮,這樣就可以實現(xiàn)鏈?zhǔn)秸{(diào)用际跪,在一個Observable
對象上多次使用map操作符,最終將最簡潔的數(shù)據(jù)傳遞給Subscriber
對象喉钢。
map
操作符更有趣的一點是它不必返回Observable對象返回的類型姆打,你可以使用map
操作符返回一個發(fā)出新的數(shù)據(jù)類型的Observable
對象。
比如上面的例子中肠虽,Subscriber
并不關(guān)心返回的字符串穴肘,而是想要字符串的hash
值。
Observable<String> just = Observable.just("Hello ", "World !");
Observable<Integer> map = just.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
});
map.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("@@@", "" + integer);
}
});
上面部分的打印結(jié)果是:
12-12 15:54:35.515 8521-8521/com.charon.rxjavastudydemo I/@@@: -2137068114
12-12 15:54:35.516 8521-8521/com.charon.rxjavastudydemo I/@@@: -1105126669
map()
的示意圖:
通過上面的部分我們可以得知:
Observable
和Subscriber
可以做任何事情
Observable
可以是一個數(shù)據(jù)庫查詢舔痕,Subscriber
用來顯示查詢結(jié)果;Observable
可以是屏幕上的點擊事件豹缀,Subscriber
用來響應(yīng)點擊事件伯复;Observable
可以是一個網(wǎng)絡(luò)請求,Subscriber
用來顯示請求結(jié)果邢笙。Observable
和Subscriber
是獨立于中間的變換過程的啸如。
在Observable
和Subscriber
中間 可以增減任何數(shù)量的map
。整個系統(tǒng)是高度可組合的氮惯,操作數(shù)據(jù)是一個很簡單的過程叮雳。
flatmap
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
flatMap()
是一個很有用但非常難理解的變換,首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』妇汗,現(xiàn)在需要打印出一組學(xué)生的名字帘不。實現(xiàn)方式很簡單:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
很簡單。那么再假設(shè):如果要打印出每個學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于杨箭,每個學(xué)生只有一個名字寞焙,但卻有多個課程)首先可以這樣實現(xiàn):
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
依然很簡單。那么如果我不想在Subscriber
中使用for
循環(huán),而是希望Subscriber
中直接傳入單個的Course
對象呢(這對于代碼復(fù)用很重要),用map()
顯然是不行的捣郊,因為map()
是一對一的轉(zhuǎn)化辽狈,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化。那怎么才能把一個Student
轉(zhuǎn)化成多個Course
呢呛牲?
這個時候刮萌,就需要用flatMap()
了:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
map
與flatmap
在功能上是一致的,它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。區(qū)別在于flatmap
是通過中間Observable
來進行娘扩,而map
是直接執(zhí)行.flatMap()
中返回的是個 Observable
對象着茸,并且這個Observable
對象并不是被直接發(fā)送到了Subscriber
的回調(diào)方法中。
flatMap()
的原理是這樣的:
- 使用傳入的事件對象創(chuàng)建一個
Observable
對象 - 并不發(fā)送這個
Observable
而是將它激活畜侦,于是它開始發(fā)送事件 - 每一個創(chuàng)建出來的
Observable
發(fā)送的事件元扔,都被匯入同一個Observable
,而這個Observable
負(fù)責(zé)將這些事件統(tǒng)一交給Subscriber
的回調(diào)方法。
這三個步驟旋膳,把事件拆成了兩級澎语,通過一組新創(chuàng)建的Observable
將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是flatMap()
所謂的flat
验懊。
flatMap()
就是根據(jù)你的規(guī)則擅羞,將Observable
轉(zhuǎn)換之后再發(fā)射出去,注意最后的順序很可能是錯亂的义图,如果要保證順序的一致性减俏,要使用concatMap()
由于可以在嵌套的Observable
中添加異步代碼flatMap()
也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求(Retrofit + RxJava)
flatMap()
示意圖:
throttleFirst()
在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件碱工。常用作去抖動過濾娃承。
例如按鈕的點擊監(jiān)聽器:
RxView.clickEvents(button); // RxBinding 代碼`
.throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms
.subscribe(subscriber); // 媽媽再也不怕我的用戶手抖點開兩個重復(fù)的界面啦。
from
convert various other objects and data types into Observables
from()
接收一個集合作為輸入怕篷,然后每次輸出一個元素給subscriber
.
List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
Observable.from(s).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("@@@", s);
}
});
filter
返回滿足過濾條件的數(shù)據(jù)历筝。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer < 5;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("@@@", "integer=" + integer); //1,2,3,4
}
});
timer
Timer
會在指定時間后發(fā)射一個數(shù)字0,該操作符運行在Computation Scheduler
廊谓。
Observable.timer(3, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i("@@@", "aLong=" + aLong); // 延時3s
}
});
interval
創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable
.
interval
默認(rèn)在computation
調(diào)度器上執(zhí)行梳猪。
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i("@@@", "aLong=" + aLong); //從0遞增,間隔1s 0,1,2,3....
}
});
Repeat
重復(fù)執(zhí)行
doOnNext
其實覺得doOnNext
應(yīng)該不算一個操作符蒸痹,但考慮到其常用性春弥,我們還是咬咬牙將它放在了這里。它的作用是讓訂閱者在接收到數(shù)據(jù)之前干點有意思的事情叠荠。假如我們在獲取到數(shù)據(jù)之前想先保存一下它匿沛,無疑我們可以這樣實現(xiàn)。
distinct
這個操作符非常的簡單榛鼎、通俗俺祠、易懂公给,就是簡單的去重
take
take
,接受一個long
型參數(shù)count
蜘渣,代表至多接收count
個數(shù)據(jù)淌铐。
等等...就不繼續(xù)介紹了,到時候查下文檔就好了蔫缸。
是不是感覺沒什么卵用腿准,也稀里糊涂的,下面用一個網(wǎng)絡(luò)請求的例子:
很多時候我們在使用RxJava
的時候總是和Retrofit
進行結(jié)合使用拾碌,而為了方便演示吐葱,這里我們就暫且采用OkHttp3
進行演示,配合map
校翔,doOnNext
弟跑,線程切換進行簡單的網(wǎng)絡(luò)請求:
- 通過
Observable.create()
方法,調(diào)用OkHttp
網(wǎng)絡(luò)請求防症; - 通過
map
操作符集合gson
孟辑,將Response
轉(zhuǎn)換為bean
類; - 通過
doOnNext()
方法蔫敲,解析bean
中的數(shù)據(jù)饲嗽,并進行數(shù)據(jù)庫存儲等操作; - 調(diào)度線程奈嘿,在子線程中進行耗時操作任務(wù)貌虾,在主線程中更新
UI
; - 通過
subscribe()
裙犹,根據(jù)請求成功或者失敗來更新UI
尽狠。
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url(mUrl)
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\ndoOnNext 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "成功:" + data.toString() + "\n");
mRxOperatorsText.append("成功:" + data.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
mRxOperatorsText.append("失斠镀浴:" + throwable.getMessage() + "\n");
}
});
更多內(nèi)容請看下一篇文章RxJava詳解(二)
參考:
- RxJava Wiki
- Grokking RxJava, Part 1: The Basics
- NotRxJava
- When Not to Use RxJava
- 給 Android 開發(fā)者的 RxJava 詳解
- Google Agera 從入門到放棄
- 郵箱 :charon.chui@gmail.com
- Good Luck!