Rxjava2.1 線程切換原理解析

一坠七、前提說(shuō)明

本文是在 Rxjava 2.1 的基礎(chǔ)上進(jìn)行的,目前只對(duì) Rxjava 進(jìn)行解析,未搭配 Retrofit 食用舔示,如果想看 Rxjava + Retrofit 源碼解析,請(qǐng)移步 Retrofit 2.1 + Rxjava 源碼解析(一)电抚。

二惕稻、Rxjava 使用栗子

new Thread("子線程"){
          @Override
          public void run() {
              Observable.create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "Observable#subscribe(): 所在線程為 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      })
//              .subscribeOn(Schedulers.io())
                      .observeOn(Schedulers.io())
                      .subscribe(new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable d) {
                              Log.e(TAG, "observer#onSubscribe(): 所在線程為 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onNext(String s) {
                              Log.e(TAG, "observer#onNext(): 所在線程為 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onError(Throwable e) {
                          }

                          @Override
                          public void onComplete() {
                              Log.e(TAG, "observer#onComplete(): 所在線程為 " + Thread.currentThread().getName());
                          }
                      });
          }
      }.start();

輸出結(jié)果:

E/Rxjava: observer#onSubscribe(): 所在線程為 子線程
E/Rxjava: Observable#subscribe(): 所在線程為 子線程
E/Rxjava: observer#onNext(): 所在線程為 RxCachedThreadScheduler-1
E/Rxjava: observer#onComplete(): 所在線程為 RxCachedThreadScheduler-1

Rxjava2.1訂閱流程解析 中我們已經(jīng)分析了 Observable.create() 的過程,就是構(gòu)建一個(gè) ObservableCreate 對(duì)象蝙叛,ObservableCreate 是 Observable 的子類俺祠。

由上文可以知道,當(dāng)調(diào)用了 subscribe() 后,會(huì)執(zhí)行以下順序:Observable.subscribe(Observer) -> ObservableCreate.subscribeActual(Observer) -> Observer#onSubscribe()蜘渣,所以可以知道 Observer#onSubscribe() 的執(zhí)行線程是當(dāng)前線程妓布,即調(diào)用了 subscribe()的線程。

三宋梧、Observable.observeOn(Schedulers.io())

從上面栗子可以看到匣沼,如果我們只是調(diào)用了 observeOn(Schedulers.io()),這樣影響的是 observer 的 onNext() 和 onComplete()捂龄,對(duì)于 ObservableOnSubscribe#subscribe() 和 Observer#onSubscribe() 是沒有影響的释涛。

我們看看 Observable.observeOn(Schedulers.io()) 的源碼:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //刪除無(wú)關(guān)緊要的代碼
        //這里的 this 是 ObservableCreate 對(duì)象
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }

可以看到就是將傳入的 ObservableCreate 對(duì)象封裝進(jìn)了 ObservableObserveOn 對(duì)象中,可以肯定的是 ObservableObserveOn 也是 Observable 的子類倦沧。

我們從上文得知唇撬,接下來(lái)會(huì)調(diào)用 observable.subscribe(observer) 的時(shí)候會(huì)跳轉(zhuǎn)調(diào)用 Observable 子類的 ObservableObserveOn.subscribeActual(observer) 方法。這其實(shí)是用了靜態(tài)工廠模式展融。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
         // 如果傳入的 scheduler 是 Scheduler.trampoline() 的情況
        // 該線程的意義是傳入當(dāng)前線程窖认,也就是不做任何線程切換操作
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //這里的 source 是 ObservableCreate 對(duì)象
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

當(dāng)需要切換線程的時(shí)候,可以看到將傳進(jìn)來(lái)的 ObservableCreate 對(duì)象進(jìn)行了訂閱告希,只不過觀察者又被封裝成了 ObserveOnObserver 對(duì)象扑浸。這樣就會(huì)執(zhí)行 ObservableCreate#subscribeActual()

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //這里的 observer 就是 ObserveOnObserver 對(duì)象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //這里的 observer 就是 ObserveOnObserver 對(duì)象
        observer.onSubscribe(parent);

        try {
            //這里的額 source 就是我們?cè)谧钔鈱觿?chuàng)建的 ObservableOnSubscribe 對(duì)象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

這里可以看到使用了 observeOn(Schedulers.io()) 方法燕偶,但是 Observer#onSubscribe(Disposable d) 并沒有切換線程喝噪,仍在當(dāng)前線程中運(yùn)行。也就是 ObserveOnObserver.onSubscribe() 是運(yùn)行在當(dāng)前線程的指么。我們看看這個(gè)方法做了什么:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //真正的觀察者酝惧,最外層我們創(chuàng)建的 observer
        final Observer<? super T> actual;
        final Scheduler.Worker worker;

        Disposable s;

        ......
        
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        //執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        //執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
    
    void schedule() {
            if (getAndIncrement() == 0) {
                //this 就是 ObserveOnObserver 對(duì)象
                worker.schedule(this);
            }
        }
}

