RxJava3原理解析

文章首發(fā)于我建立的一個筆記倉庫

1. 背景

RxJava是一個基于事件流、實現(xiàn)異步操作的庫。

官方介紹: RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
(RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的海诲、基于事件的程序的庫)

文中用到的RxJava源碼版本為3.0.13靠粪,文中的demo源碼 https://github.com/xfhy/AllInOne/tree/master/app/src/main/java/com/xfhy/allinone/opensource/rxjava

2. 基礎(chǔ)使用

簡單介紹一下如何與Retrofit結(jié)合使用。引入:

implementation "io.reactivex.rxjava3:rxjava:3.0.13"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"

//Retrofit
implementation "com.squareup.retrofit2:retrofit:2.9.0"
//可選
implementation "com.squareup.retrofit2:converter-gson:2.9.0"

構(gòu)建Retrofit實例

private val retrofit by lazy {
        Retrofit.Builder()
            .baseUrl("https://www.wanandroid.com")
            //使用Gson解析
            .addConverterFactory(GsonConverterFactory.create())
            //轉(zhuǎn)換器   RxJava3   每次執(zhí)行的時候在IO線程
            .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .build()
    }

定義Retrofit的API:

interface WanAndroidService {

    @GET("wxarticle/chapters/json")
     fun listReposByRxJava(): Single<WxList?>

}

class WxList {
    var errorMsg = ""
    var errorCode = -1
    var data = mutableListOf<Wx>()

    class Wx {
        var id: Int = 0
        var name: String = ""
    }
}

請求網(wǎng)絡(luò):

fun reqNet() {
    val request = retrofit.create(WanAndroidService::class.java)
    val call = request.listReposByRxJava()
    call.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<WxList?> {
        override fun onSubscribe(d: Disposable?) {
            tvContent.text = "開始請求網(wǎng)絡(luò)"
        }

        override fun onSuccess(t: WxList?) {
            t?.let {
                tvContent.text = it.data[0].name
            }
        }

        override fun onError(e: Throwable?) {
            tvContent.text = "網(wǎng)絡(luò)出錯"
        }
    })
}

這樣峦失,一個簡單的Retrofit與OKHttp的結(jié)合案例就完成了】松簦現(xiàn)在請求網(wǎng)絡(luò)的時候就可以使用RxJava那些鏈?zhǔn)讲僮髁恕?/p>

3. just : 最簡單的訂閱關(guān)系

先從最簡單的just開始筒严,看一下RxJava的訂閱關(guān)系是怎么樣的。

val just: Single<Int> = Single.just(1)
just.subscribe(object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: Int) {
    }

    override fun onError(e: Throwable?) {
    }
})

Single.just(1)會構(gòu)建一個SingleJust實例出來情萤,

//Single.java
public static <@NonNull T> Single<T> just(T item) {
    Objects.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}

其中RxJavaPlugins.onAssembly是一個鉤子鸭蛙,不用在意,這段代碼就是返回一個SingleJust對象筋岛。

點進(jìn)去看一下subscribe是怎么走的

//Single.java
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    ...
    subscribeActual(observer);
    ...
}

核心代碼就一句娶视,調(diào)用subscribeActual方法,從名字看是進(jìn)行實際地訂閱睁宰。那么我們將目光聚焦到subscribeActual里面肪获,它是一個抽象方法,就上面的demo而言其實際實現(xiàn)是剛才創(chuàng)建出來的SingleJust柒傻。

//Single.java
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);

//SingleJust.java
public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }

}

SingleJust里面的代碼非常簡潔孝赫,在實際訂閱(調(diào)用subscribeActual)時,直接將傳進(jìn)來的觀察者(也就是上面?zhèn)魅氲腟ingleObserver)回調(diào)onSubscribe和onSuccess就完事了红符。此處沒有onError青柄,因為不會失敗伐债。

4. map 操作符

4.1 原理

我們知道,RxJava中map可以轉(zhuǎn)換數(shù)據(jù)致开,看一下它是怎么做到的

val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }
})
singleString.subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: String) {
    }

    override fun onError(e: Throwable?) {
    }
})

