subscribeOn
Observable.create((ObservableOnSubscribe<Integer>) e -> {
System.out.println("observable : " + Thread.currentThread());
e.onNext(1);
})
.subscribeOn(Schedulers.single())
.subscribe(integer -> {
System.out.println(integer);
System.out.println("observer: " + Thread.currentThread());
});
Rxjava默認是在當前線程生發(fā)送事件, subscribeOn
可以切換Observable發(fā)送事件所在的線程;
如果沒有使用ObserveOn指定消費事件的線程, Observer將在Observable發(fā)送事件的的線程, 消費事件;
源碼分析目的:
- Schduler 作用
- subscribeOn 做了什么
1. Schduler
Schduler不好直接用代碼解釋, 先說結(jié)論, 后面再去具體代碼分析;
- 切換線程, 需要提供對應(yīng)的Schduler;
- Schduler可以通過
createWorker
方法, 創(chuàng)建一個Worker
類的實例; - Worker有一個
schedule
方法, 提交runnable去運行; 切換線程, 就是把各個onNext的調(diào)用方法,封裝成一個runnable 提交到指定線程去運行; - 通過
Worker.schedule
提交runnable后, 會返回一個disposable對象, 用于取消或控制Observale的發(fā)射任務(wù); -
Schduler
本身是個管理類, 一般內(nèi)部會創(chuàng)建具體的線程池, 同時通過統(tǒng)一的start
shutdown
等方法管理著線程池 -
Schduler
同時也管理著由createWorker
創(chuàng)建的Worker
; Worker一般都是持有Schduler中的線程池, 提交的runnable也是提交到該線程池
2. subscribeOn
2.1 Observable.subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
1. 參數(shù)檢測
2. 創(chuàng)建`ObservableOnSubscribe`對象, 并將當前Observable和Schduler傳入;
3. RxJavaPlugins的hook; 這個前面說過, 用于hook, 默認傳入什么 就返回什么;
2.2 ObservableSubscribeOn
-
ObservableSubscribeOn
是Observable的子類, 內(nèi)部包含一個Observable
和schduler
, 用于對原Obverable擴展, 是一個裝飾模式; - 上面說了,
ObservableSubscribeOn
是一個裝飾模式, 繼承于HasUpstreamObservableSource
, 有一個source
方法去獲取被裝飾的Observable對象; - 上一篇說過, Observable.create方法創(chuàng)建的Observable, 實際是一個
ObservableCreater
對象, 現(xiàn)在ObservableScbscribeOn中包含的Observable即ObservableCreater; - Observable的subscribe方法, 實際調(diào)用的是具體子類的
subscribeActual
方法;
2.3 ObservableSubscribeOn.subscribeActual
直接看 ObservableSubscribeOn.subscribeActual
的代碼;
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- 創(chuàng)建
SubscribeOnObserver
, 并傳入原始的Observer - 調(diào)用Observer的
onSubscribe
方法 - 構(gòu)建
SubscribeTask
, 并提交給Schduler去執(zhí)行
2.4 SubscribeOnObserver
SubscribeOnObserver
是ObservableSubscribeOn
的靜態(tài)內(nèi)部類, 同時也是繼承于Observer
, 內(nèi)部也包含一個原始的Observer
, 也是一個裝飾模式;
SubscribeOnObserver對被裝飾類沒有額外增加功能, 僅僅是一個封裝, 在onNext
, onError
等方法中, 直接是調(diào)用的actual.onNext
, actual.onError
;
2.5 SubscribeTask
SubscribeTask
是一個runnable對象, 是ObservableSubscribeOn的內(nèi)部類; 前面Schduler中說過, 切換線程, 就是將消息發(fā)送,包裝成一個runnable, 提交給Worker
去執(zhí)行;
這個SubscribeTask
將原先的發(fā)送事件代碼 封裝成的runnable, 然后送去對應(yīng)的線程池執(zhí)行;
直接看run方法
public void run() {
source.subscribe(parent);
}
ObservableSubscribeOn
是Observable的子類, 同時是裝飾模式, 內(nèi)部持有一個Observable, source是被包裝的Observable, 在此處的代碼中, source即是ObservableCreater
, parent是SubscribeOnObserver
, source.subscribe即和第一篇中的邏輯一樣了;
2.6 scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
- 通過
createWorker
創(chuàng)建相應(yīng)的Worker
; - hook處理相應(yīng)的runnable, 默認沒處理;
- 創(chuàng)建DisposeTask, 將需要運行的runnable對象, 封裝成disposable對象, 用于執(zhí)行取消操作;
- 將封裝后的runnable提交給worker去運行;
此處的scheduler由Schedulers.single()
生成, 實際是一個SingleScheduler
;
2.6.1 Worker.schedule()
直接看 SingleScheduler
的代碼
####### Schedulers.createWorker 創(chuàng)建Worker; 獲取公共的線程池, 創(chuàng)建Worker
public Worker createWorker() {
return new ScheduledWorker(executor.get());
}
####### Worker.scheduler
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
tasks.add(sr);
try {
Future<?> f;
if (delay <= 0L) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delay, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
dispose();
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return sr;
}
- 封裝傳入的
runnable
對象, 將其封裝成ScheduledRunnable
對象 - 提交給線程池運行,
ScheduledRunnable
本身是一個Callable
對象, 可以用于取消執(zhí)行
上述提交給線程池運行的流程, 最終封裝的運行的run
方法, 其實還是最先封裝的SubscribeTask
中的source.subscribe(parent);
這一句代碼;
SubscribeTask
本身對應(yīng)的runnable
被一次次傳遞封裝, 最后給線程池運行;
source.subscribe(parent);
中, 上面說到是一個裝飾模式, 運行的還是Observable.subscribeActual
方法, 最后的運行邏輯和上一篇相同;
最后會調(diào)到ObservableSubscribeOn.onNext
方法, 內(nèi)部沒做處理, 裝飾模式,調(diào)用上一級的onNext
方法