Carson帶你學(xué)Android:手把手帶你源碼分析RxJava


前言

Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡潔 & 使用簡單的特點(diǎn),深受各大 Android開發(fā)者的歡迎。

如果還不了解RxJava潘靖,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程

  • 今天,我將為大家?guī)?源碼分析:Rxjava的訂閱流程蚤蔓,其為Rxjava使用的基本 & 核心卦溢,希望大家會喜歡。

Carson帶你學(xué)RxJava系列文章秀又,包括 原理单寂、操作符、應(yīng)用場景吐辙、背壓等等宣决,請關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南


目錄

示意圖

1. RxJava簡介

此處簡單介紹RxJava

示意圖

若還不了解RxJava,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程


2. 訂閱流程 的使用

2.1 使用步驟

RxJava的訂閱流程 使用方式 = 基于事件流的鏈?zhǔn)秸{(diào)用昏苏,具體步驟如下:

步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)連接觀察者和被觀察者

2.2 實(shí)例講解

// RxJava的鏈?zhǔn)讲僮?        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
            // 3. 通過訂閱(subscribe)連接觀察者和被觀察者
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
            // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應(yīng)");
            }

        });
    }
}

  • 運(yùn)行結(jié)果
示意圖

3. 源碼分析

下面尊沸,我將根據(jù) 使用步驟 進(jìn)行RxJava的源碼分析:
步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)連接觀察者和被觀察者

步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件

  • 源碼分析如下
/** 
  * 使用步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件
  **/

  Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })

/** 
  * 源碼分析:Observable.create(new ObservableOnSubscribe<Integer>(){...})
  **/
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {

    ...
      // 僅貼出關(guān)鍵源碼

      return new ObservableCreate<T>(source);
      // 創(chuàng)建ObservableCreate類對象 ->>分析1
    // 注:傳入source對象(即 我們手動創(chuàng)建的ObservableOnSubscribe對象)  
    }


  /** 
    * 分析1:new ObservableCreate<T>(source)
    **/

    public final class ObservableCreate<T> extends Observable<T> {
  // ObservableCreate類 = Observable的子類 

      ...
      // 僅貼出關(guān)鍵源碼

        final ObservableOnSubscribe<T> source;

        // 構(gòu)造函數(shù)
      // 傳入了傳入source對象 = 手動創(chuàng)建的ObservableOnSubscribe對象
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        
    /** 
      * 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
      * 作用:訂閱時,通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
      **/
        @Override
        protected void subscribeActual(Observer<? super T> observer) {

              // 1. 創(chuàng)建1個CreateEmitter對象(封裝成1個Disposable對象)
          // 作用:發(fā)射事件
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);

            // 2. 調(diào)用觀察者(Observer)的onSubscribe()
            // onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時復(fù)寫的onSubscribe()
            observer.onSubscribe(parent);

            try {
                // 3. 調(diào)用source對象的subscribe()
                // source對象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對象 
                // subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()->>分析2
                source.subscribe(parent);

            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
    }

  /** 
    * 分析2:emitter.onNext("1");
    * 此處僅講解subscribe()實(shí)現(xiàn)中的onNext()
    * onError()捷雕、onComplete()類似椒丧,此處不作過多描述
    **/
    static final class CreateEmitter<T> extends AtomicReference<Disposable>
                                        implements ObservableEmitter<T>, Disposable {

        ...
        // 僅貼出關(guān)鍵代碼

        // onNext()源碼分析
        @Override
        public void onNext(T t) {
            // 注:發(fā)送的事件不可為空
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            // 若無斷開連接(調(diào)用Disposable.dispose()),則調(diào)用觀察者(Observer)的同名方法 = onNext()
            // 觀察者的onNext()的內(nèi)容 = 使用步驟2中復(fù)寫內(nèi)容
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        // onError()救巷、onComplete()類似,此處不作過多描述
        // 特別說明:調(diào)用該2方法句柠,最終都會自動調(diào)用dispose()浦译,即斷開觀察者 & 被觀察者的連接
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
  • 步驟1總結(jié)
示意圖

步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為

  • 源碼分析
/** 
  * 使用步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為(方法內(nèi)的創(chuàng)建對象代碼)
  **/
  subscribe(new Observer<Integer>() {
      
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "開始采用subscribe連接");
              }
              // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

              @Override
              public void onNext(Integer value) {
                  Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)"  );
              }

              @Override
              public void onError(Throwable e) {
                  Log.d(TAG, "對Error事件作出響應(yīng)");
              }

              @Override
              public void onComplete() {
                  Log.d(TAG, "對Complete事件作出響應(yīng)");
              }

          });

