RxJava2 源碼解析(二)

轉(zhuǎn)載請(qǐng)標(biāo)明出處:
http://www.reibang.com/p/6ef45f8ee79d
本文出自:【張旭童的簡(jiǎn)書(shū)】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)

概述

承接上一篇RxJava2 源碼解析(一)所袁,
本系列我們的目的:

  1. 知道源頭(Observable)是如何將數(shù)據(jù)發(fā)送出去的话浇。
  2. 知道終點(diǎn)(Observer)是如何接收到數(shù)據(jù)的。
  3. 何時(shí)將源頭和終點(diǎn)關(guān)聯(lián)起來(lái)的
  4. 知道線程調(diào)度是怎么實(shí)現(xiàn)的
  5. 知道操作符是怎么實(shí)現(xiàn)的

本篇計(jì)劃講解一下4,5.

RxJava最強(qiáng)大的莫過(guò)于它的線程調(diào)度花式操作符丁稀。

map操作符

map是一個(gè)高頻的操作符达址,我們首先拿他開(kāi)刀铃彰。
例子如下趴俘,源頭Observable發(fā)送的是String類(lèi)型的數(shù)字蜗巧,利用map轉(zhuǎn)換成int型掌眠,最終在終點(diǎn)Observer接受到的也是int類(lèi)型數(shù)據(jù)。:

        final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete()
            }
        });
                Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.parseInt(s);
                    }
                });
                map.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext() called with: value = [" + value + "]");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError() called with: e = [" + e + "]");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete() called");
                    }
                });

我們看一下map函數(shù)的源碼:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判空略過(guò)
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //RxJavaPlugins.onAssembly()是hook 上文提到過(guò)
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

RxJavaPlugins.onAssembly()是hook 上文提到過(guò),所以我們只要看ObservableMap幕屹,它就是返回到我們手里的Observable:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //將function變換函數(shù)類(lèi)保存起來(lái)
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //super()將上游的Observable保存起來(lái) 蓝丙,用于subscribeActual()中用级遭。
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

它繼承自AbstractObservableWithUpstream,該類(lèi)繼承自Observable,很簡(jiǎn)單渺尘,就是將上游的ObservableSource保存起來(lái)挫鸽,做一次wrapper,所以它也算是裝飾者模式的提現(xiàn)鸥跟,如下:

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    //將上游的`ObservableSource`保存起來(lái)
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

關(guān)于ObservableSource丢郊,代表了一個(gè)標(biāo)準(zhǔn)的無(wú)背壓的 源數(shù)據(jù)接口,可以被Observer消費(fèi)(訂閱)医咨,如下:

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

所有的Observable都已經(jīng)實(shí)現(xiàn)了它,所以我們可以認(rèn)為ObservableObservableSource在本文中是相等的

public abstract class Observable<T> implements ObservableSource<T> {

所以我們得到的ObservableMap對(duì)象也很簡(jiǎn)單蚂夕,就是將上游的Observable和變換函數(shù)類(lèi)Function保存起來(lái)
Function的定義超級(jí)簡(jiǎn)單腋逆,就是一個(gè)接口婿牍,給我一個(gè)T,還你一個(gè)R.

public interface Function<T, R> {
    R apply(T t) throws Exception;
}

本例寫(xiě)的是將String->int.

重頭戲惩歉,subscribeActual()是訂閱真正發(fā)生的地方等脂,ObservableMap如下編寫(xiě),就一句話撑蚌,用MapObserver訂閱上游Observable上遥。

