一、前言
- 基于RxJava2.1.1
- 我們在前面的 RxJava2.0使用詳解(一)初步分析了RxJava從創(chuàng)建到執(zhí)行的流程笨奠。RxJava2.0使用詳解(二) 中分析了RxJava的隨意終止Reactive流的能力的來源袭蝗;也明白了
RxJava
的onComplete();
與onError(t);
只有一個會被執(zhí)行的秘密唤殴。RxJava2.X 源碼分析(三)中探索了RxJava2調(diào)用subscribeOn切換被觀察者線程的原理。 - 本次我們將繼續(xù)探索
RxJava2.x
切換觀察者的原理到腥,分析observeOn
與subscribeOn
的不同之處眨八。繼續(xù)實現(xiàn)我們在第一篇中定下的小目標
二、從Demo到原理
- OK左电,我們的Demo還是上次的demo廉侧,忘記了的小伙伴可以點擊RxJava2.X 源碼分析(三),這里就不再重復(fù)了哦篓足,我們直接進入正題段誊。
- Ok,按照套路栈拖,我們從
observeOn
方法入手连舍。
- Ok,我點~_
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//false為默認無延遲發(fā)送錯誤涩哟,bufferSize為緩沖區(qū)大小
return observeOn(scheduler, false, bufferSize());
}
- 我們繼續(xù)往下看索赏,我猜套路跟
subscribeOn
的逃不多,也是采用裝飾者模式贴彼,wrapper我們的Observable
和Observer
產(chǎn)生一個中間被觀察者和觀察中潜腻,通過中間被觀察者訂閱上游被觀察者,通過中間觀察者接收上游被觀察者下發(fā)的數(shù)據(jù)器仗,然后通過線程切換將數(shù)據(jù)傳遞給下游觀察者融涣。
- Ok,我們來驗證下才想精钮。我覺得就是沒完全猜對威鹿,也能猜對其中的大部分。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
-
Ok轨香,熟悉的
RxJavaPlugins.onAssembly
hook處理忽你,略過,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)
這句public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observersuper T> observer) { //1臂容、在當前線程調(diào)度科雳,但不是立即執(zhí)行,放入隊列中 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //2策橘、本次走的是這里 Scheduler.Worker w = scheduler.createWorker(); //3 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- Ok,果然炸渡,熟悉的模式娜亿,對我們上游的
Observable
,下游的Observer
wrapper一次丽已。
1、ObservableObserveOn
繼承了AbstractObservableWithUpstream
2买决、source
保存上游的Observable
3沛婴、scheduler
為本次的調(diào)度器
4吼畏、在下游調(diào)用subscribe
訂閱時觸發(fā)->subscribeActual
->Wrapper了下游的Observer
觀察者
- 3處:source為游Observable,下游Observer被wrapper到ObserveOnObserver嘁灯,發(fā)生訂閱數(shù)件泻蚊,上游Observable開始執(zhí)行subscribeActual,調(diào)用ObserveOnObserver的onSubscribe以及onNext丑婿、onError性雄、onComplete等
- OK,我們接著看Observer被包裝進 ObserveOnObserver的樣子羹奉,代碼有點多秒旋,我們分段講解
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//下游的Observer
final Observersuper T> actual;
//調(diào)度工作者
final Scheduler.Worker worker;
//是否延遲錯誤,默認false
final boolean delayError;
//隊列大小
final int bufferSize;
//存儲上游Observable下發(fā)的數(shù)據(jù)隊列
SimpleQueue<T> queue;
//存儲下游Observer的Disposable
Disposable s;
//存儲錯誤信息
Throwable error;
//校驗是否完畢
volatile boolean done;
//是否被取消
volatile boolean cancelled;
//存儲執(zhí)行模式诀拭,同步或者異步 同步
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//1迁筛、判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
//true 后面的onXX方法都不會被調(diào)用
done = true;
actual.onSubscribe(this);
//2、同步模式下耕挨,直接調(diào)用schedule
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
//2细卧、異步模式下,等待schedule
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
actual.onSubscribe(this);
}
}
- OK筒占,執(zhí)行玩這里之后贪庙,就到我們的onXX方法了
- 首先可無限調(diào)用的
onNext
@Override
public void onNext(T t) {
//3、數(shù)據(jù)源是同步模式或者執(zhí)行過error / complete 會是true
if (done) {
return;
}
//如果數(shù)據(jù)源不是異步類型翰苫,
if (sourceMode != QueueDisposable.ASYNC) {
//4插勤、上游Observable下發(fā)的數(shù)據(jù)壓入queue
queue.offer(t);
}
//5、開始調(diào)度
schedule();
}
- 其次只能觸發(fā)一次的onError革骨,基本差不多
@Override
public void onError(Throwable t) {
if (done) {
//6农尖、已完成再執(zhí)行會拋一場
RxJavaPlugins.onError(t);
return;
}
//7、記錄錯誤信息
error = t;
//8良哲、標識已完成
done = true;
//9盛卡、開始調(diào)度
schedule();
}
- 同樣是只能觸發(fā)一次的onComplete,同樣的套路筑凫,就不說了
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
- 然后就是我們的關(guān)鍵點
schedule();
//關(guān)鍵點就是直接滑沧、簡單、里面線程調(diào)度工作者調(diào)用schedule(this)巍实,傳入了this
void schedule() {
//getAndIncrement很關(guān)鍵滓技,他原子性的保證了worker.schedule(this);在調(diào)度完之前不會被再次調(diào)度
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
什么?傳入了this棚潦?那么說明什么呢令漂?( ̄? ̄)
嗯?this是個
runnable
,沒錯叠必,我們的ObserveOnObserver
實現(xiàn)了Runnable
接口那么荚孵,接下來自然是調(diào)用
run
方法
@Override
public void run() {
//outputFused一般是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
- 好吧,在看drainNormal前纬朝,我們先看一個函數(shù)
//從名字看是檢測是否已終止
boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
//1收叶、訂閱已取消
if (cancelled) {
//清空隊列
queue.clear();
return true;
}
//2、d其實是done共苛,
if (d) {
//done==ture可能的情況onNext剛被調(diào)度完判没,onError或者onCompele被調(diào)用,
Throwable e = error;
if (delayError) {
//delayError==true時等到隊列為空才調(diào)用
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
//否則直接調(diào)用
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
//否則未終結(jié)
return false;
}
true:1隅茎、訂閱被取消cancelled==true哆致,2、done==true onNext剛被調(diào)度完患膛,onError或者onCompele被調(diào)用
繼續(xù)看drainNormal
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observersuper T> a = actual;
//Ok,死循環(huán)摊阀,我們來看下有哪些出口
for (;;) {
//Ok,出口踪蹬,該方法前面分析的
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//在此死循環(huán)
for (;;) {
boolean d = done;
T v;
try {
//分發(fā)數(shù)據(jù)出隊列
v = q.poll();
} catch (Throwable ex) {
//有異常時終止退出
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
//停止worker(線程)
worker.dispose();
return;
}
boolean empty = v == null;
//判斷隊列是否為空
if (checkTerminated(d, empty, a)) {
return;
}
//沒數(shù)據(jù)退出
if (empty) {
break;
}
//數(shù)據(jù)下發(fā)給下游Obsever胞此,這里支付者onNext,onComplete和onError主要放在了checkTerminated里面回調(diào)
a.onNext(v);
}
//保證此時確實有一個 worker.schedule(this);正在被執(zhí)行跃捣,
missed = addAndGet(-missed);
//為何要這樣做呢漱牵?我的理解是保證drainNormal方法被原子性調(diào)用,如果執(zhí)行了addAndGet之后getAndIncrement() == 0就成立了疚漆,此時又一個worker.schedule(this);被調(diào)用了酣胀,那么就不能執(zhí)行break了
if (missed == 0) {
break;
}
}
}
總結(jié)
- Ok,看到這里我們基本了解了observeOn的實現(xiàn)流程娶聘,同樣是老套路闻镶,使用裝飾者模式,中間Wrapper了我們的Observable和Observer丸升,通過中間增加一個Observable和Observer來實現(xiàn)線程的切換铆农。
- 喜歡就給我留言哦