RxJava_整體流程分析

一、RxJava2 整體功能分析

下面這段代碼很簡單宵膨,就是事件源會(huì)在當(dāng)前線程通過 e.onNext() 的方式發(fā)送 "1"涯曲,"2","3" 三個(gè)事件秃臣,最后發(fā)送 e.onComplete() 第四個(gè)事件蠢终,那么在訂閱者 Observer 中就可以收到這個(gè)幾個(gè)由事件源發(fā)送的事件梦染。接下來通過源碼的角度分析下面這段代碼的整體邏輯

RxJava2_執(zhí)行流程分析圖.png

在分析代碼之前需要明白一個(gè)原則,那就是了解一個(gè)類首先先了解這個(gè)的頂層接口晶姊,通過頂層接口就可以明白這個(gè)類的框架體系的大體功能了,子類只是對(duì)這個(gè)體系的功能擴(kuò)展而已。這就好比學(xué)習(xí)集合框架一樣姑裂,我們首先會(huì)去了解 Collection 接口內(nèi)部的所有的方法,知道了這些方法之后泞辐,我們心里就大概知道這個(gè) Collection 體系大概的功能了,然后再慢慢的去了解它的實(shí)現(xiàn)類對(duì)這些功能的具體實(shí)現(xiàn)喂急。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    private Disposable mD = null;
    @Override
    public void onSubscribe(Disposable d) {
        mD = d;
    }
    @Override
    public void onNext(String s) {
        if ("2".equals(s)) {
            mD.dispose();
        }
        System.out.println("s = " + s);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.toString());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

二格嘁、Observable 的繼承關(guān)系

Observable 是一個(gè)抽象類,是 ObservableSource 的實(shí)現(xiàn)類廊移,而 ObservableSource 類是一個(gè)接口糕簿,它表示事件源。內(nèi)部只有一個(gè)方法 subscribe 該方法表示通過 Observer 訂閱當(dāng)前的事件源狡孔。那么事件發(fā)布的事件懂诗,在 Observer 訂閱者中就會(huì)被收到。了解了 Observable 的頂層接口之后苗膝,我們就知道該體系最重要的一個(gè)功能那就是 subscribe 方法了殃恒,因此我們就重點(diǎn)關(guān)注子類的 subscribe 方法。

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

跟蹤 Observable 中 subscribe 的調(diào)用關(guān)系荚醒,最后可以知道最終會(huì)調(diào)用到一個(gè)方法第 31 行代碼 subscribeActual(observer); 期間做了多次轉(zhuǎn)換操作芋类,這些我們不用管。我說過現(xiàn)在分析是整體流程界阁,所以沒有必要去分析細(xì)枝末節(jié)的東西侯繁,不然會(huì)迷失方向的。所以大膽的得出一個(gè)結(jié)論泡躯,只要是 ObservableSource 的子類贮竟,那么我們只要關(guān)心 subscribeActual(observer); 這個(gè)方法就好的。

public abstract class Observable<T> implements ObservableSo
urce<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //這段代碼是核心代碼较剃。
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS")
        npe.initCause(e);
        throw npe;
    }
}

三咕别、Observable#create(ObservableOnSubscribe)

我們?cè)?create 方法中傳入一個(gè) ObsevableOnSubscribe 對(duì)象,而這個(gè)對(duì)象就是一個(gè) Observable 的父類写穴。而 create 方法顧名思義就適用于創(chuàng)建 Observable 對(duì)象的惰拱。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //非空校驗(yàn)
    ObjectHelper.requireNonNull(source, "source is null");
    //內(nèi)部就是創(chuàng)建一個(gè) ObservableCreate 對(duì)象
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

四、ObservableCreate

上面的 create 方法中內(nèi)部實(shí)際返回的是一個(gè) ObservableCreate 對(duì)象啊送,而這個(gè)類實(shí)際上就是 Observable 的子類偿短。通過構(gòu)造方法方法可以知道當(dāng)前創(chuàng)建的 ObservableCreate 內(nèi)部維護(hù)了上一級(jí)創(chuàng)建的 ObsevableOnSubscribe 對(duì)象欣孤,這個(gè)對(duì)象就是用戶在 create 方法傳入的對(duì)象。這里很重要昔逗,因?yàn)橄旅婷恳患?jí)都會(huì)創(chuàng)建一個(gè)新的 Observable 對(duì)象降传,內(nèi)部都會(huì)保存上一級(jí)的 ObservableOnSubscribe 對(duì)象。如果不太理解的話勾怒,先放下婆排,等下面分析了應(yīng)該就會(huì)明白了。到這里我們就知道 Observable.create() 方法會(huì)返回一個(gè) Observable 類型的對(duì)象笔链。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> 
    source) {
        //內(nèi)部保存了上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象的引用段只。
        this.source = source;
    }
}

