前言
16年 的時(shí)候?qū)戇^(guò)兩篇關(guān)于Rxjava 1.0 的源碼分析,時(shí)過(guò)境遷公给,現(xiàn)在早已是2.0 了借帘。2.0 的代碼邏輯,封裝淌铐,更為易懂肺然,也包含了 一些新特性背壓,面向切面等等腿准。所以決定际起,寫(xiě)篇文章分析RxJava 2.0
關(guān)于RxJava,從表面上看起來(lái)很容易使用吐葱,但是如果理解不夠深刻街望,使用過(guò)程中,往往會(huì)出現(xiàn)一些問(wèn)題弟跑,所以我寫(xiě)了系列文章灾前,從入門(mén)到精通,從簡(jiǎn)單的使用到部分源碼詳解孟辑,希望能給讀者一個(gè)質(zhì)的飛躍:
1哎甲、RxJava之一——一次性學(xué)會(huì)使用RxJava RxJava簡(jiǎn)單的使用和使用它的好處
2、RxJava之二——Single和Subject 與Observable舉足輕重的類饲嗽,雖然用的少炭玫,但應(yīng)該知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4貌虾、RxJava之四—— Lift()詳解 想要了解Operators础嫡,Lift()一定要學(xué)習(xí)
5、RxJava之五—— observeOn()與subscribeOn()的詳解Scheduler線程切換的原理
6、RxJava之六——RxBus 通過(guò)RxJava來(lái)替換EventBus
7榴鼎、RxJava之七——RxJava 2.0 圖文分析create()伯诬、 subscribe()、map()巫财、observeOn()盗似、subscribeOn()源碼 這張圖可能是全網(wǎng)最詳細(xì) 明了的圖
Rxjava2.x 與1.x 的相關(guān)文章:
關(guān)于 RxJava 最友好的文章—— RxJava 2.0 全新來(lái)襲
官方文檔:What's different in 2.0
RxJava github
示例
Rxjava的使用流程,相信大家都很清楚了平项,以下面這個(gè)簡(jiǎn)單的demo赫舒,重點(diǎn)分析一下create()、 subscribe()闽瓢、map()接癌、observeOn()、subscribeOn()源碼扣讼。 只要了解這些源碼缺猛,再去看其他的類都會(huì)有似曾相識(shí)的感覺(jué)
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(1)
}})
.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}}
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
}
override fun onError(e: Throwable) {
}
})
在看源代碼時(shí),被其中一層一層的封裝和調(diào)用椭符,類名搞的暈暈的荔燎,一不留神就不知道誰(shuí)是誰(shuí)。
我把源代碼的簡(jiǎn)單流程 销钝,畫(huà)了個(gè)圖有咨,先腦子里有個(gè)整體的輪廓,再去看代碼蒸健,會(huì)清晰很多
關(guān)于流程圖的介紹:
這張圖座享,可能是全網(wǎng)關(guān)于rxjava2 大體流程,最詳細(xì)的圖似忧。為了排版和方便理解渣叛,簡(jiǎn)化了函數(shù)的關(guān)系,忽略了很多細(xì)節(jié)
綠色模塊 表示訂閱者橡娄,例如
Observable
或Flowable
诗箍,或者他們的子類藍(lán)色模塊 表示觀察者癣籽,例如
Observer
或Subscriber
挽唉,或者他們的子類青色模塊 表示數(shù)據(jù)發(fā)送,例如:
ObservableOnSubscribe
筷狼,ObservableSource
瓶籽,等等黃色模塊 表示切換了線程
每個(gè)模塊右上角表示當(dāng)前類的名稱
綠色的線 表示函數(shù)調(diào)用
藍(lán)色的線 表示訂閱
紅色的線 表示數(shù)據(jù)發(fā)送
黑色的線 表示對(duì)象是什么或者對(duì)象從哪里來(lái)的
基本流程
還是以上面的demo為例,拋開(kāi)操作符埂材,只分析主要流程
先來(lái)看下塑顺,demo中都使用哪些類和接口:
- Observable 訂閱者
- Observer 觀察者
- ObservableOnSubscribe 數(shù)據(jù)發(fā)送源
public interface ObservableOnSubscribe<T> {
// ObservableEmitter 也是一個(gè)接口,它繼承接口Emitter
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
//這個(gè)接口,有常見(jiàn)的幾種函數(shù)严拒。在demo中可以看到扬绪,調(diào)用的oNext 就是這個(gè)接口的函數(shù)
public interface Emitter<T> {
void oNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
下面一步一步進(jìn)入源碼:
create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 判斷非空
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins 相當(dāng)于是切面編程,會(huì)對(duì)所有的中間訂閱者進(jìn)行自定義修改裤唠,如果沒(méi)有設(shè)置過(guò)挤牛。就直接當(dāng)前參數(shù)
//創(chuàng)建一個(gè) ObservableCreate,它就是上圖中的綠色模塊
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
來(lái)看一個(gè)RxJavaPlugins.onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// onObservableAssembly 需要手動(dòng)設(shè)置种蘸,實(shí)現(xiàn)切面效果
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//如果沒(méi)有設(shè)置 就是null墓赴,直接返回參數(shù)source
if (f != null) {
return apply(f, source);
}
return source;
}
create 傳入的是ObservableOnSubscribe
類型參數(shù),需要的Observable
返回類型航瞭,這里的ObservableCreate 繼承了Observable
并實(shí)現(xiàn)接口subscribeActual
诫硕,引用了ObservableOnSubscribe
并調(diào)用它的接口subscribe
,所以它算是一種適配器模式刊侯。
public final class ObservableCreate<T> extends Observable<T> {
// 每個(gè)Observable 的實(shí)現(xiàn)類章办,都有一個(gè)source,表示的是上游
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribe 最終會(huì)調(diào)用到這里
// observer 是觀察者滔吠,它里面實(shí)現(xiàn)了onNext纲菌、onSubscribe 等 這些函數(shù)
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//調(diào)用了觀察者的onSubscribe 方法
observer.onSubscribe(parent);
try {
//這個(gè)上游source 就是demo中的ObservableOnSubscribe 實(shí)現(xiàn),
//調(diào)用了subscribe疮绷,以本例來(lái)說(shuō)翰舌,就會(huì)執(zhí)行 emitter.onNext(1)
// 那么問(wèn)題來(lái)了,subscribeActual 會(huì)在什么時(shí)候執(zhí)行呢冬骚?
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribe
需要注意椅贱,代碼執(zhí)行到這里,是誰(shuí)調(diào)用了subscribe只冻,create() 返回的是Observable
類型 ObservableCreate
庇麦,所以是ObservableCreate
調(diào)用了subscribe
,那么關(guān)于Observable
的接口喜德,自然也會(huì)調(diào)用到ObservableCreate
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 面向切面的設(shè)置山橄,如果沒(méi)有設(shè)置,直接返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//以本例來(lái)看舍悯,就是執(zhí)行ObservableCreate 中的subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
... 省略代碼 ...
}
}
ObservableCreate 類
// subscribe 最終會(huì)調(diào)用到這里
// observer 是觀察者航棱,它里面實(shí)現(xiàn)了onNext、onSubscribe 等 這些函數(shù)
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 把觀察者萌衬,封裝成CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//調(diào)用了觀察者的onSubscribe 方法
observer.onSubscribe(parent);
try {
//這個(gè)上游source 就是demo中的ObservableOnSubscribe 實(shí)現(xiàn)饮醇,
//調(diào)用了subscribe,以本例來(lái)說(shuō)秕豫,就會(huì)執(zhí)行 emitter.onNext(1)朴艰,也就是執(zhí)行了parent.onNext(1)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
最后進(jìn)入CreateEmitter
來(lái)看一下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//demo 中的observer 最終被傳遞到這里
this.observer = observer;
}
// 在subscribeActual 調(diào)用到這里
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//如果沒(méi)有disposed ,就會(huì)執(zhí)行demo中observer的onNext
observer.onNext(t);
}
}
... 省略代碼 ...
@Override
public void onComplete() {
//如果沒(méi)有被dispose,會(huì)調(diào)用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//執(zhí)行完調(diào)用dispose
dispose();
}
}
}
... 省略代碼 ...
}
源碼看到這里祠墅,再去看上圖侮穿,思路應(yīng)該更清晰,但是應(yīng)該對(duì)整圖還不是很了解毁嗦。
仔細(xì)總結(jié)一下會(huì)發(fā)現(xiàn)撮珠,在create后,創(chuàng)建了ObservableCreate 金矛,他知道上游(source)芯急,知道被訂閱后的處理方式(subscribeActual )也就是如何把數(shù)據(jù)發(fā)給下游,但是它需要等待調(diào)用subscribe驶俊,才會(huì)最終觸發(fā)這個(gè)流程娶耍。
而且ObservableCreate 是繼承于Observable ,對(duì)設(shè)計(jì)模式敏感的小伙伴饼酿,可能會(huì)想到裝飾著模式榕酒,沒(méi)錯(cuò),所謂的操作符故俐,無(wú)非就是用裝飾著模式包裹一層想鹰,讓他也知道上游(source),知道如何數(shù)據(jù)發(fā)給下游(實(shí)現(xiàn)subscribeActual )药版,最終subscribe 一調(diào)用辑舷,這個(gè)過(guò)程就被觸發(fā)阵苇。
如果感覺(jué)混亂讶舰,沒(méi)關(guān)系,下面跟著源碼走一下蕴坪,就會(huì)豁然開(kāi)朗
map操作符
調(diào)用map还栓,會(huì)執(zhí)行下面的函數(shù)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//發(fā)現(xiàn)了嗎碌廓,上面的create 創(chuàng)建ObservableCreate , map 創(chuàng)建了 ObservableMap
//沒(méi)錯(cuò)他們都是Observable的子類
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
有了新的Observable (ObservableMap)剩盒,那肯定得有新的Observer谷婆,不然ObservableMap和誰(shuí)關(guān)聯(lián)呢?是的辽聊,新的Observer就是 MapObserver
//AbstractObservableWithUpstream 繼承于Observable纪挎,有成員變量source
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
//這里傳入的是this,也就是本例中的ObservableCreate 身隐,它繼承于Observable 繼承于ObservableSource
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
// 把上游的源保存起來(lái)
super(source);
//保存當(dāng)前的操作函數(shù)廷区,也就是demo 中map 里實(shí)現(xiàn)的函數(shù)
this.function = function;
}
// 太熟悉了唯灵,又是它贾铝,等ObservableMap 調(diào)用subscribe 的時(shí)候,會(huì)調(diào)用到這里
@Override
public void subscribeActual(Observer<? super U> t) {
// 調(diào)用了上游(source)的subscribe,這就相當(dāng)于觸發(fā)了上游
// 傳入的參數(shù)是MapObserver垢揩,聯(lián)想到上面的subscribe 直接訂閱Observer
// 這個(gè)MapObserver繼承Observer玖绿,它的參數(shù)t 也是Observer,也就是把下游的Observer 包裹了一層叁巨,傳遞給上游斑匪。這正是裝飾者模式
// 本例中emitter.onNext(1)執(zhí)行后,也就是會(huì)執(zhí)行MapObserver中的onNext
source.subscribe(new MapObserver<T, U>(t, function));
}
//BasicFuseableObserver 繼承了Observer锋勺,有成員變量downstream蚀瘸、upstream 等
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
// 把下游的Observer 保存在downstream 中
super(actual);
// 保存map 變換操作
this.mapper = mapper;
}
// 本例中,onNext 是在emitter.onNext(1) 執(zhí)行后庶橱,調(diào)用到這里的
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
//目標(biāo)類型
U v;
try {
// 實(shí)現(xiàn)了變化操作贮勃,把t 轉(zhuǎn)為 v
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//調(diào)用了下游onNext,繼續(xù)分發(fā)數(shù)據(jù)苏章,此時(shí)的數(shù)據(jù)是轉(zhuǎn)換后的目標(biāo)數(shù)據(jù)
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
此時(shí)再去看看上面的圖寂嘉,是不是有點(diǎn)感覺(jué)了,沒(méi)錯(cuò)枫绅,全部都是這個(gè)套路泉孩,只是每次在裝飾的時(shí)候,行為不一樣
subscribeOn操作符
作用是控制subscribe
的線程并淋,下面來(lái)看看subscribeOn
是如何實(shí)現(xiàn)的
不仔細(xì)看代碼寓搬,還以為作者直接把實(shí)現(xiàn)map的代碼拷貝了一份,簡(jiǎn)直太相似了
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//使用原來(lái)的Observable 和調(diào)度線程县耽,創(chuàng)建一個(gè)新的Observable订咸,就是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
依舊是創(chuàng)建新的Observable(ObservableSubscribeOn) 和 Observer (SubscribeOnObserver)
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 依舊是保存了上游 ObservableSource
super(source);
// 保存了線程調(diào)度的Scheduler
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 使用裝飾著模式,把原來(lái)的Observer 封裝了一層
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//注意酬诀,這里還沒(méi)有切換新城脏嚷,調(diào)用了onSubscribe
observer.onSubscribe(parent);
//這里是重點(diǎn),
//scheduler.scheduleDirect(new SubscribeTask(parent)) 在新線程中執(zhí)行parent瞒御,
//parent.setDisposable 把新線程任務(wù)父叙,加入到DisposableHelper,如果手動(dòng)dispose后肴裙,保證線程可以停止
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
// 下面的常規(guī)操作方法趾唱,就是調(diào)用downstream 的各個(gè)操作方法
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
//把當(dāng)前
DisposableHelper.setOnce(this, d);
}
}
// 實(shí)現(xiàn)了線程的Runnable 接口,目的是讓線程可以調(diào)度
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 會(huì)在新的線程中調(diào)用蜻懦,source是上游的Observable
source.subscribe(parent);
}
}
}
截止到這里甜癞,subscribeOn總體的邏輯已經(jīng),搞清楚了宛乃,再深入一點(diǎn)看一下scheduler.scheduleDirect(new SubscribeTask(parent))
是如何實(shí)現(xiàn)線程切換的
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
//調(diào)用scheduleDirect悠咱,參數(shù)表示立即執(zhí)行
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建了一個(gè)worker蒸辆,注意這里的createWorker(),是一個(gè)抽象方法析既,不同的線程躬贡,創(chuàng)建的worker 不一樣。
// 例如:Schedulers.io() 創(chuàng)建的是 EventLoopWorker
final Worker w = createWorker();
// 依舊是切面編程眼坏,對(duì)每個(gè)切換線程的拂玻, 包裹一層
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//把worker 和 Runnable 封裝成DisposeTask ,方便外界調(diào)用disposed宰译,來(lái)停止它的運(yùn)行
DisposeTask task = new DisposeTask(decoratedRun, w);
//立即開(kāi)始執(zhí)行
w.schedule(task, delay, unit);
return task;
}
在深入看一下上面的DisposeTask檐蚜,是如何封裝的。
這段代碼沿侈,我多說(shuō)一句熬甚,你都會(huì)嫌我啰嗦
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
總結(jié):
現(xiàn)在你再結(jié)合上圖來(lái)理解一下,是不是瞬間感覺(jué)肋坚,WC乡括,這么簡(jiǎn)單的代碼,也有人寫(xiě)博客發(fā)布
懂了subscribeOn的源碼智厌,那么想想诲泌,大家經(jīng)常討論的多個(gè)subscribeOn 調(diào)用,線程切換的問(wèn)題铣鹏,兩句話敷扫。
- subscribeOn 對(duì)上游訂閱有效
- 最上面的subscribeOn 對(duì)發(fā)送數(shù)據(jù)有效
observeOn操作符
控制 onNext
,onComplete
等數(shù)據(jù)消費(fèi)方法的線程诚卸,
下面的分析略有差異葵第,但總體和上面還是一樣的,你也可以自行read fuck source code 合溺,相信你的印象會(huì)更加深刻
public final Observable<T> observeOn(Scheduler scheduler) {
//為什么這里卒密,有bufferSize()
//這就是Rxjava 2 所添加背壓的概念,rxjava 1 會(huì)有一個(gè)問(wèn)題棠赛,就是如果上游一直發(fā)送數(shù)據(jù)onNext 哮奇,拼命的調(diào)用,
// 但是onNext() 處理的不夠快睛约,就會(huì)出現(xiàn)棧溢出鼎俘。這個(gè)bufferSize 就是控制了,緩沖區(qū)的大小
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//重點(diǎn)ObservableObserveOn
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
創(chuàng)建新的Observable (ObservableObserveOn) 和 Observer(ObserveOnObserver)
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//保存上游Observable
super(source);
//保存調(diào)度線程
this.scheduler = scheduler;
//
this.delayError = delayError;
//緩沖區(qū)
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//TrampolineScheduler 官方文檔的解釋 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 創(chuàng)建一個(gè)worker辩涝,線程的調(diào)度就是在這里
Scheduler.Worker w = scheduler.createWorker();
// 因?yàn)閛bserveOn 是控制處理數(shù)據(jù)的線程贸伐,所以在訂閱的時(shí)候,不去切換線程
// 把線程worker怔揩,緩沖區(qū)捉邢,下游observer 都都封裝進(jìn)ObserveOnObserver 脯丝,等到onSubscribe,onNext 調(diào)用到來(lái)是歌逢,在去切換線程去處理
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
... 省略 ObserveOnObserver 代碼 ...
}
下面來(lái)看一下 ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
// 上游的Observer 被封裝成了QueueDisposable,那么使用這里面的sourceMode 翘狱,queue 等信息
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
//大多數(shù)情況會(huì)到這里創(chuàng)建一個(gè)隊(duì)列秘案,使用指定的緩沖區(qū)
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//執(zhí)行過(guò)error / complete 會(huì)是true
if (done) {
return;
}
//如果不是異步的,就把數(shù)據(jù)放到隊(duì)列中
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//開(kāi)始執(zhí)行
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
// 結(jié)束運(yùn)行
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
//該類繼承了潦匈,AtomicInteger阱高,多線程下保證worker值開(kāi)啟一個(gè)線程
//有可能調(diào)用onNext 是在不同的線程
if (getAndIncrement() == 0) {
// this 是一個(gè)Runable 接口,在新線程中執(zhí)行
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
//判斷是否需要結(jié)束
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//隊(duì)列中取出一個(gè)數(shù)據(jù)
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
//判斷是否需要結(jié)束
if (checkTerminated(d, empty, a)) {
return;
}
//隊(duì)列是否為空
if (empty) {
break;
}
//發(fā)送數(shù)據(jù)給下游
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (disposed) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
}
downstream.onNext(null);
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//上面work.schedule(this) 后茬缩,會(huì)執(zhí)行這里
@Override
public void run() {
if (outputFused) {
//這里是有關(guān)背壓的赤惊,暫時(shí)先不講解
drainFused();
} else {
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
// 是否已經(jīng)完成
if (d) {
Throwable e = error;
//是否分發(fā)Error
if (delayError) {
//隊(duì)列是否為空
if (empty) {
disposed = true;
//是否有錯(cuò)誤信息
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
// 這部分也與背壓有關(guān),暫時(shí)不展開(kāi)
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
總結(jié):
到這里應(yīng)該對(duì)observeOn 也一定的了解凰锡,多個(gè)observeOn 嵌套的問(wèn)題未舟,一句話,只對(duì)下游的Observer有效掂为。再結(jié)合上圖的流程裕膀,subscribeOn 和 observeOn 胡亂嵌套,都能理清楚了勇哗。