本系列文章如下:
- 拆輪子系列--RxJava前奏篇
- 拆輪子系列--RxJava理解(一)--Map解析
- 拆輪子系列--RxJava理解(二)--subscribeOn
- 拆輪子系列--RxJava理解(三)--observeOn
上一篇文章主要介紹了RxJava
中線程調(diào)度的核心方法之一subscribeOn
,本篇文章繼續(xù)分析RxJava
中線程調(diào)度的另一個(gè)核心方法--observeOn
版扩。本篇文章基于RxJava2
源碼進(jìn)行分析锈拨。
本文的大綱如下:
- 一個(gè)具體的例子
- observeOn源碼分析
- 總結(jié)
1 .一個(gè)具體的例子
首先,以一個(gè)具體的例子分析observeOn
的原理:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
Thread.sleep(1000);
e.onNext("2");
Thread.sleep(1000);
e.onComplete();
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e("TAG", "map1--thread=" + Thread.currentThread().getName() + "-s:" + s);
return Integer.valueOf(s);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<Integer, Long>() {
@Override
public Long apply(Integer integer) throws Exception {
Log.e("TAG", "map2--thread=" + Thread.currentThread().getName() + "-integer:" + integer);
return Long.valueOf(integer);
}
})
.observeOn(Schedulers.io())
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
Log.e("TAG", "map3--thread=" + Thread.currentThread().getName() + "-aLong:" + aLong);
return String.valueOf(aLong);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("TAG", "Consumer--thread=" + Thread.currentThread().getName() + "-String:" + s);
}
});
如果你了解map
這個(gè)操作符惋鸥,那么這個(gè)例子你很快就能得運(yùn)行結(jié)果贺嫂,如果你對(duì)于map
這個(gè)操作符不太清楚际起,建議回顧下之前的文章拆輪子系列--RxJava理解(一)--Map解析拾碌。接下來我們看看本例的程序運(yùn)行結(jié)果:
E/TAG: map1--thread-main-s:1
E/TAG: map2--thread-main-integer:1
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:1
E/TAG: map1--thread-main-s:2
E/TAG: map2--thread-main-integer:2
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:2
細(xì)看下之前的例子吐葱,可能有些朋友已經(jīng)發(fā)現(xiàn)了一個(gè)異常操作Thread.sleep(1000);
。為什么在發(fā)射元素的時(shí)候睡了一秒鐘校翔?這個(gè)是為什么呢唇撬?哈哈,先不急展融,下文將一一道來窖认。
從上面運(yùn)行的結(jié)果我們發(fā)現(xiàn),除了observeOn()
下面的部分運(yùn)行在observeOn()
指定的線程中告希,其余的部分運(yùn)行在subscribeOn()
指定的線程扑浸,這個(gè)是為什么呢?下面再分析燕偶,這里先給個(gè)結(jié)論:RxJava中喝噪,observeOn()是用來指定下游observer回調(diào)發(fā)生的線程。對(duì)應(yīng)上面的例子指么,也就是map3與Consumer運(yùn)行的線程酝惧。
2. observeOn源碼分析
為什么會(huì)產(chǎn)生上面的結(jié)果?我們來看看源碼:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
從源碼中我們可以看出伯诬,調(diào)用observeOn()
方法返回了一個(gè)Observable
對(duì)象晚唇,而真正的操作是在ObservableObserveOn()
這個(gè)方法里面,接下來我們看看ObservableObserveOn()
這個(gè)方法到底干了什么事情:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
我們主要看看ObservableObserveOn
中主要的實(shí)現(xiàn)方法subscribeActual()
盗似。在這個(gè)方法中哩陕,首先創(chuàng)建了一個(gè)指定的事物worker
,然后將worker
作為參數(shù)創(chuàng)建了一個(gè)ObserveOnObserver
對(duì)象赫舒,接下來我們分析這個(gè)ObserveOnObserver
中具體的邏輯:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
...
ObserveOnObserver
實(shí)現(xiàn)了Observer
這個(gè)接口悍及,重寫了Observer
里面的方法,我們看看主要的方法onNext()
接癌。在該方法中心赶,首先會(huì)向queue()
中添加元素,我們主要關(guān)注schedule()
這個(gè)方法缺猛,進(jìn)入schedule()
:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
上述方法將實(shí)現(xiàn)了Runnable
接口的ObserveOnObserver
對(duì)象放入了worker
里面進(jìn)行操作缨叫,直白的說,就是該ObserveOnObserver
對(duì)象的操作會(huì)被放入一個(gè)線程池中枯夜,尋找合適的線程運(yùn)行弯汰。
主要的問題來了艰山,當(dāng)ObserveOnObserver
對(duì)象尋找到一條線程后執(zhí)行了什么操作呢湖雹?繼續(xù)看源碼:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//我們主要看看drainNormal()這個(gè)方法:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
其實(shí)這個(gè)方法就是一個(gè)死循環(huán),它不斷的從queue
取出元素然后交給由下一級(jí)傳遞上來的observer
來執(zhí)行onNext()
方法曙搬。而這整個(gè)從queue
中取元素到由下級(jí)的observer
執(zhí)行onNext()
方法摔吏,都是執(zhí)行在scheduler( Scheduler.Worker w = scheduler.createWorker();)
所指定的線程中鸽嫂。總的來說征讲,ObserveOnObserver
會(huì)將下一級(jí)傳遞過來的observer
進(jìn)行封裝据某,讓它獨(dú)立的運(yùn)行在scheduler
指定的線程中去處理元素。
再回到前面的例子诗箍,我們?cè)?code>observeOn()操作符后面接著使用了一個(gè)map()
操作符癣籽,那么此時(shí)的流程又是怎么樣的呢?我們以一張圖來進(jìn)行說明:
從上圖中可以看到滤祖,observeOn
后面跟了一個(gè)map()
筷狼,那么在drainNormal ()
方法中a.onNext(v)
的a
就是經(jīng)過map
轉(zhuǎn)換過的observer
,接著調(diào)用map
中o.onNext(transformer.call(t))
匠童,此時(shí)保證了transformer.call()
方法運(yùn)行在observeOn()
所指定的線程中埂材,而o
就是observer2
。
3. 總結(jié)
使用observeOn()
這個(gè)操作符汤求,會(huì)在原來Observer
發(fā)射元素的時(shí)候俏险,將元素一個(gè)個(gè)的添加到一個(gè)指定的隊(duì)列中,然后異步(使用一個(gè)新的線程)的從該隊(duì)列中取出元素扬绪,將取出的元素交給下一級(jí)的observer
的onNext()
方法來處理元素竖独。
回到前面拋出的一個(gè)問題,我們?cè)诎l(fā)射元素的時(shí)候sleep了1秒鐘
挤牛,這個(gè)是為什么呢预鬓?說明一下:因?yàn)槲覀內(nèi)≡氐倪^程是異步操作的,那么很有可能出現(xiàn)某個(gè)線程的轉(zhuǎn)換執(zhí)行完畢之后才執(zhí)行另一個(gè)線程的轉(zhuǎn)換操作赊颠,最后與我們期望的結(jié)果不太一樣格二。
當(dāng)我們?nèi)サ衾又?code>sleep()操作,其結(jié)果如下:
E/TAG: map1--thread=main-s:1
E/TAG: map2--thread=main-integer:1
E/TAG: map1--thread=main-s:2
E/TAG: map2--thread=main-integer:2
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:1
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:2
好了竣蹦,關(guān)于RxJava
中線程調(diào)度的核心方法observeOn
操作符已經(jīng)介紹完畢顶猜。
如果文章中有什么疏漏或者錯(cuò)誤的地方,還望各位指正痘括,你們的監(jiān)督是我最大的動(dòng)力长窄,謝謝!