RxJava之七——RxJava 2.0 圖文分析create()电抚、 subscribe()、map()竖共、observeOn()蝙叛、subscribeOn()源碼

前言

16年 的時(shí)候?qū)戇^(guò)兩篇關(guān)于Rxjava 1.0 的源碼分析,時(shí)過(guò)境遷公给,現(xiàn)在早已是2.0 了借帘。2.0 的代碼邏輯,封裝淌铐,更為易懂肺然,也包含了 一些新特性背壓,面向切面等等腿准。所以決定际起,寫(xiě)篇文章分析RxJava 2.0

關(guān)于RxJava,從表面上看起來(lái)很容易使用吐葱,但是如果理解不夠深刻街望,使用過(guò)程中,往往會(huì)出現(xiàn)一些問(wèn)題弟跑,所以我寫(xiě)了系列文章灾前,從入門(mén)到精通,從簡(jiǎn)單的使用到部分源碼詳解孟辑,希望能給讀者一個(gè)質(zhì)的飛躍:
1哎甲、RxJava之一——一次性學(xué)會(huì)使用RxJava RxJava簡(jiǎn)單的使用和使用它的好處
2、RxJava之二——Single和Subject 與Observable舉足輕重的類饲嗽,雖然用的少炭玫,但應(yīng)該知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4貌虾、RxJava之四—— Lift()詳解 想要了解Operators础嫡,Lift()一定要學(xué)習(xí)
5、RxJava之五—— observeOn()與subscribeOn()的詳解Scheduler線程切換的原理
6、RxJava之六——RxBus 通過(guò)RxJava來(lái)替換EventBus
7榴鼎、RxJava之七——RxJava 2.0 圖文分析create()伯诬、 subscribe()、map()巫财、observeOn()盗似、subscribeOn()源碼 這張圖可能是全網(wǎng)最詳細(xì) 明了的圖

Rxjava2.x 與1.x 的相關(guān)文章:

關(guān)于 RxJava 最友好的文章—— RxJava 2.0 全新來(lái)襲
官方文檔:What's different in 2.0
RxJava github

示例

Rxjava的使用流程,相信大家都很清楚了平项,以下面這個(gè)簡(jiǎn)單的demo赫舒,重點(diǎn)分析一下create()、 subscribe()闽瓢、map()接癌、observeOn()、subscribeOn()源碼扣讼。 只要了解這些源碼缺猛,再去看其他的類都會(huì)有似曾相識(shí)的感覺(jué)

        Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(emitter: ObservableEmitter<Int>) {
                  emitter.onNext(1)
            }})
            .map(object : Function<Int, String> {
                override fun apply(t: Int): String {
                    return t.toString()
                }}
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Observer<String> {
                override fun onComplete() {
                }

                override fun onSubscribe(d: Disposable) {
                }

                override fun onNext(t: String) {
                }

                override fun onError(e: Throwable) {
                }
            })

在看源代碼時(shí),被其中一層一層的封裝和調(diào)用椭符,類名搞的暈暈的荔燎,一不留神就不知道誰(shuí)是誰(shuí)。
我把源代碼的簡(jiǎn)單流程 销钝,畫(huà)了個(gè)圖有咨,先腦子里有個(gè)整體的輪廓,再去看代碼蒸健,會(huì)清晰很多

關(guān)于流程圖的介紹:

這張圖座享,可能是全網(wǎng)關(guān)于rxjava2 大體流程,最詳細(xì)的圖似忧。為了排版和方便理解渣叛,簡(jiǎn)化了函數(shù)的關(guān)系,忽略了很多細(xì)節(jié)


