RxJava 源碼分析系列(一) - Observable的基本分析

??樓主最近在找實習(xí)工作,由于簡歷上說了解RxJava,所以在面試的時候應(yīng)該會問到RxJava的知識,于是樓主結(jié)合RxJava的源碼怜姿,對RxJava的工作原理進行初步的了解。也只敢說是初步了解疼燥,因為自己也是第一次看RxJava的源碼沧卢,理解的程度肯定不是很深。還是那樣醉者,如果有錯誤之處但狭,希望各位指正!
??本文參考:

??1.除非特殊說明撬即,源碼來自:2.2.0版本
??2.RxJava從源碼到應(yīng)用 移動端開發(fā)效率秒提速

1.概述

??樓主打算將RxJava的源碼分析寫成一個系列文章立磁,所以這個是這個系列的第一篇文章,在概述里面還是對RxJava是什么簡單的介紹一下剥槐,本系列文章不會對RxJava的基本用法進行展開唱歧,如果有老哥對RxJava的基本使用掌握的不是很好的話,推薦這個系列的文章:給初學(xué)者的RxJava2.0教程(一)粒竖。
??簡單的說一下RxJava颅崩,RxJava是基于觀察者模式的一個框架,在RxJava中有兩個角色蕊苗,一個Observable沿后,通常被稱為被觀察者,一個是Observer朽砰,通常被稱為觀察者得运∠ヲ冢總體的架構(gòu)是,由Observable來處理任務(wù)或者發(fā)送事件熔掺,然后在Observer里面來接受到Observable發(fā)送過來的信息饱搏。
??RxJava有很多的優(yōu)勢,比如線程調(diào)度置逻,在Android里面推沸,耗時操作必須放在子線程中,但是同時還需要主線程來更細(xì)UI券坞,所以線程調(diào)度就顯得尤為重要鬓催。當(dāng)然RxJava還有很多重要的操作符,使得我們的開發(fā)變得非常的方便恨锚。本系列文章不會對每個操作符的基本使用展開宇驾,而是對一些比較常用的操作源碼分析,所說的常用猴伶,也是指樓主用到的?紊帷!畢竟是菜雞他挎,肯定有很多的東西都不太懂筝尾。

2.基本元素

??想要對RxJava的基本原理有一個更好的了解,必須對它的基本有一個大概的了解办桨。我們先通過一個簡單的案例筹淫,來對RxJava的基本元素進行提取。

    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {

      }
    }).subscribe(new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(String s) {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
    });

??在這個簡單的案例當(dāng)中呢撞,我們可以提取的元素有:Observable, ObservableOnSubscribe, ObservableEmitter,Observer损姜。
??元素還是挺少的,我們現(xiàn)在對每個元素的類結(jié)構(gòu)來進行簡單的分析一下殊霞。

(1).Observable

public abstract class Observable<T> implements ObservableSource<T> {
}

??我們發(fā)現(xiàn)Observable本身是一個抽象類薛匪,并且實現(xiàn)了ObservableSource接口,在來看看ObservableSource接口里面有什么脓鹃。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

??ObservableSource接口里面只有一個subscribe方法,也就是說逸尖,RxJava將注冊觀察者這部分的功能提取成一個接口,從而可以看出來瘸右,面向接口編程是多么的重要????娇跟。。太颤。
??再分別來看看我們上面案例中使用的兩個方法--createsubscribe苞俘。

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 先省略代碼部分,待會詳細(xì)的分析龄章。 
    }

??啊吃谣,嚇我一跳乞封,我以為create方法的參數(shù)又是一個接口類型,還好是ObservableOnSubscribe類型岗憋,也是上面提取出來的元素其中之一肃晚,關(guān)于這個類,待會會詳細(xì)的分析仔戈。

    public final void subscribe(Observer<? super T> observer) {
      //...
    }

??這個方法就更加的簡單了关串,就是傳遞了一個Observer接口的對象。不過需要注意的是這個方法有很多的重載监徘,其中以Consumer類型的操作最為多晋修,不過這個也沒什么,最后還是Consumer轉(zhuǎn)換成為了Observer凰盔,這個就涉及到Observer接口的一個實現(xiàn)類--LambdaObserver墓卦。不要害怕,待會都會一一的講解的户敬。

