RxJava2----切換上游線程

需要注意的分析點

1.裝飾模式 對 上一步observable.create生成的對象 進行包裝
2.NewThreadScheduler
3.RxThreadFactory--ThreadFactory
4.CachedWorkerPool---Runnable
5.createWorker----NewThreadWorker.
6.DisposeTask--Runnable

  1. w.schedule(task, delay, unit)在線程中執(zhí)行的scheduleActual(action, delayTime, unit, null);

一我纪、RxJava的線程調(diào)度

在RxJava中问拘,要指定上游事件觸發(fā)的線需要通過subscribeOn方法傳入schedulers.

        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(@NonNull ObservableEmitter e) throws Exception {
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

二、subscribleOn包裝事件

1.onAssembly是一個hock方法献丑,如果在subscribeOn前沒有使用其他操作符轉(zhuǎn)換悟民,那么就會返回一個new ObservableSubscribeOn<T>(this, scheduler)對象厕氨。

2.ObservableSubscribeOn<T>(this, scheduler)进每。this表示的是Observable.create對象創(chuàng)建的ObservableCreate對象
而scheduler就是我們的Schedulers.newThread()對象

3.ObservableCreate和ObservableSubscribeOn對象都繼承了Observable對象,這是典型的裝飾模式命斧。目的是對剛創(chuàng)建的Observable進行包裝進行包裝田晚。

所以subscribleOn返回的是一個ObservableSubscribeOn對象

   public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

三、ObservableSubscribeOn下發(fā)事件

RxJava的事件流下發(fā)是在發(fā)生訂閱事件后(subscribe)方法冯丙,而真正執(zhí)行下發(fā)的是subscribeActual方法(查看父類的subscribe方法)

1.在subscribeActual方法中肉瓦,首先對下游的observer進行了包裝

2.調(diào)用了下游observer的onSubscribe方法,所以這個方法是在主線程中調(diào)用的

3. SubscribeOnObserver是對下游observer進行包裝

4. SubscribeTask是一個Runnable胃惜,負(fù)責(zé)觸發(fā)上游observable對下游observer元素的訂閱(事件業(yè)務(wù)觸發(fā)及傳遞)泞莉。

5.那么scheduler.scheduleDirect(Runnable)方法一定是負(fù)責(zé)開啟線程的類,通過上面的代碼知道scheduler是NewThreadScheduler

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //上游的onSubscribe方法仍然在主線程中
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
   /**
     * 創(chuàng)建一個線程船殉,負(fù)責(zé)執(zhí)行subscribe方法鲫趁,subscribe方法內(nèi)部通常
     * 調(diào)用了onNext,onComplete等方法利虫,這樣上游的所有方法都會在一個新的線程中執(zhí)行
     */
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

四挨厚、Schedule的使用

1.NewThreadScheduler首先創(chuàng)建了一個RxThreadFactory的線程工廠
2.將ThreadFactory交給了Worker堡僻,Worker是實際執(zhí)行線程的地方
3.NewThreadWorker內(nèi)部通過ScheduledExecutorService來管理線程,ScheduledExecutorService是一個可以將周期性任務(wù)通過線程池來執(zhí)行的類疫剃,所以scheduleDirect傳入0毫秒钉疫,表示立即執(zhí)行
4.schedule調(diào)用worker的方法來執(zhí)行線程

 //表示立即執(zhí)行
 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
 父類的方法
   @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //調(diào)用子類的方法創(chuàng)建worker
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
     //對runnable進行包裝
        DisposeTask task = new DisposeTask(decoratedRun, w);
      //執(zhí)行線程
        w.schedule(task, delay, unit);
        return task;
    }
final ThreadFactory threadFactory;
    //創(chuàng)建線程時巢价,給線程追加的前綴
    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }
    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }
    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }

RxThreadFactory:負(fù)責(zé)給創(chuàng)建的線程追加前綴,以及通過AtomicLong來管理創(chuàng)建的線程個數(shù)

/**
 * A ThreadFactory that counts how many threads have been created and given a prefix,
 * sets the created Thread's name to {@code prefix-count}.
 */
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
.....
    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
....
}

NewThreadWorker內(nèi)部有一個ScheduledExecutorService來管理線程任務(wù),可以延時壤躲,立即或是周期性執(zhí)行
1.內(nèi)部有ScheduledExecutorService執(zhí)行周期性任務(wù)
2.scheduleActual是真正執(zhí)行線程的方法,這過程中也對runnable進行了一定的封裝

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
  ....
//
       @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            if (delayTime <= 0) {
// 執(zhí)行任務(wù)
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
....
}

總結(jié)

subscribeOn方法會對上游創(chuàng)建的Obserable對象進行一次包裝碉克,當(dāng)完成對下游事件的訂閱時,會觸發(fā)它的subscribeActual方法漏麦,而這個發(fā)放內(nèi)部會啟動一個線程去觸發(fā)obserable的subscribe方法客税。這樣上游的所有事件都發(fā)生在指定的線程中了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市撕贞,隨后出現(xiàn)的幾起案子霎挟,更是在濱河造成了極大的恐慌,老刑警劉巖麻掸,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異赐纱,居然都是意外死亡脊奋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門疙描,熙熙樓的掌柜王于貴愁眉苦臉地迎上來诚隙,“玉大人,你說我怎么就攤上這事起胰【糜郑” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵效五,是天一觀的道長地消。 經(jīng)常有香客問我,道長畏妖,這世上最難降的妖魔是什么脉执? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮戒劫,結(jié)果婚禮上半夷,老公的妹妹穿的比我還像新娘婆廊。我一直安慰自己,他們只是感情好巫橄,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布淘邻。 她就那樣靜靜地躺著,像睡著了一般湘换。 火紅的嫁衣襯著肌膚如雪宾舅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天枚尼,我揣著相機與錄音贴浙,去河邊找鬼。 笑死署恍,一個胖子當(dāng)著我的面吹牛崎溃,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播盯质,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼袁串,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了呼巷?” 一聲冷哼從身側(cè)響起囱修,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎王悍,沒想到半個月后破镰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體压储,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年孕似,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片喉祭。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖泛烙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情胶惰,我是刑警寧澤,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布孵滞,位于F島的核電站中捆,受9級特大地震影響泄伪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜匿级,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望津函。 院中可真熱鬧,春花似錦孤页、人聲如沸尔苦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蛾号。三九已至,卻和暖如春鲜结,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背精刷。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留贬养,地道東北人琴庵。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像迷殿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子庆寺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容