Based on RxJava 1.1.0 and RxAndroid 1.0.1
Example code ↓
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
});
Subscriber<String> subscriber1 = new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        Log.e("haha", s);
    }
};
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1);
observeOn source (simplified) ↓
public final Observable<T> observeOn(Scheduler scheduler) {
    return lift(new OperatorObserveOn<T>(scheduler));
}
AndroidSchedulers source ↓
public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }
    private static final Scheduler MAIN_THREAD_SCHEDULER =
            new HandlerScheduler(new Handler(Looper.getMainLooper()));
    // ①
    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
    }
}
lift source (simplified) ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    // ②
    // creates observable2 with OnSubscribe2
    return new Observable<R>(new OnSubscribe<R>() {
        // ③
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            // ⑤
            onSubscribe.call(st); // onSubscribe1.call(subscriber2)
        }
    });
}
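To make the wrapping performed by lift easier to follow, here is a minimal sketch in plain Java. It is not RxJava's code: MiniObservable, MiniOnSubscribe, MiniOperator, MiniSubscriber and LiftSketch are hypothetical names invented for illustration, the wildcard generics and the hook are left out, and lambdas (Java 8+) are used for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the lift mechanism; these are not RxJava's real classes.
final class LiftSketch {
    interface MiniSubscriber<T> { void onNext(T value); }
    interface MiniOnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }
    interface MiniOperator<R, T> { MiniSubscriber<T> call(MiniSubscriber<R> child); }

    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        // Same shape as lift: return observable2, whose OnSubscribe2 wraps the
        // downstream subscriber and hands the wrapper to OnSubscribe1.
        <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
            MiniOnSubscribe<T> onSubscribe1 = this.onSubscribe;
            return new MiniObservable<R>(child -> {
                MiniSubscriber<T> wrapped = operator.call(child); // subscriber2 wraps subscriber1
                onSubscribe1.call(wrapped);                       // onSubscribe1.call(subscriber2)
            });
        }

        void subscribe(MiniSubscriber<T> subscriber) { onSubscribe.call(subscriber); }
    }

    // Runs the chain once and returns the order in which the pieces were invoked.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        MiniObservable<String> observable1 = new MiniObservable<String>(s -> {
            trace.add("onSubscribe1 emits 1");
            s.onNext("1");
        });
        // The operator returns a subscriber that logs and then forwards to its child.
        MiniOperator<String, String> operator = child -> value -> {
            trace.add("subscriber2 got " + value);
            child.onNext(value);
        };
        observable1.lift(operator).subscribe(value -> trace.add("subscriber1 got " + value));
        return trace;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Running main prints "onSubscribe1 emits 1", "subscriber2 got 1" and "subscriber1 got 1" in that order: nothing happens until subscribe is called, and the value passes through the operator's subscriber before reaching the original one.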
OperatorObserveOn source excerpt ↓
// ④
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) { // child = subscriber1
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
        parent.init();
        return parent;
    }
}
// ⑥ (the methods below belong to ObserveOnSubscriber)
@Override
public void onNext(final T t) {
    if (isUnsubscribed()) {
        return;
    }
    // ⑦
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    }
    schedule();
}
final Action0 action = new Action0() {
    @Override
    public void call() {
        pollQueue();
    }
};
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(action);
    }
}
pollQueue (simplified) ↓
void pollQueue() {
    for (;;) { // drain loop; request accounting and termination checks omitted
        Object o = queue.poll();
        if (o != null) {
            // ⑧
            child.onNext(on.getValue(o));
        } else {
            break;
        }
    }
}
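The interplay between onNext, schedule() and pollQueue() can be tried out without RxJava. The sketch below is only an approximation under stated assumptions: DrainSketch is a hypothetical class, a java.util.concurrent.Executor stands in for the Scheduler.Worker, the queue is unbounded (so the MissingBackpressureException path is left out), and request accounting, errors and unsubscription are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for ObserveOnSubscriber; the Executor plays the role of the Worker.
final class DrainSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong counter = new AtomicLong();
    private final Executor worker;
    private final Consumer<T> child;

    DrainSketch(Executor worker, Consumer<T> child) {
        this.worker = worker;
        this.child = child;
    }

    // Producer side: enqueue the value, then make sure a drain is pending.
    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    // Only the caller that moves the counter from 0 to 1 submits a drain task.
    private void schedule() {
        if (counter.getAndIncrement() == 0) {
            worker.execute(this::pollQueue);
        }
    }

    // Consumer side: runs on the worker and repeats while onNext calls arrived meanwhile.
    private void pollQueue() {
        do {
            counter.set(1);
            T value;
            while ((value = queue.poll()) != null) {
                child.accept(value);
            }
        } while (counter.decrementAndGet() > 0);
    }

    // Deterministic demo: a manual executor collects tasks so they can be run later.
    // Returns { tasks scheduled, values delivered before running, values delivered after }.
    static int[] demo(List<String> received) {
        List<Runnable> scheduled = new ArrayList<>();
        DrainSketch<String> subscriber2 = new DrainSketch<String>(scheduled::add, received::add);
        subscriber2.onNext("a");
        subscriber2.onNext("b");
        subscriber2.onNext("c");
        int tasks = scheduled.size();
        int before = received.size();
        for (Runnable task : scheduled) {
            task.run();
        }
        return new int[] { tasks, before, received.size() };
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        int[] r = demo(received);
        System.out.println("tasks=" + r[0] + " deliveredBefore=" + r[1]
                + " deliveredAfter=" + r[2] + " " + received);
    }
}
```

Running main prints "tasks=1 deliveredBefore=0 deliveredAfter=3 [a, b, c]": three onNext calls schedule a single drain task, nothing reaches the child until the worker runs it, and the values arrive in order.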
OperatorObserveOn.ObserveOnSubscriber source excerpt ↓
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    if (UnsafeAccess.isUnsafeAvailable()) {
        queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
    } else {
        queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
    }
    this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}
The call flow runs from ① through ⑧.
Code breakdown
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1) =
observable1.lift(new OperatorObserveOn<String>(AndroidSchedulers.mainThread())).subscribe(subscriber1) =
observable2.subscribe(subscriber1)
Execution first reaches ①, which supplies a HandlerScheduler whose Looper is the main thread's Looper.
Next, ② creates observable2 and OnSubscribe2, so the subscription becomes observable2.subscribe(subscriber1). Subscribing invokes OnSubscribe2.call(subscriber1), which reaches ③ and passes subscriber1 into ④ as the argument of call(). There child = subscriber1, and subscriber2 (an ObserveOnSubscriber wrapping child) is created.
Execution then reaches ⑤, which is equivalent to onSubscribe1.call(subscriber2), that is, subscriber2.onNext("1"), arriving at ⑥. Inside subscriber2.onNext, ⑦ puts the value on the queue and then calls schedule(). At ⑧ the value is taken off the queue on the specified thread and re-emitted via child.onNext(on.getValue(o)); child is subscriber1, so this calls subscriber1.onNext("1").
This completes the flow.
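The thread switch at the end of the flow can be observed in plain Java. The following is a rough model, not RxJava or Android code: ThreadHopSketch is a hypothetical class, a single-thread executor named "main-sim" stands in for the HandlerScheduler on the main Looper, the source is assumed to emit from a separate thread named "emitter", and subscriber2 is simplified to one posted task per value instead of the queue and counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the thread hop performed by observeOn.
final class ThreadHopSketch {
    // Returns { thread that called subscriber2, thread that ran subscriber1 }.
    static String[] run() {
        ExecutorService mainSim =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        // subscriber1: records the thread on which it receives the value.
        Consumer<String> subscriber1 = value -> {
            threads[1] = Thread.currentThread().getName();
            done.countDown();
        };
        // subscriber2: hands each value over to the target thread (simplified).
        Consumer<String> subscriber2 = value -> mainSim.execute(() -> subscriber1.accept(value));

        // onSubscribe1: emits "1" from its own thread.
        Thread emitter = new Thread(() -> {
            threads[0] = Thread.currentThread().getName();
            subscriber2.accept("1");
        }, "emitter");

        try {
            emitter.start();
            emitter.join();
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("value was not delivered in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainSim.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = run();
        System.out.println("emitted on " + t[0] + ", received on " + t[1]);
    }
}
```

Running main prints "emitted on emitter, received on main-sim": the value is produced on one thread and delivered to subscriber1 on the thread owned by the executor, which is the role the main-thread scheduler plays in the walkthrough.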