(2).Observer

??說了被觀察者落剪,我們先來看看觀察者--Observer

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

??哎呀呀山叮,更加的簡單了, Observer只是簡單的接口添履,不過我們需要注意的是這個接口定義的4個方法屁倔,這里不講解四個方法的作用,畢竟我們這里將Observable的基本原理????暮胧。

(3).ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

??一如既往的接口锐借,subscribe方法里面就是具體做事情的地方,這個相信大佬們應(yīng)該都知道往衷,我這里就班門弄斧的提醒一下????钞翔。

(4).ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}

??ObservableEmitter也是一個接口,同時繼承了Emitter接口席舍,我們來看看Emitter接口的定義

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

??作為一個發(fā)射器布轿,Emitter里面定義了很多關(guān)于發(fā)送消息給Observer的方法,EmitteronNext對應(yīng)著ObserveronNext方法,其他的方法也是類似的来颤。

3.Observable的工作原理

(1).create方法

??我們對相關(guān)部分的基本元素有了一個基本的了解汰扭,現(xiàn)在我們來對整個流程的工作原理進行分析。首先我們create方法入手

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

??create方法沒有我們想象中的那么難福铅,就只有兩行代碼萝毛,還有一行用來check的????。對于ObservableCreate類這里先不進行分析滑黔,我們來看看 RxJavaPluginsonAssembly方法笆包。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

??這里提醒一下环揽,onAssembly方法的參數(shù)類型是Observable類型,也就是說ObservableCreate本身就是一個Observable庵佣。好了歉胶,扯了題外話,來看看onAssembly方法具體是干嘛的秧了。
??整個方法的執(zhí)行過程比較簡單跨扮,如果onObservableAssembly為null,直接就返回了source,也就是說返回了ObservableCreate本身验毡。而我們在整個Observable的源碼中發(fā)現(xiàn)衡创,onObservableAssembly初始值本身為null。

    public static void reset() {
        //······
        setOnObservableAssembly(null);
        //······
    }

??為什么需要這樣子繞圈子的做呢晶通?這里就是做了鉤子璃氢,以便于以后的擴展。
??所以Observablecreate方法就是返回了一個ObservableCreate對象狮辽,不過需要注意的是ObservableCreate包裹了一個ObservableOnSubscribe對象一也,也就是我們在create方法里面new的那個ObservableOnSubscribe對象。
??我們先來不急著去理解ObservableCreate是什么喉脖,還是來看看subscribe方法為我們做了什么椰苟。

(2). subscribe方法

??當(dāng)我們通過Observable的create方法來獲取一個Observable對象時,通常還會調(diào)用Observable的subscribe方法來注冊一個觀察者∈鬟矗現(xiàn)在我們來看看subscribe方法的實現(xiàn)舆蝴。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

??整個過程也不是想象中的那么神秘,除去check相關(guān)的方法不看题诵,歸根結(jié)底就是兩行代碼洁仗,先是通過RxJavaPluginsonSubscribe方法來獲取Observer對象,具體操作這里就不說了性锭,肯定跟RxJavaPluginsonAssembly方法差不多赠潦,最后返回的是observer本身,最后調(diào)用了subscribeActual方法草冈。這個subscribeActual方法是干嘛的她奥?

    protected abstract void subscribeActual(Observer<? super T> observer);

??臥了個槽?抽象方法怎棱!那我怎么知道調(diào)用的是哪個類的subscribeActual方法方淤?不急哈,記得我們之前在create方法返回的Observable對象是哪個類的對象嗎蹄殃?想起來了吧携茂,是ObservableCreate

(3). ObservableCreate

??先來看看ObservableCreate類結(jié)構(gòu)。

public final class ObservableCreate<T> extends Observable<T> {
}

??我們發(fā)現(xiàn)诅岩,ObservableCreate繼承了Observable,其實在分析create方法時讳苦,我也說過喲带膜。
??在ObservableCreate類中,只有一個ObservableOnSubscribe類型的成員變量鸳谜,這個成員變量就是我們在create方法里面new的ObservableOnSubscribe對象
??我們再來看看ObservableCreatesubscribeActual方法的實現(xiàn)

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

