RxJava源碼分析之線程調度(二)

在上一篇文章當中我們把RxJava的上游線程切換的源碼都大致梳理了一遍惜辑,如果還沒有看的請猛戳這里,但是光有上游的線程切換是不足以讓我們完成在實際項目中的應用的譬正,絕大多數(shù)時候我們都需要在下游進行線程的切換來處理上游在其他線程中得到的結果宫补。所以現(xiàn)在我們就來分析一下RxJava源碼中是如何實現(xiàn)對下游線程的切換控制管理的檬姥。

這里我們一切換到Android主線程為例:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

現(xiàn)在就從observeOn(AndroidSchedulers.mainThread())入手,探探究竟。
首先我們來看一下RxJava是如何得到一個Android主線程的Scheduler的即HandlerScheduler粉怕。我們點進源碼看一下:

/** Android-specific Schedulers. */
public final class AndroidSchedulers {

    private static final class MainHolder {
        //創(chuàng)建一個Handle拿到主線程的Looper 創(chuàng)建默認的 HandlerScheduler
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    //該Callable默認返回的就是上面的HandleScheduler
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        //這里就是入口 可以看到其實該方法是直接獲取到了一個靜態(tài)的Scheduler常量健民。
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

好了現(xiàn)在Scheduler有了,我們繼續(xù)分析observeOn方法贫贝。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看到了吧秉犹,RxJava所有的代碼基本都是一致的,橋接模式稚晚,這里看到是創(chuàng)建了一個ObservableObserveOn對象崇堵,當然第二個參數(shù)默認是false,表明了如果執(zhí)行了onError() 將會重新發(fā)送一遍上游的事件序列客燕,第三個參數(shù)是緩存的大小默認是128鸳劳。我們點進ObservableObserveOn的構造方法看看里面都做了什么,很關鍵也搓。

//可以看到套路基本都是一樣的棍辕, ObservableObserveOn<T> 同樣是繼承于AbstractObservableWithUpstream<T, T> ,用來保存上游的原事件流。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
//訂閱的真正發(fā)生之處
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {//肯定不是這個Scheduler啊还绘,我們這里是HandleScheduler
            source.subscribe(observer);
        } else {
            //創(chuàng)建HandlerScheduler的Worker,HandlerWorker.
            Scheduler.Worker w = scheduler.createWorker();
            //上游事件和下游事件產生訂閱,這里又是一個包裝類ObserveOnObserver包裝了下游真正的Observer栖袋。
           //我們到ObserverOnObserver里面去看看拍顷,其是一個靜態(tài)內部類
          //這里是把worker,delayError塘幅,bufferSizew也都傳了進去
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
  //實現(xiàn)了Runnable接口姥卢,繼承于BasicIntQueueDisposable
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
              //事件的緩存隊列 確定了緩存隊列的大小
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
              //執(zhí)行真正的onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
         //開始調度    
        schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
             //開始調度 
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;//已經(jīng)完成
            //開始調度  
            schedule();
        }
      //取消訂閱
        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
      //判斷是否被取消訂閱
        @Override
        public boolean isDisposed() {
            return cancelled;
        }
      //執(zhí)行調度的方法 
        void schedule() {
            if (getAndIncrement() == 0) {
              //傳入當前ObserveOnObserver對象漓帚,其實現(xiàn)了Runnable接口
                worker.schedule(this);
            }
        }
      
        void drainNormal() {
            int missed = 1;
            //緩存數(shù)據(jù)的隊列
            final SimpleQueue<T> q = queue;
            //實際下游的Observer
            final Observer<? super T> a = actual;

            for (;;) {
                //檢測事件是否被終止,如果終止了直接跳出循環(huán)
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    //標記事件是否完成
                    boolean d = done;
                    T v;

                    try {
                        //拿到隊列里的第一個事件
                        v = q.poll();
                    } catch (Throwable ex) {
                        //發(fā)生異常了 做一系列的后續(xù)動作
                        //取消訂閱,隊列的制空亮蒋,發(fā)送異常事件,取消線程調度懒震,最后跳出循環(huán)
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    //判斷事件是否為空
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //為空直接進入下一輪循環(huán)
                    //因為上游的事件處理也是需要時間的鳄厌,上游的執(zhí)行有可能是非常大量的數(shù)據(jù)所以可能會出現(xiàn)緩存隊列里面暫時沒有事件,所以這里需要一直進行循環(huán)去等待新的事件產生
                    if (empty) {
                        break;
                    }
                    //發(fā)送事件
                    a.onNext(v);
                }
              //下面這段代碼我也不是很確定他的意思拼缝,這里我說一下我自己的理解不知道正不正確:
              //因為ObserveOnObserver是繼承于BasicIntQueueDisposable 娱局,而BasicIntQueueDisposable 又繼承了AtomicInteger,一個原子操作類
            //用一個Integer整數(shù)來控制當前ObserveOnObserver對象的并發(fā)操作
            //如果當前ObserveOnObserver對象沒有被其他線程獨占咧七,那么該對象就自己持有的話(代表已經(jīng)執(zhí)行完了當前的事件)衰齐,就可以執(zhí)行addAndGet(int i)方法了。
            //執(zhí)行完改方法對自己的負數(shù)相加那么最終得出的是0继阻,為0的話就可以開始下一個循環(huán)了耻涛,那么以后的每一個循環(huán)missed的值都為0都可以直接break废酷!
            //最終要的是addAndGet()是一個阻塞式的方法,如果不成功的話抹缕,它會重新執(zhí)行一遍
          //所以我分析得出這里其實是一個控制標記位“好了澈蟆!現(xiàn)在輪到你了,開始吧”當?shù)谝淮文玫綑嘞藓缶涂梢砸恢眻?zhí)行下去了歉嗓。
        
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {
   ...........
        }
        //具體的run方法內部
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                //去處理隊列里面緩存的數(shù)據(jù)
                drainNormal();
            }
        }
        //檢查是否終止  代碼都很簡單 我就不做注釋了
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }
}

