RxJava源碼分析(一)基本的數(shù)據(jù)流分析(無背壓)

引言

關(guān)于RxJava2的用法網(wǎng)上的資料很多,這里我們只學(xué)習(xí)它的實現(xiàn)原理握侧。本文專題目的:
1.知道源頭(Observable)是如何將數(shù)據(jù)發(fā)送出去的。
2.知道終點(Observer)是如何接收到數(shù)據(jù)的。
3.何時將源頭和終點關(guān)聯(lián)起來的
今天我們先從最簡單的無背壓(Observable)的create操作符說起远剩,來解決前三個問題汛兜。

樣例

       //1.創(chuàng)建被觀察者巴粪,生產(chǎn)事件
        final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            //2.訂閱的時候發(fā)送事件
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);

//                emitter.onError(new Throwable("haha"));
                emitter.onComplete();//onComplete事件發(fā)送后,后面的所有事件無效,且后面不能發(fā)送錯誤事件
                emitter.onNext(3);

            }
        });
        //3.定義觀察者
        Observer<Integer> observer = new Observer<Integer>() {
            
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "開始采用subscribe連接");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "對Next事件" + value + "作出響應(yīng)");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "對Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "對Complete事件作出響應(yīng)");
            }
        };
        //4建立聯(lián)系
        observable.subscribe(observer);

我們看到出現(xiàn)了一下幾個角色:

  1. Observable:被觀察者粥谬,是數(shù)據(jù)的源頭肛根,通過subscribe訂閱被觀察者;
  2. ObservableOnSubscribe:從代碼結(jié)構(gòu)上看,Observable的構(gòu)造方法需要它漏策,且持有subscribe方法派哲,這里暫時理解為觀察者和被觀察者的中間件,具體作用后面再看;
  3. ObservableEmitter:顧名思義,是數(shù)據(jù)發(fā)射器掺喻,被觀察者通過它發(fā)送事件;
  4. Observer:被觀察者芭届,數(shù)據(jù)接受者,持有onNext感耙、onError褂乍、onComplete、onSubscribe方法即硼。

Observable

public abstract class Observable<T> implements ObservableSource<T> {
...
}

實現(xiàn)了ObservableSource接口:

public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(Observer<? super T> observer);
}

接口很簡單逃片,提供了訂閱觀察者的功能,Observable中該方法的實現(xiàn)后面再看只酥。我們先看看create操作符干了些啥:

public abstract class Observable<T> implements ObservableSource<T> {
    ...
   @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判空
        ObjectHelper.requireNonNull(source, "source is null");
       //構(gòu)造ObservableCreate對象
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    ...
}

最后通過用戶構(gòu)造的ObservableOnSubscribe對象褥实,返回了ObservableCreate對象。我們先看看ObservableOnSubscribe裂允。

數(shù)據(jù)發(fā)射封裝-ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

是個接口损离,用戶發(fā)射數(shù)據(jù)就是在這個接口實現(xiàn)中完成,具體見樣例代碼绝编,這里流一個疑問:入?yún)bservableEmitter是怎么來的僻澎,別急后面馬上會講到!

create操作符的產(chǎn)物-ObservableCreate

然后再看Observable的子類ObservableCreate十饥,根據(jù)名字我們可以猜到它是由create操作符創(chuàng)建的被觀察者:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;//用戶實現(xiàn)具體的數(shù)據(jù)發(fā)射操作

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //調(diào)用中間件ObservableOnSubscribe的訂閱方法窟勃,開始調(diào)用發(fā)射數(shù)據(jù)的代碼了!
             //說明1
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
...
}

這里的source由用戶構(gòu)造绷跑,實現(xiàn)發(fā)射數(shù)據(jù)操作拳恋,subscribeActual方法是核心,當(dāng)訂閱觀察者是砸捏,最終會執(zhí)行subscribeActual方法谬运,后面會具體說明隙赁,不過看方法名也應(yīng)該能猜到。
前面我們講ObservableOnSubscribe的subscribe方法時梆暖,關(guān)于入?yún)⒌膩碓戳粝铝艘粋€疑問伞访,這里看說明1的代碼:入?yún)arent類型為CreateEmitter,很明顯它必然是ObservableEmitter的子類或者子接口轰驳。

事件發(fā)射器--ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
  ...
}

public interface Emitter<T> {
    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

可見發(fā)射器接口提供了發(fā)射數(shù)據(jù)的功能厚掷,在回過頭來看看CreateEmitter,它是ObservableCreate的內(nèi)部類.

public final class ObservableCreate<T> extends Observable<T> {
...
...
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;
        //持有觀察者,發(fā)射器的發(fā)送數(shù)據(jù)的方法其實是調(diào)用觀察者對應(yīng)的方法
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //調(diào)用觀察者的接收數(shù)據(jù)的方法
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //發(fā)送完成事件后,斷開連接级解,不接受后序事件
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }
}