    @Override
    public void subscribeActual(Observer<? super U> t) {
    //用MapObserver訂閱上游Observable。
        source.subscribe(new MapObserver<T, U>(t, function));
    }

MapObserver也是裝飾者模式争涌,對(duì)終點(diǎn)(下游)Observer修飾粉楚。

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //super()將actual保存起來(lái)
            super(actual);
            //保存Function變量
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            //done在onError 和 onComplete以后才會(huì)是true,默認(rèn)這里是false亮垫,所以跳過(guò)
            if (done) {
                return;
            }
            //默認(rèn)sourceMode是0模软,所以跳過(guò)
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            //下游Observer接受的值
            U v;
            //這一步執(zhí)行變換,將上游傳過(guò)來(lái)的T,利用Function轉(zhuǎn)換成下游需要的U饮潦。
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //變換后傳遞給下游Observer
            actual.onNext(v);
        }

到此我們梳理一下流程:
訂閱的過(guò)程燃异,是從下游到上游依次訂閱的。

  1. 終點(diǎn) Observer 訂閱了 map 返回的ObservableMap继蜡。
  2. 然后mapObservable(ObservableMap)在被訂閱時(shí)回俐,會(huì)訂閱其內(nèi)部保存上游Observable,用于訂閱上游的Observer是一個(gè)裝飾者(MapObserver)稀并,內(nèi)部保存了下游(本例是終點(diǎn))Observer仅颇,以便上游發(fā)送數(shù)據(jù)過(guò)來(lái)時(shí),能傳遞給下游碘举。
  3. 以此類(lèi)推忘瓦,直到源頭Observable被訂閱,根據(jù)上節(jié)課內(nèi)容殴俱,它開(kāi)始向Observer發(fā)送數(shù)據(jù)政冻。

數(shù)據(jù)傳遞的過(guò)程枚抵,當(dāng)然是從上游push到下游的线欲,

  1. 源頭Observable傳遞數(shù)據(jù)給下游Observer(本例就是MapObserver
  2. 然后MapObserver接收到數(shù)據(jù)明场,對(duì)其變換操作后(實(shí)際的function在這一步執(zhí)行),再調(diào)用內(nèi)部保存的下游ObserveronNext()發(fā)送數(shù)據(jù)給下游
  3. 以此類(lèi)推李丰,直到終點(diǎn)Observer苦锨。

線程調(diào)度subscribeOn

簡(jiǎn)化問(wèn)題,代碼如下:

                Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
                        e.onNext("1");
                        e.onComplete();
                    }
                    //只是在Observable和Observer之間增加了一句線程調(diào)度代碼
                }).subscribeOn(Schedulers.io())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                            }
                            @Override
                            public void onNext(String value) {
                                Log.d(TAG, "onNext() called with: value = [" + value + "]");
                            }
                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "onError() called with: e = [" + e + "]");
                            }
                            @Override
                            public void onComplete() {
                                Log.d(TAG, "onComplete() called");
                            }
                        });

只是在ObservableObserver之間增加了一句線程調(diào)度代碼:.subscribeOn(Schedulers.io()).
查看subscribeOn()源碼:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
    //判空略過(guò)
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //拋開(kāi)Hook趴泌,重點(diǎn)還是ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

等等舟舒,怎么有種似曾相識(shí)的感覺(jué),大家可以把文章向上翻嗜憔,看看map()的源碼秃励。
subscribeOn()的套路如出一轍,那么我們根據(jù)上面的結(jié)論吉捶,
先猜測(cè)ObservableSubscribeOn類(lèi)也是一個(gè)包裝類(lèi)(裝飾者)夺鲜,點(diǎn)進(jìn)去查看:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    //保存線程調(diào)度器
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //map的源碼中我們分析過(guò),super()只是簡(jiǎn)單的保存ObservableSource
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //1  創(chuàng)建一個(gè)包裝Observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //2  手動(dòng)調(diào)用 下游(終點(diǎn))Observer.onSubscribe()方法,所以onSubscribe()方法執(zhí)行在 訂閱處所在的線程
        s.onSubscribe(parent);
        //3 setDisposable()是為了將子線程的操作加入Disposable管理中
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
            //4 此時(shí)已經(jīng)運(yùn)行在相應(yīng)的Scheduler 的線程中
                source.subscribe(parent);
            }
        }));
    }

