前言
Rxjava
技羔,由于其基于事件流的鏈?zhǔn)秸{(diào)用瓮顽、邏輯簡(jiǎn)潔 & 使用簡(jiǎn)單的特點(diǎn)霜大,深受各大 Android
開發(fā)者的歡迎。
如果還不了解
RxJava
捡絮,請(qǐng)看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程
-
RxJava
如此受歡迎的原因熬芜,在于其提供了豐富 & 功能強(qiáng)大的操作符,幾乎能完成所有的功能需求 - 今天福稳,我將為大家詳細(xì)介紹
RxJava
操作符中最常用的 功能性操作符涎拉,并附帶 Retrofit 結(jié)合 RxJava的實(shí)例Demo教學(xué),希望你們會(huì)喜歡的圆。
Carson帶你學(xué)RxJava系列文章鼓拧,包括 原理、操作符越妈、應(yīng)用場(chǎng)景季俩、背壓等等,請(qǐng)關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南
目錄
1. 作用
輔助被觀察者(Observable
) 在發(fā)送事件時(shí)實(shí)現(xiàn)一些功能性需求
如錯(cuò)誤處理叮称、線程調(diào)度等等
2. 類型
-
RxJava 2
中种玛,常見的功能性操作符 主要有:
- 下面,我將對(duì)每個(gè)操作符進(jìn)行詳細(xì)講解
3. 應(yīng)用場(chǎng)景 & 對(duì)應(yīng)操作符詳解
注:在使用RxJava 2
操作符前瓤檐,記得在項(xiàng)目的Gradle
中添加依賴:
dependencies {
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
// 注:RxJava2 與 RxJava1 不能共存赂韵,即依賴不能同時(shí)存在
}
3.1 連接被觀察者 & 觀察者
需求場(chǎng)景
即使得被觀察者 & 觀察者 形成訂閱關(guān)系對(duì)應(yīng)操作符
subscribe()
作用
訂閱,即連接觀察者 & 被觀察者具體使用
observable.subscribe(observer);
// 前者 = 被觀察者(observable)挠蛉;后者 = 觀察者(observer 或 subscriber)
<-- 1. 分步驟的完整調(diào)用 -->
// 步驟1: 創(chuàng)建被觀察者 Observable 對(duì)象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步驟2:創(chuàng)建觀察者 Observer 并 定義響應(yīng)事件行為
Observer<Integer> observer = new Observer<Integer>() {
// 通過復(fù)寫對(duì)應(yīng)方法來 響應(yīng) 被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
};
// 步驟3:通過訂閱(subscribe)連接觀察者和被觀察者
observable.subscribe(observer);
<-- 2. 基于事件流的鏈?zhǔn)秸{(diào)用 -->
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
}
}
- 測(cè)試結(jié)果
- 擴(kuò)展說明
<-- Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn) -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 在觀察者 subscriber抽象類復(fù)寫的方法 onSubscribe.call(subscriber)祭示,用于初始化工作
// 通過該調(diào)用,從而回調(diào)觀察者中的對(duì)應(yīng)方法從而響應(yīng)被觀察者生產(chǎn)的事件
// 從而實(shí)現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞谴古,即觀察者模式
// 同時(shí)也看出:Observable只是生產(chǎn)事件质涛,真正的發(fā)送事件是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行時(shí)
}
3.2 線程調(diào)度
- 需求場(chǎng)景
快速掰担、方便指定 & 控制被觀察者 & 觀察者 的工作線程
- 對(duì)應(yīng)操作符使用
由于該部分內(nèi)容較多 & 重要汇陆,所以已獨(dú)立一篇文章,請(qǐng)看文章:Android RxJava:細(xì)說 線程控制(切換 / 調(diào)度 )(含Retrofit實(shí)例講解)
3.3 延遲操作
需求場(chǎng)景
即在被觀察者發(fā)送事件前進(jìn)行一些延遲的操作對(duì)應(yīng)操作符使用
delay()
作用
使得被觀察者延遲一段時(shí)間再發(fā)送事件方法介紹
delay()
具備多個(gè)重載方法带饱,具體如下:
// 1. 指定延遲時(shí)間
// 參數(shù)1 = 時(shí)間毡代;參數(shù)2 = 時(shí)間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時(shí)間 & 調(diào)度器
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位勺疼;參數(shù)3 = 線程調(diào)度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時(shí)間 & 錯(cuò)誤延遲
// 錯(cuò)誤延遲教寂,即:若存在Error事件,則如常執(zhí)行执庐,執(zhí)行后再拋出錯(cuò)誤異常
// 參數(shù)1 = 時(shí)間酪耕;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時(shí)間 & 調(diào)度器 & 錯(cuò)誤延遲
// 參數(shù)1 = 時(shí)間轨淌;參數(shù)2 = 時(shí)間單位迂烁;參數(shù)3 = 線程調(diào)度器看尼;參數(shù)4 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長(zhǎng)時(shí)間并添加調(diào)度器,錯(cuò)誤通知可以設(shè)置是否延遲
- 具體使用
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延遲3s再發(fā)送婚被,由于使用類似狡忙,所以此處不作全部展示
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
3.4 在事件的生命周期中操作
- 需求場(chǎng)景
在事件發(fā)送 & 接收的整個(gè)生命周期過程中進(jìn)行操作
如發(fā)送事件前的初始化、發(fā)送事件后的回調(diào)請(qǐng)求等
- 對(duì)應(yīng)操作符使用
do()
- 作用
在某個(gè)事件的生命周期中調(diào)用 - 類型
do()
操作符有很多個(gè)址芯,具體如下:
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
}
})
// 1. 當(dāng)Observable每發(fā)送1次數(shù)據(jù)事件就會(huì)調(diào)用1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 執(zhí)行Next事件前調(diào)用
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 執(zhí)行Next事件后調(diào)用
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常發(fā)送事件完畢后調(diào)用
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable發(fā)送錯(cuò)誤事件時(shí)調(diào)用
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 觀察者訂閱時(shí)調(diào)用
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable發(fā)送事件完畢后調(diào)用灾茁,無論正常發(fā)送完畢 / 異常終止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最后執(zhí)行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
3.5 錯(cuò)誤處理
需求場(chǎng)景
發(fā)送事件過程中,遇到錯(cuò)誤時(shí)的處理機(jī)制對(duì)應(yīng)操作符類型
- 對(duì)應(yīng)操作符使用
onErrorReturn()
- 作用
遇到錯(cuò)誤時(shí)谷炸,發(fā)送1個(gè)特殊事件 & 正常終止
可捕獲在它之前發(fā)生的異常
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(@NonNull Throwable throwable) throws Exception {
// 捕捉錯(cuò)誤異常
Log.e(TAG, "在onErrorReturn處理了錯(cuò)誤: "+throwable.toString() );
return 666;
// 發(fā)生錯(cuò)誤事件后北专,發(fā)送一個(gè)"666"事件,最終正常結(jié)束
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
onErrorResumeNext()
- 作用
遇到錯(cuò)誤時(shí)旬陡,發(fā)送1個(gè)新的Observable
注:
onErrorResumeNext()
攔截的錯(cuò)誤 =Throwable
拓颓;若需攔截Exception
請(qǐng)用onExceptionResumeNext()
- 若
onErrorResumeNext()
攔截的錯(cuò)誤 =Exception
,則會(huì)將錯(cuò)誤傳遞給觀察者的onError
方法
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("發(fā)生錯(cuò)誤了"));
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
// 1. 捕捉錯(cuò)誤異常
Log.e(TAG, "在onErrorReturn處理了錯(cuò)誤: "+throwable.toString() );
// 2. 發(fā)生錯(cuò)誤事件后描孟,發(fā)送一個(gè)新的被觀察者 & 發(fā)送事件序列
return Observable.just(11,22);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
onExceptionResumeNext()
- 作用
遇到錯(cuò)誤時(shí)驶睦,發(fā)送1個(gè)新的Observable
注:
onExceptionResumeNext()
攔截的錯(cuò)誤 =Exception
;若需攔截Throwable
請(qǐng)用onErrorResumeNext()
- 若
onExceptionResumeNext()
攔截的錯(cuò)誤 =Throwable
匿醒,則會(huì)將錯(cuò)誤傳遞給觀察者的onError
方法
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
retry()
- 作用
重試场航,即當(dāng)出現(xiàn)錯(cuò)誤時(shí),讓被觀察者(Observable
)重新發(fā)射數(shù)據(jù)
- 接收到 onError()時(shí)廉羔,重新訂閱 & 發(fā)送事件
Throwable
和Exception
都可攔截
- 類型
共有5種重載方法
<-- 1. retry() -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)溉痢,讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯(cuò)誤,則一直重新發(fā)送
<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)憋他,讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后孩饼,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯(cuò)誤,則持續(xù)重試)
// 參數(shù) = 判斷邏輯
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯(cuò)誤后竹挡,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤镀娶,則持續(xù)重試
// 參數(shù) = 判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯(cuò)誤信息)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯
- 具體使用
<-- 1. retry() -->
// 作用:出現(xiàn)錯(cuò)誤時(shí)揪罕,讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯(cuò)誤梯码,則一直重新發(fā)送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
.retry() // 遇到錯(cuò)誤時(shí),讓被觀察者重新發(fā)射數(shù)據(jù)(若一直錯(cuò)誤耸序,則一直重新發(fā)送
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯(cuò)誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
.retry(3) // 設(shè)置重試次數(shù) = 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后鲁猩,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯(cuò)誤坎怪,則持續(xù)重試)
// 參數(shù) = 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
// 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "retry錯(cuò)誤: "+throwable.toString());
//返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError結(jié)束
//返回true = 重新發(fā)送請(qǐng)求(若持續(xù)遇到錯(cuò)誤廓握,就持續(xù)重新發(fā)送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯(cuò)誤后搅窿,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤嘁酿,則持續(xù)重試
// 參數(shù) = 判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯(cuò)誤信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
// 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "異常錯(cuò)誤 = "+throwable.toString());
// 獲取當(dāng)前重試次數(shù)
Log.e(TAG, "當(dāng)前重試次數(shù) = "+integer);
//返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError結(jié)束
//返回true = 重新發(fā)送請(qǐng)求(若持續(xù)遇到錯(cuò)誤男应,就持續(xù)重新發(fā)送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后闹司,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
// 攔截錯(cuò)誤后,判斷是否需要重新發(fā)送請(qǐng)求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "retry錯(cuò)誤: "+throwable.toString());
//返回false = 不重新重新發(fā)送數(shù)據(jù) & 調(diào)用觀察者的onError()結(jié)束
//返回true = 重新發(fā)送請(qǐng)求(最多重新發(fā)送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
retryUntil()
- 作用
出現(xiàn)錯(cuò)誤后沐飘,判斷是否需要重新發(fā)送數(shù)據(jù)
- 若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤游桩,則持續(xù)重試
- 作用類似于
retry(Predicate predicate)
- 具體使用
具體使用類似于retry(Predicate predicate)
,唯一區(qū)別:返回true
則不重新發(fā)送數(shù)據(jù)事件耐朴。此處不作過多描述
retryWhen()
- 作用
遇到錯(cuò)誤時(shí)借卧,將發(fā)生的錯(cuò)誤傳遞給一個(gè)新的被觀察者(Observable
),并決定是否需要重新訂閱原始被觀察者(Observable
)& 發(fā)送事件
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發(fā)生錯(cuò)誤了"));
e.onNext(3);
}
})
// 遇到error事件才會(huì)回調(diào)
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常筛峭,可通過該條件來判斷異常的類型
// 返回Observable<?> = 新的被觀察者 Observable(任意類型)
// 此處有兩種情況:
// 1. 若 新的被觀察者 Observable發(fā)送的事件 = Error事件铐刘,那么 原始Observable則不重新發(fā)送事件:
// 2. 若 新的被觀察者 Observable發(fā)送的事件 = Next事件 ,那么原始的Observable則重新發(fā)送事件:
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 1. 若返回的Observable發(fā)送的事件 = Error事件影晓,則原始的Observable不重新發(fā)送事件
// 該異常錯(cuò)誤信息可在觀察者中的onError()中獲得
return Observable.error(new Throwable("retryWhen終止啦"));
// 2. 若返回的Observable發(fā)送的事件 = Next事件镰吵,則原始的Observable重新發(fā)送事件(若持續(xù)遇到錯(cuò)誤,則持續(xù)重試)
// return Observable.just(1);
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)" + e.toString());
// 獲取異常錯(cuò)誤信息
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
3.6 重復(fù)發(fā)送
需求場(chǎng)景
重復(fù)不斷地發(fā)送被觀察者事件對(duì)應(yīng)操作符類型
repeat()
&repeatWhen()
repeat()
- 作用
無條件地挂签、重復(fù)發(fā)送 被觀察者事件
具備重載方法疤祭,可設(shè)置重復(fù)創(chuàng)建次數(shù)
- 具體使用
// 不傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù) = 無限次
repeat();
// 傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù)有限
repeatWhen(Integer int )竹握;
// 注:
// 1. 接收到.onCompleted()事件后画株,觸發(fā)重新訂閱 & 發(fā)送
// 2. 默認(rèn)運(yùn)行在一個(gè)新的線程上
// 具體使用
Observable.just(1, 2, 3, 4)
.repeat(3) // 重復(fù)創(chuàng)建次數(shù) =- 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
repeatWhen()
作用
有條件地、重復(fù)發(fā)送 被觀察者事件原理
將原始Observable
停止發(fā)送事件的標(biāo)識(shí)(Complete()
/Error()
)轉(zhuǎn)換成1個(gè)Object
類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable
)啦辐,以此決定是否重新訂閱 & 發(fā)送原來的Observable
- 若新被觀察者(
Observable
)返回1個(gè)Complete
/Error
事件谓传,則不重新訂閱 & 發(fā)送原來的Observable
- 若新被觀察者(
Observable
)返回其余事件時(shí),則重新訂閱 & 發(fā)送原來的Observable
- 具體使用
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函數(shù)中芹关,必須對(duì)輸入的 Observable<Object>進(jìn)行處理续挟,這里我們使用的是flatMap操作符接收上游的數(shù)據(jù)
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 將原始 Observable 停止發(fā)送事件的標(biāo)識(shí)(Complete() / Error())轉(zhuǎn)換成1個(gè) Object 類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable)
// 以此決定是否重新訂閱 & 發(fā)送原來的 Observable
// 此處有2種情況:
// 1. 若新被觀察者(Observable)返回1個(gè)Complete() / Error()事件,則不重新訂閱 & 發(fā)送原來的 Observable
// 2. 若新被觀察者(Observable)返回其余事件侥衬,則重新訂閱 & 發(fā)送原來的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 情況1:若新被觀察者(Observable)返回1個(gè)Complete() / Error()事件诗祸,則不重新訂閱 & 發(fā)送原來的 Observable
return Observable.empty();
// Observable.empty() = 發(fā)送Complete事件,但不會(huì)回調(diào)觀察者的onComplete()
// return Observable.error(new Throwable("不再重新訂閱事件"));
// 返回Error事件 = 回調(diào)onError()事件轴总,并接收傳過去的錯(cuò)誤信息直颅。
// 情況2:若新被觀察者(Observable)返回其余事件,則重新訂閱 & 發(fā)送原來的 Observable
// return Observable.just(1);
// 僅僅是作為1個(gè)觸發(fā)重新訂閱被觀察者的通知怀樟,發(fā)送的是什么數(shù)據(jù)并不重要功偿,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對(duì)Error事件作出響應(yīng):" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
}
});
- 測(cè)試結(jié)果
至此,RxJava 2
中的功能性操作符講解完畢往堡。
4. 實(shí)際開發(fā)需求案例
- 下面械荷,我將 結(jié)合
Retrofit
&RxJava
共耍,講解功能性操作符的3個(gè)實(shí)際需求案例場(chǎng)景:- 線程操作(切換 / 調(diào)度 / 控制 )
- 輪詢
- 發(fā)送網(wǎng)絡(luò)請(qǐng)求時(shí)的差錯(cuò)重試機(jī)制
4.1 線程控制(切換 / 調(diào)度 )
- 即,新開工作線程執(zhí)行耗時(shí)操作吨瞎;待執(zhí)行完畢后痹兜,切換到主線程實(shí)時(shí)更新
UI
- 具體請(qǐng)看文章:Android RxJava:細(xì)說 線程控制(切換 / 調(diào)度 )(含Retrofit實(shí)例講解)
4.2 輪詢
- 需求場(chǎng)景說明
- 下面,我將結(jié)合
Retrofit
與RxJava
用一個(gè)具體實(shí)例來實(shí)現(xiàn)輪詢需求 - 具體請(qǐng)看文章:Android RxJava 實(shí)際應(yīng)用講解:(有條件)網(wǎng)絡(luò)請(qǐng)求輪詢
4.3 發(fā)送網(wǎng)絡(luò)請(qǐng)求時(shí)的差錯(cuò)重試機(jī)制
-
需求場(chǎng)景說明
示意圖 -
功能說明
示意圖 下面我將結(jié)合
Retrofit
與RxJava
用一個(gè)具體實(shí)例來實(shí)現(xiàn) 發(fā)送網(wǎng)絡(luò)請(qǐng)求時(shí)的 差錯(cuò)重試機(jī)制需求具體請(qǐng)看文章:Android RxJava 實(shí)際應(yīng)用講解:網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連(結(jié)合Retrofit)
5. Demo地址
上述所有的Demo源代碼都存放在:Carson_Ho的Github地址:RxJava2_功能性操作符
6. 總結(jié)
- 下面颤诀,我將用一張圖總結(jié)
RxJava2
中常用的功能性操作符
- Carson帶你學(xué)RxJava系列文章:
入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么字旭?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時(shí)候應(yīng)該使用Rxjava?(開發(fā)場(chǎng)景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava着绊、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷
歡迎關(guān)注Carson_Ho的簡(jiǎn)書
不定期分享關(guān)于安卓開發(fā)的干貨谐算,追求短、平归露、快洲脂,但卻不缺深度。