在這里插入圖片描述
  • 綠色模塊 表示訂閱者橡娄,例如 ObservableFlowable诗箍,或者他們的子類

  • 藍(lán)色模塊 表示觀察者癣籽,例如 ObserverSubscriber挽唉,或者他們的子類

  • 青色模塊 表示數(shù)據(jù)發(fā)送,例如:ObservableOnSubscribe筷狼,ObservableSource瓶籽,等等

  • 黃色模塊 表示切換了線程

  • 每個(gè)模塊右上角表示當(dāng)前類的名稱

  • 綠色的線 表示函數(shù)調(diào)用

  • 藍(lán)色的線 表示訂閱

  • 紅色的線 表示數(shù)據(jù)發(fā)送

  • 黑色的線 表示對(duì)象是什么或者對(duì)象從哪里來(lái)的

基本流程

還是以上面的demo為例,拋開(kāi)操作符埂材,只分析主要流程

先來(lái)看下塑顺,demo中都使用哪些類和接口:

  • Observable 訂閱者
  • Observer 觀察者
  • ObservableOnSubscribe 數(shù)據(jù)發(fā)送源
public interface ObservableOnSubscribe<T> {

    // ObservableEmitter 也是一個(gè)接口,它繼承接口Emitter
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

//這個(gè)接口,有常見(jiàn)的幾種函數(shù)严拒。在demo中可以看到扬绪,調(diào)用的oNext 就是這個(gè)接口的函數(shù)
public interface Emitter<T> {
    void oNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

下面一步一步進(jìn)入源碼:

create

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
         // 判斷非空
        ObjectHelper.requireNonNull(source, "source is null");
        //RxJavaPlugins 相當(dāng)于是切面編程,會(huì)對(duì)所有的中間訂閱者進(jìn)行自定義修改裤唠,如果沒(méi)有設(shè)置過(guò)挤牛。就直接當(dāng)前參數(shù)
        //創(chuàng)建一個(gè) ObservableCreate,它就是上圖中的綠色模塊
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

來(lái)看一個(gè)RxJavaPlugins.onAssembly

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        // onObservableAssembly  需要手動(dòng)設(shè)置种蘸,實(shí)現(xiàn)切面效果
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        //如果沒(méi)有設(shè)置 就是null墓赴,直接返回參數(shù)source
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

create 傳入的是ObservableOnSubscribe 類型參數(shù),需要的Observable返回類型航瞭,這里的ObservableCreate 繼承了Observable 并實(shí)現(xiàn)接口subscribeActual诫硕,引用了ObservableOnSubscribe 并調(diào)用它的接口subscribe,所以它算是一種適配器模式刊侯。

public final class ObservableCreate<T> extends Observable<T> {

    // 每個(gè)Observable 的實(shí)現(xiàn)類章办,都有一個(gè)source,表示的是上游
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    // subscribe 最終會(huì)調(diào)用到這里
    // observer 是觀察者滔吠,它里面實(shí)現(xiàn)了onNext纲菌、onSubscribe 等 這些函數(shù)
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //調(diào)用了觀察者的onSubscribe 方法
        observer.onSubscribe(parent);

        try {
            //這個(gè)上游source 就是demo中的ObservableOnSubscribe 實(shí)現(xiàn),
            //調(diào)用了subscribe疮绷,以本例來(lái)說(shuō)翰舌,就會(huì)執(zhí)行 emitter.onNext(1)
            // 那么問(wèn)題來(lái)了,subscribeActual 會(huì)在什么時(shí)候執(zhí)行呢冬骚?
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribe

需要注意椅贱,代碼執(zhí)行到這里,是誰(shuí)調(diào)用了subscribe只冻,create() 返回的是Observable類型 ObservableCreate庇麦,所以是ObservableCreate調(diào)用了subscribe,那么關(guān)于Observable的接口喜德,自然也會(huì)調(diào)用到ObservableCreate

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // 面向切面的設(shè)置山橄,如果沒(méi)有設(shè)置,直接返回observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //以本例來(lái)看舍悯,就是執(zhí)行ObservableCreate 中的subscribeActual 
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ... 省略代碼 ...
        }
    }
ObservableCreate 類
    // subscribe 最終會(huì)調(diào)用到這里
    // observer 是觀察者航棱,它里面實(shí)現(xiàn)了onNext、onSubscribe 等 這些函數(shù)
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
         // 把觀察者萌衬,封裝成CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //調(diào)用了觀察者的onSubscribe 方法
        observer.onSubscribe(parent);