點進(jìn)去map看一下:

//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

構(gòu)建了一個SingleMap峰锁,有了上面just的經(jīng)驗,訂閱的時候是走的SingleMap的subscribeActual方法喇喉。直接去看:

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
}

注意一下這個source祖今,它是啥?在構(gòu)造方法里面?zhèn)魅氲募鸺迹簿褪窃赟ingle.java的map方法那里傳入的this,這個this也就是Single.just(1)所構(gòu)建出來的SingleJust對象耍目。這個SingleJust也就是此處map的上游膏斤,上游把事件給下游。

此處訂閱時邪驮,就是調(diào)一下上游的subscribe與自己綁定起來莫辨,完成訂閱關(guān)系。現(xiàn)在生產(chǎn)者是上游毅访,而此處的SingleMap就是下游的觀察者沮榜。

MapSingleObserver,也就是map的觀察者喻粹,來看一下它是怎么實現(xiàn)的

public final class SingleMap<T, R> extends Single<R> {
    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                //mapper是demo中傳入的object : Function<Int, String>
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

其實t是下游的觀察者蟆融,通過subscribeActual傳入。在上游調(diào)用map的onSubscribe同時守呜,map也向下傳遞這個事件型酥,調(diào)用下游觀察者的onSubscribe。在上游調(diào)用map的onSuccess時查乒,map自己進(jìn)行轉(zhuǎn)換一下弥喉,再交給下游的onSuccess。同理玛迄,onError也是一樣的路線由境。

到這里就理清楚了。

4.2 框架結(jié)構(gòu)

RxJava的整體結(jié)構(gòu)是一條鏈蓖议,其中:

  1. 鏈的最上游:生產(chǎn)者Observable
  2. 鏈的最下游:觀察者Observer
  3. 鏈的中間:各個中介節(jié)點虏杰,既是下游的Observable,又是上游的Observer

4.3 操作符Operator(map等)的本質(zhì)

  1. 基于原Observable創(chuàng)建一個新的Observable
  2. Observable內(nèi)部創(chuàng)建一個Observer
  3. 通過定制Observable的subscribeActual()方法和Observer的onXxx()方法拒担,來實現(xiàn)自己的中介角色(例如數(shù)據(jù)轉(zhuǎn)換嘹屯、線程切換等)

5. dispose工作原理

可以通過dispose()方法來讓上游或內(nèi)部調(diào)度器(或兩者都有)停止工作,達(dá)到「丟棄」的效果从撼。

下面分別講一下這幾種情況:

  • Single.just 無后續(xù)州弟,無延遲
  • Observable.interval 有后續(xù)钧栖,有延遲
  • Single.map 無后續(xù),無延遲婆翔,有上下游
  • Single.delay 無后續(xù)拯杠,有延遲
  • Observable.map 有后續(xù),無延遲
  • Observable.delay 無后續(xù)啃奴,有延遲

這幾種情況已經(jīng)足夠把所有dispose的情況都說明完整了潭陪。

5.1 Single.just 無后續(xù),無延遲

對于Single.just最蕾,情況比較簡單依溯,在SingleJust的subscribeActual中,給觀察者一個全局共享的Disposable對象瘟则。下游不能對其進(jìn)行取消黎炉,因為間隔太短了,馬上就調(diào)用onSuccess了醋拧。

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}

5.2 Observable.interval 有后續(xù)慷嗜,有延遲

先來一段示例代碼:

val longObservable: Observable<Long> = Observable.interval(0, 1, TimeUnit.SECONDS)
longObservable.subscribe(object : Observer<Long> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onNext(t: Long?) {
    }

    override fun onError(e: Throwable?) {
    }

    override fun onComplete() {
    }
})

這里Observable.interval構(gòu)建的是ObservableInterval對象。有了前面的經(jīng)驗丹壕,直接進(jìn)去看ObservableInterval的subscribeActual方法庆械。

