聊一聊 RxJava2 中的異常及處理方式

眾所周知嚣镜,RxJava2 中當(dāng)鏈?zhǔn)秸{(diào)用中拋出異常時(shí)漓库,如果沒有對(duì)應(yīng)的 Consumer 去處理異常吧黄,則這個(gè)異常會(huì)被拋出到虛擬機(jī)中去柬帕,Android 上的直接表現(xiàn)就是 crash,程序崩潰隶垮。

訂閱方式

說異常處理前咱們先來看一下 RxJava2 中 Observable 訂閱方法 subscribe() 我們常用的幾種訂閱方式:


// 1
subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext)
// 3
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
// 4
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
// 5
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
// 6
void subscribe(Observer<? super T> observer)

無參和以 Consumer為參數(shù)的幾種方法內(nèi)部都是以默認(rèn)參數(shù)補(bǔ)齊的方式最終調(diào)用第 5 個(gè)方法藻雪,而方法 5 內(nèi)部通過 LambdaObserver 將參數(shù)包裝成 Observer 再調(diào)用第 6 個(gè)方法

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

所以使用 Consumer 參數(shù)方式和 Observer 參數(shù)方式進(jìn)行訂閱除了觀察回調(diào)來源不一樣其他沒有任何差別秘噪。但就是因?yàn)檫@種差別狸吞,在異常情況發(fā)生時(shí)的處理結(jié)果上也會(huì)產(chǎn)生差別

異常處理

我們分別進(jìn)行一下幾種方式模擬異常:

  • 1、Observer onNext 中拋出異常(切換線程)
                apiService.newJsonKeyData()
                    .doOnSubscribe { t -> compositeDisposable.add(t) }
                    .compose(RxScheduler.sync()) // 封裝的線程切換
                    .subscribe(object : Observer<List<ZooData>> {
                        override fun onComplete() {

                        }

                        override fun onSubscribe(d: Disposable) {

                        }

                        override fun onNext(t: List<ZooData>) {
                            throw RuntimeException("runtime exception")
                        }

                        override fun onError(e: Throwable) {
                            Log.d("error", e.message)
                        }

                    })

結(jié)果:不會(huì)觸發(fā) onError指煎,App 崩潰

  • 2蹋偏、Observer onNext 中拋出異常(未切換線程)
               Observable.create<String> {
                        it.onNext("ssss")
                    }
                            .subscribe(object : Observer<String> {
                                override fun onComplete() {

                                }

                                override fun onSubscribe(d: Disposable) {

                                }

                                override fun onNext(t: String) {
                                    Log.d("result::", t)
                                    throw RuntimeException("run llllll")
                                }

                                override fun onError(e: Throwable) {
                                    Log.e("sss", "sss", e)
                                }

                            })

結(jié)果:會(huì)觸發(fā) onError,App 未崩潰

  • 3至壤、Observer map 操作符中拋出異常
                apiService.newJsonKeyData()
                    .doOnSubscribe { t -> compositeDisposable.add(t) }
                    .map {
                        throw RuntimeException("runtime exception")
                    }
                    .compose(RxScheduler.sync())
                    .subscribe(object : Observer<List<ZooData>> {
                        override fun onComplete() {

                        }

                        override fun onSubscribe(d: Disposable) {

                        }

                        override fun onNext(t: List<ZooData>) {

                        }

                        override fun onError(e: Throwable) {
                            Log.d("error", e.message)
                        }

                    })

結(jié)果:會(huì)觸發(fā) Observer 的 onError威始,App 未崩潰

  • 4、Consumer onNext 中拋出異常
             apiService.newJsonKeyData()
                    .doOnSubscribe { t -> compositeDisposable.add(t) }
                    .compose(RxScheduler.sync())
                    .subscribe({
                        throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
                    }, {
                        Log.d("Error", it.message)
                    })

結(jié)果 A:有 errorConsumer 觸發(fā) errorConsumer像街,App 未崩潰

    apiService.newJsonKeyData()
                    .doOnSubscribe { t -> compositeDisposable.add(t) }
                    .compose(RxScheduler.sync())
                    .subscribe {
                        throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
                    }

結(jié)果 B:無 errorConsumer黎棠,App 崩潰

那么為什么會(huì)出現(xiàn)這些不同情況呢?我們從源碼中去一探究竟镰绎。

Consumer 訂閱方式的崩潰與不崩潰

subscribe() 傳入 consumer 類型參數(shù)最終在 Observable 中會(huì)將傳入的參數(shù)轉(zhuǎn)換為 LambdaObserver 再調(diào)用 subscribe(lambdaObserver)進(jìn)行訂閱脓斩。展開 LambdaObserver:(主要看 onNext 和 onError 方法中的處理)

        .
        .
        .
            @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
        .
        .
        .

