RxJava2筆記(一、事件訂閱流程)

前言

接觸RxJava也有將近2年了导街,雖然能夠熟練使用披泪,可是對里面的運行流程一直都是未知半解,雖然中間也有看過網(wǎng)上的博客搬瑰,但總是看了又忘記款票。直到最近才下定決心,寫一個關(guān)于RxJava2的系列專題跌捆,好好學(xué)習(xí)一下里面的源碼流程徽职,并以文章的形式記錄下來(主要是防止自己過段時間又忘記了,誰叫我記性差呢佩厚,咳咳...)姆钉。希望我能堅持寫完,由于水平有限抄瓦,如有錯誤之處還請指正潮瓶。

好了,話不多說钙姊,讓我們先從一段代碼開始

Observer<Integer> observer = new Observer<Integer>() {
    Disposable disposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe:");
        disposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: " + e.getMessage());
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete:");
    }
};

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(observer);

以上便是RxJava的教學(xué)課程中一開始就教大家寫的代碼(嗯毯辅,我個人認為是這樣的。)輸出結(jié)果如下:


image.png

可以看到煞额,onSubscribe是最先被調(diào)用的思恐;其次依次執(zhí)行onNext發(fā)送數(shù)1,2膊毁,3胀莹;最后執(zhí)行onComplete結(jié)束數(shù)據(jù)發(fā)送。
好了婚温,代碼執(zhí)行完了描焰,那它的運行流程是什么樣子的呢?我們先從事件源開始栅螟,也就是Observable.create()方法荆秦,如下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

可以看到篱竭,create方法接收一個ObservableOnSubscribe<T>接口類型的參數(shù)

public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

該接口只有一個方法subscribe,咦步绸?好像和文章一開始的代碼最后面的subscribe方法一模一樣啊掺逼,不過我們先不管這些,繼續(xù)往下看靡努,該方法接收一個ObservableEmitter<T>接口類型的參數(shù):

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter繼承自接口Emitter:

public interface Emitter<T> {
    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

Emitter里面的三個方法還是蠻熟悉的坪圾,剛好和observer中的onNext,onComplete和onError一一對應(yīng)惑朦,他們之間是怎么建立聯(lián)系的呢兽泄?好像還漏了個onSubscribe,這個方法又是什么時候調(diào)用的呢漾月?我們回過頭來繼續(xù)看Observable.create方法病梢。

讓我們再看一眼Observable.create方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

上面說到該方法接收類型為ObservableOnSubscribe<T>接口的參數(shù),然后將其傳入到ObservableCreate<T>類的構(gòu)造方法中梁肿,這個RxJavaPlugins.onAssembly又是什么鬼蜓陌?我們看一眼這個方法:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

根據(jù)這個方法的描述,它返回一個鉤子(hook)吩蔑,也就是將接收的參數(shù)做了一下處理再將這個參數(shù)返回給調(diào)用者钮热。RxjavaPlugins類的其他方法也是類似的處理過程≈蚍遥總之我們只要明白這個方法把傳進去的參數(shù)最終又返回給了調(diào)用者就行了隧期。

我們再回到Observable.create方法中,可以看到赘娄,create方法將傳進去的ObservableOnSubscribe<T>又傳遞給了ObservableCreate<T>類的構(gòu)造方法仆潮,最后通過RxJavaPlugins.onAssembly將這個構(gòu)造方法生成的ObservableCreate對象返回給調(diào)用者,這個ObservableCreate也是繼承自O(shè)bservable類遣臼。于是一個具體的Observable對象就誕生了性置,其具體對象類型為ObservableCreate類,接下來我們就來介紹這個類:

在介紹ObservableCreate類之前揍堰,我們先來梳理下思路:

1鹏浅、Observable.create方法生成一個Observable對象,也就是被觀察者
2屏歹、該方法需要接收一個ObservableOnSubscribe<T>類型的參數(shù)
3隐砸、將接收到的ObservableOnSubscribe<T>類型的參數(shù)傳遞到ObservableCreate<T>類的構(gòu)造方法中,生成ObservableCreate對象西采,并通過RxJavaPlugins.onAssembly方法這個對象返回凰萨。
這樣继控,一個具體的被觀察者對象就誕生了械馆,他就是ObservableCreate對象胖眷。

看樣子我們的事件訂閱就是在這個ObservableCreate類中完成的,那我們就來看看它做了哪些工作霹崎。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //......代碼省略
        }
        //......代碼省略
    }
    //......代碼省略
}

