在上一篇文章RxJava2筆記(三熟吏、訂閱線程切換)中,我們分析了訂閱線程是如何切換的玄窝,即調用subscribeOn()來切換訂閱線程時都執(zhí)行了哪些操作牵寺。在本文我們將繼續(xù)介紹觀察者線程切換,也就是將線程由子線程切換回UI線程恩脂。
繼續(xù)在前面的基礎上修改代碼帽氓,在訂閱線程切換方法后調用observeOn(AndroidSchedulers.mainThread())將線程切換回主線程:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
emitter.onNext(3);
emitter.onComplete();
}
})
//將線程由UI線程切換到子線程執(zhí)行IO請求
.subscribeOn(Schedulers.io())
//將線程切換回UI線程,方面后續(xù)操作更新UI界面
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
看下運行結果:
I/MainActivity: onSubscribe--運行線程:main
I/MainActivity: subscribe--運行線程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --運行線程:main
I/MainActivity: onNext: 2 --運行線程:main
I/MainActivity: onNext: 3 --運行線程:main
I/MainActivity: onComplete--運行線程:main
可以看到俩块,subscribe方法運行在子線程中(也就是訂閱線程運行在名為RxCachedThreadScheduler-1的一個子線程中黎休,上文提到該線程是由RxJava實現(xiàn)的一個工廠類創(chuàng)建的)浓领,而observer運行在名為main的線程中,這個main線程就是UI線程奋渔。
看完了輸出結果,接下來就看看這個observeOn(AndroidSchedulers.mainThread())
是如何將線程切換到UI線程的壮啊,點進去看下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//當出現(xiàn)異常時嫉鲸,默認無延遲發(fā)送錯誤。bufferSize()是緩沖區(qū)大小歹啼,RxJava設置了一個默認大小玄渗,為128。
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//包裝類狸眼,保存上游observable
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
熟悉的套路藤树,跟調用subscribe.on方法時類似,只是多了一個驗證緩沖區(qū)大小不為空的代碼拓萌,這些我們都略過岁钓,直接看ObservableObserveOn這個類:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//如果傳入的調度器是TrampolineScheduler,則不切換線程微王,在當前線程調度
//但是調度的任務并不是馬上執(zhí)行屡限,而是等待當前任務執(zhí)行完畢再執(zhí)行
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//創(chuàng)建工作者worker
Scheduler.Worker w = scheduler.createWorker();
//上游的subscribe,該方法會觸發(fā)上游的subscribeActual炕倘,
//ObserveOnObserver也是一個包裝類钧大,保存下游的observer
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//......代碼省略
}
熟悉的裝飾器模式:
- 1、ObservableObserveOn繼承了AbstractObservableWithUpstream罩旋,其繼承的ObservableSource類型的source成員變量用于保存上游的observable啊央。
- 2、AndroidSchedulers.mainThread()為本次傳入的scheduler涨醋,負責將線程切換到UI線程瓜饥。
- 3、下游調用subscribe方法是觸發(fā)=>當前observable的subscribeActual方法=>觸發(fā)上游observable的subscribe方法=>傳入?yún)?shù)包裝類ObserveOnObserver(包裝了下游的觀察者observer)浴骂。
這里簡要介紹下步驟3:
這里的source是上游的observable對象压固,source.subscribe()
方法實際調用的是上游observable對象的subscribeActual方法,并將下游observer對象的包裝類ObserveOnObserver作為參數(shù)傳遞進去靠闭,在上游observable對象的subscribeActual方法內帐我,調用ObserveOnObserver包裝類中的onSubscribe,onNext等方法愧膀,進而調用下游observer的onSubscribe拦键,onNext等方法。
接下來看下ObserveOnObserver這個下游observer包裝類:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//下游observer
final Observer<? super T> actual;
//調度工作者
final Scheduler.Worker worker;
//當訂閱任務執(zhí)行出錯時檩淋,是否延遲發(fā)送錯誤消息芬为,默認為false萄金,也就是不延遲
final boolean delayError;
//緩沖區(qū)大小,緩存上游發(fā)送的事件
final int bufferSize;
//存儲上游observable發(fā)出的數(shù)據(jù)隊列
SimpleQueue<T> queue;
//存儲管理下游observer的訂閱狀態(tài)disposable
Disposable s;
//訂閱任務執(zhí)行出錯時媚朦,存儲錯誤信息
Throwable error;
//訂閱任務是否終止
volatile boolean done;
//訂閱任務是否被取消
volatile boolean cancelled;
//任務執(zhí)行模式----同步還是異步
int sourceMode;
//是否輸出融合(通常情況下該選項為false)
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
//當前的disposable為null氧敢,上游subscribe產生的disposable不為null,則驗證通過
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//如果訂閱時獲取的disposable對象s是QueueDisposable類型的
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
//新建QueueDisposable隊列并將訂閱時獲取的disposable對象s強轉為QueueDisposable询张,然后賦值給queue
QueueDisposable<T> qd = (QueueDisposable<T>) s;
//獲取任務運行模式
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//判斷運行模式孙乖,并且調用下游observer的onSubscribe方法將當前observer在訂閱時產生的disposable傳遞給下游observer
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
//為true 使得接下來的onXXX等方法均不會執(zhí)行
done = true;
actual.onSubscribe(this);
//worker直接調度任務
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
//在異步模式下,等待onXXX方法中的worker調度
return;
}
}
//否則創(chuàng)建一個支持單一生產者單一消費者的隊列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//調用下游observer的onSubscribe方法將當前observer在訂閱時產生的disposable傳遞給下游observer
actual.onSubscribe(this);
}
}
//......代碼省略
}
onSubscribe方法調用后份氧,就開始執(zhí)行onXXX方法了唯袄,首先是onNext方法,這個方法可以反復調用:
@Override
public void onNext(T t) {
//訂閱模式是同步模式或者執(zhí)行過onComplete/onError方法時蜗帜,此時done為true恋拷,直接返回
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//將上游數(shù)據(jù)源發(fā)射的數(shù)據(jù)添加到緩存隊列中
queue.offer(t);
}
//開始worker調度任務
//(這里面調用了handler,將數(shù)據(jù)發(fā)送到主線程所在的消息隊列厅缺,進而更新UI界面蔬顾,這里稍后分析)
schedule();
}
其次是onError和onComplete,這兩個方法只能執(zhí)行一次并且是互斥的:
@Override
public void onError(Throwable t) {
//如果任務狀態(tài)已經(jīng)是終止狀態(tài)湘捎,再執(zhí)行該任務是就會拋出異常
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
//設置任務狀態(tài)為終止狀態(tài)
done = true;
//worker任務調度
schedule();
}
@Override
public void onComplete() {
//如果任務是終止狀態(tài)阎抒,直接返回
if (done) {
return;
}
//設置任務狀態(tài)為終止狀態(tài)
done = true;
//worker任務調度
schedule();
}
這里onNext,onComplete消痛,onError最后都調用schedule()來調度任務:
//調用worker.schedule(this)開始任務調度
void schedule() {
//這里通過getAndIncrement() == 0原子性的保證了worker.schedule(this)在調度完之前不會再次被調度
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
上面在執(zhí)行worker.schedule(this)時傳入了this且叁,也就是當前對象ObserveOnObserver,ObserveOnObserver類實現(xiàn)了Runnable接口秩伞,因此worker.schedule(this)調度的任務就是自己run()實現(xiàn)方法中的任務:
@Override
public void run() {
//outputFused通常情況下為false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
drainFused()通常情況下不會執(zhí)行逞带,我們只需要關注drainNormal()方法即可,在查看該方法之前纱新,先看下drainNormal()內部調用的一個驗證方法checkTerminated(boolean d, boolean empty, Observer<? super T> a)
展氓,該方法主要是檢測任務是否已終止:
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果訂閱被取消,清空數(shù)據(jù)緩存隊列
if (cancelled) {
queue.clear();
return true;
}
//這個d就是done
if (d) {
//done為true時脸爱,有兩種情況遇汞,在onNext調度完畢后執(zhí)行onComplete或onError
Throwable e = error;
if (delayError) {
//如果是延遲發(fā)送錯誤的情況,必須等到queue(緩存上游observable發(fā)出的數(shù)據(jù))為空的情況下才能發(fā)送錯誤(有錯誤的情況下)
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//終止worker任務調度
worker.dispose();
return true;
}
} else {
//不延遲發(fā)送錯誤時簿废,直接調用
if (e != null) {
//如果任務執(zhí)行出錯空入,即調用了onNext方法,清空queue
queue.clear();
//調用下游observer的onError
a.onError(e);
//終止worker調度
worker.dispose();
return true;
} else
if (empty) {
//任務正常執(zhí)行族檬,未出現(xiàn)錯誤
//調用下游observer的onComplete歪赢,并終止worker調度
a.onComplete();
worker.dispose();
return true;
}
}
}
//否則返回false,任務還未終止
return false;
}
任務結束情況分為以下兩種
- 1单料、訂閱被取消埋凯,即cancelled==true点楼,此時任務為終止狀態(tài)。
- 2白对、任務運行標記done==true掠廓,即onNext調用完畢,調用onComplete正常結束甩恼;或者在onNext調用過程中出現(xiàn)錯誤蟀瞧,調用了onNext,此時是異常結束媳拴,會發(fā)送異常信息黄橘。
說完了這個方法兆览,我們繼續(xù)看drainNormal()這個方法:
void drainNormal() {
//這個變量只是一個控制變量屈溉,用來確保drainNormal()方法能被原子性調用
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//外層死循環(huán)
for (;;) {
//根據(jù)數(shù)據(jù)緩存隊列是否為空檢查任務是否已終止
//為空就會直接跳出外層循環(huán),方法執(zhí)行結束抬探,內層的循環(huán)也不會執(zhí)行
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//內層死循環(huán)子巾,負責具體的任務處理
for (;;) {
boolean d = done;
T v;
try {
//從隊列中獲取數(shù)據(jù)
v = q.poll();
} catch (Throwable ex) {
//任務執(zhí)行出現(xiàn)錯誤,拋出致命錯誤信息小压,結束循環(huán)线梗,終止本次任務
Exceptions.throwIfFatal(ex);
//終止訂閱
s.dispose();
//清空緩存隊列
q.clear();
a.onError(ex);
//停止worker調度
worker.dispose();
return;
}
boolean empty = v == null;
//判斷從隊列中取出的數(shù)據(jù)是否為空,即判斷隊列是否為空
if (checkTerminated(d, empty, a)) {
return;
}
//隊列中沒有了數(shù)據(jù)怠益,直接退出
if (empty) {
break;
}
//調用下游observer的onNext(onComplete和onError均在checkTerminated方法里調用)
a.onNext(v);
}
//這里主要是確保在同一時間只有一個worker.schedule(this)正在執(zhí)行仪搔。
//missed變量在方法最開始初始化為1,這里missed會被重置為0蜻牢,這樣下面的missed==0成立烤咧,當前任務結束。
//addAndGet(-missed)方法也會將AtomicInteger內部的VALUE值設置為0抢呆。
//同時煮嫌,run()方法中的判定方法getAndIncrement() == 0成立,繼續(xù)執(zhí)行下一個worker.schedule(this)抱虐。
//如果程序沒有走到這里昌阿,那么missed==0也就不成立,相應的run()方法中的getAndIncrement() == 0不成立恳邀,也就不會執(zhí)行下一個worker.schedule(this)懦冰。
//這樣就原子性的確保了同一時間只有一個worker.schedule(this)正在執(zhí)行,即同一時間只有一個drainNormal()方法在執(zhí)行谣沸。
missed = addAndGet(-missed);
//程序走到這里儿奶,表示當前任務正常結束,退出循環(huán)鳄抒。
//run()方法繼續(xù)執(zhí)行闯捎,getAndIncrement() == 0成立椰弊,開始執(zhí)行下一個worker.schedule(this)。
if (missed == 0) {
break;
}
}
}
到這里瓤鼻,ObserveOnObserver這個下游observer包裝類也就介紹的差不多了秉版,簡要總結下:
- 1、構造方法接收了外部創(chuàng)建的工作者worker茬祷,負責任務調度清焕。
- 2、內部定義了一個緩存隊列祭犯,緩存上游observable發(fā)射的數(shù)據(jù)秸妥。
- 3、ObserveOnObserver的父類繼承了AtomicInteger類沃粗,這樣在多線程環(huán)境中粥惧,可以通過AtomicInteger的CAS操作確保線程安全。
- 4最盅、ObserveOnObserver類實現(xiàn)了Runnable接口突雪,在run()方法內,使用worker進行任務調度涡贱,并通過getAndIncrement()==0來確保worker任務調度的原子性操作咏删。
- 5、drainNormal()為實際調度執(zhí)行的方法问词,其內部聲明了一個局部變量missed督函,并初始化為1。然后開始死循環(huán)激挪,在外層循環(huán)末尾處(內層循環(huán)正常結束辰狡,消息隊列中的數(shù)據(jù)都被取了出來并處理完畢,此時消息隊列為空灌灾,結束內層循環(huán))搓译,調用missed = addAndGet(-missed)后,ObserveOnObserver內部的VALUE值被重置為0锋喜,最后判定missed==0成立些己,程序跳出死循環(huán),drainNormal()方法執(zhí)行完畢嘿般。最后run()方法繼續(xù)執(zhí)行段标,此時getAndIncrement()==0成立,開始下一個worker任務調度炉奴。
如果在循環(huán)途中因為某些原因(任務已執(zhí)行完畢或出現(xiàn)異常)而中途退出循環(huán)沒能執(zhí)行到循環(huán)末尾逼庞,那么其內部的VALUE值就不為0,getAndIncrement()==0不成立瞻赶,不會執(zhí)行worker調度赛糟,因此整個訂閱也就結束了派任。
在上一篇文章中,訂閱線程最終是通過調度器來執(zhí)行具體切換過程的璧南。同樣的掌逛,對于觀察者線程切換執(zhí)行的也是類似的過程。前面分析道司倚,ObservableObserveOn構造方法接收我們傳入的調度器scheduler豆混,并通過scheduler創(chuàng)建工作者worker,將其傳入到ObserveOnObserver的構造方法中动知,最后在run()方法中執(zhí)行具體的任務調度皿伺。因此觀察者的線程切換肯定是發(fā)生在worker的調度過程中。
先從observeOn(AndroidSchedulers.mainThread())
的參數(shù)AndroidSchedulers.mainThread()開始盒粮,點進去看下:
public final class AndroidSchedulers {
private static final class MainHolder {
//構造一個與UI線程關聯(lián)的Handler鸵鸥,并將其作為參數(shù)構造HandlerScheduler調度器
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
//返回與主線程相關的scheduler調度器
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
//自己指定線程Looper自定義觀察者線程切換
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
AndroidSchedulers.mainThread()最終返回HandlerScheduler對象,HandlerScheduler也是繼承自Scheduler拆讯,其構造方法接收一個Handler類型的參數(shù)脂男,這個Handler通過Looper.getMainLooper()與UI線程關聯(lián)起來养叛,這樣通過Handler發(fā)送的消息就能被UI線程接收种呐,從而更新UI界面。具體返回HandlerScheduler對象的步驟與上文所介紹的訂閱線程切換生成IoScheduler一致弃甥,不再詳述爽室。
熟悉Handler的童鞋都知道,Handler在安卓的消息機制中占有重要的地位淆攻,它貫穿著整個安卓的體系阔墩,在很多地方我們都能見到它的身影。在剛開始接觸安卓開發(fā)的時候我們都寫過類似下面的代碼:
private Handler handler = new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
//更新UI線程代碼
//獲取子線程發(fā)送的message消息里面的請求數(shù)據(jù)瓶珊,更新UI界面
//操作省略
}
};
public void getData(){
new Thread(new Runnable() {
@Override
public void run() {
//1啸箫、網(wǎng)絡請求數(shù)據(jù)并返回
//2、獲取Message對象伞芹,并將請求到的數(shù)據(jù)用Message包裹起來
//3忘苛、調用handler的sendMessage等方法發(fā)送Message
}
}).start();
}
隨著現(xiàn)在各種各樣網(wǎng)絡請求框架的出現(xiàn),大大簡化了我們網(wǎng)絡請求更新UI的操作唱较,上面的代碼我們很少再去寫了扎唾,但并不意味著它就不重要了,尤其是Handler南缓。其實很多的網(wǎng)絡請求框架都只不過是將上面的操作封裝起來了而已胸遇,RxJava雖然與網(wǎng)絡請求無關,但在觀察者線程切換里面同樣也是將上面的過程封裝起來汉形,方便我們使用纸镊。
我們接著來看HandlerScheduler這個類:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
//......代碼省略
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
//......代碼省略
}
HandlerScheduler繼承自Scheduler倍阐,并實現(xiàn)了createWorker()方法生成任務調度worker,這里返回的是一個HandlerWorker對象逗威,前面提到的worker.schedule(this)中的worker實際上就是這個HandlerWorker收捣。
private static final class HandlerWorker extends Worker {
//保存外部傳進來的Handler,這里保存的是與UI線程關聯(lián)的Handler庵楷,具體參見上面介紹
private final Handler handler;
//事件訂閱狀態(tài)標志位
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
//worker.schedule(this)最終調用的方法罢艾,這里delay為0,表示handler無延遲發(fā)送消息
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
//如果訂閱已終止尽纽,返回帶有終止狀態(tài)的disposable
if (disposed) {
return Disposables.disposed();
}
//對任務做一些自己的處理(默認情況下沒做任何處理)
run = RxJavaPlugins.onSchedule(run);
//包裝類
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//獲取message咐蚯,設置其target成員為handler,callback成員為scheduled
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//將消息發(fā)送到UI關聯(lián)的消息隊列中弄贿,此時handler中的queue是UI線程中的queue
//(參考new Handler(Looper.getMainLooper()))
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
//再次檢查訂閱是否被終止春锋,若已被終止,移除handler中的callback差凹,并返回帶有終止狀態(tài)的disposable
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
//訂閱終止
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
在HandlerWorker中期奔,將任務run用ScheduledRunnable包裝起來,在設置到Message的callback中危尿,生成一個Message呐萌,熟悉handler的都知道,handler通過dispatchMessage(Message msg)方法來進行消息的分發(fā)處理谊娇,這個方法的處理順序如下:
- 1肺孤、判斷message中的callback(該callback是一個Runnable)是否為空,若不為空济欢,調用handleCallback(msg)赠堵,在handleCallback(msg)中,直接執(zhí)行message.callback.run()法褥。若為空茫叭,進入步驟2。
- 2半等、判斷handler中的callback(該callback是Handler內部的一個內部接口揍愁,由我們自己實現(xiàn))是否為空,若不為空酱鸭,執(zhí)行callback.handleMessage(msg)吗垮。若為空,進入步驟3凹髓。
- 3烁登、若步驟1和步驟2中的判斷均不成立,執(zhí)行handleMessage(msg),也就是我們前面寫的樣例代碼饵沧。
在這里由于Message中的callback不為空锨络,因此執(zhí)行執(zhí)行callback中的run()方法,也就是前面提到的ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法狼牺,進而調用drainNormal()方法羡儿,最終調用下游observer的onNext。
生成Message后是钥,調用handler的sendMessageDelayed方法發(fā)送消息(這里delay參數(shù)為0掠归,因此是立即發(fā)送),由于這里的handler是與UI線程關聯(lián)在一起的悄泥,因此ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法就被發(fā)送到了UI線程中的消息隊列中虏冻,最終通過handler的dispatchMessage(Message msg)方法調用handleCallback(msg),最后調用message.callback.run()執(zhí)行run()方法里面的代碼弹囚,完成UI線程更新厨相。
在文章的最后,我們來看下這個run任務的包裝類ScheduledRunnable:
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
//保存外部傳入的handler和runnable任務
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
//執(zhí)行run方法鸥鹉,即傳入的ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
//訂閱終止蛮穿,從handler中移除該runnable
//(實際上是從UI線程內部的隊列中將包裝這個runnable的message移除,如果這個message還未處理的話)
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
至此毁渗,整個觀察者線程切換也就介紹完了践磅,最后我們再來梳理下思路:
- 1、observeOn(AndroidSchedulers.mainThread())返回一個ObservableObserveOn對象祝蝠,該對象內部(source)保存了上游傳遞過來的observable音诈。
- 2幻碱、下游執(zhí)行subscribe時绎狭,實際上執(zhí)行的是ObservableObserveOn對象內部的subscribeActual方法,該方法內部首先判斷調度器的類型(具體參看上面介紹)褥傍,這里傳入的調度器類型是HandlerScheduler儡嘶。
- 3、將下游observer對象用ObserveOnObserver包裝起來恍风,生成ObserveOnObserver對象蹦狂。
- 4、執(zhí)行source.subscribe方法朋贬,并將步驟3中生成的ObserveOnObserver對象作為參數(shù)傳遞進去凯楔。
- 5、步驟4中的subscribe方法實際上執(zhí)行的是上游observable的subscribeActual方法锦募,在該方法中ObserveOnObserver內部的onSubscribe方法首先被執(zhí)行摆屯,在這個onSubscribe方法內部調用下游observer的onSubscribe方法,建立訂閱關系糠亩。
- 6虐骑、執(zhí)行完onSubscribe方法后准验,ObserveOnObserver內部的onNext和onComplete或者onNext依次被執(zhí)行(onNext可被反復調用)。
- 7廷没、在ObserveOnObserver內部的onNext方法中糊饱,將上游發(fā)射的數(shù)據(jù)緩存到隊列中,然后調用schedule方法颠黎,通過HandlerScheduler生成的worker對象(HandlerWorker)開始調度任務另锋,通過步驟1中調用AndroidSchedulers.mainThread()時生成的關聯(lián)UI線程的handler將Runnable任務發(fā)送到UI線程的消息隊列中。
- 8狭归、ObserveOnObserver實現(xiàn)了Runnable接口砰蠢,在其實現(xiàn)方法run()中,最終調用了drainNormal()方法唉铜,該方法內有兩個嵌套的循環(huán)台舱,且都是死循環(huán)。外層循環(huán)結束的條件是當次訂閱任務被終止(外部取消或者任務執(zhí)行完畢或任務執(zhí)行出錯)潭流,內層循環(huán)結束的條件是當次訂閱任務執(zhí)行完畢或執(zhí)行出現(xiàn)錯誤竞惋。在內層循環(huán)內部,不斷的從緩存隊列中取出數(shù)據(jù)灰嫉,最后調用下游observer的onNext方法拆宛。在步驟7中,Runnable任務最后被發(fā)送到了UI線程內的消息隊列中讼撒,因此這個run()方法也是運行在UI線程浑厚,最終在UI線程中我們可以用這些數(shù)據(jù)更新UI界面。
- 9根盒、ObserveOnObserver內部的onComplete和onError執(zhí)行過程與onNext類似钳幅,區(qū)別是onComplete和onNext只會調用一次并且是互斥調用(參見上面源碼分析)。同樣炎滞,最終也會調用drainNormal()方法敢艰,只不過在drainNormal()方法的外層死循環(huán)內,首先調用的是checkTerminated方法册赛,用來判斷當前訂閱任務是否已被終止钠导,下游observer的onComplete和onError就是在這個方法里調用的。
下一章RxJava2筆記(五森瘪、訂閱流程梳理以及線程切換次數(shù)有效性)將對前面做一個流程梳理牡属,以此來結束RxJava的學習。