??在subscribeActual方法里面膝藕,先是對Observer對象進行一次包裝,將它包裝在CreateEmitter類中咐扭。然后我們會發(fā)現(xiàn)兩個比較眼熟的方法onSubscribe方法和subscribe方法芭挽。其中onSubscribe方法在Observer里面看到過,而這里恰好是通過Observer對象來調(diào)用的蝗肪,沒錯袜爪,這個的observer就是在subscribe方法里面new的對象⊙ι粒可是我們記得onSubscribe方法的參數(shù)類型是Disposable,而這里是一個CreateEmitter辛馆。我們來看看CreateEmitter的類結(jié)構(gòu):

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

??沒錯,CreateEmitter實現(xiàn)了Disposable接口,所以CreateEmitter本身可以充當(dāng)Disposable的角色豁延。
??調(diào)用了ObserveronSubscribe方法之后昙篙,然后就會調(diào)用ObservableOnSubscribesubscribe方法。
??到這里诱咏,我們應(yīng)該徹底的明白了整個Observable的工作流程苔可。我們通過create方法創(chuàng)建一個ObservableCreate方法,然后調(diào)用了subscribe方法來注冊了一個觀察者袋狞,在subscribe方法里面又調(diào)用了subscribeActual方法焚辅,在subscribeActual方法里面先是調(diào)用了ObserveronSubscribe方法,然后調(diào)用了
ObservableOnSubscribesubscribe方法硕并,在ObservableOnSubscribesubscribe方法當(dāng)中法焰,具體的做的事有兩件:1.做我們自己的事情秧荆,比如從服務(wù)器上獲取數(shù)據(jù)之類倔毙;2.將發(fā)送信息到Observer去。
??理解了整個流程的工作原理乙濒,我們現(xiàn)在來看看CreateEmitter是怎么信息發(fā)給Observer的陕赃。

4. CreateEmitter的工作原理

??我們知道,我們在ObservableOnSubscribesubscribe方法里面使用ObservableEmitter來發(fā)射信息到Observer“涔桑現(xiàn)在我們來看看整個CreateEmitter的工作原理么库,不過,我們還是先來看看這個類的結(jié)構(gòu)甘有,雖然上面已經(jīng)看了诉儒,但是擔(dān)心大佬們忘了:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

??在上面已經(jīng)說了CreateEmitter實現(xiàn)了Disposable接口,可以作為Disposable對象來操作亏掀,在接下來忱反,我們將重點介紹Disposable是怎么控制Observer對信息的接收泛释,同時還會介紹CreateEmitter作為ObservableEmitter接口的那部分功能。
??之前在分析基本元素時温算,已經(jīng)說了ObservableEmitter這個接口怜校,它實現(xiàn)了Emitter接口。在Emitter接口里面有三個方法用來發(fā)送信息給Observer注竿,分別是:onNext茄茁,onErroronComplete巩割。而CreateEmitter類則是具體的實現(xiàn)了這三個方法裙顽,我們來看看。

        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

??代碼是非常的簡單喂分,直接調(diào)用了ObserveronNext方法锦庸,也沒用什么高逼格的東西????。其余兩個方法也是如此蒲祈。只不過是甘萧,在調(diào)用onNext方法時做了一個isDisposed的判斷。
??所以感覺Disposable才是這個類的核心梆掸。我們來看看isDisposed方法:

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

??在isDisposed方法里面調(diào)用了DisposableHelperisDisposed方法扬卷。不過這里需要注意的是這里傳遞過去的是get方法的返回值,這個返回值什么意思酸钦?
??回到CreateEmitter的類結(jié)構(gòu)怪得,發(fā)現(xiàn)它繼承了AtomicReference類,所以get方法返回的是一個Disposable對象卑硫。
??同時徒恋,我們發(fā)現(xiàn)CreateEmitterdispose方法也是通過DisposableHelper類進行進行操作的,看看要理解Disposable的功能欢伏,必須了解DisposableHelper是怎么操作的入挣。

5.DisposableHelper

