RxJava線程切換流程分析_subscribeOn

在上一小節(jié)中RxJava2_整體流程分析盯串,有這么一個(gè)結(jié)論,那就是每一次調(diào)用 Observable 的操作符都會(huì)返回一個(gè)新的 Observable 對(duì)象戒良,并且會(huì)通過(guò)構(gòu)造的方式傳入上一級(jí)創(chuàng)建的 Observable 對(duì)象体捏,將其保存起來(lái),下面是示例代碼。那么接下來(lái)操作的 subscribeOn几缭、observeOn 操作符都會(huì)分別創(chuàng)建新的 Observable 對(duì)象河泳,并存儲(chǔ)上一級(jí)創(chuàng)建的 observable。

//上一級(jí)創(chuàng)建的 observable 對(duì)象:ObservableOnSubscribe
Observable.create(new ObservableOnSubscribe<String>() {...}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    //保存上一級(jí)創(chuàng)建的 Observable 對(duì)象 : ObservableOnSubscribe
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

一奏司、執(zhí)行流程圖

RxJava2_執(zhí)行流程分析圖.png

二乔询、示例代碼

下面這段代碼的功能就是在 subscribe 方法內(nèi)部通過(guò)調(diào)用 getBitampFormServer 去請(qǐng)求一個(gè) Bitmap 對(duì)象,這個(gè)方法是耗時(shí)操作韵洋,當(dāng)前的操作應(yīng)該在子線程中執(zhí)行,得到 bmp 之后黄锤,根據(jù)結(jié)果分別去調(diào)用 onNext() /onError() 方法搪缨。而在訂閱者中若是 onNext 被回調(diào)則表示成功獲取到 bmp,對(duì)應(yīng)地將其設(shè)置給對(duì)應(yīng)的 mImageView 對(duì)象上鸵熟,如果 onError 被回調(diào)了副编,那么表示加載 Bitmap 是失敗的,對(duì)應(yīng)的再做一些其它操作流强,這些操作應(yīng)該在主線程中進(jìn)行痹届。本次通過(guò)從源碼的角度探究的是 RxJava2 內(nèi)部是如何進(jìn)行線程切換操作的。本小節(jié)先分析 subscribeOn 如何去實(shí)現(xiàn)事件源在子線程中發(fā)射事件打月。也就是 ObservableOnSubscribe#subscribe 在子線程中去執(zhí)行队腐。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception {
     
        //該方法進(jìn)行網(wǎng)絡(luò)請(qǐng)求,是比較耗時(shí)的操作奏篙。
        Bitmap bmp = getBitampFormServer("uri");
        if(bmp!=null) {
            //獲取 bmp 成功
            e.onNext(bmp);
            e.onComplete();
        }else{
            //如果從網(wǎng)絡(luò)加載圖片不成功柴淘,回調(diào)onError 來(lái)通知訂閱者
            e.onError(new Exception("圖片加載出錯(cuò)啦"));
        }
    }}) //事件源發(fā)射事件在子線程中運(yùn)行
        .subscribeOn(Schedulers.io())
        //訂閱者在主線程中接受事件
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("zeal", "onSubscribe");
            }
            @Override
            public void onNext(@NonNull Bitmap bmp) {
                //設(shè)置顯示在 ImageView 上
                mImageView.setImageBitmap(bmp);             
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("zeal","error:"+e.toString());
            }
            @Override
            public void onComplete() {
                Log.e("zeal", "onComplete");
            }
        });

2、.subscribeOn(Schedulers.io()) 源碼分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

2.1秘通、Scheduler

從下面的類(lèi)注釋可以知道为严,這個(gè)類(lèi)是一個(gè)調(diào)度類(lèi),可以延時(shí)/周期性地去執(zhí)行一個(gè)任務(wù)肺稀〉诠桑可以從 Schedulers 這個(gè)類(lèi)去獲取 Scheduler 的實(shí)現(xiàn)子類(lèi)對(duì)象,例如在頻繁進(jìn)行 io 操作就可以調(diào)用 Schedulers.io() 话原,如果是計(jì)算比較多的可以調(diào)用 Schedulers.computation()夕吻。

/**
 * A {@code Scheduler} is an object that specifies an API for scheduling
 * units of work with or without delays or periodically.
 * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
 */
public abstract class Scheduler {}

2.2、Schedulers.io()

通過(guò)下面的 Schedulers.io() 源碼跟蹤稿静,最終返回的是一個(gè) IoScheduler 對(duì)象梭冠,這個(gè)對(duì)象實(shí)際上就是 Scheduler 的子類(lèi)對(duì)象。那么就符合 subscribeOn(Scheduler) 參數(shù)的要求了改备。

@NonNull
public static Scheduler io() {
    //內(nèi)部是 IO 
    return RxJavaPlugins.onIoScheduler(IO);
}
//-----------------------------------------------------
@NonNull
static final Scheduler IO;

static {
    ...
    // IO 是在靜態(tài)代碼塊中實(shí)例化的
    IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
        @Override
        public Scheduler call() throws Exception {
            //這里返回一個(gè) IoHolder 對(duì)象控漠。
            return IoHolder.DEFAULT;
        }
    });
    ...
}
//-----------------------------------------------------
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

