Rxjava2原理完全剖析

概述

rxjava2想必大家都用的很熟練了牙丽,但我們大多數(shù)工程師又有多少是從源碼來深入了解它呢,尤其是在找工作面試中兔魂,想必大家深有體會烤芦,知道怎么用,卻不不知道怎么說入热∨淖兀總的來說就是對原理不了解。
也許你由于工作原因沒時間勺良,也可能是其他原因绰播,總之沒了解,沒關(guān)系我也沒怎么了解尚困。由于打算回家發(fā)展蠢箩,辭掉了現(xiàn)在的工作,自己也很想寫一篇關(guān)于rxjava2的文章事甜,也不用再去寫業(yè)務(wù)方面的代碼谬泌,想想現(xiàn)在的生活狀態(tài),真的很nice逻谦。

由于Android studio默認(rèn)沒有rxjava2的相關(guān)api掌实,所以第一步就是在github中找到rxjava2的庫,當(dāng)我在查找的時候已經(jīng)出來了rxjava3了邦马,有需要了解的可以 點擊了解贱鼻。這里我們只是針對rxjava2,rxjava3可能有一些新的api或什么的 但大多api還是通用的這里就不過多的闡述滋将。

在日常的工作中rxjava使用的最多的就是在網(wǎng)絡(luò)請求中邻悬,但如果你對rxjava運用的比較熟悉的話,它干的事情也是很多的随闽,比如切線程,類型的轉(zhuǎn)換,舉個簡單例子父丰,比如輸入的int類型的數(shù)據(jù),通過rxjava可以轉(zhuǎn)換成String類型掘宪。一個實體類轉(zhuǎn)換成另外一個實體類蛾扇。或者修改類中的成員變量添诉,或者遍歷集合等等都可以用rxjava來實現(xiàn)屁桑。其實這個庫功能是相當(dāng)強大的。我們也不要局限在只是使用在網(wǎng)絡(luò)框架請求上栏赴。

網(wǎng)絡(luò)框架使用的三方庫依賴

    implementation "io.reactivex.rxjava2:rxjava:2.2.8"
    implementation "com.squareup.retrofit2:retrofit:2.5.0"
    implementation "com.squareup.retrofit2:converter-gson:2.4.0"
    implementation "com.squareup.retrofit2:adapter-rxjava2:2.4.0"
    implementation "me.jessyan:retrofit-url-manager:1.4.0"
    implementation "io.reactivex.rxjava2:rxkotlin:2.2.0"
    implementation "io.reactivex.rxjava2:rxandroid:2.1.0"
    implementation "com.squareup.okhttp3:okhttp:3.14.1"
    implementation "com.google.code.gson:gson:2.8.5"
    implementation "com.squareup.okhttp3:logging-interceptor:3.11.0"  

注意:添加依賴后需要注明依賴來源 maven { url "https://jitpack.io" }
點擊右上方的同步按鈕蘑斧,依賴就算配置好了。
講解rxjava之前,先簡單的搭建下網(wǎng)絡(luò)請求的相關(guān)配置竖瘾,我這里以github 為例
比如使用users/{user}/repos網(wǎng)絡(luò)請求api

public interface Api {
    @GET("users/{user}/repos")
    Single<List<Repo>> listRepos(@Path("user") String user);
}

這里講解下Single 沟突,我這里使用的是Single,為什么使用它而沒有使用Observable或者是Flowable捕传,我個人覺得因為Single請求返回的只有成功或者失敗兩種狀態(tài)的返回方法惠拭,與網(wǎng)絡(luò)請求失敗或成功一致,因此我這里選擇了Single而不是其他兩種庸论,也有大佬說在網(wǎng)絡(luò)請求中使用Single比較好一點這也是選擇它的原因之一职辅,在實際項目中我這里的也是選擇Single的。
Repo實體類的內(nèi)容比較多我這里就不貼了聂示,主要還是看下網(wǎng)絡(luò)請求OkhttpClient和Retrofit的相關(guān)代碼
主要的地方我講下 其他的最后我會把代碼上傳到github域携,大家可以到github上看下就可以了

subscribeOn和observeOn

