RxJava2工作原理及線(xiàn)程切換


序言

RxJava是現(xiàn)在最流行的響應(yīng)式函數(shù)編程框架艘包,之前的項(xiàng)目中一直使用RxJava统倒,結(jié)合Retrofit+OkHttp搭建網(wǎng)絡(luò)請(qǐng)求框架寨典,很是好用。
后來(lái)RxJava2出來(lái)了房匆,官網(wǎng)表明一段時(shí)間之后不再維護(hù)RxJava耸成,所以在新項(xiàng)目中,決定使用RxJava2浴鸿。
對(duì)于新手來(lái)說(shuō)井氢,即使沒(méi)用過(guò)RxJava,也可以直接學(xué)習(xí)RxJava2岳链。而對(duì)于從RxJava過(guò)渡到RxJava2的同學(xué)花竞,自然更容易上手。

響應(yīng)式編程(Reactive Programming)這個(gè)詞很多人都知道掸哑,但具體是什么含義可能沒(méi)多少人能解釋清楚约急。我簡(jiǎn)單說(shuō)一下自己的理解:響應(yīng)式編程可基于任何事物(數(shù)據(jù)寇仓、用戶(hù)行為、時(shí)間烤宙、對(duì)象)創(chuàng)建事件流,并且框架提供一個(gè)強(qiáng)大的函數(shù)庫(kù)來(lái)操作事件流俭嘁,包括合并躺枕、過(guò)濾、轉(zhuǎn)換供填、切換線(xiàn)程拐云、監(jiān)聽(tīng)...,流是響應(yīng)式的核心近她。

RxJava就是這樣一個(gè)響應(yīng)式編程框架叉瘩,今天我們主要來(lái)介紹RxJava2的事件處理流程和線(xiàn)程切換原理。本文并不是一篇新手指引教程粘捎,而是一篇進(jìn)階教程薇缅。如果想入門(mén)RxJava2,可以看一下這篇文章攒磨。

用戶(hù)登錄場(chǎng)景

下面以用戶(hù)登錄場(chǎng)景為例泳桦,介紹怎樣通過(guò)RxJava2進(jìn)行事件流處理,登錄代碼如下:

/**
 * 用戶(hù)登錄操作
 */
public void login(final String userName, final String password) {
    Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
                @Override
                public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
                    e.onNext(loginApi(userName, password));
                }
            })
            .map(new Function<CommonApiBean<UserInfo>, UserInfo>() {
                @Override
                public UserInfo apply(CommonApiBean<UserInfo> bean) throws Exception {
                    if (bean != null && bean.body != null) {
                        return bean.body;
                    }
                    return new UserInfo();
                }
            })
            .doOnNext(new Consumer<UserInfo>() {
                @Override
                public void accept(UserInfo userInfo) throws Exception {
                    saveUserInfo(userInfo);
                }
            })
            .subscribeOn(Schedulers.io()) 
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<UserInfo>() {
                @Override
                public void accept(UserInfo userInfo) throws Exception {
                    //登錄成功娩缰,跳轉(zhuǎn)頁(yè)面
                    loginSuccess(userInfo);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    //登錄失敗提示用戶(hù)
                    loginFailed();
                }
            });
}

上面一段代碼是RxJava2常規(guī)的使用方式灸撰,能夠滿(mǎn)足多數(shù)網(wǎng)絡(luò)請(qǐng)求場(chǎng)景,下面我們就針對(duì)這段代碼進(jìn)行分析拼坎。

分析事件流

RxJava2處理事件流分為3個(gè)步驟:

  1. 構(gòu)建操作符對(duì)應(yīng)的Observable
  2. 逐級(jí)生成Observer浮毯,逆向訂閱Observable
  3. 逐級(jí)調(diào)用Observer的onNext方法

下面我們就分別來(lái)介紹這三個(gè)流程。

1. 構(gòu)建操作符對(duì)應(yīng)的Observable

上面一段代碼中泰鸡,使用的操作符包括create债蓝、map、doOnNext鸟顺、subscribeOn惦蚊、observeOn,我們依次看這些操作符做了什么事情讯嫂。

Observable
要了解RxJava2原理蹦锋,必須先了解Observable,Observable是RxJava2事件流的入口類(lèi)欧芽,也可以叫做事件源莉掂。

