我所理解的Rxjava2

最近又在看網(wǎng)易公開課里面的美國名校的畢業(yè)演講名党,雖然以前看過区丑,但現(xiàn)在重新看的話嘱蛋,依然能學(xué)到不少東西叫潦,能把某個話題講的很細(xì),很深爸邢,我現(xiàn)在特別喜歡Michelle樊卓,作為第一夫人卻沒有第一夫人的架子,聽她的演講杠河,能感受到那種能量密度特別大碌尔,她演講到人要想成為什么樣的人時,我特別震撼券敌,她認(rèn)為如果想要成為什么樣的人的話唾戚,必須要有適應(yīng)力和責(zé)任心。而國內(nèi)的演講都是講怎么成功待诅,怎么成才叹坦,怎么愛國,且都談的特別的大卑雁,泛泛而談募书,我認(rèn)為,這也是現(xiàn)在的年輕人特別浮躁的原因之一吧序厉。對于我目前而言要做的是锐膜,腳踏實地的做事情,好好孝敬父母弛房。

本篇文章主要是明白Rxjava的流程 ,本文基于Rxjava2.2.8 的源碼

1.分析案例

分析下經(jīng)常用的這個操作符到底做了哪些事情

Observable.create(new ObservableOnSubscribe<String>(){

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("hellp");
        }
    }).map(new Function<String, Object>() {
        @Override
        public Object apply(String s) throws Exception {
            return s + " world";
        }
    }).subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("verse", "onSubscribe");
        }

        @Override
        public void onNext(Object o) {
            String str = (String) o;
            Log.d("verse", str);
        }

        @Override
        public void onError(Throwable e) {
            Log.d("verse", "onError");
        }

        @Override
        public void onComplete() {
            Log.d("wangyang", "onComplete");
        }
    });

上述代碼很簡單而柑,在此不再多說

2.源碼分析create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

當(dāng)調(diào)用Observable.create(ObservableOnSubscribe)時文捶,初始化了ObservableCreate對象,并引用了ObservableOnSubscribe對象

3.源碼分析map()

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

實際上是ObservableCreate.map(Function)媒咳,初始化了ObservableMap對象粹排,并引用了ObservableCreate對象和Function對象

4.源碼分析subscribe()

其實調(diào)用的是ObservableMap.subscribe()

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

剔除不是關(guān)鍵的代碼后,其實只有一行代碼

subscribeActual(observer);

查看該代碼發(fā)現(xiàn)是抽象方法涩澡,該實現(xiàn)方法是ObservableMap.subscribeActual(observer)

public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

將實現(xiàn)的Observer和Function封裝在MapObserver中顽耳,而source是引用上游的對象

即ObservableCreate.subscribe(Observer),還是這段代碼

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

又繼續(xù)調(diào)用ObservableCreate.subscribeActual(observer)

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

又將observer封裝在CreateEmitter中妙同,并調(diào)用observer.onSubscribe(parent)射富,而這個observer就是MapObserver,查看MapObserver.onSubscribe()

public final void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {

        this.upstream = d;
        if (d instanceof QueueDisposable) {
            this.qd = (QueueDisposable<T>)d;
        }

        if (beforeDownstream()) {

            downstream.onSubscribe(this);

            afterDownstream();
        }

    }
}

而這里的downstream就是我們在subscribe中實現(xiàn)的內(nèi)部內(nèi)粥帚,看源碼知道了原來onSubscribe()方法啥事情也沒干胰耗,只是調(diào)用了該方法,一般在這個方法里做一些初始化的操作芒涡,繼續(xù)往下看柴灯,會調(diào)用

source.subscribe(parent);

source就是在 create()所實現(xiàn)的類卖漫,并傳遞CreateEmitter,當(dāng)調(diào)用onNext()時赠群,就是調(diào)用的CreateEmitter.onNext()羊始,查看該方法

public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

剔除非關(guān)鍵性代碼后,只有一行代碼