//ObservableInterval.java
@Override
public void subscribeActual(Observer<? super Long> observer) {
    //1. 創(chuàng)建觀察者(該觀察者還實現(xiàn)了Disposable)
    IntervalObserver is = new IntervalObserver(observer);
    observer.onSubscribe(is);

    //線程調(diào)度器
    Scheduler sch = scheduler;

    ...
    //將is(它實現(xiàn)了Runnable)這個任務(wù)交給線程調(diào)度器去執(zhí)行,同時返回一個Disposable對象
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
    ...
}

首先是創(chuàng)建了一個觀察者菌赖,該觀察者很明顯是實現(xiàn)了Disposable接口缭乘,因為將該觀察者順著onSubscribe傳遞給了下游,方便下游取消盏袄。隨后忿峻,將該觀察者交給線程調(diào)度器去執(zhí)行,顯然它還實現(xiàn)了Runnable接口辕羽,緊接著將調(diào)度器返回的Disposable對象設(shè)置給該觀察者逛尚。

static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

    private static final long serialVersionUID = 346773832286157679L;

    final Observer<? super Long> downstream;

    long count;
    
    //傳入的Observer是下游的
    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void dispose() {
        //取消自己
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }

    @Override
    public void run() {
        //通知下游
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }

    public void setResource(Disposable d) {
        //設(shè)置Disposable給自己
        DisposableHelper.setOnce(this, d);
    }
}

IntervalObserver繼承自AtomicReference(AtomicReference類提供了一個可以原子讀寫的對象引用變量,避免出現(xiàn)線程安全問題)刁愿,泛型是Disposable绰寞。同時它也實現(xiàn)了Disposable和Runnable。在構(gòu)造方法里面?zhèn)魅胂掠蔚挠^察者铣口,方便待會兒把事件傳給下游滤钱。

當(dāng)事件一開始時,將IntervalObserver傳遞給下游脑题,因為它實現(xiàn)了Disposable件缸,可以被下游取消。然后將IntervalObserver傳遞給調(diào)度器叔遂,調(diào)度器會執(zhí)行里面的run方法他炊,run方法里面是將數(shù)據(jù)傳遞給下游争剿。在交給調(diào)度器的時候,返回了一個Disposable對象痊末,意味著可以隨時取消調(diào)度器里面的該任務(wù)蚕苇。然后將該Disposable對象設(shè)置給IntervalObserver的內(nèi)部,通過setResource方法凿叠,其實就是設(shè)置給IntervalObserver自己的涩笤,它本身就是一個AtomicReference<Disposable>。當(dāng)下游調(diào)用dispose時盒件,即調(diào)用IntervalObserver的dispose蹬碧,然后IntervalObserver內(nèi)部隨即調(diào)用自己的dispose方法,完成了取消履恩。

這里為什么設(shè)計的這么繞锰茉?直接將調(diào)度器返回的Disposable對象返回給下游不就可以了么,下游也可以對其進(jìn)行取消扒行摹?這樣設(shè)計的好處是上游傳遞給下游的永遠(yuǎn)是IntervalObserver對象片吊,下游直接拿著這個實現(xiàn)了Disposable的IntervalObserver對象可以直接調(diào)用它的dispose進(jìn)行取消绽昏。而不用管它內(nèi)部當(dāng)前是握著哪個Disposable對象,即使IntervalObserver內(nèi)部的Disposable被更換了也絲毫不影響下游對上游的取消操作俏脊。

5.3 Single.map 無后續(xù)全谤,無延遲,有上下游

先來個簡單例子

val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }
})
singleString.subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: String) {
    }

    override fun onError(e: Throwable?) {
    }
})

singleInt.map點進(jìn)去

//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

通過上面的例子我們知道爷贫,上游是創(chuàng)建了一個SingleJust對象认然。在調(diào)用map時,將自己(也就是SingleJust)傳給下游SingleMap里面去了漫萄。

//SingleMap.java
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;
    
    //source是上游卷员,通過構(gòu)造方法傳入進(jìn)來
    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //t是下游
        //訂閱
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