我們看到該類有個source成員變量珊搀,類型是ObservableOnSubscribe<T>,正是Observable.create方法傳入的參數(shù)尾菇。我們重點來看subscribeActual(Observer<? super T> observer)這個方法:

subscribeActual方法分析
  • 1境析、subscribeActual接收一個Observer類型的參數(shù)(觀察者),看樣子好像是我們在訂閱的時候傳入的observer派诬,到底是不是我們后面再看
  • 2劳淆、將傳入的observer參數(shù)包裝成一個CreateEmitter。
  • 3默赂、observer調(diào)用自己的onSubscribe方法沛鸵,這個方法的參數(shù)正式上面包裝observer的CreateEmitter。
  • 4缆八、source.subscribe(parent)真正的訂閱發(fā)生的地方曲掰。這個source就是Observable.create方法中傳入的 ObservableOnSubscribe對象,parent則是步驟2中包裝observer的CreateEmitter奈辰。

因此這里執(zhí)行的subscribe方法正是Observable.create方法中所傳入ObservableOnSubscribe接口里面的subscribe方法栏妖。這樣一來我們就明白了在文章開始的代碼中,subscribe方法中我們調(diào)用emitter.onNext奖恰,emitter.onComplete吊趾,這個emitter實際上就是步驟2中包裝observer后生成的CreateEmitter對象,CreateEmitter類實現(xiàn)了ObservableEmitter接口房官。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(observer);
  • 5趾徽、這里在source.subscribe(parent)中進行了異常捕獲,如果subscribe拋出了異常翰守,則調(diào)用parent.onError(ex);

這樣看來孵奶,我們的觀察者observer和CreateEmitter之間有著很大的聯(lián)系,我們來分析下CreateEmitter這個類:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        //......代碼省略
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        //......代碼省略
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    //......代碼省略
}

從上面代碼中我們看到蜡峰,這個類繼承自ActimicReference了袁,這樣它便是原子級的,同時實現(xiàn)了ObservableEmitter<T>和Disposable接口(關(guān)于Disposable我們在下一章討論)湿颅,而ObservableEmitter又繼承自Emitter接口载绿。因此我們重點看CreateEmitter所實現(xiàn)的Emitter中的三個方法,即分別是onNext油航,onComplete和OnError崭庸。首先我們看它的onNext方法