        try {
            //這個(gè)上游source 就是demo中的ObservableOnSubscribe 實(shí)現(xiàn)饮醇,
            //調(diào)用了subscribe,以本例來(lái)說(shuō)秕豫,就會(huì)執(zhí)行 emitter.onNext(1)朴艰,也就是執(zhí)行了parent.onNext(1)
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

最后進(jìn)入CreateEmitter 來(lái)看一下:

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //demo 中的observer 最終被傳遞到這里
            this.observer = observer;
        }

       // 在subscribeActual 調(diào)用到這里
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //如果沒(méi)有disposed ,就會(huì)執(zhí)行demo中observer的onNext
                observer.onNext(t);
            }
        }
          ... 省略代碼 ...

        @Override
        public void onComplete() {
            //如果沒(méi)有被dispose,會(huì)調(diào)用Observer的onComplete()方法
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    //執(zhí)行完調(diào)用dispose
                    dispose();
                }
            }
        }

       ... 省略代碼 ...
    }

源碼看到這里祠墅,再去看上圖侮穿,思路應(yīng)該更清晰,但是應(yīng)該對(duì)整圖還不是很了解毁嗦。

仔細(xì)總結(jié)一下會(huì)發(fā)現(xiàn)撮珠,在create后,創(chuàng)建了ObservableCreate 金矛,他知道上游(source)芯急,知道被訂閱后的處理方式(subscribeActual )也就是如何把數(shù)據(jù)發(fā)給下游,但是它需要等待調(diào)用subscribe驶俊,才會(huì)最終觸發(fā)這個(gè)流程娶耍。

而且ObservableCreate 是繼承于Observable ,對(duì)設(shè)計(jì)模式敏感的小伙伴饼酿,可能會(huì)想到裝飾著模式榕酒,沒(méi)錯(cuò),所謂的操作符故俐,無(wú)非就是用裝飾著模式包裹一層想鹰,讓他也知道上游(source),知道如何數(shù)據(jù)發(fā)給下游(實(shí)現(xiàn)subscribeActual )药版,最終subscribe 一調(diào)用辑舷,這個(gè)過(guò)程就被觸發(fā)阵苇。

如果感覺(jué)混亂讶舰,沒(méi)關(guān)系,下面跟著源碼走一下蕴坪,就會(huì)豁然開(kāi)朗

map操作符

調(diào)用map还栓,會(huì)執(zhí)行下面的函數(shù)

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //發(fā)現(xiàn)了嗎碌廓,上面的create 創(chuàng)建ObservableCreate  , map 創(chuàng)建了 ObservableMap 
        //沒(méi)錯(cuò)他們都是Observable的子類
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

有了新的Observable (ObservableMap)剩盒,那肯定得有新的Observer谷婆,不然ObservableMap和誰(shuí)關(guān)聯(lián)呢?是的辽聊,新的Observer就是 MapObserver

