RxJava源碼分析之線程調(diào)度(一)

RxJava強大的地方之一是他的鏈?zhǔn)秸{(diào)用樱报,輕松地在線程之間進(jìn)行切換骂际。這幾天也大概分析了一下RxJava的線程切換的主流程于是打算寫一篇文章及記錄一下葫慎。

我們使用RxJava進(jìn)行線程切換的場景很多時候都是在進(jìn)行網(wǎng)絡(luò)請求的時候进倍,在IO線程進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)的請求處理福侈,最后在Android的主線程進(jìn)行請求數(shù)據(jù)的結(jié)果處理审磁。

.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

當(dāng)然因為這段代碼的使用場景太多我們還可以利用ObservableTransformer操作符對其進(jìn)行簡化

   public static <T>ObservableTransformer<T,T> io_main()
    {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

這樣我們在使用的時候就是這樣的:

.compose(RxTransformUtil.<Object>io_main())

是不是感覺方便了一丟丟

好了扯遠(yuǎn)了谈飒,現(xiàn)在來分析一下RxJava是如何做到線程的輕松調(diào)度的。
首先有幾個概念是非常重要的:
Scheduler官方的解釋是這樣的

A Scheduler is an object that specifies an API for scheduling units of work with or without delays or periodically. 

初步看來Scheduler就是一個任務(wù)調(diào)度器相當(dāng)于就是一個調(diào)度中心的指揮者态蒂。當(dāng)然它是一個抽象類就肯定了Scheduler有很多具體的實現(xiàn)類杭措,例如IO線程的具體調(diào)度器就是IoScheduler。就像調(diào)度中心指揮者有客運中心的指揮者钾恢,有機場中心的指揮者一樣分別有不同的實現(xiàn)類手素。
當(dāng)然現(xiàn)在只有指揮者是肯定不行的鸳址,光頭司令怎么得行?這個時候關(guān)鍵的Worker類出現(xiàn)了泉懦,Worker官方的解釋是這樣的

Sequential Scheduler for executing actions on a single thread or event loop.
Disposing the Scheduler.Worker cancels all outstanding work and allows resource cleanup.

可以看到Worker就是線程任務(wù)的具體執(zhí)行者了稿黍。和Scheduler一樣Worker同樣也是抽象類,在不同的Scheduler具體實現(xiàn)類里面Worker也有自己的具體實現(xiàn)類崩哩,例如在IoScheduler類里面巡球,Worker的具體實現(xiàn)類就是EventLoopWorker,它負(fù)責(zé)管理IO線程的具體操作邓嘹,接下來我們就找到切入點看一看RxJava源碼里面都做了什么酣栈。

這里我們就以最典型的IO線程和主線程之間的切換為例來分析,線程切換的代碼就是上面的代碼。
Scheduler是以工廠方法對外提供它具體的實現(xiàn)類的汹押。Schedulers.io()可以提供一個IoScheduler的對象矿筝。你可以往里面看最后源碼是如何進(jìn)行IoScheduler的創(chuàng)建的

//創(chuàng)建IoScheduler
static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
//接著就行了IoScheduler的一系列初始化,CachedWorkerPool地初始化 棚贾,并由RxThreadFactory進(jìn)行線程地創(chuàng)建跋涣,線程優(yōu)先級別設(shè)置,是否是守護(hù)進(jìn)程等等

現(xiàn)在IoScheduler有了鸟悴,我們就看subscribe里面到底做了什么

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

Hook我們不用管陈辱,可以看到是把當(dāng)前ObservableCreater對象和IoScheduler一起傳進(jìn)了ObservableSubscribeoOn的構(gòu)造函數(shù)里面。進(jìn)入到ObservableSubscribeOn里面看看细诸。

//AbstractObservableWithUpstream只是用來保存上游的源事件流的沛贪,就是保存剛剛傳入進(jìn)來的ObservableCreater
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
//裝飾模式 把下游的Observer裝飾成SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);     //執(zhí)行下游Observer的onSubscribe(Disposable disposabel)方法,當(dāng)前線程是上游的執(zhí)行線程
        s.onSubscribe(parent);
//開啟的子線程最終是以帶Disposable的返回值返回的
//在這里是將子線程加入管理震贵,因為這里是并發(fā)操作所以使用了AtomicReference<Object>的院子操作類利赋,是一種效率高于synchronized的樂觀鎖,感興趣的可以自行上網(wǎng)搜索
//我們只用知道這里加入管理了以后方便在以后我們切斷上下游的時候可以將我們的子線程一同dispose().
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
     
    //這中間的代碼和最基本的鏈?zhǔn)秸{(diào)用關(guān)系是一樣的猩系,只不過在onNext媚送、onError、onComplete中實際上還是調(diào)用的下游真正的onNext寇甸、onError塘偎、onComplete

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
        @Override
        public void onComplete() {
            actual.onComplete();
        }
              void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
//這就是實際執(zhí)行的Runnable 會把其傳入IoScheduler中供Worker使用。
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
/*看到了吧拿霉,SubscribeOnObserver 作用其實就是將源事件流發(fā)生的地點和下游的事件流處理的地點訂閱在了子線程中進(jìn)行處理吟秩。
這樣上游發(fā)送事件流的地方就被切換到了子線程中。*/
            source.subscribe(parent);
        }
    }
}

