Rxjava(二)——線程切換怎么實(shí)現(xiàn)的挨队?

接上文Rxjava(一)——鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的?
在分析線程切換原理前要明白幾個(gè)概念蒿往;

線程調(diào)用的關(guān)鍵操作符subscribeOn盛垦、observeOn

observeOn作用:影響后續(xù)操作符所在的線程,直到下個(gè)observeOn設(shè)置為其他線程瓤漏;
subscribeOn作用:初始化整個(gè)鏈條所在的線程腾夯,多次設(shè)置只有第一次生效;

線程調(diào)度器 Schedulers

Rxjava里面將常用線程歸納為4種蔬充,即有4 種調(diào)度器:

  • 主線程 AndroidSchedulers.mainThread();
  • io線程 Schedulers.io()俯在;
  • 計(jì)算線程 Schedulers.computation()
  • 新建線程 Schedulers.newThread()娃惯;

同樣以一個(gè)實(shí)例來進(jìn)行分析:

    Observable.just("a")
               .observeOn(Schedulers.computation())
                .map(new Func1<String, String>() {  //操作1
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
                        return s + s;
                    }
                })
                 .observeOn(Schedulers.io())
                .map(new Func1<String, String>() { //操作2
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":second--" + s+"\n");
                        return s + s;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Subscriber<String>() {//操作3
                    @Override
                    public void onCompleted() {
                        System.out.print(Thread.currentThread().getName()+"\n");
                        System.out.print("completed"+"\n");

                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("error");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });

上述實(shí)例中跷乐,分別使用了computation線程、io線程和新線程趾浅,執(zhí)行代碼如下:


image.png

由于subscribeOn是在整個(gè)調(diào)用鏈之前愕提,其作用于整個(gè)鏈條,而observeOn只作用此它操作符之后皿哨,因此上圖結(jié)束RxComputationThreadPool-1 在計(jì)算線程中浅侨,而之后的全部都處理io()線程RxCachedThreadScheduler-1中。

正式開始擼代碼

帶著幾個(gè)問題來跟讀代碼:

  • 1 Observable.just("a")生成的Observable對(duì)象证膨,如何調(diào)用到計(jì)算線程中如输,線程切換通過什么實(shí)現(xiàn)的?
  • 2 為什么subscribeOn()是對(duì)整個(gè)調(diào)用鏈條起作用?

問題1:Observable.just("a")生成的Observable對(duì)象不见,如何調(diào)用到計(jì)算線程中澳化,線程切換通過什么實(shí)現(xiàn)的?

Observable.just("a").observeOn(Schedulers.computation())

由于just發(fā)送的單個(gè)對(duì)象稳吮,因此Observable使用的創(chuàng)建對(duì)象為ScalarSynchronousObservable缎谷;在其初始化對(duì)象時(shí),將"a"作為構(gòu)造參數(shù)傳入灶似,并保存列林。
observeOn操作符會(huì)首先對(duì)Observable的類型進(jìn)行檢測(cè),若為ScalarSynchronousObservable類型酪惭,則通過ScalarSynchronousObservable@scalarScheduleOn來實(shí)現(xiàn)在某個(gè)線程中調(diào)度的過程希痴;
跟進(jìn)ScalarSynchronousObservable類,

    public Observable<T> scalarScheduleOn(Scheduler scheduler) {
        if (scheduler instanceof EventLoopsScheduler) {
            EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
            return create(new DirectScheduledEmission<T>(es, t));
        }
        return create(new NormalScheduledEmission<T>(scheduler, t));
    }

很顯然春感,使用DirectScheduledEmission润梯,通過call,完成計(jì)算線程池的直接調(diào)度甥厦。

  static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
        private final EventLoopsScheduler es;
        private final T value;
        DirectScheduledEmission(EventLoopsScheduler es, T value) {
            this.es = es;
            this.value = value;
        }
        @Override
        public void call(final Subscriber<? super T> child) {
            child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
        }
    }
    //EventLoopsScheduler.class
    public Subscription scheduleDirect(Action0 action) {
       PoolWorker pw = pool.get().getEventLoop();
       return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
    }
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);//執(zhí)行動(dòng)作
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

到此時(shí)纺铭,executor.submit(run);執(zhí)行,切換到對(duì)應(yīng)線程上完成Action的調(diào)用刀疙;而Action里面做的什么事舶赔, 找到實(shí)現(xiàn)方法,發(fā)現(xiàn)其call()方法就是將數(shù)據(jù)傳遞到下一層去而已:

    /** Action that emits a single value when called. */
    static final class ScalarSynchronousAction<T> implements Action0 {
        private final Subscriber<? super T> subscriber;
        private final T value;

        private ScalarSynchronousAction(Subscriber<? super T> subscriber,
                T value) {
            this.subscriber = subscriber;
            this.value = value;
        }

        @Override
        public void call() {
            try {
                subscriber.onNext(value);
            } catch (Throwable t) {
                subscriber.onError(t);
                return;
            }
            subscriber.onCompleted();
        }
    }