public abstract class Observalbe<T> implements ObservableSource<T> {
    //交由子類(lèi)實(shí)現(xiàn)的抽象方法
    @Override  
    protected abstract void subscribeActual(Observer observer) ;

    //實(shí)現(xiàn)了ObservableSource的方法
    @Override  
    public final void subscribe(Observer<? super T> observer) {
        //省略一堆判空等處理 
        subscribeActual(observer);
    }

    //省略一堆靜態(tài)操作符方法
}

從Observable的定義可知,它實(shí)現(xiàn)了ObservableSource接口千扔,并定義了一個(gè)subscribeActual抽象方法憎妙,調(diào)用Observable的subscribe方法實(shí)際上是做了一些基礎(chǔ)判斷后库正,調(diào)用subscribeActual方法。Observable的每個(gè)子類(lèi)需要需要實(shí)現(xiàn)自己的subscribeActual方法厘唾。

create

跟蹤到Observable的create方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

第一句代碼對(duì)source進(jìn)行判空褥符,如果為空,會(huì)拋出異常抚垃。接著生成一個(gè)ObservableCreate對(duì)象喷楣,把這個(gè)對(duì)象傳入RxJavaPlugin進(jìn)行組裝。
RxJavaPlugin提供了一系列的Hook Function鹤树,通過(guò)這種函數(shù)對(duì)RxJava的標(biāo)準(zhǔn)進(jìn)行加工铣焊,如果我們不配置這些方法,默認(rèn)直接返回原對(duì)象罕伯,即ObservableCreate曲伊。
注:下面介紹其他操作符,就不再解釋判空操作和RxJavaPlugin追他。

接著坟募,我們看一下ObservableCretae的定義:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    //省略其他方法
}

很簡(jiǎn)單,ObservableCreate繼承Observable邑狸,并且在構(gòu)造方法中保存了傳入的ObservableOnSubscribe對(duì)象婿屹。
總結(jié):create()構(gòu)建了一個(gè)ObservableCreate對(duì)象,該對(duì)象繼承Observable推溃。

map
同樣昂利,跟蹤到Observable的map方法中:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

這里創(chuàng)建并返回一個(gè)ObservalbeMap對(duì)象:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    //省略其他方法
}

ObservableMap繼承AbstractObservableWithUpstream類(lèi):