和map套路大體一致呐舔,ObservableSubscribeOn自身同樣是個(gè)包裝類(lèi)币励,同樣繼承AbstractObservableWithUpstream
創(chuàng)建了一個(gè)SubscribeOnObserver類(lèi)珊拼,該類(lèi)按照套路食呻,應(yīng)該也是實(shí)現(xiàn)了ObserverDisposable接口的包裝類(lèi)澎现,讓我們看一下:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        //真正的下游(終點(diǎn))觀察者
        final Observer<? super T> actual;
        //用于保存上游的Disposable仅胞,以便在自身dispose時(shí),連同上游一起dispose
        final AtomicReference<Disposable> s;
        
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            //onSubscribe()方法由上游調(diào)用剑辫,傳入Disposable饼问。在本類(lèi)中賦值給this.s,加入管理揭斧。
            DisposableHelper.setOnce(this.s, s);
        }
        
        //直接調(diào)用下游觀察者的對(duì)應(yīng)方法
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
        @Override
        public void onComplete() {
            actual.onComplete();
        }

        //取消訂閱時(shí)莱革,連同上游Disposable一起取消
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        //這個(gè)方法在subscribeActual()中被手動(dòng)調(diào)用,為了將Schedulers返回的Worker加入管理
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

這兩個(gè)類(lèi)根據(jù)上一節(jié)的鋪墊加上注釋?zhuān)渌己美斫舛锟晕⒉缓美斫獾膽?yīng)該是下面兩句代碼:

        //ObservableSubscribeOn類(lèi)
        //3 setDisposable()是為了將子線程的操作加入Disposable管理中
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
            //4 此時(shí)已經(jīng)運(yùn)行在相應(yīng)的Scheduler 的線程中
                source.subscribe(parent);
            }
        }));

        //SubscribeOnObserver類(lèi)
        //這個(gè)方法在subscribeActual()中被手動(dòng)調(diào)用盅视,為了將Schedulers返回的Worker加入管理
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

其中scheduler.scheduleDirect(new Runnable()..)方法源碼如下:

    /**
     * Schedules the given task on this scheduler non-delayed execution.
     * .....
     */
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

從注釋和方法名我們可以看出,這個(gè)傳入的Runnable會(huì)立刻執(zhí)行旦万。
再繼續(xù)往里面看:

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        //class Worker implements Disposable 闹击,Worker本身是實(shí)現(xiàn)了Disposable  
        final Worker w = createWorker();
        //hook略過(guò)
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //開(kāi)始在Worker的線程執(zhí)行任務(wù),
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                //調(diào)用的是 run()不是 start()方法執(zhí)行的線程的方法成艘。
                    decoratedRun.run();
                } finally {
                //執(zhí)行完畢會(huì) dispose()
                    w.dispose();
                }
            }
        }, delay, unit);
        //返回Worker對(duì)象
        return w;
    }

createWorker()是一個(gè)抽象方法赏半,由具體的Scheduler類(lèi)實(shí)現(xiàn)贺归,例如IoScheduler對(duì)應(yīng)的Schedulers.io().

    public abstract Worker createWorker();

初看源碼,為了了解大致流程断箫,不宜過(guò)入深入拂酣,先點(diǎn)到為止。
OK仲义,現(xiàn)在我們總結(jié)一下scheduler.scheduleDirect(new Runnable()..)的重點(diǎn):

  1. 傳入的Runnable立刻執(zhí)行的婶熬。
  2. 返回的Worker對(duì)象就是一個(gè)Disposable對(duì)象
  3. Runnable執(zhí)行時(shí)埃撵,是直接手動(dòng)調(diào)用的 run()赵颅,而不是 start()方法.
  4. 上一點(diǎn)應(yīng)該是為了,能控制在run()結(jié)束后(包括異常終止)暂刘,都會(huì)自動(dòng)執(zhí)行Worker.dispose().

