小白讀源碼 | RxJava2 入門篇(一)

轉(zhuǎn)載請注明出處:http://www.reibang.com/u/1d789e82337f

題記: RxJava2 想必很多人都用過捕儒,擴展的觀察者模式顺呕,簡潔的鏈?zhǔn)秸{(diào)用橄镜,通過簡單的API調(diào)用就可以滿足我們的各種需求端礼,讓人不禁感嘆這玩意兒真爽禽笑。當(dāng)然在我們用著很爽的時候,不禁也會對它產(chǎn)生一些好奇蛤奥,這玩意兒到底長是個啥模樣佳镜,嗯,想看看凡桥,那就看看吧蟀伸。花了些時間看了看它的部分源碼唬血,作此記錄望蜡。

引子

既然我是只小白,還挑什么呢拷恨,撿最容易的上手噻脖律,F(xiàn)lowable (帶背壓模式的被觀察者),我還沒有看腕侄,這里僅記錄普通的 Observable 源碼閱讀過程小泉。下面代碼即為眾所周知的入門用法芦疏,本篇文章就圍繞它來闡述。這是我使用的版本:

compile 'io.reactivex.rxjava2:rxjava:2.1.1'
compile 'io.reactivex.rxjava2:rxandroid:2.1.0' 
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "Observable emit 1 ");
                emitter.onNext(1);
                Log.d(TAG, "Observable emit 2");
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: isDisposable " + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

一 微姊、關(guān)鍵概念

Observable(被觀察者):這是一個抽象類酸茴,里面方法眾多,就不列舉了兢交,讀的時候遇到哪個看哪個薪捍。
Observer(觀察者):這是個接口,里面有 4 個方法配喳,是必須都要知道的酪穿。

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

ObservableOnSubscribe(事件發(fā)射器的載體):

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

ObservableEmitter(事件發(fā)射器):這是個接口,繼承了 Emitter 接口晴裹,用于發(fā)送事件被济。

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    @NonNull
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

這些關(guān)鍵概念必須要記住,至少大體知道都是什么涧团,里面都有些什么方法只磷。

二、直奔核心

既然這是個鏈?zhǔn)秸{(diào)用泌绣,我們不妨從頭到尾過一遍钮追。
Observable.create(new ObservableOnSubscribe<Integer>(){...})創(chuàng)建了一個Observable對象,那就進(jìn)create()這個靜態(tài)方法看一看

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
      // 判空代碼赞别,不重要畏陕,不看也罷
     ObjectHelper.requireNonNull(source, "source is null");
    // 創(chuàng)建 Observable 對象的關(guān)鍵代碼
     return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

create()方法里只有兩行代碼,我們重點來看看RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))這行代碼 仿滔。很明顯分為兩部分,onAssembly()new ObservableCreate<T>(source)犹芹,我們先看onAssembly()崎页,點進(jìn)去發(fā)現(xiàn)是

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    // 上面兩行代碼是和使用 map 操作符相關(guān)的,我們這里還沒用 map 操作符呢腰埂,對我們沒啥卵用飒焦,當(dāng)它不存在,
    // 那么方法參數(shù)里傳進(jìn)來一個 Observable 類型的 source, 現(xiàn)在被原原本本當(dāng)做返回值返回回去
    // 意思就很明顯屿笼,Observable 對象是由 new ObservableCreate<T>(source) 生成的
    return source;
}