//AbstractObservableWithUpstream 繼承于Observable纪挎,有成員變量source
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    //這里傳入的是this,也就是本例中的ObservableCreate 身隐,它繼承于Observable 繼承于ObservableSource
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        // 把上游的源保存起來(lái)
        super(source);
        //保存當(dāng)前的操作函數(shù)廷区,也就是demo 中map 里實(shí)現(xiàn)的函數(shù)
        this.function = function;
    }

    // 太熟悉了唯灵,又是它贾铝,等ObservableMap 調(diào)用subscribe 的時(shí)候,會(huì)調(diào)用到這里
    @Override
    public void subscribeActual(Observer<? super U> t) {
       // 調(diào)用了上游(source)的subscribe,這就相當(dāng)于觸發(fā)了上游
       // 傳入的參數(shù)是MapObserver垢揩,聯(lián)想到上面的subscribe 直接訂閱Observer
       // 這個(gè)MapObserver繼承Observer玖绿,它的參數(shù)t 也是Observer,也就是把下游的Observer 包裹了一層叁巨,傳遞給上游斑匪。這正是裝飾者模式
       // 本例中emitter.onNext(1)執(zhí)行后,也就是會(huì)執(zhí)行MapObserver中的onNext
        source.subscribe(new MapObserver<T, U>(t, function));
    }

     //BasicFuseableObserver 繼承了Observer锋勺,有成員變量downstream蚀瘸、upstream 等
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            // 把下游的Observer 保存在downstream 中
            super(actual);
            // 保存map 變換操作
            this.mapper = mapper;
        }

        // 本例中,onNext 是在emitter.onNext(1) 執(zhí)行后庶橱,調(diào)用到這里的
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            //目標(biāo)類型
            U v;

            try {
                // 實(shí)現(xiàn)了變化操作贮勃,把t 轉(zhuǎn)為 v 
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            
            //調(diào)用了下游onNext,繼續(xù)分發(fā)數(shù)據(jù)苏章,此時(shí)的數(shù)據(jù)是轉(zhuǎn)換后的目標(biāo)數(shù)據(jù)
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

此時(shí)再去看看上面的圖寂嘉,是不是有點(diǎn)感覺(jué)了,沒(méi)錯(cuò)枫绅,全部都是這個(gè)套路泉孩,只是每次在裝飾的時(shí)候,行為不一樣

subscribeOn操作符

作用是控制subscribe 的線程并淋,下面來(lái)看看subscribeOn 是如何實(shí)現(xiàn)的

不仔細(xì)看代碼寓搬,還以為作者直接把實(shí)現(xiàn)map的代碼拷貝了一份,簡(jiǎn)直太相似了

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //使用原來(lái)的Observable 和調(diào)度線程县耽,創(chuàng)建一個(gè)新的Observable订咸,就是ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

依舊是創(chuàng)建新的Observable(ObservableSubscribeOn) 和 Observer (SubscribeOnObserver)

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // 依舊是保存了上游 ObservableSource
        super(source);
        // 保存了線程調(diào)度的Scheduler 
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        // 使用裝飾著模式,把原來(lái)的Observer 封裝了一層
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        //注意酬诀,這里還沒(méi)有切換新城脏嚷,調(diào)用了onSubscribe
        observer.onSubscribe(parent);
        //這里是重點(diǎn),
        //scheduler.scheduleDirect(new SubscribeTask(parent))   在新線程中執(zhí)行parent瞒御,
        //parent.setDisposable  把新線程任務(wù)父叙,加入到DisposableHelper,如果手動(dòng)dispose后肴裙,保證線程可以停止
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

       // 下面的常規(guī)操作方法趾唱,就是調(diào)用downstream  的各個(gè)操作方法
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
           //把當(dāng)前
            DisposableHelper.setOnce(this, d);
        }
    }

    // 實(shí)現(xiàn)了線程的Runnable 接口,目的是讓線程可以調(diào)度
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 會(huì)在新的線程中調(diào)用蜻懦,source是上游的Observable
            source.subscribe(parent);
        }
    }
}

截止到這里甜癞,subscribeOn總體的邏輯已經(jīng),搞清楚了宛乃,再深入一點(diǎn)看一下scheduler.scheduleDirect(new SubscribeTask(parent)) 是如何實(shí)現(xiàn)線程切換的

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        //調(diào)用scheduleDirect悠咱,參數(shù)表示立即執(zhí)行
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //創(chuàng)建了一個(gè)worker蒸辆,注意這里的createWorker(),是一個(gè)抽象方法析既,不同的線程躬贡,創(chuàng)建的worker 不一樣。
        // 例如:Schedulers.io()  創(chuàng)建的是 EventLoopWorker
        final Worker w = createWorker();

        // 依舊是切面編程眼坏,對(duì)每個(gè)切換線程的拂玻, 包裹一層
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //把worker 和 Runnable  封裝成DisposeTask ,方便外界調(diào)用disposed宰译,來(lái)停止它的運(yùn)行
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //立即開(kāi)始執(zhí)行
        w.schedule(task, delay, unit);

        return task;
    }