而返回的Worker對(duì)象也會(huì)被parent.setDisposable(...)加入管理中饺谬,以便在手動(dòng)dispose()時(shí)能取消線程里的工作。

我們總結(jié)一下subscribeOn(Schedulers.xxx())過(guò)程

  1. 返回一個(gè)ObservableSubscribeOn包裝類(lèi)對(duì)象
  2. 上一步返回的對(duì)象被訂閱時(shí)谣拣,回調(diào)該類(lèi)中的subscribeActual()方法募寨,在其中會(huì)立刻將線程切換到對(duì)應(yīng)的Schedulers.xxx()線程。
  3. 在切換后的線程中芝发,執(zhí)行source.subscribe(parent);绪商,對(duì)上游(終點(diǎn))Observable訂閱
  4. 上游(終點(diǎn))Observable開(kāi)始發(fā)送數(shù)據(jù),根據(jù)RxJava2 源碼解析(一)辅鲸,上游發(fā)送數(shù)據(jù)僅僅是調(diào)用下游觀察者對(duì)應(yīng)的onXXX()方法而已格郁,所以此時(shí)操作是在切換后的線程中進(jìn)行

一點(diǎn)擴(kuò)展独悴,
大家可能看過(guò)一個(gè)結(jié)論:
subscribeOn(Schedulers.xxx())切換線程N(yùn)次例书,總是以第一次為準(zhǔn),或者說(shuō)離源Observable最近的那次為準(zhǔn)刻炒,并且對(duì)其上面的代碼生效(這一點(diǎn)對(duì)比的ObserveOn())决采。

為什么?

  • 因?yàn)楦鶕?jù)RxJava2 源碼解析(一)中提到坟奥,訂閱流程從下游往上游傳遞
  • subscribeActual()里開(kāi)啟了Scheduler的工作树瞭,source.subscribe(parent);,從這一句開(kāi)始切換了線程,所以在這之上的代碼都是在切換后的線程里的了爱谁。
  • 但如果連續(xù)切換晒喷,最上面的切換最晚執(zhí)行,此時(shí)線程變成了最上面的subscribeOn(xxxx)指定的線程,
  • 數(shù)據(jù)push時(shí)访敌,是從上游到下游的凉敲,所以會(huì)在離源頭最近的那次subscribeOn(xxxx)的線程里push數(shù)據(jù)(onXXX())給下游。

可寫(xiě)如下代碼驗(yàn)證:

Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
                        e.onNext("1");
                        e.onComplete();
                    }
                }).subscribeOn(Schedulers.io())
                        .map(new Function<String, String>() {
                            @Override
                            public String apply(String s) throws Exception {
                                //依然是io線程
                                Log.d(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread());
                                return s;
                            }
                        })
                        .subscribeOn(Schedulers.computation())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                            }
                            @Override
                            public void onNext(String value) {
                                Log.d(TAG, "onNext() called with: value = [" + value + "]");
                            }
                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "onError() called with: e = [" + e + "]");
                            }
                            @Override
                            public void onComplete() {
                                Log.d(TAG, "onComplete() called");
                            }
                        });

線程調(diào)度observeOn

