RxJava2 源碼分析(一)

前言

最近由于項(xiàng)目需要自己搭建了網(wǎng)絡(luò)框架,采用時下非常流行的Rxjava2 + Retrofit搭建澳淑, Rxjava現(xiàn)在已經(jīng)發(fā)展到Rxjava2臭胜,之前一直都只是再用Rxjava还最,但從來沒有了解下Rxjava的內(nèi)部實(shí)現(xiàn)芭逝,未來知其然并且知其所以然,今天我將一步步來分析Rxjava2的源碼舆驶,Rxjava2分Observable和Flowable兩種(無被壓和有被壓)蔬浙,我們今天先從簡單的無背壓的observable來分析。如有不對的地方贞远,望大牛指教&輕拍畴博。源碼基于rxjava:2.1.1。

簡單的例子

先來段最簡單的代碼蓝仲,直觀的了解下整個Rxjava運(yùn)行的完整流程俱病。

private void doSomeWork() {
        Observable<String> observable =  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onComplete();
            }
        });
        Observer observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext : " + str);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lx", " onComplete");
            }
        };
        observable.subscribe(observer);
    }

上面代碼之所以將observable和observer單獨(dú)聲明,最后再調(diào)用observable.subscribe(observer);
是為了分步來分析:

  1. 被觀察者 Observable 如何生產(chǎn)事件的
  2. 被觀察者 Observable 何時生產(chǎn)事件的
  3. 觀察者Observer是何時接收到上游事件的
  4. Observable 與Observer是如何關(guān)聯(lián)在一起的

Observable

Observable是數(shù)據(jù)的上游袱结,即事件生產(chǎn)者
首先來分析事件是如何生成的亮隙,直接看代碼 Observable.create()方法。

   @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {    // ObservableOnSubscribe 是個接口垢夹,只包含subscribe方法溢吻,是事件生產(chǎn)的源頭。
        ObjectHelper.requireNonNull(source, "source is null"); // 判空
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));這句代碼果元。繼續(xù)跟蹤進(jìn)去

