首先明確一點(diǎn)線程切換一定需要Handler
的參與
線程切換的代碼最簡單的做法大概如下代碼
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe" + Thread.currentThread());
}
// ObservableOnSubscribe 運(yùn)行在工作線程
}).subscribeOn(Schedulers.newThread()).
// 觀察者運(yùn)行在主線程 這段代碼獲取了主線程的handler呀狼,之后的線程切換就需要他的參與
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe" + Thread.currentThread());
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
subscribeOn(Schedulers.newThread()).
代碼指定ObservableOnSubscribe
運(yùn)行在工作線程
看下代碼怎么實(shí)現(xiàn)?
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 返回的還是 ObservableSubscribeOn(是Observable 的子類)對象 也是調(diào)用里面的subscribeActual示惊,默認(rèn)是調(diào)用ObservableCreate 的subscribeActual方法
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
然后調(diào)用subscribe
到
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
里面可以調(diào)用subscribeActual
方法
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// s代表注冊的Obersver還是直接執(zhí)行
s.onSubscribe(parent);
// 這次沒有直接執(zhí)行代碼寫的subscribe方法 而是setDisposable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
看下SubscribeTask類 實(shí)現(xiàn)了Runnable接口
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 在工作線程執(zhí)行上游subscribe
source.subscribe(parent);
}
}
接著往下看 就是一些線程池 線程管理的事情
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 在這提交到了線程池
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}