饒了半天冒黑,我們終于找到被觀察者被調(diào)用的地方了,用戶調(diào)用發(fā)射器的發(fā)送數(shù)據(jù)的方法最終會通過ObservableCreate中的CreateEmitter實現(xiàn)調(diào)用勤哗,而CreateEmitter最終又會調(diào)用觀察者的接收數(shù)據(jù)方法抡爹,到此為止,下游接收數(shù)據(jù)的流程如下:
1.create操作符通過ObservableOnSubscribe對象構(gòu)造ObservableCreate對象;

  1. ObservableCreate在執(zhí)行訂閱方法subscribeActual時芒划,通過Observer對象構(gòu)造發(fā)射器CreateEmitter;
  2. CreateEmitter發(fā)射數(shù)據(jù)最終會調(diào)用Observer對應(yīng)的接收數(shù)據(jù)方法冬竟。

Observable的訂閱方法

上面我們理解了接收數(shù)據(jù)的流程,下面我們瞅瞅Observable和Observer建立聯(lián)系的訂閱方法:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //subscribeActual核心代碼C癖啤1门埂!
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

lei了lei了拼苍!看到subscribeActual方法就像遇到親人了笑诅,之前我們了解到Create操作法創(chuàng)建的是ObservableCreate對象,這里用戶執(zhí)行訂閱方法時會調(diào)用subscribeActual映屋,我們再回頭看看ObservableCreate的subscribeActual實現(xiàn):

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //根據(jù)Observer構(gòu)造發(fā)射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //這里由用戶實現(xiàn)發(fā)射數(shù)據(jù)操作
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

讀到這里苟鸯,發(fā)現(xiàn)整個數(shù)據(jù)的產(chǎn)生和接收終于打通同蜻,訂閱方法通過 source.subscribe(parent)由用戶發(fā)射數(shù)據(jù)棚点,在本樣例中就是:

final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            //2.訂閱的時候發(fā)送事件
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);

//                emitter.onError(new Throwable("haha"));
                emitter.onComplete();//onComplete事件發(fā)送后,后面的所有事件無效,且后面不能發(fā)送錯誤事件
                emitter.onNext(3);
            }
        });

emitter參數(shù)就是CreateEmitter類型發(fā)射器parent湾蔓。相信看到這里瘫析,整個數(shù)據(jù)的流程應(yīng)該比較清晰了。數(shù)據(jù)流向如下:
Observable訂閱Observer--> Observable執(zhí)行subscribeActual方法--> ObservableOnSubscribe執(zhí)行subscribe方法--> ObservableEmitter執(zhí)行發(fā)射數(shù)據(jù)方法--> Observer執(zhí)行接收數(shù)據(jù)方法默责。
我們可以看到贬循,RxJava規(guī)定了數(shù)據(jù)從Observable到Observer的統(tǒng)一流程,至于用戶發(fā)送什么數(shù)據(jù)桃序、按什么順序發(fā)都通過中間件ObservableOnSubscribe和ObservableEmitter實現(xiàn)杖虾。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市媒熊,隨后出現(xiàn)的幾起案子奇适,更是在濱河造成了極大的恐慌坟比,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嚷往,死亡現(xiàn)場離奇詭異葛账,居然都是意外死亡,警方通過查閱死者的電腦和手機皮仁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門籍琳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人贷祈,你說我怎么就攤上這事趋急。” “怎么了势誊?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵宣谈,是天一觀的道長。 經(jīng)常有香客問我键科,道長闻丑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任勋颖,我火速辦了婚禮嗦嗡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘饭玲。我一直安慰自己侥祭,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布茄厘。 她就那樣靜靜地躺著矮冬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪次哈。 梳的紋絲不亂的頭發(fā)上胎署,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音窑滞,去河邊找鬼琼牧。 笑死,一個胖子當(dāng)著我的面吹牛哀卫,可吹牛的內(nèi)容都是我干的巨坊。 我是一名探鬼主播,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼此改,長吁一口氣:“原來是場噩夢啊……” “哼趾撵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起共啃,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤占调,失蹤者是張志新(化名)和其女友劉穎勋拟,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妈候,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡敢靡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了苦银。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片啸胧。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖幔虏,靈堂內(nèi)的尸體忽然破棺而出纺念,到底是詐尸還是另有隱情,我是刑警寧澤想括,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布陷谱,位于F島的核電站,受9級特大地震影響瑟蜈,放射性物質(zhì)發(fā)生泄漏烟逊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一铺根、第九天 我趴在偏房一處隱蔽的房頂上張望宪躯。 院中可真熱鬧,春花似錦位迂、人聲如沸访雪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽臣缀。三九已至,卻和暖如春泻帮,著一層夾襖步出監(jiān)牢的瞬間精置,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工刑顺, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留氯窍,地道東北人饲常。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓蹲堂,卻偏偏與公主長得像,于是被迫代替她去往敵國和親贝淤。 傳聞我的和親對象是個殘疾皇子柒竞,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容