同樣是裝飾模式丰介,關鍵就是每當執(zhí)行onNext(),onError(),onCompleted()方法的時候,都會開啟線程的調度鉴分,上游的每一次事件哮幢,都會在指定線程中處理,這就是核心志珍。然后就執(zhí)行了具體的Worker實現(xiàn)類里面的schedule方法橙垢,我們一起看一下。

//HandlerWorker里面的schedule方法伦糯,其第二個參數(shù)為0L柜某,第三個參數(shù)為TimeUnit.NANOSECONDS。
 @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                //判斷是否取消訂閱了
                return Disposables.disposed();
            }
            //滿篇飛的Hook函數(shù) +_+
            run = RxJavaPlugins.onSchedule(run);
            //封裝當前持有主線程Looper的handler和ObserveOnObserver對象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //創(chuàng)建Message
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            //給主線程發(fā)送消息
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            //判斷是否取消訂閱了
            if (disposed) {
            //如果取消訂閱了 就remove掉消息處理的回調接口
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

當然了最后主線程的執(zhí)行的程序是ScheduledRunnable里面的run()方法敛纲,代碼如下:

 @Override
        public void run() {
            try {
              //ObserveOnObserver對象的run方法
                delegate.run();
            } catch (Throwable t) {
                //捕獲異常了進行一系列處理
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

這樣RxJava就實現(xiàn)了把上游發(fā)送的每一個事件都巧妙地轉換到了指定線程中處理喂击,此處是Android主線程。
可以看到如果你在下游多次調用observeon()的話線程是會一直切換的淤翔,這也是網(wǎng)上一直說的結論翰绊。每一次切換線程,都會把對應的Observer對象的各項處理方法的處理執(zhí)行在制定線程當中旁壮。
大概瀏覽完源碼你會發(fā)現(xiàn)监嗜,RxJava的設計者真的是把面向對象的思想用到了極致,抽象接口與實體抡谐,設計模式地巧用都無處不在裁奇,感嘆自己要學的真的還有太多,如果讓我來寫不知道還要多少年才能寫出如此牛B的代碼麦撵。
這也算是我第一次寫源碼分析的文章刽肠,還有很多地方有待提高,最開始聽說別人源碼分析很重要免胃,不光要會用那些優(yōu)秀的Library更要理解其中的精髓五垮,與是我傻乎乎地悶著腦袋去看,結果真的看不懂杜秸,后來看了一本書叫做《Android源碼設計模式》才恍然大悟放仗,設計模式地巧用在各大優(yōu)秀的開源Library中無處不在,只有真正地理解了設計模式撬碟,精通架構诞挨,才能寫出如此優(yōu)秀的代碼莉撇。最后再安利一本書《設計模式之禪》這本書很有意思,作者語言幽默風趣惶傻,像看連環(huán)畫一樣很有意思棍郎。
哈哈 廢話說了一大堆了,如果上面我的分析有誤的話银室,歡迎指正批評涂佃,有什么不懂得地方也可以一起探討。

最后

沒有最后了蜈敢,大家再見~~~

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末辜荠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子抓狭,更是在濱河造成了極大的恐慌伯病,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件否过,死亡現(xiàn)場離奇詭異午笛,居然都是意外死亡,警方通過查閱死者的電腦和手機苗桂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進店門药磺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人煤伟,你說我怎么就攤上這事与涡。” “怎么了持偏?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵,是天一觀的道長氨肌。 經(jīng)常有香客問我鸿秆,道長,這世上最難降的妖魔是什么怎囚? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任卿叽,我火速辦了婚禮,結果婚禮上恳守,老公的妹妹穿的比我還像新娘考婴。我一直安慰自己,他們只是感情好催烘,可當我...
    茶點故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布沥阱。 她就那樣靜靜地躺著,像睡著了一般伊群。 火紅的嫁衣襯著肌膚如雪考杉。 梳的紋絲不亂的頭發(fā)上策精,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天,我揣著相機與錄音崇棠,去河邊找鬼咽袜。 笑死,一個胖子當著我的面吹牛枕稀,可吹牛的內容都是我干的询刹。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼萎坷,長吁一口氣:“原來是場噩夢啊……” “哼凹联!你這毒婦竟也來了?” 一聲冷哼從身側響起食铐,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤匕垫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后虐呻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體象泵,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年斟叼,在試婚紗的時候發(fā)現(xiàn)自己被綠了偶惠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡朗涩,死狀恐怖忽孽,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情谢床,我是刑警寧澤兄一,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站识腿,受9級特大地震影響出革,放射性物質發(fā)生泄漏。R本人自食惡果不足惜渡讼,卻給世界環(huán)境...
    茶點故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一骂束、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧成箫,春花似錦展箱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春账胧,著一層夾襖步出監(jiān)牢的瞬間竞慢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工治泥, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留筹煮,地道東北人。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓居夹,卻偏偏與公主長得像败潦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子准脂,可洞房花燭夜當晚...
    茶點故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內容