onNext 中調(diào)用了對(duì)應(yīng) consumer 的 apply() 方法,并且進(jìn)行了 try catch畴栖。因此我們?cè)?consumer 中進(jìn)行的工作拋出異常會(huì)被捕獲觸發(fā) LambdaObserver 的 onError随静。再看 onError 中,如果訂閱未取消且 errorConsumer 的 apply() 執(zhí)行無異常則能正常走完事件流,否則會(huì)調(diào)用 RxJavaPlugins.onError(t)燎猛×道Γ看到這里應(yīng)該就能明白了,當(dāng)訂閱時(shí)未傳入 errorConsumer時(shí) Observable 會(huì)指定 OnErrorMissingConsumer 為默認(rèn)的 errorConsumer重绷,發(fā)生異常時(shí)拋出 OnErrorNotImplementedException沸停。

RxJavaPlugins.onError(t)

上面分析,發(fā)現(xiàn)異常最終會(huì)流向 RxJavaPlugins.onError(t)昭卓。這個(gè)方法為 RxJava2 提供的一個(gè)全局的靜態(tài)方法星立。

    public static void onError(@NonNull Throwable error) {
        Consumer<? super Throwable> f = errorHandler;

        if (error == null) {
            error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        } else {
            if (!isBug(error)) {
                error = new UndeliverableException(error);
            }
        }

        if (f != null) {
            try {
                f.accept(error);
                return;
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); TODO decide
                e.printStackTrace(); // NOPMD
                uncaught(e);
            }
        }

        error.printStackTrace(); // NOPMD
        uncaught(error);
    }

查看其源碼發(fā)現(xiàn),當(dāng) errorHandler 不為空時(shí)異常將由其消耗掉葬凳,為空或者消耗過程產(chǎn)生新的異常則 RxJava 會(huì)將異常拋給虛擬機(jī)(可能導(dǎo)致程序崩潰)绰垂。 errorHandler本身是一個(gè) Consumer 對(duì)象,我們可以通過如下方式配置他:

    RxJavaPlugins.setErrorHandler(object : Consumer1<Throwable> {
        override fun accept(t: Throwable?) {
            TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

    })

數(shù)據(jù)操作符中拋出異常

以 map 操作符為例火焰,map 操作符實(shí)際上 RxJava 是將事件流 hook 了另一個(gè)新的 Observable ObservableMap

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

進(jìn)入 ObservableMap 類劲装,發(fā)現(xiàn)內(nèi)部訂閱了一個(gè)內(nèi)部靜態(tài)類 MapObserver,重點(diǎn)看 MapObserveronNext 方法

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

onNext 中 try catch 了 mapper.apply()昌简,這個(gè) apply 執(zhí)行的就是我們?cè)诓僮鞣袑?shí)現(xiàn)的 function 方法占业。因此在 map 之類數(shù)據(jù)變換操作符中產(chǎn)生異常能夠自身捕獲并發(fā)送給最終的 Observer。如果此時(shí)的訂閱對(duì)象中能消耗掉異常則事件流正常走 onError() 結(jié)束,如果訂閱方式為上以節(jié)中的 consumer纯赎,則崩潰情況為上一節(jié)中的分析結(jié)果谦疾。

Observer 的 onNext 中拋出異常

上述的方式 1 為一次網(wǎng)絡(luò)請(qǐng)求,里面涉及到線程的切換犬金。方式 2 為直接 create 一個(gè) Observable 對(duì)象念恍,不涉及線程切換,其結(jié)果為線程切換后,觀察者 Observer 的 onNext() 方法中拋出異常無法觸發(fā) onError()晚顷,程序崩潰峰伙。

未切換線程的 Observable.create

查看 create() 方法源碼,發(fā)現(xiàn)內(nèi)部創(chuàng)建了一個(gè) ObservableCreate 對(duì)象该默,在調(diào)用訂閱時(shí)會(huì)觸發(fā) subscribeActual() 方法瞳氓。在 subscribeActual() 中再調(diào)用我們 create 時(shí)傳入的 ObservableOnSubscribe 對(duì)象的 subscribe() 方法來觸發(fā)事件流。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
    
        // 對(duì)我們的觀察者使用 CreateEmitter 進(jìn)行包裝,內(nèi)部的觸發(fā)方法是相對(duì)應(yīng)的
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // source 為 create 時(shí)創(chuàng)建的 ObservableOnSubscribe 匿名內(nèi)部接口實(shí)現(xiàn)類
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

上述代碼中的訂閱過程是使用 try catch 今夕包裹的栓袖。訂閱及訂閱觸發(fā)后發(fā)送的事件流都在一個(gè)線程匣摘,所以能夠捕獲整個(gè)事件流中的異常。(PS : 大家可以嘗試下使用 observeOn() 切換事件發(fā)送線程裹刮。會(huì)發(fā)現(xiàn)異常不能再捕獲音榜,程序崩潰)

涉及線程變換時(shí)的異常處理

