RxJava線程切換非常方便愧捕,只要調(diào)用subscribeOn(Schedules.io())就可以使前邊的操作運(yùn)行于子線程,調(diào)用obsersableOn(AndroidSchedules.mainThread())就可以設(shè)置后邊的代碼運(yùn)行于主線程汁蝶,那么是如此神奇,他是如何實(shí)現(xiàn)的诸典?
今天就以下邊的代碼為切入點(diǎn)深入源碼看一下
Observable.just("我是網(wǎng)絡(luò)圖片url").map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
Log.i(TAG, "apply1 thread:"+Thread.currentThread().getName());
Log.i(TAG, "apply1");
s = s +" 加上一個(gè)時(shí)間戳后";
return s;
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
Log.i(TAG, "apply2 thread:"+Thread.currentThread().getName());
Log.i(TAG, "apply2");
s = s +" 加上第二個(gè)參數(shù)后";
return s;
}
}).subscribeOn(Schedulers.io()).subscribe(new Observer<String>() {
@Override
public void onSubscribe() {
Log.i(TAG, "onSubscribe thread:"+Thread.currentThread().getName());
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(TAG, "onNext thread:"+Thread.currentThread().getName());
Log.i(TAG, "onNext:"+s+" 開(kāi)啟下載這個(gè)圖片");
}
@Override
public void onError(@NonNull Throwable throwable) {
Log.i(TAG, "onError");
throwable.printStackTrace();
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete thread:"+Thread.currentThread().getName());
Log.i(TAG, "onComplete:下載完成");
}
});
程序運(yùn)行流程圖如下
just方法
創(chuàng)建一個(gè)ObservableJust對(duì)象返回媒至,并將just傳入的參數(shù)保保存為value
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
map方法
同樣道理,創(chuàng)建一個(gè)ObservableMap對(duì)象餐塘,由于map方法由上邊的ObservableJust對(duì)象調(diào)用妥衣,所以構(gòu)造方法中傳入的this表示的就是ObservableJust對(duì)象,創(chuàng)建ObservableMap對(duì)象后,保存上一級(jí)產(chǎn)生的ObservableJust為當(dāng)前ObservableMap對(duì)象中的成員變量source税手,保存當(dāng)前function回調(diào)接口蜂筹,這樣一來(lái),當(dāng)前對(duì)象持有上一級(jí)ObservableJust的引用芦倒。不管map調(diào)用幾次艺挪,當(dāng)前對(duì)象都會(huì)持有上一級(jí)產(chǎn)生的對(duì)象的引用
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
假設(shè)再次調(diào)用map之后,這個(gè)map就是由上一次調(diào)用map產(chǎn)生的ObservableMap對(duì)象調(diào)用的兵扬,此時(shí)會(huì)將上一級(jí)這個(gè)ObservableMap對(duì)象保存到當(dāng)前對(duì)象的source成員變量中麻裳,就這樣,一級(jí)套一級(jí)
subscribeOn方法
產(chǎn)生一個(gè)ObservableSubscribeOn對(duì)象器钟,并將上一級(jí)的ObservableMap對(duì)象保存為當(dāng)前對(duì)象的source變量津坑,保存?zhèn)魅氲膕cheduler,那么這個(gè)scheduler是什么傲霸?
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
subscribeOn(Schedulers.io())方法使上邊的操作在子線程中執(zhí)行疆瑰,Schedulers.io()就是上邊傳入的schedulers,我們看一下schedulers是如何創(chuàng)建的
來(lái)到Schedulers類(lèi)中
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
可以找到IO對(duì)象是在本類(lèi)靜態(tài)代碼塊中創(chuàng)建的
static {
....
IO = RxJavaPlugins.initIoScheduler(new IOTask());
....
}
IOTask是一個(gè)實(shí)現(xiàn)了Callable接口的線程
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
線程執(zhí)行會(huì)得到Scheduler昙啄,可以看到穆役,這是以?xún)?nèi)部類(lèi)形式實(shí)現(xiàn)的單例模式
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
可以看到,這個(gè)IoScheduler內(nèi)部是線程池實(shí)現(xiàn)的
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
也就是說(shuō)梳凛,當(dāng)我們?cè)诖a中設(shè)置了這個(gè)操作之后(subscribeOn(Schedulers.io()))耿币,會(huì)創(chuàng)建一個(gè)線程池(如果存在就不必創(chuàng)建),很明顯韧拒,最終將會(huì)需要放在子線程中執(zhí)行的方法在這個(gè)線程池中執(zhí)行淹接,從而達(dá)到切換線程的效果,目前看到這里叭莫,這只能作為一個(gè)猜想蹈集,我們繼續(xù)往下看
observeOn(AndroidSchedulers.mainThread())方法
這個(gè)方法執(zhí)行會(huì)保存一個(gè)運(yùn)行于主線程的Scheduler,這個(gè)主線程Scheduler如何創(chuàng)建的雇初?
AndroidSchedulers中
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
可以看到這個(gè)Scheduler是通過(guò)封裝Handler得到的一個(gè)運(yùn)行于主線程的封裝類(lèi)拢肆,這里將它保存起來(lái)。最后我們看subscribe方法
subscribe方法
public final void subscribe(Observer<? super T> observer) {
......
subscribeActual(observer);
......
}
protected abstract void subscribeActual(Observer<? super T> observer);
由于subscribeActual方法是抽象的靖诗,那么要從其子類(lèi)中找郭怪,subscribe方法由上次操作observeOn方法得到的ObservableObserveOn對(duì)象調(diào)用,所以會(huì)執(zhí)行這個(gè)類(lèi)中的subscribeActual方法刊橘,進(jìn)入ObservableObserveOn中
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
scheduler就是之前保存的AndroidSchedulers.mainThread對(duì)象鄙才,這里的source表示的就是上一級(jí)產(chǎn)生的Observable對(duì)象,具體到當(dāng)前代碼促绵,就是ObservableSubscribeOn攒庵,調(diào)用ObservableSubscribeOn中的subscribe方法嘴纺,逐層向上傳遞,直到傳遞到ObservableJust對(duì)象中浓冒,再不斷的調(diào)用map中傳入的function回調(diào)方法apply栽渴,當(dāng)apply方法調(diào)用完成,再執(zhí)行Observer的onNext onComplete方法稳懒,具體流程見(jiàn)上邊的流程圖闲擦,下一篇博客我將會(huì)詳細(xì)分析線程調(diào)度的源碼。
到這里场梆,這段示例代碼的流程已經(jīng)走了一遍
寫(xiě)了一個(gè)簡(jiǎn)化版的RxJava墅冷,實(shí)現(xiàn)了just map subscribeOn obserseOn方法,有助于對(duì)原理的理解或油,GitHub地址:https://github.com/renzhenming/MyRxJava