首先開始填坑革半,上篇文章最后的問題還沒有解決檬嘀,subscribeOn是如何切換線程的憾股。
先回顧一下:
使用方法:
observable.subscribeOn(Schedulers.newThread());
我們直接看它重寫的abstract方法subscribeActual
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
看到了一個(gè)熟悉的Runnable是不是瞬間熱淚盈眶英遭?這里就看到了在一個(gè)Runnable中訂閱了事件掷伙,由于是接口回調(diào),所以observable中的事件是運(yùn)行在這個(gè)線程的雳攘,而observer回調(diào)接口的時(shí)候就要看具體的observeOn是什么參數(shù)了带兜。
跟蹤這里的schedulerDirect方法。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
而這個(gè)creatework()的具體實(shí)現(xiàn)類在HandlerScheduler中吨灭。
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
而這里的handler是ObserveOn的時(shí)候new Handler(Looper.getMainLooper())時(shí)候創(chuàng)建的刚照,運(yùn)行在主線程。
所以這里通過handler發(fā)送一個(gè)帶有Runnable的消息喧兄,完成了new Thread和Main Thread的線程切換无畔。