RxJava2 源碼解析——線程調(diào)度 Scheduler

RxJava源碼解析第二篇库车。
我們知道哥放,在使用RxJava的時(shí)候智嚷,線程的調(diào)度是其內(nèi)部幫我們實(shí)現(xiàn)的,這讓我們可以便捷的實(shí)現(xiàn)函數(shù)式編程糠悯。
本文主要從源碼的角度來分析RxJava的線程調(diào)度機(jī)制
= =最近被項(xiàng)目搞瘋都沒什么時(shí)間寫筆記了帮坚。


引入

我們知道妻往,線程調(diào)度主要通過observeOnsubscribeOn這兩個(gè)方法,以及Schedular來指定使用的線程试和。
還是以上一次的代碼為例:

Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
        @Override
        public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
            e.onNext(login());
        }
    }) //調(diào)用登錄接口
    .map(new Function<LoginApiBean, UserInfoBean>() {
        @Override
        protected UserInfoBean decode(LoginApiBean loginApiBean) {
            //處理登錄結(jié)果讯泣,返回UserInfo
            if (loginApiBean.isSuccess()) {
                return loginApiBean.getUserInfoBean();
            } else {
                throw new RequestFailException("獲取網(wǎng)絡(luò)請求失敗");
            }
        }
    })
    .doOnNext(new Consumer<UserInfoBean>() {    //保存登錄結(jié)果UserInfo
        @Override
        public void accept(@NonNull UserInfoBean bean) throws Exception {
            saveUserInfo(bean);
        }
    })
    .subscribeOn(Schedulers.io())   //調(diào)度線程
    .observeOn(AndroidSchedulers.mainThread())  //調(diào)度線程
    .subscribe(new Consumer<UserInfoBean>() {
        @Override
        public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
            //整個(gè)請求成功,根據(jù)獲取的UserInfo更新對應(yīng)的View
            showSuccessView(bean);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            //請求失敗灰署,顯示對應(yīng)的View
            showFailView();
        }
    });

我們知道判帮,通過:

.subscribeOn(Schedulers.io())   //調(diào)度線程
.observeOn(AndroidSchedulers.mainThread())  //調(diào)度線程

這兩句代碼,就使我們上半部分的請求和保存數(shù)據(jù)都執(zhí)行在io線程中溉箕,而下半部的ui更新則執(zhí)行在主線程晦墙。

通過這段代碼,我們引入幾個(gè)問題:

  1. observeOn和subscribeOn是如何實(shí)現(xiàn)線程調(diào)度的肴茄?
  2. observeOn和subscribeOn之間是否存在沖突晌畅?

observeOn源碼

首先解決第一個(gè)問題,我們先了解一下ObserveOn的實(shí)現(xiàn)原理:

首先看一下調(diào)用:

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}

我們可以看到寡痰,ObserveOn最終是返回了一個(gè)ObservableObserveOn對象抗楔,并將scheduler傳入。

根據(jù)上一篇文的思路:


任務(wù)鏈.png

ObservableObserveOn會(huì)被我們最后subscribe的時(shí)候傳入的Observer訂閱拦坠。