??從感官上來說,一個發(fā)射器是否dispose硝拧,直接設(shè)置一個boolean類型的flag就OK了径筏,為什么搞得這么復(fù)雜,又是AtomicReference障陶,又是DisposableHelper滋恬。這一切,我們從DisposableHelper來尋找答案抱究。
??首先我們還是來看看DisposableHelper的結(jié)構(gòu):

public enum DisposableHelper implements Disposable {
    DISPOSED
    ;
}

??DisposableHelper本身是一個enum類型恢氯,同時實現(xiàn)了Disposable接口。這里使用enum主要是為了做一個DISPOSED的單例。然后在通過isDisposed方法來判斷是否dispose勋拟,可以直接與DISPOSED比較遏暴。

    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

??既然判斷是否dispose是直接與DISPOSED比較,那么如果dispose的話指黎,應(yīng)該是將AtomicReference里面的值設(shè)置為DISPOSED吧朋凉?我們來看一下dispose方法:

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

??果然,跟我們猜測一樣的醋安,AtomicReference里面的值設(shè)置為DISPOSED杂彭。只是,這里為了線程安全吓揪,做了很多的判斷操作亲怠。
??從這里我們可以得到,為什么需要設(shè)置DisposableHelper來控制dispose的狀態(tài)柠辞,那是因為線程安全团秽,如果直接設(shè)置一個flag,在有些情況下叭首,可能存在線程不安全的風(fēng)險习勤。同時為了代碼的優(yōu)雅,如果這部分的邏輯寫在CreateEmitter里面焙格,會不會顯得冗雜呢图毕?

6.總結(jié)

??寫到這里,我感覺也差不多了眷唉。這里對著部分的知識做一個總結(jié)予颤。
??1.在整個流程中,基本有Observable,ObservableOnSubscribe,ObservableEmitter,Observer冬阳,如果想要對整個過程有一個大概的理解蛤虐,必須對這幾個元素有基本的認(rèn)識。
??2.ObserveronNext之類方法的觸發(fā)時機肝陪,實際上是Observablesubscribe方法驳庭,因為subscribe方法調(diào)用了ObservablesubscribeActual方法,而在subscribeActual方法里面做了兩部分的操作:1.直接調(diào)用了ObserveronSubscribe方法见坑;2.使用ObservableEmitterObserver包裹起來嚷掠,所以我們在ObservableOnSubscribesubscribe方法用ObservableEmitter來發(fā)射信息捏检,相當(dāng)于調(diào)用了Observer的相關(guān)方法。
??3.在ObservableEmitteronNext之類方法里面,存在一種類似AOP的代碼魄懂,因為在調(diào)用Observer的相關(guān)方法鸭叙,做了一些其他的操作。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市鲫骗,隨后出現(xiàn)的幾起案子犬耻,更是在濱河造成了極大的恐慌,老刑警劉巖执泰,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件枕磁,死亡現(xiàn)場離奇詭異,居然都是意外死亡术吝,警方通過查閱死者的電腦和手機计济,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來排苍,“玉大人沦寂,你說我怎么就攤上這事√匝茫” “怎么了传藏?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長彤守。 經(jīng)常有香客問我毯侦,道長,這世上最難降的妖魔是什么具垫? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任叫惊,我火速辦了婚禮,結(jié)果婚禮上做修,老公的妹妹穿的比我還像新娘霍狰。我一直安慰自己,他們只是感情好饰及,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布蔗坯。 她就那樣靜靜地躺著,像睡著了一般燎含。 火紅的嫁衣襯著肌膚如雪宾濒。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天屏箍,我揣著相機與錄音绘梦,去河邊找鬼。 笑死赴魁,一個胖子當(dāng)著我的面吹牛卸奉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颖御,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼榄棵,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起疹鳄,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拧略,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瘪弓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體垫蛆,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年腺怯,在試婚紗的時候發(fā)現(xiàn)自己被綠了月褥。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡瓢喉,死狀恐怖宁赤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情栓票,我是刑警寧澤决左,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站走贪,受9級特大地震影響佛猛,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜坠狡,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一继找、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧逃沿,春花似錦婴渡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至假消,卻和暖如春柠并,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背富拗。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工臼予, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啃沪。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓粘拾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親谅阿。 傳聞我的和親對象是個殘疾皇子半哟,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容