RxJava強大的地方之一是他的鏈?zhǔn)秸{(diào)用樱报,輕松地在線程之間進(jìn)行切換骂际。這幾天也大概分析了一下RxJava的線程切換的主流程于是打算寫一篇文章及記錄一下葫慎。
我們使用RxJava進(jìn)行線程切換的場景很多時候都是在進(jìn)行網(wǎng)絡(luò)請求的時候进倍,在IO線程進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)的請求處理福侈,最后在Android的主線程進(jìn)行請求數(shù)據(jù)的結(jié)果處理审磁。
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
當(dāng)然因為這段代碼的使用場景太多我們還可以利用ObservableTransformer操作符對其進(jìn)行簡化
public static <T>ObservableTransformer<T,T> io_main()
{
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
這樣我們在使用的時候就是這樣的:
.compose(RxTransformUtil.<Object>io_main())
是不是感覺方便了一丟丟
好了扯遠(yuǎn)了谈飒,現(xiàn)在來分析一下RxJava是如何做到線程的輕松調(diào)度的。
首先有幾個概念是非常重要的:
Scheduler官方的解釋是這樣的
A Scheduler is an object that specifies an API for scheduling units of work with or without delays or periodically.
初步看來Scheduler就是一個任務(wù)調(diào)度器相當(dāng)于就是一個調(diào)度中心的指揮者态蒂。當(dāng)然它是一個抽象類就肯定了Scheduler有很多具體的實現(xiàn)類杭措,例如IO線程的具體調(diào)度器就是IoScheduler。就像調(diào)度中心指揮者有客運中心的指揮者钾恢,有機場中心的指揮者一樣分別有不同的實現(xiàn)類手素。
當(dāng)然現(xiàn)在只有指揮者是肯定不行的鸳址,光頭司令怎么得行?這個時候關(guān)鍵的Worker類出現(xiàn)了泉懦,Worker官方的解釋是這樣的
Sequential Scheduler for executing actions on a single thread or event loop.
Disposing the Scheduler.Worker cancels all outstanding work and allows resource cleanup.
可以看到Worker就是線程任務(wù)的具體執(zhí)行者了稿黍。和Scheduler一樣Worker同樣也是抽象類,在不同的Scheduler具體實現(xiàn)類里面Worker也有自己的具體實現(xiàn)類崩哩,例如在IoScheduler類里面巡球,Worker的具體實現(xiàn)類就是EventLoopWorker,它負(fù)責(zé)管理IO線程的具體操作邓嘹,接下來我們就找到切入點看一看RxJava源碼里面都做了什么酣栈。
這里我們就以最典型的IO線程和主線程之間的切換為例來分析,線程切換的代碼就是上面的代碼。
Scheduler是以工廠方法對外提供它具體的實現(xiàn)類的汹押。Schedulers.io()可以提供一個IoScheduler的對象矿筝。你可以往里面看最后源碼是如何進(jìn)行IoScheduler的創(chuàng)建的
//創(chuàng)建IoScheduler
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//接著就行了IoScheduler的一系列初始化,CachedWorkerPool地初始化 棚贾,并由RxThreadFactory進(jìn)行線程地創(chuàng)建跋涣,線程優(yōu)先級別設(shè)置,是否是守護(hù)進(jìn)程等等
現(xiàn)在IoScheduler有了鸟悴,我們就看subscribe里面到底做了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Hook我們不用管陈辱,可以看到是把當(dāng)前ObservableCreater對象和IoScheduler一起傳進(jìn)了ObservableSubscribeoOn的構(gòu)造函數(shù)里面。進(jìn)入到ObservableSubscribeOn里面看看细诸。
//AbstractObservableWithUpstream只是用來保存上游的源事件流的沛贪,就是保存剛剛傳入進(jìn)來的ObservableCreater
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//裝飾模式 把下游的Observer裝飾成SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //執(zhí)行下游Observer的onSubscribe(Disposable disposabel)方法,當(dāng)前線程是上游的執(zhí)行線程
s.onSubscribe(parent);
//開啟的子線程最終是以帶Disposable的返回值返回的
//在這里是將子線程加入管理震贵,因為這里是并發(fā)操作所以使用了AtomicReference<Object>的院子操作類利赋,是一種效率高于synchronized的樂觀鎖,感興趣的可以自行上網(wǎng)搜索
//我們只用知道這里加入管理了以后方便在以后我們切斷上下游的時候可以將我們的子線程一同dispose().
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
//這中間的代碼和最基本的鏈?zhǔn)秸{(diào)用關(guān)系是一樣的猩系,只不過在onNext媚送、onError、onComplete中實際上還是調(diào)用的下游真正的onNext寇甸、onError塘偎、onComplete
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
//這就是實際執(zhí)行的Runnable 會把其傳入IoScheduler中供Worker使用。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
/*看到了吧拿霉,SubscribeOnObserver 作用其實就是將源事件流發(fā)生的地點和下游的事件流處理的地點訂閱在了子線程中進(jìn)行處理吟秩。
這樣上游發(fā)送事件流的地方就被切換到了子線程中。*/
source.subscribe(parent);
}
}
}
接下來我們仔細(xì)看一下上面代碼的這一段:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//這里scheduler.schedlerDirect非常的重要绽淘,可以看到RxJava把剛剛包裝好的Runnable對象傳入了方法里
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
我們跟進(jìn)去看一下里面的具體實現(xiàn)
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
//實際上是調(diào)用的下面3個參數(shù)的方法涵防,延遲時間為0
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建具體的Worker類
final Worker w = createWorker();
//hook函數(shù)我們不用管,只要沒有設(shè)置依舊返回的是傳入的Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將runnable和worker封裝到DisposeTask中
DisposeTask task = new DisposeTask(decoratedRun, w);
//執(zhí)行Worker的schedule方法具體的就是EventLoopWorker里面的schedule方法
w.schedule(task, delay, unit);
return task;
}
接下來我們來看一下EventLoopWorker里面的schedule方法是怎么實現(xiàn)的
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//判斷是否解除訂閱
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
可以看到這里如果沒有被解除訂閱的話又會執(zhí)行到NewThreadWorker的scheduleActual方法里面沪铭。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//hook函數(shù)我們這里不用管decoratedRun依然是傳進(jìn)來的Runnable對象run
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//ScheduledRunnable是一個即實現(xiàn)了Runnable接口又實現(xiàn)了Callable接口的對象壮池,為了后面能成功加入到線程池當(dāng)中
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//將sr加入到CompositeDisposable中偏瓤,方便管理
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//將sr加入到線程池當(dāng)中 并將線程的執(zhí)行結(jié)果返回給 Future<?> f
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);//對運行結(jié)果進(jìn)行處理
} catch (RejectedExecutionException ex) {
if (parent != null) {
//在CompositeDisposable中一處剛剛加入的sr
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
接下來看一下ScheduledRunnable是如何對返回的結(jié)果進(jìn)行處理的
public void setFuture(Future<?> f) {
//一個死循環(huán)會一直判斷返回回來的結(jié)果 因為其實原子操作類,樂觀鎖的機制決定了如果不是想要的結(jié)果的話會重新執(zhí)行一次
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE) {
//完成直接return
return;
}
//如果取消訂閱了則直接取消線程任務(wù)
if (o == DISPOSED) {
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
return;
}
//前兩者都不滿足的話 就將future的值存下來
if (compareAndSet(FUTURE_INDEX, o, f)) {
return;
}
}
}
到現(xiàn)在為止上游的線程切換大體的流程就分析的差不多了椰憋,我們從源碼中也可以分析出很多網(wǎng)上經(jīng)常說的一些結(jié)論硼补,最經(jīng)典的一條就是上游切換線程只有第一次生效,后面的線程切換都不起作用了熏矿,其實分析這點最重要的就是理解 ObservableSubscribeOn類里面下面的這段代碼了
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
再結(jié)合RxJava的鏈?zhǔn)讲僮饕押В幚頂?shù)據(jù)的時候是自下而上,而發(fā)射數(shù)據(jù)的時候是自上而下(這句話網(wǎng)上說的太多了票编,我最開始也是不理解褪储,只有自己真正看過源碼分析了,自己Debug一邊才能真正地理解)慧域。
好了先寫到這里了鲤竹,剩下的內(nèi)容我會放到另外一篇博客里面,感覺文章太長不利于閱讀昔榴。
這篇文章也是我第一次試著去分析源碼最后寫出的辛藻,很多都是我自己的理解,所以肯定有不妥當(dāng)或者錯誤的地方希望大家看到了以后能給我指出來互订,我一定改正吱肌!
最后
沒有最后了 大家再見~~~