讓我們跟進(jìn)看一下ObservableObserveOn被訂閱時(shí)會(huì)執(zhí)行什么邏輯:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //TrampolineScheduler 表示當(dāng)前線程
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //根據(jù)scheduler創(chuàng)建worker
        Scheduler.Worker w = scheduler.createWorker();
        //通過ObservableObserveOnObserver代理
        source.subscribe(new ObservableObserveOn.ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

這里的邏輯并不難理解连躏,(如果看了上一篇文章),

首先是判斷了scheduler是不是表示當(dāng)前線程的TrampolineScheduler贞滨,如果是就直接讓observer訂閱上一級的Observable入热,也就是跳過當(dāng)前這一層,即圖中的Observer直接訂閱ObservableSubscribeOn晓铆。

然后根據(jù)schedular生成對應(yīng)的worker勺良,交由ObservableObserveOnObserver代理,訂閱上一級的Observable骄噪。

根據(jù)我們引入的案例尚困,我們以observeOn(AndroidSchedulers.mainThread()) 為例,當(dāng)完成逆向訂閱链蕊,執(zhí)行任務(wù)鏈到ObservableObserveOnObserver時(shí):

@Override
public void onNext(T t) {
    // 上一級的模式如果不是異步的事甜,加入隊(duì)列
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //進(jìn)行線程調(diào)度
    schedule();
}

void schedule() {
    // 判斷當(dāng)前正在執(zhí)行的任務(wù)數(shù)目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

這里首先是判斷了sourceMode,這里先不跟蹤這個(gè)變量滔韵,只需要知道大多數(shù)情況下逻谦,這個(gè)判斷是成立,所以會(huì)把數(shù)據(jù)加入隊(duì)列奏属。

然后轉(zhuǎn)而讓worker執(zhí)行接下去的步驟。

我們跟蹤看看潮峦,可以發(fā)現(xiàn)囱皿,這是個(gè)抽象方法勇婴,可以找到他在不同類中有不同實(shí)現(xiàn),分別對應(yīng)了幾種不同的線程調(diào)度機(jī)制嘱腥,我們挑選案例中的AndroidSchedulers.mainThread()來跟蹤耕渴。

首先我們跟蹤mainThread方法,可以發(fā)現(xiàn)內(nèi)部轉(zhuǎn)到了這里:

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

我們再跟進(jìn)HandlerScheduler齿兔,我們知道worker是通過createWorker方法產(chǎn)生的:

public Worker createWorker() {
   return new HandlerWorker(handler);
}

可以看到直接生成了HandlerWorker橱脸,并傳入了一開始創(chuàng)建的綁定了MainLooperHandler》治看到這里也能大致猜出添诉,后續(xù)會(huì)把任務(wù)傳給這個(gè)handler執(zhí)行:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    //省略部分代碼
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    return scheduled;
}

可以看到,這里將傳進(jìn)來的runnable包裝成ScheduledRunnable医寿,然后提交給綁定的handler栏赴。

我們知道,后續(xù)Handler會(huì)調(diào)用ScheduledRunnable的run方法:

ScheduledRunnable(Handler handler, Runnable delegate) {
    this.handler = handler;
    this.delegate = delegate;
}

@Override
public void run() {
    try {
        delegate.run();
    } catch (Throwable t) {
        //……
    }
}

可以看到靖秩,只是簡單的調(diào)用了我們傳入的runnablerun方法须眷,也就是剛才我們在ObservableObserveOnObserver中通過schedule方法傳入的runnable,我們回去看看:

void schedule() {
    // 判斷當(dāng)前正在執(zhí)行的任務(wù)數(shù)目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

可以看到其實(shí)本身就是個(gè)runnable

@Override
public void run() {
    //輸出結(jié)果是否融合
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

可以看到沟突,根據(jù)outputFused來跳轉(zhuǎn)方法花颗,這里先不跟蹤這個(gè)變量,后面會(huì)再提到惠拭。
現(xiàn)在只需要知道當(dāng)連續(xù)兩個(gè)observable都需要線程調(diào)度時(shí)(比如從observeOnobserveOn)扩劝,這個(gè)outputFused才會(huì)發(fā)生變化,默認(rèn)為false求橄。

那么這里今野,我們先進(jìn)入drainNormal方法:

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    //第一層循環(huán)
    for (;;) {
        // 檢查異常處理
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        //第二層循環(huán)
        for (;;) {
            boolean d = done;
            T v;
            //從隊(duì)列中獲取數(shù)據(jù)
            v = q.poll();
            boolean empty = v == null;
            // 檢查異常
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //如果沒有數(shù)據(jù)了,跳出
            if (empty) {
                break;
            }
            //執(zhí)行下一次操作罐农。
            a.onNext(v);
        }
        //減掉執(zhí)行的次數(shù)条霜,并獲取剩于任務(wù)數(shù)量,然后再次循環(huán)
        //直到獲取剩余任務(wù)量為0涵亏,跳出循環(huán)
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

這里的邏輯其實(shí)也不難宰睡,具體可以看注釋。

到這里其實(shí)已經(jīng)切換了線程气筋,然后就是分發(fā)數(shù)據(jù)拆内,逐個(gè)調(diào)用onNext操作了。直到?jīng)]有數(shù)據(jù)就跳出循環(huán)宠默。(總覺得這里missed的設(shè)計(jì)很奇怪- -為什么是初始化1而不是missed=get()呢麸恍。望有大神解答~)

看到這里也就大致明白了ObserveOn的流程呢。

總結(jié)一下:
ObserveOn會(huì)用一個(gè)queue保存上一級傳下來的數(shù)據(jù),然后通過scheduler創(chuàng)建一個(gè)worker抹沪,提交數(shù)據(jù)刻肄,并將任務(wù)執(zhí)行在worker設(shè)置的線程中。

subscribeOn源碼

看完ObserveOn融欧,我們看一下subscribeOn,
首先看一下當(dāng)他被訂閱時(shí)會(huì)執(zhí)行什么操作:

@Override
public void subscribeActual(final Observer<? super T> s) {
    //創(chuàng)建對應(yīng)的Observer
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    //執(zhí)行線程調(diào)度敏弃,內(nèi)部會(huì)訂閱上一級的Observable
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

可以看到,這里直接進(jìn)行了線程調(diào)度噪馏,創(chuàng)建了SubscribeTask任務(wù)麦到,然后交由Scheduler執(zhí)行。

我們先看看scheduleDirect會(huì)執(zhí)行什么操作:

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Scheduler.Worker w = createWorker();

    Scheduler.DisposeTask task = new Scheduler.DisposeTask(run, w);

    w.schedule(task, delay, unit);

    return task;
}

可以看到欠肾,這里和我們剛才追蹤ObserveOn時(shí)的邏輯一樣瓶颠。都是將任務(wù)交給了Worker處理。我們剛才已經(jīng)分析了董济,Worker會(huì)將任務(wù)提交給對應(yīng)的線程執(zhí)行步清。

所以我們回過頭看一下我們提交了什么任務(wù):

@Override
public void run() {
    source.subscribe(parent);
}

可以看出,這里將訂閱的操作提交給了Worker執(zhí)行虏肾。

總結(jié)一下:
subscribeOn會(huì)將訂閱上一級的操作調(diào)交給worker中對應(yīng)的線程執(zhí)行廓啊。

ObserveOn和subscribeOn

我們還是以上述引入的例子為例,可以看出封豪,整個(gè)過程進(jìn)行了兩次線程調(diào)度谴轮,首先是subscribeOn,然后是ObserveOn吹埠,這個(gè)過程比較簡單第步,先解析這個(gè)過程。

根據(jù)上一篇文章的分析缘琅,RxJava的整個(gè)流程分為三個(gè)步驟:

  1. 創(chuàng)建任務(wù)鏈粘都,這里沒有涉及線程調(diào)度。默認(rèn)執(zhí)行在當(dāng)前線程刷袍,在這里也就是主線程翩隧。

  2. 逆向訂閱,這里當(dāng)遇到ObserveOn的時(shí)候呻纹,ObserveOn直接進(jìn)行了訂閱操作堆生,所以沒有影響。
    但是但我們訂閱ObservableSubscribeOn的時(shí)候雷酪,其便將訂閱操作提交到了對應(yīng)線程淑仆,所以后續(xù)的訂閱操作都執(zhí)行在對應(yīng)線程,在這里便是IO線程哥力。

  3. 執(zhí)行任務(wù)鏈蔗怠,受到ObservableSubscribeOn的影響,這里也會(huì)繼續(xù)執(zhí)行在IO線程
    但是當(dāng)我們執(zhí)行到ObserveOnObserver的時(shí)候寞射,onNext操作會(huì)執(zhí)行在對應(yīng)的線程中最住,在這里也就是切換到主線程

線程調(diào)度.png

圖中怠惶,紫色的箭頭表示執(zhí)行在默認(rèn)線程(主線程),紅色的箭頭表示執(zhí)行在IO線程轧粟,繩藍(lán)色的線表示執(zhí)行在切換后的主線程策治。

observeOn和subscribeOn之間是否存在沖突

其實(shí)從上述的例子我們可以看出并不存在沖突的問題,一個(gè)影響的subscribe之后的操作兰吟,一個(gè)影響的是doNext之后的操作通惫。

從圖中可以看出,不管subscribeObserveOn怎么變化混蔼,都不會(huì)發(fā)生沖突的情況履腋。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市惭嚣,隨后出現(xiàn)的幾起案子遵湖,更是在濱河造成了極大的恐慌,老刑警劉巖晚吞,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件延旧,死亡現(xiàn)場離奇詭異,居然都是意外死亡槽地,警方通過查閱死者的電腦和手機(jī)迁沫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捌蚊,“玉大人集畅,你說我怎么就攤上這事∶逶悖” “怎么了挺智?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長溺拱。 經(jīng)常有香客問我逃贝,道長,這世上最難降的妖魔是什么迫摔? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任沐扳,我火速辦了婚禮,結(jié)果婚禮上句占,老公的妹妹穿的比我還像新娘沪摄。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布杨拐。 她就那樣靜靜地躺著祈餐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪哄陶。 梳的紋絲不亂的頭發(fā)上帆阳,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機(jī)與錄音屋吨,去河邊找鬼蜒谤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛至扰,可吹牛的內(nèi)容都是我干的鳍徽。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了粤攒?” 一聲冷哼從身側(cè)響起钓简,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡萨咳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了疫稿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片培他。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖遗座,靈堂內(nèi)的尸體忽然破棺而出舀凛,到底是詐尸還是另有隱情,我是刑警寧澤途蒋,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布猛遍,位于F島的核電站,受9級特大地震影響号坡,放射性物質(zhì)發(fā)生泄漏懊烤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一宽堆、第九天 我趴在偏房一處隱蔽的房頂上張望腌紧。 院中可真熱鬧,春花似錦畜隶、人聲如沸壁肋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浸遗。三九已至猫胁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間跛锌,已是汗流浹背弃秆。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留髓帽,地道東北人驾茴。 一個(gè)月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像氢卡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子晨缴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容