本小節(jié)主要是我在學(xué)習(xí)了 subscribeOn 源碼之后的記錄茂契,方便日后查閱蝶桶。
本人能力有限,若內(nèi)容有錯(cuò)誤之處掉冶,請(qǐng)指出真竖,謝謝脐雪。
示例代碼
- 示例代碼,下面這段代碼很簡(jiǎn)單恢共,就是發(fā)送兩個(gè)事件战秋,并且指定在子線程中發(fā)送。接下來(lái)分析這個(gè)過(guò)程實(shí)現(xiàn)讨韭。
//定義事件源
Observable observable = Observable.create(new ObservableOnSubscribe<I
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Except
//發(fā)送兩個(gè)事件
e.onNext(1);
e.onNext(2);
}
})
.subscribeOn(Schedulers.io());//將發(fā)送事件切換到子線程中去執(zhí)行
Observer observer1 = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
//獲取連接器 Disposable 對(duì)象
Log.e("zeal", "onSubscribe");
}
@Override
public void onNext(Integer integer) {
//接受事件
Log.e("zeal", "onNext=" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//observer1 訂閱 observable 事件源
observable.subscribe(observer1);
- 執(zhí)行結(jié)果
onSubscribe
onNext=1
onNext=2
開(kāi)始分析
- 從 subscribe 開(kāi)始
- observable.subscribe(observer1) 這段代碼中脂信,我們首先要明白這個(gè) observable 對(duì)象實(shí)際上指的是哪個(gè)對(duì)象?我們使用 subscribeOn 內(nèi)部會(huì)創(chuàng)建一個(gè) Observable 的子類(lèi)并返回透硝,這樣才能實(shí)現(xiàn) rxjava 引以為傲的鏈?zhǔn)秸{(diào)用狰闪。那么 subscribeOn 返回的就是 ObservableSubscribeOn 對(duì)象,而我們之前分析過(guò) Observable 的 subscribe 方法濒生,我們知道最終它會(huì)調(diào)用 subscribeActual 方法埋泵,而這個(gè)方法是抽象的,因此我們?nèi)リP(guān)注 ObservableSubscribeOn 的 subscribeActual 方法罪治。
- rxjava 中的 observer 是一層一層的往上包裝的丽声,也就是每一個(gè) Observable 的 subscribeActual 方法內(nèi)部都會(huì)將 observer 參數(shù)再進(jìn)一層包裝。例如下面就將 s 作為構(gòu)造傳入 SubscribeOnObserver 觉义。
@Override
public void subscribeActual(final Observer<? super T> s) {
//將 s 作為構(gòu)造傳入 SubscribeOnObserver
//SubscribeOnObserver 它即是 observer 類(lèi)型雁社,也是 disposable 類(lèi)型。
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//將連接器parent傳遞給當(dāng)前 s.onSubscribe ,s 就是我們剛才創(chuàng)建的 observer1 對(duì)象晒骇。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
- s.onSubscribe(parent);
將連接器 parent 傳遞給當(dāng)前 s.onSubscribe ,s 就是我們剛才創(chuàng)建的 observer1 對(duì)象歧胁。
因此 observer1 中的 onSubscribe 方法會(huì)被調(diào)用也就是這個(gè)日志被打印Log.e("zeal", "onSubscribe");
- scheduler 是什么?
scheduler 在之前的章節(jié)已經(jīng)分析過(guò)了厉碟,這里簡(jiǎn)單的提一下喊巍, 通過(guò)
Schedulers.io() 獲取到的 scheduler 就是 IoScheduler 對(duì)象。
- IoScheduler
這個(gè)類(lèi)的注釋還是有必要提一下箍鼓,在下面的分析中會(huì)用到崭参,它表示能在池中創(chuàng)建和緩存一些線程,并且在必要的時(shí)候可以重用這些線程款咖。這個(gè)下面再具體分析何暮。
/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
public final class IoScheduler extends Scheduler {
- scheduler.scheduleDirect(runnable)
Schedules the given task on this scheduler non-delayed execution.
在當(dāng)前的調(diào)度器中去執(zhí)行一個(gè)給定的任務(wù) runnable。
從下面的 Runnable 的核心的 run 方法可以知道铐殃,它是負(fù)責(zé)去告知 parent 這個(gè) observer 去訂閱上一層的 Observable 這個(gè)事件源海洼。
new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}
- SubscribeOnObserver.onSubscribe 方法內(nèi)部實(shí)現(xiàn)
該方法內(nèi)部通過(guò) setOnce 給對(duì)象的 s 變量賦值為 d,而這個(gè) d 就是上一級(jí) ObservableCreate.subscribeActual 內(nèi)部的 CreateEmitter 對(duì)象。也就是在 SubscribeOnObserver 內(nèi)部保存有上一級(jí)的連接器富腊,這樣就可以控制事件的發(fā)送了坏逢。
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.s, d);
}
- parent.setDisposable(Disposable)
scheduler.schedulerdDirect(run) 會(huì)返回一個(gè) Disposable 對(duì)象。并且會(huì)設(shè)置給 parent 對(duì)象。注意:也就是之后使用 parent.get() 就不會(huì)為 null 了是整,換句話說(shuō) parent 是作為 Disposable 傳遞給我們?cè)谕獠縿?chuàng)建的訂閱者 observer1 的 onSubscribe(disposable) 的肖揣。
void setDisposable(Disposable d) {
//給當(dāng)前對(duì)象 SubscribeOnObserver 設(shè)置為 d
DisposableHelper.setOnce(this, d);
}
- DisposableHelper.setOnce(this, d);
- 內(nèi)部使用的 ActomicRefrence ,它可以保證對(duì)對(duì)象的操作是原子性的浮入。
- DisposableHelper.setOnce 中的第一個(gè)參數(shù)是 ActomicReference<Disposable>,而 SubscribeOnObserver 就是繼承了 ActomicReference 龙优。
- 因?yàn)檫@里沒(méi)有給初始值,因此這里傳入給 setOnce 的 this 就是 null事秀,因此它會(huì)被賦值為 scheduler.scheduleDirect 返回的 Disposable 對(duì)象彤断。
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
//判斷 field 是否為 null,若是為null 那么就將 field 設(shè)置為 d易迹,并且返回 true瓦糟,否則返回false。
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
- SubscribeOnObserver.dispose()
外部調(diào)用 dispose 方法的話赴蝇,會(huì)通知上一級(jí)的 CreateEmitter 不要再發(fā)送事件菩浙,同時(shí)也會(huì)告訴當(dāng)前類(lèi)的 worker 不要去執(zhí)行這個(gè)任務(wù)了。
@Override
public void dispose() {
//s 指向的是上一級(jí) ObservableCreate 連接器句伶。關(guān)閉上一級(jí)的連接
DisposableHelper.dispose(s);
//這個(gè) dispose(this) 表示的是通過(guò) scheduler.scheduleDirect 返回的 Dispose.dispose() 操作劲蜻。
DisposableHelper.dispose(this);
}
- scheduler.scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建一個(gè) worker
final Worker w = createWorker();
//包裝 run 對(duì)象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//3.由 worker 去執(zhí)行這個(gè)任務(wù)
w.schedule(task, delay, unit);
return task;
}
- IoScheduler.createWorker
這個(gè)方法實(shí)在 Scheduler 子類(lèi)中實(shí)現(xiàn)的,看下面的代碼出現(xiàn)一個(gè) pool,我們猜測(cè)它可能就是用于緩存線程的池子考余。其內(nèi)部返回的就是一個(gè) EventLoopWorker 對(duì)象先嬉,這個(gè)對(duì)象封裝了一個(gè) pool 這個(gè)的池子。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
- EventLoopWorker 父類(lèi) Worker
它是 IoScheduler 內(nèi)部的靜態(tài)類(lèi)楚堤。從類(lèi)注釋來(lái)看它是在單個(gè)線程或事件循環(huán)上執(zhí)行動(dòng)作的順序調(diào)度器疫蔓。簡(jiǎn)單來(lái)講就用于執(zhí)行任務(wù)的。
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public abstract static class Worker implements Disposable
- pool 是什么身冬?
下面是對(duì) pool 賦值的代碼衅胀,給 pool 執(zhí)行線程工廠,存活時(shí)間酥筝,單位滚躯。
final AtomicReference<CachedWorkerPool> pool;
//在 start 方法給該 pool 賦值
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
- CachedWorkerPool
上面講的 pool 內(nèi)部維護(hù)的就是 CachedWorkerPool 類(lèi)。它內(nèi)部有一個(gè)比較重要的屬性 expiringWorkerQueue嘿歌,它是一個(gè)隊(duì)列掸掏,內(nèi)部維護(hù)的元素是 ThreadWorker ,這個(gè)類(lèi)就是真正去干活的類(lèi)宙帝。這里先暫時(shí)不討論丧凤,待會(huì)再分析。
- pool.get()
上面的** "pool 是什么步脓?"**這一小點(diǎn)中貼出 pool 賦值的代碼愿待,因此 pool.get() 獲取到的對(duì)象就是 update 浩螺,它是 CachedWorkerPool 類(lèi)型的。
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
- new EventLoopWorker(pool.get());
在 EventLoopWoker 構(gòu)造中傳入一個(gè) CachedWorkerPool 對(duì)象呼盆,給對(duì)應(yīng)的屬性賦值,這里要關(guān)注一個(gè) this.threadWorker = pool.get()蚁廓;獲取的访圃。注意這的 pool 是CachedWorkerPool 類(lèi)型的,上面的 pool 是 AtomicReference<CachedWorkerPool> 類(lèi)型的相嵌,主要要區(qū)分開(kāi)腿时。
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
- pool.get()
在 EventLoopWorker 構(gòu)造方法內(nèi)部調(diào)用了 pool.get() 方法。這個(gè) get() 是 CachedWorkerPool 內(nèi)部一個(gè)方法饭宾,它的功能就是從 expiringWorkerQueue 隊(duì)列中去獲取一個(gè) ThreadWork 對(duì)象批糟,當(dāng)隊(duì)列中沒(méi)有找到,就創(chuàng)建一個(gè)新的 ThreadWorker 對(duì)象看铆。注意這里并沒(méi)有馬上將其添加到 expiringWorkerQueue 隊(duì)列中徽鼎,至于為什么,待會(huì)再分析弹惦。
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
//判斷隊(duì)列是否為空
while (!expiringWorkerQueue.isEmpty()) {
//從隊(duì)列中取出一個(gè) ThreadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//沒(méi)有在隊(duì)列里找到可用的 ThreadWorker 那么就創(chuàng)建一個(gè)否淤。
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
至此 scheduleDirect 方法內(nèi)部成功通過(guò) createWorker() 獲取到對(duì)應(yīng)的 Worker 對(duì)象。
- DisposeTask
在 scheduler.scheduleDirect 方法內(nèi)部會(huì)創(chuàng)建一個(gè) DisposeTask 對(duì)象棠隐,DisposeTask task = new DisposeTask(decoratedRun, w);內(nèi)部封裝了 worker 和 要執(zhí)行的任務(wù) runnable 石抡。關(guān)注一下內(nèi)部實(shí)現(xiàn)可以發(fā)現(xiàn)該類(lèi)是一個(gè) Runnbale 和 Disposable 的實(shí)現(xiàn)類(lèi)。在 run 方法內(nèi)部真正去執(zhí)行任務(wù) decorateRun 的功能助泽。這里為什么要封裝 worker 呢啰扛?在上面分析了 Worker 的功能,它就是用執(zhí)行任務(wù)的嗡贺,因?yàn)閷?shí)現(xiàn)了 Disposable 接口隐解,因此具備暫停任務(wù)的功能。因?yàn)閮?nèi)部若是調(diào)用了 dispose 方法的話诫睬,就會(huì)回調(diào) EventLoopWorker 方法的 dispose() 方法厢漩,停止任務(wù)。
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
//真正任務(wù) decoratedRun 是在 run 方法中被執(zhí)行岩臣。
decoratedRun.run();
} finally {
//任務(wù)執(zhí)行完畢之后調(diào)用關(guān)閉連接器溜嗜。
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
//這里也會(huì)通過(guò) w.dispose() 進(jìn)行關(guān)閉 worker 的執(zhí)行
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
- EvenLoopWorker.dispose() 方法內(nèi)部實(shí)現(xiàn)
上面分析了在 DisposeTask 內(nèi)部為什么要封裝了對(duì)應(yīng)的 Worker 對(duì)象,因?yàn)樵?DisposeTask 內(nèi)部的 decoreRun 執(zhí)行完畢之后架谎,告訴 worker 炸宵,這樣就可以通過(guò) pool.release(threadWorker) 將 threadWorker添加到對(duì)應(yīng)的 expiringWorkerQueue 隊(duì)列中。這里就是上面說(shuō)的在 expiringWorkerQueue 沒(méi)有找對(duì)應(yīng)的 ThreadWorker的情況下谷扣,會(huì)創(chuàng)建了 ThreadWorker 之后土全,但卻沒(méi)有立即添加到 expiringWorkerQueue 隊(duì)列的原因了捎琐,只有執(zhí)行完任務(wù)的 ThreadWorker 才能添加到隊(duì)列中去。
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
//釋放一個(gè) threadWorker 裹匙,將其添加到 expiringWorkerQueue 隊(duì)列中
pool.release(threadWorker);
}
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
//添加到 expiringWorkerQueue 隊(duì)列中
expiringWorkerQueue.offer(threadWorker);
}
- 開(kāi)始執(zhí)行任務(wù):w.schedule(runnable)
分析完 DisposeWorker 之后現(xiàn)在就開(kāi)始分析 worker 是怎么去執(zhí)行任務(wù)了瑞凑。
w 表示對(duì)應(yīng)的 EventLoopWorker 對(duì)象。w.schedule(runnable
) 表示去執(zhí)行的是一個(gè)任務(wù) runnable概页,下面分析一下內(nèi)部是怎么執(zhí)行的籽御?
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//真正去執(zhí)行任務(wù)的就是 threadWorker
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
- threadWorker.scheduleActual(action, delayTime, unit, tasks)
threadWorker 是繼承至 NewThreadWorker,最后通過(guò) executor.submit 去執(zhí)行這個(gè)任務(wù)惰匙。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包裝
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//線程池去執(zhí)行這個(gè)任務(wù)技掏。
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
CachedWorkerPool
當(dāng)一個(gè)事件源被多個(gè)訂閱者訂閱的情況, rxjava 內(nèi)部是怎么處理的项鬼?
//偽代碼
Observable observable
//兩個(gè)訂閱者
Observer observer1,observer2
//分別訂閱 observable
obserable.subscribe(observer1);
obserable.subscribe(observer2);
- subscribeActual 方法會(huì)被調(diào)用兩次
核心代碼就是scheduler.scheduleDirect 會(huì)被執(zhí)行兩次哑梳,就是通知 observable 去往兩個(gè) observer 去發(fā)送事件。
@Override
public void subscribeActual(final Observer<? super T> s) {
//將 s 作為構(gòu)造傳入 SubscribeOnObserver
//SubscribeOnObserver 它即是 observer 類(lèi)型绘盟,也是 disposable 類(lèi)型鸠真。
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//將連接器parent傳遞給當(dāng)前 s.onSubscribe ,s 就是我們剛才創(chuàng)建的 observer1 對(duì)象。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
- IoScheduler.EventLoopWorker 對(duì)象被創(chuàng)建兩次
IoScheduler.EventLoopWorker 對(duì)象被創(chuàng)建兩次,但是由于 subscribeOn(Schedulers.io() ) 只是調(diào)用了一次龄毡,因此 IoScheduler 對(duì)象只有一個(gè)弧哎,也就是說(shuō)內(nèi)部的 CachedWorkerPool 只有一個(gè)。當(dāng)多個(gè)任務(wù)需要執(zhí)行時(shí)稚虎,會(huì)創(chuàng)建兩個(gè) ThreadWorker 去執(zhí)行撤嫩,因?yàn)閮蓚€(gè)任務(wù)會(huì)在不同的線程去執(zhí)行,因此就會(huì)有快有慢之分蠢终,任務(wù)完成之后序攘,就會(huì)釋放這個(gè) ThreadWorker ,并且將其添加到 CachedWorkerPool 的 expiringWorkerQueue 隊(duì)列中寻拂,那么下次有其他訂閱者訂閱該 Observable 時(shí)程奠,就可以直接從該隊(duì)列中取出 ThreadWorker 。這就是對(duì) IoScheduler 類(lèi)的注釋的驗(yàn)證祭钉。
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
線程池
在 NewThreadWorker 中內(nèi)部有一個(gè) executor 瞄沙,它就是線程池,關(guān)注這個(gè)線程池的特點(diǎn)慌核,它是通過(guò) SchedulerPoolFactory 去創(chuàng)建的距境。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...
}
- SchedulerPoolFactory.create
該方法負(fù)責(zé)去創(chuàng)建一個(gè) ScheduledExecutorService 線程池。內(nèi)部的核心線程數(shù)為 1垮卓,最大線程數(shù) maximumPoolSize 為 Integer.MAX_VALUE垫桂。也就是說(shuō)若是通過(guò)同時(shí)有多個(gè)線程去訂閱該事件源,那么會(huì)創(chuàng)建多個(gè)線程粟按。
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;