在上一節(jié)的基礎(chǔ)上,增加一個(gè)observeOn(AndroidSchedulers.mainThread()),就完成了觀察者線程的切換爷抓。

                        .subscribeOn(Schedulers.computation())
                        //在上一節(jié)的基礎(chǔ)上势决,增加一個(gè)ObserveOn
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {

繼續(xù)看源碼吧,我已經(jīng)能猜出來(lái)了蓝撇,hook+new XXXObservable();

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ....
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

果然果复,查看ObservableObserveOn,:
高能預(yù)警,這部分的代碼 有些略多唉地,建議讀者打開(kāi)源碼邊看邊讀据悔。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    //本例是 AndroidSchedulers.mainThread()
    final Scheduler scheduler;
    //默認(rèn)false
    final boolean delayError;
    //默認(rèn)128
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // false
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //1 創(chuàng)建出一個(gè) 主線程的Worker
            Scheduler.Worker w = scheduler.createWorker();
            //2 訂閱上游數(shù)據(jù)源传透, 
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

本例中耘沼,就是兩步:

  1. 創(chuàng)建一個(gè)AndroidSchedulers.mainThread()對(duì)應(yīng)的Worker
  2. ObserveOnObserver訂閱上游數(shù)據(jù)源。這樣當(dāng)數(shù)據(jù)從上游push下來(lái)朱盐,會(huì)由ObserveOnObserver對(duì)應(yīng)的onXXX()處理群嗤。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        //下游的觀察者
        final Observer<? super T> actual;
        //對(duì)應(yīng)Scheduler里的Worker
        final Scheduler.Worker worker;
        //上游被觀察者 push 過(guò)來(lái)的數(shù)據(jù)都存在這里
        SimpleQueue<T> queue;
        Disposable s;
        //如果onError了,保存對(duì)應(yīng)的異常
        Throwable error;
        //是否完成
        volatile boolean done;
        //是否取消
        volatile boolean cancelled;
        // 代表同步發(fā)送 異步發(fā)送 
        int sourceMode;
        ....
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //省略大量無(wú)關(guān)代碼
                //創(chuàng)建一個(gè)queue 用于保存上游 onNext() push的數(shù)據(jù)
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //回調(diào)下游觀察者onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            //1 執(zhí)行過(guò)error / complete 會(huì)是true
            if (done) {
                return;
            }
            //2 如果數(shù)據(jù)源類(lèi)型不是異步的兵琳, 默認(rèn)不是
            if (sourceMode != QueueDisposable.ASYNC) {
                //3 將上游push過(guò)來(lái)的數(shù)據(jù) 加入 queue里
                queue.offer(t);
            }
            //4 開(kāi)始進(jìn)入對(duì)應(yīng)Workder線程狂秘,在線程里 將queue里的t 取出 發(fā)送給下游Observer
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            //已經(jīng)done 會(huì) 拋異常 和 上一篇文章里提到的一樣
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            //給error存?zhèn)€值 
            error = t;
            done = true;
            //開(kāi)始調(diào)度
            schedule();
        }

        @Override
        public void onComplete() {
        //已經(jīng)done 會(huì) 返回  不會(huì)crash 和上一篇文章里提到的一樣
            if (done) {
                return;
            }
            done = true;
            //開(kāi)始調(diào)度
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                //該方法需要傳入一個(gè)線程, 注意看本類(lèi)實(shí)現(xiàn)了Runnable的接口躯肌,所以查看對(duì)應(yīng)的run()方法
                worker.schedule(this);
            }
        }
        //從這里開(kāi)始者春,這個(gè)方法已經(jīng)是在Workder對(duì)應(yīng)的線程里執(zhí)行的了
        @Override
        public void run() {
            //默認(rèn)是false
            if (outputFused) {
                drainFused();
            } else {
                //取出queue里的數(shù)據(jù) 發(fā)送
                drainNormal();
            }
        }


        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                // 1 如果已經(jīng) 終止 或者queue空,則跳出函數(shù)清女,
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
               
                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        //2 從queue里取出一個(gè)值
                        v = q.poll();
                    } catch (Throwable ex) {
                        //3 異常處理 并跳出函數(shù)
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    //4 再次檢查 是否 終止  如果滿足條件 跳出函數(shù)
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //5 上游還沒(méi)結(jié)束數(shù)據(jù)發(fā)送钱烟,但是這邊處理的隊(duì)列已經(jīng)是空的,不會(huì)push給下游 Observer
                    if (empty) {
                        //僅僅是結(jié)束這次循環(huán)嫡丙,不發(fā)送這個(gè)數(shù)據(jù)而已拴袭,并不會(huì)跳出函數(shù)
                        break;
                    }
                    //6 發(fā)送給下游了
                    a.onNext(v);
                }
                
                //7 對(duì)不起這里我也不是很明白,大致猜測(cè)是用于 同步原子操作 如有人知道 煩請(qǐng)告知 
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        //檢查 是否 已經(jīng) 結(jié)束(error complete)曙博, 是否沒(méi)數(shù)據(jù)要發(fā)送了(empty 空)拥刻, 
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            //如果已經(jīng)disposed 
            if (cancelled) {
                queue.clear();
                return true;
            }
            // 如果已經(jīng)結(jié)束
            if (d) {
                Throwable e = error;
                //如果是延遲發(fā)送錯(cuò)誤
                if (delayError) {
                    //如果空
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        //停止worker(線程)
                        worker.dispose();
                        return true;
                    }
                } else {
                    //發(fā)送錯(cuò)誤
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    //發(fā)送complete
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }

核心處都加了注釋?zhuān)偨Y(jié)起來(lái)就是,

  1. ObserveOnObserver實(shí)現(xiàn)了ObserverRunnable接口父泳。
  2. onNext()里般哼,先不切換線程,將數(shù)據(jù)加入隊(duì)列queue惠窄。然后開(kāi)始切換線程蒸眠,在另一線程中,queue里取出數(shù)據(jù)睬捶,push給下游Observer
  3. onError() onComplete()除了和RxJava2 源碼解析(一)提到的一樣特性之外黔宛,也是將錯(cuò)誤/完成信息先保存,切換線程后再發(fā)送。
  4. 所以observeOn()影響的是其下游的代碼臀晃,且多次調(diào)用仍然生效觉渴。
  5. 因?yàn)槠淝?strong>換線程代碼是在ObserveronXXX()做的,這是一個(gè)主動(dòng)的push行為(影響下游)徽惋。
  6. 關(guān)于多次調(diào)用生效問(wèn)題案淋。對(duì)比subscribeOn()切換線程是在subscribeActual()里做的,只是主動(dòng)切換了上游的訂閱線程险绘,從而影響其發(fā)射數(shù)據(jù)時(shí)所在的線程踢京。而直到真正發(fā)射數(shù)據(jù)之前,任何改變線程的行為宦棺,都會(huì)生效(影響發(fā)射數(shù)據(jù)的線程)瓣距。所以subscribeOn()只生效一次。observeOn()是一個(gè)主動(dòng)的行為代咸,并且切換線程后會(huì)立刻發(fā)送數(shù)據(jù)蹈丸,所以會(huì)生效多次.

轉(zhuǎn)載請(qǐng)標(biāo)明出處:
http://www.reibang.com/p/6ef45f8ee79d
本文出自:【張旭童的簡(jiǎn)書(shū)】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)

總結(jié)

本文帶大家走讀分析了三個(gè)東西:

map操作符原理:

  • 內(nèi)部對(duì)上游Observable進(jìn)行訂閱
  • 內(nèi)部訂閱者接收到數(shù)據(jù)后,將數(shù)據(jù)轉(zhuǎn)換呐芥,發(fā)送給下游Observer.
  • 操作符返回的Observable其內(nèi)部訂閱者逻杖、是裝飾者模式的體現(xiàn)。
  • 操作符數(shù)據(jù)變換的操作思瘟,也是發(fā)生在訂閱后荸百。

線程調(diào)度subscribeOn()

  • 內(nèi)部先切換線程,在切換后的線程中對(duì)上游Observable進(jìn)行訂閱滨攻,這樣上游發(fā)送數(shù)據(jù)時(shí)就是處于被切換后的線程里了够话。
  • 也因此多次切換線程最后一次切換(離源數(shù)據(jù)最近)的生效铡买。
  • 內(nèi)部訂閱者接收到數(shù)據(jù)后更鲁,直接發(fā)送給下游Observer.
  • 引入內(nèi)部訂閱者是為了控制線程(dispose)
  • 線程切換發(fā)生在Observable中。