一開場就直接調(diào)用上游source訂閱MapSingleObserver這個觀察者。在MapSingleObserver的邏輯也比較簡單,就是實現(xiàn)了onSubscribe腾务、onSuccess毕骡、onError這些方法。然后在上游調(diào)用onSubscribe時調(diào)用下游的onSubscribe岩瘦;在上游調(diào)用onSuccess時自己做了一下mapper.apply(value)轉(zhuǎn)換操作未巫,將數(shù)據(jù)轉(zhuǎn)換成下游所需要的,然后再調(diào)用下游的onSuccess傳遞給下游启昧;onError同onSubscribe原理是一樣的叙凡。

5.4 Single.delay 無后續(xù),有延遲

來段示例代碼:

val singleInt: Single<Int> = Single.just(1)
val singleDelay: Single<Int> = singleInt.delay(1, TimeUnit.SECONDS)
val observer = object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable?) {
        log("onSubscribe")
    }

    override fun onSuccess(t: Int?) {
        log("onSuccess")
    }

    override fun onError(e: Throwable?) {
        log("onError")
    }
}
singleDelay.subscribe(observer)

直搗黃龍密末,Single.delay背后的對象是SingleDelay∥找現(xiàn)在有經(jīng)驗了跛璧,直接看它的subscribeActual

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
    //可以確定的是這是一個Disposable
    final SequentialDisposable sd = new SequentialDisposable();
    //將這個Disposable通過onSubscribe傳遞給下游
    observer.onSubscribe(sd);
    //讓上游訂閱Delay這個觀察者
    source.subscribe(new Delay(sd, observer));
}

看下SequentialDisposable是什么玩意兒

public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
    public SequentialDisposable() {
        // nothing to do
    }
    public SequentialDisposable(Disposable initial) {
        lazySet(initial);
    }
    public boolean update(Disposable next) {
        return DisposableHelper.set(this, next);
    }
    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

似曾相識,上面的IntervalObserver也是這種思想饼拍。只不過這里多了2個update和replace方法赡模,可以隨時更換AtomicReference里面的Disposable對象。這就體現(xiàn)出了這種設(shè)計的好處师抄,不管里面的Disposable怎么更換漓柑,傳遞給下游的是這個SequentialDisposable,下游只需要調(diào)SequentialDisposable的dispose就將其里面的Disposable給取消掉了叨吮,而不用管里面的Disposable究竟是誰辆布。

下面咱們來看SingleDelay里面的內(nèi)部類Delay(觀察者)

final class Delay implements SingleObserver<T> {
    //傳遞給下游的Disposable
    private final SequentialDisposable sd;
    //下游的觀察者
    final SingleObserver<? super T> downstream;

    Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
        this.sd = sd;
        this.downstream = observer;
    }

    @Override
    public void onSubscribe(Disposable d) {
        //開始訂閱的時候,sd內(nèi)部的Disposable是上游給過來的
        sd.replace(d);
    }

    @Override
    public void onSuccess(final T value) {
        //上游把數(shù)據(jù)給過來之后茶鉴,就不用管上游了锋玲,直接把sd里面Disposable 設(shè)置成線程調(diào)度器給回來那個
        //因為此時下游調(diào)用dispose的話,直接取消調(diào)度器里面的任務(wù)就行了
        //巧妙地將sd里面的Disposable掉包了
        sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
    }

    @Override
    public void onError(final Throwable e) {
        sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
    }

    final class OnSuccess implements Runnable {
        private final T value;

        OnSuccess(T value) {
            this.value = value;
        }

        @Override
        public void run() {
            //調(diào)度器執(zhí)行到該任務(wù)時涵叮,將數(shù)據(jù)傳遞給下游
            downstream.onSuccess(value);
        }
    }

    final class OnError implements Runnable {
        private final Throwable e;

        OnError(Throwable e) {
            this.e = e;
        }

        @Override
        public void run() {
            downstream.onError(e);
        }
    }
}

這段代碼比較精彩惭蹂,首先在上游訂閱Delay的時候,觸發(fā)onSubscribe割粮,Delay內(nèi)部隨即將該Disposable存入SequentialDisposable對象(需要注意的是下游拿到的Disposable始終是這個SequentialDisposable)中盾碗。此時如果下游調(diào)用dispose,也就是調(diào)用SequentialDisposable的dispose舀瓢,也就是上游的dispose廷雅,dispose流程在這個節(jié)點上就完成了,向上傳遞京髓。

