基本使用
添加依賴
//retrofit 依賴
implementation 'com.squareup.retrofit2:retrofit:2.6.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.0'
implementation 'com.squareup.retrofit2:converter-gson:2.6.0'
//RxJava依賴
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
定義Api請(qǐng)求接口倉庫
interface ApiStore {
@GET("/users/{user}/repos")
fun listRepos(@Path("user") user: String): Single<Any>
}
發(fā)起網(wǎng)絡(luò)請(qǐng)求
val mRetrofit = Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
//默認(rèn)所有訂閱都在IO線程中執(zhí)行
//.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
.build()
val apiStore = mRetrofit.create(ApiStore::class.java)
apiStore.listRepos("hsicen")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : SingleObserver<Any> {
override fun onSuccess(t: Any) {
Log.d("hsc", t.toString())
tv_info.text = t.toString()
}
override fun onSubscribe(d: Disposable) {
//獲取可取消對(duì)象隐解,方便后續(xù)取消請(qǐng)求
}
override fun onError(e: Throwable) {
Log.d("hsc", "請(qǐng)求失敗")
tv_info.text = "請(qǐng)求失敗"
}
})
框架結(jié)構(gòu)
RxJava的整體結(jié)構(gòu)是一條鏈
- 鏈的最上游:生產(chǎn)者(被觀察者) Observable/Single/Flowable/Maybe
- 鏈的最下游:消費(fèi)者(觀察者) Observer/SingleObserver/Subscriber/MaybeObserver
- 鏈的中間:各個(gè)中介節(jié)點(diǎn),即是下游的 Observable坞笙,又是上游的 Observer鞋邑,連接(訂閱) Subscribe
原理分析
先來看看簡單的使用 Single.just(xxx)
Single.just(1)
.subscribe(object : SingleObserver<Int> {
override fun onSuccess(t: Int) {
tv_info.text = "$t"
}
override fun onSubscribe(d: Disposable) {
tv_info.text = "開始"
}
override fun onError(e: Throwable) {
tv_info.text = "出錯(cuò)"
}
})
這里我們利用Single.just(),在上游發(fā)送了一個(gè)簡單的1秒裕,下游訂閱這個(gè)事件袱蚓,在onSuccess()中接收到事件后然后打印出來;由于Single沒有后續(xù)事件几蜻,所以只有開始訂閱onSubscribe喇潘,成功onSuccess,失敗onError三個(gè)方法梭稚;可以看到上面的代碼并沒有進(jìn)行線程的切換颖低,所有事件的發(fā)生都是在當(dāng)前線程中進(jìn)行的,也就是UI線程
現(xiàn)在我們點(diǎn)進(jìn)subscribe中弧烤,看看上游和下游是怎樣連接起來的
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
可以看到在這個(gè)方法中起主要作用的就一句代碼 subscribeActual(observer)
, 所以我們只需要找到這個(gè)方法忱屑,看它里面做了什么操作,就知道是怎樣連接起來的了暇昂,下面我們就點(diǎn)進(jìn)這個(gè)方法
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
這是一個(gè)抽象方法莺戒,沒有具體的實(shí)現(xiàn),但是我們可以發(fā)現(xiàn)急波,這是Single這個(gè)類的抽象方法从铲,所以們只需要找到這個(gè)類的實(shí)現(xiàn)類,也就可以找到這個(gè)抽象方法的具體實(shí)現(xiàn)澄暮;但是先別忙名段,我們不是還有一句代碼沒有看么阱扬,那我們先看看Single.just()做了什么操作
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
這個(gè)方法沒有做什么操作,最主要的就是最后一句代碼伸辟,返回了一個(gè)SingleJust對(duì)象价认,這個(gè)SingleJust應(yīng)該就是Just的實(shí)現(xiàn)類,現(xiàn)在我們點(diǎn)進(jìn)這個(gè)類
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
可以看到在這個(gè)類的subscribeActual方法中自娩,直接調(diào)用了下游的onSubscribe()和onSuccess()方法用踩,onError方法都不需要調(diào)用,而且在訂閱的時(shí)候調(diào)用的是Disposables.disposed()
,應(yīng)該是要返回一個(gè)可取消訂閱的對(duì)象忙迁,那么點(diǎn)進(jìn)這個(gè)方法脐彩,看看返回的是什么對(duì)象
public static Disposable disposed() {
return EmptyDisposable.INSTANCE;
}
public enum EmptyDisposable implements QueueDisposable<Object> {
INSTANCE,NEVER;
@Override
public void dispose() {
// no-op
}
@Override
public boolean isDisposed() {
return this == INSTANCE;
}
}
通過源碼可以看到返回的是一個(gè)EmptyDisposable,這個(gè)Disposable在創(chuàng)建時(shí)就默認(rèn)已經(jīng)取消了姊扔,所以Single.just()惠奸,一個(gè)沒有后續(xù)操作的事件,流程大致如下:
SingleJust just = Single.just(1); 創(chuàng)建被觀察者
just.subscribe(observer); 訂閱(連接觀察者和被觀察者)
just.subscribeActual(observer); 核心操作
接下來看一個(gè)復(fù)雜一點(diǎn)的恰梢,有后續(xù)操作的事件 Observable.interval()
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Long> {
override fun onComplete() {
tv_info.text = "結(jié)束"
}
override fun onSubscribe(d: Disposable) {
Log.d("hsc", " 線程: " + Thread.currentThread().name)
tv_info.text = "開始"
}
override fun onNext(t: Long) {
Log.d("hsc", " 線程: " + Thread.currentThread().name)
tv_info.text = "$t"
}
override fun onError(e: Throwable) {
tv_info.text = "出錯(cuò)"
}
})
上面代碼的功能是每個(gè)一秒發(fā)送一個(gè)事件佛南,下游接收到這個(gè)事件后打印出來;那么我們分析源碼還是先從subscribe()方法切入
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
可以發(fā)現(xiàn)嵌言,起主要作用的還是 subscribeActual(observer)
方法嗅回,這個(gè)方法同樣是Observable的抽象方法,所以摧茴,下面需要從另一方向切入绵载,看看Observable.interval()做了什么操作
public static Observable<Long> interval(long period, TimeUnit unit) {
return interval(period, period, unit, Schedulers.computation());
}
可以看到這里進(jìn)行了一層包裝,而且為我們切換了線程苛白,這也是為什么上面我們調(diào)用了observeOn()
,主動(dòng)進(jìn)行了線程切換的原因娃豹,繼續(xù)點(diǎn)進(jìn)去
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
在這個(gè)interval()方法中,就和上面的Single.just()類似了购裙,給我們返回了一個(gè)ObservableInterval
類懂版,而且對(duì)我們的傳進(jìn)來的參數(shù)進(jìn)行了兼容性處理,現(xiàn)在我們就點(diǎn)進(jìn)去看subscribeActual()
所做的處理
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is);
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
首先將下游observer進(jìn)行了包裝躏率,創(chuàng)建了一個(gè)IntervalObserver躯畴,這個(gè)IntervalObserver是一個(gè)可取消對(duì)象,實(shí)現(xiàn)了Disposable, Runnable接口禾锤, 然后調(diào)用了下游的訂閱方法私股,把這個(gè)可取消對(duì)象傳了過去摹察;然后就分支判斷恩掷,除非主動(dòng)設(shè)置,一般情況下都會(huì)走默認(rèn)的else分支供嚎;在else分支中先調(diào)用Scheduler的方法進(jìn)行了線程切換黄娘,后面有專門講Scheduler的原理峭状,這里只簡單的講一下這行代碼的作用,就是進(jìn)行線程切換逼争,最后調(diào)用is.setResource(d)
方法优床,這個(gè)方法的作用先不分析,現(xiàn)在只需要記住有這個(gè)方法誓焦,后面會(huì)返回來分析這個(gè)方法的作用
現(xiàn)在我們點(diǎn)進(jìn)IntervalObserver胆敞,看看它的后臺(tái)任務(wù)(run)是怎樣執(zhí)行的
static final class IntervalObserver extends AtomicReference<Disposable> implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
可以看到IntervalObserver繼承自AtomicReference,實(shí)現(xiàn)了Disposable和Runnable接口杂伟,在run方法中先判斷是否已經(jīng)取消了訂閱移层,若沒有取消訂閱,會(huì)調(diào)用下游的onNext()方法赫粥,然后count加1观话;可以看到可取消對(duì)象都是通過DisposableHelper來管理的,包括我們剛才的setResource(d)越平,這個(gè)setResource()方法只是一層包裝频蛔,里面是設(shè)置給DisposableHelper的,然后取消時(shí)也是通過DisposableHelper來取消的秦叛,這個(gè)可取消對(duì)象繼承自AtomicReferenc晦溪,是線程安全的,總結(jié)Observable.interval()流程如下:
訂閱過程:Observable.interval() -> ObservableInterval.subscribe(observer) -> subscribeActual(observer) -> IntervalObserver.run() -> observer.onNext()
取消訂閱過程:IntervalObserver.setResource(d) -> DisposableHelper處理
操作符分析
先來看一下沒有后續(xù)操作事件的操作符
Single.just(1)
.map { it + 3 }
.subscribe(object : SingleObserver<Int> {
override fun onSuccess(t: Int) {
tv_info.text = "$t"
}
override fun onSubscribe(d: Disposable) {
tv_info.text = "開始"
}
override fun onError(e: Throwable) {
tv_info.text = "出錯(cuò)"
}
})
前面已經(jīng)分析了just和subscribe所做的事情挣跋,現(xiàn)在我們點(diǎn)進(jìn)map
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
可以看到創(chuàng)建了一個(gè)新的對(duì)象SingleMap尼变,傳進(jìn)去了Single對(duì)象和map的操作邏輯函數(shù),當(dāng)下游的Observer調(diào)用subscribeActual()方法時(shí)浆劲,就會(huì)調(diào)用SingleMap的subscribeActual()方法
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
}
可以看到subscribeActual()方法中嫌术,調(diào)用了source的subscribe()方法,這個(gè)Source就是我們Single.just()創(chuàng)建的SingleJust牌借,而且將下游的Observer進(jìn)行了一層包裝度气,創(chuàng)建了一個(gè)MapSingleObserver,在SingleJust的subscribe()方法中會(huì)調(diào)用subscribeActual()方法膨报,然后流程就和上面沒有操作符的流程一樣了
現(xiàn)在還需要弄清楚是如何將數(shù)據(jù)傳遞給下游的Observer的磷籍,那么就需要弄清楚MapSingleObserver做了什么操作
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
可以看到MapSingleObserver中,除了在onSuccess()中將數(shù)據(jù)進(jìn)行轉(zhuǎn)換外现柠,其它都是將事件直接傳遞給下游的Observer的院领,下面簡單總結(jié)一下事件的流程:
上游:
Single.just() -> 創(chuàng)建SingleJust對(duì)象
SingleJust.map(mapper) -> 創(chuàng)建SingleMap(SingleJust, mapper)對(duì)象
SingleMap.subscribe(Observer) -> 鏈接上下游發(fā)生訂閱
SingleMap.subscribeActual() -> 在訂閱方法subscribe()中調(diào)用
SingleJust.subscribe(MapSingleObserver(Observer,mapper))
SingleJust.subscribeActual(Observer) -> 發(fā)送事件到下游
下游:
SingleMap.subscribe(Observer) -> 鏈接上游和下游
SingleJust.subscribe(MapSingleObserver(Observer,mapper))
SingleJust.subscribeActual(MapSingleObserver) -> 發(fā)送事件到中轉(zhuǎn)Observer
MapSingleObserver 中轉(zhuǎn)SingleJust的事件到下游Observer
如果有多個(gè)操作會(huì)怎么樣呢?有了上面的分析够吩,其實(shí)我們可以發(fā)現(xiàn)當(dāng)存在多個(gè)操作符時(shí)比然,首先上游基于每個(gè)操作符都會(huì)創(chuàng)建一個(gè)新的Observable,在新的Observable的subscribeActual()方法中回調(diào)用source的subscribe()方法周循;然后下游Observer會(huì)封裝自己subscribe()方法傳進(jìn)來的Observer强法,創(chuàng)建一個(gè)新的Observer万俗,這個(gè)新的Observer充當(dāng)?shù)氖且粋€(gè)中轉(zhuǎn)的角色,它會(huì)把自己source傳遞過來的事件傳遞給通過subscribe傳遞過來的下游Observer饮怯;所以Observable是一層一層的往上傳闰歪,而Observer是一層一層的往下傳:
對(duì)于有后續(xù)操作的事件,其流程也是一樣的蓖墅,每一個(gè)操作符都會(huì)創(chuàng)建新的Observable對(duì)象和Observer對(duì)象库倘,用于鏈接上游和下游,傳遞事件
Disposable 原理分析
這個(gè)模塊主要分析一下訂閱的取消流程论矾,訂閱的取消要分多種情況于樟,根據(jù)下面幾種分類來分析一下
沒延遲,沒后續(xù)操作
這種情況是最簡單的拇囊,直接就是Single.just(xxx).subscribe(Observer)
在這種情況下迂曲,當(dāng)發(fā)生訂閱時(shí),會(huì)調(diào)用Disposables.disposed()返回一個(gè)已經(jīng)取消訂閱的Disposable對(duì)象
沒延遲寥袭,有后續(xù)操作
這種情況就是我們上面提到的Observable.interval()
在這種情況下路捧,會(huì)創(chuàng)建一個(gè)IntervalObserver和一個(gè)后臺(tái)執(zhí)行onNext操作的Worker對(duì)象,當(dāng)發(fā)生訂閱時(shí)會(huì)把這個(gè)可取消對(duì)象傳遞給下游传黄,下游調(diào)用dispose()取消訂閱時(shí)杰扫,會(huì)調(diào)用DisposableHelper.dispose()來處理取消訂閱操作;首先會(huì)把DisposableHelper置為DISPOSED的狀態(tài)膘掰,然后把自己內(nèi)部創(chuàng)建的worker取消掉(這個(gè)Worker是執(zhí)行后續(xù)onNext操作的Worker)
有延遲章姓,沒后續(xù)操作
這種情況相當(dāng)于給第一種情況加上了delay()操作符(會(huì)自動(dòng)切換線程)
在這種情況下,會(huì)創(chuàng)建一個(gè)SequentialDisposable對(duì)象识埋,然后在訂閱時(shí)把這個(gè)可取消對(duì)象傳給下游凡伊,下游拿到這個(gè)可取消對(duì)象就可以自由操作了;當(dāng)觸發(fā)成功和失敗事件時(shí)窒舟,會(huì)創(chuàng)建一個(gè)DisposeTask系忙,利用Scheduler延時(shí)發(fā)給下游Observer,并調(diào)用SequentialDisposable的replace()來替換掉之前的Disposable可取消對(duì)象惠豺,當(dāng)下游調(diào)用dispose()時(shí)银还,會(huì)交由DisposableHelper.dispose()來處理取消訂閱操作;由于有延時(shí)洁墙,會(huì)創(chuàng)建Worker對(duì)象來處理延時(shí)操作蛹疯,當(dāng)調(diào)用DisposableHelper.dispose()時(shí),首先會(huì)把DisposableHelper置為DISPOSED的狀態(tài)热监,然后把處理延時(shí)操作的Worker取消掉
有延時(shí)捺弦,有后續(xù)操作
這種情況相當(dāng)于給第二種情況加上了delay()操作符
在這種情況下,默認(rèn)會(huì)創(chuàng)建一個(gè)SerializedObserver和一個(gè)后臺(tái)執(zhí)行onNext操作的Worker對(duì)象;當(dāng)發(fā)生訂閱時(shí)直接調(diào)用下游的onSubscribe()羹呵,onNext(),onError(),onComplete()都會(huì)交由Worker進(jìn)行延遲下發(fā)骂际;當(dāng)調(diào)用dispose()取消訂閱時(shí)疗琉,會(huì)調(diào)用上游的dispose()和自己內(nèi)部Worker的取消
線程切換原理分析
RxJava有兩個(gè)線程切換方法冈欢,subscribeOn()和observerOn(),這兩個(gè)方法各有用處,下面就來分析一下這兩個(gè)方法所做的事情
subscribeOn
功能:在Scheduler指定的線程里啟動(dòng)訂閱 subscribe()
效果:
- 切換起源的 Observable 線程
- 當(dāng)多次調(diào)用 subscribeOn() 的時(shí)候盈简,只有第一個(gè)subscribeOn()會(huì)對(duì)起源的 Observable 起作用凑耻;后續(xù)的subscribeOn()會(huì)影響onSubscribe()的調(diào)用線程
單次調(diào)用subscribeOn()的大致流程如下:
多次調(diào)用subscribeOn()的大致流程如下:
當(dāng)帶有操作符的多次調(diào)用subscribeOn()的大致流程如下:
observeOn
功能: 在內(nèi)部創(chuàng)建的 Observer的 onNext(), onError(), onSuccess()/onComplete() 等回調(diào)方法里,通過Scheduler 指定的線程來調(diào)用下級(jí)Observer的對(duì)應(yīng)回調(diào)方法
效果:
- 切換 observeOn() 下面的 Observer 的回調(diào)所在的線程
- 當(dāng)多次調(diào)用 observeOn() 的時(shí)候柠贤,每個(gè)都會(huì)進(jìn)行一次線程切換香浩,影響范圍是它下面的每個(gè)Observer
流程大致如下:
Scheduler原理
這里主要涉及到兩個(gè)類,Schedulers 和 AndroidSchedulers
-
Schedulers.newThread() 和 Schedulers.io()臼勉,Schedulers.computation()
- 當(dāng)scheduleDirect() 被調(diào)用時(shí)邻吭,會(huì)創(chuàng)建一個(gè)Worker,Worker的內(nèi)部會(huì)有一個(gè)Executor宴霸,由Executor來完成實(shí)際的線程切換操作
- scheduleDirect() 還會(huì)創(chuàng)建出一個(gè)Disposable 對(duì)象囱晴,交給外層的Observer,讓它能夠執(zhí)行dispose()操作瓢谢,取消訂閱鏈
- newThread() 和 io() 的區(qū)別在于畸写,io是創(chuàng)建的緩存池,可能會(huì)對(duì)Executor進(jìn)行重用氓扛;computation創(chuàng)建的是線程池
-
AndroidSchedulers.mainThread()
- 創(chuàng)建Handler枯芬,獲取到mainLooper,發(fā)送消息到主線程執(zhí)行