RxJava系列_02線程調(diào)度

  • 1簸州、關(guān)于線程調(diào)度的例子, 就只用過(guò)幾個(gè)操作符, 所以只針對(duì)這幾個(gè)操作符進(jìn)行源碼閱讀;
  • 2、關(guān)于線程調(diào)度, 有下面幾個(gè)api需要分析:
Schedulers.newThread();
AndroidSchedulers.mainThread()
Schedulers.io();
  • 3歧譬、關(guān)于newThread與io, 是如何操作線程池?
  • 4岸浑、demo1講Schedulers.newThread(), demo2講Schedulers.io()
  • 5、切記一句話, 一旦看暈了, 趕緊翻到最后結(jié)合流程圖嘗試對(duì)當(dāng)前片段的理解;

demo1 :

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
            }
        });
  • 打印結(jié)果:
04-24 21:05:57.418 3141-3141/ Note01->onSubscribe()->ThreadName:main
04-24 21:05:57.418 3141-3241/ Note01->subscribe()->ThreadName:RxNewThreadScheduler-1
04-24 21:05:57.418 3141-3141/ Note01->onNext()->ThreadName:main
04-24 21:05:57.418 3141-3141/ Note01->onComplete()->ThreadName:main

一瑰步、Schedulers.newThread:

1.1 Schedulers.newThread
public final class Schedulers {
    static final Scheduler NEW_THREAD;

    static {
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }

    public static Scheduler newThread() {
        return NEW_THREAD;
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }
}
public final class NewThreadScheduler extends Scheduler {

    public static NewThreadScheduler instance() {
        return INSTANCE;
    }

    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
}
  • 主要是構(gòu)建Schedules的實(shí)例, 實(shí)際指向NewThreadScheduler, 給這里的Schedulers打算標(biāo)簽,Schedulers_1(NewThreadScheduler);
1.2 Observable.subscribeOn:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return new ObservableSubscribeOn<T>(this, scheduler);
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    /** 
     * 1. ObservableSubscribeOn持有Observable_1(ObservableCreate)的引用, 
     *    ObservableSubscribeOn持有Scheduler_1(NewThreadScheduler)的引用, 繼續(xù)模塊<1.4>;
     * 2. 給此處返回的ObservableSubscribeOn打上標(biāo)簽Observable_2(ObservableSubscribeOn);
     */
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}
1.3 AndroidSchedulers.mainThread:
public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}
  • 創(chuàng)建一個(gè)持有主線程Handler的HandlerScheduler實(shí)例, 給此處的HandlerScheduler打上標(biāo)簽, Schedulers_2(HandlerScheduler);
