RxJava線程切換-源碼解析(二)

首先明確一點(diǎn)線程切換一定需要Handler的參與

線程切換的代碼最簡單的做法大概如下代碼

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe" + Thread.currentThread());
            }

            // ObservableOnSubscribe 運(yùn)行在工作線程
        }).subscribeOn(Schedulers.newThread()).
                // 觀察者運(yùn)行在主線程 這段代碼獲取了主線程的handler呀狼,之后的線程切換就需要他的參與
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "error");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "complete");
                    }
                });

subscribeOn(Schedulers.newThread()).代碼指定ObservableOnSubscribe運(yùn)行在工作線程
看下代碼怎么實(shí)現(xiàn)?

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 返回的還是 ObservableSubscribeOn(是Observable 的子類)對象 也是調(diào)用里面的subscribeActual示惊,默認(rèn)是調(diào)用ObservableCreate 的subscribeActual方法
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

然后調(diào)用subscribe

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

里面可以調(diào)用subscribeActual方法

public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // s代表注冊的Obersver還是直接執(zhí)行
        s.onSubscribe(parent);
        // 這次沒有直接執(zhí)行代碼寫的subscribe方法 而是setDisposable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

看下SubscribeTask類 實(shí)現(xiàn)了Runnable接口

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 在工作線程執(zhí)行上游subscribe
            source.subscribe(parent);
        }
    }

接著往下看 就是一些線程池 線程管理的事情

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 在這提交到了線程池
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末耿币,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子钱反,更是在濱河造成了極大的恐慌址儒,老刑警劉巖演训,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蜒秤,死亡現(xiàn)場離奇詭異汁咏,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)作媚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進(jìn)店門攘滩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人纸泡,你說我怎么就攤上這事漂问。” “怎么了女揭?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵蚤假,是天一觀的道長。 經(jīng)常有香客問我吧兔,道長勤哗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任掩驱,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘欧穴。我一直安慰自己民逼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布涮帘。 她就那樣靜靜地躺著拼苍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪调缨。 梳的紋絲不亂的頭發(fā)上疮鲫,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天,我揣著相機(jī)與錄音弦叶,去河邊找鬼俊犯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛伤哺,可吹牛的內(nèi)容都是我干的燕侠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼立莉,長吁一口氣:“原來是場噩夢啊……” “哼绢彤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蜓耻,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤茫舶,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后刹淌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饶氏,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年芦鳍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嚷往。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,834評論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡柠衅,死狀恐怖皮仁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情菲宴,我是刑警寧澤贷祈,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站喝峦,受9級特大地震影響势誊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜谣蠢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一粟耻、第九天 我趴在偏房一處隱蔽的房頂上張望查近。 院中可真熱鬧,春花似錦挤忙、人聲如沸霜威。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽戈泼。三九已至,卻和暖如春赏僧,著一層夾襖步出監(jiān)牢的瞬間大猛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工淀零, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挽绩,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓窑滞,卻偏偏與公主長得像琼牧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子哀卫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評論 2 354