介紹
subscribeOn()方法是將Observable內(nèi)的數(shù)據(jù)處理器Observable.OnSubscribe放置在一個(gè)新的線程內(nèi)執(zhí)行。
執(zhí)行代碼
//初始化被觀察者Observable树叽,并給其加上數(shù)據(jù)處理器Observable.OnSubscribe
Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
LogShowUtil.addLog("RxJava","發(fā)送線程: "+Thread.currentThread().getName(),true);
subscriber.onNext("楊");
subscriber.onCompleted();
}
});
//做subscribeOn線程切換處理
Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
//初始化觀察者Observer,視作結(jié)果接收器
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","結(jié)束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","接受線程: "+Thread.currentThread().getName(),true);
LogShowUtil.addLog("RxJava","結(jié)果: "+string,true);
}
};
//訂閱
Bobservable.subscribe(observer);
源碼分析
1. 初始化被觀察者AObservable
Observable Aobservable = Observable.create(原始數(shù)據(jù)處理器);
由此可知被觀察者AObservable持有原始數(shù)據(jù)處理器對象Observable.OnSubscribe。
2. 執(zhí)行subscribeOn線程切換操作
Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
Observable#subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
接著我們看其中的new OperatorSubscribeOn(Aobservable,線程切換工具)操作
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
由代碼可知代理線程切換器OperatorSubscribeOn持有
Aobservable
和線程切換工具Scheduler
回到subscribeOn()方法內(nèi)繼續(xù)執(zhí)行create(代理線程切換器)
return create(new OperatorSubscribeOn<T>(this, scheduler));
create方法之前已經(jīng)分析過姥敛,由此可知Bobservable持有代理線程切換器OperatorSubscribeOn浆劲。
3. 初始化結(jié)果接受器觀察者Observer
Observer observer = new Observer<String>() {
...
}
4. 訂閱
Bobservable.subscribe(observer);
由之前分析可知會(huì)使用 Bobservable內(nèi)的代理線程切換器OperatorSubscribeOn做call()方法。
其中observer為結(jié)果接受器
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//步驟一 切換線程執(zhí)行以下操作
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//步驟二
//新建一個(gè)代理結(jié)果接受器烫扼,其內(nèi)持有結(jié)果接收器Observer
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
//步驟三 Aobservable.unsafeSubscribe(代理結(jié)果接受器)
source.unsafeSubscribe(s);
}
});
}
步驟一
inner.schedule
會(huì)將下面將要執(zhí)行的步驟二和步驟三會(huì)在一個(gè)新的線程內(nèi)執(zhí)行
步驟二會(huì)生成一個(gè)新的代理接收器Subscribers
,代理接收器內(nèi)持有外部實(shí)現(xiàn)的結(jié)果接受器Observer
接著會(huì)執(zhí)行步驟三unsafeSubscribe()
方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
//獲取數(shù)據(jù)處理器Observable.OnSubscribe曙求,并做數(shù)據(jù)處理工作
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
由此可知AObservable的原始數(shù)據(jù)處理器先執(zhí)行call(代理接收器Subscriber
s
)方法
接下來會(huì)進(jìn)入外部實(shí)現(xiàn)的外部數(shù)據(jù)處理器
原始數(shù)據(jù)處理器內(nèi)的call()方法
@Override
public void call(Subscriber<? super String> subscriber) {
LogShowUtil.addLog("RxJava","發(fā)送線程: "+Thread.currentThread().getName(),true);
subscriber.onNext("楊");
subscriber.onCompleted();
}
然后會(huì)進(jìn)入代理接收器Subscriber s
的onNext()方法
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
接著會(huì)執(zhí)行subscriber.onNext(t);
進(jìn)入結(jié)果接收器Observer內(nèi)方法體內(nèi)
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","結(jié)束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","接受線程: "+Thread.currentThread().getName(),true);
LogShowUtil.addLog("RxJava","結(jié)果: "+string,true);
}
};
此過程中只有在代理線程切換器OperatorSubscribeOn做call()方法的時(shí)候做過一次線程切換,所以那之后的所有操作都是在新的線程內(nèi)執(zhí)行的材蛛。
最終輸出結(jié)果
發(fā)送線程: RxNewThreadScheduler-1
接受線程: RxNewThreadScheduler-1
結(jié)果: 楊
結(jié)束