可以看到 ObserveOnObserver#onSubscribe(Disposable s) 中一定會(huì)調(diào)用 actual.onSubscribe(this);,其中這個(gè) this 就是 ObserveOnObserver 對(duì)象伯诬,也就是讓我們最外層的 observer 訂閱了 ObserveOnObserver晚唇。

可以看到在 RxJava 中運(yùn)用的操作符都會(huì)在內(nèi)部創(chuàng)建一個(gè) Observable 和 Observer,所以外界使用起來(lái)和簡(jiǎn)單盗似,但是里面運(yùn)行的原理倒是挺復(fù)雜的哩陕,容易讓人混淆。

運(yùn)行完 ObserveOnObserver#onSubscribe(Disposable s) 后桥言,就輪到了 source.subscribe(parent);(這里的額 source 就是我們?cè)谧钔鈱觿?chuàng)建的 ObservableOnSubscribe 對(duì)象)萌踱,也就是說(shuō)我們的 ObservableOnSubscribe#subscribe(emitter) 運(yùn)行在當(dāng)前線程葵礼。到這里的分析都很符合我們打印的結(jié)果号阿。

而我們?cè)谧钔鈱樱皇亲尠l(fā)射器 emitter 簡(jiǎn)單地發(fā)送了一個(gè) Next 事件鸳粉。這個(gè)事件會(huì)被誰(shuí)接收呢扔涧?

static final class CreateEmitter<T> extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //這里的 observer 就是 ObserveOnObserver 對(duì)象
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //這里的 observer 就是 ObserveOnObserver 對(duì)象
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        
        ......
}

我們從之前調(diào)用到 ObservableCreate#subscribeActual() 可以知道,當(dāng)時(shí)傳進(jìn)來(lái)的 parent 是 ObserveOnObserver 對(duì)象。所以發(fā)射器 emitter 發(fā)射的事件會(huì)被 ObserveOnObserver 接收枯夜。

可以看到 ObserveOnObserver.onNext() 中最后執(zhí)行了 schedule()弯汰,也就是在這里進(jìn)行了線程切換的操作。

由于我們傳入的 Scheduler 是 IO 線程湖雹,我們看看這個(gè) IO Schedule 的 worker.schedule(this)咏闪。

一路追蹤,終于找到了這個(gè) IOScheduler 的廬山真面目:

public final class IoScheduler extends Scheduler {

private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;

    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;
    
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    ......
    
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //放到線程池中執(zhí)行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

    ......
}

至此摔吏,你可以看到調(diào)用了 observerOn() 方法的全過程鸽嫂,只是會(huì)改變觀察者 observer 的 onNext()、onComplete() 方法的運(yùn)行線程龙助,不會(huì)改變被觀察者 Observable 的運(yùn)行線程抠藕。

四桦卒、observeOn() 切換線程原理小結(jié)

看完整個(gè)過程,我們知道當(dāng)我們使用 observeOn(Schedulers.io())的時(shí)候癣籽,其實(shí) Rxjava 在內(nèi)部幫我們創(chuàng)建封裝了若干個(gè)中間對(duì)象的 Observable 和 Observer。然后將這個(gè)訂閱操作放在 Rxjava 的線程池進(jìn)行滤祖,達(dá)到切換線程的功能筷狼。

被觀察者 Observable 的變化過程:Observable ==> ObservableCreate ==> ObserbvableObserveOn。

觀察者 Observer 的變化過程:Observer ==> ObserveOnObserver匠童,然后傳到 ObservableEmitter<String> emitter 里面桑逝,作為發(fā)射器的 observer 成員變量。

總之俏让,Observable#observeOn(Scheduler) 的實(shí)現(xiàn)原理在于將目標(biāo) Observer 的 onNext(T)/onError(Throwable)/onComplete() 置于指定線程中運(yùn)行楞遏。

五、subscribeOn() 栗子

new Thread("子線程"){
          @Override
          public void run() {
              Observable
                      .create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在線程為 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      })
                      .subscribeOn(Schedulers.io())
//                      .observeOn(Schedulers.io())
                      .subscribe(new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable d) {
                              Log.e(TAG, "observer#onSubscribe(): 所在線程為 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onNext(String s) {
                              Log.e(TAG, "observer#onNext(): 所在線程為 " + Thread.currentThread().getName());
                          }

                          @Override
                          public void onError(Throwable e) {
                          }

                          @Override
                          public void onComplete() {
                              Log.e(TAG, "observer#onComplete(): 所在線程為 " + Thread.currentThread().getName());
                          }
                      });
          }
      }.start();