/**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

看注釋促王,原來這個方法是個hook function。 通過調(diào)試得知靜態(tài)對象onObservableAssembly默認(rèn)為null而晒, 所以此方法直接返回傳入的參數(shù)source蝇狼。
onObservableAssembly可以通過靜態(tài)方法RxJavaPlugins. setOnObservableAssembly ()設(shè)置全局的Hook函數(shù), 有興趣的同學(xué)可以自己去試試倡怎。 這里暫且不談迅耘,我們繼續(xù)返回代碼贱枣。
現(xiàn)在我們明白了:

 Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
...
...
})

相當(dāng)于:

Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() {
...
...
}))

好了,至此我們明白了颤专,事件的源就是new ObservableCreate()對象纽哥,將ObservableOnSubscribe作為參數(shù)傳遞給ObservableCreate的構(gòu)造函數(shù)。
事件是由接口ObservableOnSubscribe的subscribe方法上產(chǎn)的栖秕,至于何時生產(chǎn)事件春塌,稍后再分析。

Observer

Observer 是數(shù)據(jù)的下游累魔,即事件消費(fèi)者
Observer是個interface,包含 :

   void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();

上游發(fā)送的事件就是再這幾個方法中被消費(fèi)的够滑。上游何時發(fā)送事件垦写、如何發(fā)送,稍后再表彰触。

subscribe

重點(diǎn)來了梯投,接下來最重要的方法來了:observable.subscribe(observer);
從這個方法的名字就知道,subscribe是訂閱况毅,是將觀察者(observer)與被觀察者(observable)連接起來的方法分蓖。只有subscribe方法執(zhí)行后,上游產(chǎn)生的事件才能被下游接收并處理尔许。其實(shí)自然的方式應(yīng)該是observer訂閱(subscribe) observable, 但這樣會打斷rxjava的鏈?zhǔn)浇Y(jié)構(gòu)么鹤。所以采用相反的方式。
接下來看源碼味廊,只列出關(guān)鍵代碼

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        ......
       observer = RxJavaPlugins.onSubscribe(this, observer); // hook 蒸甜,默認(rèn)直接返回observer
       ......
       subscribeActual(observer);  // 這個才是真正實(shí)現(xiàn)訂閱的方法。
       ......
    }

// subscribeActual 是抽象方法余佛,所以需要到實(shí)現(xiàn)類中去看具體實(shí)現(xiàn)柠新,也就是說實(shí)現(xiàn)是在上文中提到的ObservableCreate中
protected abstract void subscribeActual(Observer<? super T> observer);

接下來我們來看ObservableCreate.java:

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;  // 事件源,生產(chǎn)事件的接口辉巡,由我們自己實(shí)現(xiàn)
    }

   @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發(fā)射器
        observer.onSubscribe(parent);  //直接回調(diào)了觀察者的onSubscribe

        try {
            // 調(diào)用了事件源subscribe方法生產(chǎn)事件恨憎,同時將發(fā)射器傳給事件源。 
            // 現(xiàn)在我們明白了郊楣,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行
              后才執(zhí)行的憔恳。 換言之,事件流是在訂閱后才產(chǎn)生的净蚤。
            //而observable被創(chuàng)建出來時并不生產(chǎn)事件喇嘱,同時也不發(fā)射事件。
          source.subscribe(parent);  
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

現(xiàn)在我們明白了塞栅,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行后才執(zhí)行的者铜。 換言之腔丧,事件流是在訂閱后才產(chǎn)生的。而observable被創(chuàng)建出來時并不生產(chǎn)事件作烟,同時也不發(fā)射事件愉粤。
接下來我們再來看看事件是如何被發(fā)射出去,同時observer是如何接收到發(fā)射的事件的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
CreateEmitter 實(shí)現(xiàn)了ObservableEmitter接口拿撩,同時ObservableEmitter接口又繼承了Emitter接口衣厘。
CreateEmitter 還實(shí)現(xiàn)了Disposable接口,這個disposable接口是用來判斷是否中斷事件發(fā)射的压恒。
從名稱上就能看出影暴,這個是發(fā)射器,故名思議是用來發(fā)射事件的探赫,正是它將上游產(chǎn)生的事件發(fā)射到下游的型宙。
Emitter是事件源與下游的橋梁。
CreateEmitter 主要包括方法:

    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
    public void dispose() ;
    public boolean isDisposed();

是不是跟observer的方法很像伦吠?
我們來看看CreateEmitter中這幾個方法的具體實(shí)現(xiàn):
只列出關(guān)鍵代碼

     public void onNext(T t) {
         if (!isDisposed()) { // 判斷事件是否需要被丟棄
             observer.onNext(t); // 調(diào)用Emitter的onNext妆兑,它會直接調(diào)用observer的onNext
         }
      }
      public void onError(Throwable t) {
           if (!isDisposed()) {
                try {
                    observer.onError(t); // 調(diào)用Emitter的onError,它會直接調(diào)用observer的onError
                } finally {
                    dispose();  // 當(dāng)onError被觸發(fā)時毛仪,執(zhí)行dispose(), 后續(xù)onNext搁嗓,onError, onComplete就不會繼
                                    續(xù)發(fā)射事件了
                }
            }
        }

       @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete(); // 調(diào)用Emitter的onComplete箱靴,它會直接調(diào)用observer的onComplete
                } finally {
                    dispose();  // 當(dāng)onComplete被觸發(fā)時腺逛,也會執(zhí)行dispose(), 后續(xù)onNext,onError衡怀, onComplete
                                      同樣不會繼續(xù)發(fā)射事件了
                }
            }
        }

CreateEmitter 的onError和onComplete方法任何一個執(zhí)行完都會執(zhí)行dispose()中斷事件發(fā)射屉来,所以observer中的onError和onComplete也只能有一個被執(zhí)行。
現(xiàn)在終于明白了狈癞,事件是如何被發(fā)射給下游的茄靠。
當(dāng)訂閱成功后,數(shù)據(jù)源ObservableOnSubscribe開始生產(chǎn)事件蝶桶,調(diào)用Emitter的onNext慨绳,onComplete向下游發(fā)射事件,Emitter包含了observer的引用真竖,又調(diào)用了observer onNext脐雪,onComplete,這樣下游observer就接收到了上游發(fā)射的數(shù)據(jù)恢共。

總結(jié)

Rxjava的流程大概是:

  1. Observable.create 創(chuàng)建事件源战秋,但并不生產(chǎn)也不發(fā)射事件。
  2. 實(shí)現(xiàn)observer接口讨韭,但此時沒有也無法接受到任何發(fā)射來的事件脂信。
  3. 訂閱 observable.subscribe(observer)癣蟋, 此時會調(diào)用具體Observable的實(shí)現(xiàn)類中的subscribeActual方法,
    此時會才會真正觸發(fā)事件源生產(chǎn)事件狰闪,事件源生產(chǎn)出來的事件通過Emitter的onNext疯搅,onError,onComplete發(fā)射給observer對應(yīng)的方法由下游observer消費(fèi)掉埋泵。從而完成整個事件流的處理幔欧。

PS: observer中的onSubscribe在訂閱時即被調(diào)用,并傳回了Disposable丽声, observer中可以利用Disposable來隨時中斷事件流的發(fā)射礁蔗。

今天所列舉的例子是最簡單的一個事件處理流程,沒有使用線程調(diào)度雁社,Rxjava最強(qiáng)大的就是異步時對線程的調(diào)度和隨時切換觀察者線程浴井。至于這部分的源碼且聽下回講解。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末歧胁,一起剝皮案震驚了整個濱河市滋饲,隨后出現(xiàn)的幾起案子厉碟,更是在濱河造成了極大的恐慌喊巍,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,464評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件箍鼓,死亡現(xiàn)場離奇詭異崭参,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)款咖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評論 3 399
  • 文/潘曉璐 我一進(jìn)店門何暮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铐殃,你說我怎么就攤上這事海洼。” “怎么了富腊?”我有些...
    開封第一講書人閱讀 169,078評論 0 362
  • 文/不壞的土叔 我叫張陵坏逢,是天一觀的道長。 經(jīng)常有香客問我赘被,道長是整,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,979評論 1 299
  • 正文 為了忘掉前任民假,我火速辦了婚禮浮入,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘羊异。我一直安慰自己事秀,他們只是感情好彤断,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著秽晚,像睡著了一般瓦糟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赴蝇,一...
    開封第一講書人閱讀 52,584評論 1 312
  • 那天菩浙,我揣著相機(jī)與錄音,去河邊找鬼句伶。 笑死劲蜻,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的考余。 我是一名探鬼主播先嬉,決...
    沈念sama閱讀 41,085評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼楚堤!你這毒婦竟也來了疫蔓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,023評論 0 277
  • 序言:老撾萬榮一對情侶失蹤身冬,失蹤者是張志新(化名)和其女友劉穎衅胀,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酥筝,經(jīng)...
    沈念sama閱讀 46,555評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滚躯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了嘿歌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片掸掏。...
    茶點(diǎn)故事閱讀 40,769評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宙帝,靈堂內(nèi)的尸體忽然破棺而出丧凤,到底是詐尸還是另有隱情,我是刑警寧澤步脓,帶...
    沈念sama閱讀 36,439評論 5 351
  • 正文 年R本政府宣布愿待,位于F島的核電站,受9級特大地震影響沪编,放射性物質(zhì)發(fā)生泄漏呼盆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評論 3 335
  • 文/蒙蒙 一蚁廓、第九天 我趴在偏房一處隱蔽的房頂上張望访圃。 院中可真熱鬧,春花似錦相嵌、人聲如沸腿时。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽批糟。三九已至格了,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間徽鼎,已是汗流浹背盛末。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留否淤,地道東北人悄但。 一個月前我還...
    沈念sama閱讀 49,191評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像石抡,于是被迫代替她去往敵國和親檐嚣。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評論 2 361

推薦閱讀更多精彩內(nèi)容