轉(zhuǎn)載請(qǐng)標(biāo)明出處:
http://www.reibang.com/p/6ef45f8ee79d
本文出自:【張旭童的簡(jiǎn)書(shū)】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)
概述
承接上一篇RxJava2 源碼解析(一)所袁,
本系列我們的目的:
- 知道源頭(
Observable
)是如何將數(shù)據(jù)發(fā)送出去的话浇。 - 知道終點(diǎn)(
Observer
)是如何接收到數(shù)據(jù)的。 - 何時(shí)將源頭和終點(diǎn)關(guān)聯(lián)起來(lái)的
- 知道線程調(diào)度是怎么實(shí)現(xiàn)的
- 知道操作符是怎么實(shí)現(xiàn)的
本篇計(jì)劃講解一下4,5.
RxJava最強(qiáng)大的莫過(guò)于它的線程調(diào)度 和 花式操作符丁稀。
map操作符
map是一個(gè)高頻的操作符达址,我們首先拿他開(kāi)刀铃彰。
例子如下趴俘,源頭Observable
發(fā)送的是String類(lèi)型的數(shù)字蜗巧,利用map轉(zhuǎn)換成int型掌眠,最終在終點(diǎn)Observer
接受到的也是int類(lèi)型數(shù)據(jù)。:
final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete()
}
});
Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
map.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
我們看一下map
函數(shù)的源碼:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略過(guò)
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins.onAssembly()是hook 上文提到過(guò)
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
RxJavaPlugins.onAssembly()
是hook 上文提到過(guò),所以我們只要看ObservableMap
幕屹,它就是返回到我們手里的Observable
:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//將function變換函數(shù)類(lèi)保存起來(lái)
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()將上游的Observable保存起來(lái) 蓝丙,用于subscribeActual()中用级遭。
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
它繼承自AbstractObservableWithUpstream
,該類(lèi)繼承自Observable
,很簡(jiǎn)單渺尘,就是將上游的ObservableSource
保存起來(lái)挫鸽,做一次wrapper,所以它也算是裝飾者模式的提現(xiàn)鸥跟,如下:
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//將上游的`ObservableSource`保存起來(lái)
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
關(guān)于ObservableSource
丢郊,代表了一個(gè)標(biāo)準(zhǔn)的無(wú)背壓的 源數(shù)據(jù)接口,可以被Observer
消費(fèi)(訂閱)医咨,如下:
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
所有的Observable
都已經(jīng)實(shí)現(xiàn)了它,所以我們可以認(rèn)為Observable
和ObservableSource
在本文中是相等的:
public abstract class Observable<T> implements ObservableSource<T> {
所以我們得到的ObservableMap
對(duì)象也很簡(jiǎn)單蚂夕,就是將上游的Observable
和變換函數(shù)類(lèi)Function
保存起來(lái)。
Function
的定義超級(jí)簡(jiǎn)單腋逆,就是一個(gè)接口婿牍,給我一個(gè)T,還你一個(gè)R.
public interface Function<T, R> {
R apply(T t) throws Exception;
}
本例寫(xiě)的是將String->int.
重頭戲惩歉,subscribeActual()
是訂閱真正發(fā)生的地方等脂,ObservableMap
如下編寫(xiě),就一句話撑蚌,用MapObserver訂閱上游Observable上遥。:
@Override
public void subscribeActual(Observer<? super U> t) {
//用MapObserver訂閱上游Observable。
source.subscribe(new MapObserver<T, U>(t, function));
}
MapObserver
也是裝飾者模式争涌,對(duì)終點(diǎn)(下游)Observer
修飾粉楚。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//super()將actual保存起來(lái)
super(actual);
//保存Function變量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//done在onError 和 onComplete以后才會(huì)是true,默認(rèn)這里是false亮垫,所以跳過(guò)
if (done) {
return;
}
//默認(rèn)sourceMode是0模软,所以跳過(guò)
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//下游Observer接受的值
U v;
//這一步執(zhí)行變換,將上游傳過(guò)來(lái)的T,利用Function轉(zhuǎn)換成下游需要的U饮潦。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//變換后傳遞給下游Observer
actual.onNext(v);
}
到此我們梳理一下流程:
訂閱的過(guò)程燃异,是從下游到上游依次訂閱的。
- 即終點(diǎn)
Observer
訂閱了map
返回的ObservableMap
继蜡。 - 然后
map
的Observable
(ObservableMap
)在被訂閱時(shí)回俐,會(huì)訂閱其內(nèi)部保存上游Observable
,用于訂閱上游的Observer
是一個(gè)裝飾者(MapObserver
)稀并,內(nèi)部保存了下游(本例是終點(diǎn))Observer
仅颇,以便上游發(fā)送數(shù)據(jù)過(guò)來(lái)時(shí),能傳遞給下游碘举。 - 以此類(lèi)推忘瓦,直到源頭
Observable
被訂閱,根據(jù)上節(jié)課內(nèi)容殴俱,它開(kāi)始向Observer發(fā)送數(shù)據(jù)政冻。
數(shù)據(jù)傳遞的過(guò)程枚抵,當(dāng)然是從上游push到下游的线欲,
-
源頭
Observable
傳遞數(shù)據(jù)給下游Observer
(本例就是MapObserver
) - 然后
MapObserver
接收到數(shù)據(jù)明场,對(duì)其變換操作后(實(shí)際的function在這一步執(zhí)行),再調(diào)用內(nèi)部保存的下游Observer
的onNext()
發(fā)送數(shù)據(jù)給下游 - 以此類(lèi)推李丰,直到終點(diǎn)
Observer
苦锨。
線程調(diào)度subscribeOn
簡(jiǎn)化問(wèn)題,代碼如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
//只是在Observable和Observer之間增加了一句線程調(diào)度代碼
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
只是在Observable
和Observer
之間增加了一句線程調(diào)度代碼:.subscribeOn(Schedulers.io())
.
查看subscribeOn()
源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空略過(guò)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//拋開(kāi)Hook趴泌,重點(diǎn)還是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
等等舟舒,怎么有種似曾相識(shí)的感覺(jué),大家可以把文章向上翻嗜憔,看看map()
的源碼秃励。
和subscribeOn()
的套路如出一轍,那么我們根據(jù)上面的結(jié)論吉捶,
先猜測(cè)ObservableSubscribeOn
類(lèi)也是一個(gè)包裝類(lèi)(裝飾者)夺鲜,點(diǎn)進(jìn)去查看:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//保存線程調(diào)度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//map的源碼中我們分析過(guò),super()只是簡(jiǎn)單的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//1 創(chuàng)建一個(gè)包裝Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//2 手動(dòng)調(diào)用 下游(終點(diǎn))Observer.onSubscribe()方法,所以onSubscribe()方法執(zhí)行在 訂閱處所在的線程
s.onSubscribe(parent);
//3 setDisposable()是為了將子線程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此時(shí)已經(jīng)運(yùn)行在相應(yīng)的Scheduler 的線程中
source.subscribe(parent);
}
}));
}
和map套路大體一致呐舔,ObservableSubscribeOn
自身同樣是個(gè)包裝類(lèi)币励,同樣繼承AbstractObservableWithUpstream
。
創(chuàng)建了一個(gè)SubscribeOnObserver
類(lèi)珊拼,該類(lèi)按照套路食呻,應(yīng)該也是實(shí)現(xiàn)了Observer
、Disposable
接口的包裝類(lèi)澎现,讓我們看一下:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//真正的下游(終點(diǎn))觀察者
final Observer<? super T> actual;
//用于保存上游的Disposable仅胞,以便在自身dispose時(shí),連同上游一起dispose
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游調(diào)用剑辫,傳入Disposable饼问。在本類(lèi)中賦值給this.s,加入管理揭斧。
DisposableHelper.setOnce(this.s, s);
}
//直接調(diào)用下游觀察者的對(duì)應(yīng)方法
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
//取消訂閱時(shí)莱革,連同上游Disposable一起取消
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//這個(gè)方法在subscribeActual()中被手動(dòng)調(diào)用,為了將Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
這兩個(gè)類(lèi)根據(jù)上一節(jié)的鋪墊加上注釋?zhuān)渌己美斫舛锟晕⒉缓美斫獾膽?yīng)該是下面兩句代碼:
//ObservableSubscribeOn類(lèi)
//3 setDisposable()是為了將子線程的操作加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此時(shí)已經(jīng)運(yùn)行在相應(yīng)的Scheduler 的線程中
source.subscribe(parent);
}
}));
//SubscribeOnObserver類(lèi)
//這個(gè)方法在subscribeActual()中被手動(dòng)調(diào)用盅视,為了將Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
其中scheduler.scheduleDirect(new Runnable()..)
方法源碼如下:
/**
* Schedules the given task on this scheduler non-delayed execution.
* .....
*/
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
從注釋和方法名我們可以看出,這個(gè)傳入的Runnable
會(huì)立刻執(zhí)行旦万。
再繼續(xù)往里面看:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
//class Worker implements Disposable 闹击,Worker本身是實(shí)現(xiàn)了Disposable
final Worker w = createWorker();
//hook略過(guò)
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//開(kāi)始在Worker的線程執(zhí)行任務(wù),
w.schedule(new Runnable() {
@Override
public void run() {
try {
//調(diào)用的是 run()不是 start()方法執(zhí)行的線程的方法成艘。
decoratedRun.run();
} finally {
//執(zhí)行完畢會(huì) dispose()
w.dispose();
}
}
}, delay, unit);
//返回Worker對(duì)象
return w;
}
createWorker()
是一個(gè)抽象方法赏半,由具體的Scheduler
類(lèi)實(shí)現(xiàn)贺归,例如IoScheduler
對(duì)應(yīng)的Schedulers.io()
.
public abstract Worker createWorker();
初看源碼,為了了解大致流程断箫,不宜過(guò)入深入拂酣,先點(diǎn)到為止。
OK仲义,現(xiàn)在我們總結(jié)一下scheduler.scheduleDirect(new Runnable()..)
的重點(diǎn):
- 傳入的
Runnable
是立刻執(zhí)行的婶熬。 - 返回的
Worker
對(duì)象就是一個(gè)Disposable
對(duì)象, -
Runnable
執(zhí)行時(shí)埃撵,是直接手動(dòng)調(diào)用的run()
赵颅,而不是start()
方法. - 上一點(diǎn)應(yīng)該是為了,能控制在
run()
結(jié)束后(包括異常終止)暂刘,都會(huì)自動(dòng)執(zhí)行Worker
.dispose()
.
而返回的Worker
對(duì)象也會(huì)被parent.setDisposable(...)
加入管理中饺谬,以便在手動(dòng)dispose()
時(shí)能取消線程里的工作。
我們總結(jié)一下subscribeOn(Schedulers.xxx())
的過(guò)程:
- 返回一個(gè)
ObservableSubscribeOn
包裝類(lèi)對(duì)象 - 上一步返回的對(duì)象被訂閱時(shí)谣拣,回調(diào)該類(lèi)中的
subscribeActual()
方法募寨,在其中會(huì)立刻將線程切換到對(duì)應(yīng)的Schedulers.xxx()
線程。 -
在切換后的線程中芝发,執(zhí)行
source.subscribe(parent);
绪商,對(duì)上游(終點(diǎn))Observable
訂閱 -
上游(終點(diǎn))
Observable
開(kāi)始發(fā)送數(shù)據(jù),根據(jù)RxJava2 源碼解析(一)辅鲸,上游發(fā)送數(shù)據(jù)僅僅是調(diào)用下游觀察者對(duì)應(yīng)的onXXX()
方法而已格郁,所以此時(shí)操作是在切換后的線程中進(jìn)行。
一點(diǎn)擴(kuò)展独悴,
大家可能看過(guò)一個(gè)結(jié)論:
subscribeOn(Schedulers.xxx())
切換線程N(yùn)次例书,總是以第一次為準(zhǔn),或者說(shuō)離源Observable最近的那次為準(zhǔn)刻炒,并且對(duì)其上面的代碼生效(這一點(diǎn)對(duì)比的ObserveOn()
)决采。
為什么?
- 因?yàn)楦鶕?jù)RxJava2 源碼解析(一)中提到坟奥,訂閱流程從下游往上游傳遞
- 在
subscribeActual()
里開(kāi)啟了Scheduler的工作树瞭,source.subscribe(parent);
,從這一句開(kāi)始切換了線程,所以在這之上的代碼都是在切換后的線程里的了爱谁。 - 但如果連續(xù)切換晒喷,最上面的切換最晚執(zhí)行,此時(shí)線程變成了最上面的
subscribeOn(xxxx)
指定的線程, - 而數(shù)據(jù)push時(shí)访敌,是從上游到下游的凉敲,所以會(huì)在離源頭最近的那次
subscribeOn(xxxx)
的線程里push數(shù)據(jù)(onXXX()
)給下游。
可寫(xiě)如下代碼驗(yàn)證:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//依然是io線程
Log.d(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread());
return s;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
線程調(diào)度observeOn
在上一節(jié)的基礎(chǔ)上,增加一個(gè)observeOn(AndroidSchedulers.mainThread())
,就完成了觀察者線程的切換爷抓。
.subscribeOn(Schedulers.computation())
//在上一節(jié)的基礎(chǔ)上势决,增加一個(gè)ObserveOn
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
繼續(xù)看源碼吧,我已經(jīng)能猜出來(lái)了蓝撇,hook+new XXXObservable();
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
果然果复,查看ObservableObserveOn
,:
高能預(yù)警,這部分的代碼 有些略多唉地,建議讀者打開(kāi)源碼邊看邊讀据悔。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//本例是 AndroidSchedulers.mainThread()
final Scheduler scheduler;
//默認(rèn)false
final boolean delayError;
//默認(rèn)128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 創(chuàng)建出一個(gè) 主線程的Worker
Scheduler.Worker w = scheduler.createWorker();
//2 訂閱上游數(shù)據(jù)源传透,
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
本例中耘沼,就是兩步:
- 創(chuàng)建一個(gè)
AndroidSchedulers.mainThread()
對(duì)應(yīng)的Worker
- 用
ObserveOnObserver
訂閱上游數(shù)據(jù)源。這樣當(dāng)數(shù)據(jù)從上游push下來(lái)朱盐,會(huì)由ObserveOnObserver
對(duì)應(yīng)的onXXX()
處理群嗤。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//下游的觀察者
final Observer<? super T> actual;
//對(duì)應(yīng)Scheduler里的Worker
final Scheduler.Worker worker;
//上游被觀察者 push 過(guò)來(lái)的數(shù)據(jù)都存在這里
SimpleQueue<T> queue;
Disposable s;
//如果onError了,保存對(duì)應(yīng)的異常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 代表同步發(fā)送 異步發(fā)送
int sourceMode;
....
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略大量無(wú)關(guān)代碼
//創(chuàng)建一個(gè)queue 用于保存上游 onNext() push的數(shù)據(jù)
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回調(diào)下游觀察者onSubscribe方法
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//1 執(zhí)行過(guò)error / complete 會(huì)是true
if (done) {
return;
}
//2 如果數(shù)據(jù)源類(lèi)型不是異步的兵琳, 默認(rèn)不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 將上游push過(guò)來(lái)的數(shù)據(jù) 加入 queue里
queue.offer(t);
}
//4 開(kāi)始進(jìn)入對(duì)應(yīng)Workder線程狂秘,在線程里 將queue里的t 取出 發(fā)送給下游Observer
schedule();
}
@Override
public void onError(Throwable t) {
//已經(jīng)done 會(huì) 拋異常 和 上一篇文章里提到的一樣
if (done) {
RxJavaPlugins.onError(t);
return;
}
//給error存?zhèn)€值
error = t;
done = true;
//開(kāi)始調(diào)度
schedule();
}
@Override
public void onComplete() {
//已經(jīng)done 會(huì) 返回 不會(huì)crash 和上一篇文章里提到的一樣
if (done) {
return;
}
done = true;
//開(kāi)始調(diào)度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//該方法需要傳入一個(gè)線程, 注意看本類(lèi)實(shí)現(xiàn)了Runnable的接口躯肌,所以查看對(duì)應(yīng)的run()方法
worker.schedule(this);
}
}
//從這里開(kāi)始者春,這個(gè)方法已經(jīng)是在Workder對(duì)應(yīng)的線程里執(zhí)行的了
@Override
public void run() {
//默認(rèn)是false
if (outputFused) {
drainFused();
} else {
//取出queue里的數(shù)據(jù) 發(fā)送
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 1 如果已經(jīng) 終止 或者queue空,則跳出函數(shù)清女,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//2 從queue里取出一個(gè)值
v = q.poll();
} catch (Throwable ex) {
//3 異常處理 并跳出函數(shù)
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次檢查 是否 終止 如果滿足條件 跳出函數(shù)
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游還沒(méi)結(jié)束數(shù)據(jù)發(fā)送钱烟,但是這邊處理的隊(duì)列已經(jīng)是空的,不會(huì)push給下游 Observer
if (empty) {
//僅僅是結(jié)束這次循環(huán)嫡丙,不發(fā)送這個(gè)數(shù)據(jù)而已拴袭,并不會(huì)跳出函數(shù)
break;
}
//6 發(fā)送給下游了
a.onNext(v);
}
//7 對(duì)不起這里我也不是很明白,大致猜測(cè)是用于 同步原子操作 如有人知道 煩請(qǐng)告知
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//檢查 是否 已經(jīng) 結(jié)束(error complete)曙博, 是否沒(méi)數(shù)據(jù)要發(fā)送了(empty 空)拥刻,
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果已經(jīng)disposed
if (cancelled) {
queue.clear();
return true;
}
// 如果已經(jīng)結(jié)束
if (d) {
Throwable e = error;
//如果是延遲發(fā)送錯(cuò)誤
if (delayError) {
//如果空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//停止worker(線程)
worker.dispose();
return true;
}
} else {
//發(fā)送錯(cuò)誤
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//發(fā)送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
核心處都加了注釋?zhuān)偨Y(jié)起來(lái)就是,
-
ObserveOnObserver
實(shí)現(xiàn)了Observer
和Runnable
接口父泳。 - 在
onNext()
里般哼,先不切換線程,將數(shù)據(jù)加入隊(duì)列queue
惠窄。然后開(kāi)始切換線程蒸眠,在另一線程中,從queue
里取出數(shù)據(jù)睬捶,push
給下游Observer
-
onError()
onComplete()
除了和RxJava2 源碼解析(一)提到的一樣特性之外黔宛,也是將錯(cuò)誤/完成信息先保存,切換線程后再發(fā)送。 - 所以
observeOn()
影響的是其下游的代碼臀晃,且多次調(diào)用仍然生效觉渴。 - 因?yàn)槠淝?strong>換線程代碼是在
Observer
里onXXX()
做的,這是一個(gè)主動(dòng)的push行為(影響下游)徽惋。 - 關(guān)于多次調(diào)用生效問(wèn)題案淋。對(duì)比
subscribeOn()
切換線程是在subscribeActual()
里做的,只是主動(dòng)切換了上游的訂閱線程险绘,從而影響其發(fā)射數(shù)據(jù)時(shí)所在的線程踢京。而直到真正發(fā)射數(shù)據(jù)之前,任何改變線程的行為宦棺,都會(huì)生效(影響發(fā)射數(shù)據(jù)的線程)瓣距。所以subscribeOn()
只生效一次。observeOn()
是一個(gè)主動(dòng)的行為代咸,并且切換線程后會(huì)立刻發(fā)送數(shù)據(jù)蹈丸,所以會(huì)生效多次.
轉(zhuǎn)載請(qǐng)標(biāo)明出處:
http://www.reibang.com/p/6ef45f8ee79d
本文出自:【張旭童的簡(jiǎn)書(shū)】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)
總結(jié)
本文帶大家走讀分析了三個(gè)東西:
map操作符原理:
- 內(nèi)部對(duì)上游
Observable
進(jìn)行訂閱 - 內(nèi)部訂閱者接收到數(shù)據(jù)后,將數(shù)據(jù)轉(zhuǎn)換呐芥,發(fā)送給下游
Observer
. -
操作符返回的
Observable
和其內(nèi)部訂閱者逻杖、是裝飾者模式的體現(xiàn)。 - 操作符數(shù)據(jù)變換的操作思瘟,也是發(fā)生在訂閱后荸百。
線程調(diào)度subscribeOn()
:
- 內(nèi)部先切換線程,在切換后的線程中對(duì)上游
Observable
進(jìn)行訂閱滨攻,這樣上游發(fā)送數(shù)據(jù)時(shí)就是處于被切換后的線程里了够话。 - 也因此多次切換線程,最后一次切換(離源數(shù)據(jù)最近)的生效铡买。
- 內(nèi)部訂閱者接收到數(shù)據(jù)后更鲁,直接發(fā)送給下游
Observer
. - 引入內(nèi)部訂閱者是為了控制線程(dispose)
-
線程切換發(fā)生在
Observable
中。
線程調(diào)度observeOn()
:
- 使用裝飾的
Observer
對(duì)上游Observable
進(jìn)行訂閱 - 在
Observer
中onXXX()
方法里奇钞,將待發(fā)送數(shù)據(jù)存入隊(duì)列澡为,同時(shí)請(qǐng)求切換線程處理真正push數(shù)據(jù)給下游。 - 多次切換線程景埃,都會(huì)對(duì)下游生效媒至。
源碼里那些實(shí)現(xiàn)了Runnable
的類(lèi)或者匿名內(nèi)部類(lèi),最終并沒(méi)有像往常那樣被丟給Thread
類(lèi)執(zhí)行谷徙。
而是先切換線程拒啰,再直接執(zhí)行Runnable
的run()
方法。
這也加深了我對(duì)面向?qū)ο笸昊郏瑢?duì)抽象谋旦、Runnable
的理解,它就是一個(gè)簡(jiǎn)簡(jiǎn)單單的接口,里面就一個(gè)簡(jiǎn)簡(jiǎn)單單的run()
册着,
我認(rèn)為拴孤,之所以有Runnable
,只是抽象出 一個(gè)可運(yùn)行的任務(wù)的概念甲捏。
也許這句話很平淡演熟,書(shū)上也會(huì)提到,各位大佬早就知道司顿,但是如今我順著RxJava2的源碼這么走讀了一遍芒粹,確真真切切的感受到了這些設(shè)計(jì)思想的美妙。