輸出結(jié)果:

E/Rxjava: observer#onSubscribe(): 所在線程為 子線程
E/Rxjava: ObservableOnSubscribe#subscribe(): 所在線程為 RxCachedThreadScheduler-2
E/Rxjava: observer#onNext(): 所在線程為 RxCachedThreadScheduler-2
E/Rxjava: observer#onComplete(): 所在線程為 RxCachedThreadScheduler-2

六首昔、ObservableCreate.subscribeOn()

由上文可以寡喝,Observable.create() 會(huì)生成一個(gè) ObservableCreate 對(duì)象。我們看看 ObservableCreate.subscribeOn()勒奇。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        //過濾無(wú)關(guān)緊要的代碼
        //this 是 ObservableCreate 對(duì)象
        return new ObservableSubscribeOn<T>(this, scheduler);
    }

可以看到將 ObservableCreate 對(duì)象封裝成了 ObservableSubscribeOn 對(duì)象预鬓,然后就會(huì)執(zhí)行 ObservableSubscribeOn#subscribeActual()

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    //這里的 source 是 ObservableCreate 對(duì)象赊颠,scheduler 是 IoScheduler
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //s 是最外層的 observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        
        //調(diào)用 observer#onSubscribe
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    ......
}

可以看到 observer#onSubscribe() 仍在在當(dāng)前線程中執(zhí)行格二,之后的 observer 和 ObservableOnSubscribe 的方法都被線程切換類 IoScheduler 切換到了其他線程。

我們看看 IoScheduler 的 scheduler.scheduleDirect(new SubscribeTask(parent)))竣蹦。

final class SubscribeTask implements Runnable {
        //這個(gè) parent 就是 SubscribeOnObserver
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //source 就是 ObservableCreate 對(duì)象
            //parent 就是 SubscribeOnObserver 對(duì)象
            source.subscribe(parent);
        }
    }
    
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            //actual 就是最外層的 observer 
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
        
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

可以看到將最外層的 observer 包裝成 SubscribeOnObserver 對(duì)象顶猜,然后包裝成一個(gè) SubscribeTask(可以執(zhí)行的任務(wù))。當(dāng)在線程池中被執(zhí)行的時(shí)候痘括,會(huì)執(zhí)行 SubscribeTask#run()长窄。

我們?cè)倏?IoSchedule#scheduleDirect(subscribeTask)滔吠。

在 IoSchedule 的父類 Schedule 中找到一個(gè)方法:

public abstract class Scheduler {
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        //其實(shí)就是 EventLoopWorker#schedule()
        w.schedule(task, delay, unit);

        return task;
    }
}

又回到了 IOScheduler 創(chuàng)建的 EventLoopWorker 中:

static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            
            //最終放進(jìn)線程池中執(zhí)行任務(wù)
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
    ......
    
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                //放進(jìn)線程池中執(zhí)行
                f = executor.submit((Callable<Object>)sr);
            } else {
                //放進(jìn)線程池中執(zhí)行
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    
    ......
}

最終在線程池中執(zhí)行了我們的 SubscribeTask#run()。其實(shí)從 scheduleActual() 和 subscribeActual() 的命名方式可以看出挠日,Rxjava 很多地方都用到了靜態(tài)工廠模式疮绷,都是父類提供抽象方法,具體的子類根據(jù)需要實(shí)現(xiàn)不同的邏輯嚣潜,這個(gè)就很靈活了冬骚。

我們?cè)倏纯?SubscribeTask#run() 干了什么:

final class SubscribeTask implements Runnable {
        //這個(gè) parent 就是 SubscribeOnObserver
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //source 就是 ObservableCreate 對(duì)象
            //parent 就是 SubscribeOnObserver 對(duì)象
            source.subscribe(parent);
        }
    }

**那其實(shí)就是 ObservableCreate.subscribe(SubscribeOnObserver),這就又跳到了我們熟悉的 ObservableCreate.subscribeActual() 方法中了懂算。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //source 是最外層的 ObservableOnSubscribe 對(duì)象
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //observer 是 SubscribeOnObserver 對(duì)象唉韭,里面包含最外層的 observer 對(duì)象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //這里 SubscribeOnObserver 只是簡(jiǎn)單地 set 了一個(gè)引用
        observer.onSubscribe(parent);

        try {
            //其實(shí)就是 ObservableOnSubscribe.subscribe(SubscribeOnObserver);
            //此時(shí)已經(jīng)運(yùn)行在 Rxjava 的線程池中
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    ......
}

在這里會(huì)先執(zhí)行 observer (SubscribeOnObserver)的 onSubscribe() 方法,這個(gè)方法就 set 了一個(gè)引用犯犁,可以先忽略属愤。接下來(lái)會(huì)調(diào)用 ObservableOnSubscribe.subscribe(SubscribeOnObserver)

