在上一篇文章當中我們把RxJava的上游線程切換的源碼都大致梳理了一遍惜辑,如果還沒有看的請猛戳這里,但是光有上游的線程切換是不足以讓我們完成在實際項目中的應用的譬正,絕大多數(shù)時候我們都需要在下游進行線程的切換來處理上游在其他線程中得到的結果宫补。所以現(xiàn)在我們就來分析一下RxJava源碼中是如何實現(xiàn)對下游線程的切換控制管理的檬姥。
這里我們一切換到Android主線程為例:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
現(xiàn)在就從observeOn(AndroidSchedulers.mainThread())入手,探探究竟。
首先我們來看一下RxJava是如何得到一個Android主線程的Scheduler的即HandlerScheduler粉怕。我們點進源碼看一下:
/** Android-specific Schedulers. */
public final class AndroidSchedulers {
private static final class MainHolder {
//創(chuàng)建一個Handle拿到主線程的Looper 創(chuàng)建默認的 HandlerScheduler
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
//該Callable默認返回的就是上面的HandleScheduler
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
//這里就是入口 可以看到其實該方法是直接獲取到了一個靜態(tài)的Scheduler常量健民。
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
好了現(xiàn)在Scheduler有了,我們繼續(xù)分析observeOn方法贫贝。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
看到了吧秉犹,RxJava所有的代碼基本都是一致的,橋接模式稚晚,這里看到是創(chuàng)建了一個ObservableObserveOn對象崇堵,當然第二個參數(shù)默認是false,表明了如果執(zhí)行了onError() 將會重新發(fā)送一遍上游的事件序列客燕,第三個參數(shù)是緩存的大小默認是128鸳劳。我們點進ObservableObserveOn的構造方法看看里面都做了什么,很關鍵也搓。
//可以看到套路基本都是一樣的棍辕, ObservableObserveOn<T> 同樣是繼承于AbstractObservableWithUpstream<T, T> ,用來保存上游的原事件流。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//訂閱的真正發(fā)生之處
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {//肯定不是這個Scheduler啊还绘,我們這里是HandleScheduler
source.subscribe(observer);
} else {
//創(chuàng)建HandlerScheduler的Worker,HandlerWorker.
Scheduler.Worker w = scheduler.createWorker();
//上游事件和下游事件產生訂閱,這里又是一個包裝類ObserveOnObserver包裝了下游真正的Observer栖袋。
//我們到ObserverOnObserver里面去看看拍顷,其是一個靜態(tài)內部類
//這里是把worker,delayError塘幅,bufferSizew也都傳了進去
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//實現(xiàn)了Runnable接口姥卢,繼承于BasicIntQueueDisposable
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
//事件的緩存隊列 確定了緩存隊列的大小
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//執(zhí)行真正的onSubscribe方法
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//開始調度
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
//開始調度
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;//已經(jīng)完成
//開始調度
schedule();
}
//取消訂閱
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
//判斷是否被取消訂閱
@Override
public boolean isDisposed() {
return cancelled;
}
//執(zhí)行調度的方法
void schedule() {
if (getAndIncrement() == 0) {
//傳入當前ObserveOnObserver對象漓帚,其實現(xiàn)了Runnable接口
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
//緩存數(shù)據(jù)的隊列
final SimpleQueue<T> q = queue;
//實際下游的Observer
final Observer<? super T> a = actual;
for (;;) {
//檢測事件是否被終止,如果終止了直接跳出循環(huán)
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
//標記事件是否完成
boolean d = done;
T v;
try {
//拿到隊列里的第一個事件
v = q.poll();
} catch (Throwable ex) {
//發(fā)生異常了 做一系列的后續(xù)動作
//取消訂閱,隊列的制空亮蒋,發(fā)送異常事件,取消線程調度懒震,最后跳出循環(huán)
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
//判斷事件是否為空
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
//為空直接進入下一輪循環(huán)
//因為上游的事件處理也是需要時間的鳄厌,上游的執(zhí)行有可能是非常大量的數(shù)據(jù)所以可能會出現(xiàn)緩存隊列里面暫時沒有事件,所以這里需要一直進行循環(huán)去等待新的事件產生
if (empty) {
break;
}
//發(fā)送事件
a.onNext(v);
}
//下面這段代碼我也不是很確定他的意思拼缝,這里我說一下我自己的理解不知道正不正確:
//因為ObserveOnObserver是繼承于BasicIntQueueDisposable 娱局,而BasicIntQueueDisposable 又繼承了AtomicInteger,一個原子操作類
//用一個Integer整數(shù)來控制當前ObserveOnObserver對象的并發(fā)操作
//如果當前ObserveOnObserver對象沒有被其他線程獨占咧七,那么該對象就自己持有的話(代表已經(jīng)執(zhí)行完了當前的事件)衰齐,就可以執(zhí)行addAndGet(int i)方法了。
//執(zhí)行完改方法對自己的負數(shù)相加那么最終得出的是0继阻,為0的話就可以開始下一個循環(huán)了耻涛,那么以后的每一個循環(huán)missed的值都為0都可以直接break废酷!
//最終要的是addAndGet()是一個阻塞式的方法,如果不成功的話抹缕,它會重新執(zhí)行一遍
//所以我分析得出這里其實是一個控制標記位“好了澈蟆!現(xiàn)在輪到你了,開始吧”當?shù)谝淮文玫綑嘞藓缶涂梢砸恢眻?zhí)行下去了歉嗓。
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
...........
}
//具體的run方法內部
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
//去處理隊列里面緩存的數(shù)據(jù)
drainNormal();
}
}
//檢查是否終止 代碼都很簡單 我就不做注釋了
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
}
同樣是裝飾模式丰介,關鍵就是每當執(zhí)行onNext(),onError(),onCompleted()方法的時候,都會開啟線程的調度鉴分,上游的每一次事件哮幢,都會在指定線程中處理,這就是核心志珍。然后就執(zhí)行了具體的Worker實現(xiàn)類里面的schedule方法橙垢,我們一起看一下。
//HandlerWorker里面的schedule方法伦糯,其第二個參數(shù)為0L柜某,第三個參數(shù)為TimeUnit.NANOSECONDS。
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
//判斷是否取消訂閱了
return Disposables.disposed();
}
//滿篇飛的Hook函數(shù) +_+
run = RxJavaPlugins.onSchedule(run);
//封裝當前持有主線程Looper的handler和ObserveOnObserver對象
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//創(chuàng)建Message
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//給主線程發(fā)送消息
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
//判斷是否取消訂閱了
if (disposed) {
//如果取消訂閱了 就remove掉消息處理的回調接口
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
當然了最后主線程的執(zhí)行的程序是ScheduledRunnable里面的run()方法敛纲,代碼如下:
@Override
public void run() {
try {
//ObserveOnObserver對象的run方法
delegate.run();
} catch (Throwable t) {
//捕獲異常了進行一系列處理
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
這樣RxJava就實現(xiàn)了把上游發(fā)送的每一個事件都巧妙地轉換到了指定線程中處理喂击,此處是Android主線程。
可以看到如果你在下游多次調用observeon()的話線程是會一直切換的淤翔,這也是網(wǎng)上一直說的結論翰绊。每一次切換線程,都會把對應的Observer對象的各項處理方法的處理執(zhí)行在制定線程當中旁壮。
大概瀏覽完源碼你會發(fā)現(xiàn)监嗜,RxJava的設計者真的是把面向對象的思想用到了極致,抽象接口與實體抡谐,設計模式地巧用都無處不在裁奇,感嘆自己要學的真的還有太多,如果讓我來寫不知道還要多少年才能寫出如此牛B的代碼麦撵。
這也算是我第一次寫源碼分析的文章刽肠,還有很多地方有待提高,最開始聽說別人源碼分析很重要免胃,不光要會用那些優(yōu)秀的Library更要理解其中的精髓五垮,與是我傻乎乎地悶著腦袋去看,結果真的看不懂杜秸,后來看了一本書叫做《Android源碼設計模式》才恍然大悟放仗,設計模式地巧用在各大優(yōu)秀的開源Library中無處不在,只有真正地理解了設計模式撬碟,精通架構诞挨,才能寫出如此優(yōu)秀的代碼莉撇。最后再安利一本書《設計模式之禪》這本書很有意思,作者語言幽默風趣惶傻,像看連環(huán)畫一樣很有意思棍郎。
哈哈 廢話說了一大堆了,如果上面我的分析有誤的話银室,歡迎指正批評涂佃,有什么不懂得地方也可以一起探討。
最后
沒有最后了蜈敢,大家再見~~~