RXJava2.0源碼淺析

image.png

RXJava是什么

RxJava 在 GitHub 主頁上官方解釋為:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.(一個(gè)在 Java VM 上使用可觀測的序列來組成異步的连躏、基于事件的程序的庫)

為什么使用RXJava

上面說了RXJava是一個(gè)異步事件庫,Android中關(guān)于異步的處理有Handler消息機(jī)制贞滨、AsyncTask異步任務(wù)入热,但是使用RXJava來實(shí)行異步處理更加簡潔,鏈?zhǔn)秸{(diào)用書寫起來更方便晓铆,邏輯也更加清晰勺良,而且RXJava有很多適合各種場景的操作符,功能非常強(qiáng)大骄噪,使用起來非常方便尚困。

觀察者模式

觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感,A對(duì)象需要在 B 變化的一瞬間做出反應(yīng)链蕊。
RxJava 有兩個(gè)基本概念:Observable (可觀察者事甜,即被觀察者)、 Observer (觀察者)滔韵、 subscribe (訂閱)逻谦。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer陪蜻。

RXJava基本使用

Observable.create(new ObservableOnSubscribe<TestBean>() {
            @Override
            public void subscribe(ObservableEmitter<TestBean> e) throws Exception {
                Log.d(TAG, "---subscribe---");
                e.onNext(new TestBean());
                e.onComplete();
            }
        })
                .map(new Function<TestBean, String>() {
                    @Override
                    public String apply(TestBean testBean) throws Exception {
                        Log.d(TAG, "---map操作符---");
                        return "test";
                    }
                })
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.d(TAG, "---onSubscribe---");
                    }

                    @Override
                    public void onNext(@NonNull String string) {
                        Log.d(TAG, "---onNext---");
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "---onError---");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "---onComplete---");
                    }
                });
    }
  1. Observable.create(...)產(chǎn)生一個(gè)繼承抽象類Observable的子類對(duì)象(ObservableCreate對(duì)象)邦马,RxJavaPlugins.onAssembly(...)為hook相關(guān)操作,一般的調(diào)用f==null宴卖,直接返回傳進(jìn)去的參數(shù)滋将,即ObservableCreate對(duì)象
public abstract class Observable<T> implements ObservableSource<T> {

  ......
   @CheckReturnValue
   @SchedulerSupport(SchedulerSupport.NONE)
   public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
       //判空操作乙濒,如果為空直接拋空指針異常
       ObjectHelper.requireNonNull(source, "source is null");
       return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
   }
  ......
}

   @SuppressWarnings({ "rawtypes", "unchecked" })
   @NonNull
   public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
       Function<? super Observable, ? extends Observable> f = onObservableAssembly;
       if (f != null) {
           return apply(f, source);
       }
       return source;
   }

2.Observable.map(...)方法會(huì)產(chǎn)生一個(gè)ObservableMap對(duì)象徊哑,此對(duì)象的構(gòu)造方法會(huì)保存this(這里的this為Observable對(duì)象(多態(tài)),即上面返回的ObservableCreate對(duì)象)和傳進(jìn)來的mapper邻吞。

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判空操作齿兔,如果為空直接拋空指針異常
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

3.Observable.subscribe(...)讓觀察者和被觀察者產(chǎn)生訂閱關(guān)系(第1次訂閱),在里面會(huì)調(diào)用subscribeActual(observer)方法,并將第1個(gè)觀察者observer回調(diào)過去。

   @SchedulerSupport(SchedulerSupport.NONE)
   @Override
   public final void subscribe(Observer<? super T> observer) {
       ObjectHelper.requireNonNull(observer, "observer is null");
       try {
           observer = RxJavaPlugins.onSubscribe(this, observer);

           ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

           subscribeActual(observer);
       } catch (NullPointerException e) { // NOPMD
           throw e;
       } catch (Throwable e) {
           Exceptions.throwIfFatal(e);
           // can't call onError because no way to know if a Disposable has been set or not
           // can't call onSubscribe because the call might have set a Subscription already
           RxJavaPlugins.onError(e);

           NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
           npe.initCause(e);
           throw npe;
       }
   }