@Override
public void onNext(T t) {
    //......代碼省略
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

簡要介紹下isDisposed()這個方法,該方法為true時表示訂閱被中斷,為false時正好相反怕享。在這里面當!isDisposed()為true時表示訂閱未被中斷执赡,此時執(zhí)行observer.onNext(t);這個observer正是通過CreateEmitter構(gòu)造方法傳遞進來的,也就是subscribeActual方法所接收的觀察者對象函筋。因此observer和CreateEmitter之間的onNext方法就是通過這種方式建立的聯(lián)系沙合,onComplete和onError同理。

至此跌帐,observer和CreateEmitter之間的關(guān)聯(lián)就分析完了首懈。還有一個問題,就是上面的步驟1中留下來的問題谨敛,就是這個subscribeActual方法所接收的observer是不是我們訂閱時傳進去的observer觀察者究履?我們就返回到最初的訂閱代碼:


image.png

這個subscribe執(zhí)行了哪些操作呢?我們點進去看看:

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

從上面的方法中我們可以很明顯的看出subscribeActual(observer)中接收的observer正式來自于我們訂閱時傳入的觀察者脸狸,在Observable類中subscribeActual方法是一個抽象方法挎袜,需要其子類去實現(xiàn),在上面的思路梳理中我們得出的結(jié)論是Observable.create生成的具體Observable對象是其子類ObservableCreate肥惭;然后ObservableCreate調(diào)用subscribe方法完成訂閱盯仪,因此此處執(zhí)行的subscribeActual(observer);正是ObservableCreate內(nèi)部的subscribeActual方法。

到這里我們的事件訂閱流程就分析完了蜜葱,最后我們再來總結(jié)一下:

結(jié)論

  • 1全景、Observable.create()方法接收一個ObservableOnSubscribe接口類型的對象(source),并將這個接收的對象作為參數(shù)傳遞到ObservableCreate類的構(gòu)造方法中生成一個Observable子類對象ObservableCreate對象(new ObservableCreate(source))牵囤,最后將其返回作為事件源(被觀察者)爸黄。
  • 2、然后調(diào)用subscribe(observer)方法揭鳞,實際上執(zhí)行的是observableCreate.subscribe(observer)炕贵。在這個方法中調(diào)用subscribeActual(observer);其參數(shù)正是我們前面自己寫的observer對象(觀察者)。而在subscribeActual(observer)方法內(nèi)部野崇,首先將傳入的observer包裝為CreateEmitter對象(parent)称开,然后執(zhí)行observer.onSubscribe(parent)。至此觀察者和被觀察者之間正式建立訂閱關(guān)系乓梨。
  • 3鳖轰、最后執(zhí)行方法source.subscribe(parent),這個方法實際上是ObservableOnSubscribe接口中的方法扶镀,也就是我們自己手寫的ObservableOnSubscribe實現(xiàn)類中的方法:


    image.png

    在上圖中蕴侣,emitter就是傳入的parent,也就是結(jié)論2中將外部傳進來的observer包裝起來的CreateEmitter對象臭觉。因此昆雀,當我們調(diào)用emitter.onNext,onComplete,onError等方法時辱志,實際上調(diào)用的是CreateEmitter內(nèi)部的onNext,onComplete,onError方法;在CreateEmitter內(nèi)部的onNext,onComplete,onError方法中狞膘,又調(diào)用了observer.onNext,onComplete,onError方法荸频,這個observer正是外部傳進來的觀察者對象,如下圖所示:


    image.png

好了客冈,整個的時間訂閱流程終于分析完了,當然了有事件訂閱自然就有取消訂閱稳强,下一章RxJava2筆記(二场仲、事件取消流程)我們將分析事件是如何取消訂閱的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末退疫,一起剝皮案震驚了整個濱河市渠缕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌褒繁,老刑警劉巖亦鳞,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異棒坏,居然都是意外死亡燕差,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門坝冕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來徒探,“玉大人,你說我怎么就攤上這事喂窟〔獍担” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵磨澡,是天一觀的道長碗啄。 經(jīng)常有香客問我,道長稳摄,這世上最難降的妖魔是什么稚字? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮厦酬,結(jié)果婚禮上尉共,老公的妹妹穿的比我還像新娘。我一直安慰自己弃锐,他們只是感情好袄友,可當我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著霹菊,像睡著了一般剧蚣。 火紅的嫁衣襯著肌膚如雪支竹。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天鸠按,我揣著相機與錄音礼搁,去河邊找鬼。 笑死目尖,一個胖子當著我的面吹牛馒吴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瑟曲,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼饮戳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了洞拨?” 一聲冷哼從身側(cè)響起扯罐,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎烦衣,沒想到半個月后歹河,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡花吟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年秸歧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衅澈。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡寥茫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出矾麻,到底是詐尸還是另有隱情纱耻,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布险耀,位于F島的核電站弄喘,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏甩牺。R本人自食惡果不足惜蘑志,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贬派。 院中可真熱鬧急但,春花似錦、人聲如沸搞乏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽请敦。三九已至镐躲,卻和暖如春储玫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背萤皂。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工撒穷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人裆熙。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓端礼,卻偏偏與公主長得像,于是被迫代替她去往敵國和親入录。 傳聞我的和親對象是個殘疾皇子蛤奥,可洞房花燭夜當晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容