/**
 * Base class for operators with a source consumable.
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;
    
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}

AbstractObservableWithUpstream是所有接收上一級(jí)輸入操作符的基類(lèi)。
總結(jié):map()構(gòu)建了一個(gè)ObservableMap對(duì)象铁坎。

doOnNext
根據(jù)上面兩個(gè)操作符的源碼蜂奸,我們猜測(cè)這里也會(huì)返回一個(gè)Observable子類(lèi)對(duì)象,進(jìn)入源碼驗(yàn)證一下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnNext(Consumer<? super T> onNext) {
    return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
    //省略判空操作
    return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
}

果不其然硬萍,doOnNext最后返回了一個(gè)ObservableDoOnEach對(duì)象:

public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }
    //省略其他代碼
}

doOnNext也需要接收上流傳來(lái)的Observable作為source扩所,所以也繼承了AbstractObservableWithUpstream。
總結(jié):doOnNext()構(gòu)建了一個(gè)ObservableDoOnEach對(duì)象朴乖。

subscribeOn
用過(guò)RxJava的同學(xué)都知道祖屏,subscribeOn是用來(lái)切換線(xiàn)程的,用于指定被觀察者執(zhí)行的線(xiàn)程买羞。
不著急袁勺,怎樣切換線(xiàn)程我們后面會(huì)分析,先看一下它的源碼:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

這里返回了一個(gè)ObservableSubscribeOn對(duì)象畜普,繼續(xù)跟蹤:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    //省略其他代碼
}

ObservableSubscribeOn中保存了事件源source和線(xiàn)程調(diào)度器scheduler期丰,而這個(gè)scheduler是我們傳入的Schedulers.io()
總結(jié):subscribeOn()構(gòu)建了一個(gè)ObservableSubscribeOn對(duì)象

observeOn
各位同學(xué)肯定也知道钝荡,observeOn用于指定觀察者執(zhí)行的線(xiàn)程街立,至于怎樣實(shí)現(xiàn)線(xiàn)程切換等到后面分析。
老套路埠通,我們跟蹤到Observable.observeOn方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    //省略判空驗(yàn)證操作
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

observeOn最后返回了一個(gè)ObservableObserveOn對(duì)象:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    //省略其他代碼
}

ObservableObserveOn對(duì)象也保存了事件源source和線(xiàn)程調(diào)度器scheduler赎离,這里的scheduler是我們傳入的AndroidScheduler.mainThread()
總結(jié):observeOn構(gòu)建了一個(gè)ObservableObserveOn對(duì)象端辱。

到這里蟹瘾,操作符對(duì)應(yīng)的Observable構(gòu)建完成,總結(jié)一下掠手,按照操作符順序,構(gòu)建了ObservableCreate -> ObservableMap -> ObservableDoOnEach -> ObservableSubscribeOn -> ObservableObserveOn幾個(gè)Observable對(duì)象狸捕。

2. 逐級(jí)生成Observer喷鸽,逆向訂閱Observable

LambdaObserver
在登錄場(chǎng)景的最后,調(diào)用了subscribe方法灸拍,傳入了兩個(gè)Comsumer對(duì)象,我們看一下subscribe方法的實(shí)現(xiàn):

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    //省略判空操作
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

從代碼看出,subscribe方法拿我們傳入的兩個(gè)Consumer構(gòu)建了一個(gè)LambdaObserver對(duì)象慨亲,而兩個(gè)Consumer分別對(duì)應(yīng)onNext和onError戈锻,并且用LambdaObserver來(lái)訂閱ObservableObserveOn。
總結(jié):用LambdaObserver訂閱ObservableObserveOn對(duì)象轩性。

ObserveOnObserver
繼續(xù)分析ObservableObserveOn.subscribe方法声登,上面提到過(guò),Observable的subscribe方法實(shí)際上會(huì)調(diào)用具體子類(lèi)的subscribeActual方法揣苏,所以我們跟蹤ObservableObserveOn的subscribeActual方法:

//ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

TrampolineScheduler表示是否當(dāng)前線(xiàn)程悯嗓,而我們傳入的schedule是AndroidScheduler.mainThread(),并不是TrampolineScheduler對(duì)象卸察。
所以會(huì)走else邏輯脯厨,創(chuàng)建一個(gè)Scheduler.Worker,并把它作為參數(shù)構(gòu)建一個(gè)ObserveOnObserver對(duì)象坑质。
用ObserveOnObserver對(duì)象訂閱source(source是我們構(gòu)建ObservableObserveOn對(duì)象傳入的ObservableSubscribeOn對(duì)象)合武。
總結(jié):用ObserveOnObserver訂閱ObservableSubscribeOn對(duì)象

SubscribeOnObserver
接著,看一下ObservableSubscribeOn的訂閱邏輯:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

首先創(chuàng)建一個(gè)SubscribeOnObserver對(duì)象涡扼,scheduler.scheduleDirect從名字上看稼跳,大概用來(lái)切換線(xiàn)程的。在指定線(xiàn)程中執(zhí)行SubscribeTask任務(wù):

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

這個(gè)任務(wù)很簡(jiǎn)單吃沪,就是用parent(SubscribeOnObserver對(duì)象)來(lái)訂閱source(此處的source是ObservableDoOnEach)岂贩。
總結(jié):SubscribeOnObserver訂閱ObservableDoOnEach對(duì)象

DoOnEachObserver
老套路,我們繼續(xù)看ObservableDoOnEach的subscribeActual方法:

@Override
public void subscribeActual(Observer<? super T> t) {
    source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
}

這個(gè)方法很干脆萎津,直接用DoOnEachObserver訂閱source(此處的source是ObservableMap對(duì)象)
總結(jié):用DoOnEachObserver訂閱ObservableMap對(duì)象卸伞。

MapObserver
到這里,相信大家也可以猜到锉屈,ObservableMap的subscribeActual中荤傲,肯定也是構(gòu)建一個(gè)MapObserver來(lái)訂閱source,本著實(shí)事求是的精神颈渊,源碼還是要看的:

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

果不其然_K焓颉!
總結(jié):用MapObserver訂閱ObservableCreate俊嗽。

最后雾家,我們要看一下ObservableCreate中的subscribeActual方法:

 @Override
 protected void subscribeActual(Observer<? super T> observer) {
     CreateEmitter<T> parent = new CreateEmitter<T>(observer);
     observer.onSubscribe(parent);

     try {
         source.subscribe(parent);
     } catch (Throwable ex) {
         Exceptions.throwIfFatal(ex);
         parent.onError(ex);
     }
 }

這里也出現(xiàn)了source.subscribe(parent),parent是CreateEmitter對(duì)象绍豁,那么source是什么呢芯咧?

有哪位同學(xué)能回答一下這個(gè)問(wèn)題嗎?

哈哈竹揍,不賣(mài)關(guān)子了敬飒,這里的source就是我們最開(kāi)始創(chuàng)建Observable事件流傳入的ObservableOnSubscribe對(duì)象,還有印象嗎芬位,沒(méi)有也沒(méi)關(guān)系:

public void login(final String userName, final String password) {
    Observable.create(new ObservableOnSubscribe<CommonApiBean<UserInfo>>() {
                @Override
                public void subscribe(ObservableEmitter<CommonApiBean<UserInfo>> e) throws Exception {
                    e.onNext(loginApi(userName, password));
                }
            }).//省略后續(xù)操作代碼
}

沒(méi)錯(cuò)无拗,就是調(diào)用此處我們實(shí)現(xiàn)的subscribe方法。
到這里昧碉,逆向訂閱Observable的過(guò)程分析完畢了英染。
總結(jié):
LambdaObserver -> ObservableObserveOn
ObserveOnObserver -> ObservableSubscribeOn
SubscribeOnObserver -> ObservableDoOnEach
DoOnEachObserver -> ObservableMap
MapObserver -> ObservableCreate
最后調(diào)用ObservableOnSubscribe的subscribe方法

有木有感覺(jué)思緒逐漸明朗起來(lái)!不急被饿,后面還有呢税迷!

3. 逐級(jí)調(diào)用Observer的onNext方法

MapObserver.onNext
接著上面最后一步進(jìn)行分析,調(diào)用ObservableOnSubscribe的subscribe方法锹漱,而我們?cè)趕ubscribe中調(diào)用了e.onNext箭养,此處的e是CreateEmmiter對(duì)象,進(jìn)入它的onNext方法:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

邏輯很簡(jiǎn)單哥牍,對(duì)發(fā)射的數(shù)據(jù)判空毕泌,如果數(shù)據(jù)為空則拋出異常。如果沒(méi)有中斷事件流嗅辣,則調(diào)用observer.onNext撼泛,此處的observer是創(chuàng)建CreateEmitter時(shí)傳入的MapObserver。
總結(jié):調(diào)用MapObserver的onNext方法澡谭。

DoOnEachObserver
跟蹤到MapObserver的onNext方法:

@Override
public void onNext(T t) {
    //省略一些判斷
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

先通過(guò)mapper.appply對(duì)數(shù)據(jù)t做變換愿题,變換之后繼續(xù)調(diào)用DoOnEachObserver的onNext方法。

總結(jié):調(diào)用DoOnEachObserver的onNext方法

SubscribeOnObserver.onNext
我們跟到DoOnEachObserver.onNext方法:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    try {
        onNext.accept(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    actual.onNext(t);
}

跟我們預(yù)料一樣潘酗,繼續(xù)調(diào)用下一級(jí)observer(SubscribeOnObserver)的onNext方法杆兵。

總結(jié):調(diào)用SubscribeOnObserver的onNext方法

ObserveOnObserver.onNext
長(zhǎng)征的路一步一步走仔夺,接著看SubscribeOnObserver.onNext方法:

@Override
public void onNext(T t) {
    actual.onNext(t);
}

這里也很干脆直接琐脏,調(diào)用上一級(jí)observer(ObserveOnObserver)的onNext方法。
總結(jié):調(diào)用ObserveOnObserver的onNext方法缸兔。

LambdaObserver.onNext
進(jìn)入ObserveOnObserver.onNext方法:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

先把數(shù)據(jù)加入queue日裙,然后代用schedule方法,這里涉及到線(xiàn)程調(diào)度惰蜜,我們稍后分析昂拂,總之最后會(huì)調(diào)用到LambdaObserver.onNext方法。

到這里抛猖,逐級(jí)調(diào)用Observer的onNext方法也分析完畢了格侯。
總結(jié):
MapObserver.onNext -> DoOnEachObserver.onNext -> SubscribeOnObserver.onNext -> ObserveOnObserver.onNext -> LambdaObserver.onNext

到這里,RxJava整個(gè)事件流的原理分析完畢了樟结。回顧一下精算,包括三個(gè)步驟:

  1. 構(gòu)建操作符對(duì)應(yīng)的Observable
  2. 逐級(jí)生成Observer瓢宦,逆向訂閱Observable
  3. 逐級(jí)調(diào)用Observer的onNext方法

前面很詳細(xì)講解了每一個(gè)步驟,下面我們用一張圖來(lái)概括整個(gè)過(guò)程:
image.png

相信各位同學(xué)很容易理解這個(gè)圖片描述的流程灰羽。

線(xiàn)程切換原理

在理解了RxJava2操作符工作原理之后驮履,我們需要分析subscribeOn和observeOn切換線(xiàn)程的原理。

1. subscribeOn

上面提到過(guò)廉嚼,subscribeOn指定被觀察者處理事件流所在線(xiàn)程玫镐,它作用在subscribe階段(即圖中逆向訂閱過(guò)程),我們重新看一下ObservableSubscribeOn的訂閱過(guò)程:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

此處的scheduler怠噪,是我們出傳入的Schedulers.io()恐似,這是個(gè)什么東西呢?

public final class Schedulers {
    @NonNull
    static final Scheduler IO;
    
    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    
    static {
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        //省略其他幾個(gè)Scheduler初始化過(guò)程
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

//省略其他代碼
}

看到這里傍念,我們知道了矫夷,Scheduler.io()具體是指IoScheduler對(duì)象,IoScheduler繼承Scheduler憋槐,Scheduler是所有線(xiàn)程調(diào)度器的父類(lèi)双藕,看一下Scheduler的實(shí)現(xiàn):

public abstract class Scheduler {
    @NonNull
    public abstract Worker createWorker();
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    //省略其代碼
}

Scheduler是個(gè)抽象類(lèi),包含一個(gè)抽象方法createWorker阳仔,返回一個(gè)Worker對(duì)象忧陪。
而它的scheduleDirect方法實(shí)際上就是調(diào)用這個(gè)Worker的schedule方法

繼續(xù)分析線(xiàn)程切換邏輯,代碼中調(diào)用了IoScheduler.scheduleDirect方法嘶摊,實(shí)際就是把SubscribeTask交給IoScheduler.createWorker構(gòu)建的worker去執(zhí)行延蟹。
跟到IoScheduler.createWorker方法:

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

返回了一個(gè)EventLoopWorker對(duì)象,進(jìn)入它的schedule方法:

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

這里也不是真正執(zhí)行任務(wù)的地方更卒,那就繼續(xù)跟進(jìn)到ThreadWorker.scheduleActual方法:

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

到這里等孵,總算看到了,通過(guò)線(xiàn)程池分配線(xiàn)程來(lái)執(zhí)行任務(wù)蹂空。
總結(jié):subscribeOn(Schedulers.io())會(huì)在逆向訂閱步驟中俯萌,通過(guò)線(xiàn)程池分配一個(gè)子線(xiàn)程來(lái)執(zhí)行任務(wù)。

圖中通過(guò)粉色和紅色的箭頭區(qū)分了UI線(xiàn)程和子線(xiàn)程上枕,走到訂閱ObservableSubscribeOn時(shí)咐熙,從UI線(xiàn)程切換到子線(xiàn)程,箭頭從粉色變?yōu)榧t色辨萍,之后的逆向訂閱操作都在子線(xiàn)程中進(jìn)行棋恼。

2. observeOn

接下來(lái)分析observeOn,它是指定觀察者(訂閱者)處理事件所在線(xiàn)程锈玉。我們傳入的是AndroidScheduler.mainThread(),這又是個(gè)什么東西呢爪飘?

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

看到這段代碼,我們就知道了拉背,AndroidScheduler.mainThread指的是HandlerThread师崎,在它構(gòu)造方法中會(huì)傳入一個(gè)主線(xiàn)程Handler,相信不用解釋?zhuān)魑煌瑢W(xué)明白這個(gè)Handler的作用吧椅棺。對(duì)犁罩,,两疚,就是用來(lái)把觀察者執(zhí)行邏輯切換到主線(xiàn)程床估。

那么,具體是在哪個(gè)過(guò)程切換的呢诱渤?執(zhí)行ObserveOnObserver的onNext階段丐巫!

代碼如下:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

先把事件流中的數(shù)據(jù)t加入隊(duì)列queue,然后執(zhí)行schedule方法:

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

這個(gè)worker相信大家猜到了勺美,就是HandlerScheduler的createWorker得到的對(duì)象鞋吉,把this作為任務(wù)(ObserveOnObserver實(shí)現(xiàn)了Runnable),交給worker執(zhí)行:

@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

這里的worker是HandlerWorker對(duì)象励烦,繼續(xù)看它的schedule方法:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    //省略一些判斷

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

到這里谓着,總算守得云開(kāi)見(jiàn)日出。通過(guò)Handler發(fā)送消息把任務(wù)切換到主線(xiàn)程執(zhí)行坛掠。

這個(gè)任務(wù)就是剛才提到的ObserveOnObserver赊锚,我們看一下它的run方法:

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

這里會(huì)根據(jù)outputFused走不通的邏輯治筒,正常情況下都會(huì)走else邏輯,我們就只分析這條分支舷蒲。

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

這段代碼比較長(zhǎng)耸袜,但仔細(xì)看下來(lái),邏輯還是比較簡(jiǎn)單牲平,就是從queue中獲取數(shù)據(jù)堤框,然后把數(shù)據(jù)交給actual(LambdaObserver)的onNext方法。到這里纵柿,經(jīng)過(guò)轉(zhuǎn)換的數(shù)據(jù)交給我們傳入的Comsumer蜈抓,在主線(xiàn)程中處理,observeOn切換線(xiàn)程的邏輯分析完畢昂儒。

還是可以看回那張圖沟使,當(dāng)執(zhí)行ObserveOnObserver.onNext時(shí),就從子線(xiàn)程切換回UI線(xiàn)程渊跋,箭頭變成粉色腊嗡。
總結(jié):observeOn作用于onNext階段

總結(jié)

這篇文章很長(zhǎng)拾酝,相信看完的同學(xué)肯定會(huì)有所收獲燕少,對(duì)RxJava2有更好的認(rèn)識(shí)。
當(dāng)然蒿囤,除了本文舉例場(chǎng)景中的操作符客们,RxJava2還提供了很有強(qiáng)的而好用的操作符,各位同學(xué)可以學(xué)習(xí)學(xué)習(xí)蟋软。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末镶摘,一起剝皮案震驚了整個(gè)濱河市嗽桩,隨后出現(xiàn)的幾起案子岳守,更是在濱河造成了極大的恐慌,老刑警劉巖碌冶,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件湿痢,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡扑庞,警方通過(guò)查閱死者的電腦和手機(jī)譬重,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)罐氨,“玉大人臀规,你說(shuō)我怎么就攤上這事≌ひ” “怎么了塔嬉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵玩徊,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我谨究,道長(zhǎng)恩袱,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任胶哲,我火速辦了婚禮畔塔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鸯屿。我一直安慰自己澈吨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布碾盟。 她就那樣靜靜地躺著棚辽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪冰肴。 梳的紋絲不亂的頭發(fā)上屈藐,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音熙尉,去河邊找鬼联逻。 笑死,一個(gè)胖子當(dāng)著我的面吹牛检痰,可吹牛的內(nèi)容都是我干的包归。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼铅歼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼公壤!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起椎椰,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤厦幅,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后慨飘,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體确憨,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年瓤的,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了休弃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡圈膏,死狀恐怖塔猾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情稽坤,我是刑警寧澤丈甸,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布医增,位于F島的核電站,受9級(jí)特大地震影響老虫,放射性物質(zhì)發(fā)生泄漏叶骨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一祈匙、第九天 我趴在偏房一處隱蔽的房頂上張望忽刽。 院中可真熱鬧,春花似錦夺欲、人聲如沸跪帝。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)伞剑。三九已至,卻和暖如春市埋,著一層夾襖步出監(jiān)牢的瞬間黎泣,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工缤谎, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留抒倚,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓坷澡,卻偏偏與公主長(zhǎng)得像托呕,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子频敛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容