Retrofit 進(jìn)行網(wǎng)絡(luò)請(qǐng)求返回的 Observable 對(duì)象實(shí)質(zhì)上是 RxJava2CallAdapter 中生成的 BodyObservable,期內(nèi)部的 onNext 是沒有進(jìn)行異常捕獲的。其實(shí)這里是否捕獲并不是程序崩潰的根本原因必指,因?yàn)檫M(jìn)行網(wǎng)絡(luò)請(qǐng)求囊咏,必然是涉及到線程切換的。就算此處 try catch 處理了,也并不能捕獲到事件流下游的異常梅割。

    @Override public void onNext(Response<R> response) {
      if (response.isSuccessful()) {
        observer.onNext(response.body());
      } else {
        terminated = true;
        Throwable t = new HttpException(response);
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }

以我們?cè)谧罱K的 Observer 的 onNext 拋出異常為例霜第,要捕獲這次異常那么必須在最終的調(diào)用線程中去進(jìn)行捕獲。即 .observeOn(AndroidSchedulers.mainThread()) 切換過來的 Android 主線程户辞。與其他操作符一樣泌类,線程切換時(shí)產(chǎn)生了一組新的訂閱關(guān)系,RxJava 內(nèi)部會(huì)創(chuàng)建一個(gè)新的觀察對(duì)象 ObservableObserveOn底燎。

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        .
        .
        .
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this); // 執(zhí)行 ObservableObserveOn 的 run 方法
            }
        }
        .
        .
        .
          @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    

而執(zhí)行任務(wù)的 worker 即為對(duì)應(yīng)線程 Scheduler 的對(duì)應(yīng)實(shí)現(xiàn)子類所創(chuàng)建的 Worker刃榨,以 AndroidSchedulers.mainThread() 為例,Scheduler 實(shí)現(xiàn)類為 HandlerScheduler双仍,其對(duì)應(yīng) Worker 為 HandlerWorker枢希,最終任務(wù)交給 ScheduledRunnable 來執(zhí)行。

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

會(huì)發(fā)現(xiàn)朱沃,run 中 進(jìn)行了 try catch苞轿。但 catch 內(nèi)消化異常使用的是全局異常處理 RxJavaPlugins.onError(t);,而不是某一個(gè)觀察者的 onError逗物。所以在經(jīng)過切換線程操作符后搬卒,觀察者 onNext 中拋出的異常,onError 無法捕獲翎卓。

處理方案

既然知道了問題所在契邀,那么處理問題的方案也就十分清晰了。
1失暴、注冊(cè)全局的異常處理

        RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                // do something   
            }

        })

2坯门、Consumer 作為觀察者時(shí),不完全確定沒有異常一定要添加異常處理 Consumer

 apiService.stringData()
                    .doOnSubscribe { t -> compositeDisposable.add(t) }
                    .compose(RxScheduler.sync())
                    .subscribe(Consumer<Boolean>{ }, Consumer<Throwable> { })

3锐帜、Observer 可以創(chuàng)建一個(gè) BaseObaerver 將 onNext 內(nèi)部進(jìn)行 try catch 人為的流轉(zhuǎn)到 onError 中田盈,項(xiàng)目中的觀察這都使用這個(gè) BaseObserver 的子類。

    @Override
    public void onNext(T t) {
        try {
            onSuccess(t);
        } catch (Exception e) {
            onError(e);
        }
        data = t;
        success = true;
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缴阎,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子简软,更是在濱河造成了極大的恐慌蛮拔,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痹升,死亡現(xiàn)場(chǎng)離奇詭異建炫,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)疼蛾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門肛跌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事衍慎∽Γ” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵稳捆,是天一觀的道長(zhǎng)赠法。 經(jīng)常有香客問我,道長(zhǎng)乔夯,這世上最難降的妖魔是什么砖织? 我笑而不...
    開封第一講書人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮末荐,結(jié)果婚禮上侧纯,老公的妹妹穿的比我還像新娘。我一直安慰自己甲脏,他們只是感情好茂蚓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著剃幌,像睡著了一般聋涨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上负乡,一...
    開封第一講書人閱讀 51,443評(píng)論 1 302
  • 那天牍白,我揣著相機(jī)與錄音,去河邊找鬼抖棘。 笑死茂腥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的切省。 我是一名探鬼主播最岗,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼朝捆!你這毒婦竟也來了般渡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤芙盘,失蹤者是張志新(化名)和其女友劉穎驯用,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體儒老,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蝴乔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了驮樊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薇正。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡片酝,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出挖腰,到底是詐尸還是另有隱情雕沿,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布曙聂,位于F島的核電站晦炊,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏宁脊。R本人自食惡果不足惜断国,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望榆苞。 院中可真熱鬧稳衬,春花似錦、人聲如沸坐漏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽赊琳。三九已至街夭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間躏筏,已是汗流浹背板丽。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留趁尼,地道東北人埃碱。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像酥泞,于是被迫代替她去往敵國(guó)和親砚殿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容