一坠七、前提說(shuō)明
本文是在 Rxjava 2.1 的基礎(chǔ)上進(jìn)行的,目前只對(duì) Rxjava 進(jìn)行解析,未搭配 Retrofit 食用舔示,如果想看 Rxjava + Retrofit 源碼解析,請(qǐng)移步 Retrofit 2.1 + Rxjava 源碼解析(一)电抚。
二惕稻、Rxjava 使用栗子
new Thread("子線程"){
@Override
public void run() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "Observable#subscribe(): 所在線程為 " + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onComplete();
}
})
// .subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "observer#onSubscribe(): 所在線程為 " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.e(TAG, "observer#onNext(): 所在線程為 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e(TAG, "observer#onComplete(): 所在線程為 " + Thread.currentThread().getName());
}
});
}
}.start();
輸出結(jié)果:
E/Rxjava: observer#onSubscribe(): 所在線程為 子線程
E/Rxjava: Observable#subscribe(): 所在線程為 子線程
E/Rxjava: observer#onNext(): 所在線程為 RxCachedThreadScheduler-1
E/Rxjava: observer#onComplete(): 所在線程為 RxCachedThreadScheduler-1
在 Rxjava2.1訂閱流程解析 中我們已經(jīng)分析了 Observable.create()
的過程,就是構(gòu)建一個(gè) ObservableCreate 對(duì)象蝙叛,ObservableCreate 是 Observable 的子類俺祠。
由上文可以知道,當(dāng)調(diào)用了 subscribe()
后,會(huì)執(zhí)行以下順序:Observable.subscribe(Observer) -> ObservableCreate.subscribeActual(Observer) -> Observer#onSubscribe()
蜘渣,所以可以知道 Observer#onSubscribe()
的執(zhí)行線程是當(dāng)前線程妓布,即調(diào)用了 subscribe()
的線程。
三宋梧、Observable.observeOn(Schedulers.io())
從上面栗子可以看到匣沼,如果我們只是調(diào)用了 observeOn(Schedulers.io())
,這樣影響的是 observer 的 onNext() 和 onComplete()
捂龄,對(duì)于 ObservableOnSubscribe#subscribe() 和 Observer#onSubscribe()
是沒有影響的释涛。
我們看看 Observable.observeOn(Schedulers.io())
的源碼:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//刪除無(wú)關(guān)緊要的代碼
//這里的 this 是 ObservableCreate 對(duì)象
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
可以看到就是將傳入的 ObservableCreate 對(duì)象封裝進(jìn)了 ObservableObserveOn 對(duì)象中,可以肯定的是 ObservableObserveOn 也是 Observable 的子類倦沧。
我們從上文得知唇撬,接下來(lái)會(huì)調(diào)用 observable.subscribe(observer)
的時(shí)候會(huì)跳轉(zhuǎn)調(diào)用 Observable 子類的 ObservableObserveOn.subscribeActual(observer)
方法。這其實(shí)是用了靜態(tài)工廠模式展融。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 如果傳入的 scheduler 是 Scheduler.trampoline() 的情況
// 該線程的意義是傳入當(dāng)前線程窖认,也就是不做任何線程切換操作
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//這里的 source 是 ObservableCreate 對(duì)象
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
當(dāng)需要切換線程的時(shí)候,可以看到將傳進(jìn)來(lái)的 ObservableCreate 對(duì)象進(jìn)行了訂閱告希,只不過觀察者又被封裝成了 ObserveOnObserver 對(duì)象扑浸。這樣就會(huì)執(zhí)行 ObservableCreate#subscribeActual()
。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//這里的 observer 就是 ObserveOnObserver 對(duì)象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//這里的 observer 就是 ObserveOnObserver 對(duì)象
observer.onSubscribe(parent);
try {
//這里的額 source 就是我們?cè)谧钔鈱觿?chuàng)建的 ObservableOnSubscribe 對(duì)象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
這里可以看到使用了 observeOn(Schedulers.io())
方法燕偶,但是 Observer#onSubscribe(Disposable d) 并沒有切換線程喝噪,仍在當(dāng)前線程中運(yùn)行。也就是 ObserveOnObserver.onSubscribe() 是運(yùn)行在當(dāng)前線程的指么。我們看看這個(gè)方法做了什么:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//真正的觀察者酝惧,最外層我們創(chuàng)建的 observer
final Observer<? super T> actual;
final Scheduler.Worker worker;
Disposable s;
......
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
//執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
//執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//執(zhí)行真正的被觀察者 Observer(最外層我們創(chuàng)建的 observer)#onSubscribe()
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
void schedule() {
if (getAndIncrement() == 0) {
//this 就是 ObserveOnObserver 對(duì)象
worker.schedule(this);
}
}
}
可以看到 ObserveOnObserver#onSubscribe(Disposable s)
中一定會(huì)調(diào)用 actual.onSubscribe(this);
,其中這個(gè) this 就是 ObserveOnObserver 對(duì)象伯诬,也就是讓我們最外層的 observer 訂閱了 ObserveOnObserver晚唇。
可以看到在 RxJava 中運(yùn)用的操作符都會(huì)在內(nèi)部創(chuàng)建一個(gè) Observable 和 Observer,所以外界使用起來(lái)和簡(jiǎn)單盗似,但是里面運(yùn)行的原理倒是挺復(fù)雜的哩陕,容易讓人混淆。
運(yùn)行完 ObserveOnObserver#onSubscribe(Disposable s)
后桥言,就輪到了 source.subscribe(parent);(這里的額 source 就是我們?cè)谧钔鈱觿?chuàng)建的 ObservableOnSubscribe 對(duì)象)
萌踱,也就是說(shuō)我們的 ObservableOnSubscribe#subscribe(emitter)
運(yùn)行在當(dāng)前線程葵礼。到這里的分析都很符合我們打印的結(jié)果号阿。
而我們?cè)谧钔鈱樱皇亲尠l(fā)射器 emitter 簡(jiǎn)單地發(fā)送了一個(gè) Next 事件鸳粉。這個(gè)事件會(huì)被誰(shuí)接收呢扔涧?
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//這里的 observer 就是 ObserveOnObserver 對(duì)象
this.observer = observer;
}
@Override
public void onNext(T t) {
//這里的 observer 就是 ObserveOnObserver 對(duì)象
if (!isDisposed()) {
observer.onNext(t);
}
}
......
}
我們從之前調(diào)用到 ObservableCreate#subscribeActual()
可以知道,當(dāng)時(shí)傳進(jìn)來(lái)的 parent 是 ObserveOnObserver 對(duì)象。所以發(fā)射器 emitter 發(fā)射的事件會(huì)被 ObserveOnObserver 接收枯夜。
可以看到 ObserveOnObserver.onNext()
中最后執(zhí)行了 schedule()
弯汰,也就是在這里進(jìn)行了線程切換的操作。
由于我們傳入的 Scheduler 是 IO 線程湖雹,我們看看這個(gè) IO Schedule 的 worker.schedule(this)
咏闪。
一路追蹤,終于找到了這個(gè) IOScheduler 的廬山真面目:
public final class IoScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
private static final long KEEP_ALIVE_TIME = 60;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
......
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//放到線程池中執(zhí)行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
......
}
至此摔吏,你可以看到調(diào)用了 observerOn()
方法的全過程鸽嫂,只是會(huì)改變觀察者 observer 的 onNext()、onComplete()
方法的運(yùn)行線程龙助,不會(huì)改變被觀察者 Observable 的運(yùn)行線程抠藕。
四桦卒、observeOn() 切換線程原理小結(jié)
看完整個(gè)過程,我們知道當(dāng)我們使用 observeOn(Schedulers.io())
的時(shí)候癣籽,其實(shí) Rxjava 在內(nèi)部幫我們創(chuàng)建封裝了若干個(gè)中間對(duì)象的 Observable 和 Observer。然后將這個(gè)訂閱操作放在 Rxjava 的線程池進(jìn)行滤祖,達(dá)到切換線程的功能筷狼。
被觀察者 Observable 的變化過程:Observable ==> ObservableCreate ==> ObserbvableObserveOn。
觀察者 Observer 的變化過程:Observer ==> ObserveOnObserver匠童,然后傳到 ObservableEmitter<String> emitter 里面桑逝,作為發(fā)射器的 observer 成員變量。
總之俏让,Observable#observeOn(Scheduler) 的實(shí)現(xiàn)原理在于將目標(biāo) Observer 的 onNext(T)/onError(Throwable)/onComplete() 置于指定線程中運(yùn)行楞遏。
五、subscribeOn() 栗子
new Thread("子線程"){
@Override
public void run() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在線程為 " + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
// .observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "observer#onSubscribe(): 所在線程為 " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.e(TAG, "observer#onNext(): 所在線程為 " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e(TAG, "observer#onComplete(): 所在線程為 " + Thread.currentThread().getName());
}
});
}
}.start();
輸出結(jié)果:
E/Rxjava: observer#onSubscribe(): 所在線程為 子線程
E/Rxjava: ObservableOnSubscribe#subscribe(): 所在線程為 RxCachedThreadScheduler-2
E/Rxjava: observer#onNext(): 所在線程為 RxCachedThreadScheduler-2
E/Rxjava: observer#onComplete(): 所在線程為 RxCachedThreadScheduler-2
六首昔、ObservableCreate.subscribeOn()
由上文可以寡喝,Observable.create()
會(huì)生成一個(gè) ObservableCreate 對(duì)象。我們看看 ObservableCreate.subscribeOn()
勒奇。
public final Observable<T> subscribeOn(Scheduler scheduler) {
//過濾無(wú)關(guān)緊要的代碼
//this 是 ObservableCreate 對(duì)象
return new ObservableSubscribeOn<T>(this, scheduler);
}
可以看到將 ObservableCreate 對(duì)象封裝成了 ObservableSubscribeOn 對(duì)象预鬓,然后就會(huì)執(zhí)行 ObservableSubscribeOn#subscribeActual()
。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
//這里的 source 是 ObservableCreate 對(duì)象赊颠,scheduler 是 IoScheduler
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//s 是最外層的 observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//調(diào)用 observer#onSubscribe
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
......
}
可以看到 observer#onSubscribe()
仍在在當(dāng)前線程中執(zhí)行格二,之后的 observer 和 ObservableOnSubscribe 的方法都被線程切換類 IoScheduler 切換到了其他線程。
我們看看 IoScheduler 的 scheduler.scheduleDirect(new SubscribeTask(parent)))
竣蹦。
final class SubscribeTask implements Runnable {
//這個(gè) parent 就是 SubscribeOnObserver
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//source 就是 ObservableCreate 對(duì)象
//parent 就是 SubscribeOnObserver 對(duì)象
source.subscribe(parent);
}
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
//actual 就是最外層的 observer
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
可以看到將最外層的 observer 包裝成 SubscribeOnObserver 對(duì)象顶猜,然后包裝成一個(gè) SubscribeTask(可以執(zhí)行的任務(wù))。當(dāng)在線程池中被執(zhí)行的時(shí)候痘括,會(huì)執(zhí)行 SubscribeTask#run()
长窄。
我們?cè)倏?IoSchedule#scheduleDirect(subscribeTask)
滔吠。
在 IoSchedule 的父類 Schedule 中找到一個(gè)方法:
public abstract class Scheduler {
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//其實(shí)就是 EventLoopWorker#schedule()
w.schedule(task, delay, unit);
return task;
}
}
又回到了 IOScheduler 創(chuàng)建的 EventLoopWorker 中:
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//最終放進(jìn)線程池中執(zhí)行任務(wù)
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
......
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//放進(jìn)線程池中執(zhí)行
f = executor.submit((Callable<Object>)sr);
} else {
//放進(jìn)線程池中執(zhí)行
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
......
}
最終在線程池中執(zhí)行了我們的 SubscribeTask#run()
。其實(shí)從 scheduleActual() 和 subscribeActual()
的命名方式可以看出挠日,Rxjava 很多地方都用到了靜態(tài)工廠模式疮绷,都是父類提供抽象方法,具體的子類根據(jù)需要實(shí)現(xiàn)不同的邏輯嚣潜,這個(gè)就很靈活了冬骚。
我們?cè)倏纯?SubscribeTask#run()
干了什么:
final class SubscribeTask implements Runnable {
//這個(gè) parent 就是 SubscribeOnObserver
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//source 就是 ObservableCreate 對(duì)象
//parent 就是 SubscribeOnObserver 對(duì)象
source.subscribe(parent);
}
}
**那其實(shí)就是 ObservableCreate.subscribe(SubscribeOnObserver)
,這就又跳到了我們熟悉的 ObservableCreate.subscribeActual()
方法中了懂算。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//source 是最外層的 ObservableOnSubscribe 對(duì)象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//observer 是 SubscribeOnObserver 對(duì)象唉韭,里面包含最外層的 observer 對(duì)象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//這里 SubscribeOnObserver 只是簡(jiǎn)單地 set 了一個(gè)引用
observer.onSubscribe(parent);
try {
//其實(shí)就是 ObservableOnSubscribe.subscribe(SubscribeOnObserver);
//此時(shí)已經(jīng)運(yùn)行在 Rxjava 的線程池中
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
在這里會(huì)先執(zhí)行 observer (SubscribeOnObserver)的 onSubscribe()
方法,這個(gè)方法就 set 了一個(gè)引用犯犁,可以先忽略属愤。接下來(lái)會(huì)調(diào)用 ObservableOnSubscribe.subscribe(SubscribeOnObserver)
。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在線程為 " + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onComplete();
}
});
我們?cè)谧钔鈱又话l(fā)送了一個(gè) Next 事件酸役,根據(jù) CreateEmitter<T> 類的源碼:
public void onNext(T t) {
if (!isDisposed()) {
//observer 就是 SubscribeOnObserver
observer.onNext(t);
}
}
SubscribeOnObserver.onNext()
會(huì)觸發(fā):
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
//最外層的 observer
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
//調(diào)用最外層的 observer#onNext()
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
//調(diào)用最外層的 observer#onComplete()
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
所以住诸,我們最外層的 observer 的 onNext() 和 onComplete()
會(huì)運(yùn)行在 Rxjava 的線程池的線程中。
至此涣澡,subscribeOn(Schedulers.io())
的過程分析完畢贱呐,subscribeOn(Schedulers.io())
會(huì)改變觀察者 observer 的 onNext()、onComplete()
方法的運(yùn)行線程入桂,也會(huì)改變被觀察者 Observable 的運(yùn)行線程奄薇。
七、subscribeOn() 切換線程原理小結(jié)
看完整個(gè)過程抗愁,我們知道當(dāng)我們使用 subscribeOn(Schedulers.io())
的時(shí)候馁蒂,其實(shí)跟上面的 observeOn(Schedulers.io()) 過程差不多,Rxjava 幫我們創(chuàng)建了若干個(gè)中間層的 Observable 和 Observer蜘腌,然后將這個(gè)訂閱操作放在 Rxjava 的線程池進(jìn)行沫屡,達(dá)到切換線程的功能。
被觀察者 Observable 的變化過程:Observable ==> ObservableCreate ==> ObserbvableSubscribeOn撮珠。
觀察者 Observer 的變化過程:Observer ==> SubscribeOnObserver沮脖,然后傳到 ObservableEmitter<String> emitter 里面,作為發(fā)射器的 observer 成員變量芯急。
總之勺届,Observable#subscribeOn(Scheduler) 的實(shí)現(xiàn)原理在于將目標(biāo) Observer 的 onNext(T)/onError(Throwable)/onComplete() 和 ObservableOnSubscribe 的 subscribe(T) 置于指定線程中運(yùn)行。
八娶耍、subscribeOn() 和 observeOn(Schedulers.io()) 一起使用
這兩個(gè) api 一起使用其實(shí)也不會(huì)有什么很大的變化免姿,就是 observeOn()
會(huì)影響 Observer 的 onNext(T)/onError(Throwable)/onComplete() 運(yùn)行線程,而 subscribeOn()
會(huì)影響 ObservableOnSubscribe 的 subscribe(T) 運(yùn)行線程伺绽。