RxJava2源碼分析(二) ---- subscribeOn

subscribeOn

    Observable.create((ObservableOnSubscribe<Integer>) e -> {
        System.out.println("observable : " + Thread.currentThread());
        e.onNext(1);
    })
            .subscribeOn(Schedulers.single())
            .subscribe(integer -> {
                System.out.println(integer);
                System.out.println("observer:  " + Thread.currentThread());
            });

Rxjava默認是在當前線程生發(fā)送事件, subscribeOn可以切換Observable發(fā)送事件所在的線程;
如果沒有使用ObserveOn指定消費事件的線程, Observer將在Observable發(fā)送事件的的線程, 消費事件;

源碼分析目的:

  1. Schduler 作用
  2. subscribeOn 做了什么

1. Schduler

Schduler不好直接用代碼解釋, 先說結(jié)論, 后面再去具體代碼分析;

  1. 切換線程, 需要提供對應(yīng)的Schduler;
  2. Schduler可以通過createWorker方法, 創(chuàng)建一個Worker類的實例;
  3. Worker有一個schedule方法, 提交runnable去運行; 切換線程, 就是把各個onNext的調(diào)用方法,封裝成一個runnable 提交到指定線程去運行;
  4. 通過Worker.schedule提交runnable后, 會返回一個disposable對象, 用于取消或控制Observale的發(fā)射任務(wù);
  5. Schduler本身是個管理類, 一般內(nèi)部會創(chuàng)建具體的線程池, 同時通過統(tǒng)一的start shutdown等方法管理著線程池
  6. Schduler同時也管理著由createWorker創(chuàng)建的Worker; Worker一般都是持有Schduler中的線程池, 提交的runnable也是提交到該線程池

2. subscribeOn

2.1 Observable.subscribeOn
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

1. 參數(shù)檢測
2. 創(chuàng)建`ObservableOnSubscribe`對象, 并將當前Observable和Schduler傳入;
3. RxJavaPlugins的hook; 這個前面說過, 用于hook, 默認傳入什么 就返回什么;
2.2 ObservableSubscribeOn
  1. ObservableSubscribeOn是Observable的子類, 內(nèi)部包含一個Observableschduler, 用于對原Obverable擴展, 是一個裝飾模式;
  2. 上面說了, ObservableSubscribeOn是一個裝飾模式, 繼承于HasUpstreamObservableSource, 有一個source方法去獲取被裝飾的Observable對象;
  3. 上一篇說過, Observable.create方法創(chuàng)建的Observable, 實際是一個ObservableCreater對象, 現(xiàn)在ObservableScbscribeOn中包含的Observable即ObservableCreater;
  4. Observable的subscribe方法, 實際調(diào)用的是具體子類的subscribeActual方法;
2.3 ObservableSubscribeOn.subscribeActual

直接看 ObservableSubscribeOn.subscribeActual 的代碼;

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
  1. 創(chuàng)建 SubscribeOnObserver, 并傳入原始的Observer
  2. 調(diào)用Observer的onSubscribe方法
  3. 構(gòu)建SubscribeTask, 并提交給Schduler去執(zhí)行
2.4 SubscribeOnObserver

SubscribeOnObserverObservableSubscribeOn的靜態(tài)內(nèi)部類, 同時也是繼承于Observer, 內(nèi)部也包含一個原始的Observer, 也是一個裝飾模式;

SubscribeOnObserver對被裝飾類沒有額外增加功能, 僅僅是一個封裝, 在onNext, onError等方法中, 直接是調(diào)用的actual.onNext, actual.onError;

2.5 SubscribeTask

SubscribeTask 是一個runnable對象, 是ObservableSubscribeOn的內(nèi)部類; 前面Schduler中說過, 切換線程, 就是將消息發(fā)送,包裝成一個runnable, 提交給Worker去執(zhí)行;
這個SubscribeTask將原先的發(fā)送事件代碼 封裝成的runnable, 然后送去對應(yīng)的線程池執(zhí)行;