線程調(diào)度observeOn():

  • 使用裝飾的Observer對(duì)上游Observable進(jìn)行訂閱
  • ObserveronXXX()方法里奇钞,將待發(fā)送數(shù)據(jù)存入隊(duì)列澡为,同時(shí)請(qǐng)求切換線程處理真正push數(shù)據(jù)給下游。
  • 多次切換線程景埃,都會(huì)對(duì)下游生效媒至。

源碼里那些實(shí)現(xiàn)了Runnable的類(lèi)或者匿名內(nèi)部類(lèi),最終并沒(méi)有像往常那樣被丟給Thread類(lèi)執(zhí)行谷徙。
而是先切換線程拒啰,再直接執(zhí)行Runnablerun()方法。
這也加深了我對(duì)面向?qū)ο笸昊郏瑢?duì)抽象谋旦、Runnable的理解,它就是一個(gè)簡(jiǎn)簡(jiǎn)單單的接口,里面就一個(gè)簡(jiǎn)簡(jiǎn)單單的run()册着,
我認(rèn)為拴孤,之所以有Runnable,只是抽象出 一個(gè)可運(yùn)行的任務(wù)的概念甲捏。
也許這句話很平淡演熟,書(shū)上也會(huì)提到,各位大佬早就知道司顿,但是如今我順著RxJava2的源碼這么走讀了一遍芒粹,確真真切切的感受到了這些設(shè)計(jì)思想的美妙。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末大溜,一起剝皮案震驚了整個(gè)濱河市化漆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,464評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件踩叭,死亡現(xiàn)場(chǎng)離奇詭異耿币,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)棺聊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)伞租,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人限佩,你說(shuō)我怎么就攤上這事葵诈。” “怎么了祟同?”我有些...
    開(kāi)封第一講書(shū)人閱讀 169,078評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵作喘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我晕城,道長(zhǎng)泞坦,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,979評(píng)論 1 299
  • 正文 為了忘掉前任砖顷,我火速辦了婚禮贰锁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘滤蝠。我一直安慰自己豌熄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布物咳。 她就那樣靜靜地躺著锣险,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上芯肤,一...
    開(kāi)封第一講書(shū)人閱讀 52,584評(píng)論 1 312
  • 那天夯接,我揣著相機(jī)與錄音,去河邊找鬼纷妆。 笑死盔几,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的掩幢。 我是一名探鬼主播逊拍,決...
    沈念sama閱讀 41,085評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼际邻!你這毒婦竟也來(lái)了芯丧?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 40,023評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤世曾,失蹤者是張志新(化名)和其女友劉穎缨恒,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體轮听,經(jīng)...
    沈念sama閱讀 46,555評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡骗露,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了血巍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萧锉。...
    茶點(diǎn)故事閱讀 40,769評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖述寡,靈堂內(nèi)的尸體忽然破棺而出柿隙,到底是詐尸還是另有隱情,我是刑警寧澤鲫凶,帶...
    沈念sama閱讀 36,439評(píng)論 5 351
  • 正文 年R本政府宣布禀崖,位于F島的核電站,受9級(jí)特大地震影響螟炫,放射性物質(zhì)發(fā)生泄漏波附。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評(píng)論 3 335
  • 文/蒙蒙 一不恭、第九天 我趴在偏房一處隱蔽的房頂上張望叶雹。 院中可真熱鬧,春花似錦换吧、人聲如沸折晦。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,601評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)满着。三九已至谦炒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間风喇,已是汗流浹背宁改。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,702評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留魂莫,地道東北人还蹲。 一個(gè)月前我還...
    沈念sama閱讀 49,191評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像耙考,于是被迫代替她去往敵國(guó)和親谜喊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容