問題一到此谦秧,就分析得差不多了竟纳。

問題2: 為什么subscribeOn()是對(duì)整個(gè)調(diào)用鏈條起作用?

 return nest().lift(new OperatorSubscribeOn<T>(scheduler));

能對(duì)整個(gè)調(diào)用鏈起作用的關(guān)鍵點(diǎn)是nest()疚鲤;為什么這么說锥累,看其實(shí)現(xiàn)氛赐;

  public final Observable<Observable<T>> nest() {
        return just(this);
    }

在此败去,可以看到其實(shí)還是使用的just,但是挨摸,關(guān)鍵是發(fā)送的觀察者是this;發(fā)送this诲宇,就意味著subscribeOn所在的Observable對(duì)象發(fā)送了出去际歼。this所代表的對(duì)象將會(huì)作為一個(gè)嵌套鏈表嵌入到subscribeOn所產(chǎn)生的新的Observable調(diào)用鏈中;
那么this如何嵌入到新的Observable中的呢姑蓝?
查看OperatorSubscribeOn類源碼鹅心,關(guān)鍵代碼o.unsafeSubscribe

  @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

此處的o對(duì)象就是nest發(fā)送的this纺荧,通過unsafeSubscribe函數(shù)旭愧,重新形成調(diào)用鏈颅筋;
此處在執(zhí)行的同時(shí)還存在線程切換,即inner.schedule输枯。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末议泵,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子用押,更是在濱河造成了極大的恐慌肢簿,老刑警劉巖靶剑,帶你破解...
    沈念sama閱讀 217,542評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蜻拨,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡桩引,警方通過查閱死者的電腦和手機(jī)缎讼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坑匠,“玉大人血崭,你說我怎么就攤上這事±遄疲” “怎么了夹纫?”我有些...
    開封第一講書人閱讀 163,912評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)设凹。 經(jīng)常有香客問我舰讹,道長(zhǎng),這世上最難降的妖魔是什么闪朱? 我笑而不...
    開封第一講書人閱讀 58,449評(píng)論 1 293
  • 正文 為了忘掉前任月匣,我火速辦了婚禮,結(jié)果婚禮上奋姿,老公的妹妹穿的比我還像新娘锄开。我一直安慰自己,他們只是感情好称诗,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評(píng)論 6 392
  • 文/花漫 我一把揭開白布萍悴。 她就那樣靜靜地躺著,像睡著了一般寓免。 火紅的嫁衣襯著肌膚如雪退腥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,370評(píng)論 1 302
  • 那天再榄,我揣著相機(jī)與錄音狡刘,去河邊找鬼。 笑死困鸥,一個(gè)胖子當(dāng)著我的面吹牛嗅蔬,可吹牛的內(nèi)容都是我干的剑按。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼澜术,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼艺蝴!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鸟废,我...
    開封第一講書人閱讀 39,074評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤猜敢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后盒延,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體缩擂,經(jīng)...
    沈念sama閱讀 45,505評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評(píng)論 3 335
  • 正文 我和宋清朗相戀三年添寺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了胯盯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,841評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡计露,死狀恐怖博脑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情票罐,我是刑警寧澤叉趣,帶...
    沈念sama閱讀 35,569評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站该押,受9級(jí)特大地震影響疗杉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜沈善,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評(píng)論 3 328
  • 文/蒙蒙 一乡数、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧闻牡,春花似錦净赴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至割以,卻和暖如春金度,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背严沥。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工猜极, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人消玄。 一個(gè)月前我還...
    沈念sama閱讀 47,962評(píng)論 2 370
  • 正文 我出身青樓跟伏,卻偏偏與公主長(zhǎng)得像丢胚,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子受扳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 前言我從去年開始使用 RxJava 携龟,到現(xiàn)在一年多了。今年加入了 Flipboard 后勘高,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,164評(píng)論 6 151
  • 我從去年開始使用 RxJava 峡蟋,到現(xiàn)在一年多了。今年加入了 Flipboard 后华望,看到 Flipboard 的...
    Jason_andy閱讀 5,472評(píng)論 7 62
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,032評(píng)論 9 73
  • 最近項(xiàng)目里面有用到Rxjava框架立美,感覺很強(qiáng)大的巨作匿又,所以在網(wǎng)上搜了很多相關(guān)文章方灾,發(fā)現(xiàn)一片文章很不錯(cuò)建蹄,今天把這篇文...
    Scus閱讀 6,878評(píng)論 2 50
  • 花兒所有的美好,來自對(duì)儀式感的追求裕偿,每一步巧妙的安排洞慎,既包含了花蕾的舒張,也包括了花粉的散播嘿棘。 藝術(shù)真正的價(jià)值劲腿,出...
    文心訪藝閱讀 227評(píng)論 0 0