接下來我們仔細(xì)看一下上面代碼的這一段:

 @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
//這里scheduler.schedlerDirect非常的重要绽淘,可以看到RxJava把剛剛包裝好的Runnable對象傳入了方法里
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

我們跟進(jìn)去看一下里面的具體實現(xiàn)

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
//實際上是調(diào)用的下面3個參數(shù)的方法涵防,延遲時間為0
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建具體的Worker類
        final Worker w = createWorker();
//hook函數(shù)我們不用管,只要沒有設(shè)置依舊返回的是傳入的Runnable
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將runnable和worker封裝到DisposeTask中
        DisposeTask task = new DisposeTask(decoratedRun, w);
//執(zhí)行Worker的schedule方法具體的就是EventLoopWorker里面的schedule方法
        w.schedule(task, delay, unit);

        return task;
    }

接下來我們來看一下EventLoopWorker里面的schedule方法是怎么實現(xiàn)的

 @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //判斷是否解除訂閱
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

可以看到這里如果沒有被解除訂閱的話又會執(zhí)行到NewThreadWorker的scheduleActual方法里面沪铭。

@NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //hook函數(shù)我們這里不用管decoratedRun依然是傳進(jìn)來的Runnable對象run
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //ScheduledRunnable是一個即實現(xiàn)了Runnable接口又實現(xiàn)了Callable接口的對象壮池,為了后面能成功加入到線程池當(dāng)中    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        //將sr加入到CompositeDisposable中偏瓤,方便管理
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
     
        Future<?> f;
        try {
            if (delayTime <= 0) {
              //將sr加入到線程池當(dāng)中 并將線程的執(zhí)行結(jié)果返回給 Future<?> f
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);//對運行結(jié)果進(jìn)行處理
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                //在CompositeDisposable中一處剛剛加入的sr
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
  
        return sr;
    }

接下來看一下ScheduledRunnable是如何對返回的結(jié)果進(jìn)行處理的

  public void setFuture(Future<?> f) {
//一個死循環(huán)會一直判斷返回回來的結(jié)果 因為其實原子操作類,樂觀鎖的機制決定了如果不是想要的結(jié)果的話會重新執(zhí)行一次
        for (;;) {
            Object o = get(FUTURE_INDEX);
            if (o == DONE) {
                //完成直接return
                return;
            }
              //如果取消訂閱了則直接取消線程任務(wù)
            if (o == DISPOSED) {
                f.cancel(get(THREAD_INDEX) != Thread.currentThread());
                return;
            }
            //前兩者都不滿足的話 就將future的值存下來
            if (compareAndSet(FUTURE_INDEX, o, f)) {
                return;
            }
        }
    }