//-----------------------------------------------------
//IoHolder 類(lèi)定義中可以知道,該類(lèi)是繼承至 Scheduler
public final class IoScheduler extends Scheduler {}

2.3、subscribeOn 內(nèi)部實(shí)現(xiàn)

  • subscribeOn(Scheduler scheduler)

這個(gè)方法內(nèi)部會(huì)通過(guò)創(chuàng)建一個(gè) ObservableSubscribeOn 對(duì)象盐捷,根據(jù)之前的經(jīng)驗(yàn)可知道偶翅,這個(gè)類(lèi)肯定也是一個(gè) Observable 的子類(lèi)對(duì)象。因此對(duì)于 subscribe(observer) 方法而言碉渡,我們就只關(guān)心它真正調(diào)用的方法 subscribeActual(observer) 聚谁。

  • subscribeActual(observer)

在subscribeActual 內(nèi)部首先是對(duì) observer 進(jìn)行包裝成 SubscribeOnObserver 對(duì)象。這里的 SubscribeOnObserver 不僅是一個(gè) Observer 滞诺,而且具備一個(gè)連接器的作用 Disposable 形导。

@Override
public void subscribeActual(final Observer<? super T> s) {
    //包裝 observer 對(duì)象
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //將連接器 parent 通過(guò) onSubscribe 回調(diào)給 observer 對(duì)象
    s.onSubscribe(parent);
    //這里是通過(guò) scheduler 去執(zhí)行一個(gè)任務(wù) SubscribeTask。
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
  • SubscribeOnObserver

這個(gè)類(lèi)是對(duì) observer 的包裝习霹,內(nèi)部實(shí)現(xiàn)了 Observer 和 Disposable 接口朵耕。也就是說(shuō)它既有訂閱者的功能,也實(shí)現(xiàn)了連接器的功能淋叶。注意 actual 這個(gè)變量阎曹,它是下一級(jí)的 Observer 對(duì)象,為什么說(shuō)是下一級(jí)呢煞檩?因?yàn)槊看伟b的 Observer 是一級(jí)級(jí)別往上被訂閱的处嫌,當(dāng)前的 Observer 都會(huì)包裝下一級(jí)別的 Observer 對(duì)象。例如 SubscribeOnObserver 就封裝了下一級(jí)的 Observer 對(duì)象斟湃,其實(shí)就是當(dāng)前 Observer 接受到事件源發(fā)送過(guò)來(lái)的事件時(shí)熏迹,再調(diào)用包裝的 Observer 回調(diào)給下一級(jí),這樣一級(jí)級(jí)傳遞下去知道最后一級(jí) Observer桐早。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    ...
    final Observer<? super T> actual;
    final AtomicReference<Disposable> s;
    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }
    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }
    //發(fā)送事件
    @Override
    public void onNext(T t) {
        //回調(diào)給下一級(jí)
        actual.onNext(t);
    }
    //發(fā)送事件
    @Override
    public void onError(Throwable t) {
        //回調(diào)給下一級(jí)
        actual.onError(t);
    }
    //發(fā)送事件
    @Override
    public void onComplete() {
        //回調(diào)給下一級(jí)
        actual.onComplete();
    }
    @Override
    public void dispose() {
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
  • SubscribeTask(parent)

SubscribeTask 它是一個(gè) Runnbale 癣缅,因此我們把它理解為一個(gè)任務(wù)。首先關(guān)注是它的 run 方法哄酝,它內(nèi)部實(shí)現(xiàn)很簡(jiǎn)單友存,就是**通知上一級(jí)的 Observable 通過(guò) subscribe 這個(gè)方法進(jìn)行訂閱當(dāng)前 observer **。下面會(huì)執(zhí)行一大堆代碼陶衅,其實(shí)都會(huì)為創(chuàng)建一個(gè)線程然后交給指定的線程池取執(zhí)行這個(gè)任務(wù)屡立,先記住這個(gè)任務(wù)的使命。那么既然是一個(gè)線程搀军,那么肯定有一個(gè)地方需要執(zhí)行這個(gè)線程的膨俐,接下來(lái)關(guān)注 scheduler.scheduleDirect 方法。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        //【核心代碼罩句,這段代碼決定上一級(jí)observable訂閱在哪個(gè)線程執(zhí)行焚刺。】
        //source:就是上一級(jí)創(chuàng)建的 observable
        //parent 就是包裝后的 observer
        source.subscribe(parent);
    }
}

開(kāi)始尋找 SubscribeTask 這個(gè)線程實(shí)在哪里被執(zhí)行的门烂。

  • scheduler.scheduleDirect(new SubscribeTask(parent))

剛才分析過(guò) scheduler 就是 IoScheduler 對(duì)象了乳愉,跟蹤源碼發(fā)現(xiàn)兄淫,這個(gè)類(lèi)并沒(méi)有重寫(xiě)這個(gè)方法,因此直接進(jìn)入 Scheduler 查看蔓姚。

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