直接看run方法

    public void run() {
        source.subscribe(parent);
    }

ObservableSubscribeOn是Observable的子類, 同時是裝飾模式, 內(nèi)部持有一個Observable, source是被包裝的Observable, 在此處的代碼中, source即是ObservableCreater, parent是SubscribeOnObserver, source.subscribe即和第一篇中的邏輯一樣了;

2.6 scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
  1. 通過createWorker創(chuàng)建相應(yīng)的Worker;
  2. hook處理相應(yīng)的runnable, 默認沒處理;
  3. 創(chuàng)建DisposeTask, 將需要運行的runnable對象, 封裝成disposable對象, 用于執(zhí)行取消操作;
  4. 將封裝后的runnable提交給worker去運行;

此處的scheduler由Schedulers.single()生成, 實際是一個SingleScheduler;

2.6.1 Worker.schedule()

直接看 SingleScheduler的代碼

####### Schedulers.createWorker 創(chuàng)建Worker; 獲取公共的線程池, 創(chuàng)建Worker

    public Worker createWorker() {
        return new ScheduledWorker(executor.get());
    }

####### Worker.scheduler

    public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }

        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
        tasks.add(sr);

        try {
            Future<?> f;
            if (delay <= 0L) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delay, unit);
            }

            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            dispose();
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }

        return sr;
    }
  1. 封裝傳入的runnable對象, 將其封裝成ScheduledRunnable對象
  2. 提交給線程池運行, ScheduledRunnable本身是一個Callable對象, 可以用于取消執(zhí)行

上述提交給線程池運行的流程, 最終封裝的運行的run方法, 其實還是最先封裝的SubscribeTask中的source.subscribe(parent);這一句代碼;
SubscribeTask本身對應(yīng)的runnable被一次次傳遞封裝, 最后給線程池運行;
source.subscribe(parent);中, 上面說到是一個裝飾模式, 運行的還是Observable.subscribeActual方法, 最后的運行邏輯和上一篇相同;
最后會調(diào)到ObservableSubscribeOn.onNext方法, 內(nèi)部沒做處理, 裝飾模式,調(diào)用上一級的onNext方法

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市炸宵,隨后出現(xiàn)的幾起案子吕朵,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闷袒,死亡現(xiàn)場離奇詭異致盟,居然都是意外死亡,警方通過查閱死者的電腦和手機匾南,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門啃匿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蛆楞,你說我怎么就攤上這事溯乒。” “怎么了豹爹?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵裆悄,是天一觀的道長。 經(jīng)常有香客問我臂聋,道長光稼,這世上最難降的妖魔是什么崖技? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮钟哥,結(jié)果婚禮上迎献,老公的妹妹穿的比我還像新娘。我一直安慰自己腻贰,他們只是感情好吁恍,可當我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著播演,像睡著了一般冀瓦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上写烤,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天翼闽,我揣著相機與錄音,去河邊找鬼洲炊。 笑死感局,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的暂衡。 我是一名探鬼主播询微,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼狂巢!你這毒婦竟也來了撑毛?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤唧领,失蹤者是張志新(化名)和其女友劉穎藻雌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斩个,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡胯杭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了萨驶。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片歉摧。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖腔呜,靈堂內(nèi)的尸體忽然破棺而出叁温,到底是詐尸還是另有隱情,我是刑警寧澤核畴,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布膝但,位于F島的核電站,受9級特大地震影響谤草,放射性物質(zhì)發(fā)生泄漏跟束。R本人自食惡果不足惜莺奸,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冀宴。 院中可真熱鬧灭贷,春花似錦、人聲如沸略贮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逃延。三九已至览妖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間揽祥,已是汗流浹背讽膏。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拄丰,地道東北人府树。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像愈案,于是被迫代替她去往敵國和親挺尾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,092評論 2 355