這可能是最好的 RxJava 2.x 入門教程系列專欄
文章鏈接:
這可能是最好的RxJava 2.x 入門教程(一)
這可能是最好的RxJava 2.x 入門教程(二)
這可能是最好的RxJava 2.x 入門教程(三)
這可能是最好的RxJava 2.x 入門教程(四)
這可能是最好的RxJava 2.x 入門教程(五)
GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples
為了滿足大家的饑渴難耐奴迅,GitHub將同步更新代碼板驳,主要包含基本的代碼封裝掠哥,RxJava 2.x所有操作符應(yīng)用場景介紹和實際應(yīng)用場景帚豪,后期除了RxJava可能還會增添其他東西,總之枕屉,GitHub上的Demo專為大家傾心打造蠢终。傳送門:https://github.com/nanchen2251/RxJava2Examples
為什么要學(xué) RxJava晋辆?
提升開發(fā)效率廓奕,降低維護(hù)成本一直是開發(fā)團(tuán)隊永恒不變的宗旨。近兩年來國內(nèi)的技術(shù)圈子中越來越多的開始提及 RxJava 档叔,越來越多的應(yīng)用和面試中都會有 RxJava 桌粉,而就目前的情況,Android 的網(wǎng)絡(luò)庫基本被 Retrofit + OkHttp 一統(tǒng)天下了衙四,而配合上響應(yīng)式編程 RxJava 可謂如魚得水铃肯。想必大家肯定被近期的 Kotlin 炸開了鍋,筆者也在閑暇之時去了解了一番(作為一個與時俱進(jìn)的有理想的青年怎么可能不與時俱進(jìn)传蹈?)押逼,發(fā)現(xiàn)其中有個非常好的優(yōu)點就是簡潔,支持函數(shù)式編程惦界。是的挑格, RxJava 最大的優(yōu)點也是簡潔,但它不止是簡潔沾歪,而且是** 隨著程序邏輯變得越來越復(fù)雜漂彤,它依然能夠保持簡潔 **(這貨潔身自好呀有木有)。
咳咳,要例子挫望,猛戳這里:給 Android 開發(fā)者的 RxJava 詳解
什么是響應(yīng)式編程
上面我們提及了響應(yīng)式編程立润,不少新司機對它可謂一臉懵逼,那什么是響應(yīng)式編程呢媳板?響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式桑腮。數(shù)據(jù)流就像一條河:它可以被觀測,被過濾蛉幸,被操作破讨,或者為新的消費者與另外一條流合并為一條新的流。
響應(yīng)式編程的一個關(guān)鍵概念是事件巨缘。事件可以被等待添忘,可以觸發(fā)過程,也可以觸發(fā)其它事件若锁。事件是唯一的以合適的方式將我們的現(xiàn)實世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶搁骑。同樣的,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后又固,我們需要更新app上展示天氣信息的UI仲器;汽車上的車道偏移系統(tǒng)探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應(yīng)事件仰冠。
今天乏冀,響應(yīng)式編程最通用的一個場景是UI:我們的移動App必須做出對網(wǎng)絡(luò)調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)洋只。在這個世界上辆沦,軟件之所以是事件驅(qū)動并響應(yīng)的是因為現(xiàn)實生活也是如此。
為什么出了一個系列后還有完結(jié)版识虚?
RxJava 這些年可謂越來越流行肢扯,而在去年的晚些時候發(fā)布了2.0正式版。大半年已過担锤,雖然網(wǎng)上已經(jīng)出現(xiàn)了大部分的 RxJava 教程(其實細(xì)心的你還是會發(fā)現(xiàn) 1.x 的超級多)蔚晨,前些日子,筆者花了大約兩周的閑暇之時寫了 RxJava 2.x 系列教程肛循,也得到了不少反饋铭腕,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側(cè)重適用場景的介紹多糠,在這樣的大前提下累舷,這篇完結(jié)版教程就此誕生,僅供各位新司機采納夹孔。
開始
RxJava 2.x 已經(jīng)按照 Reactive-Streams specification 規(guī)范完全的重寫了笋粟,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y
下怀挠,所以 RxJava 2.x 獨立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時間后終止對 RxJava 1.x 的維護(hù)害捕,所以對于熟悉 RxJava 1.x 的老司機自然可以直接看一下 2.x 的文檔和異同就能輕松上手了绿淋,而對于不熟悉的年輕司機,不要慌尝盼,本醬帶你裝逼帶你飛吞滞,馬上就發(fā)車,坐穩(wěn)了:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'
(2.1.1為寫此文章時的最新版本)
接口變化
RxJava 2.x 擁有了新的特性盾沫,其依賴于4個基礎(chǔ)接口裁赠,它們分別是
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫過于 Publisher
和 Subscriber
。Publisher
可以發(fā)出一系列的事件赴精,而 Subscriber
負(fù)責(zé)和處理這些事件佩捞。
其中用的比較多的自然是 Publisher
的 Flowable
,它支持背壓蕾哟。關(guān)于背壓給個簡潔的定義就是:
背壓是指在異步場景中一忱,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略谭确。
簡而言之帘营,背壓是流速控制的一種策略。有興趣的可以看一下官方對于背壓的講解逐哈。
可以明顯地發(fā)現(xiàn)芬迄,RxJava 2.x 最大的改動就是對于 backpressure
的處理,為此將原來的 Observable
拆分成了新的 Observable
和 Flowable
昂秃,同時其他相關(guān)部分也同時進(jìn)行了拆分禀梳,但令人慶幸的是,是它肠骆,是它出皇,還是它,還是我們最熟悉和最喜歡的 RxJava哗戈。
觀察者模式
大家可能都知道, RxJava 以觀察者模式為骨架荷科,在 2.0 中依舊如此唯咬。
不過此次更新中,出現(xiàn)了兩種觀察者模式:
- Observable ( 被觀察者 ) / Observer ( 觀察者 )
-
Flowable (被觀察者)/ Subscriber (觀察者)
在 RxJava 2.x 中畏浆,Observable 用于訂閱 Observer胆胰,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber 刻获, 是支持背壓(Backpressure)的蜀涨。
Observable
在 RxJava 1.x 中,我們最熟悉的莫過于 Observable
這個類了,筆者在剛剛使用 RxJava 2.x 的時候厚柳,創(chuàng)建了 一個 Observable
,瞬間一臉懵逼有木有,居然連我們最最熟悉的 Subscriber
都沒了茧球,取而代之的是 ObservableEmmiter
晶府,俗稱發(fā)射器。此外碳想,由于沒有了Subscriber
的蹤影烧董,我們創(chuàng)建觀察者時需使用 Observer
。而 Observer
也不是我們熟悉的那個 Observer
胧奔,又出現(xiàn)了一個 Disposable
參數(shù)帶你裝逼帶你飛逊移。
廢話不多說,從會用開始龙填,還記得 RxJava 的三部曲嗎胳泉?
** 第一步:初始化 Observable **
** 第二步:初始化 Observer **
** 第三步:建立訂閱關(guān)系 **
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer>() { // 第三步:訂閱
// 第二步:初始化Observer
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
i++;
if (i == 2) {
// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作觅够,讓Observer觀察者不再接收上游事件
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
不難看出胶背,RxJava 2.x 與 1.x 還是存在著一些區(qū)別的。首先喘先,創(chuàng)建 Observable
時钳吟,回調(diào)的是 ObservableEmitter
,字面意思即發(fā)射器窘拯,并且直接 throws Exception红且。其次,在創(chuàng)建的 Observer 中涤姊,也多了一個回調(diào)方法:onSubscribe
暇番,傳遞參數(shù)為Disposable
,Disposable
相當(dāng)于 RxJava 1.x 中的 Subscription
思喊, 用于解除訂閱壁酬。可以看到示例代碼中恨课,在 i 自增到 2 的時候舆乔,訂閱關(guān)系被切斷。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
當(dāng)然剂公,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法希俩,我們可以根據(jù)需求,進(jìn)行相應(yīng)的簡化訂閱纲辽,只不過傳入對象改為了 Consumer
颜武。
Consumer
即消費者璃搜,用于接收單個值,BiConsumer
則是接收兩個值鳞上,Function
用于變換對象这吻,Predicate
用于判斷。這些接口命名大多參照了 Java 8 因块,熟悉 Java 8 新特性的應(yīng)該都知道意思橘原,這里也不再贅述。
線程調(diào)度
關(guān)于線程切換這點涡上,RxJava 1.x 和 RxJava 2.x 的實現(xiàn)思路是一樣的趾断。這里簡單的說一下,以便于我們的新司機入手吩愧。
subScribeOn
同 RxJava 1.x 一樣芋酌,subscribeOn
用于指定 subscribe()
時所發(fā)生的線程,從源碼角度可以看出雁佳,內(nèi)部線程調(diào)度是通過 ObservableSubscribeOn
來實現(xiàn)的脐帝。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn
的核心源碼在 subscribeActual
方法中,通過代理的方式使用 SubscribeOnObserver
包裝 Observer
后糖权,設(shè)置 Disposable
來將 subscribe
切換到 Scheduler
線程中堵腹。
observeOn
observeOn
方法用于指定下游 Observer
回調(diào)發(fā)生的線程。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
線程切換需要注意的
RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手星澳,但其中也有些需要注意的地方疚顷。
- 簡單地說,
subscribeOn()
指定的就是發(fā)射事件的線程禁偎,observerOn
指定的就是訂閱者接收事件的線程腿堤。 - 多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用
subscribeOn()
只有第一次的有效如暖,其余的會被忽略笆檀。 - 但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次
observerOn()
盒至,下游的線程就會切換一次酗洒。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io)枷遂,Current thread is " + Thread.currentThread().getName());
}
});
輸出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread)樱衷,Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
實例代碼中登淘,分別用 Schedulers.newThread()
和 Schedulers.io()
對發(fā)射線程進(jìn)行切換,并采用 observeOn(AndroidSchedulers.mainThread()
和 Schedulers.io()
進(jìn)行了接收線程的切換封字∏荩可以看到輸出中發(fā)射線程僅僅響應(yīng)了第一個 newThread
耍鬓,但每調(diào)用一次 observeOn()
,線程便會切換一次流妻,因此如果我們有類似的需求時牲蜀,便知道如何處理了。
RxJava 中绅这,已經(jīng)內(nèi)置了很多線程選項供我們選擇涣达,例如有:
-
Schedulers.io()
代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作; -
Schedulers.computation()
代表CPU計算密集型的操作, 例如需要大量計算的操作证薇; -
Schedulers.newThread()
代表一個常規(guī)的新線程度苔; -
AndroidSchedulers.mainThread()
代表Android的主線程
這些內(nèi)置的 Scheduler
已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項浑度,而 RxJava 內(nèi)部使用的是線程池來維護(hù)這些線程寇窑,所以效率也比較高。
操作符
關(guān)于操作符箩张,在官方文檔中已經(jīng)做了非常完善的講解甩骏,并且筆者前面的系列教程中也著重講解了絕大多數(shù)的操作符作用,這里受于篇幅限制先慷,就不多做贅述饮笛,只挑選幾個進(jìn)行實際情景的講解。
map
map
操作符可以將一個 Observable
對象通過某種關(guān)系轉(zhuǎn)換為另一個Observable
對象论熙。在 2.x 中和 1.x 中作用幾乎一致福青,不同點在于:2.x 將 1.x 中的 Func1
和 Func2
改為了 Function
和 BiFunction
。
采用 map 操作符進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)解析
想必大家都知道赴肚,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進(jìn)行結(jié)合使用素跺,而為了方便演示,這里我們就暫且采用 OkHttp3 進(jìn)行演示誉券,配合 map
指厌,doOnNext
,線程切換進(jìn)行簡單的網(wǎng)絡(luò)請求:
1)通過 Observable.create() 方法踊跟,調(diào)用 OkHttp 網(wǎng)絡(luò)請求踩验;
2)通過 map 操作符集合 gson,將 Response 轉(zhuǎn)換為 bean 類商玫;
3)通過 doOnNext() 方法箕憾,解析 bean 中的數(shù)據(jù),并進(jìn)行數(shù)據(jù)庫存儲等操作拳昌;
4)調(diào)度線程袭异,在子線程中進(jìn)行耗時操作任務(wù),在主線程中更新 UI 炬藤;
5)通過 subscribe()御铃,根據(jù)請求成功或者失敗來更新 UI 碴里。
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "成功:" + data.toString() + "\n");
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
}
});
concat
concat
可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射事件上真,并且只有前一個 Observable
終止(onComplete
) 后才會訂閱下一個 Observable
咬腋。
采用 concat 操作符先讀取緩存再通過網(wǎng)絡(luò)請求獲取數(shù)據(jù)
想必在實際應(yīng)用中,很多時候(對數(shù)據(jù)操作不敏感時)都需要我們先讀取緩存的數(shù)據(jù)睡互,如果緩存沒有數(shù)據(jù)根竿,再通過網(wǎng)絡(luò)請求獲取,隨后在主線程更新我們的UI就珠。
concat
操作符簡直就是為我們這種需求量身定做寇壳。
利用 concat
的必須調(diào)用 onComplete
后才能訂閱下一個 Observable
的特性,我們就可以先讀取緩存數(shù)據(jù)嗓违,倘若獲取到的緩存數(shù)據(jù)不是我們想要的九巡,再調(diào)用 onComplete()
以執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)的 Observable
,如果緩存數(shù)據(jù)能應(yīng)我們所需蹂季,則直接調(diào)用 onNext()
冕广,防止過度的網(wǎng)絡(luò)請求,浪費用戶的流量偿洁。
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當(dāng)前線程:"+Thread.currentThread().getName() );
FoodList data = CacheManager.getInstance().getFoodListData();
// 在操作符 concat 中撒汉,只有調(diào)用 onComplete 之后才會執(zhí)行下一個 Observable
if (data != null){ // 如果緩存數(shù)據(jù)不為空,則直接讀取緩存數(shù)據(jù)涕滋,而不讀取網(wǎng)絡(luò)數(shù)據(jù)
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):" );
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取緩存數(shù)據(jù):\n");
}
});
e.onNext(data);
}else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):\n");
}
});
Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):" );
e.onComplete();
}
}
});
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows",10+"")
.build()
.getObjectObservable(FoodList.class);
// 兩個 Observable 的泛型應(yīng)當(dāng)保持一致
Observable.concat(cache,network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
if (isFromNet){
mRxOperatorsText.append("accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n");
Log.e(TAG, "accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n"+tngouBeen.toString() );
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString()+"\n");
Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
Log.e(TAG, "accept: 讀取數(shù)據(jù)失敳欠:"+throwable.getMessage() );
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)失敗:"+throwable.getMessage()+"\n");
}
});
有時候我們的緩存可能還會分為 memory 和 disk 宾肺,實際上都差不多溯饵,無非是多寫點 Observable
,然后通過 concat
合并即可锨用。
flatMap 實現(xiàn)多個網(wǎng)絡(luò)請求依次依賴
想必這種情況也在實際情況中比比皆是丰刊,例如用戶注冊成功后需要自動登錄,我們只需要先通過注冊接口注冊用戶信息增拥,注冊成功后馬上調(diào)用登錄接口進(jìn)行自動登錄即可啄巧。
我們的 flatMap
恰好解決了這種應(yīng)用場景,flatMap
操作符可以將一個發(fā)射數(shù)據(jù)的 Observable
變換為多個 Observables
掌栅,然后將它們發(fā)射的數(shù)據(jù)合并后放到一個單獨的 Observable
秩仆,利用這個特性,我們很輕松地達(dá)到了我們的需求猾封。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請求澄耍,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io線程進(jìn)行網(wǎng)絡(luò)請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結(jié)果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據(jù)獲取食品列表的響應(yīng)結(jié)果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
善用 zip 操作符,實現(xiàn)多個接口數(shù)據(jù)共同更新 UI
在實際應(yīng)用中,我們極有可能會在一個頁面顯示的數(shù)據(jù)來源于多個接口齐莲,這時候我們的 zip
操作符為我們排憂解難卿城。
zip
操作符可以將多個 Observable
的數(shù)據(jù)結(jié)合為一個數(shù)據(jù)源再發(fā)射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android",1,1);
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
return "合并后的數(shù)據(jù)為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失斍Υ辍:" + throwable+"\n");
}
});
采用 interval 操作符實現(xiàn)心跳間隔任務(wù)
想必即時通訊等需要輪訓(xùn)的任務(wù)在如今的 APP 中已是很常見,而 RxJava 2.x 的 interval
操作符可謂完美地解決了我們的疑惑搀捷。
這里就簡單的意思一下輪訓(xùn)星掰。
private Disposable mDisposable;
@Override
protected void doSomething() {
mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 設(shè)置文本 :"+aLong );
mRxOperatorsText.append("accept: 設(shè)置文本 :"+aLong +"\n");
}
});
}
/**
* 銷毀時停止心跳
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null){
mDisposable.dispose();
}
}
RxJava 1.x 如何平滑升級到 RxJava 2.x?
由于 RxJava 2.x 變化較大無法直接升級嫩舟,幸運的是氢烘,官方為我們提供了 RxJava2Interrop 這個庫,可以方便地把 RxJava 1.x 升級到 RxJava 2.x家厌,或者將 RxJava 2.x 轉(zhuǎn)回到 RxJava 1.x播玖。
寫在最后
本醬看你都看到這兒了,實為未來的棟梁之才饭于,所以且送你一本經(jīng)書:
https://github.com/nanchen2251/RxJava2Examples