rxjava代碼
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("有情況");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("qwer", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
然后create和subscribe也不講了(可以看前面文章)
1、直接看observeOn
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
再進(jìn)入重載的observeOn方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
再進(jìn)入ObservableObserveOn
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
同樣的套路,完成賦值
通過(guò)前面的三篇文章,我們已經(jīng)知道接下來(lái)會(huì)進(jìn)入ObservableObserveOn的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
很顯然,scheduler 不是 TrampolineScheduler類(lèi)型孵构,也就是進(jìn)入else代碼中,調(diào)用了scheduler.createWorker();
2、這個(gè)時(shí)候必須要先具體了解一下scheduler了
這個(gè)scheduler就是我們傳的AndroidSchedulers.mainThread()
進(jìn)入該方法
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
其實(shí)這里onMainThreadScheduler返回的就是本身
3累榜、所以繼續(xù)看MAIN_THREAD
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
進(jìn)入RxAndroidPlugins.initMainThreadScheduler
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
return callRequireNonNull(scheduler);
}
return applyRequireNonNull(f, scheduler);
}
因?yàn)閒為空(為什么為空,就是我沒(méi)有對(duì)onInitMainThreadHandler進(jìn)行賦值)灵嫌,所以返回了callRequireNonNull(scheduler)壹罚,進(jìn)入該方法
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
Scheduler scheduler = s.call();
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
其實(shí)就是返回了s.call(),而s.call()是什么寿羞,不錯(cuò)猖凛,就是步驟3最開(kāi)始的
return MainHolder.DEFAULT;
4、繼續(xù)查看 MainHolder.DEFAULT
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
P髂隆1嬗尽虱岂!發(fā)現(xiàn)重點(diǎn)
這里返回了一個(gè)HandlerScheduler,構(gòu)造函數(shù)參數(shù)還放了一個(gè)
new Handler(Looper.getMainLooper())菠红,是不是感覺(jué)有點(diǎn)感覺(jué)
進(jìn)入HandlerScheduler
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
只是賦值第岖,一個(gè)主線程handler,然后async是false
而這個(gè)時(shí)候我們聯(lián)系步驟1最后是不是應(yīng)該看它的createWorker方法
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
好,所以此時(shí)回到步驟1的最后试溯,Scheduler.Worker就是HandlerWorker
5蔑滓、順著步驟1最后的else代碼繼續(xù)看
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
source.subscribe就不用講了,直接看ObserveOnObserver
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
一系列的賦值操作
嗯哼遇绞?是不是好像結(jié)束了键袱??
不存在的D∶觥8茏荨!
注意它可是Observer钩骇,注意它的方法
6比藻、首先看onSubscribe方法,因?yàn)樗鼤?huì)被第一個(gè)調(diào)用(看過(guò)第一篇文章的都知道)倘屹,它也是我們自己new的觀察者的第一個(gè)回調(diào)方法
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
根據(jù)之前的代碼我們知道這個(gè)d不是QueueDisposable
所以直接跳過(guò)if
對(duì)queue 進(jìn)行了初始化
這里queue 說(shuō)一下银亲,雖然不知道具體細(xì)節(jié),單可以肯定的是纽匙,他是一個(gè)隊(duì)列
然后繼續(xù)往下看downstream.onSubscribe(this)完成Disposable的繼續(xù)傳遞
7务蝠、接下來(lái)再看什么?是不是就是我們的onNext()方法了
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
因?yàn)槲覀冎肋@個(gè)t肯定要往下傳烛缔,所以done為false,sourceMode 也不等于QueueDisposable.ASYNC
然后我們被觀察者發(fā)送的數(shù)據(jù)t就被壓入了隊(duì)列queue里去了
8馏段、然后執(zhí)行schedule()
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這里getAndIncrement()應(yīng)該是線程任務(wù)數(shù)(我猜的,返回這里肯定要為0)
然后進(jìn)入schedule方法
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
走了重載的方法践瓷,而且schedule是個(gè)抽象方法
9院喜、那此時(shí)你還記得這個(gè)worker是誰(shuí)了么?
不錯(cuò)晕翠,看步驟4的最后喷舀,就是HandlerWorker,我們看它的schedule方法
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
在代碼里只需要注意重點(diǎn)
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run)
這個(gè)handler是主線程handler(步驟4)
這個(gè)run就是步驟5里new的那個(gè)ObserveOnObserver淋肾,它實(shí)現(xiàn)了Runnable接口
下面執(zhí)行代碼
handler.sendMessageDelayed(message, unit.toMillis(delay))
好硫麻,此時(shí)run線程被執(zhí)行(如果不知道為什么可以好好看看handler基礎(chǔ)),而且是在主線程執(zhí)行7俊D美ⅰ!說(shuō)明線程已經(jīng)切換到主線程了
那么接下來(lái)呢碌尔?
不錯(cuò)浇辜!回到步驟5里new的那個(gè)ObserveOnObserver的run方法
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
這里outputFused為false(賦值true的地方我沒(méi)有執(zhí)行券敌,其實(shí)具體看drainFused和drainNormal兩個(gè)方法的代碼,也可以辨別出其為false)
10奢赂、然后進(jìn)入drainNormal方法
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
直接說(shuō)重點(diǎn)
首先開(kāi)啟for循環(huán)陪白,當(dāng)隊(duì)列不為空的時(shí)候
v = q.poll();
拿到隊(duì)列里的值(就是步驟7里offer的那個(gè))
然后a.onNext(v)完成事件傳遞(往上面看颈走,a就是downstream)
自此就算結(jié)束了
總結(jié)---
關(guān)鍵步驟在9膳灶,通過(guò)主線程的hanlder完成線程切換