上游有數(shù)據(jù)了航缀,通過onSuccess傳遞給觀察者Delay的時候,SequentialDisposable就可以不用管上游的那個Disposable了堰怨,此時要關(guān)心的是傳遞給線程調(diào)度器里面的任務(wù)的取消事件了芥玉。所以直接將調(diào)度器返回的Disposable替換到SequentialDisposable內(nèi)部,此時下游進(jìn)行取消時诚些,就直接把任務(wù)給取消掉了飞傀。

當(dāng)調(diào)度器執(zhí)行到任務(wù)OnSuccess時,就把數(shù)據(jù)傳遞給下游诬烹,這個節(jié)點的任務(wù)就完成了砸烦。

5.5 Observable.map 有后續(xù),無延遲

Observable.map所對應(yīng)的是ObservableMap绞吁,直接上代碼:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //t是下游的觀察者
        //source是上游
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

在subscribeActual中并沒有直接調(diào)用onSubscribe,而MapObserver中又沒有這個方法幢痘,那onSubscribe肯定是在其父類中完成的。在看onSubscribe之前咱干脆先把onNext理一下家破,這里通過mapper.apply轉(zhuǎn)一下之后馬上就交給下游的onNext去了颜说。

//BasicFuseableObserver.java
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
    public BasicFuseableObserver(Observer<? super R> downstream) {
        this.downstream = downstream;
    }
    @Override
    public final void onSubscribe(Disposable d) {
        //驗證上游   d是上游的Disposable   upstream是當(dāng)前類的字段购岗,還沒有被賦值
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;
            }
            //onSubscribe之前想做點什么事情的話,在beforeDownstream里面做
            if (beforeDownstream()) {
                //調(diào)用下游的onSubscribe
                downstream.onSubscribe(this);
                //onSubscribe之后想做點什么事情的話门粪,在afterDownstream里面做
                afterDownstream();
            }

        }
    }
    protected boolean beforeDownstream() {
        return true;
    }
    protected void afterDownstream() {
    }
    @Override
    public void dispose() {
        upstream.dispose();
    }
}

//DisposableHelper.java
public static boolean validate(Disposable current, Disposable next) {
    if (next == null) {
        RxJavaPlugins.onError(new NullPointerException("next is null"));
        return false;
    }
    if (current != null) {
        next.dispose();
        reportDisposableSet();
        return false;
    }
    return true;
}

還是先調(diào)用下游的onSubscribe喊积,不過,并沒有將上游的Disposable直接傳給下游玄妈,而是將中間節(jié)點BasicFuseableObserver自己傳給了下游乾吻,同時將上游的Disposable存儲起來,方便待會兒dispose拟蜻。

5.6 Observable.delay 無后續(xù)绎签,有延遲

Observable.delay 對應(yīng)的是ObservableDelay

public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super T> t) {
        Observer<T> observer;
        if (delayError) {
            observer = (Observer<T>)t;
        } else {
            observer = new SerializedObserver<>(t);
        }
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
    }
}

在subscribeActual沒有調(diào)用下游的onSubscribe,那說明是在DelayObserver中完成的

static final class DelayObserver<T> implements Observer<T>, Disposable {
    final Scheduler.Worker w;
    Disposable upstream;

    DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
        super();
        this.downstream = actual;
        this.w = w;
        ...
    }

    @Override
    public void onSubscribe(Disposable d) {
        //1. 先驗證一下上游  然后將上游的Disposable賦值給upstream
        //2. 調(diào)用下游的onSubscribe酝锅,把自己傳給下游
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(final T t) {
        //OnNext任務(wù)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時調(diào)用下游的onNext方法
        w.schedule(new OnNext(t), delay, unit);
    }

    @Override
    public void onError(final Throwable t) {
        w.schedule(new OnError(t), delayError ? delay : 0, unit);
    }

    @Override
    public void onComplete() {
        w.schedule(new OnComplete(), delay, unit);
    }

    @Override
    public void dispose() {
        //同時取消上游的Disposable和自己執(zhí)行的調(diào)度器任務(wù)
        upstream.dispose();
        w.dispose();
    }

    final class OnNext implements Runnable {
        private final T t;

        OnNext(T t) {
            this.t = t;
        }

        @Override
        public void run() {
            downstream.onNext(t);
        }
    }
    ...
}

