Scheduler
通過 Scheduler 來控制被觀察者在哪個線程發(fā)射质欲,觀察者在哪個線程接收捻脖。默認(rèn)情況肝匆,發(fā)射時在哪個線程,接收就在哪個線程坛悉。
RxJava 內(nèi)置了幾個 Scheduler,通過 Schedulers 來獲取承绸。
- Schedulers.trampoline():當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行挣轨,F(xiàn)IFO军熏。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作卷扮。
- Schedulers.single():擁有一個線程,和 newThread 相比晤锹,這個線程可以共用摩幔。
- Schedulers.computation():計(jì)算所使用的 Scheduler鞭铆。這個計(jì)算指的是 CPU 密集型計(jì)算,即不會被 I/O 等操作限制性能的操作车遂,例如圖形的計(jì)算封断。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)舶担。不要把 I/O 操作放在 computation() 中坡疼,否則 I/O 操作的等待時間會浪費(fèi) CPU。
- Schedulers.io():I/O 操作(讀寫文件衣陶、讀寫數(shù)據(jù)庫柄瑰、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多剪况,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是用一個無數(shù)量上限的線程池教沾,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率拯欧。不要把計(jì)算工作放在 io() 中详囤,可以避免創(chuàng)建不必要的線程。
切換線程
- subscribeOn(): 控制事件產(chǎn)生的線程镐作。
- observeOn(): 控制事件消費(fèi)的線程藏姐。
- unsubscribeOn(): 控制解除訂閱時的線程。
Observable.create(ObservableOnSubscribe<String> { emitter ->
Log.e("RX", "發(fā)射線程 ${Thread.currentThread().name}")
emitter.onNext("a")
}).subscribeOn(Schedulers.io()) // 在 io 線程發(fā)射
.observeOn(Schedulers.computation()) // 在計(jì)算線程接收
.subscribe( {
Log.e("RX", "接收線程 ${Thread.currentThread().name}")
Log.e("RX", "接收數(shù)據(jù) $it")
})
日志顯示:
發(fā)射線程 RxCachedThreadScheduler-1
接收線程 RxComputationThreadPool-1
接收數(shù)據(jù) a
如果 observeOn(Schedulers.trampoline())
该贾,意思是在當(dāng)前線程羔杨,由于發(fā)射時將線程切換到 io 上去了,所以接收時也在這個 io 線程上杨蛋,日志顯示:
發(fā)射線程 RxCachedThreadScheduler-1
接收線程 RxCachedThreadScheduler-1
接收數(shù)據(jù) a
多次切換接收線程
Observable.create(ObservableOnSubscribe<String> { emitter ->
Log.e("RX", "發(fā)射線程 ${Thread.currentThread().name}")
emitter.onNext("a")
}).subscribeOn(Schedulers.io()) // 在 io 線程發(fā)射
.observeOn(Schedulers.computation())
.map {
Log.e("RX", "第一次轉(zhuǎn)換數(shù)據(jù)的線程 ${Thread.currentThread().name}")
"$it$it"
} // 雙倍
.observeOn(Schedulers.newThread())
.map {
Log.e("RX", "第二次轉(zhuǎn)換數(shù)據(jù)的線程 ${Thread.currentThread().name}")
"$it$it" } // 四倍
.observeOn(Schedulers.single())
.subscribe( {
Log.e("RX", "接收線程 ${Thread.currentThread().name}")
Log.e("RX", "接收數(shù)據(jù) $it")
})
日志:
發(fā)射線程 RxCachedThreadScheduler-1
第一次轉(zhuǎn)換數(shù)據(jù)的線程 RxComputationThreadPool-1
第二次轉(zhuǎn)換數(shù)據(jù)的線程 RxNewThreadScheduler-1
接收線程 RxSingleScheduler-1
接收數(shù)據(jù) aaaa
源碼分析
對象創(chuàng)建
- Observable 通過 create 方法創(chuàng)建 ObservableCreate兜材,將參數(shù) ObservableOnSubscribe 作為 ObservableCreate 的 source理澎。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
...
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- 在上面返回的 ObservableCreate 上調(diào)用 subscribeOn,創(chuàng)建 ObservableSubscribeOn曙寡,ObservableCreate 對象作為 source糠爬。
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
- 在上面返回的 ObservableSubscribeOn 上調(diào)用 observeOn,創(chuàng)建 ObservableObserveOn举庶,ObservableSubscribeOn 自己作為 ObservableObserveOn 的 source执隧。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
基本上每調(diào)一個方法,就創(chuàng)建了一個 Observable 的實(shí)現(xiàn)類户侥,然后將上層調(diào)用它的 Observable 作為自己內(nèi)部的 source镀琉。
訂閱
向上傳遞觀察者
最后調(diào)用 ObservableObserveOn 的 subscribe,內(nèi)部調(diào)用 subscribeActual蕊唐,參數(shù)是用戶傳遞進(jìn)來的 Observer屋摔。
protected void subscribeActual(Observer<? super T> observer) {
// 如果是 TrampolineScheduler,在當(dāng)前線程執(zhí)行替梨,不涉及任何線程的切換钓试,所以直接調(diào) source.subscribe
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
如果 scheduler 不是 TrampolineScheduler,比如 NewThreadScheduler耙替,先調(diào)用 createWorker 方法亚侠,創(chuàng)建 NewThreadWorker,實(shí)現(xiàn)了 Disposable俗扇。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...
}
構(gòu)造中創(chuàng)建一個線程池硝烂,再深入下去看,創(chuàng)建了一個只有一個核心線程的線程池對象铜幽。然后根據(jù)外界傳入的 Observer 和這個線程池封裝出另一個 Observer滞谢。所以在 ObservableSubscribeOn 的對象 source 上用重新封裝好的一個觀察者訂閱它。這樣就會調(diào)用 ObservableSubscribeOn 的 subscribeActual 方法除抛。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
第一步對下級的被觀察者拋上來的觀察者再封裝一次狮杨,這個 s 其實(shí)就是 ObservableObserveOn 中封裝后的 ObserveOnObserver。
-
第二步調(diào)用 onSubscribe到忽,看 ObservableObserveOn 的 onSubscribe 方法
public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; // 創(chuàng)建一個隊(duì)列 queue = new SpscLinkedArrayQueue<T>(bufferSize); // actual 是下級的觀察者橄教,這里最外層用戶傳進(jìn)來的那個 Observer 就收到了 onSubscribe 回調(diào) actual.onSubscribe(this); } }
第三步是最主要的開始發(fā)射數(shù)據(jù)
向下發(fā)射數(shù)據(jù)
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask 是一個 Runnable,內(nèi)部持有最后封裝的 Observer喘漏。且 run 方法就是用這個觀察者去訂閱 source护蝶。在這個例子里,ObservableSubscribeOn 中的 source 就是上層 ObservableCreate翩迈。
而 scheduler.scheduleDirect 這個 Runnable持灰,假設(shè)發(fā)射的 schedule 是 IoScheduler,scheduleDirect 最后是
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 創(chuàng)建線程负饲,返回 EventLoopWorker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
看最后調(diào)用 EventLoopWorker 的 schedule 方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
...
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
這里使用線程池方法執(zhí)行參數(shù) run 里面的任務(wù)堤魁,參數(shù)這個 run 是 DisposeTask 對象喂链。
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
這個 decoratedRun 就相當(dāng)于上面的 SubscribeTask。run 方法中是用經(jīng)過若干層封裝的觀察者訂閱最初的那個被觀察者妥泉。
觀察者收到數(shù)據(jù)
SubscribeOnObserver 收到 onNext
public void onNext(T t) {
actual.onNext(t);
}
這樣就進(jìn)了 ObserveOnObserver 的 onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
把數(shù)據(jù)先加到上面創(chuàng)建的那個隊(duì)列里椭微。
void schedule() {
// 如果之前是 0
if (getAndIncrement() == 0) {
// 這里就是切換接收數(shù)據(jù)的線程
worker.schedule(this);
}
}
線程執(zhí)行自己的 run 方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
...
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
...
for (;;) {
...
try {
v = q.poll(); // 取出隊(duì)列里的值
} catch (Throwable ex) {
...
}
...
a.onNext(v); // 發(fā)射給觀察者
}
...
}
}
// 這個暫時不清楚是何時調(diào)用
void drainFused() {
...
for (;;) {
...
actual.onNext(null); // 發(fā)射空值
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex); // 發(fā)射事件
} else {
actual.onComplete();
}
worker.dispose();
return;
}
...
}
}
最后將數(shù)據(jù)發(fā)射給最外層的觀察者,即這個 actual盲链,同時運(yùn)行在 observerOn 指定的線程上赏表。