文章首發(fā)于我建立的一個筆記倉庫
1. 背景
RxJava是一個基于事件流、實現(xiàn)異步操作的庫。
官方介紹: RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
(RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的海诲、基于事件的程序的庫)
文中用到的RxJava源碼版本為3.0.13靠粪,文中的demo源碼 https://github.com/xfhy/AllInOne/tree/master/app/src/main/java/com/xfhy/allinone/opensource/rxjava
2. 基礎(chǔ)使用
簡單介紹一下如何與Retrofit結(jié)合使用。引入:
implementation "io.reactivex.rxjava3:rxjava:3.0.13"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"
//Retrofit
implementation "com.squareup.retrofit2:retrofit:2.9.0"
//可選
implementation "com.squareup.retrofit2:converter-gson:2.9.0"
構(gòu)建Retrofit實例
private val retrofit by lazy {
Retrofit.Builder()
.baseUrl("https://www.wanandroid.com")
//使用Gson解析
.addConverterFactory(GsonConverterFactory.create())
//轉(zhuǎn)換器 RxJava3 每次執(zhí)行的時候在IO線程
.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
.build()
}
定義Retrofit的API:
interface WanAndroidService {
@GET("wxarticle/chapters/json")
fun listReposByRxJava(): Single<WxList?>
}
class WxList {
var errorMsg = ""
var errorCode = -1
var data = mutableListOf<Wx>()
class Wx {
var id: Int = 0
var name: String = ""
}
}
請求網(wǎng)絡(luò):
fun reqNet() {
val request = retrofit.create(WanAndroidService::class.java)
val call = request.listReposByRxJava()
call.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<WxList?> {
override fun onSubscribe(d: Disposable?) {
tvContent.text = "開始請求網(wǎng)絡(luò)"
}
override fun onSuccess(t: WxList?) {
t?.let {
tvContent.text = it.data[0].name
}
}
override fun onError(e: Throwable?) {
tvContent.text = "網(wǎng)絡(luò)出錯"
}
})
}
這樣峦失,一個簡單的Retrofit與OKHttp的結(jié)合案例就完成了】松簦現(xiàn)在請求網(wǎng)絡(luò)的時候就可以使用RxJava那些鏈?zhǔn)讲僮髁恕?/p>
3. just : 最簡單的訂閱關(guān)系
先從最簡單的just開始筒严,看一下RxJava的訂閱關(guān)系是怎么樣的。
val just: Single<Int> = Single.just(1)
just.subscribe(object : SingleObserver<Int> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: Int) {
}
override fun onError(e: Throwable?) {
}
})
Single.just(1)會構(gòu)建一個SingleJust實例出來情萤,
//Single.java
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
其中RxJavaPlugins.onAssembly是一個鉤子鸭蛙,不用在意,這段代碼就是返回一個SingleJust對象筋岛。
點進(jìn)去看一下subscribe是怎么走的
//Single.java
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
...
subscribeActual(observer);
...
}
核心代碼就一句娶视,調(diào)用subscribeActual方法,從名字看是進(jìn)行實際地訂閱睁宰。那么我們將目光聚焦到subscribeActual里面肪获,它是一個抽象方法,就上面的demo而言其實際實現(xiàn)是剛才創(chuàng)建出來的SingleJust柒傻。
//Single.java
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
//SingleJust.java
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
SingleJust里面的代碼非常簡潔孝赫,在實際訂閱(調(diào)用subscribeActual)時,直接將傳進(jìn)來的觀察者(也就是上面?zhèn)魅氲腟ingleObserver)回調(diào)onSubscribe和onSuccess就完事了红符。此處沒有onError青柄,因為不會失敗伐债。
4. map 操作符
4.1 原理
我們知道,RxJava中map可以轉(zhuǎn)換數(shù)據(jù)致开,看一下它是怎么做到的
val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
singleString.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: String) {
}
override fun onError(e: Throwable?) {
}
})
點進(jìn)去map看一下:
//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
構(gòu)建了一個SingleMap峰锁,有了上面just的經(jīng)驗,訂閱的時候是走的SingleMap的subscribeActual方法喇喉。直接去看:
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
}
注意一下這個source祖今,它是啥?在構(gòu)造方法里面?zhèn)魅氲募鸺迹簿褪窃赟ingle.java的map方法那里傳入的this,這個this也就是Single.just(1)所構(gòu)建出來的SingleJust對象耍目。這個SingleJust也就是此處map的上游膏斤,上游把事件給下游。
此處訂閱時邪驮,就是調(diào)一下上游的subscribe與自己綁定起來莫辨,完成訂閱關(guān)系。現(xiàn)在生產(chǎn)者是上游毅访,而此處的SingleMap就是下游的觀察者沮榜。
MapSingleObserver,也就是map的觀察者喻粹,來看一下它是怎么實現(xiàn)的
public final class SingleMap<T, R> extends Single<R> {
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
//mapper是demo中傳入的object : Function<Int, String>
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
其實t是下游的觀察者蟆融,通過subscribeActual傳入。在上游調(diào)用map的onSubscribe同時守呜,map也向下傳遞這個事件型酥,調(diào)用下游觀察者的onSubscribe。在上游調(diào)用map的onSuccess時查乒,map自己進(jìn)行轉(zhuǎn)換一下弥喉,再交給下游的onSuccess。同理玛迄,onError也是一樣的路線由境。
到這里就理清楚了。
4.2 框架結(jié)構(gòu)
RxJava的整體結(jié)構(gòu)是一條鏈蓖议,其中:
- 鏈的最上游:生產(chǎn)者Observable
- 鏈的最下游:觀察者Observer
- 鏈的中間:各個中介節(jié)點虏杰,既是下游的Observable,又是上游的Observer
4.3 操作符Operator(map等)的本質(zhì)
- 基于原Observable創(chuàng)建一個新的Observable
- Observable內(nèi)部創(chuàng)建一個Observer
- 通過定制Observable的subscribeActual()方法和Observer的onXxx()方法拒担,來實現(xiàn)自己的中介角色(例如數(shù)據(jù)轉(zhuǎn)換嘹屯、線程切換等)
5. dispose工作原理
可以通過dispose()方法來讓上游或內(nèi)部調(diào)度器(或兩者都有)停止工作,達(dá)到「丟棄」的效果从撼。
下面分別講一下這幾種情況:
- Single.just 無后續(xù)州弟,無延遲
- Observable.interval 有后續(xù)钧栖,有延遲
- Single.map 無后續(xù),無延遲婆翔,有上下游
- Single.delay 無后續(xù)拯杠,有延遲
- Observable.map 有后續(xù),無延遲
- Observable.delay 無后續(xù)啃奴,有延遲
這幾種情況已經(jīng)足夠把所有dispose的情況都說明完整了潭陪。
5.1 Single.just 無后續(xù),無延遲
對于Single.just最蕾,情況比較簡單依溯,在SingleJust的subscribeActual中,給觀察者一個全局共享的Disposable對象瘟则。下游不能對其進(jìn)行取消黎炉,因為間隔太短了,馬上就調(diào)用onSuccess了醋拧。
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
5.2 Observable.interval 有后續(xù)慷嗜,有延遲
先來一段示例代碼:
val longObservable: Observable<Long> = Observable.interval(0, 1, TimeUnit.SECONDS)
longObservable.subscribe(object : Observer<Long> {
override fun onSubscribe(d: Disposable?) {
}
override fun onNext(t: Long?) {
}
override fun onError(e: Throwable?) {
}
override fun onComplete() {
}
})
這里Observable.interval構(gòu)建的是ObservableInterval對象。有了前面的經(jīng)驗丹壕,直接進(jìn)去看ObservableInterval的subscribeActual方法庆械。
//ObservableInterval.java
@Override
public void subscribeActual(Observer<? super Long> observer) {
//1. 創(chuàng)建觀察者(該觀察者還實現(xiàn)了Disposable)
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is);
//線程調(diào)度器
Scheduler sch = scheduler;
...
//將is(它實現(xiàn)了Runnable)這個任務(wù)交給線程調(diào)度器去執(zhí)行,同時返回一個Disposable對象
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
...
}
首先是創(chuàng)建了一個觀察者菌赖,該觀察者很明顯是實現(xiàn)了Disposable接口缭乘,因為將該觀察者順著onSubscribe傳遞給了下游,方便下游取消盏袄。隨后忿峻,將該觀察者交給線程調(diào)度器去執(zhí)行,顯然它還實現(xiàn)了Runnable接口辕羽,緊接著將調(diào)度器返回的Disposable對象設(shè)置給該觀察者逛尚。
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
//傳入的Observer是下游的
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
//取消自己
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
//通知下游
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
//設(shè)置Disposable給自己
DisposableHelper.setOnce(this, d);
}
}
IntervalObserver繼承自AtomicReference(AtomicReference類提供了一個可以原子讀寫的對象引用變量,避免出現(xiàn)線程安全問題)刁愿,泛型是Disposable绰寞。同時它也實現(xiàn)了Disposable和Runnable。在構(gòu)造方法里面?zhèn)魅胂掠蔚挠^察者铣口,方便待會兒把事件傳給下游滤钱。
當(dāng)事件一開始時,將IntervalObserver傳遞給下游脑题,因為它實現(xiàn)了Disposable件缸,可以被下游取消。然后將IntervalObserver傳遞給調(diào)度器叔遂,調(diào)度器會執(zhí)行里面的run方法他炊,run方法里面是將數(shù)據(jù)傳遞給下游争剿。在交給調(diào)度器的時候,返回了一個Disposable對象痊末,意味著可以隨時取消調(diào)度器里面的該任務(wù)蚕苇。然后將該Disposable對象設(shè)置給IntervalObserver的內(nèi)部,通過setResource方法凿叠,其實就是設(shè)置給IntervalObserver自己的涩笤,它本身就是一個AtomicReference<Disposable>
。當(dāng)下游調(diào)用dispose時盒件,即調(diào)用IntervalObserver的dispose蹬碧,然后IntervalObserver內(nèi)部隨即調(diào)用自己的dispose方法,完成了取消履恩。
這里為什么設(shè)計的這么繞锰茉?直接將調(diào)度器返回的Disposable對象返回給下游不就可以了么,下游也可以對其進(jìn)行取消扒行摹?這樣設(shè)計的好處是上游傳遞給下游的永遠(yuǎn)是IntervalObserver對象片吊,下游直接拿著這個實現(xiàn)了Disposable的IntervalObserver對象可以直接調(diào)用它的dispose進(jìn)行取消绽昏。而不用管它內(nèi)部當(dāng)前是握著哪個Disposable對象,即使IntervalObserver內(nèi)部的Disposable被更換了也絲毫不影響下游對上游的取消操作俏脊。
5.3 Single.map 無后續(xù)全谤,無延遲,有上下游
先來個簡單例子
val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
singleString.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: String) {
}
override fun onError(e: Throwable?) {
}
})
singleInt.map點進(jìn)去
//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
通過上面的例子我們知道爷贫,上游是創(chuàng)建了一個SingleJust對象认然。在調(diào)用map時,將自己(也就是SingleJust)傳給下游SingleMap里面去了漫萄。
//SingleMap.java
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
//source是上游卷员,通過構(gòu)造方法傳入進(jìn)來
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//t是下游
//訂閱
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
一開場就直接調(diào)用上游source訂閱MapSingleObserver這個觀察者。在MapSingleObserver的邏輯也比較簡單,就是實現(xiàn)了onSubscribe腾务、onSuccess毕骡、onError這些方法。然后在上游調(diào)用onSubscribe時調(diào)用下游的onSubscribe岩瘦;在上游調(diào)用onSuccess時自己做了一下mapper.apply(value)
轉(zhuǎn)換操作未巫,將數(shù)據(jù)轉(zhuǎn)換成下游所需要的,然后再調(diào)用下游的onSuccess傳遞給下游启昧;onError同onSubscribe原理是一樣的叙凡。
5.4 Single.delay 無后續(xù),有延遲
來段示例代碼:
val singleInt: Single<Int> = Single.just(1)
val singleDelay: Single<Int> = singleInt.delay(1, TimeUnit.SECONDS)
val observer = object : SingleObserver<Int> {
override fun onSubscribe(d: Disposable?) {
log("onSubscribe")
}
override fun onSuccess(t: Int?) {
log("onSuccess")
}
override fun onError(e: Throwable?) {
log("onError")
}
}
singleDelay.subscribe(observer)
直搗黃龍密末,Single.delay背后的對象是SingleDelay∥找現(xiàn)在有經(jīng)驗了跛璧,直接看它的subscribeActual
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//可以確定的是這是一個Disposable
final SequentialDisposable sd = new SequentialDisposable();
//將這個Disposable通過onSubscribe傳遞給下游
observer.onSubscribe(sd);
//讓上游訂閱Delay這個觀察者
source.subscribe(new Delay(sd, observer));
}
看下SequentialDisposable是什么玩意兒
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
public SequentialDisposable() {
// nothing to do
}
public SequentialDisposable(Disposable initial) {
lazySet(initial);
}
public boolean update(Disposable next) {
return DisposableHelper.set(this, next);
}
public boolean replace(Disposable next) {
return DisposableHelper.replace(this, next);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
似曾相識,上面的IntervalObserver也是這種思想饼拍。只不過這里多了2個update和replace方法赡模,可以隨時更換AtomicReference里面的Disposable對象。這就體現(xiàn)出了這種設(shè)計的好處师抄,不管里面的Disposable怎么更換漓柑,傳遞給下游的是這個SequentialDisposable,下游只需要調(diào)SequentialDisposable的dispose就將其里面的Disposable給取消掉了叨吮,而不用管里面的Disposable究竟是誰辆布。
下面咱們來看SingleDelay里面的內(nèi)部類Delay(觀察者)
final class Delay implements SingleObserver<T> {
//傳遞給下游的Disposable
private final SequentialDisposable sd;
//下游的觀察者
final SingleObserver<? super T> downstream;
Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
this.sd = sd;
this.downstream = observer;
}
@Override
public void onSubscribe(Disposable d) {
//開始訂閱的時候,sd內(nèi)部的Disposable是上游給過來的
sd.replace(d);
}
@Override
public void onSuccess(final T value) {
//上游把數(shù)據(jù)給過來之后茶鉴,就不用管上游了锋玲,直接把sd里面Disposable 設(shè)置成線程調(diào)度器給回來那個
//因為此時下游調(diào)用dispose的話,直接取消調(diào)度器里面的任務(wù)就行了
//巧妙地將sd里面的Disposable掉包了
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
}
@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
final class OnSuccess implements Runnable {
private final T value;
OnSuccess(T value) {
this.value = value;
}
@Override
public void run() {
//調(diào)度器執(zhí)行到該任務(wù)時涵叮,將數(shù)據(jù)傳遞給下游
downstream.onSuccess(value);
}
}
final class OnError implements Runnable {
private final Throwable e;
OnError(Throwable e) {
this.e = e;
}
@Override
public void run() {
downstream.onError(e);
}
}
}
這段代碼比較精彩惭蹂,首先在上游訂閱Delay的時候,觸發(fā)onSubscribe割粮,Delay內(nèi)部隨即將該Disposable存入SequentialDisposable對象(需要注意的是下游拿到的Disposable始終是這個SequentialDisposable)中盾碗。此時如果下游調(diào)用dispose,也就是調(diào)用SequentialDisposable的dispose舀瓢,也就是上游的dispose廷雅,dispose流程在這個節(jié)點上就完成了,向上傳遞京髓。
上游有數(shù)據(jù)了航缀,通過onSuccess傳遞給觀察者Delay的時候,SequentialDisposable就可以不用管上游的那個Disposable了堰怨,此時要關(guān)心的是傳遞給線程調(diào)度器里面的任務(wù)的取消事件了芥玉。所以直接將調(diào)度器返回的Disposable替換到SequentialDisposable內(nèi)部,此時下游進(jìn)行取消時诚些,就直接把任務(wù)給取消掉了飞傀。
當(dāng)調(diào)度器執(zhí)行到任務(wù)OnSuccess時,就把數(shù)據(jù)傳遞給下游诬烹,這個節(jié)點的任務(wù)就完成了砸烦。
5.5 Observable.map 有后續(xù),無延遲
Observable.map所對應(yīng)的是ObservableMap绞吁,直接上代碼:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//t是下游的觀察者
//source是上游
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Throwable {
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
在subscribeActual中并沒有直接調(diào)用onSubscribe,而MapObserver中又沒有這個方法幢痘,那onSubscribe肯定是在其父類中完成的。在看onSubscribe之前咱干脆先把onNext理一下家破,這里通過mapper.apply轉(zhuǎn)一下之后馬上就交給下游的onNext去了颜说。
//BasicFuseableObserver.java
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
@Override
public final void onSubscribe(Disposable d) {
//驗證上游 d是上游的Disposable upstream是當(dāng)前類的字段购岗,還沒有被賦值
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
//onSubscribe之前想做點什么事情的話,在beforeDownstream里面做
if (beforeDownstream()) {
//調(diào)用下游的onSubscribe
downstream.onSubscribe(this);
//onSubscribe之后想做點什么事情的話门粪,在afterDownstream里面做
afterDownstream();
}
}
}
protected boolean beforeDownstream() {
return true;
}
protected void afterDownstream() {
}
@Override
public void dispose() {
upstream.dispose();
}
}
//DisposableHelper.java
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {
next.dispose();
reportDisposableSet();
return false;
}
return true;
}
還是先調(diào)用下游的onSubscribe喊积,不過,并沒有將上游的Disposable直接傳給下游玄妈,而是將中間節(jié)點BasicFuseableObserver自己傳給了下游乾吻,同時將上游的Disposable存儲起來,方便待會兒dispose拟蜻。
5.6 Observable.delay 無后續(xù)绎签,有延遲
Observable.delay 對應(yīng)的是ObservableDelay
public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
Observer<T> observer;
if (delayError) {
observer = (Observer<T>)t;
} else {
observer = new SerializedObserver<>(t);
}
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
}
}
在subscribeActual沒有調(diào)用下游的onSubscribe,那說明是在DelayObserver中完成的
static final class DelayObserver<T> implements Observer<T>, Disposable {
final Scheduler.Worker w;
Disposable upstream;
DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
super();
this.downstream = actual;
this.w = w;
...
}
@Override
public void onSubscribe(Disposable d) {
//1. 先驗證一下上游 然后將上游的Disposable賦值給upstream
//2. 調(diào)用下游的onSubscribe酝锅,把自己傳給下游
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
@Override
public void onNext(final T t) {
//OnNext任務(wù)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時調(diào)用下游的onNext方法
w.schedule(new OnNext(t), delay, unit);
}
@Override
public void onError(final Throwable t) {
w.schedule(new OnError(t), delayError ? delay : 0, unit);
}
@Override
public void onComplete() {
w.schedule(new OnComplete(), delay, unit);
}
@Override
public void dispose() {
//同時取消上游的Disposable和自己執(zhí)行的調(diào)度器任務(wù)
upstream.dispose();
w.dispose();
}
final class OnNext implements Runnable {
private final T t;
OnNext(T t) {
this.t = t;
}
@Override
public void run() {
downstream.onNext(t);
}
}
...
}
onXxx的所有操作都放到了DelayObserver里面來完成诡必,在上游調(diào)用到這節(jié)的onSubscribe時,先驗證一下上游 然后將上游的Disposable賦值給upstream搔扁,調(diào)用下游的onSubscribe爸舒,把自己傳給下游。
當(dāng)下游調(diào)用dispose時稿蹲,在DelayObserver的dispose方法中將上游的Disposable給取消掉碳抄,然后把自己的調(diào)度器任務(wù)也給取消掉。
事件的傳遞:當(dāng)上游調(diào)用到這一節(jié)的onNext時场绿,OnNext任務(wù)(Runnable)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時調(diào)用下游的onNext方法。
6. 線程切換
線程切換是RxJava的另一個重要功能嫉入。
6.1 subscribeOn
subscribeOn在Single場景下對應(yīng)的是SingleSubscribeOn這個類
public final class SingleSubscribeOn<T> extends Single<T> {
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
//切線程
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
}
直接看subscribeActual方法焰盗,很明顯是將parent這個任務(wù)交給了線程調(diào)度器去執(zhí)行。那我們直接看SubscribeOnObserver的run方法即可
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
@Override
public void run() {
source.subscribe(this);
}
}
在scheduleDirect那里切線程咒林,然后在另一個線程中去執(zhí)行source.subscribe(this)
熬拒,也就是在Scheduler指定的線程里啟動subscribe(訂閱)。
- 切換起源Observable的線程
- 當(dāng)多次調(diào)用subscribeOn()的時候垫竞,只有最上面的會對起源Observable起作用
observeOn
observeOn在Single場景下的類是SingleObserveOn澎粟。它的subscribeActual方法如下:
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
上游訂閱了ObserveOnSingleObserver這個觀察者,核心就在這個觀察者里面欢瞪。
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
我們重點關(guān)注一下onSuccess和onError方法活烙,核心就是將當(dāng)前這個Runnable任務(wù)交給scheduler進(jìn)行執(zhí)行,而這里的scheduler是由使用者傳入的遣鼓,比如說是AndroidSchedulers.mainThread()啸盏。那么在run方法執(zhí)行時,就會在主線程中骑祟,那么在主線程中執(zhí)行下游的onError和onSuccess回懦。 這里通過Scheduler指定的線程來調(diào)用下級Observer的對應(yīng)回調(diào)方法气笙。
- 切換observeOn下面的Observer的回調(diào)所在的線程
- 當(dāng)多次調(diào)用observerOn()的時候,每個都好進(jìn)行一次線程切換怯晕,影響范圍是它下面的每個Observer(除非又遇到新的obServeOn())
6.2 Scheduler的原理
上面我們多次提到Scheduler潜圃,但是一直不知道它具體是什么。其實它就是用來控制控制線程的舟茶,用于將指定的邏輯在指定的線程中執(zhí)行谭期。這里就不帶著大家讀源碼了,篇幅過于長了稚晚,這塊源碼也比較簡單崇堵,感興趣的讀者可以去翻閱一下。下面是幾個核心點客燕。
其中Schedulers.newThread()里面是創(chuàng)建了一個線程池Executors.newScheduledThreadPool(1, factory)
來執(zhí)行任務(wù)鸳劳,但是這個線程池里面的線程不會得到重用,每次都是新建的線程池也搓。當(dāng) scheduleDirect() 被調(diào)用的時候赏廓,會創(chuàng)建一個 Worker,Worker 的內(nèi)部 會有一個 Executor傍妒,由 Executor 來完成實際的線程切換;scheduleDirect() 還會創(chuàng)建出一個 Disposable 對象幔摸,交給外層的 Observer,讓它能執(zhí)行 dispose() 操作颤练,取消訂閱鏈;
Schedulers.io()和Schedulers.newThread()差別不大既忆,但是io()這兒線程可能會被重用,所以一般io()用得多一些嗦玖。
AndroidSchedulers.mainThread()就更簡單了患雇,直接使用Handler進(jìn)行線程切換,將任務(wù)放到主線程去做宇挫,不管再怎么花里胡哨的庫苛吱,最后要切到主線程還得靠Handler。
7. 小結(jié)
Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用器瘪、邏輯簡潔 & 使用簡單的特點翠储,深受各大 Android開發(fā)者的歡迎。平時在項目中也使用得比較多橡疼,所以本文對RxJava3中的訂閱流程援所、取消流程、線程切換進(jìn)行了核心源碼分析衰齐,希望能幫助到各位任斋。