在深入看一下上面的DisposeTask檐蚜,是如何封裝的。

這段代碼沿侈,我多說(shuō)一句熬甚,你都會(huì)嫌我啰嗦

    static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

        @NonNull
        final Runnable decoratedRun;

        @NonNull
        final Worker w;

        @Nullable
        Thread runner;

        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            return this.decoratedRun;
        }
    }

總結(jié):

現(xiàn)在你再結(jié)合上圖來(lái)理解一下,是不是瞬間感覺(jué)肋坚,WC乡括,這么簡(jiǎn)單的代碼,也有人寫(xiě)博客發(fā)布

懂了subscribeOn的源碼智厌,那么想想诲泌,大家經(jīng)常討論的多個(gè)subscribeOn 調(diào)用,線程切換的問(wèn)題铣鹏,兩句話敷扫。

  1. subscribeOn 對(duì)上游訂閱有效
  2. 最上面的subscribeOn 對(duì)發(fā)送數(shù)據(jù)有效

observeOn操作符

控制 onNextonComplete 等數(shù)據(jù)消費(fèi)方法的線程诚卸,

下面的分析略有差異葵第,但總體和上面還是一樣的,你也可以自行read fuck source code 合溺,相信你的印象會(huì)更加深刻

    public final Observable<T> observeOn(Scheduler scheduler) {
        //為什么這里卒密,有bufferSize()
        //這就是Rxjava 2 所添加背壓的概念,rxjava 1  會(huì)有一個(gè)問(wèn)題棠赛,就是如果上游一直發(fā)送數(shù)據(jù)onNext 哮奇,拼命的調(diào)用,
        // 但是onNext() 處理的不夠快睛约,就會(huì)出現(xiàn)棧溢出鼎俘。這個(gè)bufferSize 就是控制了,緩沖區(qū)的大小
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //重點(diǎn)ObservableObserveOn
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

創(chuàng)建新的Observable (ObservableObserveOn) 和 Observer(ObserveOnObserver)

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        //保存上游Observable 
        super(source);
        //保存調(diào)度線程
        this.scheduler = scheduler;
        //
        this.delayError = delayError;
        //緩沖區(qū)
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //TrampolineScheduler 官方文檔的解釋  * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 創(chuàng)建一個(gè)worker辩涝,線程的調(diào)度就是在這里
            Scheduler.Worker w = scheduler.createWorker();
            // 因?yàn)閛bserveOn 是控制處理數(shù)據(jù)的線程贸伐,所以在訂閱的時(shí)候,不去切換線程
            // 把線程worker怔揩,緩沖區(qū)捉邢,下游observer  都都封裝進(jìn)ObserveOnObserver 脯丝,等到onSubscribe,onNext 調(diào)用到來(lái)是歌逢,在去切換線程去處理
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
     ...   省略 ObserveOnObserver 代碼 ...
}