1.4 Observable.observeOn:
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        /**
         * 1. 給此處構(gòu)造的實(shí)例ObservableObserveOn打上標(biāo)簽為Observable_3(ObservableObserveOn),
         * 而這里的this指向的是Observable_2(ObservableSubscribeOn);
         * 2. 同時(shí)Observable_3(ObservableObserveOn)持有Schedulers_2(HandlerScheduler)的引用;
         */
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }
}

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
    }
}
  • 1.1 ~ 1.4僅僅是進(jìn)行了初始化實(shí)例的操作, 很關(guān)鍵的一點(diǎn)是每次調(diào)用都會(huì)返回一個(gè)Observable對(duì)象, 該Observable對(duì)象會(huì)持有前一個(gè)Observable的引用, 這點(diǎn)也是RxJava鏈?zhǔn)秸{(diào)用的一個(gè)核心;
1.5 Observable.subscribe:
1.6 Observable3(ObservableObserveOn).subscribe:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Observer<? super T> actual;
    protected final ObservableSource<T> source;
    /**
     * Observable3(ObservableObserveOn).subscribe最終會(huì)觸發(fā)subscribeActual的執(zhí)行;
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        /**
         * 1. 然后傳入Observer的實(shí)例, 此處給Observer打上標(biāo)簽Observer_1(Observer);
         * 2. 然后將Observer_1(Observer)與w(HandlerScheduler)封裝進(jìn)ObserveOnObserver,
         *    給此處的ObserveOnObserver打上標(biāo)簽Observer_2(ObserveOnObserver);
         * 3. 然后通過(guò)subscribe將Observer_2(ObserveOnObserver)傳給Observable2(ObservableObserveOn);
         */
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
}
1.7 Observable_2(ObservableSubscribeOn).subscribeActual:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        /**
         * 1. 給此處的parent打上標(biāo)簽Observer_3(SubscribeOnObserver);
         * 2. Observer_3(SubscribeOnObserver)持有Observer_2(ObserveOnObserver)的引用;
         * 3. Observer_2(ObserveOnObserver)通過(guò)onSubscribe持有
         *    Observer_3(SubscribeOnObserver)的引用, 又是一個(gè)相互持有的過(guò)程;
         */
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        /**
         * Observer_2(ObserveOnObserver)通過(guò)onSubscribe持有
         * Observer_3(SubscribeOnObserver)的引用, 又是一個(gè)相互持有的過(guò)程進(jìn)入到模塊<1.8>;
         */
        s.onSubscribe(parent);
        /**
         * 1. 此處的schedule實(shí)際為Scheduler_1(NewThreadScheduler), 進(jìn)入模塊<1.9>中;
         */
        scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                /**
                 * 1. 通過(guò)對(duì)模塊<1.9>的分析可知, source.subscribe(parent)運(yùn)行在子線程中;
                 * 2. 此處的source指向Observable_1(ObservableCreate), 通過(guò)subscribe, 
                 *    Observable_1(ObservableCreate)持有parent即Observer_3(SubscribeOnObserver)
                 *    的引用, 然后進(jìn)入模塊<1.10>;
                 */
                source.subscribe(parent);
            }
        });
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
}
1.8 Observer_2(ObserverOnObserver).onSubscribe:
static final class ObserverOnObserver<T> implements Observer<T>, Runnable {

    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    Disposable s;
    /**
     * 1. 此處的ObserverOnObserver為Observer_2(ObserverOnObserver), 在模塊<1.6>中被創(chuàng)建;
     * 2. 有模塊<1.6>可知, 此處的actual指向Observer_1(Observer);
     * 3. 而worker指向了Worker(HandlerScheduler);
     */
    ObserverOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { 
        this.actual = actual;
        this.worker = worker;
    }

    @Override
    public void onSubscribe(Disposable s) {
        /**
         * 類似于遞歸的方式, 依次往上調(diào)用, 直到調(diào)用到Observer1.onSubscribe為止;
         * 到目前為止還沒(méi)有發(fā)現(xiàn)有子線程的跡象, 所以此時(shí)Observer.onSubscribe(Dispose)運(yùn)行在主線程;
         */
        actual.onSubscribe(this);
    }
}
1.9 NewThreadScheduler.scheduleDirect:
public abstract class NewThreadScheduler {
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
}
  • 創(chuàng)建線程池, 使run運(yùn)行在子線程中;
1.10 Observable_1(ObservableCreate).subscribe:
public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        /**
         * 1. 此處的observer實(shí)際為Observer_3(SubscribeOnObserver);
         * 2. source實(shí)際為ObservableOnSubscribe, 通過(guò)subscribe持有CreateEmitter的引用;
         */ 
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
  • 目前算是完成了觀察者觀察事件的注冊(cè), 接下來(lái)被觀察者通過(guò)CreateEmitter發(fā)送幾個(gè)事件看看會(huì)發(fā)生些什么;
