RxJava2 實(shí)戰(zhàn)系列文章
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(1) - 后臺(tái)執(zhí)行耗時(shí)操作岗仑,實(shí)時(shí)通知 UI 更新
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(2) - 計(jì)算一段時(shí)間內(nèi)數(shù)據(jù)的平均值
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(3) - 優(yōu)化搜索聯(lián)想功能
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(4) - 結(jié)合 Retrofit 請(qǐng)求新聞資訊
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(5) - 簡(jiǎn)單及進(jìn)階的輪詢操作
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(6) - 基于錯(cuò)誤類型的重試請(qǐng)求
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(7) - 基于 combineLatest 實(shí)現(xiàn)的輸入表單驗(yàn)證
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(8) - 使用 publish + merge 優(yōu)化先加載緩存诅需,再讀取網(wǎng)絡(luò)數(shù)據(jù)的請(qǐng)求過(guò)程
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(9) - 使用 timer/interval/delay 實(shí)現(xiàn)任務(wù)調(diào)度
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(10) - 屏幕旋轉(zhuǎn)導(dǎo)致 Activity 重建時(shí)恢復(fù)任務(wù)
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(11) - 檢測(cè)網(wǎng)絡(luò)狀態(tài)并自動(dòng)重試請(qǐng)求
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(12) - 實(shí)戰(zhàn)講解 publish & replay & share & refCount & autoConnect
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(13) - 如何使得錯(cuò)誤發(fā)生時(shí)不自動(dòng)停止訂閱關(guān)系
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(14) - 在 token 過(guò)期時(shí)秧廉,刷新過(guò)期 token 并重新發(fā)起請(qǐng)求
RxJava2 實(shí)戰(zhàn)知識(shí)梳理(15) - 實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 MVP + RxJava + Retrofit 應(yīng)用
一、前言
在很多資訊應(yīng)用當(dāng)中汁果,當(dāng)我們進(jìn)入一個(gè)新的頁(yè)面涡拘,為了提升用戶體驗(yàn),不讓頁(yè)面空白太久据德,我們一般會(huì)先讀取緩存中的數(shù)據(jù)鳄乏,再去請(qǐng)求網(wǎng)絡(luò)。
今天這篇文章棘利,我們將實(shí)現(xiàn)下面這個(gè)效果:同時(shí)發(fā)起讀取緩存橱野、訪問(wèn)網(wǎng)絡(luò)的請(qǐng)求,如果緩存的數(shù)據(jù)先回來(lái)赡译,那么就先展示緩存的數(shù)據(jù)仲吏,而如果網(wǎng)絡(luò)的數(shù)據(jù)先回來(lái),那么就不再展示緩存的數(shù)據(jù)蝌焚。
為了讓大家對(duì)這一過(guò)程有更深刻的理解裹唆,我們介紹"先加載緩存,再請(qǐng)求網(wǎng)絡(luò)"這種模型的四種實(shí)現(xiàn)方式只洒,其中第四種實(shí)現(xiàn)可以達(dá)到上面我們所說(shuō)的效果许帐,而前面的三種實(shí)現(xiàn)雖然也能夠?qū)崿F(xiàn)相同的需求,并且可以正常工作毕谴,但是在某些特殊情況下成畦,會(huì)出現(xiàn)意想不到的情況:
- 使用
concat
實(shí)現(xiàn) - 使用
concatEager
實(shí)現(xiàn) - 使用
merge
實(shí)現(xiàn) - 使用
publish
實(shí)現(xiàn)
二距芬、示例
2.1 準(zhǔn)備工作
我們需要準(zhǔn)備兩個(gè)Observable
,分別表示 緩存數(shù)據(jù)源 和 網(wǎng)絡(luò)數(shù)據(jù)源循帐,在其中填入相應(yīng)的緩存數(shù)據(jù)和網(wǎng)絡(luò)數(shù)據(jù)框仔,為了之后演示一些特殊的情況,我們可以在創(chuàng)建它的時(shí)候指定它執(zhí)行的時(shí)間:
//模擬緩存數(shù)據(jù)源拄养。
private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載緩存數(shù)據(jù)");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("緩存");
entity.setDesc("序號(hào)=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "結(jié)束加載緩存數(shù)據(jù)");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
//模擬網(wǎng)絡(luò)數(shù)據(jù)源离斩。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載網(wǎng)絡(luò)數(shù)據(jù)");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("網(wǎng)絡(luò)");
entity.setDesc("序號(hào)=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "結(jié)束加載網(wǎng)絡(luò)數(shù)據(jù)");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
在最終的下游,我們接收數(shù)據(jù)瘪匿,并在頁(yè)面上通過(guò)RecyclerView
進(jìn)行展示:
private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
return new DisposableObserver<List<NewsResultEntity>>() {
@Override
public void onNext(List<NewsResultEntity> newsResultEntities) {
mNewsResultEntities.clear();
mNewsResultEntities.addAll(newsResultEntities);
mNewsAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "加載錯(cuò)誤, e=" + throwable);
}
@Override
public void onComplete() {
Log.d(TAG, "加載完成");
}
};
}
2.2 使用 concat 實(shí)現(xiàn)
concat
是很多文章都推薦使用的方式跛梗,因?yàn)樗粫?huì)有任何問(wèn)題,實(shí)現(xiàn)代碼如下:
private void refreshArticleUseContact() {
Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
上面這段代碼的運(yùn)行結(jié)果為:
從控制臺(tái)的輸出可以看到棋弥,整個(gè)過(guò)程是先取讀取緩存核偿,等緩存的數(shù)據(jù)讀取完畢之后,才開始請(qǐng)求網(wǎng)絡(luò)顽染,因此整個(gè)過(guò)程的耗時(shí)為兩個(gè)階段的相加漾岳,即
2500ms
。
它的原理圖如下所示:
從原理圖中也驗(yàn)證了我們前面的現(xiàn)象家乘,它會(huì)連接多個(gè)
Observable
蝗羊,并且必須要等到前一個(gè)Observable
的所有數(shù)據(jù)項(xiàng)都發(fā)送完之后,才會(huì)開始下一個(gè)Observable
數(shù)據(jù)的發(fā)送仁锯。
那么耀找,concat
操作符的缺點(diǎn)是什么呢?很明顯业崖,我們白白浪費(fèi)了前面讀取緩存的這段時(shí)間野芒,能不能同時(shí)發(fā)起讀取緩存和網(wǎng)絡(luò)的請(qǐng)求,而不是等到讀取緩存完畢之后双炕,才去請(qǐng)求網(wǎng)絡(luò)呢狞悲?
2.3 使用 concatEager 實(shí)現(xiàn)
為了解決前面沒(méi)有同時(shí)發(fā)起請(qǐng)求的問(wèn)題,我們可以使用concatEager
妇斤,它的使用方法如下:
private void refreshArticleUseConcatEager() {
List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
它和concat
最大的不同就是多個(gè)Observable
可以同時(shí)開始發(fā)射數(shù)據(jù)摇锋,如果后一個(gè)Observable
發(fā)射完成后,前一個(gè)Observable
還有發(fā)射完數(shù)據(jù)站超,那么它會(huì)將后一個(gè)Observable
的數(shù)據(jù)先緩存起來(lái)荸恕,等到前一個(gè)Observable
發(fā)射完畢后,才將緩存的數(shù)據(jù)發(fā)射出去死相。
上面代碼中融求,請(qǐng)求緩存的時(shí)長(zhǎng)改為500ms
,而請(qǐng)求網(wǎng)絡(luò)的時(shí)長(zhǎng)改為2000ms
算撮,運(yùn)行結(jié)果為:
那么這種實(shí)現(xiàn)方式的缺點(diǎn)是什么呢生宛?就是在某些異常情況下县昂,如果讀取緩存的時(shí)間要大于網(wǎng)絡(luò)請(qǐng)求的時(shí)間,那么就會(huì)導(dǎo)致出現(xiàn)“網(wǎng)絡(luò)請(qǐng)求的結(jié)果”等待“讀取緩存”這一過(guò)程完成后才能傳遞給下游陷舅,白白浪費(fèi)了一段時(shí)間倒彰。
我們將請(qǐng)求緩存的時(shí)長(zhǎng)改為2000ms
,而請(qǐng)求網(wǎng)絡(luò)的時(shí)長(zhǎng)改為500ms
蔑赘,查看控制臺(tái)的輸出狸驳,可以驗(yàn)證上面的結(jié)論:
2.4 使用 merge 實(shí)現(xiàn)
下面,我們來(lái)看一下merge
操作符的示例:
private void refreshArticleUseMerge() {
Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
merge
的原理圖如下所示:
它和
concatEager
一樣缩赛,會(huì)讓多個(gè)Observable
同時(shí)開始發(fā)射數(shù)據(jù),但是它不需要Observable
之間的互相等待撰糠,而是直接發(fā)送給下游酥馍。
當(dāng)緩存時(shí)間為500ms
,而請(qǐng)求網(wǎng)絡(luò)時(shí)間為2000ms
時(shí)阅酪,它的結(jié)果為:
在讀取緩存的時(shí)間小于請(qǐng)求網(wǎng)絡(luò)的時(shí)間時(shí)旨袒,這個(gè)操作符能夠很好的工作,但是反之术辐,就會(huì)出現(xiàn)我們先展示了網(wǎng)絡(luò)的數(shù)據(jù)砚尽,然后又被刷新成舊的緩存數(shù)據(jù)。
發(fā)生該異常時(shí)的現(xiàn)象如下所示:
2.5 使用 publish 實(shí)現(xiàn)
使用publish
的實(shí)現(xiàn)如下所示:
private void refreshArticleUsePublish() {
Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {
@Override
public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
}
});
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
這里面一共涉及到了三個(gè)操作符辉词,publish
必孤、merge
和takeUnti
,我們先來(lái)看一下它能否解決我們之前三種方式的缺陷:
- 讀取緩存的時(shí)間為
500ms
瑞躺,請(qǐng)求網(wǎng)絡(luò)的時(shí)間為2000ms
- 讀取緩存的時(shí)間為
2000ms
敷搪,請(qǐng)求網(wǎng)絡(luò)的時(shí)間為500ms
可以看到,在讀取緩存的時(shí)間大于請(qǐng)求網(wǎng)絡(luò)時(shí)間的時(shí)候幢哨,僅僅只會(huì)展示網(wǎng)絡(luò)的數(shù)據(jù)赡勘,顯示效果為:
并且讀取緩存和請(qǐng)求網(wǎng)絡(luò)是同時(shí)發(fā)起的,很好地解決了前面幾種實(shí)現(xiàn)方式的缺陷捞镰。
這里要感謝簡(jiǎn)友 無(wú)心下棋 在評(píng)論里提到的問(wèn)題:如果網(wǎng)絡(luò)請(qǐng)求先返回時(shí)發(fā)生了錯(cuò)誤(例如沒(méi)有網(wǎng)絡(luò)等)導(dǎo)致發(fā)送了onError
事件闸与,從而使得緩存的Observable
也無(wú)法發(fā)送事件,最后界面顯示空白岸售。
針對(duì)這個(gè)問(wèn)題践樱,我們需要對(duì)網(wǎng)絡(luò)的Observable
進(jìn)行優(yōu)化,讓其不將onError
事件傳遞給下游冰评。其中一種解決方式是通過(guò)使用onErrorResume
操作符映胁,它可以接收一個(gè)Func
函數(shù),其形參為網(wǎng)絡(luò)發(fā)送的錯(cuò)誤甲雅,而在上游發(fā)生錯(cuò)誤時(shí)會(huì)回調(diào)該函數(shù)解孙。我們可以根據(jù)錯(cuò)誤的類型來(lái)返回一個(gè)新的Observable
坑填,讓訂閱者鏡像到這個(gè)新的Observable
,并且忽略onError
事件弛姜,從而避免onError
事件導(dǎo)致整個(gè)訂閱關(guān)系的結(jié)束脐瑰。
這里為了避免訂閱者在鏡像到新的Observable
時(shí)會(huì)收到額外的時(shí)間,我們返回一個(gè)Observable.never()
廷臼,它表示一個(gè)永遠(yuǎn)不發(fā)送事件的上游苍在。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載網(wǎng)絡(luò)數(shù)據(jù)");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("網(wǎng)絡(luò)");
entity.setDesc("序號(hào)=" + i);
results.add(entity);
}
//a.正常情況。
//observableEmitter.onNext(results);
//observableEmitter.onComplete();
//b.發(fā)生異常荠商。
observableEmitter.onError(new Throwable("netWork Error"));
Log.d(TAG, "結(jié)束加載網(wǎng)絡(luò)數(shù)據(jù)");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {
@Override
public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
Log.d(TAG, "網(wǎng)絡(luò)請(qǐng)求發(fā)生錯(cuò)誤throwable=" + throwable);
return Observable.never();
}
});
}
當(dāng)發(fā)生錯(cuò)誤時(shí)寂恬,控制臺(tái)的輸出如下,可以看到緩存仍然正常地發(fā)送給了下游:
下面莱没,我們就來(lái)分析一下它的實(shí)現(xiàn)原理初肉。
2.5.1 takeUntil
takeUntil
的原理圖如下所示:
這里,我們給
sourceObservable
通過(guò)takeUntil
傳入了另一個(gè)otherObservable
饰躲,它表示sourceObservable
在otherObservable
發(fā)射數(shù)據(jù)之后牙咏,就不允許再發(fā)射數(shù)據(jù)了,這就剛好滿足了我們前面說(shuō)的“只要網(wǎng)絡(luò)源發(fā)送了數(shù)據(jù)嘹裂,那么緩存源就不應(yīng)再發(fā)射數(shù)據(jù)”妄壶。
之后,我們?cè)儆们懊娼榻B過(guò)的merge
操作符寄狼,讓兩個(gè)緩存源和網(wǎng)絡(luò)源同時(shí)開始工作丁寄,去取數(shù)據(jù)。
2.5.2 publish
但是上面有一點(diǎn)缺陷例嘱,就是調(diào)用merge
和takeUntil
會(huì)發(fā)生兩次訂閱狡逢,這時(shí)候就需要使用publish
操作符,它接收一個(gè)Function
函數(shù)拼卵,該函數(shù)返回一個(gè)Observable
奢浑,該Observable
是對(duì)原Observable
,也就是上面網(wǎng)絡(luò)源的Observable
轉(zhuǎn)換之后的結(jié)果腋腮,該Observable
可以被takeUntil
和merge
操作符所共享雀彼,從而實(shí)現(xiàn)只訂閱一次的效果。
publish
的原理圖如下所示:
更多文章即寡,歡迎訪問(wèn)我的 Android 知識(shí)梳理系列:
- Android 知識(shí)梳理目錄:http://www.reibang.com/p/fd82d18994ce
- 個(gè)人主頁(yè):http://lizejun.cn
- 個(gè)人知識(shí)總結(jié)目錄:http://lizejun.cn/categories/