RxJava源碼分析

最簡(jiǎn)單的觀察者列車(chē)

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("邦");
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        });

        觀察subscribe()得知  大體流程:

         1纤勒、會(huì)立即調(diào)用onStart()方法,在其它操作之前調(diào)用
subscriber.onStart();

         2挥吵、之后它喜歡用SafeSubscriber吧subscriber包起來(lái)(裝飾模式)
if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }

         3逝嚎、包起來(lái)后,就開(kāi)始調(diào)用observable的call()方法啟動(dòng)整個(gè)列車(chē)了
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//可以忽略那個(gè)hook,至今沒(méi)發(fā)現(xiàn)hook中有什么實(shí)際的代碼,方法都只是返回傳入的參數(shù)而已
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {//hook中僅僅返回了參數(shù)
        return onSubscribe;
    }

         4、而我們?cè)赾all()中操作的subscribe實(shí)際上是裝飾者SafeSubscriber阿浓。原因是傳入的參數(shù)subscriber就是包好的SafeSubscriber。
public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

         5蹋绽、但其實(shí)我們?cè)赾all()中調(diào)用的SafeSubscriber.onNext()方法會(huì)直接調(diào)用SafeSubscriber內(nèi)部被包起來(lái)的subscriber的onNext()方法
@Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }

         6芭毙、結(jié)果因?yàn)檫@個(gè)被包起來(lái)的subscriber方法是我們寫(xiě)的訂閱者,于是訂閱者的onNext()觸發(fā)了 PS: 所以?xún)H僅是包起來(lái),并沒(méi)有其它操作
         7筋蓖、綜上所述 調(diào)用subscribe()之前都是準(zhǔn)備階段,各種包裹,存儲(chǔ)變量。一旦調(diào)用subscribe(),整個(gè)列車(chē)就啟動(dòng)了稿蹲。
    最簡(jiǎn)單的異常處理:
         1扭勉、并不是全程try包起來(lái)異常處理的。
         2苛聘、第一個(gè)異常檢測(cè)是在subscribe()方法開(kāi)始時(shí)判斷訂閱者與被訂閱者是否為null涂炎,拋出“你是不是故意找茬”的異常,這個(gè)檢測(cè)甚至在調(diào)用onStart()之前设哗。
         3唱捣、值得一提的是onStart()并沒(méi)有被try包裹起來(lái)。
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        
        subscriber.onStart();

         4网梢、有try塊包裹了列車(chē)的啟動(dòng)方法call()震缭。處理的方式是 (1)手動(dòng)檢測(cè)拋出致命錯(cuò)誤(這個(gè)操作挺頻繁) -> (2)傳遞Throwable給subscriber的onError()  PS: 在檢測(cè)致命錯(cuò)誤后其實(shí)還會(huì)檢測(cè)是否訂閱了,但因?yàn)橐欢ㄊ?已訂閱),所以沒(méi)區(qū)別(因?yàn)楦緵](méi)初始化“是否訂閱”這個(gè)變量)
        try {
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);//(1)手動(dòng)檢測(cè)拋出致命錯(cuò)誤
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                try {
                    subscriber.onError(hook.onSubscribeError(e));//(2)傳遞Throwable給subscriber的onError()
                } catch (Throwable e2) {//(3)onError都出錯(cuò)了、拋出 “啊战虏,完蛋啦”
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    hook.onSubscribeError(r);
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

         5拣宰、注意這里的onError()方法,它并不是我們寫(xiě)的onError(),而是愛(ài)包裝的SafeSubscriber的onError(),在此方法內(nèi)有一個(gè)唯一的標(biāo)識(shí)用于讓此方法只會(huì)被調(diào)用一次。
         SafeSubscriber的onError()只做了一個(gè)“默認(rèn)異常處理(其實(shí)就是什么都不做)”就執(zhí)行了我們寫(xiě)的onError()來(lái)解決異常烦感。不管我們的onError()執(zhí)行成功了,還是拋出異常了,又或是根本沒(méi)寫(xiě)onError()巡社,它都會(huì)unsubscribe()來(lái)取消訂閱。 PS“是否訂閱”變量終于改變了
         unsubscribe()還做了另外的操作,但這里沒(méi)有看到手趣。 PS: 這就是SafeSubscriber(安全訂閱者),它代理了對(duì)subscribe的操作,當(dāng)出異常時(shí)執(zhí)行額外的代碼晌该。 這可能是RxJava的秘密

         6、如果subscriber.onError()都報(bào)錯(cuò)了绿渣、就只會(huì)檢測(cè)拋出致命錯(cuò)誤后拋出錯(cuò)誤 “你的onError()拋異常啦!朝群,異常為 $%# ” PS: 連這個(gè)異常都包起來(lái)了

        PS: 你可能認(rèn)為我漏掉了onCompleted(),但這個(gè)方法無(wú)論是運(yùn)行成功,還是因失敗拋出異常,它都沒(méi)有被調(diào)用。
    最簡(jiǎn)單的泛型限定:
     1中符、這里被限制的類(lèi)型只有兩處姜胖,(1)、create(OnSubscribe<T>) ; (2)淀散、subscribe(Subscribe<T>)
     它限制泛型的秘密在create()方法中,create()內(nèi)創(chuàng)建了Observable對(duì)象,當(dāng)光標(biāo)選中Observable構(gòu)造方法里的泛型T時(shí)谭期,整個(gè)滾動(dòng)條都綠了!
     Observable擁有的泛型只有一個(gè)<T>,在構(gòu)造時(shí)實(shí)現(xiàn)了<T>的類(lèi)型,又在subscribe()中限定了<T>,導(dǎo)致subscribe()的參數(shù)泛型也必須一致了吧凉。
     PS: subscribe()中機(jī)智的使用了<? super T>,這是為數(shù)不多的泛型父類(lèi)限定,理由也很簡(jiǎn)單(父類(lèi)引用子類(lèi))

<head>添加map運(yùn)算符</head>
瞬間設(shè)計(jì)模式的難度以幾何的倍數(shù)上升隧出,為了清晰直觀的看源碼,我仿寫(xiě)了它的代碼阀捅。
public class MyRxJava {
    {//主體調(diào)用部分
        MyObservable.create(new MyOnSubscribe() {
            @Override
            public void call(MySubscriber s) {
                s.onNext();
            }
        }).subscribe(new MySubscriber() {
            @Override
            void onStart() {

            }

            @Override
            void onNext() {

            }

            @Override
            void onCompleted() {

            }
        });
    }
}

class MyObservable {//RxJava的操作主體,Observable
    private final MyOnSubscribe onSubscribe;

    public MyObservable(MyOnSubscribe subscribe) {
        this.onSubscribe = subscribe;
    }

    public static MyObservable create(MyOnSubscribe subscribe) {
        return new MyObservable(subscribe);
    }

    public final MySubscriber subscribe(MySubscriber subscriber) {
        subscriber.onStart();
        this.onSubscribe.call(subscriber);
        return subscriber;
    }
}

interface MyOnSubscribe {//create時(shí)使用胀瞪,被訂閱者

    void call(MySubscriber s);
}

abstract class MySubscriber {//訂閱時(shí)使用,訂閱者
    private boolean isUnsubscribed;//是否被取消訂閱(目前沒(méi)用)

    abstract void onStart();

    abstract void onNext();

    abstract void onCompleted();

    public void unsubscribe() {
        isUnsubscribed = false;
    }

    public boolean isUnsubscribed() {
        return isUnsubscribed;
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市凄诞,隨后出現(xiàn)的幾起案子圆雁,更是在濱河造成了極大的恐慌,老刑警劉巖帆谍,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伪朽,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡汛蝙,警方通過(guò)查閱死者的電腦和手機(jī)烈涮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)窖剑,“玉大人坚洽,你說(shuō)我怎么就攤上這事∥魍粒” “怎么了讶舰?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)需了。 經(jīng)常有香客問(wèn)我跳昼,道長(zhǎng),這世上最難降的妖魔是什么肋乍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任鹅颊,我火速辦了婚禮,結(jié)果婚禮上住拭,老公的妹妹穿的比我還像新娘挪略。我一直安慰自己历帚,他們只是感情好滔岳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著挽牢,像睡著了一般谱煤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上禽拔,一...
    開(kāi)封第一講書(shū)人閱讀 51,301評(píng)論 1 301
  • 那天刘离,我揣著相機(jī)與錄音,去河邊找鬼睹栖。 笑死硫惕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的野来。 我是一名探鬼主播恼除,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了豁辉?” 一聲冷哼從身側(cè)響起令野,我...
    開(kāi)封第一講書(shū)人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎徽级,沒(méi)想到半個(gè)月后气破,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡餐抢,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年现使,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弹澎。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡朴下,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出苦蒿,到底是詐尸還是另有隱情殴胧,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布佩迟,位于F島的核電站团滥,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏报强。R本人自食惡果不足惜灸姊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望秉溉。 院中可真熱鬧力惯,春花似錦、人聲如沸召嘶。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)弄跌。三九已至甲喝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铛只,已是汗流浹背埠胖。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淳玩,地道東北人直撤。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蜕着,于是被迫代替她去往敵國(guó)和親谋竖。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 引言 簡(jiǎn)單闡述RxJava流程源碼,RxJava有以下三種流程,向下遞增。 Observable->Observe...
    伍零一閱讀 760評(píng)論 0 8
  • RxJava強(qiáng)大的地方之一是他的鏈?zhǔn)秸{(diào)用圈盔,輕松地在線程之間進(jìn)行切換豹芯。這幾天也大概分析了一下RxJava的線程切換的...
    TripleZhao閱讀 1,229評(píng)論 2 8
  • RxJava源碼分析(1) Rxjava相信大家都不陌生,是現(xiàn)在很流行的一種解決異步通信的框架驱敲,分析源碼铁蹈,不會(huì)對(duì)R...
    JCJIE閱讀 297評(píng)論 0 0
  • RxJava 簡(jiǎn)單來(lái)說(shuō) , 是一個(gè)很靈活切換線程的褲子 . 簡(jiǎn)單試?yán)?源碼解讀試?yán)?變換思想圖解 變換思想總結(jié) 1...
    Justson閱讀 1,398評(píng)論 0 1
  • 情人節(jié)前后握牧,理應(yīng)高調(diào)大聲秀恩愛(ài)、歌頌偉大的愛(ài)情娩梨。 但看多了勵(lì)志文章和心靈雞湯沿腰,我現(xiàn)在對(duì)喊口號(hào)式的寄語(yǔ)無(wú)感,甚至有些...
    顆粒crown閱讀 955評(píng)論 0 3