RxJava 線程調(diào)度和源碼閱讀

ReactiveX 系列文章目錄


Scheduler

通過 Scheduler 來控制被觀察者在哪個線程發(fā)射质欲,觀察者在哪個線程接收捻脖。默認(rèn)情況肝匆,發(fā)射時在哪個線程,接收就在哪個線程坛悉。

RxJava 內(nèi)置了幾個 Scheduler,通過 Schedulers 來獲取承绸。

  • Schedulers.trampoline():當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行挣轨,F(xiàn)IFO军熏。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作卷扮。
  • Schedulers.single():擁有一個線程,和 newThread 相比晤锹,這個線程可以共用摩幔。
  • Schedulers.computation():計(jì)算所使用的 Scheduler鞭铆。這個計(jì)算指的是 CPU 密集型計(jì)算,即不會被 I/O 等操作限制性能的操作车遂,例如圖形的計(jì)算封断。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)舶担。不要把 I/O 操作放在 computation() 中坡疼,否則 I/O 操作的等待時間會浪費(fèi) CPU。
  • Schedulers.io():I/O 操作(讀寫文件衣陶、讀寫數(shù)據(jù)庫柄瑰、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多剪况,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是用一個無數(shù)量上限的線程池教沾,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率拯欧。不要把計(jì)算工作放在 io() 中详囤,可以避免創(chuàng)建不必要的線程。

切換線程

  • subscribeOn(): 控制事件產(chǎn)生的線程镐作。
  • observeOn(): 控制事件消費(fèi)的線程藏姐。
  • unsubscribeOn(): 控制解除訂閱時的線程。
Observable.create(ObservableOnSubscribe<String> { emitter ->
                Log.e("RX", "發(fā)射線程 ${Thread.currentThread().name}")
                emitter.onNext("a")
            }).subscribeOn(Schedulers.io()) // 在 io 線程發(fā)射
                    .observeOn(Schedulers.computation()) // 在計(jì)算線程接收
                    .subscribe( {
                        Log.e("RX", "接收線程 ${Thread.currentThread().name}")
                        Log.e("RX", "接收數(shù)據(jù) $it")
                    })

日志顯示:

發(fā)射線程 RxCachedThreadScheduler-1
接收線程 RxComputationThreadPool-1
接收數(shù)據(jù) a

如果 observeOn(Schedulers.trampoline())该贾,意思是在當(dāng)前線程羔杨,由于發(fā)射時將線程切換到 io 上去了,所以接收時也在這個 io 線程上杨蛋,日志顯示:

發(fā)射線程 RxCachedThreadScheduler-1
接收線程 RxCachedThreadScheduler-1
接收數(shù)據(jù) a

多次切換接收線程

Observable.create(ObservableOnSubscribe<String> { emitter ->
                Log.e("RX", "發(fā)射線程 ${Thread.currentThread().name}")
                emitter.onNext("a")
            }).subscribeOn(Schedulers.io()) // 在 io 線程發(fā)射
                    .observeOn(Schedulers.computation())
                    .map {
                        Log.e("RX", "第一次轉(zhuǎn)換數(shù)據(jù)的線程 ${Thread.currentThread().name}")
                        "$it$it"
                    } // 雙倍
                    .observeOn(Schedulers.newThread())
                    .map {
                        Log.e("RX", "第二次轉(zhuǎn)換數(shù)據(jù)的線程 ${Thread.currentThread().name}")
                        "$it$it" } // 四倍
                    .observeOn(Schedulers.single())
                    .subscribe( {
                        Log.e("RX", "接收線程 ${Thread.currentThread().name}")
                        Log.e("RX", "接收數(shù)據(jù) $it")
                    })

日志:

發(fā)射線程 RxCachedThreadScheduler-1
第一次轉(zhuǎn)換數(shù)據(jù)的線程 RxComputationThreadPool-1
第二次轉(zhuǎn)換數(shù)據(jù)的線程 RxNewThreadScheduler-1
接收線程 RxSingleScheduler-1
接收數(shù)據(jù) aaaa

源碼分析

對象創(chuàng)建

  1. Observable 通過 create 方法創(chuàng)建 ObservableCreate兜材,將參數(shù) ObservableOnSubscribe 作為 ObservableCreate 的 source理澎。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ...
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  1. 在上面返回的 ObservableCreate 上調(diào)用 subscribeOn,創(chuàng)建 ObservableSubscribeOn曙寡,ObservableCreate 對象作為 source糠爬。
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
  1. 在上面返回的 ObservableSubscribeOn 上調(diào)用 observeOn,創(chuàng)建 ObservableObserveOn举庶,ObservableSubscribeOn 自己作為 ObservableObserveOn 的 source执隧。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

基本上每調(diào)一個方法,就創(chuàng)建了一個 Observable 的實(shí)現(xiàn)類户侥,然后將上層調(diào)用它的 Observable 作為自己內(nèi)部的 source镀琉。

訂閱

向上傳遞觀察者

最后調(diào)用 ObservableObserveOn 的 subscribe,內(nèi)部調(diào)用 subscribeActual蕊唐,參數(shù)是用戶傳遞進(jìn)來的 Observer屋摔。

protected void subscribeActual(Observer<? super T> observer) {
    // 如果是 TrampolineScheduler,在當(dāng)前線程執(zhí)行替梨,不涉及任何線程的切換钓试,所以直接調(diào) source.subscribe
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

如果 scheduler 不是 TrampolineScheduler,比如 NewThreadScheduler耙替,先調(diào)用 createWorker 方法亚侠,創(chuàng)建 NewThreadWorker,實(shí)現(xiàn)了 Disposable俗扇。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    ...
}

構(gòu)造中創(chuàng)建一個線程池硝烂,再深入下去看,創(chuàng)建了一個只有一個核心線程的線程池對象铜幽。然后根據(jù)外界傳入的 Observer 和這個線程池封裝出另一個 Observer滞谢。所以在 ObservableSubscribeOn 的對象 source 上用重新封裝好的一個觀察者訂閱它。這樣就會調(diào)用 ObservableSubscribeOn 的 subscribeActual 方法除抛。

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
  • 第一步對下級的被觀察者拋上來的觀察者再封裝一次狮杨,這個 s 其實(shí)就是 ObservableObserveOn 中封裝后的 ObserveOnObserver。

  • 第二步調(diào)用 onSubscribe到忽,看 ObservableObserveOn 的 onSubscribe 方法

    public void onSubscribe(Disposable s) {
       if (DisposableHelper.validate(this.s, s)) {
           this.s = s;
    
           // 創(chuàng)建一個隊(duì)列
           queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
           // actual 是下級的觀察者橄教,這里最外層用戶傳進(jìn)來的那個 Observer 就收到了 onSubscribe 回調(diào)
           actual.onSubscribe(this);
       }
    }
    
  • 第三步是最主要的開始發(fā)射數(shù)據(jù)

向下發(fā)射數(shù)據(jù)

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

SubscribeTask 是一個 Runnable,內(nèi)部持有最后封裝的 Observer喘漏。且 run 方法就是用這個觀察者去訂閱 source护蝶。在這個例子里,ObservableSubscribeOn 中的 source 就是上層 ObservableCreate翩迈。

而 scheduler.scheduleDirect 這個 Runnable持灰,假設(shè)發(fā)射的 schedule 是 IoScheduler,scheduleDirect 最后是

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 創(chuàng)建線程负饲,返回 EventLoopWorker
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

看最后調(diào)用 EventLoopWorker 的 schedule 方法

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    ...
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

這里使用線程池方法執(zhí)行參數(shù) run 里面的任務(wù)堤魁,參數(shù)這個 run 是 DisposeTask 對象喂链。

public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run();
    } finally {
        dispose();
        runner = null;
    }
}

