在前面兩篇文章RxJava2筆記(一盐股、事件訂閱流程)和RxJava2筆記(二、事件取消流程)中耻卡,我們分別了解了事件的訂閱以及取消是如何進行的疯汁,接下來我們將要介紹RxJava的線程切換。
對RxJava有過了解的肯定知道卵酪,前面的代碼都是運行在主線程當(dāng)中幌蚊,讓我們來確認(rèn)下,繼續(xù)對代碼做一些改動凛澎,添加些打印日志霹肝,打印各個方法的當(dāng)前運行線程:
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--運行線程:" + Thread.currentThread().getName());
disposable = d;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer + " --運行線程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getMessage());
e.printStackTrace();
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--運行線程:" + Thread.currentThread().getName());
}
};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(observer);
輸出結(jié)果如下:
I/MainActivity: onSubscribe--運行線程:main
subscribe--運行線程:main
I/MainActivity: onNext: 1 --運行線程:main
onNext: 2 --運行線程:main
onNext: 3 --運行線程:main
onComplete--運行線程:main
可以看到,所有的線程都運行在主線程當(dāng)中塑煎。
而在實際開發(fā)中沫换,我們需要為網(wǎng)絡(luò)請求相關(guān)代碼單獨開啟一個子線程,將網(wǎng)絡(luò)請求代碼運行在這個子線程中最铁,當(dāng)網(wǎng)絡(luò)請求結(jié)束返回數(shù)據(jù)并開始更新UI界面時讯赏,我們需要將線程切換回主線程后才能去更新UI界面,否則就會報錯(android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. at android.view.ViewRootImpl.checkThread(ViewRootImpl.java:6838))
這段話的意思其實就是我們只可以在主線程中更新UI冷尉,當(dāng)我們在其他線程中更新UI時漱挎,就會拋出這個異常。
而使用RxJava切換線程也非常簡單雀哨,只需要添加兩個調(diào)用方法即可:
我們在訂閱線程添加了一行代碼TimeUnit.MILLISECONDS.sleep(1000)來模擬網(wǎng)絡(luò)請求磕谅。如果在此處不添加這行代碼,當(dāng)我們把observeOn(AndroidSchedulers.mainThread())這行代碼去掉時雾棺,程序仍能正常運行膊夹,有興趣的讀者可以試一下。至于原因捌浩,參考這篇文章Android中子線程真的不能更新UI嗎放刨?。
好了尸饺,按照上圖的代碼进统,我們再打印下結(jié)果(打印結(jié)果中間亂入了兩行助币,這是因為訂閱線程睡眠了1s的緣故,我們直接無視掉就行):
I/MainActivity: onSubscribe--運行線程:main
I/MainActivity: subscribe--運行線程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --運行線程:main
I/MainActivity: onNext: 2 --運行線程:main
I/OpenGLRenderer: Initialized EGL, version 1.4
W/art: Before Android 4.1, method int android.support.v7.widget.DropDownListView.lookForSelectablePosition(int, boolean) would have incorrectly overridden the package-private method in android.widget.ListView
I/MainActivity: onNext: 3 --運行線程:main
I/MainActivity: onComplete--運行線程:main
從上面的結(jié)果可以看到螟碎,我們的訂閱線程(即發(fā)射數(shù)據(jù)事件所在的線程眉菱,也就是被觀察者)運行在了名為RxCachedThreadScheduler-1這個線程中,而observer的四個方法(觀察者)均運行在main線程中(也就是UI線程)抚芦。接下來我們就開始分析這兩個線程切換代碼做了哪些工作倍谜。
我們先從訂閱線程開始:
將訂閱線程切換到子線程運行
將訂閱線程切換到子線程只需要調(diào)用方法subscribeOn(Schedulers.io())就能完成切換。那么這個方法做了哪些工作呢叉抡?我們點進去看看:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
該方法接收一個Scheduler類型的參數(shù),這個Scheduler類就是負(fù)責(zé)線程調(diào)度的答毫,關(guān)于這個類我們稍后再討論褥民。沿著上面的代碼接著分析下去,subscribeOn(Scheduler scheduler)方法將傳入的scheduler參數(shù)又傳遞到了new ObservableSubscribeOn<T>(this, scheduler)這個構(gòu)造方法中洗搂,然后通過RxJavaPlugins.onAssembly將這個生成的ObservableSubscribeOn對象作為鉤子返回生成一個新的Observable對象消返。我們接著來看ObservableSubscribeOn這個類:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>{
}
發(fā)現(xiàn)它繼承了AbstractObservableWithUpstream這個類
/**
* Base class for operators with a source consumable.
*
* @param <T> the input source type
* @param <U> the output type
*/
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
通過源碼可以發(fā)現(xiàn),AbstractObservableWithUpstream繼承了Observable耘拇,其內(nèi)部有個ObservableSource類型成員變量source撵颊,這個ObservableSource又是什么?我們點進去看下
/**
* Represents a basic, non-backpressured {@link Observable} source base interface,
* consumable via an {@link Observer}.
*
* @param <T> the element type
* @since 2.0
*/
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
可以看出惫叛,ObservableSource是一個接口倡勇,Observable實現(xiàn)了這個接口,而這個接口和前面的ObservableOnSubscribe接口如出一轍嘉涌。因此AbstractObservableWithUpstream中的source成員變量就是用來保存上游傳遞下來的observable妻熊,也就是Observable.create方法生成的數(shù)據(jù)源。
我們來看下ObservableSubscribeOn的源碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//1仑最、線程調(diào)度器扔役,這里是負(fù)責(zé)訂閱線程切換
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//2、保存上游傳遞的observable
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//3警医、封裝外部傳遞進來的觀察者對象(將observer包裝起來亿胸,這是一個裝飾器模式)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//4、建立訂閱關(guān)系(觀察者調(diào)用自己的onSubscribe方法)
s.onSubscribe(parent);
//5预皇、將訂閱線程中的disposable賦值給parent(也就是SubscribeOnObserver對象)
//這里使用scheduler開始線程調(diào)度侈玄,將外部observer的包裝對象parent用SubscribeTask構(gòu)造方法包裝起來并使這個SubscribeTask運行在我們指定的線程中
//這個SubscribeTask是一個Runnable,實際上真正的訂閱是發(fā)生在它的run()方法里面深啤,而這個run()方法正是運行在我們前面指定的線程中(比如Schedulers.io()指定的IO線程)拗馒。
//這樣我們就完成了線程切換
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//......代碼省略
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//6、訂閱事件發(fā)生
source.subscribe(parent);
}
}
}
在步驟3中溯街,這個外部observer的包裝類SubscribeOnObserver跟前面所講的CreateEmitter相比較起來诱桂,雖然它們都是observer包裝類洋丐,但還是有一定的區(qū)別的,我們來分析下這個類:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
//7挥等、保存外部傳進來的observer對象
//(這里的actual用于保存下游傳遞過來的observer友绝,
//當(dāng)前這個類SubscribeOnObserver對象可以看做是介于上游observable和下游observer之間的一個中間observer,主要用于輔助訂閱線程切換)
final Observer<? super T> actual;
//8肝劲、保存訂閱發(fā)生時生成的disposable迁客,用于后續(xù)的解除訂閱
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//9、該方法在調(diào)用source.subscribe(parent)時最先調(diào)用辞槐,
//將當(dāng)前訂閱事件產(chǎn)生的disposable保存到AtomicReference<Disposable> s這個成員變量中掷漱,用于之后取消訂閱
//(從上面的代碼可以看出這里傳入的參數(shù)實際上就是SubscribeOnObserver自身對象)
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
//10、這里調(diào)用下游observer的onNext方法榄檬,actual保存的就是下游傳遞過來的observer
//后面的onComplete和onError方法同理
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
//11卜范、取消訂閱時除了要終止當(dāng)前訂閱事件(onSubscribe方法被調(diào)用時保存的disposable(保存在其成員變量s中)),
//還要終止scheduler線程調(diào)度時保存其返回的task(保存在其自身)鹿榜,
//因為網(wǎng)絡(luò)請求任務(wù)是運行在獨立的線程中海雪,終止訂閱事件時,我們也需要終止相應(yīng)運行的線程
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//12舱殿、這個方法是在ObservableSubscribeOn類中的subscribeActual方法內(nèi)部調(diào)用的奥裸,
//用于保存scheduler進行線程調(diào)度時返回的task(實際上返回的是DisposeTask類型的對象,它實現(xiàn)了Disposable接口沪袭,這個后面再討論)
//這里是將scheduler線程調(diào)度時返回的task保存到自身
//(SubscribeOnObserver繼承了AtomicReference<Disposable>類(繼承該類保證了線程安全)和Disposable接口)
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
我們來梳理下思路:
- 1湾宙、當(dāng)我們調(diào)用subscribeOn(Schedulers.io())方法時,調(diào)用這個方法的對象實際上是Observable.create方法生成的一個新的observable對象(即ObservableCreate對象枝恋,該對象有個source成員變量创倔,用于保存上游傳遞過來的observable被觀察者,也就是事件源)焚碌;調(diào)用subscribeOn這個方法又會產(chǎn)生一個新的observable對象(通過new ObservableSubscribeOn<T>(this, scheduler)得到)畦攘,用于進一步的事件操作。
- 2十电、步驟1中提到的ObservableSubscribeOn構(gòu)造方法第一個參數(shù)是ObservableSource接口類型(Observable實現(xiàn)了這個接口)的參數(shù)知押,該參數(shù)實際上接收的是步驟1中的ObservableCreate對象,并保存在其成員變量source中(類型為ObservableSource鹃骂,通過繼承AbstractObservableWithUpstream得到)台盯。
- 3、在ObservableSubscribeOn類的subscribeActual(final Observer<? super T> s)方法中(這個方法接收的observer參數(shù)是從下游傳遞過來的)畏线,將外部傳進來的observer對象用SubscribeOnObserver內(nèi)部類包裝起來(保存在其成員變量actual中)得到該內(nèi)部類的一個對象parent(通過SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s)静盅,這個內(nèi)部類主要用于輔助訂閱線程切換)。
- 4寝殴、然后通過s.onSubscribe(parent)建立訂閱關(guān)系蒿叠,最后執(zhí)行parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))明垢,通過scheduler.scheduleDirect(new SubscribeTask(parent))完成線程切換。(備注:SubscribeTask是一個Runnable市咽,這里是將parent傳入到SubscribeTask的構(gòu)造方法中痊银,并在SubscribeTask的run()方法中執(zhí)行訂閱(source.subscribe(parent)))。
至此施绎,訂閱線程的切換流程就介紹完了溯革,本文也就告一段落。下面谷醉,還有幾個細(xì)節(jié)致稀,如果只關(guān)注訂閱線程切換流程的話,可以結(jié)束本文的閱讀了俱尼。
留下的幾個問題:
調(diào)用subscribeOn(Schedulers.io())這個傳入的Schedulers.io()到底是啥豺裆?
我們點進去看下代碼:
/**
* Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
* ......已省略
* @return a {@link Scheduler} meant for IO-bound work
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
看注釋,我們了解到這個方法是返回一個綁定在IO操作的默認(rèn)共享實例号显,返回類型是Scheduler類型。RxJavaPlugins我們前面提到過躺酒,它的作用是返回一個鉤子押蚤,這里返回的是一個常量IO,這個IO主要負(fù)責(zé)網(wǎng)絡(luò)通信任務(wù)羹应。其實Schedulers類內(nèi)部聲明了很多個常量揽碘,我們大致看下:
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//......代碼省略
}
Schedulers類一共內(nèi)置了5個Scheduler類型的常量,除了IO之外园匹,其他幾個也簡單介紹下:
- SINGLE:創(chuàng)建一個共享的單一線程雳刺,所有工作均在這個線程里面執(zhí)行
- COMPUTATION:創(chuàng)建一個指定數(shù)量線程的線程池,主要適用于計算密集型的任務(wù)
- IO:創(chuàng)建一個預(yù)置一定數(shù)量線程的線程池裸违,主要用于IO密集型的任務(wù)
- TRAMPOLINE:使用該種方式的任務(wù)會在當(dāng)前線程上運行掖桦,但并不會馬上執(zhí)行,任務(wù)會被保存在一個隊列中供汛,等當(dāng)前任務(wù)執(zhí)行完后再從隊列中把該任務(wù)取出來并執(zhí)行枪汪。
- NEW_THREAD:直接啟動一個新線程執(zhí)行指定任務(wù)。
- 還有一種創(chuàng)建線程的方式:Scheduler from(@NonNull Executor executor)怔昨,這是由我們自己指定線程創(chuàng)建以及調(diào)度方式雀久。
上面六種方式常用的為IO和COMPUTATION,本文我們就分析下這個IO趁舀,COMPUTATION其實和IO是類似的赖捌,讀者可以自行分析。
我們看到這個常量IO執(zhí)行了靜態(tài)代碼初始化矮烹,初始化為一個IOTask對象:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
Callable和Runnable類似越庇,區(qū)別是使用Callable可以得到返回值罩锐,而使用Runnable則沒有返回值。
上面的代碼在call()方法內(nèi)返回了IoHolder.DEFAULT悦荒,最終返回一個IoScheduler類型的對象:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
有人在這里可能就有疑問唯欣,Schedulers.io()返回的是一個Scheduler對象啊,這個IOTask的call()方法是什么時候執(zhí)行的呢搬味?
我們可以看到境氢,IO這個變量在聲明的時候它的類型就是Scheduler類型,因此可以想到它在初始化的時候必然是經(jīng)過了一些處理碰纬。我們看下IO的初始化代碼:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
這里直接將IOTask對象作為參數(shù)傳遞給 RxJavaPlugins.initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler)方法萍聊,這個方法正好接收一個Callable對象,看來call()方法應(yīng)該就是在這里面執(zhí)行的悦析,點進去看下:
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
我們看下這個方法寿桨,首先驗證傳入的參數(shù)不能為空,然后定義一個Function類型的臨時變量f强戴,并用同是類型為Function的成員變量onInitIoHandler為其賦值亭螟,然后判斷f==null是否成立,也就是onInitIoHandler==null是否成立骑歹。這里f==null是成立的预烙,原因是RxJavaPlugins內(nèi)部并沒有onInitIoHandler預(yù)初始化方法,只有與其相關(guān)的Getter和Setter方法道媚,而在本文開始到目前為止扁掸,onInitIoHandler的Setter方法也并沒有被調(diào)用過,因此onInitIoHandler為null最域,自然f==null也就為true谴分,因此調(diào)用callRequireNonNull(defaultScheduler)方法,我們點進這個方法看下:
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
終于镀脂,我們看到了call()方法的調(diào)用牺蹄。上面說到,在IOTask類中狗热,call()方法返回的是一個IoScheduler對象钞馁,因此結(jié)合前面的分析,這個IO在靜態(tài)初始化的時候就被初始化為一個IoScheduler對象匿刮。
這里還有一個問題僧凰,f==null這個條件什么時候不成立呢?當(dāng)然是我們想要自己處理傳入的Callable對象了熟丸。那要如何自己處理呢训措?我們自己聲明一個繼承自Function接口的處理類,并實現(xiàn)接口中的apply方法(當(dāng)然了這里面最終肯定是要調(diào)用call()方法返回Scheduler對象的,我們可以在返回之前做一些自己的處理)绩鸣,然后調(diào)用onInitIoHandler的靜態(tài)Setter方法(RxJavaPlugins類中的成員變量都是靜態(tài)的怀大,因此其Setter和Getter方法也都是靜態(tài)的)為onInitIoHandler賦值,此時由于onInitIoHandler是不為null的呀闻,因此f==null不成立化借,自然就執(zhí)行下面的方法,我們簡要的看下代碼:
@NonNull
static Scheduler applyRequireNonNull(@NonNull Function<? super Callable<Scheduler>, ? extends Scheduler> f, Callable<Scheduler> s) {
//這里調(diào)用apply(f, s)捡多,f為我們自定義的處理類對象
return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
//調(diào)用f中的apply方法蓖康,也就是我們自己定義的處理方法,t為Callable對象
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
至此垒手,我們可以回答前面的問題了蒜焊,Schedulers.io()返回的就是一個IoScheduler對象,這個IoScheduler實際上就是在IO線程調(diào)度時用來管理IO線程的科贬。
scheduler.scheduleDirect(new SubscribeTask(parent))具體是如何完成線程調(diào)度的泳梆?
我們點進這個方法看一下:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
直接無延遲調(diào)用了scheduleDirect方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//1、創(chuàng)建一個worker榜掌,其實現(xiàn)了Disposable接口优妙,是一個工作者類
final Worker w = createWorker();
//2、這里我們可能要對run參數(shù)做一些處理憎账,也可能不處理鳞溉,取決為我們是否為RxJavaPlugins類的成員變量onScheduleHandler賦值(自定義繼承Function的實現(xiàn)類處理run)。
//一般情況下我們可以認(rèn)為decoratedRun=run鼠哥。
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//3、包裝類看政。DisposeTask實現(xiàn)了Disposable, Runnable, SchedulerRunnableIntrospection這三個接口朴恳,
//主要用于管理任務(wù)Runnable及其工作Worker
DisposeTask task = new DisposeTask(decoratedRun, w);
//4、開始執(zhí)行任務(wù)
w.schedule(task, delay, unit);
//5允蚣、返回這個管理者
return task;
}
上面的方法將外面?zhèn)魅脒M來的run傳遞給了DisposeTask于颖,然后調(diào)用worker來進行任務(wù)調(diào)度,看起來主要的任務(wù)是在DisposeTask里面執(zhí)行的嚷兔,我看下DisposeTask這個類的代碼:
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
//6森渐、外部傳進來的Runnable任務(wù)
final Runnable decoratedRun;
//7、調(diào)度工作者worker
final Worker w;
//8冒晰、當(dāng)前線程
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
//9同衣、保存decoratedRun執(zhí)行時候所在的線程
runner = Thread.currentThread();
try {
//10、執(zhí)行decoratedRun的run()方法
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
//11壶运、當(dāng)DisposeTask被取消執(zhí)行任務(wù)時耐齐,如果取消任務(wù)時所在的線程和decoratedRun任務(wù)執(zhí)行時所在的線程為同一個線程,
//并且調(diào)度工作者worker類型為NewThreadWorker類型,直接結(jié)束任務(wù)埠况,關(guān)閉底層執(zhí)行程序(這里是同步任務(wù)場景)
((NewThreadWorker)w).shutdown();
} else {
//12耸携、當(dāng)DisposeTask被取消執(zhí)行任務(wù)時,告訴worker工作者取消調(diào)度任務(wù)(IO異步任務(wù)場景)
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
具體的步驟都在上面的代碼中標(biāo)注了出來辕翰,我們可以看到夺衍,在DisposeTask的run方法中最終執(zhí)行了decoratedRun.run(),而這個decoratedRun也就是我們在ObservableSubscribeOn類的subscribeActual(final Observer<? super T> s)方法里面所執(zhí)行的代碼parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))傳進來的new SubscribeTask(parent))對象喜命,通過本文前面的分析我們知道沟沙,SubscribeTask也是一個Runnable,實際上的訂閱source.subscribe(parent)正是在其run()方法中渊抄。
至此我們明白了scheduler.scheduleDirect(new SubscribeTask(parent))都做了哪些工作尝胆,還有一個問題,我們通過查看Worker類的源碼會發(fā)現(xiàn)這是一個抽象類护桦,而createWorker方法也是一個抽象方法含衔,我們在上面說到在IO線程調(diào)度時實際上管理IO線程的是IoScheduler,這個類也是繼承自Scheduler二庵,我們?nèi)oScheduler類看下createWorker方法:
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
該方法直接返回了EventLoopWorker對象(管理Worker--負(fù)責(zé)任務(wù)調(diào)度的工作者贪染,CachedWorkerPool--管理Worker的一個隊列陪每,CompositeDisposable--批量管理訂閱狀態(tài)的disposable容器 ):
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
//保存訂閱狀態(tài)
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//取消訂閱時械筛,釋放當(dāng)前的threadWorker
@Override
public void dispose() {
//如果成功的將disposed狀態(tài)設(shè)置為true,則取消訂閱铜跑,并將釋放當(dāng)前的threadWorker資源因妙,并將其添加到Worker管理池中
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//實際線程調(diào)度工作
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
雖然EventLoopWorker也繼承了Scheduler的內(nèi)部類Worker痰憎,但在scheduler方法中,把實際的線程調(diào)度工作轉(zhuǎn)發(fā)給了ThreadWorker去進行攀涵,ThreadWorker又繼承了NewThreadWorker類铣耘,我們點進scheduleActual(action, delayTime, unit, tasks)方法看下(該方法位于NewThreadWorker類中):
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//1、包裝類以故,管理任務(wù)線程蜗细,內(nèi)部實現(xiàn)了一系列方法,包括線程任務(wù)的執(zhí)行怒详,設(shè)置任務(wù)標(biāo)志位炉媒,任務(wù)的取消等。
//這個parent參數(shù)就是我們平常使用的CompositeDisposable對象昆烁,負(fù)責(zé)訂閱批量管理
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//2吊骤、將任務(wù)線程ScheduledRunnable放入線程池執(zhí)行任務(wù),根據(jù)是否有延遲時間調(diào)用相應(yīng)的方法
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
//3静尼、設(shè)置ScheduledRunnable在線程池中的執(zhí)行狀態(tài)
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
//4水援、如果ScheduledRunnable在執(zhí)行過程中出錯密强,并且有設(shè)置訂閱狀態(tài)管理容器,將ScheduledRunnable的訂閱狀態(tài)從該容器中移除
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
//5蜗元、返回任務(wù)線程包裝類給外部或渤,用于該任務(wù)的管理
return sr;
}
既然返回的是ScheduledRunnable對象,我們就來看下這個類:
//這里即實現(xiàn)了Runnable接口也實現(xiàn)了Callable接口奕扣,用于應(yīng)對需要返回值和不需要返回值的情況
public final class ScheduledRunnable extends AtomicReferenceArray<Object> implements Runnable, Callable<Object>, Disposable {
private static final long serialVersionUID = -6120223772001106981L;
final Runnable actual;
/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
/** Indicates the dispose() was called from within the run/call method. */
static final Object SYNC_DISPOSED = new Object();
/** Indicates the dispose() was called from another thread. */
static final Object ASYNC_DISPOSED = new Object();
static final Object DONE = new Object();
//保存追蹤該任務(wù)的外部任務(wù)狀態(tài)的標(biāo)識
static final int PARENT_INDEX = 0;
//保存該任務(wù)運行狀態(tài)的標(biāo)識
static final int FUTURE_INDEX = 1;
//保存其他線程像該任務(wù)發(fā)出的指令標(biāo)識
static final int THREAD_INDEX = 2;
/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
//如果我們在外部通過一個disposable容器來管理當(dāng)前任務(wù)薪鹦,就保存這個disposable容器
this.lazySet(0, parent);
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
//設(shè)置任務(wù)執(zhí)行線程為當(dāng)前所在線程
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
//執(zhí)行實際IO任務(wù)
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
//任務(wù)執(zhí)行期間出現(xiàn)錯誤,重置當(dāng)前執(zhí)行線程為null
lazySet(THREAD_INDEX, null);
//獲取追蹤次任務(wù)的外部任務(wù)狀態(tài)
Object o = get(PARENT_INDEX);
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
//如果追蹤該任務(wù)狀態(tài)的外部任務(wù)還沒有被終止惯豆,將它設(shè)置為DONE狀態(tài)池磁,并將當(dāng)前任務(wù)從訂閱狀態(tài)管理容器中刪除
((DisposableContainer)o).delete(this);
}
//任務(wù)在執(zhí)行出錯時,確保任務(wù)的執(zhí)行結(jié)果狀態(tài)為SYNC_DISPOSED楷兽,ASYNC_DISPOSED以及DONE中的任意一個即可
for (;;) {
o = get(FUTURE_INDEX);
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
//設(shè)置Runnable任務(wù)的運行狀態(tài)
public void setFuture(Future<?> f) {
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE) {
return;
}
if (o == SYNC_DISPOSED) {
f.cancel(false);
return;
}
if (o == ASYNC_DISPOSED) {
f.cancel(true);
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
return;
}
}
}
//結(jié)束Runnable任務(wù)運行
@Override
public void dispose() {
for (;;) {
//獲取任務(wù)運行狀態(tài)
Object o = get(FUTURE_INDEX);
//如果任務(wù)執(zhí)行結(jié)果狀態(tài)為以下三個狀態(tài)中的任意一個地熄,表示任務(wù)已被結(jié)束,什么也不做
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
break;
}
//判斷任務(wù)在執(zhí)行期間所在的線程以及執(zhí)行終止后所在的線程是否一致
boolean async = get(THREAD_INDEX) != Thread.currentThread();
//設(shè)置該任務(wù)是在同步環(huán)境下結(jié)束的還是在異步環(huán)境下結(jié)束的
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
//取消任務(wù)的執(zhí)行
((Future<?>)o).cancel(async);
}
break;
}
}
for (;;) {
//獲取追蹤該任務(wù)的外部任務(wù)所處狀態(tài)
Object o = get(PARENT_INDEX);
if (o == DONE || o == PARENT_DISPOSED || o == null) {
//如果外部任務(wù)狀態(tài)是終止?fàn)顟B(tài)芯杀,則什么也不做
return;
}
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
//如果外部任務(wù)狀態(tài)不是終止?fàn)顟B(tài)端考,將其設(shè)置為終止?fàn)顟B(tài),并將當(dāng)前任務(wù)從狀態(tài)容器中移除
((DisposableContainer)o).delete(this);
return;
}
}
}
//根據(jù)追蹤該任務(wù)的父任務(wù)狀態(tài)并根據(jù)父任務(wù)是否結(jié)束來判斷該任務(wù)是否結(jié)束
@Override
public boolean isDisposed() {
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}
}
通過上面代碼我們可以看到揭厚,NewThreadWorker類內(nèi)部確實是通過線程池來執(zhí)行我們提交的IO任務(wù)却特,這個線程池是什么時候創(chuàng)建的呢?答案是在createWorker()方法調(diào)用的時候筛圆,即我們創(chuàng)建worker工作者對象時裂明。通過調(diào)用EventLoopWorker的構(gòu)造方法生成該對象并返回。
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
這個構(gòu)造方法接收一個CachedWorkerPool類型的參數(shù)太援,這個CachedWorkerPool并不是線程池闽晦,而是用來管理我們創(chuàng)建的Worker工作者。
static final class CachedWorkerPool implements Runnable {
//空閑Worker在隊列中的存活時間
private final long keepAliveTime;
//線程安全的任務(wù)隊列提岔,用于保存處于空閑狀態(tài)的Worker工作者
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
//Disposable容器尼荆,可以用于統(tǒng)一管理多個訂閱任務(wù)狀態(tài)
final CompositeDisposable allWorkers;
//可以實現(xiàn)循環(huán)或延時執(zhí)行任務(wù)的線程池
private final ScheduledExecutorService evictorService;
//保存延遲周期任務(wù)
private final Future<?> evictorTask;
//線程創(chuàng)建工廠方法
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
//如果空閑隊列中的Worker有存活時間,則創(chuàng)建線程池和相應(yīng)的延遲周期任務(wù)
//該類實現(xiàn)了Runnable接口唧垦,并在run()方法中調(diào)用方法evictExpiredWorkers(),用于移除空閑隊列中超過設(shè)置的存活時間的Worker
if (unit != null) {
//創(chuàng)建線程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
//并將當(dāng)前Runnable任務(wù)對象(即CachedWorkerPool)添加到周期延遲執(zhí)行任務(wù)中液样,然后啟動線程池執(zhí)行該周期延遲任務(wù)振亮,最后返回執(zhí)行結(jié)果
//由于run()方法中主要是為了在周期時間內(nèi)檢查Worker空閑隊列中緩存的Worker是否超過存活時間,因此此處周期延遲任務(wù)的延遲時間與Worker的存活時間一致
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
//CachedWorkerPool內(nèi)部的線程池啟動后開始執(zhí)行
@Override
public void run() {
evictExpiredWorkers();
}
//從空閑隊列中取出一個緩存的工作者Worker
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//如果隊列中沒有緩存的Worker鞭莽,則新建一個坊秸,并將其添加到disposable容器中,便于訂閱任務(wù)的統(tǒng)一管理
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//釋放一個ThreadWorker澎怒,并為其設(shè)置存活時間褒搔,將其添加到空閑隊列中
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
//移除空閑隊列中所有超過存活時間(keepAliveTime)的Worker
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
//......代碼省略
}
在get()方法中,先檢查空閑隊列有沒有緩存的Worker,如果沒有則創(chuàng)建一個星瘾。
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//......代碼省略
}
ThreadWorker繼承自NewThreadWorker走孽,雖然它們的名字都帶了Thread,但它們并不是線程琳状,而是一個調(diào)度訂閱任務(wù)的工作者磕瓷,NewThreadWorker繼承自Scheduler內(nèi)的一個內(nèi)部類--Worker。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//......代碼省略
}
我們看到創(chuàng)建線程池的代碼了念逞,繼續(xù)點進去看下:
//SchedulerPoolFactory.create
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
//Executors.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
//new ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
這里創(chuàng)建了一個線程池困食,其核心線程數(shù)為1,可創(chuàng)建的最大線程為Integer.MAX_VALUE翎承,非核心線程在線程池中的存活時間為10硕盹,存活時間單位為MILLISECONDS,線程管理隊列為DelayedWorkQueue叨咖,線程創(chuàng)建的工廠類為我們外面?zhèn)魅氲墓S類(這里是RxJava默認(rèn)實現(xiàn)的工廠類==>RxThreadFactory)瘩例。
最后NewThreadWorker就是通過這里創(chuàng)建的線程池來執(zhí)行具體的IO任務(wù)==>SubscribeTask。
由上面代碼可以看到芒澜,每個Worker內(nèi)部都自己創(chuàng)建了一個獨立的線程池來執(zhí)行IO任務(wù)仰剿,這些Worker又都是通過CachedWorkerPool來統(tǒng)一管理的,那么這個CachedWorkerPool又是什么時候創(chuàng)建的呢痴晦?答案是在我們調(diào)用Schedulers.io()時南吮。
前面我們提到過,當(dāng)調(diào)用Schedulers.io()時誊酌,最終會調(diào)用IoScheduler類的構(gòu)造方法產(chǎn)生一個IoScheduler對象部凑。
public final class IoScheduler extends Scheduler {
//......代碼省略
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
static final CachedWorkerPool NONE;
static {
//......代碼省略
//創(chuàng)建線程的工廠類,為每個創(chuàng)建的線程名添加前綴(RxCachedThreadScheduler)
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
//初始化一個沒有線程池的CachedWorkerPool碧浊,該Worker管理池內(nèi)空閑隊列中的Worker存活時間為0涂邀,Worker內(nèi)部線程池創(chuàng)建線程的工廠類為RxThreadFactory
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
//清空CachedWorkerPool內(nèi)部狀態(tài)
NONE.shutdown();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
//創(chuàng)建線程的工廠類對象
this.threadFactory = threadFactory;
//包裝類,以保證多線程環(huán)境下CachedWorkerPool可以正常工作
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
public void start() {
//新建一個有線程池以及Worker存活時間的CachedWorkerPool
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
//這里通過AtomicReference的CAS方法來確保NONE能被正常更新為update
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//......代碼省略
}
至此:訂閱線程切換也就告一段落了箱锐,文章的后面還扯了很多題外話比勉,下一篇文章RxJava2筆記(四、觀察者線程切換)我們繼續(xù)介紹觀察者線程切換(即如何將線程由子線程切換回主線程從而進行UI更新)驹止。