subscribeActual(observer)為Observable的抽象方法分苇,實(shí)際上會(huì)執(zhí)行子類ObservableMapsubscribeActual(observer)如下:

 @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

  • 該方法中的source為ObservableMap構(gòu)造函數(shù)中保存的Observable的子類(多態(tài))添诉,即ObservableCreate對(duì)象
  • source.subscribe(new MapObserver<T, U>(t, function)),又產(chǎn)生了一次訂閱(第2次訂閱)医寿,和上面的第3步一樣栏赴,會(huì)走Observable的抽象方法subscribeActual(observer),并將MapObserver對(duì)象(第2個(gè)觀察者)回調(diào)回去靖秩,其中MapObserver對(duì)象的構(gòu)造方法中會(huì)保存t(第1次訂閱回調(diào)過來的的observer)和function(mapper)
  • 實(shí)際會(huì)執(zhí)行ObservableCreate對(duì)象中的subscribeActual(observer)方法如下:
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

該方法會(huì)執(zhí)行兩個(gè)方法observer.onSubscribe(parent),source.subscribe(parent)

  1. 第一個(gè)方法中回調(diào)回來的observer為上面的mapObserver,
    parent為上面的CreateEmitter對(duì)象(下面再介紹CreateEmitter里面做了什么)
    Observer是一個(gè)接口,調(diào)用onSubscribe(parent)實(shí)際走的是mapObserver的父類BasicFuseableObserver(Observer的實(shí)現(xiàn)類)中的onSubscribe(...)方法,如下:
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

            ......

    @SuppressWarnings("unchecked")
    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }
            if (beforeDownstream()) {
                actual.onSubscribe(this);
                afterDownstream();
            }
        }
    }
              ......

}

其中actual為在MapObserver構(gòu)造方法中保存的observer即第一次回調(diào)回來的observer须眷,并將mapObserver回調(diào)回去
終于回到了最初第一次new的observer,接著就走到了下圖中的onSubscribe(@NonNull Disposable d),d為回調(diào)回來的mapObserver對(duì)象沟突。

image1.png

  1. 第二個(gè)方法source.subscribe(parent),source為上圖中new出來的ObservableOnSubscribe匿名對(duì)象花颗,parent為第一個(gè)方法里的CreateEmitter對(duì)象,執(zhí)行回調(diào)方法subscribe(ObservableEmitter<TestBean> e)惠拭,回調(diào)回來的ObservableEmitter對(duì)象(CreateEmitter對(duì)象),CreateEmitter構(gòu)造中保存了第2次訂閱回調(diào)回來的觀察者MapObserver對(duì)象如下圖扩劝, CreateEmitter對(duì)象可以調(diào)用我們常用的方法如事件的發(fā)射emitter.onNext(...)和完成事件emitter.onComplete()
  • 首先看emitter.onNext(...)做了什么操作

static final class CreateEmitter<T> extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

       CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
       @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
}

observer為第2次訂閱回調(diào)回來的觀察者mapObserver,調(diào)用他的onNext(T t)方法职辅,在onNext(T t)方法中會(huì)執(zhí)行mapper.apply(t)并返回一個(gè)泛型U v,實(shí)際會(huì)回調(diào)到下圖中箭頭所指的apply(...)方法

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }
}
image2.png

緊接著又會(huì)調(diào)用actual.onNext(v),最終會(huì)回到第一次訂閱回調(diào)回來的觀察者的onNext()棒呛。

  • 接著當(dāng)調(diào)用emitter.onComplete()時(shí)會(huì)走到mapObserver父類BasicFuseableObserveronComplete(),其中actual為第一次回調(diào)回來的觀察者observer域携,最終會(huì)回到它的onComplete()
@Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        actual.onComplete();
    }

到此為止一個(gè)簡單的調(diào)用流程就走完了簇秒,當(dāng)被觀察者和觀察者產(chǎn)生訂閱關(guān)系時(shí),各個(gè)方法的調(diào)用順序?yàn)橄聢D中的1秀鞭、2趋观、3、4气筋、5拆内、6

image3.png

總結(jié)

Observable.create(new ObservableOnSubscribe<TestBean>() {...})
                .map(new Function<TestBean, String>(){...} )
                //.subscribeOn(Schedulers.io())
               // .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {...})
  1. Observable通過.XXX(...)都會(huì)依次生成一個(gè)Observable對(duì)象(多態(tài)),每個(gè)Observable對(duì)象都會(huì)依次保存上一個(gè)Observable對(duì)象的引用和傳進(jìn)來的參數(shù)宠默,參數(shù)最終會(huì)被保存到觀察者observer中(鏈?zhǔn)秸{(diào)用連起來的關(guān)鍵)麸恍。
  2. 當(dāng)通過Observable.subscribe(new Observer<String>(){...})產(chǎn)生訂閱時(shí),抽象類Observable會(huì)在subscribe(observer)方法中調(diào)用subscribeActual(observer)(子類已復(fù)寫該方法),實(shí)際上就是Observable子類(ObservableCreate搀矫、ObservableMap等均繼承Observable)執(zhí)行此方法抹沪,在subscribeActual(observer)方法中會(huì)調(diào)用source.subscribe(parent)利用保存的上個(gè)Observable(source)又一次產(chǎn)生訂閱,也就是自下而上產(chǎn)生訂閱直到第一個(gè)source開始發(fā)射事件瓤球。
  3. subscribeActual(observer)會(huì)生成一個(gè)新的observer(如MapObserver對(duì)象),新的observer會(huì)自下而上的保存上個(gè)observer,當(dāng)發(fā)射事件時(shí)會(huì)利用observer 自上而下將事件依次發(fā)送到最下面的observer上融欧。

結(jié)束

第一次寫blog,表達(dá)能力也不好,希望用來記錄自己的學(xué)習(xí)過程卦羡,以上的RXJava只是最基本的調(diào)用(注釋了線程切換噪馏,如果加上線程切換整體流程大同小異)麦到,有不對(duì)的地方歡迎指正,下次準(zhǔn)備學(xué)習(xí)RXJava的Schedulers線程調(diào)度欠肾。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瓶颠,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子刺桃,更是在濱河造成了極大的恐慌粹淋,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瑟慈,死亡現(xiàn)場離奇詭異桃移,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)葛碧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門借杰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人吹埠,你說我怎么就攤上這事第步。” “怎么了缘琅?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵粘都,是天一觀的道長。 經(jīng)常有香客問我刷袍,道長翩隧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任呻纹,我火速辦了婚禮堆生,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘雷酪。我一直安慰自己淑仆,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布哥力。 她就那樣靜靜地躺著蔗怠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪吩跋。 梳的紋絲不亂的頭發(fā)上寞射,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音锌钮,去河邊找鬼桥温。 笑死,一個(gè)胖子當(dāng)著我的面吹牛梁丘,可吹牛的內(nèi)容都是我干的侵浸。 我是一名探鬼主播旺韭,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼掏觉!你這毒婦竟也來了茂翔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤履腋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后惭嚣,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體遵湖,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年晚吞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了延旧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡槽地,死狀恐怖迁沫,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情捌蚊,我是刑警寧澤集畅,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站缅糟,受9級(jí)特大地震影響挺智,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜窗宦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一赦颇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧赴涵,春花似錦媒怯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纱烘,卻和暖如春杨拐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背擂啥。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國打工哄陶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人哺壶。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓屋吨,卻偏偏與公主長得像蜒谤,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子至扰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354