RxJava2 源碼解析(一)

轉(zhuǎn)載請標(biāo)明出處:
http://www.reibang.com/p/23c38a4ed360
本文出自:【張旭童的簡書】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)

概述

最近事情太多了桅锄,現(xiàn)在公司內(nèi)部的變動又固,自己崗位的變化脆诉,以及最近決定找工作棵红。所以博客耽誤了衷咽,準(zhǔn)備面試中演熟,打算看一看RxJava2的源碼景描,遂有了這篇文章堰乔。

不會對RxJava2的源碼逐字逐句的閱讀万哪,只尋找關(guān)鍵處侠驯,我們平時接觸得到的那些代碼。
背壓實際中接觸較少奕巍,故只分析了Observable.
分析的源碼版本為:2.0.1

我們的目的:

  1. 知道源頭(Observable)是如何將數(shù)據(jù)發(fā)送出去的吟策。
  2. 知道終點(Observer)是如何接收到數(shù)據(jù)的。
  3. 何時將源頭和終點關(guān)聯(lián)起來的
  4. 知道線程調(diào)度是怎么實現(xiàn)的
  5. 知道操作符是怎么實現(xiàn)的

本文先達(dá)到目的1 的止,2 踊挠,3。
我個人認(rèn)為主要還是適配器模式的體現(xiàn)冲杀,我們接觸的就只有ObservableObserver效床,其實內(nèi)部有大量的中間對象在適配:將它們兩聯(lián)系起來,加入一些額外功能权谁,例如考慮dispose和hook等剩檀。

從create開始。

這是一段不涉及操作符和線程切換的簡單例子:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext() called with: value = [" + value + "]");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError() called with: e = [" + e + "]");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete() called");
            }
        });

拿 create來說旺芽,

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //.....
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

返回值是Observable,參數(shù)是ObservableOnSubscribe,定義如下:

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe是一個接口沪猴,里面就一個方法,也是我們實現(xiàn)的那個方法:
該方法的參數(shù)是 ObservableEmitter,我認(rèn)為它是關(guān)聯(lián)起 Disposable概念的一層:

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable d);
    void setCancellable(Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
}

ObservableEmitter也是一個接口采章。里面方法很多运嗜,它也繼承了 Emitter<T> 接口。

public interface Emitter<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

Emitter<T>定義了 我們在ObservableOnSubscribe中實現(xiàn)subscribe()方法里最常用的三個方法悯舟。

好担租,我們回到原點,create()方法里就一句話return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));抵怎,其中提到RxJavaPlugins.onAssembly():

    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable<T> onAssembly(Observable<T> source) {
        Function<Observable, Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

可以看到這是一個關(guān)于hook的方法奋救,關(guān)于hook我們暫且不表岭参,不影響主流程,我們默認(rèn)使用中都沒有hook尝艘,所以這里就是直接返回source,即傳入的對象演侯,也就是new ObservableCreate<T>(source).

ObservableCreate我認(rèn)為算是一種適配器的體現(xiàn),create()需要返回的是Observable,而我現(xiàn)在有的是(方法傳入的是)ObservableOnSubscribe對象背亥,ObservableCreateObservableOnSubscribe適配成Observable秒际。
其中subscribeActual()方法表示的是被訂閱時真正被執(zhí)行的方法,放后面解析:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

OK,至此狡汉,創(chuàng)建流程結(jié)束程癌,我們得到了Observable<T>對象,其實就是ObservableCreate<T>.

到訂閱subscribe 結(jié)束

subscribe():

    public final void subscribe(Observer<? super T> observer) {
        ...
        try {
            //1 hook相關(guān)轴猎,略過
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ...
            //2 真正的訂閱處
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            //3 錯誤處理嵌莉,
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            //4 hook錯誤相關(guān),略過
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

關(guān)于hook的代碼:
可以看到如果沒有hook捻脖,即相應(yīng)的對象是null锐峭,則是傳入什么返回什么的

    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @param observer the observer
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
        //1 默認(rèn)onObservableSubscribe(可理解為一個flatmap的操作)是null
        BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
        //2 所以這句跳過可婶,不會對其進(jìn)行apply
        if (f != null) {
            return apply(f, source, observer);
        }
        //3 返回參數(shù)2
        return observer;
    }

我也是驗證了一下 三個Hook相關(guān)的變量沿癞,確實是null:

        Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
        BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
        Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();

        Log.e(TAG, "errorHandler = [" + errorHandler + "]");
        Log.e(TAG, "onObservableSubscribe = [" + onObservableSubscribe + "]");
        Log.e(TAG, "onObservableAssembly = [" + onObservableAssembly + "]");

所以訂閱時的重點就是:

            //2 真正的訂閱處
            subscribeActual(observer);

我們將第一節(jié)提到的ObservableCreate里的subscribeActual()方法拿出來看看:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1 創(chuàng)建CreateEmitter,也是一個適配器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2 onSubscribe()參數(shù)是Disposable 矛渴,所以CreateEmitter可以將Observer->Disposable 椎扬。還有一點要注意的是`onSubscribe()`是在我們執(zhí)行`subscribe()`這句代碼的那個線程回調(diào)的,并不受線程調(diào)度影響具温。
        observer.onSubscribe(parent);
        try {
            //3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer蚕涤,終點)聯(lián)系起來
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            //4 錯誤回調(diào)
            parent.onError(ex);
        }
    }

Observer是一個接口,里面就四個方法铣猩,我們在開頭的例子中已經(jīng)全部實現(xiàn)(打印Log)揖铜。

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

重點在這一句:

 //3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯(lián)系起來
            source.subscribe(parent);

sourceObservableOnSubscribe對象达皿,在本文中是:

        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        }