//這里的 delay = 0捕虽,也就是馬上執(zhí)行這個(gè)任務(wù)。
//【這個(gè) run 就是我們的目標(biāo)】
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    
    //核心代碼 createWorker() 創(chuàng)建一個(gè)可以可以執(zhí)行 run 的 worker 
    final Worker w = createWorker();
    //對(duì) run 進(jìn)行了包裝坡脐,實(shí)際上還是 run 這個(gè)對(duì)象泄私。【這個(gè) run 就是我們的目標(biāo)】
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
    //decoratedRun 交給了 worker 去執(zhí)行
    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                【我們的目標(biāo)在此處被執(zhí)行】
                decoratedRun.run();
            } finally {
                //事件源發(fā)射事件完畢之后备闲,就關(guān)閉連接器晌端。
                w.dispose();
            }
        }
    }, delay, unit);
    return w;
}
  • IoScheduler#createWorker();

現(xiàn)在我們知道我們的任務(wù)是交給 worker.schedule() 去執(zhí)行的。因?yàn)?Worker 是負(fù)責(zé)去執(zhí)行調(diào)度的浅役,因此不同的子類(lèi)會(huì)有不同的 Worker 的實(shí)現(xiàn)斩松,在 Scheduler 中通過(guò) createWorker() 來(lái)獲取子類(lèi)實(shí)現(xiàn)的 Worker 對(duì)象。

@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
  • Scheduler#Worker

這個(gè)類(lèi)具備延遲執(zhí)行任務(wù)觉既,周期性執(zhí)行任務(wù)的功能。所有的執(zhí)行都是基于 schedule() 方法乳幸,而這個(gè)方法是一個(gè)抽象方法瞪讼,也就是它無(wú)法知道子類(lèi)需要怎么執(zhí)行這個(gè)任務(wù),因?yàn)槊恳环N調(diào)度器執(zhí)行的方式 schedule 都不一樣粹断,因此交給子類(lèi)去實(shí)現(xiàn)符欠。

 @NonNull
 public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
  • EnentLooerWorker#schedule()

有了 Worker 之后就要開(kāi)始執(zhí)行【我們的任務(wù) action 啦】

@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    //【任務(wù) action 】交給 threadWorker 去執(zhí)行
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
  • threadWorker.scheduleActual

threadWorker 是 ThreadWorker ,繼承至 NewThreadWorker 瓶埋。

static final class ThreadWorker extends NewThreadWorker 

//NewThreadWorker 內(nèi)部維護(hù)一個(gè)線程池 executor希柿。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    

//最終代碼會(huì)走到這里
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //對(duì) run 進(jìn)行包裝
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        //上面已經(jīng)提到,delayTime = 0养筒;所以這個(gè)任務(wù)會(huì)被立即執(zhí)行曾撤,
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

@Override
public void run() {
    try {
        try {
            //執(zhí)行原始的 run 方法。
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
        }
    } finally {
        Object o = get(PARENT_INDEX);
        if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
            ((DisposableContainer)o).delete(this);
        }
        for (;;) {
            o = get(FUTURE_INDEX);
            if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                break;
            }
        }
    }
}

2.4晕粪、 結(jié)果

f = executor.submit((Callable<Object>)sr); 這里執(zhí)行了 SubscribeTask#run() 方法挤悉,也就是當(dāng)前的訂閱者 Observer 訂閱了上一級(jí)的 Observable 。也就是上一級(jí)的 ObservableCreate.subscribe(observer) 被執(zhí)行了巫湘。請(qǐng)注意它是在子線程中被執(zhí)行的装悲。如果想要了解接下來(lái)的事件源是怎么發(fā)送事件的可以參考RxJava2_整體流程分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市尚氛,隨后出現(xiàn)的幾起案子诀诊,更是在濱河造成了極大的恐慌,老刑警劉巖阅嘶,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件属瓣,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)奠涌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)宪巨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人溜畅,你說(shuō)我怎么就攤上這事捏卓。” “怎么了慈格?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵怠晴,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我浴捆,道長(zhǎng)蒜田,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任选泻,我火速辦了婚禮冲粤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘页眯。我一直安慰自己梯捕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布窝撵。 她就那樣靜靜地躺著傀顾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪碌奉。 梳的紋絲不亂的頭發(fā)上短曾,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音赐劣,去河邊找鬼嫉拐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛隆豹,可吹牛的內(nèi)容都是我干的椭岩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼璃赡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼判哥!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起碉考,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤塌计,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后侯谁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體锌仅,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡章钾,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了热芹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唉地。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡沟涨,死狀恐怖创坞,靈堂內(nèi)的尸體忽然破棺而出断凶,到底是詐尸還是另有隱情,我是刑警寧澤报腔,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布株搔,位于F島的核電站,受9級(jí)特大地震影響纯蛾,放射性物質(zhì)發(fā)生泄漏纤房。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一翻诉、第九天 我趴在偏房一處隱蔽的房頂上張望炮姨。 院中可真熱鬧,春花似錦碰煌、人聲如沸剑令。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至棚蓄,卻和暖如春堕扶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背梭依。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工稍算, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人役拴。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓糊探,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親河闰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子科平,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容