onXxx的所有操作都放到了DelayObserver里面來完成诡必,在上游調(diào)用到這節(jié)的onSubscribe時,先驗證一下上游 然后將上游的Disposable賦值給upstream搔扁,調(diào)用下游的onSubscribe爸舒,把自己傳給下游。

當(dāng)下游調(diào)用dispose時稿蹲,在DelayObserver的dispose方法中將上游的Disposable給取消掉碳抄,然后把自己的調(diào)度器任務(wù)也給取消掉。

事件的傳遞:當(dāng)上游調(diào)用到這一節(jié)的onNext時场绿,OnNext任務(wù)(Runnable)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時調(diào)用下游的onNext方法。

6. 線程切換

線程切換是RxJava的另一個重要功能嫉入。

6.1 subscribeOn

subscribeOn在Single場景下對應(yīng)的是SingleSubscribeOn這個類

public final class SingleSubscribeOn<T> extends Single<T> {
    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);
        
        //切線程
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }
}

直接看subscribeActual方法焰盗,很明顯是將parent這個任務(wù)交給了線程調(diào)度器去執(zhí)行。那我們直接看SubscribeOnObserver的run方法即可

static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    @Override
    public void run() {
        source.subscribe(this);
    }
}

在scheduleDirect那里切線程咒林,然后在另一個線程中去執(zhí)行source.subscribe(this)熬拒,也就是在Scheduler指定的線程里啟動subscribe(訂閱)。

  • 切換起源Observable的線程
  • 當(dāng)多次調(diào)用subscribeOn()的時候垫竞,只有最上面的會對起源Observable起作用

observeOn

observeOn在Single場景下的類是SingleObserveOn澎粟。它的subscribeActual方法如下:

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
    source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}

上游訂閱了ObserveOnSingleObserver這個觀察者,核心就在這個觀察者里面欢瞪。

static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    private static final long serialVersionUID = 3528003840217436037L;

    final SingleObserver<? super T> downstream;

    final Scheduler scheduler;

    T value;
    Throwable error;

    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.downstream = actual;
        this.scheduler = scheduler;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void onError(Throwable e) {
        this.error = e;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }
    ...
}

我們重點關(guān)注一下onSuccess和onError方法活烙,核心就是將當(dāng)前這個Runnable任務(wù)交給scheduler進(jìn)行執(zhí)行,而這里的scheduler是由使用者傳入的遣鼓,比如說是AndroidSchedulers.mainThread()啸盏。那么在run方法執(zhí)行時,就會在主線程中骑祟,那么在主線程中執(zhí)行下游的onError和onSuccess回懦。 這里通過Scheduler指定的線程來調(diào)用下級Observer的對應(yīng)回調(diào)方法气笙。

  • 切換observeOn下面的Observer的回調(diào)所在的線程
  • 當(dāng)多次調(diào)用observerOn()的時候,每個都好進(jìn)行一次線程切換怯晕,影響范圍是它下面的每個Observer(除非又遇到新的obServeOn())

6.2 Scheduler的原理

上面我們多次提到Scheduler潜圃,但是一直不知道它具體是什么。其實它就是用來控制控制線程的舟茶,用于將指定的邏輯在指定的線程中執(zhí)行谭期。這里就不帶著大家讀源碼了,篇幅過于長了稚晚,這塊源碼也比較簡單崇堵,感興趣的讀者可以去翻閱一下。下面是幾個核心點客燕。

其中Schedulers.newThread()里面是創(chuàng)建了一個線程池Executors.newScheduledThreadPool(1, factory)來執(zhí)行任務(wù)鸳劳,但是這個線程池里面的線程不會得到重用,每次都是新建的線程池也搓。當(dāng) scheduleDirect() 被調(diào)用的時候赏廓,會創(chuàng)建一個 Worker,Worker 的內(nèi)部 會有一個 Executor傍妒,由 Executor 來完成實際的線程切換;scheduleDirect() 還會創(chuàng)建出一個 Disposable 對象幔摸,交給外層的 Observer,讓它能執(zhí)行 dispose() 操作颤练,取消訂閱鏈;