observer.onNext(t);

而這個Observer是MapObserver查描,查看MapObserver.onNext()

public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }

        U v;

        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }

剔除非關(guān)鍵性代碼后店枣,只有兩行代碼

v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
downstream.onNext(v);

mapper是實現(xiàn)的Function,轉(zhuǎn)化為相應(yīng)的類型后叹誉,會調(diào)用

downstream.onNext(v);

而downstrean是在subscribe中所實現(xiàn)的內(nèi)部內(nèi)鸯两,最后會調(diào)用我們實現(xiàn)的匿名內(nèi)部內(nèi)Observer.onNext()

自此,該案例分析完畢

5.總結(jié)

當(dāng)調(diào)用操作符時长豁,會創(chuàng)建對應(yīng)的對象钧唐,并引用上游的對象,如調(diào)用操作符map()時匠襟,會創(chuàng)建ObservableMap對象钝侠,并持有上游的ObservableCreate對象,這樣一環(huán)扣著一環(huán)酸舍,直至調(diào)用subscribe時帅韧,會將subscribe(Observer)里面的參數(shù)和操作符的內(nèi)部內(nèi)封裝在Observable類里面的靜態(tài)內(nèi)部內(nèi)里面,由于引用了上游的Observable啃勉,再調(diào)用上游的subscribe的方法忽舟,依次類推,直至ObservableCreate.subscribe(Observer)方法里淮阐,在這里叮阅,會先執(zhí)行一個onSubscribe()方法,一般是在這里面做一個執(zhí)行前的操作泣特,再會執(zhí)行ObservableOnSubscribe.subscribe(ObservableEmitter)方法浩姥,在這個方法里可以執(zhí)行ObservableEmitter.onNext(),而當(dāng)前的實例是在ObservableCreate的內(nèi)部內(nèi)的onNext()方法里實現(xiàn)的状您,在這個onNext里勒叠,又會調(diào)用observer.onNext()方法,這里的observer是下游的Observable的內(nèi)部內(nèi)膏孟,在下游的內(nèi)部內(nèi)的onNext()執(zhí)行需要的操作后又繼續(xù)傳遞給下下游的Observable.onNext()眯分,依次類推,直至最后一個onNext()骆莹,也就是subscribe(Observer)方法的參數(shù)的Observer.onNext()

一圖勝千言颗搂,我在網(wǎng)上找了Rxjava2的流程圖

rxjava2事件流程圖
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市幕垦,隨后出現(xiàn)的幾起案子丢氢,更是在濱河造成了極大的恐慌傅联,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疚察,死亡現(xiàn)場離奇詭異蒸走,居然都是意外死亡,警方通過查閱死者的電腦和手機貌嫡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門比驻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人岛抄,你說我怎么就攤上這事别惦。” “怎么了夫椭?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵掸掸,是天一觀的道長。 經(jīng)常有香客問我蹭秋,道長扰付,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任仁讨,我火速辦了婚禮羽莺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘洞豁。我一直安慰自己盐固,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布族跛。 她就那樣靜靜地躺著闰挡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪礁哄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天溪北,我揣著相機與錄音桐绒,去河邊找鬼。 笑死之拨,一個胖子當(dāng)著我的面吹牛茉继,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蚀乔,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼烁竭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吉挣?” 一聲冷哼從身側(cè)響起派撕,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤婉弹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后终吼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體镀赌,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年际跪,在試婚紗的時候發(fā)現(xiàn)自己被綠了商佛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡姆打,死狀恐怖良姆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情幔戏,我是刑警寧澤玛追,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站评抚,受9級特大地震影響豹缀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜慨代,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一邢笙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧侍匙,春花似錦氮惯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至说莫,卻和暖如春杨箭,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背储狭。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工互婿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辽狈。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓慈参,卻偏偏與公主長得像,于是被迫代替她去往敵國和親刮萌。 傳聞我的和親對象是個殘疾皇子驮配,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容