Observable.create(new ObservableOnSubscribe<String>() {
                          @Override
                          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                              Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在線程為 " + Thread.currentThread().getName());
                              emitter.onNext("1");
                              emitter.onComplete();
                          }
                      });

我們?cè)谧钔鈱又话l(fā)送了一個(gè) Next 事件酸役,根據(jù) CreateEmitter<T> 類的源碼:

public void onNext(T t) {
           
            if (!isDisposed()) {
                //observer 就是 SubscribeOnObserver 
                observer.onNext(t);
            }
        }

SubscribeOnObserver.onNext() 會(huì)觸發(fā):

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            //最外層的 observer 
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            //調(diào)用最外層的 observer#onNext()
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            //調(diào)用最外層的 observer#onComplete()
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

所以住诸,我們最外層的 observer 的 onNext() 和 onComplete() 會(huì)運(yùn)行在 Rxjava 的線程池的線程中。

至此涣澡,subscribeOn(Schedulers.io()) 的過程分析完畢贱呐,subscribeOn(Schedulers.io()) 會(huì)改變觀察者 observer 的 onNext()、onComplete() 方法的運(yùn)行線程入桂,也會(huì)改變被觀察者 Observable 的運(yùn)行線程奄薇。

七、subscribeOn() 切換線程原理小結(jié)

看完整個(gè)過程抗愁,我們知道當(dāng)我們使用 subscribeOn(Schedulers.io())的時(shí)候馁蒂,其實(shí)跟上面的 observeOn(Schedulers.io()) 過程差不多,Rxjava 幫我們創(chuàng)建了若干個(gè)中間層的 Observable 和 Observer蜘腌,然后將這個(gè)訂閱操作放在 Rxjava 的線程池進(jìn)行沫屡,達(dá)到切換線程的功能。

被觀察者 Observable 的變化過程:Observable ==> ObservableCreate ==> ObserbvableSubscribeOn撮珠。

觀察者 Observer 的變化過程:Observer ==> SubscribeOnObserver沮脖,然后傳到 ObservableEmitter<String> emitter 里面,作為發(fā)射器的 observer 成員變量芯急。

總之勺届,Observable#subscribeOn(Scheduler) 的實(shí)現(xiàn)原理在于將目標(biāo) Observer 的 onNext(T)/onError(Throwable)/onComplete() 和 ObservableOnSubscribe 的 subscribe(T) 置于指定線程中運(yùn)行。

八娶耍、subscribeOn() 和 observeOn(Schedulers.io()) 一起使用

這兩個(gè) api 一起使用其實(shí)也不會(huì)有什么很大的變化免姿,就是 observeOn() 會(huì)影響 Observer 的 onNext(T)/onError(Throwable)/onComplete() 運(yùn)行線程,而 subscribeOn() 會(huì)影響 ObservableOnSubscribe 的 subscribe(T) 運(yùn)行線程伺绽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末养泡,一起剝皮案震驚了整個(gè)濱河市嗜湃,隨后出現(xiàn)的幾起案子奈应,更是在濱河造成了極大的恐慌澜掩,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件杖挣,死亡現(xiàn)場(chǎng)離奇詭異肩榕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)惩妇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門株汉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人歌殃,你說(shuō)我怎么就攤上這事乔妈。” “怎么了氓皱?”我有些...
    開封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵路召,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我波材,道長(zhǎng)股淡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任廷区,我火速辦了婚禮唯灵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘隙轻。我一直安慰自己埠帕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開白布玖绿。 她就那樣靜靜地躺著搞监,像睡著了一般。 火紅的嫁衣襯著肌膚如雪镰矿。 梳的紋絲不亂的頭發(fā)上琐驴,一...
    開封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音秤标,去河邊找鬼绝淡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛苍姜,可吹牛的內(nèi)容都是我干的牢酵。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼衙猪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼馍乙!你這毒婦竟也來(lái)了布近?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丝格,失蹤者是張志新(化名)和其女友劉穎撑瞧,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體显蝌,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡预伺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了曼尊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片酬诀。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖骆撇,靈堂內(nèi)的尸體忽然破棺而出瞒御,到底是詐尸還是另有隱情,我是刑警寧澤神郊,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布肴裙,位于F島的核電站,受9級(jí)特大地震影響屿岂,放射性物質(zhì)發(fā)生泄漏践宴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一爷怀、第九天 我趴在偏房一處隱蔽的房頂上張望阻肩。 院中可真熱鬧,春花似錦运授、人聲如沸烤惊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)柒室。三九已至,卻和暖如春逗宜,著一層夾襖步出監(jiān)牢的瞬間雄右,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工纺讲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留擂仍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓熬甚,卻偏偏與公主長(zhǎng)得像逢渔,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子乡括,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349