在上一小節(jié)中RxJava2_整體流程分析盯串,有這么一個(gè)結(jié)論,那就是每一次調(diào)用 Observable 的操作符都會(huì)返回一個(gè)新的 Observable 對(duì)象戒良,并且會(huì)通過(guò)構(gòu)造的方式傳入上一級(jí)創(chuàng)建的 Observable 對(duì)象体捏,將其保存起來(lái),下面是示例代碼。那么接下來(lái)操作的 subscribeOn几缭、observeOn 操作符都會(huì)分別創(chuàng)建新的 Observable 對(duì)象河泳,并存儲(chǔ)上一級(jí)創(chuàng)建的 observable。
//上一級(jí)創(chuàng)建的 observable 對(duì)象:ObservableOnSubscribe
Observable.create(new ObservableOnSubscribe<String>() {...}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//保存上一級(jí)創(chuàng)建的 Observable 對(duì)象 : ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
一奏司、執(zhí)行流程圖
二乔询、示例代碼
下面這段代碼的功能就是在 subscribe 方法內(nèi)部通過(guò)調(diào)用 getBitampFormServer 去請(qǐng)求一個(gè) Bitmap 對(duì)象,這個(gè)方法是耗時(shí)操作韵洋,當(dāng)前的操作應(yīng)該在子線程中執(zhí)行,得到 bmp 之后黄锤,根據(jù)結(jié)果分別去調(diào)用 onNext() /onError() 方法搪缨。而在訂閱者中若是 onNext 被回調(diào)則表示成功獲取到 bmp,對(duì)應(yīng)地將其設(shè)置給對(duì)應(yīng)的 mImageView 對(duì)象上鸵熟,如果 onError 被回調(diào)了副编,那么表示加載 Bitmap 是失敗的,對(duì)應(yīng)的再做一些其它操作流强,這些操作應(yīng)該在主線程中進(jìn)行痹届。本次通過(guò)從源碼的角度探究的是 RxJava2 內(nèi)部是如何進(jìn)行線程切換操作的。本小節(jié)先分析 subscribeOn 如何去實(shí)現(xiàn)事件源在子線程中發(fā)射事件打月。也就是 ObservableOnSubscribe#subscribe 在子線程中去執(zhí)行队腐。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception {
//該方法進(jìn)行網(wǎng)絡(luò)請(qǐng)求,是比較耗時(shí)的操作奏篙。
Bitmap bmp = getBitampFormServer("uri");
if(bmp!=null) {
//獲取 bmp 成功
e.onNext(bmp);
e.onComplete();
}else{
//如果從網(wǎng)絡(luò)加載圖片不成功柴淘,回調(diào)onError 來(lái)通知訂閱者
e.onError(new Exception("圖片加載出錯(cuò)啦"));
}
}}) //事件源發(fā)射事件在子線程中運(yùn)行
.subscribeOn(Schedulers.io())
//訂閱者在主線程中接受事件
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("zeal", "onSubscribe");
}
@Override
public void onNext(@NonNull Bitmap bmp) {
//設(shè)置顯示在 ImageView 上
mImageView.setImageBitmap(bmp);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("zeal","error:"+e.toString());
}
@Override
public void onComplete() {
Log.e("zeal", "onComplete");
}
});
2、.subscribeOn(Schedulers.io()) 源碼分析
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
2.1秘通、Scheduler
從下面的類(lèi)注釋可以知道为严,這個(gè)類(lèi)是一個(gè)調(diào)度類(lèi),可以延時(shí)/周期性地去執(zhí)行一個(gè)任務(wù)肺稀〉诠桑可以從 Schedulers 這個(gè)類(lèi)去獲取 Scheduler 的實(shí)現(xiàn)子類(lèi)對(duì)象,例如在頻繁進(jìn)行 io 操作就可以調(diào)用 Schedulers.io() 话原,如果是計(jì)算比較多的可以調(diào)用 Schedulers.computation()夕吻。
/**
* A {@code Scheduler} is an object that specifies an API for scheduling
* units of work with or without delays or periodically.
* You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
*/
public abstract class Scheduler {}
2.2、Schedulers.io()
通過(guò)下面的 Schedulers.io() 源碼跟蹤稿静,最終返回的是一個(gè) IoScheduler 對(duì)象梭冠,這個(gè)對(duì)象實(shí)際上就是 Scheduler 的子類(lèi)對(duì)象。那么就符合 subscribeOn(Scheduler) 參數(shù)的要求了改备。
@NonNull
public static Scheduler io() {
//內(nèi)部是 IO
return RxJavaPlugins.onIoScheduler(IO);
}
//-----------------------------------------------------
@NonNull
static final Scheduler IO;
static {
...
// IO 是在靜態(tài)代碼塊中實(shí)例化的
IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
//這里返回一個(gè) IoHolder 對(duì)象控漠。
return IoHolder.DEFAULT;
}
});
...
}
//-----------------------------------------------------
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//-----------------------------------------------------
//IoHolder 類(lèi)定義中可以知道,該類(lèi)是繼承至 Scheduler
public final class IoScheduler extends Scheduler {}
2.3、subscribeOn 內(nèi)部實(shí)現(xiàn)
- subscribeOn(Scheduler scheduler)
這個(gè)方法內(nèi)部會(huì)通過(guò)創(chuàng)建一個(gè) ObservableSubscribeOn 對(duì)象盐捷,根據(jù)之前的經(jīng)驗(yàn)可知道偶翅,這個(gè)類(lèi)肯定也是一個(gè) Observable 的子類(lèi)對(duì)象。因此對(duì)于 subscribe(observer) 方法而言碉渡,我們就只關(guān)心它真正調(diào)用的方法 subscribeActual(observer) 聚谁。
- subscribeActual(observer)
在subscribeActual 內(nèi)部首先是對(duì) observer 進(jìn)行包裝成 SubscribeOnObserver 對(duì)象。這里的 SubscribeOnObserver 不僅是一個(gè) Observer 滞诺,而且具備一個(gè)連接器的作用 Disposable 形导。
@Override
public void subscribeActual(final Observer<? super T> s) {
//包裝 observer 對(duì)象
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//將連接器 parent 通過(guò) onSubscribe 回調(diào)給 observer 對(duì)象
s.onSubscribe(parent);
//這里是通過(guò) scheduler 去執(zhí)行一個(gè)任務(wù) SubscribeTask。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- SubscribeOnObserver
這個(gè)類(lèi)是對(duì) observer 的包裝习霹,內(nèi)部實(shí)現(xiàn)了 Observer 和 Disposable 接口朵耕。也就是說(shuō)它既有訂閱者的功能,也實(shí)現(xiàn)了連接器的功能淋叶。注意 actual 這個(gè)變量阎曹,它是下一級(jí)的 Observer 對(duì)象,為什么說(shuō)是下一級(jí)呢煞檩?因?yàn)槊看伟b的 Observer 是一級(jí)級(jí)別往上被訂閱的处嫌,當(dāng)前的 Observer 都會(huì)包裝下一級(jí)別的 Observer 對(duì)象。例如 SubscribeOnObserver 就封裝了下一級(jí)的 Observer 對(duì)象斟湃,其實(shí)就是當(dāng)前 Observer 接受到事件源發(fā)送過(guò)來(lái)的事件時(shí)熏迹,再調(diào)用包裝的 Observer 回調(diào)給下一級(jí),這樣一級(jí)級(jí)傳遞下去知道最后一級(jí) Observer桐早。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
//發(fā)送事件
@Override
public void onNext(T t) {
//回調(diào)給下一級(jí)
actual.onNext(t);
}
//發(fā)送事件
@Override
public void onError(Throwable t) {
//回調(diào)給下一級(jí)
actual.onError(t);
}
//發(fā)送事件
@Override
public void onComplete() {
//回調(diào)給下一級(jí)
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
- SubscribeTask(parent)
SubscribeTask 它是一個(gè) Runnbale 癣缅,因此我們把它理解為一個(gè)任務(wù)。首先關(guān)注是它的 run 方法哄酝,它內(nèi)部實(shí)現(xiàn)很簡(jiǎn)單友存,就是**通知上一級(jí)的 Observable 通過(guò) subscribe 這個(gè)方法進(jìn)行訂閱當(dāng)前 observer **。下面會(huì)執(zhí)行一大堆代碼陶衅,其實(shí)都會(huì)為創(chuàng)建一個(gè)線程然后交給指定的線程池取執(zhí)行這個(gè)任務(wù)屡立,先記住這個(gè)任務(wù)的使命。那么既然是一個(gè)線程搀军,那么肯定有一個(gè)地方需要執(zhí)行這個(gè)線程的膨俐,接下來(lái)關(guān)注 scheduler.scheduleDirect 方法。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//【核心代碼罩句,這段代碼決定上一級(jí)observable訂閱在哪個(gè)線程執(zhí)行焚刺。】
//source:就是上一級(jí)創(chuàng)建的 observable
//parent 就是包裝后的 observer
source.subscribe(parent);
}
}
開(kāi)始尋找 SubscribeTask 這個(gè)線程實(shí)在哪里被執(zhí)行的门烂。
- scheduler.scheduleDirect(new SubscribeTask(parent))
剛才分析過(guò) scheduler 就是 IoScheduler 對(duì)象了乳愉,跟蹤源碼發(fā)現(xiàn)兄淫,這個(gè)類(lèi)并沒(méi)有重寫(xiě)這個(gè)方法,因此直接進(jìn)入 Scheduler 查看蔓姚。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//這里的 delay = 0捕虽,也就是馬上執(zhí)行這個(gè)任務(wù)。
//【這個(gè) run 就是我們的目標(biāo)】
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//核心代碼 createWorker() 創(chuàng)建一個(gè)可以可以執(zhí)行 run 的 worker
final Worker w = createWorker();
//對(duì) run 進(jìn)行了包裝坡脐,實(shí)際上還是 run 這個(gè)對(duì)象泄私。【這個(gè) run 就是我們的目標(biāo)】
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//decoratedRun 交給了 worker 去執(zhí)行
w.schedule(new Runnable() {
@Override
public void run() {
try {
【我們的目標(biāo)在此處被執(zhí)行】
decoratedRun.run();
} finally {
//事件源發(fā)射事件完畢之后备闲,就關(guān)閉連接器晌端。
w.dispose();
}
}
}, delay, unit);
return w;
}
- IoScheduler#createWorker();
現(xiàn)在我們知道我們的任務(wù)是交給 worker.schedule() 去執(zhí)行的。因?yàn)?Worker 是負(fù)責(zé)去執(zhí)行調(diào)度的浅役,因此不同的子類(lèi)會(huì)有不同的 Worker 的實(shí)現(xiàn)斩松,在 Scheduler 中通過(guò) createWorker() 來(lái)獲取子類(lèi)實(shí)現(xiàn)的 Worker 對(duì)象。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
- Scheduler#Worker
這個(gè)類(lèi)具備延遲執(zhí)行任務(wù)觉既,周期性執(zhí)行任務(wù)的功能。所有的執(zhí)行都是基于 schedule() 方法乳幸,而這個(gè)方法是一個(gè)抽象方法瞪讼,也就是它無(wú)法知道子類(lèi)需要怎么執(zhí)行這個(gè)任務(wù),因?yàn)槊恳环N調(diào)度器執(zhí)行的方式 schedule 都不一樣粹断,因此交給子類(lèi)去實(shí)現(xiàn)符欠。
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
- EnentLooerWorker#schedule()
有了 Worker 之后就要開(kāi)始執(zhí)行【我們的任務(wù) action 啦】
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//【任務(wù) action 】交給 threadWorker 去執(zhí)行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
- threadWorker.scheduleActual
threadWorker 是 ThreadWorker ,繼承至 NewThreadWorker 瓶埋。
static final class ThreadWorker extends NewThreadWorker
//NewThreadWorker 內(nèi)部維護(hù)一個(gè)線程池 executor希柿。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//最終代碼會(huì)走到這里
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//對(duì) run 進(jìn)行包裝
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//上面已經(jīng)提到,delayTime = 0养筒;所以這個(gè)任務(wù)會(huì)被立即執(zhí)行曾撤,
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void run() {
try {
try {
//執(zhí)行原始的 run 方法。
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
2.4晕粪、 結(jié)果
f = executor.submit((Callable<Object>)sr); 這里執(zhí)行了 SubscribeTask#run() 方法挤悉,也就是當(dāng)前的訂閱者 Observer 訂閱了上一級(jí)的 Observable 。也就是上一級(jí)的 ObservableCreate.subscribe(observer) 被執(zhí)行了巫湘。請(qǐng)注意它是在子線程中被執(zhí)行的装悲。如果想要了解接下來(lái)的事件源是怎么發(fā)送事件的可以參考RxJava2_整體流程分析