RxJava通過(guò)責(zé)任鏈的方式届谈,將各個(gè) 操作符
節(jié)點(diǎn)串連起來(lái)鉴嗤。當(dāng)調(diào)用訂閱subscribe
方法時(shí)盟劫,鏈上節(jié)點(diǎn)都會(huì)依賴(lài)訂閱上一個(gè)節(jié)點(diǎn)航缀。那線程切換是如何實(shí)現(xiàn)的商架?
subscribeOn
操作符是如何讓上游節(jié)點(diǎn)工作在指定線程上?
observeOn
操作符是如何讓下游節(jié)點(diǎn)工作在指定線程上芥玉?
subscribeOn
操作符關(guān)鍵代碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
///scheduler 指定工作線程
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
.......
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
///scheduler.scheduleDirect 線程啟動(dòng)
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
........
}
///關(guān)鍵在SubscribeTask中的`run`方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
///在線程運(yùn)行的代碼塊中進(jìn)行了**訂閱**
source.subscribe(parent);
}
}
subscribeOn分析
Observable.just("goods")
.subscribeOn(Schedulers.io())
.subscribe()
等價(jià)于
Schedulers.io().createWorker().schedule {
Observable.just("goods").subscribe()
}
observeOn分析
Observable.just("goods")
.observeOn(Schedulers.io())
.subscribe()
等價(jià)于
Observable.just("goods").subscribe {
Schedulers.io().createWorker().schedule {
///do somethind
}
}