五、觸發(fā) subscribe 方法

這個(gè)方法大家都知道卡乾,就是用來發(fā)生訂閱關(guān)系的翼悴。在 RxJava 中事件源 Observable 只有發(fā)生了訂閱才會(huì)發(fā)送事件。我們知道剛才通過 create 方法的分析可以知道幔妨,內(nèi)部是創(chuàng)建了 ObservableCreate 這個(gè) Observable 子類的鹦赎,那么就分析 ObservableCreate 的 subscribe 的內(nèi)部實(shí)現(xiàn)即可。

  • ObservableCreate#subscribeActual

在上面已經(jīng)分析過了误堡,只要是 Observable 類型的對(duì)象古话,在調(diào)用 subscribe(observer) 最終都會(huì)調(diào)用調(diào) subscribeActual(observer) 方法。

@Override
//subscribe 方法內(nèi)部會(huì)調(diào)用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    //發(fā)射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回調(diào)給 observer#onSubscribe
    observer.onSubscribe(parent);
    try {
        //告訴上一級(jí)的 observable 你可以發(fā)送事件了锁施。
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
  • 分析事件源是如何發(fā)送事件的陪踩?

在文章開頭,我們?cè)?ObservableOnSubscribe#subscribe 方法內(nèi)部發(fā)送的了 4 個(gè)事件悉抵。那么這個(gè) ObservableOnSubscribe#subscribe(ObservableEmitter) 是在哪里調(diào)用的呢肩狂?還記得 ObservableCreate 類中的 subscribeActual 的實(shí)現(xiàn)嗎?它的內(nèi)部調(diào)用 source.subscribe(parent); 這個(gè)方法姥饰,目的就是將發(fā)射器 CreateEmitter 傳遞給上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象傻谁。

@Override
//subscribe 方法內(nèi)部會(huì)調(diào)用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    //發(fā)射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回調(diào)給 observer#onSubscribe
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

這樣上面這 4 個(gè)事件就可以通過 ObservableEmitter 對(duì)象發(fā)送了,由于多態(tài)的原理列粪,實(shí)際上是由 CreateEmitter 去發(fā)送這四個(gè)事件的审磁。

**CreateEmitter 就是上面描述的 Emitter 的實(shí)現(xiàn)類。 **

//發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//將發(fā)送器對(duì)象傳入給上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象岂座,其實(shí)也就類似于接口回調(diào)的方式去通知 Observable 您的訂閱者 Observer 已準(zhǔn)備好了态蒂,您可以發(fā)送事件了。
source.subscribe(parent);

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
       e.onNext("1");
       e.onNext("2");
       e.onNext("3");
       e.onComplete();
    }
});
  • ObservableCreate#CreateEmitter

這個(gè)類是一個(gè)發(fā)射器费什,它是 Emitter 的實(shí)現(xiàn)類钾恢,主要用于發(fā)射事件的。內(nèi)部封裝了 Observer 對(duì)象,這個(gè) Observer 就是通過 subscribe(observer) 參數(shù)傳入的 observer 對(duì)象瘩蚪,那么在 CreateEmitter 中調(diào)用 onNext,onError,onComplete 方法內(nèi)部都去調(diào)用該 observber 對(duì)象對(duì)應(yīng)的 onNext(t),onError(t),onComplete() 方法刑桑。這樣就實(shí)現(xiàn)了事件源 Emitter 發(fā)送事件,在訂閱者 Observer 收到事件了募舟。

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        //onNext 的參數(shù)不能為 null
        if (t == null) {
            return;
        }
        if (!isDisposed()) {
            //回調(diào) observer 對(duì)應(yīng)的方法
            observer.onNext(t);
        }
    }
    @Override
    public void onError(Throwable t) {
        //onError 的參數(shù)不能為 null
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."
        }
        if (!isDisposed()) {
            try {
                //回調(diào) observer 對(duì)應(yīng)的方法
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                //回調(diào) observer 對(duì)應(yīng)的方法
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }
    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }
    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
  • Emitter

發(fā)射器頂層接口,定義 onNext,onError,onComplete 方法闻察。

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

  • Disposable 的作用

