接上文Rxjava(一)——鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的?
在分析線程切換原理前要明白幾個(gè)概念蒿往;
線程調(diào)用的關(guān)鍵操作符subscribeOn盛垦、observeOn
observeOn作用:影響后續(xù)操作符所在的線程,直到下個(gè)observeOn設(shè)置為其他線程瓤漏;
subscribeOn作用:初始化整個(gè)鏈條所在的線程腾夯,多次設(shè)置只有第一次生效;
線程調(diào)度器 Schedulers
Rxjava里面將常用線程歸納為4種蔬充,即有4 種調(diào)度器:
- 主線程 AndroidSchedulers.mainThread();
- io線程 Schedulers.io()俯在;
- 計(jì)算線程 Schedulers.computation();
- 新建線程 Schedulers.newThread()娃惯;
同樣以一個(gè)實(shí)例來進(jìn)行分析:
Observable.just("a")
.observeOn(Schedulers.computation())
.map(new Func1<String, String>() { //操作1
@Override
public String call(String s) {
System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
return s + s;
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, String>() { //操作2
@Override
public String call(String s) {
System.out.print(Thread.currentThread().getName() + ":second--" + s+"\n");
return s + s;
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {//操作3
@Override
public void onCompleted() {
System.out.print(Thread.currentThread().getName()+"\n");
System.out.print("completed"+"\n");
}
@Override
public void onError(Throwable e) {
System.out.print("error");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
上述實(shí)例中跷乐,分別使用了computation線程、io線程和新線程趾浅,執(zhí)行代碼如下:
由于subscribeOn是在整個(gè)調(diào)用鏈之前愕提,其作用于整個(gè)鏈條,而observeOn只作用此它操作符之后皿哨,因此上圖結(jié)束RxComputationThreadPool-1 在計(jì)算線程中浅侨,而之后的全部都處理io()線程RxCachedThreadScheduler-1中。
正式開始擼代碼
帶著幾個(gè)問題來跟讀代碼:
- 1 Observable.just("a")生成的Observable對(duì)象证膨,如何調(diào)用到計(jì)算線程中如输,線程切換通過什么實(shí)現(xiàn)的?
- 2 為什么subscribeOn()是對(duì)整個(gè)調(diào)用鏈條起作用?
問題1:Observable.just("a")生成的Observable對(duì)象不见,如何調(diào)用到計(jì)算線程中澳化,線程切換通過什么實(shí)現(xiàn)的?
Observable.just("a").observeOn(Schedulers.computation())
由于just發(fā)送的單個(gè)對(duì)象稳吮,因此Observable使用的創(chuàng)建對(duì)象為ScalarSynchronousObservable缎谷;在其初始化對(duì)象時(shí),將"a"作為構(gòu)造參數(shù)傳入灶似,并保存列林。
observeOn操作符會(huì)首先對(duì)Observable的類型進(jìn)行檢測(cè),若為ScalarSynchronousObservable類型酪惭,則通過ScalarSynchronousObservable@scalarScheduleOn來實(shí)現(xiàn)在某個(gè)線程中調(diào)度的過程希痴;
跟進(jìn)ScalarSynchronousObservable類,
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
if (scheduler instanceof EventLoopsScheduler) {
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
return create(new DirectScheduledEmission<T>(es, t));
}
return create(new NormalScheduledEmission<T>(scheduler, t));
}
很顯然春感,使用DirectScheduledEmission润梯,通過call,完成計(jì)算線程池的直接調(diào)度甥厦。
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
private final EventLoopsScheduler es;
private final T value;
DirectScheduledEmission(EventLoopsScheduler es, T value) {
this.es = es;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> child) {
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
}
}
//EventLoopsScheduler.class
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.get().getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);//執(zhí)行動(dòng)作
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
到此時(shí)纺铭,executor.submit(run);執(zhí)行,切換到對(duì)應(yīng)線程上完成Action的調(diào)用刀疙;而Action里面做的什么事舶赔, 找到實(shí)現(xiàn)方法,發(fā)現(xiàn)其call()方法就是將數(shù)據(jù)傳遞到下一層去而已:
/** Action that emits a single value when called. */
static final class ScalarSynchronousAction<T> implements Action0 {
private final Subscriber<? super T> subscriber;
private final T value;
private ScalarSynchronousAction(Subscriber<? super T> subscriber,
T value) {
this.subscriber = subscriber;
this.value = value;
}
@Override
public void call() {
try {
subscriber.onNext(value);
} catch (Throwable t) {
subscriber.onError(t);
return;
}
subscriber.onCompleted();
}
}
問題一到此谦秧,就分析得差不多了竟纳。
問題2: 為什么subscribeOn()是對(duì)整個(gè)調(diào)用鏈條起作用?
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
能對(duì)整個(gè)調(diào)用鏈起作用的關(guān)鍵點(diǎn)是nest()疚鲤;為什么這么說锥累,看其實(shí)現(xiàn)氛赐;
public final Observable<Observable<T>> nest() {
return just(this);
}
在此败去,可以看到其實(shí)還是使用的just,但是挨摸,關(guān)鍵是發(fā)送的觀察者是this;發(fā)送this诲宇,就意味著subscribeOn所在的Observable對(duì)象發(fā)送了出去际歼。this所代表的對(duì)象將會(huì)作為一個(gè)嵌套鏈表嵌入到subscribeOn所產(chǎn)生的新的Observable調(diào)用鏈中;
那么this如何嵌入到新的Observable中的呢姑蓝?
查看OperatorSubscribeOn類源碼鹅心,關(guān)鍵代碼o.unsafeSubscribe。
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {
@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
producer.request(n);
}
});
}
}
});
}
});
}
});
}
};
}
}
此處的o對(duì)象就是nest發(fā)送的this纺荧,通過unsafeSubscribe函數(shù)旭愧,重新形成調(diào)用鏈颅筋;
此處在執(zhí)行的同時(shí)還存在線程切換,即inner.schedule输枯。