最新的項(xiàng)目代碼把RxJava1的代碼升級(jí)到了RxJava2.發(fā)布后發(fā)現(xiàn)crash概率提高了許多.簡(jiǎn)單地總結(jié)就是之前RxJava1中在Subscriber中onNext方法,如果發(fā)生了異常.默認(rèn)會(huì)進(jìn)入Subscriber的onError.但是在RxJava2中直接拋出到了虛擬機(jī).導(dǎo)致crash.由于RxJava1這種消費(fèi)所有異常特性.導(dǎo)致在老版本code中諸多不嚴(yán)謹(jǐn)代碼的bug都被rxjava1隱藏掉了.尤其在當(dāng)前應(yīng)用,業(yè)務(wù)與UI極度耦合的情況下,業(yè)務(wù)線程返回后UI已經(jīng)被銷毀或者改動(dòng).這種情況隨處可見.
RxJava的重要組成Observable(被觀察者) Observer(觀察者在Rxjava2中最終是Consumer)
其實(shí)說RxJava2中Observer的onNext回調(diào)拋出異常會(huì)crash.是非常不嚴(yán)謹(jǐn)?shù)恼f法.因?yàn)镽xJava的操作符以及鏈?zhǔn)骄幊痰恼{(diào)用結(jié)構(gòu).我們最終的訂閱者Observer會(huì)在操作符的幫助下.層層包裹最后被調(diào)用.
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("normal data test");
emitter.onComplete();
}
});
Observer observer = new Observer<String>() {
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable arg0) {
System.out.println("error = " + arg0.getMessage());
}
@Override
public void onNext(String arg0) {
System.out.println("onnext");
throw new RuntimeException(arg0);
}
@Override
public void onSubscribe(Disposable arg0) {
}
};
observable.subscribe(observer);
上邊這個(gè)代碼,也不會(huì)引起崩潰.onNext中throw的異常會(huì)進(jìn)入onError中print.
其原因在與
Observable.create()方法,所生成的ObservableCreate對(duì)象,ObservableCreate是Observable的子類,他的subscribe方法,會(huì)包裹我們傳入的observer,使用CreateEmitter(發(fā)射器)這個(gè)類,最終回調(diào)傳入observer的onNext.source是我們create方法傳入的ObservableOnSubscribe對(duì)象,調(diào)用他的subscribe方法,其實(shí)就是執(zhí)行其內(nèi)部的方法體.
emitter.onNext("normal data test");
emitter.onComplete();
這部分操作被try-catch包裹,可以看到異常也會(huì)進(jìn)入emitter的onError最終給Observer
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
不過對(duì)應(yīng)我們項(xiàng)目中的Observable并不是這個(gè)簡(jiǎn)單地ObservableCreate.
Retrofit.create(IApi).subscribeOn().observeOn().map().subsribe();//其實(shí)還有個(gè)retryWhen
Retrofit.create這個(gè)方法生成的Observable實(shí)在Rxjava2CallAdapter的adapter生成的
BodyObservable
subscribeOn也會(huì)生成對(duì)應(yīng)的ObservableSbuscribeOn
observeOn生成的是ObservableObserverOn
map則對(duì)應(yīng)生成ObservableMap
考慮到observeOn方法控制了Rxjava的線程切換,所以,ObservableObserverOn這個(gè)實(shí)現(xiàn)類的內(nèi)部必然有線程切換,所以他之前的Observable對(duì)象即使try了自己內(nèi)部對(duì)應(yīng)的observer的onNext方法,也不可能對(duì)往下的傳遞生效(聽明白了嗎?).所以,只有兩個(gè)位置還有可能try住我們最終observer的onNext方法.
ObservableObserverOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);//把事件切換回主線程
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();//這里開始在主線程調(diào)用
}
}
void drainNormal() {
//這里邊的內(nèi)容就不粘貼了占地方.這里邊1.沒有對(duì)下一個(gè)observable的onNext沒有任何處理,沒有負(fù)責(zé)try-catch
}
這里的worker其實(shí)HandlerScheduler,他被AndroidSchedulers持有,RxAndroid,負(fù)責(zé)把往后的事件切換回主線程.他的schedule方法,其實(shí)就是執(zhí)行ObserveOnObserver的run方法.
其中ObserveOnObserver傳遞給HandlerScheduler,也會(huì)進(jìn)行一層封裝ScheduledRunnable
,這里邊會(huì)捕獲異常,不過他沒有把exception交給observer的onError而是交給全局plugin.
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);//這里可以做全局的異常處理.
}
}
最終事件發(fā)射給ObservableMap
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);//終于到了我們傳入的observer的onNext
}
這部分代碼就非常簡(jiǎn)單了.最后簡(jiǎn)單地調(diào)動(dòng)v = mapper.apply(t)方法,做數(shù)據(jù)的轉(zhuǎn)換,然后調(diào)用我們最后傳入的真正的Observer的onNext(v)方法.可以看到這里也沒有處理異常情況.
所以到此為止.最終,如果我們傳入的Observer的onNext方法,拋出了異常.他的catch位置就是HandlerScheduler ScheduledRunnable
這個(gè)內(nèi)部類的run方法.
這個(gè)Exception最終是傳遞給RxJavaPlugins
這個(gè)對(duì)象處理.
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}
if (f != null) {
try {
f.accept(error);
return;
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); TODO decide
e.printStackTrace(); // NOPMD
uncaught(e);
}
}
error.printStackTrace(); // NOPMD
uncaught(error);
}
這個(gè)方法內(nèi)部,只要實(shí)現(xiàn)了errorHandler這個(gè)靜態(tài)對(duì)象,他就可以不把這個(gè)異常.拋給虛擬機(jī).
所以,針對(duì)這次Rxjava2的異常.最簡(jiǎn)單的方法就是給RxJavaPlugins設(shè)置errorHandler.
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtil.e("RxJava catch global exception", throwable);
}
});
這樣就可以保障RxJava2自己的調(diào)用鏈流程不會(huì)崩潰.
但是如果我們希望有了異常,還能進(jìn)入自己的onError做對(duì)應(yīng)處理.那么也很簡(jiǎn)單.對(duì)自己傳入的observer做一次封裝,直接try住自己的onNext塊,然后發(fā)給onError就行.
說到這里難免就要提到rxjava1.rxjava1的沒有崩潰正式因?yàn)槿绱?rxjava1中subscribe一個(gè)subscriber,他并沒有直接使用這個(gè)subscriber,而是直接對(duì)他封裝成SafeSubscriber
,可以看到,就是一個(gè)簡(jiǎn)單地try操作.然后轉(zhuǎn)onError
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
public static void throwOrReport(Throwable t, Observer<?> o) {
Exceptions.throwIfFatal(t);
o.onError(t);
}
說道這里,其實(shí)真?zhèn)€流程也算是又摸索了一遍,算是又熟悉一遍RxJava的鏈?zhǔn)搅鞒?不過這其實(shí)這是最微小的一部分.Rxjava操作符用這么點(diǎn),實(shí)在是不應(yīng)該.
另外就是說這么多,異常的根本原因,還是自身業(yè)務(wù)代碼的糟糕.這一點(diǎn)也無力吐槽了.正如使用這個(gè)RxJava一樣,根本沒有展示RxJava最強(qiáng)大的一面.只是把他做成了了最普通的回調(diào)功能.業(yè)務(wù)層不使用RxJava來消除無限回調(diào),那么使用Rxjava的意義也縮小了.另外就是,代碼的規(guī)范看上去遠(yuǎn)比使用新得技術(shù)來的重要.
盜圖一張(其實(shí)我也想畫一張,但是這幅感覺已經(jīng)足夠完美,這也是我要偷過來的原因):
這幅圖比我上面打那么多字給力的多.但我保證,我摸這個(gè)流程的時(shí)候,沒過看這個(gè)圖.
Observable.create().map().doOnNext().subsribeOn().observeOn().subscibe(Observe);
如上所述這個(gè)鏈?zhǔn)秸{(diào)用:
最后說三句話來總結(jié),我的總結(jié)完了.謝謝.