下面來(lái)看一下 ObserveOnObserver

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }


        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                // 上游的Observer 被封裝成了QueueDisposable,那么使用這里面的sourceMode 翘狱,queue  等信息
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }
                //大多數(shù)情況會(huì)到這里創(chuàng)建一個(gè)隊(duì)列秘案,使用指定的緩沖區(qū)
                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            //執(zhí)行過(guò)error / complete 會(huì)是true
            if (done) {
                return;
            }
            //如果不是異步的,就把數(shù)據(jù)放到隊(duì)列中
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //開(kāi)始執(zhí)行
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!disposed) {
                // 結(jié)束運(yùn)行
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void schedule() {
            //該類繼承了潦匈,AtomicInteger阱高,多線程下保證worker值開(kāi)啟一個(gè)線程
            //有可能調(diào)用onNext 是在不同的線程
            if (getAndIncrement() == 0) {
                // this 是一個(gè)Runable 接口,在新線程中執(zhí)行
                worker.schedule(this);
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                //判斷是否需要結(jié)束
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                       //隊(duì)列中取出一個(gè)數(shù)據(jù)
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    //判斷是否需要結(jié)束
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //隊(duì)列是否為空
                    if (empty) {
                        break;
                    }
                    //發(fā)送數(shù)據(jù)給下游
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {
            int missed = 1;

            for (;;) {
                if (disposed) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if (!delayError && d && ex != null) {
                    disposed = true;
                    downstream.onError(error);
                    worker.dispose();
                    return;
                }

                downstream.onNext(null);

                if (d) {
                    disposed = true;
                    ex = error;
                    if (ex != null) {
                        downstream.onError(ex);
                    } else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        //上面work.schedule(this) 后茬缩,會(huì)執(zhí)行這里
        @Override
        public void run() {
            if (outputFused) {
                //這里是有關(guān)背壓的赤惊,暫時(shí)先不講解
                drainFused();
            } else {
                drainNormal();
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            // 是否已經(jīng)完成
            if (d) {
                Throwable e = error;
                //是否分發(fā)Error
                if (delayError) {
                   //隊(duì)列是否為空
                    if (empty) {
                        disposed = true;
                        //是否有錯(cuò)誤信息
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

        @Override
        public int requestFusion(int mode) {
           // 這部分也與背壓有關(guān),暫時(shí)不展開(kāi)
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() throws Exception {
            return queue.poll();
        }

        @Override
        public void clear() {
            queue.clear();
        }

        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }
    }
}

總結(jié):

到這里應(yīng)該對(duì)observeOn 也一定的了解凰锡,多個(gè)observeOn 嵌套的問(wèn)題未舟,一句話,只對(duì)下游的Observer有效掂为。再結(jié)合上圖的流程裕膀,subscribeOn 和 observeOn 胡亂嵌套,都能理清楚了勇哗。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末昼扛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子欲诺,更是在濱河造成了極大的恐慌抄谐,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扰法,死亡現(xiàn)場(chǎng)離奇詭異蛹含,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)塞颁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)挣惰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人殴边,你說(shuō)我怎么就攤上這事憎茂。” “怎么了锤岸?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵竖幔,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我是偷,道長(zhǎng)拳氢,這世上最難降的妖魔是什么募逞? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮馋评,結(jié)果婚禮上放接,老公的妹妹穿的比我還像新娘。我一直安慰自己留特,他們只是感情好纠脾,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著蜕青,像睡著了一般苟蹈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上右核,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天慧脱,我揣著相機(jī)與錄音,去河邊找鬼贺喝。 笑死菱鸥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的躏鱼。 我是一名探鬼主播采缚,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼挠他!你這毒婦竟也來(lái)了扳抽?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤殖侵,失蹤者是張志新(化名)和其女友劉穎贸呢,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拢军,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡楞陷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了茉唉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片固蛾。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖度陆,靈堂內(nèi)的尸體忽然破棺而出艾凯,到底是詐尸還是另有隱情,我是刑警寧澤懂傀,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布趾诗,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏恃泪。R本人自食惡果不足惜郑兴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贝乎。 院中可真熱鬧情连,春花似錦、人聲如沸览效。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)朽肥。三九已至禁筏,卻和暖如春持钉,著一層夾襖步出監(jiān)牢的瞬間衡招,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工每强, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留始腾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓空执,卻偏偏與公主長(zhǎng)得像浪箭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子辨绊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345