先上代碼:
ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
final int max = 100;
for (int i = 1; i <= max; i++) {
e.onNext(max);
}
e.onComplete();
}
};
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
Observable.create(oos)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.computation())
.subscribe(observer);
上面是Rxjava最簡單的實現(xiàn)模型盆繁。
從鏈式調(diào)用的返回值來看:
Observable.create()------》ObservableCreate extends Observable
ObservableCreate.observerOn()------->ObservableObserveOn extends AbstractObservableWithUpstream extends Observable
ObservableObserveOn.subscribeOn()------->ObservableSubscribeOn extends AbstractObservableWithUpstream extends Observable
所以最后的調(diào)用對象是
ObservableSubscribeOn.subscribe(observer)
從上面的返回值可以看出中間任一一個的返回值返回的都是observable的子對象。
為什么要強調(diào)中間幾個的返回值都是observable的返回值怜瞒,這里要先明確一下夜焦,待會會大量用到subscribe()方法搏予,在Observable(子類)中的subscribe()方法:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//鉤子点把,如果未設(shè)置的話,返回值還是observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//空檢驗
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//核心
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
...... //handle exception
}
}
因此下面的分析代碼中幌衣,如果是調(diào)用上面4個對象的subscribe()方法的時候矾削,直接看subscribeActual()方法即可。
那就從最后一層 ObservableSubscribeOn 的 subscribeActual() 方法開始分析泼掠。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
這里的這個s就是我們縮寫的observer
第一行:先把我們的observer封裝成了SubscribeOnObserver
第二行:調(diào)用了observer.onSubscribe()方法
??????????????也就是observer訂閱Observable時候的方法怔软,一般這個時候可以做一些操作
第三行:
parent.setDisposable() 以及scheduler調(diào)度器先不論,待會再分析择镇,這里先看SubscribeTask這個類:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
看到runnable,熟悉線程的同學(xué)已經(jīng)可以猜到source.subscribe(parent) 這句代碼很可能在子線程中執(zhí)行挡逼,這里先mark一下,待會回到這個地方再具體看腻豌。
這里要先插入一下source和Observer的問題:
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//后面兩個類繼承抽象類家坎,調(diào)用super(source)方法嘱能。
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
通過上面的代碼可以看到所有的這三個關(guān)鍵類,source都是通過構(gòu)造傳入進來的虱疏,
而后兩個類都還有schedule參數(shù)惹骂,這個涉及線程調(diào)度,待會也會說做瞪,也mark一下对粪。
通過以上代碼可以分析:
ObservableCreate 的 source 是 oos
ObservableObserveOn 的 source 是 ObservableCreate
ObservableSubscribeOn 的 source 是 ObservableObserveOn
至于Observer,通過代碼可以分析:
(ObservableObserveOn)source
.subscribe(parent(SubscribeOnObserver));
(ObservableCreate)source
.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
(oos)source.subscribe(parent);
也即是:
ObservableObserveOn 的 observer 是 SubscribeOnObserver
ObservableCreate 的 observer 是 ObserveOnObserver
oos 的 observer 是 CreateEmitter
這里有點一級一級調(diào)用的意味了装蓬,而這個意味就是Rxjava的一個很重要的點著拭。
插入結(jié)束,繼續(xù)回到剛才的 SubscribeTask
結(jié)合上面的分析:
source.subscribe(parent)
也就意味著
ObservableObserveOn.subscribeActual()
這里轉(zhuǎn)了兩個彎牍帚,各位可以稍微思考一下
而在ObservableObserveOn中:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
//這個暫時忽略儡遮,未設(shè)置的時候不走這里
source.subscribe(observer);
} else {
//線程調(diào)度,待會再分析
Scheduler.Worker w = scheduler.createWorker();
//最終會調(diào)用這個暗赶,又是很熟悉的source subscribe()方法
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
繼續(xù)往上走鄙币,走到 ObservableCreate 中,這里省略了重復(fù)流程蹂随。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
走到最上層十嘿,這個最上層的source就是我們前面寫的oos。
這個observer是在 ObservableObserverOn 中的 ObserveOnObserver糙及。這個名字有點像详幽,汗
第一行:先把 ObserveOnObserver 封裝成 CreateEmitter
第二行:調(diào)用 ObserveOnObserver.onSubscribe()方法。
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
這個方法比較長浸锨,但是對我們的流程分析關(guān)鍵的代碼其實就一句
actual.onSubscribe(this);
根據(jù)前面的observer的分析,這個observer其實就是 ObservableSubscribeOn 的 SubscribeOnObserver
最后找到源碼版姑,調(diào)用了 SubscribeOnObserver 的 onSubscribe()方法柱搜。
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
涉及到CAS的操作,感興趣的同學(xué)可以研究一下剥险,這里對我們的流程沒有太大影響聪蘸。
第三行:至此,整個的流程終于回到了我們的oos表制。
從ObservableSubscribeOn的subscribe()方法歷盡千辛萬苦終于調(diào)用了oos的subscribe()方法健爬。
ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
final int max = 100;
for (int i = 1; i <= max; i++) {
e.onNext(max);
}
e.onComplete();
}
};
首先創(chuàng)建了 ObservableEmitter ,然后調(diào)用emmiter.onNext()方法么介。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
異常先不考慮娜遵,最終是調(diào)用 observer.onNext()方法。
根據(jù)上面的分析壤短,這個Observer是ObserveOnObserver
第一行:先把 ObserveOnObserver 封裝成 CreateEmitter设拟,而CreateEmmiter的構(gòu)造:
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
所以慨仿,可以知道這個observer就是最前面的 ObserveOnObserver
也就是e.Next(n)------>最終會調(diào)用ObserveOnObserver.onNext(n)
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
最終調(diào)用了 schedule() 方法。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
將當(dāng)前對象添加到worker中纳胧,這個是線程調(diào)度的問題了镰吆,待會分析。
再看一下ObserveOnObserver 類的聲明:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable{}
實現(xiàn)了Runnable接口跑慕,所以關(guān)鍵代碼就在run()方法之中万皿。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//我們看drainNormal()的方法
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
終于看到了a.onNext()方法,也就是actual.onNext()方法核行。
通過 ObserveOnObserver 的構(gòu)造:
//構(gòu)造方法
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//創(chuàng)建對象的時候
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//這里new了ObserveOnObserver對象牢硅。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
可以看出,這個actual對象钮科,其實就是傳入進來的observer唤衫。
而這個observer結(jié)合SubscribeTask代碼,可以知道:
這個observer其實就是講我們的observer封裝起來的SubscribeOnObserver對象绵脯。
而SubscribeOnObserver的onNext()方法:
@Override
public void onNext(T t) {
actual.onNext(t);
}
其實就是我們的o.next()方法佳励。
七轉(zhuǎn)八彎,經(jīng)歷這個這么多,也是本文最核心的:
subscribe()方法蛆挫,先一層一層往上回調(diào)赃承,調(diào)用了我們的oos的onNext()方法,
而onNext()里面又一層一層往下回調(diào)悴侵,調(diào)用了我們的obsrever的onNext()方法瞧剖,實現(xiàn)了數(shù)據(jù)的傳遞。
然后是線程切換問題:
還記得我們之前說ObservableSubscribeOn, ObservableObserveOn這兩個對象的構(gòu)造都會傳入一個 schedule 的調(diào)度器嗎可免?
先看 ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
再結(jié)合前面的代碼抓于,我們知道這個 scheduler 是Schedulers.computation()
最后走到了:
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
executor,我們非常熟悉的線程池〗浇瑁看到這捉撮,也就大概明白了我們的 source.subscribe(parent)
以及其對應(yīng)的一層層往上回調(diào)都是在subscribeOn(線程) 所調(diào)用的線程之中。
然后線程什么時候會再度切換呢妇垢?
是在ObservableObserveOn中的 schedule() 方法中:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這個worker一層層追蹤溯源巾遭,找到了其初始化的地方,是在ObservableObserveOn的subscribeActual()方法之中:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
這個schedule就是observerOn所對應(yīng)的線程闯估。
AndroidSchedulers.mainThread() 的實現(xiàn)是 HandlerScheduler
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
最終通過hanler進行了線程的切換灼舍。
也就是最后我們的observer.onNext()方法執(zhí)行的線程是由observeOn()所對應(yīng)的線程