RxJava Observalble create subscribe源碼分析

先看一小段代碼

Observable<String> observable = Observable.create(observer->{
           observer.onNext("處理的數(shù)字是"+Math.random()*100);
           observer.onComplete();
});
observable.subscribe(consumer->{
    System.out.println("我處理的元素是"+consumer);
});
observable.subscribe(consumer->{
    System.out.println("我處理的元素是"+consumer);
});

執(zhí)行結(jié)果是

我處理的元素是處理的數(shù)字是19.702425673460567
我處理的元素是處理的數(shù)字是9.601318081392996

先看Observable.create方法

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

參數(shù)是ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

其實(shí)我們可以把我們最開始的例子改寫成

ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
               emitter.onNext("處理的數(shù)字是"+Math.random()*100);
               emitter.onComplete();
        }};
       
Observable<String> observable = Observable.create(observableOnSubscribe);

observable.subscribe(consumer->{
      System.out.println("我處理的元素是"+consumer);
});
observable.subscribe(consumer->{
      System.out.println("我處理的元素是"+consumer);
});

我們把create方法參數(shù)還原成1.8之前的寫法现斋,我們一眼就看出文章一開始的代碼寫的observer是影響我們理解代碼的

observer->{
            observer.onNext("處理的數(shù)字是"+Math.random()*100);
            observer.onComplete();
        }

其實(shí)是emitter更為恰當(dāng)

emitter->{
            emitter.onNext("處理的數(shù)字是"+Math.random()*100);
            emitter.onComplete();
        }

這個ObservableEmitter 又是個接口锥涕,也就是說下面這幾行代碼只是定義了一個模版,subscribe的時候莱找,由ObservableEmitter的實(shí)現(xiàn)類還具體執(zhí)行onNext和onComplete。那么實(shí)現(xiàn)類在哪里呢?

ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
               emitter.onNext("處理的數(shù)字是"+Math.random()*100);
               emitter.onComplete();
        }};

我們再看Observable.create方法

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

也就是說

Observable<String> observable = Observable.create(observableOnSubscribe);

observable等于ObservableCreate的一個實(shí)例。這個ObservableCreate留著待用峦筒。

我們再看observable.subscribe方法

 @CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final Disposable subscribe(Consumer<? super T> onNext) {
     return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
 }

可以看到除了onNext函數(shù)是往下傳遞的,剩下的參數(shù)都是默認(rèn)值贸伐。
再放下跟subscribe方法

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }

注意這個LambdaObserver勘天,傳遞進(jìn)來的onNext函數(shù),在這里包裝成了一個observer對象捉邢。
繼續(xù)進(jìn)入subscribe(ls);

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

沒什么重要的脯丝,繼續(xù)進(jìn)入subscribeActual(observer);

protected abstract void subscribeActual(Observer<? super T> observer);

發(fā)現(xiàn)是個抽象方法,那么自然應(yīng)該是剛剛待用的ObservableCreate的
subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

第一句把observer對象再包裝成一個emitter對象
第二句什么也沒執(zhí)行伏伐,因?yàn)閛bserver只有onNext是傳進(jìn)來一個lambda宠进,其他三個參數(shù)都是默認(rèn)的。記得是個emptyConsumer藐翎。
本文的重中之重就是下面這句

source.subscribe(parent);

source就是我們一開始定義的observableOnSubscribe
subscribe就是observableOnSubscribe的subscribe方法
參數(shù)parent就是剛剛的CreateEmitter

ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
               emitter.onNext("處理的數(shù)字是"+Math.random()*100);
               emitter.onComplete();
        }};

至此所有邏輯拼接成功
先執(zhí)行subscribe材蹬,然后再執(zhí)行我們自己定義的onNext。
done

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吝镣,一起剝皮案震驚了整個濱河市堤器,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌末贾,老刑警劉巖闸溃,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異拱撵,居然都是意外死亡辉川,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門拴测,熙熙樓的掌柜王于貴愁眉苦臉地迎上來乓旗,“玉大人,你說我怎么就攤上這事集索∮煊蓿” “怎么了汇跨?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長渺鹦。 經(jīng)常有香客問我扰法,道長蛹含,這世上最難降的妖魔是什么毅厚? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮浦箱,結(jié)果婚禮上吸耿,老公的妹妹穿的比我還像新娘。我一直安慰自己酷窥,他們只是感情好咽安,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蓬推,像睡著了一般妆棒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沸伏,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天糕珊,我揣著相機(jī)與錄音,去河邊找鬼毅糟。 笑死红选,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的姆另。 我是一名探鬼主播喇肋,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼迹辐!你這毒婦竟也來了蝶防?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤明吩,失蹤者是張志新(化名)和其女友劉穎间学,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贺喝,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡菱鸥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了躏鱼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片氮采。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖染苛,靈堂內(nèi)的尸體忽然破棺而出鹊漠,到底是詐尸還是另有隱情主到,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布躯概,位于F島的核電站登钥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏娶靡。R本人自食惡果不足惜牧牢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望姿锭。 院中可真熱鬧塔鳍,春花似錦、人聲如沸呻此。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焚鲜。三九已至掌唾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間忿磅,已是汗流浹背糯彬。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贝乎,地道東北人情连。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像览效,于是被迫代替她去往敵國和親却舀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350