/** 
  * 源碼分析:Observer類
  **/
  public interface Observer<T> {
      // 注:Observer本質(zhì) = 1個接口

      // 接口內(nèi)含4個方法,分別用于 響應(yīng) 對應(yīng)于被觀察者發(fā)送的不同事件
        void onSubscribe(@NonNull Disposable d); // 內(nèi)部參數(shù):Disposable 對象溯职,可結(jié)束事件
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }

/** 
  * 特別說明:Subscriber類
  * 定義:RxJava 內(nèi)置的一個實(shí)現(xiàn)了 Observer 的抽象類
  * 作用:擴(kuò)展Observer 接口 = 新增了2個方法 = 
  *      1. onStart():在還未響應(yīng)事件前調(diào)用精盅,用于初始化工作
  *      2. unsubscribe():用于取消訂閱。在該方法被調(diào)用后谜酒,觀察者將不再接收 & 響應(yīng)事件
  *      注:調(diào)用該方法前叹俏,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用僻族;若引用不能及時釋放粘驰,就會出現(xiàn)內(nèi)存泄露
  * 使用方式:與Observer使用幾乎相同(實(shí)質(zhì)上,Observer總是會先被轉(zhuǎn)換成Subscriber再使用)
  **/
  Subscriber<String> subscriber = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "開始采用subscribe連接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件作出響應(yīng)" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應(yīng)");
            }
        };

步驟3:通過訂閱(subscribe)連接觀察者和被觀察者

  • 源碼分析
/** 
  * 使用步驟3:通過訂閱(subscribe)連接觀察者和被觀察者 = subscribe()
  **/
  subscribe(new Observer<Integer>() {
              // 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
              // 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "開始采用subscribe連接");
              }
              // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

              @Override
              public void onNext(Integer value) {
                  Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)"  );
              }

              @Override
              public void onError(Throwable e) {
                  Log.d(TAG, "對Error事件作出響應(yīng)");
              }

              @Override
              public void onComplete() {
                  Log.d(TAG, "對Complete事件作出響應(yīng)");
              }

          });

/** 
  * 源碼分析:Observable.subscribe(observer)
  * 說明:該方法屬于 Observable 類的方法(注:傳入1個 Observer 對象)
  **/  

  @Override
  public final void subscribe(Observer<? super T> observer) {

    ...
    // 僅貼出關(guān)鍵源碼
   
    subscribeActual(observer);
    // 繼續(xù)往下看:分析1

  }

/** 
  * Observable.subscribeActual(observer)
  * 說明:屬于抽象方法述么,由子類實(shí)現(xiàn)蝌数;此處的子類 = 步驟1創(chuàng)建被觀察者(Observable)時創(chuàng)建的ObservableCreate類
  * 即 在訂閱時,實(shí)際上是調(diào)用了步驟1創(chuàng)建被觀察者(Observable)時創(chuàng)建的ObservableCreate類里的subscribeActual()
  * 此時度秘,你應(yīng)該回頭看上面的步驟1里的subscribeActual()顶伞,應(yīng)該能理解RxJava的整個訂閱流程了。
  **/
  protected abstract void subscribeActual(Observer<? super T> observer);
  • 總結(jié)


    示意圖