則會調(diào)用parent.onNext()parent.onComplete()天吓,parentCreateEmitter對象,如下:

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            //如果沒有被dispose峦椰,會調(diào)用Observer的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            ...
            //1 如果沒有被dispose龄寞,會調(diào)用Observer的onError()方法
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                //2 一定會自動dispose()
                    dispose();
                }
            } else {
            //3 如果已經(jīng)被dispose了,會拋出異常汤功。所以onError物邑、onComplete彼此互斥,只能被調(diào)用一次
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
         //1 如果沒有被dispose,會調(diào)用Observer的onComplete()方法
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                 //2 一定會自動dispose()
                    dispose();
                }
            }
        }
        
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
        
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

總結(jié)重點:

  1. ObservableObserver的關(guān)系沒有被dispose拂封,才會回調(diào)ObserveronXXXX()方法
  2. ObserveronComplete()onError() 互斥只能執(zhí)行一次,因為CreateEmitter在回調(diào)他們兩中任意一個后鹦蠕,都會自動dispose()冒签。根據(jù)第一點,驗證此結(jié)論钟病。
  3. ObservableObserver關(guān)聯(lián)時(訂閱時)萧恕,Observable才會開始發(fā)送數(shù)據(jù)。
  4. ObservableCreateObservableOnSubscribe(真正的源)->Observable.
  5. ObservableOnSubscribe(真正的源)需要的是發(fā)射器ObservableEmitter.
  6. CreateEmitterObserver->ObservableEmitter,同時它也是Disposable.
  7. errorcomplete肠阱,complete不顯示票唆。 反之會crash,感興趣的可以寫如下代碼驗證屹徘。
      e.onNext("1");
      //先error后complete走趋,complete不顯示。 反之 會crash
      //e.onError(new IOException("sb error"));
      e.onComplete();
      e.onError(new IOException("sb error"));

一個好玩的地方DisposableHelper

原本到這里噪伊,最簡單的一個流程我們算是搞清了簿煌。
還值得一提的是,DisposableHelper.dispose(this);
DisposableHelper很有趣鉴吹,它是一個枚舉姨伟,這是利用枚舉實現(xiàn)了一個單例disposed state,即是否disposed,如果Disposable類型的變量的引用等于DISPOSED,則起點和終點已經(jīng)斷開聯(lián)系豆励。
其中大多數(shù)方法 都是靜態(tài)方法夺荒,所以isDisposed()方法的實現(xiàn)就很簡單,直接比較引用即可.
其他的幾個方法良蒸,和AtomicReference類攪基在了一起技扼。
這是一個實現(xiàn)引用原子操作的類,對象引用的原子更新嫩痰,常用方法如下:

//返回當(dāng)前的引用淮摔。
V get()
//如果當(dāng)前值與給定的expect引用相等,(注意是引用相等而不是equals()相等)始赎,更新為指定的update值和橙。
boolean compareAndSet(V expect, V update)
//原子地設(shè)為給定值并返回舊值。
V getAndSet(V newValue)

OK,鋪墊完了我們看看源碼吧:

public enum DisposableHelper implements Disposable {
    /**
     * The singleton instance representing a terminal, disposed state, don't leak it.
     */
    DISPOSED
    ;

    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        //1 通過斷點查看造垛,默認(rèn)情況下,field的值是"null"魔招,并非引用是null哦!大坑大坑大坑
        //但是current是null引用
        Disposable current = field.get();
        Disposable d = DISPOSED;
        //2 null不等于DISPOSED
        if (current != d) {
            //3 field是DISPOSED了五辽,current還是null
            current = field.getAndSet(d);
            if (current != d) {
            //4 默認(rèn)情況下 走不到這里办斑,這里是在設(shè)置了setCancellable()后會走到。
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

總結(jié)

  1. subscribeActual()方法中,源頭和終點關(guān)聯(lián)起來乡翅。
  2. source.subscribe(parent);這句代碼執(zhí)行時鳞疲,才開始從發(fā)送ObservableOnSubscribe中利用ObservableEmitter發(fā)送數(shù)據(jù)Observer。即數(shù)據(jù)是從源頭push給終點的蠕蚜。
  3. CreateEmitter 中么夫,只有ObservableObserver的關(guān)系沒有被dispose其掂,才會回調(diào)ObserveronXXXX()方法
  4. ObserveronComplete()onError() 互斥只能執(zhí)行一次帝美,因為CreateEmitter在回調(diào)他們兩中任意一個后骂澄,都會自動dispose()。根據(jù)上一點挣柬,驗證此結(jié)論潮酒。
  5. errorcompletecomplete不顯示邪蛔。 反之會crash
  6. 還有一點要注意的是onSubscribe()是在我們執(zhí)行subscribe()這句代碼的那個線程回調(diào)的急黎,并不受線程調(diào)度影響

轉(zhuǎn)載請標(biāo)明出處:
http://www.reibang.com/p/23c38a4ed360
本文出自:【張旭童的簡書】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末侧到,一起剝皮案震驚了整個濱河市叁熔,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌床牧,老刑警劉巖荣回,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異戈咳,居然都是意外死亡心软,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進(jìn)店門著蛙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來删铃,“玉大人,你說我怎么就攤上這事踏堡×匝洌” “怎么了?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵顷蟆,是天一觀的道長诫隅。 經(jīng)常有香客問我,道長帐偎,這世上最難降的妖魔是什么逐纬? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮削樊,結(jié)果婚禮上豁生,老公的妹妹穿的比我還像新娘兔毒。我一直安慰自己,他們只是感情好甸箱,可當(dāng)我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布育叁。 她就那樣靜靜地躺著,像睡著了一般芍殖。 火紅的嫁衣襯著肌膚如雪豪嗽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天围小,我揣著相機(jī)與錄音昵骤,去河邊找鬼树碱。 笑死肯适,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的成榜。 我是一名探鬼主播框舔,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赎婚!你這毒婦竟也來了刘绣?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤挣输,失蹤者是張志新(化名)和其女友劉穎纬凤,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撩嚼,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡停士,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了完丽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恋技。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖逻族,靈堂內(nèi)的尸體忽然破棺而出蜻底,到底是詐尸還是另有隱情,我是刑警寧澤聘鳞,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布薄辅,位于F島的核電站,受9級特大地震影響抠璃,放射性物質(zhì)發(fā)生泄漏长搀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一鸡典、第九天 我趴在偏房一處隱蔽的房頂上張望源请。 院中可真熱鬧,春花似錦、人聲如沸谁尸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽良蛮。三九已至抽碌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間决瞳,已是汗流浹背货徙。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留皮胡,地道東北人痴颊。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像屡贺,于是被迫代替她去往敵國和親蠢棱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內(nèi)容