08.RxJava運(yùn)作流程源碼分析

RxJava線程切換非常方便愧捕,只要調(diào)用subscribeOn(Schedules.io())就可以使前邊的操作運(yùn)行于子線程,調(diào)用obsersableOn(AndroidSchedules.mainThread())就可以設(shè)置后邊的代碼運(yùn)行于主線程汁蝶,那么是如此神奇,他是如何實(shí)現(xiàn)的诸典?

今天就以下邊的代碼為切入點(diǎn)深入源碼看一下

Observable.just("我是網(wǎng)絡(luò)圖片url").map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply1 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply1");
                s = s +" 加上一個(gè)時(shí)間戳后";
                return s;
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply2 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply2");
                s = s +" 加上第二個(gè)參數(shù)后";
                return s;
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe() {
                Log.i(TAG, "onSubscribe thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(TAG, "onNext thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onNext:"+s+" 開(kāi)啟下載這個(gè)圖片");

            }

            @Override
            public void onError(@NonNull Throwable throwable) {
                Log.i(TAG, "onError");
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onComplete:下載完成");
            }
        });

程序運(yùn)行流程圖如下


RxJava運(yùn)行流程圖.png

just方法

創(chuàng)建一個(gè)ObservableJust對(duì)象返回媒至,并將just傳入的參數(shù)保保存為value

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

@SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

map方法

同樣道理,創(chuàng)建一個(gè)ObservableMap對(duì)象餐塘,由于map方法由上邊的ObservableJust對(duì)象調(diào)用妥衣,所以構(gòu)造方法中傳入的this表示的就是ObservableJust對(duì)象,創(chuàng)建ObservableMap對(duì)象后,保存上一級(jí)產(chǎn)生的ObservableJust為當(dāng)前ObservableMap對(duì)象中的成員變量source税手,保存當(dāng)前function回調(diào)接口蜂筹,這樣一來(lái),當(dāng)前對(duì)象持有上一級(jí)ObservableJust的引用芦倒。不管map調(diào)用幾次艺挪,當(dāng)前對(duì)象都會(huì)持有上一級(jí)產(chǎn)生的對(duì)象的引用

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

假設(shè)再次調(diào)用map之后,這個(gè)map就是由上一次調(diào)用map產(chǎn)生的ObservableMap對(duì)象調(diào)用的兵扬,此時(shí)會(huì)將上一級(jí)這個(gè)ObservableMap對(duì)象保存到當(dāng)前對(duì)象的source成員變量中麻裳,就這樣,一級(jí)套一級(jí)

subscribeOn方法

產(chǎn)生一個(gè)ObservableSubscribeOn對(duì)象器钟,并將上一級(jí)的ObservableMap對(duì)象保存為當(dāng)前對(duì)象的source變量津坑,保存?zhèn)魅氲膕cheduler,那么這個(gè)scheduler是什么傲霸?

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

subscribeOn(Schedulers.io())方法使上邊的操作在子線程中執(zhí)行疆瑰,Schedulers.io()就是上邊傳入的schedulers,我們看一下schedulers是如何創(chuàng)建的
來(lái)到Schedulers類(lèi)中

public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

可以找到IO對(duì)象是在本類(lèi)靜態(tài)代碼塊中創(chuàng)建的

 static {
        ....
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ....
    }

IOTask是一個(gè)實(shí)現(xiàn)了Callable接口的線程

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

線程執(zhí)行會(huì)得到Scheduler昙啄,可以看到穆役,這是以?xún)?nèi)部類(lèi)形式實(shí)現(xiàn)的單例模式

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

可以看到,這個(gè)IoScheduler內(nèi)部是線程池實(shí)現(xiàn)的

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

也就是說(shuō)梳凛,當(dāng)我們?cè)诖a中設(shè)置了這個(gè)操作之后(subscribeOn(Schedulers.io()))耿币,會(huì)創(chuàng)建一個(gè)線程池(如果存在就不必創(chuàng)建),很明顯韧拒,最終將會(huì)需要放在子線程中執(zhí)行的方法在這個(gè)線程池中執(zhí)行淹接,從而達(dá)到切換線程的效果,目前看到這里叭莫,這只能作為一個(gè)猜想蹈集,我們繼續(xù)往下看