到現(xiàn)在為止上游的線程切換大體的流程就分析的差不多了椰憋,我們從源碼中也可以分析出很多網(wǎng)上經(jīng)常說的一些結(jié)論硼补,最經(jīng)典的一條就是上游切換線程只有第一次生效,后面的線程切換都不起作用了熏矿,其實分析這點最重要的就是理解 ObservableSubscribeOn類里面下面的這段代碼了

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

再結(jié)合RxJava的鏈?zhǔn)讲僮饕押В幚頂?shù)據(jù)的時候是自下而上,而發(fā)射數(shù)據(jù)的時候是自上而下(這句話網(wǎng)上說的太多了票编,我最開始也是不理解褪储,只有自己真正看過源碼分析了,自己Debug一邊才能真正地理解)慧域。
好了先寫到這里了鲤竹,剩下的內(nèi)容我會放到另外一篇博客里面,感覺文章太長不利于閱讀昔榴。

這篇文章也是我第一次試著去分析源碼最后寫出的辛藻,很多都是我自己的理解,所以肯定有不妥當(dāng)或者錯誤的地方希望大家看到了以后能給我指出來互订,我一定改正吱肌!

最后

沒有最后了 大家再見~~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市仰禽,隨后出現(xiàn)的幾起案子氮墨,更是在濱河造成了極大的恐慌,老刑警劉巖吐葵,帶你破解...
    沈念sama閱讀 222,946評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件规揪,死亡現(xiàn)場離奇詭異,居然都是意外死亡温峭,警方通過查閱死者的電腦和手機猛铅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,336評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來凤藏,“玉大人奸忽,你說我怎么就攤上這事∏灞浚” “怎么了月杉?”我有些...
    開封第一講書人閱讀 169,716評論 0 364
  • 文/不壞的土叔 我叫張陵刃跛,是天一觀的道長抠艾。 經(jīng)常有香客問我,道長桨昙,這世上最難降的妖魔是什么检号? 我笑而不...
    開封第一講書人閱讀 60,222評論 1 300
  • 正文 為了忘掉前任腌歉,我火速辦了婚禮,結(jié)果婚禮上齐苛,老公的妹妹穿的比我還像新娘翘盖。我一直安慰自己,他們只是感情好凹蜂,可當(dāng)我...
    茶點故事閱讀 69,223評論 6 398
  • 文/花漫 我一把揭開白布馍驯。 她就那樣靜靜地躺著,像睡著了一般玛痊。 火紅的嫁衣襯著肌膚如雪汰瘫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,807評論 1 314
  • 那天擂煞,我揣著相機與錄音混弥,去河邊找鬼。 笑死对省,一個胖子當(dāng)著我的面吹牛蝗拿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蒿涎,決...
    沈念sama閱讀 41,235評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼哀托,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了劳秋?” 一聲冷哼從身側(cè)響起萤捆,我...
    開封第一講書人閱讀 40,189評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎俗批,沒想到半個月后俗或,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,712評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡岁忘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,775評論 3 343
  • 正文 我和宋清朗相戀三年辛慰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片干像。...
    茶點故事閱讀 40,926評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡帅腌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出麻汰,到底是詐尸還是另有隱情速客,我是刑警寧澤,帶...
    沈念sama閱讀 36,580評論 5 351
  • 正文 年R本政府宣布五鲫,位于F島的核電站溺职,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜浪耘,卻給世界環(huán)境...
    茶點故事閱讀 42,259評論 3 336
  • 文/蒙蒙 一乱灵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧七冲,春花似錦痛倚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,750評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至掘鄙,卻和暖如春颠区,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背通铲。 一陣腳步聲響...
    開封第一講書人閱讀 33,867評論 1 274
  • 我被黑心中介騙來泰國打工毕莱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人颅夺。 一個月前我還...
    沈念sama閱讀 49,368評論 3 379
  • 正文 我出身青樓朋截,卻偏偏與公主長得像,于是被迫代替她去往敵國和親吧黄。 傳聞我的和親對象是個殘疾皇子部服,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,930評論 2 361

推薦閱讀更多精彩內(nèi)容