subscribeOn
-
找到ObservableSubscribeOn類
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); // 傳入上游的Observable和調(diào)度器Scheduler this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { // 用下游的observer創(chuàng)建一個(gè)新的observer final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); // scheduler直接執(zhí)行SubscribeTask parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
-
查看SubscribeOnObserver類钝凶,除了實(shí)現(xiàn)Disposable接口還實(shí)現(xiàn)了Observer接口,說明他還是個(gè)新的observer筐乳;
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual;// 下游的observer this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { // 上游的onSubscribe會(huì)調(diào)用致稀,但是因?yàn)閠his.s的disposable不為null杂瘸,大部分情況一直都是直接跳過 DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); // 這里dispose多了個(gè)步驟,沒明白?仪媒??谢鹊? DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } }
-
查看SubscribeTask规丽,這是個(gè)Runnable,并且是ObservableSubscribeOn的內(nèi)部類
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { // 上游的observable執(zhí)行subscribe撇贺,傳入的是下游的obser source.subscribe(parent); } }
-
查看scheduler.scheduleDirect赌莺,這個(gè)方法就是用scheduler立即執(zhí)行傳入的Runnable任務(wù)
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
-
所以總結(jié)下
- ObservableSubscribeOn起到一個(gè)橋接的功能,執(zhí)行source.subscribe(parent)松嘶,處理上游的Observable
- 傳入的scheduler用來控制怎么執(zhí)行
observeOn
-
找到ObservableObserveOn類
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; // 調(diào)度器 this.delayError = delayError;// this.bufferSize = bufferSize;// 任務(wù)隊(duì)列大小艘狭,默認(rèn)128 } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { // 如果是TrampolineScheduler,放個(gè)屁啥都不干翠订? source.subscribe(observer); } else { // scheduler.createWorker Scheduler.Worker w = scheduler.createWorker(); // 下游的observer包裝成ObserveOnObserver巢音,傳給上游的source.subscribe source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
-
查看ObserveOnObserver類:AtomicInteger的子類,實(shí)現(xiàn)了QueueDisposable尽超,Observer<T>, Runnable
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue<T> queue; // 一個(gè)隊(duì)列 Disposable s; Throwable error; volatile boolean done;// 標(biāo)記是否結(jié)束官撼,和disposable不一樣的是如果done為true,還是會(huì)走OnComplete或者onError volatile boolean cancelled;// 標(biāo)記disposable狀態(tài) int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; }
-
查看ObserveOnObserver類的onSubscribe方法
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; // 這里省略了一部分FuseMode的代碼似谁,默認(rèn)情況不會(huì)走 // 初始化隊(duì)列 queue = new SpscLinkedArrayQueue<T>(bufferSize); // 下游observer的onSubscribe回調(diào) actual.onSubscribe(this); } }
-
查看disposable的邏輯傲绣,cancelled變量標(biāo)記,dispose同時(shí)還會(huì)清空隊(duì)列和dispose任務(wù)worker
@Override public void dispose() { if (!cancelled) { cancelled = true; s.dispose(); worker.dispose();// 中斷Scheduler任務(wù) if (getAndIncrement() == 0) { // 清空隊(duì)列 queue.clear(); } } } @Override public boolean isDisposed() { return cancelled; }
-
查看ObserveOnObserver的onNext巩踏,把數(shù)據(jù)塞到隊(duì)列里秃诵,并且只有AtomicInteger值為0,才執(zhí)行任務(wù)worker.schedule
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { // 入隊(duì)列 queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { // AtomicInteger的值為0就執(zhí)行worker.schedule塞琼,傳入的this是Runnable菠净;AtomicInteger加1 worker.schedule(this); } }
-
ObserveOnObserver的Runnable實(shí)現(xiàn)run方法
@Override public void run() { if (outputFused) { // 默認(rèn)為false,先不管 drainFused(); } else { drainNormal(); } } void drainNormal() { int missed = 1;// 這里成功接收一次數(shù)據(jù)missed就加1,默認(rèn)為1 final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { // disposable或者done的情況會(huì)返回true return; } for (;;) { boolean d = done; T v; try { v = q.poll();// 取出隊(duì)列一個(gè)數(shù)據(jù)v } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { // 再次檢查是否已經(jīng)done或者diposable return; } if (empty) { // 如果隊(duì)列空了跳出里面的for循環(huán) break; } // 下游的Observer回調(diào)onNext a.onNext(v); } // 更新missed值毅往,表示還剩幾個(gè)走了schedule但是還沒有被調(diào)用onNext的任務(wù) missed = addAndGet(-missed); if (missed == 0) { // missed為0跳出for循環(huán) break; } } }
這里解釋下為什么要兩個(gè)for循環(huán)和為什么用AtomicInteger標(biāo)記schedule次數(shù)牵咙,因?yàn)橐紤]上游事件發(fā)送和下游事件接受速度是不一樣,而且worker.schedule導(dǎo)致上下游不在一個(gè)線程攀唯,比如下面幾個(gè)例子
- 發(fā)送數(shù)據(jù)(1霜大,2,3革答,4)很快战坤,接收數(shù)據(jù)很慢:那么異步情況下,很快的會(huì)調(diào)用四次schedule残拐,getAndIncrement只有第一次為0途茫,只會(huì)走一次worker.schedule(this),那么run方法就只會(huì)走一次溪食,會(huì)在外面的for循環(huán)跳出囊卜,也就是missed=0結(jié)束
- 發(fā)送數(shù)據(jù)(1,2错沃,3栅组,4,complete)很快枢析,接收數(shù)據(jù)很慢:基本同上玉掸,但是因?yàn)榘l(fā)送了complete導(dǎo)致狀態(tài)為disposable,會(huì)在里面的for循環(huán)return醒叁,因?yàn)閏heckTerminated(d, empty, a)返回了true
- 發(fā)送數(shù)據(jù)(1司浪,2)很慢,發(fā)送數(shù)據(jù)(3把沼,4)很快啊易,接收數(shù)據(jù)(2)很慢:那么異步情況下,有可能在2執(zhí)行完onNext饮睬,剛剛跳出里面的for循環(huán)租谈,這時(shí)候發(fā)送數(shù)據(jù)3了導(dǎo)致隊(duì)列不為空,miss不為空了所以不會(huì)跳出外面的for循環(huán)捆愁。這樣就不用worker.schedule(this);
-
總結(jié)下
- 同樣是橋接割去,這里對(duì)下游的observer做處理
- 傳入的scheduler用來控制怎么執(zhí)行
Example
下面代碼是我們很常見的一個(gè)例子,發(fā)送數(shù)據(jù)(1牙瓢,2)在IO線程執(zhí)行劫拗,Observer在UI線程中執(zhí)行
Observable.just(1,2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("yj", "---onSubscribe==" + d.isDisposed());
}
@Override
public void onNext(@NonNull Integer i) {
Log.e("yj", "---onNext==" + i);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("yj", "---onError==" + e);
}
@Override
public void onComplete() {
Log.e("yj", "---onComplete");
}
});
- 依次創(chuàng)建了三個(gè)Observable:ObservableFromArray间校,ObservableSubscribeOn矾克,ObservableObserveOn
- 從下往上看subscribeActual方法調(diào)用:ObservableObserveOn不做處理,ObservableSubscribeOn使subscribe在io線程中執(zhí)行憔足,ObservableFromArray順序發(fā)送1胁附,2
- Observer的回調(diào)只有ObservableObserveOn處理了酒繁,使其在UI線程中被調(diào)用
再舉個(gè)的例子
下面的代碼執(zhí)行了兩次subscribeOn和observeOn,那么just和accept在哪個(gè)線程執(zhí)行呢?
Observable.just(1,2)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});
答案:在兩個(gè)不同的IO線程
PS
我的github:https://github.com/nppp1990/MyTips