既然onAssembly()把參數(shù)原樣作為返回值返回牺荠,那Observable.create(new ObservableOnSubscribe<Integer>(){...})創(chuàng)建的Observable對象就是new ObservableCreate<T>(source)了,那我們就來看 new ObservableCreate<T>(source)驴一,點進(jìn)去看關(guān)鍵代碼

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    // 方法名暴露了真相 "實際訂閱休雌,真實訂閱"
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 1.創(chuàng)建 CreateEmitter 對象,參數(shù)傳的是 observer肝断, 這個 observer 從哪冒出來的杈曲,待會兒就知道了
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 2.并且這個 observer 還訂閱了 CreateEmitter 對象
        observer.onSubscribe(parent);
        try {
            // 3.source 也訂閱了 CreateEmitter 對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}

首先ObservableCreate繼承Observable,那ObservableCreate就是個Observable了驰凛,對,就是這么簡單担扑,ObservableCreate就是我們要找的Observable對象 恰响。然后上面subscribeActual()方法里添加注釋的那 3 行代碼講的很清楚,一個是CreateEmitter(發(fā)射器)涌献,一個是observer(觀察者)胚宦,一個是 source(這個source就是ObservableOnSubscribe,下面我就以ObservableOnSubscribe指代source)燕垃,看到這 3 行代碼间唉,我們就敢假設(shè)整個觀察者模式的消息訂閱與發(fā)布就是由這 3 行代碼控制的,要驗證假設(shè)利术,我們還需再往下讀源碼呈野。既然observerObservableOnSubscribe都與CreateEmitter有關(guān),我們就來看看CreateEmitter印叁,

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
           // 每次一定會先判斷連接有沒有切斷(就是有沒有 dispose)被冒,沒有切斷才接收事件
           // 這這個判斷就保證了一旦切斷肯定就收不到事件了
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            // 如果連接已經(jīng)切斷,還調(diào)用 Observer 的 onError() 方法轮蜕,那就拋異常了
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            // 如果連接沒有切斷昨悼,就調(diào)用 Observer 的 onError() 方法
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    // 如果連接沒有切斷,在調(diào)用 Observer 的 onError() 方法后跃洛,一定會調(diào)用 dispose() 切斷連接
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            // // 如果連接沒有切斷率触,就調(diào)用 Observer 的 onComplete() 方法
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    // 如果連接沒有切斷,在調(diào)用 Observer 的 onComplete() 方法后汇竭,一定會調(diào)用 dispose() 切斷連接
                    dispose();
                }
            }
        }
        ...
}

可以看到葱蝗,CreateEmitter繼承ObservableEmitter<T>Disposable,那它就既是個ObservableEmitter细燎,又是個Disposable两曼,那它什么時候是ObservableEmitter,又什么時候是Disposable呢玻驻,當(dāng)然是observer.onSubscribe(parent)里它是Disposable悼凑,在source.subscribe(parent)里它是ObservableEmitter
為什么這么說呢璧瞬,我們再接著看你就明白了户辫。這里我先講講source.subscribe(parent),其實這句代碼就是ObservableOnSubscribe.subscribe(ObservableEmitter),再看清楚些就是

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
               ...
            }
        }).subscribe(new Observer<Integer>() {
            ...
        });

上面幾行代碼就能解釋 “ 在source.subscribe(parent)里它是ObservableEmitter 這句話 嗤锉。
接下來看連接ObservableObserversubscribe()方法渔欢,

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        // 判空,不用看
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // 1.這個其實沒啥用档冬,我們用的是最簡單的用法膘茎,所以參數(shù)傳的是什么桃纯,返回值將它原樣返回
            observer = RxJavaPlugins.onSubscribe(this, observer);
            // 判空,不用看
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // 2.關(guān)鍵披坏,進(jìn)入這個方法發(fā)現(xiàn)它是 Observale 類里的一個抽象方法态坦,這個抽象方法在哪里實現(xiàn)呢,
            // 就在創(chuàng)建 Observable 對象的 create() 方法里的 new ObservableCreate<T>(source) 里
            // 不信可以翻看上面介紹 new ObservableCreate<T>(source) 的代碼
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }

subscribe()方法里的subscribeActual(observer) 方法在new ObservableCreate<T>(source)里重寫了棒拂,翻看 new ObservableCreate<T>(source) 的代碼后伞梯,你會發(fā)現(xiàn)它里面的 subscribeActual 方法里的 observer 就是 subscribe()方法里的observer

     // 方法名暴露了真相 "實際訂閱帚屉,真實訂閱"
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 1.創(chuàng)建 CreateEmitter 對象谜诫,參數(shù)傳的是 observer罚勾, 這個 observer 從哪冒出來的推盛,待會兒就知道了
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 2.并且這個 observer 還訂閱了 CreateEmitter 對象
        observer.onSubscribe(parent);
        try {
            // 3.source 也訂閱了 CreateEmitter 對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

既然subscribe()方法里的 observersubscribeActual方法里執(zhí)行了observer.onSubscribe(parent),那我們就來看下subscribe()方法的參數(shù)Observer對象吧躏啰,

...subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: isDisposable " + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        }
...

看沒看到 public void onSubscribe(@NonNull Disposable d), 在這里就可以解釋 “ 在observer.onSubscribe(parent)里它是Disposable 這句話 牢屋。
既然 “ 在source.subscribe(parent)里它是ObservableEmitter“ 在observer.onSubscribe(parent)里它是Disposable 都解釋清楚了且预,那ObservableObserver之間千絲萬縷的聯(lián)系也就全在上述 2 句話里了,ObservableObserver的事件發(fā)布和接收就是這 2 行代碼起的作用烙无。
這里還可以解釋一個問題锋谐,為什么在打印日志時我們發(fā)現(xiàn)

// Observer 里的 onSubscribe(@NonNull Disposable d) 方法先執(zhí)行
12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: onSubscribe: isDisposable false
// Observable 里的 subscribe(@NonNull ObservableEmitter<Integer> emitter) 方法后執(zhí)行
12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: Observable emit 1 
12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: onNext: 1
12-15 19:05:39.665 18795-18795/com.persist.rxjava D/MainActivity: Observable emit 2
12-15 19:05:39.666 18795-18795/com.persist.rxjava D/MainActivity: onNext: 2
12-15 19:05:39.666 18795-18795/com.persist.rxjava D/MainActivity: onComplete: 

因為在subscribeActual() 方法里它們的先后順序已經(jīng)定了,注釋 2 和注釋 3就是它們先后順序了截酷,可以看下面代碼涮拗。而且這也就是說ObservableObserver建立連接后,ObservableEmitter才開始發(fā)送事件迂苛。

    // 方法名暴露了真相 "實際訂閱三热,真實訂閱"
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 1.創(chuàng)建 CreateEmitter 對象,參數(shù)傳的是 observer灾部, 這個 observer 從哪冒出來的康铭,待會兒就知道了
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 2.并且這個 observer 還訂閱了 CreateEmitter 對象
        observer.onSubscribe(parent);
        try {
            // 3.source 也訂閱了 CreateEmitter 對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

總結(jié)

1.Observable對象其實是ObservableCreate對象;
2.ObservableCreate對象里的subscribeActual方法執(zhí)行了ObservableObserver連接的建立赌髓;
3.subscribeActual方法里的CreateEmitter既是ObservableEmitter(發(fā)射器)又是Disposable(切斷器),只有在連接沒有切斷的情況下ObserveronNext()方法才會執(zhí)行催跪,并且每次Observer執(zhí)行了onError()onComplete()方法后CreateEmitter對象里的onError()onComplete()方法中一定會自動執(zhí)行 dispose()切斷連接锁蠕;
4.在ObservableObserver建立連接之后,Observable才會由ObservableEmitter(實際上是CreateEmitter)發(fā)射事件懊蒸。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末荣倾,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子骑丸,更是在濱河造成了極大的恐慌舌仍,老刑警劉巖妒貌,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異铸豁,居然都是意外死亡灌曙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門节芥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來在刺,“玉大人,你說我怎么就攤上這事头镊◎纪眨” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵相艇,是天一觀的道長颖杏。 經(jīng)常有香客問我,道長坛芽,這世上最難降的妖魔是什么留储? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮靡馁,結(jié)果婚禮上欲鹏,老公的妹妹穿的比我還像新娘。我一直安慰自己臭墨,他們只是感情好赔嚎,可當(dāng)我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著胧弛,像睡著了一般尤误。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上结缚,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天损晤,我揣著相機與錄音,去河邊找鬼红竭。 笑死尤勋,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的茵宪。 我是一名探鬼主播最冰,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼稀火!你這毒婦竟也來了暖哨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤凰狞,失蹤者是張志新(化名)和其女友劉穎篇裁,沒想到半個月后沛慢,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡达布,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年团甲,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片往枣。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡伐庭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出分冈,到底是詐尸還是另有隱情圾另,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布雕沉,位于F島的核電站集乔,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏坡椒。R本人自食惡果不足惜扰路,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望倔叼。 院中可真熱鬧汗唱,春花似錦、人聲如沸丈攒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽巡验。三九已至际插,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間显设,已是汗流浹背框弛。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留捕捂,地道東北人瑟枫。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像指攒,于是被迫代替她去往敵國和親力奋。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容