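1. How does the main thread switch to a worker thread?
2. Why does only the first thread switch (subscribeOn) take effect?
3. How does the worker thread switch back to the main thread?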
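1> Question 1: how does the main thread switch to a worker thread? Before reading any source, think about how we would write it ourselves: the simplest approach is a Thread plus a Runnable.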
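To make that intuition concrete, here is a minimal sketch in plain Java of handing a Runnable to another thread. The class name ThreadSwitchSketch, the method runOnWorker and the thread name "worker-1" are made up for illustration and are not part of RxJava.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; nothing here comes from RxJava.
class ThreadSwitchSketch {

    /** Runs the task on a freshly started thread and returns that thread's name. */
    static String runOnWorker(Runnable task) {
        AtomicReference<String> ranOn = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            // Everything in this lambda executes on the new thread.
            ranOn.set(Thread.currentThread().getName());
            task.run();
        }, "worker-1");
        worker.start();
        try {
            worker.join(); // wait only so the demo can report the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        String name = runOnWorker(() -> System.out.println("doing the work"));
        System.out.println("task ran on: " + name);
    }
}
```

The point to carry into the source reading is that "switching threads" just means wrapping the work in a Runnable and letting some other thread call run().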
With that in mind, let's start the analysis from this line:
.subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        // Hand the upstream subscription to the scheduler as a Runnable.
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // Runs on the scheduler's thread, so the upstream is subscribed there.
            source.subscribe(parent);
        }
    }
}
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}
So the next thing to look at is the concrete Scheduler, which comes from Schedulers.io():
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
Untangling the call chain so far, it all boils down to this one snippet:
createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        source.subscribe(parent);
    }
}, 0, TimeUnit.NANOSECONDS);
createWorker() --> called on the Scheduler returned by Schedulers.io()
source --> ObservableCreate (the upstream Observable)
So let's look at how Schedulers.io() is implemented.
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    ...
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
executor ---> a ScheduledExecutorService
Summary:
executor.submit(new Runnable() {
    @Override
    public void run() {
        source.subscribe(parent);
    }
});
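The same idea can be tried outside RxJava. The following is only a sketch under stated assumptions: MiniSource, MiniObserver and the helper subscribeOn are hypothetical stand-ins for ObservableSource, Observer and ObservableSubscribeOn, and a plain single-thread ExecutorService named "io-1" stands in for the IoScheduler's pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of subscribeOn; not the RxJava implementation.
class SubscribeOnSketch {
    interface MiniObserver<T> { void onNext(T value); }
    interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }

    /** Wraps a source so that subscribing to it happens on the executor's thread. */
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService executor) {
        // Same shape as the summary: executor.submit(() -> source.subscribe(parent))
        return observer -> executor.submit(() -> source.subscribe(observer));
    }

    /** Returns the name of the thread the source emitted on. */
    static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        AtomicReference<String> emittedOn = new AtomicReference<>();
        // The source emits the name of whatever thread subscribed to it.
        MiniSource<String> source =
                observer -> observer.onNext(Thread.currentThread().getName());

        subscribeOn(source, io).subscribe(emittedOn::set);

        io.shutdown(); // already submitted tasks still run
        try {
            if (!io.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emittedOn.get();
    }

    public static void main(String[] args) {
        System.out.println("source emitted on: " + demo());
    }
}
```

Because the source does its work inside subscribe(), moving the subscribe() call onto the executor is enough to move the emission there too.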
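2> Why does only the first switch take effect?
The flowchart should make the chained calls clear: each subscribeOn wraps its upstream in a new ObservableSubscribeOn, and the subscription travels from the last operator back up to the source. Every subscribeOn does switch threads, but the one closest to the source (the first one written) switches last, so it decides which thread source.subscribe() finally runs on.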
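A small experiment can back this up. This is a sketch, not RxJava itself: MiniSource, MiniObserver and subscribeOn are hypothetical stand-ins, and the two single-thread executors named "first" and "second" play the role of two different schedulers passed to two chained subscribeOn calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of source.subscribeOn(first).subscribeOn(second).
class FirstSubscribeOnWinsSketch {
    interface MiniObserver<T> { void onNext(T value); }
    interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }

    static <T> MiniSource<T> subscribeOn(MiniSource<T> upstream, ExecutorService executor) {
        return observer -> executor.submit(() -> upstream.subscribe(observer));
    }

    /** Returns the name of the thread the source finally emits on. */
    static String emittingThread() {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        AtomicReference<String> emittedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            MiniSource<String> source =
                    observer -> observer.onNext(Thread.currentThread().getName());
            // Written order: first, then second. Subscription order is the reverse:
            // the caller hops to "second", which then hops to "first".
            MiniSource<String> chain = subscribeOn(subscribeOn(source, first), second);
            chain.subscribe(name -> {
                emittedOn.set(name);
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return emittedOn.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source emitted on: " + emittingThread());
    }
}
```

In this model the second executor is used, but only as a stepping stone: its task immediately resubmits the subscription to the first executor, which is where the source actually runs.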
3> How does the worker thread switch back to the main thread?
Again following our everyday intuition, a Handler would do the job.
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    ...
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
            implements Observer<T>, Runnable {
        ...
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                // Buffer the value on the upstream thread.
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            // Only the call that moves the counter from 0 schedules a drain.
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        @Override
        public void run() {
            // Runs on the worker's thread.
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            ...
            // Downstream onNext is called from the worker's thread.
            a.onNext(v);
            ...
        }
        ...
final class HandlerScheduler extends Scheduler {
    ...
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
    ...
}

private static final class HandlerWorker extends Worker {
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        ...
Sure enough, it uses a Handler.
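The queue-then-schedule pattern in ObserveOnObserver can be sketched without Android. Everything below is an assumption made for illustration: HoppingObserver is a hypothetical class, a single-thread executor named "main-looper" stands in for the main-thread Handler, and an AtomicInteger plays the role of the counter behind getAndIncrement().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical miniature of observeOn; not the RxJava or RxAndroid code.
class ObserveOnSketch {

    /** Accepts values on any thread and delivers them on the target executor. */
    static final class HoppingObserver<T> implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        private final ExecutorService target;
        private final Consumer<T> downstream;

        HoppingObserver(ExecutorService target, Consumer<T> downstream) {
            this.target = target;
            this.downstream = downstream;
        }

        void onNext(T value) {
            queue.offer(value);               // buffer on the caller's thread
            if (wip.getAndIncrement() == 0) { // only the 0 -> 1 transition schedules
                target.execute(this);
            }
        }

        @Override
        public void run() {                   // executes on the target thread
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v);
                }
                // Subtract the calls already accounted for; leave when none arrived meanwhile.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    /** Emits 1, 2, 3 from the calling thread and records where each one is delivered. */
    static List<String> demo() {
        ExecutorService mainLooper =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-looper"));
        List<String> received = new ArrayList<>(); // written only by the target thread
        CountDownLatch done = new CountDownLatch(3);
        HoppingObserver<Integer> observer = new HoppingObserver<>(mainLooper, value -> {
            received.add(value + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainLooper.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

On Android the executor's role is played by a Handler posting a Message to the main Looper, as the HandlerWorker excerpt shows; the queue and counter logic on top of it stays the same.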
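Summary: chained calls make life pleasant for the caller, but the implementation behind them twists and turns. RxJava has plenty of other code worth studying, which I won't go into here; for now the goal is simply to get the overall logic straight.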