Disposable 可以理解為一個(gè)事件源和訂閱者的一個(gè)連接器拱礁,當(dāng)調(diào)用 dispose() 方法之后,這個(gè)連接器就關(guān)閉了辕漂,那么事件源將不會(huì)往該訂閱者 observer 發(fā)送事件了呢灶。isDisposed() 就是用于判斷該連接器是否被中斷了。

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}
  • Disposable 的使用

還是回到 ObservableCreate 這個(gè)類的 subscribeActual 方法钉嘹,這個(gè)方法中是發(fā)生訂閱的時(shí)候調(diào)用的鸯乃。在其內(nèi)部有這段代碼
observer.onSubscribe(parent); 這個(gè) parent 就是先前創(chuàng)建的 CreateEmitter 對(duì)象,從上面的源碼可以看到該類實(shí)現(xiàn)了 Emitter 接口外跋涣,還實(shí)現(xiàn)了 Disposable 接口缨睡。那么在外部的Observer 中的 onSubscribe 這個(gè)方法可以收到 Disposable 對(duì)象,那么用戶就可以在適當(dāng)?shù)臅r(shí)候進(jìn)行關(guān)閉連接器操作了陈辱。下面的代碼示例中奖年,在 onNext 方法中當(dāng)收到的事件為 "2" 時(shí),那么就調(diào)用 dispose() 關(guān)閉連接器沛贪。而關(guān)閉之后事件源在發(fā)送下一個(gè)事件的時(shí)候就會(huì)判斷該連接器是否是關(guān)閉的陋守,具體代碼看 CreateEmitter#onNext 方法,它內(nèi)部會(huì)判斷 if (!isDisposed()) 判斷利赋。如果已經(jīng)關(guān)水评,那么將不會(huì)再往該 Observer 發(fā)送事件了。

//CreateEmitter 類繼承結(jié)構(gòu)
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable 

//開始訂閱
.subscribe(new Observer<String>() {
    private Disposable mD = null;
    //onSubscribe 方法用于接收一個(gè) Dispoable 對(duì)象媚送。
    @Override
    public void onSubscribe(Disposable d) {
        mD = d;
    }
    @Override
    public void onNext(String s) {
        //當(dāng)接收到的事件為 "2" 時(shí)中燥,那么就關(guān)閉連接器。
        if ("2".equals(s)) {
            mD.dispose();
        }
        System.out.println("s = " + s);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.toString());
    }
    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
});

六季希、總結(jié)

1褪那、在 RxJava 中最重要的就是每一次 Observable 的創(chuàng)建都會(huì)保存上一級(jí)的創(chuàng)建的 Observable 對(duì)象,這個(gè)有什么用呢式塌?其實(shí)每一個(gè) Observable 都要進(jìn)行 subscribe 發(fā)生訂閱關(guān)系的博敬。在當(dāng)前 Observable 調(diào)用了 subscribe 之后,還需要調(diào)用上一級(jí)創(chuàng)建的 Observable.subscribe() 進(jìn)行訂閱峰尝,這樣一級(jí)級(jí)往上發(fā)生訂閱關(guān)系偏窝。這個(gè)作用是可以在下一節(jié)分析線程切換時(shí)就用體現(xiàn)了,到時(shí)再分析咯。
2祭往、分析整體流程不要在意細(xì)枝末節(jié)伦意,先接觸頂層接口,了解體系功能硼补。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末驮肉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子已骇,更是在濱河造成了極大的恐慌离钝,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件褪储,死亡現(xiàn)場離奇詭異卵渴,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)鲤竹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門浪读,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人辛藻,你說我怎么就攤上這事碘橘。” “怎么了吱肌?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵蛹屿,是天一觀的道長。 經(jīng)常有香客問我岩榆,道長错负,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任勇边,我火速辦了婚禮犹撒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘粒褒。我一直安慰自己识颊,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布奕坟。 她就那樣靜靜地躺著祥款,像睡著了一般。 火紅的嫁衣襯著肌膚如雪月杉。 梳的紋絲不亂的頭發(fā)上刃跛,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音苛萎,去河邊找鬼桨昙。 笑死检号,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蛙酪。 我是一名探鬼主播齐苛,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼桂塞!你這毒婦竟也來了凹蜂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤阁危,失蹤者是張志新(化名)和其女友劉穎炊甲,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體欲芹,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年吟吝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了菱父。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡剑逃,死狀恐怖浙宜,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蛹磺,我是刑警寧澤粟瞬,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站萤捆,受9級(jí)特大地震影響裙品,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜俗或,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一市怎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧辛慰,春花似錦区匠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至速客,卻和暖如春戚篙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背溺职。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國打工已球, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留臣镣,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓智亮,卻偏偏與公主長得像忆某,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子阔蛉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容