observeOn(AndroidSchedulers.mainThread())方法

這個(gè)方法執(zhí)行會(huì)保存一個(gè)運(yùn)行于主線程的Scheduler,這個(gè)主線程Scheduler如何創(chuàng)建的雇初?
AndroidSchedulers中

private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

可以看到這個(gè)Scheduler是通過(guò)封裝Handler得到的一個(gè)運(yùn)行于主線程的封裝類(lèi)拢肆,這里將它保存起來(lái)。最后我們看subscribe方法

subscribe方法

public final void subscribe(Observer<? super T> observer) {
        ......
            subscribeActual(observer);
        ......
    }

protected abstract void subscribeActual(Observer<? super T> observer);

由于subscribeActual方法是抽象的靖诗,那么要從其子類(lèi)中找郭怪,subscribe方法由上次操作observeOn方法得到的ObservableObserveOn對(duì)象調(diào)用,所以會(huì)執(zhí)行這個(gè)類(lèi)中的subscribeActual方法刊橘,進(jìn)入ObservableObserveOn中

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

scheduler就是之前保存的AndroidSchedulers.mainThread對(duì)象鄙才,這里的source表示的就是上一級(jí)產(chǎn)生的Observable對(duì)象,具體到當(dāng)前代碼促绵,就是ObservableSubscribeOn攒庵,調(diào)用ObservableSubscribeOn中的subscribe方法嘴纺,逐層向上傳遞,直到傳遞到ObservableJust對(duì)象中浓冒,再不斷的調(diào)用map中傳入的function回調(diào)方法apply栽渴,當(dāng)apply方法調(diào)用完成,再執(zhí)行Observer的onNext onComplete方法稳懒,具體流程見(jiàn)上邊的流程圖闲擦,下一篇博客我將會(huì)詳細(xì)分析線程調(diào)度的源碼。
到這里场梆,這段示例代碼的流程已經(jīng)走了一遍

寫(xiě)了一個(gè)簡(jiǎn)化版的RxJava墅冷,實(shí)現(xiàn)了just map subscribeOn obserseOn方法,有助于對(duì)原理的理解或油,GitHub地址:https://github.com/renzhenming/MyRxJava

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末寞忿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子顶岸,更是在濱河造成了極大的恐慌罐脊,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蜕琴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡宵溅,警方通過(guò)查閱死者的電腦和手機(jī)凌简,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)恃逻,“玉大人雏搂,你說(shuō)我怎么就攤上這事】芩穑” “怎么了凸郑?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)矛市。 經(jīng)常有香客問(wèn)我芙沥,道長(zhǎng),這世上最難降的妖魔是什么浊吏? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任而昨,我火速辦了婚禮,結(jié)果婚禮上找田,老公的妹妹穿的比我還像新娘歌憨。我一直安慰自己,他們只是感情好墩衙,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布务嫡。 她就那樣靜靜地躺著甲抖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪心铃。 梳的紋絲不亂的頭發(fā)上准谚,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音于个,去河邊找鬼氛魁。 笑死,一個(gè)胖子當(dāng)著我的面吹牛厅篓,可吹牛的內(nèi)容都是我干的秀存。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼羽氮,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼或链!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起档押,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤澳盐,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后令宿,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體叼耙,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年粒没,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了筛婉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡癞松,死狀恐怖爽撒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情响蓉,我是刑警寧澤硕勿,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站枫甲,受9級(jí)特大地震影響源武,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜想幻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一软能、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧举畸,春花似錦查排、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)岖瑰。三九已至,卻和暖如春砂代,著一層夾襖步出監(jiān)牢的瞬間蹋订,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工刻伊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留露戒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓捶箱,卻偏偏與公主長(zhǎng)得像智什,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子丁屎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容