Schedulers.io()和Schedulers.newThread()差別不大既忆,但是io()這兒線程可能會被重用,所以一般io()用得多一些嗦玖。

AndroidSchedulers.mainThread()就更簡單了患雇,直接使用Handler進(jìn)行線程切換,將任務(wù)放到主線程去做宇挫,不管再怎么花里胡哨的庫苛吱,最后要切到主線程還得靠Handler。

7. 小結(jié)

Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用器瘪、邏輯簡潔 & 使用簡單的特點翠储,深受各大 Android開發(fā)者的歡迎。平時在項目中也使用得比較多橡疼,所以本文對RxJava3中的訂閱流程援所、取消流程、線程切換進(jìn)行了核心源碼分析衰齐,希望能幫助到各位任斋。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子废酷,更是在濱河造成了極大的恐慌瘟檩,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澈蟆,死亡現(xiàn)場離奇詭異墨辛,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)趴俘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門睹簇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人寥闪,你說我怎么就攤上這事太惠。” “怎么了疲憋?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵凿渊,是天一觀的道長。 經(jīng)常有香客問我缚柳,道長埃脏,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任秋忙,我火速辦了婚禮彩掐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘灰追。我一直安慰自己堵幽,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布弹澎。 她就那樣靜靜地躺著谐檀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪裁奇。 梳的紋絲不亂的頭發(fā)上禁熏,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天狈癞,我揣著相機(jī)與錄音桦山,去河邊找鬼食棕。 笑死啥寇,一個胖子當(dāng)著我的面吹牛涩禀,可吹牛的內(nèi)容都是我干的亿汞。 我是一名探鬼主播亮垫,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼羔沙,長吁一口氣:“原來是場噩夢啊……” “哼躺涝!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起扼雏,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤坚嗜,失蹤者是張志新(化名)和其女友劉穎夯膀,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苍蔬,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡诱建,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了碟绑。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俺猿。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖格仲,靈堂內(nèi)的尸體忽然破棺而出押袍,到底是詐尸還是另有隱情,我是刑警寧澤凯肋,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布谊惭,位于F島的核電站,受9級特大地震影響否过,放射性物質(zhì)發(fā)生泄漏午笛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一苗桂、第九天 我趴在偏房一處隱蔽的房頂上張望药磺。 院中可真熱鬧,春花似錦煤伟、人聲如沸癌佩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽围辙。三九已至,卻和暖如春放案,著一層夾襖步出監(jiān)牢的瞬間姚建,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工吱殉, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留掸冤,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓友雳,卻偏偏與公主長得像稿湿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子押赊,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 基本使用 添加依賴 定義Api請求接口倉庫 發(fā)起網(wǎng)絡(luò)請求 框架結(jié)構(gòu) RxJava的整體結(jié)構(gòu)是一條鏈 鏈的最上游:生...
    Hsicen閱讀 1,428評論 0 1
  • RxJava 相信各位已經(jīng)使用了很久饺藤,大部分人在剛學(xué)習(xí) RxJava 感嘆切換線程的方便,調(diào)用邏輯清晰的同時,并不...
    連續(xù)三屆村草閱讀 626評論 0 2
  • Rxjava 框架結(jié)構(gòu) RxJava 的整體結(jié)構(gòu)是一條鏈涕俗,其中: 鏈的上游:生產(chǎn)者 Observable 鏈的最下...
    拂曉是個小人物閱讀 993評論 0 2
  • 創(chuàng)建操作符 操作符使用 基本創(chuàng)建create() 完整創(chuàng)建1個被觀察者對象(Observable) 快速創(chuàng)建罗丰,發(fā)送...
    帝王鯊kingcp閱讀 1,522評論 0 1
  • 前言 自此文章起,逐層邁入RxJava2源碼世界咽袜,探索Rx思想丸卷。此前,需要對Rx有簡單了询刹,起碼曾使用過谜嫉。對于必要的...
    MxsQ閱讀 1,249評論 0 1