眾所周知嚣镜,RxJava2 中當(dāng)鏈?zhǔn)秸{(diào)用中拋出異常時(shí)漓库,如果沒有對(duì)應(yīng)的 Consumer 去處理異常吧黄,則這個(gè)異常會(huì)被拋出到虛擬機(jī)中去柬帕,Android 上的直接表現(xiàn)就是 crash,程序崩潰隶垮。
訂閱方式
說異常處理前咱們先來看一下 RxJava2 中 Observable
訂閱方法 subscribe()
我們常用的幾種訂閱方式:
// 1
subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext)
// 3
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
// 4
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
// 5
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
// 6
void subscribe(Observer<? super T> observer)
無參和以 Consumer
為參數(shù)的幾種方法內(nèi)部都是以默認(rèn)參數(shù)補(bǔ)齊的方式最終調(diào)用第 5
個(gè)方法藻雪,而方法 5
內(nèi)部通過 LambdaObserver 將參數(shù)包裝成 Observer 再調(diào)用第 6
個(gè)方法
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
所以使用 Consumer
參數(shù)方式和 Observer
參數(shù)方式進(jìn)行訂閱除了觀察回調(diào)來源不一樣其他沒有任何差別秘噪。但就是因?yàn)檫@種差別狸吞,在異常情況發(fā)生時(shí)的處理結(jié)果上也會(huì)產(chǎn)生差別
異常處理
我們分別進(jìn)行一下幾種方式模擬異常:
- 1、Observer onNext 中拋出異常(切換線程)
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync()) // 封裝的線程切換
.subscribe(object : Observer<List<ZooData>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: List<ZooData>) {
throw RuntimeException("runtime exception")
}
override fun onError(e: Throwable) {
Log.d("error", e.message)
}
})
結(jié)果:不會(huì)觸發(fā) onError指煎,App 崩潰
- 2蹋偏、Observer onNext 中拋出異常(未切換線程)
Observable.create<String> {
it.onNext("ssss")
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.d("result::", t)
throw RuntimeException("run llllll")
}
override fun onError(e: Throwable) {
Log.e("sss", "sss", e)
}
})
結(jié)果:會(huì)觸發(fā) onError,App 未崩潰
- 3至壤、Observer map 操作符中拋出異常
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.map {
throw RuntimeException("runtime exception")
}
.compose(RxScheduler.sync())
.subscribe(object : Observer<List<ZooData>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: List<ZooData>) {
}
override fun onError(e: Throwable) {
Log.d("error", e.message)
}
})
結(jié)果:會(huì)觸發(fā) Observer 的 onError威始,App 未崩潰
- 4、Consumer onNext 中拋出異常
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe({
throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
}, {
Log.d("Error", it.message)
})
結(jié)果 A:有 errorConsumer 觸發(fā) errorConsumer像街,App 未崩潰
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe {
throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
}
結(jié)果 B:無 errorConsumer黎棠,App 崩潰
那么為什么會(huì)出現(xiàn)這些不同情況呢?我們從源碼中去一探究竟镰绎。
Consumer 訂閱方式的崩潰與不崩潰
subscribe()
傳入 consumer 類型參數(shù)最終在 Observable
中會(huì)將傳入的參數(shù)轉(zhuǎn)換為 LambdaObserver
再調(diào)用 subscribe(lambdaObserver)
進(jìn)行訂閱脓斩。展開 LambdaObserver
:(主要看 onNext 和 onError 方法中的處理)
.
.
.
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
.
.
.
onNext
中調(diào)用了對(duì)應(yīng) consumer 的 apply()
方法,并且進(jìn)行了 try catch畴栖。因此我們?cè)?consumer 中進(jìn)行的工作拋出異常會(huì)被捕獲觸發(fā) LambdaObserver 的 onError
随静。再看 onError
中,如果訂閱未取消且 errorConsumer 的 apply()
執(zhí)行無異常則能正常走完事件流,否則會(huì)調(diào)用 RxJavaPlugins.onError(t)
燎猛×道Γ看到這里應(yīng)該就能明白了,當(dāng)訂閱時(shí)未傳入 errorConsumer時(shí) Observable
會(huì)指定 OnErrorMissingConsumer
為默認(rèn)的 errorConsumer重绷,發(fā)生異常時(shí)拋出 OnErrorNotImplementedException
沸停。
RxJavaPlugins.onError(t)
上面分析,發(fā)現(xiàn)異常最終會(huì)流向 RxJavaPlugins.onError(t)昭卓。這個(gè)方法為 RxJava2 提供的一個(gè)全局的靜態(tài)方法星立。
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}
if (f != null) {
try {
f.accept(error);
return;
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); TODO decide
e.printStackTrace(); // NOPMD
uncaught(e);
}
}
error.printStackTrace(); // NOPMD
uncaught(error);
}
查看其源碼發(fā)現(xiàn),當(dāng) errorHandler
不為空時(shí)異常將由其消耗掉葬凳,為空或者消耗過程產(chǎn)生新的異常則 RxJava 會(huì)將異常拋給虛擬機(jī)(可能導(dǎo)致程序崩潰)绰垂。 errorHandler
本身是一個(gè) Consumer 對(duì)象,我們可以通過如下方式配置他:
RxJavaPlugins.setErrorHandler(object : Consumer1<Throwable> {
override fun accept(t: Throwable?) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
})
數(shù)據(jù)操作符中拋出異常
以 map 操作符為例火焰,map 操作符實(shí)際上 RxJava 是將事件流 hook 了另一個(gè)新的 Observable ObservableMap
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
進(jìn)入 ObservableMap 類劲装,發(fā)現(xiàn)內(nèi)部訂閱了一個(gè)內(nèi)部靜態(tài)類 MapObserver
,重點(diǎn)看 MapObserver
的 onNext
方法
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
onNext
中 try catch 了 mapper.apply()昌简,這個(gè) apply 執(zhí)行的就是我們?cè)诓僮鞣袑?shí)現(xiàn)的 function
方法占业。因此在 map 之類數(shù)據(jù)變換操作符中產(chǎn)生異常能夠自身捕獲并發(fā)送給最終的 Observer。如果此時(shí)的訂閱對(duì)象中能消耗掉異常則事件流正常走 onError()
結(jié)束,如果訂閱方式為上以節(jié)中的 consumer纯赎,則崩潰情況為上一節(jié)中的分析結(jié)果谦疾。
Observer 的 onNext 中拋出異常
上述的方式 1
為一次網(wǎng)絡(luò)請(qǐng)求,里面涉及到線程的切換犬金。方式 2
為直接 create 一個(gè) Observable
對(duì)象念恍,不涉及線程切換,其結(jié)果為線程切換后,觀察者 Observer 的 onNext() 方法中拋出異常無法觸發(fā) onError()晚顷,程序崩潰峰伙。
未切換線程的 Observable.create
查看 create()
方法源碼,發(fā)現(xiàn)內(nèi)部創(chuàng)建了一個(gè) ObservableCreate
對(duì)象该默,在調(diào)用訂閱時(shí)會(huì)觸發(fā) subscribeActual()
方法瞳氓。在 subscribeActual()
中再調(diào)用我們 create 時(shí)傳入的 ObservableOnSubscribe
對(duì)象的 subscribe()
方法來觸發(fā)事件流。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 對(duì)我們的觀察者使用 CreateEmitter 進(jìn)行包裝,內(nèi)部的觸發(fā)方法是相對(duì)應(yīng)的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source 為 create 時(shí)創(chuàng)建的 ObservableOnSubscribe 匿名內(nèi)部接口實(shí)現(xiàn)類
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
上述代碼中的訂閱過程是使用 try catch 今夕包裹的栓袖。訂閱及訂閱觸發(fā)后發(fā)送的事件流都在一個(gè)線程匣摘,所以能夠捕獲整個(gè)事件流中的異常。(PS : 大家可以嘗試下使用 observeOn() 切換事件發(fā)送線程裹刮。會(huì)發(fā)現(xiàn)異常不能再捕獲音榜,程序崩潰)
涉及線程變換時(shí)的異常處理
Retrofit 進(jìn)行網(wǎng)絡(luò)請(qǐng)求返回的 Observable 對(duì)象實(shí)質(zhì)上是 RxJava2CallAdapter
中生成的 BodyObservable
,期內(nèi)部的 onNext
是沒有進(jìn)行異常捕獲的。其實(shí)這里是否捕獲并不是程序崩潰的根本原因必指,因?yàn)檫M(jìn)行網(wǎng)絡(luò)請(qǐng)求囊咏,必然是涉及到線程切換的。就算此處 try catch 處理了,也并不能捕獲到事件流下游的異常梅割。
@Override public void onNext(Response<R> response) {
if (response.isSuccessful()) {
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
以我們?cè)谧罱K的 Observer 的 onNext 拋出異常為例霜第,要捕獲這次異常那么必須在最終的調(diào)用線程中去進(jìn)行捕獲。即 .observeOn(AndroidSchedulers.mainThread())
切換過來的 Android 主線程户辞。與其他操作符一樣泌类,線程切換時(shí)產(chǎn)生了一組新的訂閱關(guān)系,RxJava 內(nèi)部會(huì)創(chuàng)建一個(gè)新的觀察對(duì)象 ObservableObserveOn
底燎。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
.
.
.
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); // 執(zhí)行 ObservableObserveOn 的 run 方法
}
}
.
.
.
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
而執(zhí)行任務(wù)的 worker 即為對(duì)應(yīng)線程 Scheduler 的對(duì)應(yīng)實(shí)現(xiàn)子類所創(chuàng)建的 Worker刃榨,以 AndroidSchedulers.mainThread()
為例,Scheduler 實(shí)現(xiàn)類為 HandlerScheduler
双仍,其對(duì)應(yīng) Worker 為 HandlerWorker
枢希,最終任務(wù)交給 ScheduledRunnable
來執(zhí)行。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
會(huì)發(fā)現(xiàn)朱沃,run 中 進(jìn)行了 try catch苞轿。但 catch 內(nèi)消化異常使用的是全局異常處理 RxJavaPlugins.onError(t);
,而不是某一個(gè)觀察者的 onError
逗物。所以在經(jīng)過切換線程操作符后搬卒,觀察者 onNext 中拋出的異常,onError 無法捕獲翎卓。
處理方案
既然知道了問題所在契邀,那么處理問題的方案也就十分清晰了。
1失暴、注冊(cè)全局的異常處理
RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
// do something
}
})
2坯门、Consumer 作為觀察者時(shí),不完全確定沒有異常一定要添加異常處理 Consumer
apiService.stringData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe(Consumer<Boolean>{ }, Consumer<Throwable> { })
3锐帜、Observer 可以創(chuàng)建一個(gè) BaseObaerver 將 onNext 內(nèi)部進(jìn)行 try catch 人為的流轉(zhuǎn)到 onError 中田盈,項(xiàng)目中的觀察這都使用這個(gè) BaseObserver 的子類。
@Override
public void onNext(T t) {
try {
onSuccess(t);
} catch (Exception e) {
onError(e);
}
data = t;
success = true;
}