這個 decoratedRun 就相當(dāng)于上面的 SubscribeTask。run 方法中是用經(jīng)過若干層封裝的觀察者訂閱最初的那個被觀察者妥泉。

觀察者收到數(shù)據(jù)

SubscribeOnObserver 收到 onNext

public void onNext(T t) {
   actual.onNext(t);
}

這樣就進(jìn)了 ObserveOnObserver 的 onNext

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

把數(shù)據(jù)先加到上面創(chuàng)建的那個隊(duì)列里椭微。

void schedule() {
    // 如果之前是 0
    if (getAndIncrement() == 0) {
        // 這里就是切換接收數(shù)據(jù)的線程
        worker.schedule(this);
    }
}

線程執(zhí)行自己的 run 方法

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
void drainNormal() {
   ...

   final SimpleQueue<T> q = queue;
   final Observer<? super T> a = actual;

   for (;;) {
       ...

       for (;;) {
           ...

           try {
               v = q.poll(); // 取出隊(duì)列里的值
           } catch (Throwable ex) {
               ...
           }
           ...

           a.onNext(v); // 發(fā)射給觀察者
       }

       ...
   }
}

// 這個暫時不清楚是何時調(diào)用
void drainFused() {
   ...

   for (;;) {
       ...

       actual.onNext(null); // 發(fā)射空值

       if (d) {
           ex = error;
           if (ex != null) {
               actual.onError(ex); // 發(fā)射事件
           } else {
               actual.onComplete();
           }
           worker.dispose();
           return;
       }

       ...
   }
}

最后將數(shù)據(jù)發(fā)射給最外層的觀察者,即這個 actual盲链,同時運(yùn)行在 observerOn 指定的線程上赏表。

mermaid Observable
mermaid Observer
plantuml
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市匈仗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逢慌,老刑警劉巖悠轩,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異攻泼,居然都是意外死亡火架,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門忙菠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來何鸡,“玉大人,你說我怎么就攤上這事牛欢÷饽校” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵傍睹,是天一觀的道長隔盛。 經(jīng)常有香客問我,道長拾稳,這世上最難降的妖魔是什么吮炕? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮访得,結(jié)果婚禮上龙亲,老公的妹妹穿的比我還像新娘。我一直安慰自己悍抑,他們只是感情好鳄炉,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著传趾,像睡著了一般迎膜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上浆兰,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天磕仅,我揣著相機(jī)與錄音珊豹,去河邊找鬼。 笑死榕订,一個胖子當(dāng)著我的面吹牛店茶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播劫恒,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼贩幻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了两嘴?” 一聲冷哼從身側(cè)響起丛楚,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎憔辫,沒想到半個月后趣些,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贰您,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年坏平,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锦亦。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡舶替,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出杠园,到底是詐尸還是另有隱情顾瞪,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布返劲,位于F島的核電站玲昧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏篮绿。R本人自食惡果不足惜孵延,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望亲配。 院中可真熱鬧尘应,春花似錦、人聲如沸吼虎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽思灰。三九已至玷犹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間洒疚,已是汗流浹背歹颓。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工坯屿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人巍扛。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓领跛,卻偏偏與公主長得像,于是被迫代替她去往敵國和親撤奸。 傳聞我的和親對象是個殘疾皇子吠昭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容