4. 源碼總結(jié)

  • 在步驟1(創(chuàng)建被觀察者(Observable))、步驟2(創(chuàng)建觀察者(Observer))時唆貌,僅僅只是定義了發(fā)送的事件 & 響應(yīng)事件的行為滑潘;
  • 只有在步驟3(訂閱時),才開始發(fā)送事件 & 響應(yīng)事件锨咙,真正連接了被觀察者 & 觀察者
  • 具體源碼總結(jié)如下
示意圖

5. 特別注意:涉及多個被觀察者(Observable)的發(fā)送事件順序

  • 具體描述
示意圖
  • 實(shí)例講解
/** 
  * 存在涉及多個被觀察者(Observable)的情況
  **/
  
    // 創(chuàng)建第1個被觀察者(Observable1)
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                }
            })
            // 使用flatMap操作符(內(nèi)部會創(chuàng)建第2個被觀察者(Observable2))
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    final List<String> list = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        list.add("我是事件" + integer + "拆分后的子事件" + i);
                        // 通過flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分语卤,再將每個事件轉(zhuǎn)換為一個新的發(fā)送三個String事件
                        // 最終合并,再發(fā)送給被觀察者
                    }
                    return Observable.fromIterable(list);
                }

            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "開始采用subscribe連接");
                }
                // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

                @Override
                public void onNext(String value) {
                    Log.d(TAG, "響應(yīng)事件:"+ value   );
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "對Error事件作出響應(yīng)");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "對Complete事件作出響應(yīng)");
                }
            });
            // 過程講解
            // 調(diào)用順序:先回調(diào)Observable2的subscribe(Observer) 蓖租、subscribeActual(Observer)粱侣、再回調(diào)Observable1的subscribe(Observer) 、subscribeActual(Observer)
            // Observable的發(fā)送順序 = 先發(fā)送Observable1蓖宦、再發(fā)送Observable2
  • 測試結(jié)果


    示意圖

6. 總結(jié)

  • 本文主要對 RxJava2 中 的訂閱流程進(jìn)行了源碼分析

  • Carson帶你學(xué)RxJava系列文章:

入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么齐婴?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時候應(yīng)該使用Rxjava?(開發(fā)場景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava稠茂、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求出錯重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷


歡迎關(guān)注Carson_Ho的簡書

不定期分享關(guān)于安卓開發(fā)的干貨柠偶,追求短、平睬关、快诱担,但卻不缺深度


請點(diǎn)贊电爹!因?yàn)槟愕墓膭钍俏覍懽鞯淖畲髣恿Γ?/h1>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蔫仙,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子丐箩,更是在濱河造成了極大的恐慌摇邦,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件屎勘,死亡現(xiàn)場離奇詭異施籍,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)概漱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門丑慎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓤摧,你說我怎么就攤上這事竿裂。” “怎么了姻灶?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵铛绰,是天一觀的道長。 經(jīng)常有香客問我产喉,道長捂掰,這世上最難降的妖魔是什么敢会? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮这嚣,結(jié)果婚禮上鸥昏,老公的妹妹穿的比我還像新娘。我一直安慰自己姐帚,他們只是感情好吏垮,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著罐旗,像睡著了一般膳汪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上九秀,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天遗嗽,我揣著相機(jī)與錄音,去河邊找鬼鼓蜒。 笑死痹换,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的都弹。 我是一名探鬼主播娇豫,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼畅厢!你這毒婦竟也來了冯痢?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤框杜,失蹤者是張志新(化名)和其女友劉穎系羞,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體霸琴,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年昭伸,在試婚紗的時候發(fā)現(xiàn)自己被綠了梧乘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡庐杨,死狀恐怖选调,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情灵份,我是刑警寧澤仁堪,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站填渠,受9級特大地震影響弦聂,放射性物質(zhì)發(fā)生泄漏鸟辅。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一莺葫、第九天 我趴在偏房一處隱蔽的房頂上張望匪凉。 院中可真熱鬧,春花似錦捺檬、人聲如沸再层。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽聂受。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背防楷。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工歉备, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人毫蚓。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親扛芽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容