1.11 CreateEmitter.onNext:
static final class CreateEmitter<T> extends AtomicReference<Disposable>  implements ObservableEmitter<T>, Disposable {

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            /**
             * 為Observer_3(SubscribeOnObserver)觸發(fā)onNext的執(zhí)行, 進(jìn)入到模塊<1.12>;
             */
            observer.onNext(t);
        }
    }
}
1.12 Observer_3(SubscribeOnObserver).onNext:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    final Observer<? super T> actual;

    final AtomicReference<Disposable> s;

    @Override
     public void onNext(T t) {
        /**
         * Observer_3(SubscribeOnObserver)在模塊<1.7>進(jìn)行初始化, 然后傳入Observer_2(ObserveOnObserver)
         * 使actual指向Observer_2(ObserveOnObserver);
         */
        actual.onNext(t);
    }
}
1.13 Observer_2(ObserveOnObserver).onNext:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        schedule();
    }

    void schedule() {
        if (getAndIncrement() == 0) {
           /**
            * Observer_2(ObserveOnObserver)在模塊<1.6>處被初始化, 并且傳入HandlerWorker使worker
            * 指向worker, 然后觸發(fā)Observer_2(ObserveOnObserver)的run方法的執(zhí)行;
            */
            worker.schedule(this);
        }
    }
    /**
     * 通過(guò)閱讀HandlerWorker源碼可知, HandlerWorker通過(guò)schedule將run運(yùn)行在主線程中;
     */
    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    void drainNormal() {
        /**
         * actual指向的是Observer_1(Observer);
         */
        actual.onNext(v);
    }
}

關(guān)于demo1的流程圖:

關(guān)于demo1的流程圖
  • 1矢洲、關(guān)于上面的流程大致可以看清楚RxJava的套路了, 首先是Observable<N>持有Observable<N-1>的引用, 然后在Observable<N>的subscribe中傳入Observer1, 并創(chuàng)建Observer2, Observer2持有Observer1的引用, 然后將Observer2傳給Observable<N-1>, 采用遞歸的方式, 直到Observable<1>持有Observer<N>的引用為止, 此時(shí)將Observer<N>傳給CreateEmitter, 然后當(dāng)執(zhí)行CreateEmitter.onNext事件時(shí), Observer.onNext的方式為Observer<N> ---> Observer<1>;
  • 2、在1的總結(jié)中也可以看出來(lái), Observable與Observer的索引為逆序持有;

demo2:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(Object value) {
                LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
            }
       });

關(guān)于demo2, 只分析Scheduler.io, 其他的在demo1里面已經(jīng)進(jìn)行了分析, demo1Schedulers.newThread每次都會(huì)創(chuàng)建一個(gè)新的線程池;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末面氓,一起剝皮案震驚了整個(gè)濱河市兵钮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌舌界,老刑警劉巖掘譬,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異呻拌,居然都是意外死亡葱轩,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門藐握,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)靴拱,“玉大人,你說(shuō)我怎么就攤上這事猾普⊥嗫唬” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵初家,是天一觀的道長(zhǎng)偎窘。 經(jīng)常有香客問(wèn)我污它,道長(zhǎng)哪审,這世上最難降的妖魔是什么国拇? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任础锐,我火速辦了婚禮镇辉,結(jié)果婚禮上靠瞎,老公的妹妹穿的比我還像新娘逻悠。我一直安慰自己俏站,他們只是感情好志笼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布沿盅。 她就那樣靜靜地躺著把篓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嗡呼。 梳的紋絲不亂的頭發(fā)上纸俭,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音南窗,去河邊找鬼揍很。 笑死,一個(gè)胖子當(dāng)著我的面吹牛万伤,可吹牛的內(nèi)容都是我干的窒悔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼敌买,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼简珠!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起虹钮,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤聋庵,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后芙粱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體祭玉,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年春畔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了脱货。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡律姨,死狀恐怖振峻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情择份,我是刑警寧澤扣孟,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站荣赶,受9級(jí)特大地震影響凤价,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜讯壶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一料仗、第九天 我趴在偏房一處隱蔽的房頂上張望湾盗。 院中可真熱鬧伏蚊,春花似錦、人聲如沸格粪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至比伏,卻和暖如春胜卤,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背赁项。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工葛躏, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人悠菜。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓舰攒,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親悔醋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子摩窃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容