相信很多開發(fā)工作者對這兩種該如何使用有點犯暈,不知道這兩種該如何選擇使用鱼喉,甚至在代碼中亂用的現(xiàn)象秀鞭,有時候我看我同事使用的時候也會出現(xiàn)這種的現(xiàn)象。如何使用呢扛禽?其實也是很簡單的锋边。首先它們都是切換線程用的,它們都可以切換到主線程或者子線程编曼。比較通俗的講subscribeOn是對上游進行切換線程使用的豆巨,observeOn是對下游線程進行切換使用的。什么是上游什么又是下游呢掐场?其實也是很簡單的搀矫,我們都知道rxjava是鏈?zhǔn)秸{(diào)用的,這里的上游就是以你切換線程的地方作為分界點刻肄,上方就稱為上游,反之為下游融欧。比如上游需要進行耗時io操作敏弃。那么我們就可以使用subscribeOn(Schedulers.io()) ,當(dāng)然如果你了解了源碼噪馏,你會發(fā)現(xiàn)Schedulers可以選擇的線程方式開始有很多種的麦到,比如說newThread computation或者single等等。默認(rèn)的使用的是computation

computation

用于CPU密集型的計算任務(wù)欠肾,不適合I/O操作

newThread

為每個任務(wù)創(chuàng)建一個新的線程瓶颠。

single

single擁有一個線程單例,所有的任務(wù)都在這一個線程中執(zhí)行刺桃,當(dāng)此線程中有
任務(wù)執(zhí)行時粹淋,它的任務(wù)將會按照先進先出的順序依次執(zhí)行。

先來個簡單的例子

    Single<String> stringSingle = Single.just("1");
        stringSingle
                .subscribe(new SingleObserver<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(String s) {
                        mTextView.setText(s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

先講解上面的代碼,首先創(chuàng)建一個單一的事件流桃移,通過訂閱subscribe發(fā)送事件流屋匕,把數(shù)據(jù)發(fā)送到onSuccess
到這里只是簡單的描述了下整個事件流的事件執(zhí)行過程,現(xiàn)在從源碼來了解下它是如何返回的single對象借杰。

 public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }

方法塊的第一行判斷是否為null过吻,第二行onAssembly這個方法其實是一個鉤子,可以點進去看下蔗衡,代碼如下:

public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

onSingleAssembly是一個全局變量纤虽,它是對你的single進行一個統(tǒng)一的處理,比如增加日志绞惦,但我們這里是不需要使用到它的逼纸。也就是直接返回source而沒有執(zhí)行!=null判斷條件下的代碼塊。這里可以忽略它翩隧。
從上面的代碼可以分析到樊展,其實這里只是創(chuàng)建了一個SingleJust就返回了
我們在點擊SingleJust看下它的源碼長什么樣

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

這段代碼其實也很好理解,就是把發(fā)送的1這個值存儲起來堆生,subscribeActual這個方法专缠,稍后再講解下,接下來我們看下訂閱事件的subscribe

public final void subscribe(SingleObserver<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");

        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        try {
            subscribeActual(observer);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

方法塊的第一行判斷null就不說了淑仆,第二行其實也是一個hook(鉤子)方法涝婉。點擊去看,你會發(fā)現(xiàn)跟上面的鉤子方法是很像的蔗怠。這個hook在默認(rèn)的情況下也是沒有用到的墩弯。這個方法最核心的代碼 subscribeActual(observer),我們點進去看下寞射,會發(fā)現(xiàn)這行代碼是一個抽象方法渔工。它是在哪里執(zhí)行的呢。其實就是上面講解的SingleJust中還沒有講解到的那個方法桥温。也就是這段代碼引矩,如下:

  protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

這就把創(chuàng)建的被觀察者和觀察者聯(lián)系起來了。

整個流程圖

image.png

有時候我們在開發(fā)的時候也會遇到這樣一種場景侵浸,就是當(dāng)前的activity不存在的情況旺韭,代碼正在執(zhí)行onSuccess方法,如果是在進行UI操作掏觉,由于找不到對應(yīng)的ui控件而直接crash的情況区端。那么我們該怎么做呢?這里就要用到Disposables澳腹。在當(dāng)前activity被onDestory之前阻斷事件流织盼。
在講解Disposable原理前杨何,我們對被觀察者分為后續(xù)延遲性和非后續(xù)非延遲性進行分類。比如上面提到的Single.just就屬于非后續(xù)非延遲性被觀察者悔政,Observable.just就屬于后續(xù)延遲性被觀察者晚吞。
這里給個簡單的例子來分析下Observable的后續(xù)和延遲性

Observable
                .interval(1, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {

                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {

                    }
                });

通過入口interval找到IntervalObserver,先來分析下這個類

 static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }

        @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

它繼承至AtomicReference<Disposable>谋国,AtomicReference可以保證原子操作的Disposable類對象的引用槽地。實現(xiàn)了Disposable接口,這里我們就能夠知道它內(nèi)部的操作是可以被打斷的芦瘾,但實際真正打斷的卻是AtomicReference類中的打斷方法捌蚊,也就是DisposableHelper.dispose(this);點進去可以發(fā)現(xiàn)他就是通過真正可以取消的Dispose去執(zhí)行取消的操作。那么它真正的取消的對象又是在哪呢近弟?其實就是上面的 DisposableHelper.setOnce(this, d)缅糟,而它是通過執(zhí)行setResource方法,而這個方法又是在執(zhí)行定時任務(wù)可取消的 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit)得到的祷愉,整個過程就算理順了窗宦,同時他還實現(xiàn)了Runnable,這就保證了被觀察者具有延遲和持續(xù)性的條件二鳄,這里也可以看出它是在子線程中進行計時操作的赴涵。到最后在進行ui操作是需要切換到UI線程執(zhí)行的,那么這里就可以理解為它是通過子線程切換到主線程然后循環(huán)往復(fù)操作的订讼。它的作用就是讓下游處理的數(shù)據(jù)比上游晚髓窜,這樣就可以持續(xù)和數(shù)據(jù)的穩(wěn)定。

我們都知道rxjava有很多的操作符欺殿,接下來講解下rxjava比較常見的操作符寄纵。

Map操作符

map字面意思是映射,通過映射操作符返回的結(jié)果通過map做下轉(zhuǎn)換脖苏。比如說事件源是一個int類型我們希望轉(zhuǎn)換成String類型程拭,這個我們就可以用map操作符來實現(xiàn),比如:

Single<Integer> stringSingle = Single.just(1);
        stringSingle
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer s) throws Exception {
                        return String.valueOf(s);
                    }
                })
                .subscribe(new SingleObserver<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(String s) {
                        mTextView.setText(s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

接著從源碼解讀map是如何實現(xiàn)類型轉(zhuǎn)換的

 public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }

onAssembly上面已經(jīng)說過了,這里我們直接分析下SingleMap

public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

對source和mapper兩個參數(shù)直接賦值棍潘,source指的是事件來源哺壶,來源指的Single<Integer>,mapper指的Function蜒谤,它相當(dāng)于一個轉(zhuǎn)換器。
接著看下面的代碼

 protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

了解到map把最上層single包裹在內(nèi)部至扰,通過內(nèi)部的上層的Single來訂閱事件并向下傳遞事件鳍徽。這里的范型t就是我們最外層看到的new Function<Integer, String>()

public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

關(guān)鍵代碼 v = ObjectHelper.requireNonNull(mapper.apply(value)
它除了判斷mapper.apply(value)是否為空外還對其進行了賦值操作。返回的R指的是轉(zhuǎn)換過后的值敢课。
通過源碼能夠了解到阶祭,從最外層鏈?zhǔn)秸{(diào)用讓我們直接想到構(gòu)建模式绷杜,但從源碼可以看到內(nèi)部都是通過new 新創(chuàng)建的
執(zhí)行流程如下


image.png

rxjava2中的操作符其實大部分都是這樣的。講到這大部分都講完了濒募,如果讀者覺得還有些沒講到的可以在評論區(qū)補充鞭盟,講的不到位的或者有誤的還望訂正

為了讓大家更好的理解,個人覺得有些從網(wǎng)上找來圖片能夠更好的解釋一些問題瑰剃,如有侵權(quán)齿诉,請聯(lián)系我,我會第一時間刪掉晌姚。后續(xù)還會講到retrofit源碼解析以及自定義view的使用及實戰(zhàn)都會在這個項目中使用

代碼傳送門
??????創(chuàng)作不易粤剧,你的鼓勵就是我最大的創(chuàng)作動力 歡迎star 點贊??

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市挥唠,隨后出現(xiàn)的幾起案子抵恋,更是在濱河造成了極大的恐慌,老刑警劉巖宝磨,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件弧关,死亡現(xiàn)場離奇詭異,居然都是意外死亡唤锉,警方通過查閱死者的電腦和手機世囊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腌紧,“玉大人茸习,你說我怎么就攤上這事”诶撸” “怎么了号胚?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長浸遗。 經(jīng)常有香客問我猫胁,道長,這世上最難降的妖魔是什么跛锌? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任弃秆,我火速辦了婚禮,結(jié)果婚禮上髓帽,老公的妹妹穿的比我還像新娘菠赚。我一直安慰自己末患,他們只是感情好漠嵌,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布仅财。 她就那樣靜靜地躺著异剥,像睡著了一般哗讥。 火紅的嫁衣襯著肌膚如雪朵栖。 梳的紋絲不亂的頭發(fā)上责循,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天撮珠,我揣著相機與錄音,去河邊找鬼塌忽。 笑死拍埠,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的土居。 我是一名探鬼主播枣购,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼装盯!你這毒婦竟也來了坷虑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤埂奈,失蹤者是張志新(化名)和其女友劉穎迄损,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體账磺,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡芹敌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了垮抗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片氏捞。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖冒版,靈堂內(nèi)的尸體忽然破棺而出液茎,到底是詐尸還是另有隱情,我是刑警寧澤辞嗡,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布捆等,位于F島的核電站,受9級特大地震影響续室,放射性物質(zhì)發(fā)生泄漏栋烤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一挺狰、第九天 我趴在偏房一處隱蔽的房頂上張望明郭。 院中可真熱鬧,春花似錦丰泊、人聲如沸薯定。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽沉唠。三九已至,卻和暖如春苛败,著一層夾襖步出監(jiān)牢的瞬間满葛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工罢屈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留嘀韧,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓缠捌,卻偏偏與公主長得像锄贷,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子曼月,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容