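RxJava Source Code Analysis: Thread Switching
Picking up where the previous article left off, this one lifts the veil on RxJava's thread switching. Let's start with some code: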
Observable.just("hello,world!")
.map { res->
Log.d("Observable", "thread:" + Thread.currentThread().name)
res+"1234"
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ res ->
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "length:" + res.length)
}, { e ->
Log.d("Observable", "thread:" + Thread.currentThread().name)
e.printStackTrace()
}, {
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "onComplete")
},{
Log.d("Observable", "thread:" + Thread.currentThread().name)
Log.d("Observable", "onSubscribe")
})
Running this code prints the following log:
05-21 10:45:06.109 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
onSubscribe
05-21 10:45:06.115 17068-17086/com.example.pandaguo.rxdemo D/Observable: thread:RxCachedThreadScheduler-1
05-21 10:45:06.165 17068-17068/com.example.pandaguo.rxdemo D/Observable: thread:main
length:16
thread:main
onComplete
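As the log shows, the code inside the map operator runs on the RxCachedThreadScheduler-1 thread, while everything else runs on the UI thread. Why?
- This article is not about how data flows through the chain, so the first few calls are not analyzed in detail.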
Observable.just("hello,world!")
.map { res->
Log.d("Observable", "thread:" + Thread.currentThread().name)
res+"1234"
}
Once execution reaches this point we hold the wrapped source, an ObservableMap, which is itself just an Observable. Next comes subscribeOn(Schedulers.io()), which performs the thread switch. Let's go through it step by step, starting with how Schedulers.io() creates the Scheduler object IO.
Schedulers.io()
public final class Schedulers {
...
static final Scheduler IO;
...
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
...
}
...
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
...
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
...
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
...
}
- Part of this code has to be read together with RxJavaPlugins:
public final class RxJavaPlugins {
...
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
// onInitIoHandler is null by default
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
...
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
...
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
// onIoHandler is null by default
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
...
}
- In the code above, Schedulers.io() first calls RxJavaPlugins.onIoScheduler(IO). The IO passed in was initialized up front, in the static block, by RxJavaPlugins.initIoScheduler(new IOTask()). IOTask is a static nested class of Schedulers that implements Callable, and its call() method returns IoHolder.DEFAULT. IoHolder is also a static nested class of Schedulers, and it holds an IoScheduler instance named DEFAULT.
- RxJavaPlugins.initIoScheduler(new IOTask()) ends up in callRequireNonNull(), which calls s.call(), where s is the IOTask.
- In short, Schedulers.io() ultimately yields an object of type IoScheduler. We'll set IoScheduler's implementation aside for now and look at the implementation of Observable.subscribeOn() first.
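As an aside, the initialization chain described above (a Callable that defers to a holder class, plus an optional override hook) can be modelled in plain Java. This is only a sketch under stated assumptions: FakeScheduler, LazySchedulerSketch, init() and the handler field are hypothetical stand-ins, not RxJava classes, and the error handling is simplified.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class LazySchedulerSketch {
    // Hypothetical stand-in for a Scheduler; it only carries a name.
    static final class FakeScheduler {
        final String name;
        FakeScheduler(String name) { this.name = name; }
    }

    // Optional override hook, null by default (plays the role of onIoHandler).
    static volatile Function<FakeScheduler, FakeScheduler> onIoHandler;

    // Holder class: DEFAULT is created only when the holder is first touched.
    static final class IoHolder {
        static final FakeScheduler DEFAULT = new FakeScheduler("io-default");
    }

    // Callable that defers creation of the default instance to the holder.
    static final class IoTask implements Callable<FakeScheduler> {
        @Override
        public FakeScheduler call() { return IoHolder.DEFAULT; }
    }

    // Initialized once, when this class is loaded.
    static final FakeScheduler IO = init(new IoTask());

    // Calls the supplier and rejects a null result (simplified error handling).
    static FakeScheduler init(Callable<FakeScheduler> supplier) {
        try {
            FakeScheduler s = supplier.call();
            if (s == null) {
                throw new NullPointerException("Scheduler Callable result can't be null");
            }
            return s;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Returns IO unless a hook is installed, in which case the hook decides.
    static FakeScheduler io() {
        Function<FakeScheduler, FakeScheduler> f = onIoHandler;
        return f == null ? IO : f.apply(IO);
    }

    public static void main(String[] args) {
        System.out.println(io().name);
        onIoHandler = s -> new FakeScheduler("custom");
        System.out.println(io().name);
        onIoHandler = null;
    }
}
```

With no hook installed, io() hands back the same pre-built instance on every call, which matches the observation that Schedulers.io() always resolves to the one IoScheduler.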
Observable.subscribeOn()
public abstract class Observable<T> implements ObservableSource<T> {
...
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
...
}
- Again, this has to be read together with RxJavaPlugins:
public final class RxJavaPlugins {
...
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
...
}
- With no assembly hook installed, RxJavaPlugins.onAssembly() simply returns the argument it was given, so what we get back is an ObservableSubscribeOn object. The core thread-switching logic therefore lives in that class, which we analyze next.
ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// This is the key to the thread switch
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// Where the subscription actually happens
source.subscribe(parent);
}
}
}
- Look at the implementation of subscribeActual. When s.onSubscribe(parent); runs there is no thread-switching logic in sight, so Observer.onSubscribe() must be called back on the subscribing thread, which is the UI thread here. Then why does the logic in the map operator run on another thread? In parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));, parent.setDisposable() does no thread switching, and new SubscribeTask(parent) merely makes the subscription happen on whichever thread runs the SubscribeTask. Here's a bold guess: now that we have a Runnable, could scheduler.scheduleDirect() be what actually switches threads? Let's follow the code.
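Before following the real code, the guess can be illustrated with plain java.util.concurrent: if the subscribe call itself is wrapped in a Runnable and handed to an executor, the source emits on the executor's thread. This is a minimal sketch, not RxJava's implementation; Source, subscribeOn and the thread name "sketch-io-1" are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SubscribeOnSketch {
    // Hypothetical minimal source: pushes values to a consumer when subscribed.
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Wraps upstream so that the subscribe call itself runs on the executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Subscribes once and returns the name of the thread the source emitted on.
    static String emittingThread() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io-1"));
        AtomicReference<String> seen = new AtomicReference<>();
        // The source reports the thread it is running on when subscribed.
        Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
        subscribeOn(source, io).subscribe(seen::set);
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("emitted on: " + emittingThread());
    }
}
```

The caller never leaves its own thread; only the body of the wrapped Runnable runs elsewhere, which is the role SubscribeTask plays in the listing above.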
Scheduler.scheduleDirect()
public abstract class Scheduler {
...
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
...
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
...
public abstract Worker createWorker();
...
}
- Scheduler is an abstract class!
- As the code above shows, the real work happens in the other scheduleDirect overload. It calls createWorker() to create a Worker object w and then calls w.schedule(task, delay, unit); to carry out the thread switch.
- What? That's all it takes to switch threads? You don't believe it? Then let me prove it. Recall that Schedulers.io() ultimately creates an IoScheduler object; here is its definition:
IoScheduler
public final class IoScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
private static final long KEEP_ALIVE_TIME = 60;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREAD_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_IO_PRIORITY = "rx2.io-priority";
static final CachedWorkerPool NONE;
static {
SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
SHUTDOWN_THREAD_WORKER.dispose();
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
@Override
public void run() {
evictExpiredWorkers();
}
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
void shutdown() {
allWorkers.dispose();
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
}
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
@Override
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
...
}
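- The code is long, so let's walk through IoScheduler following the flow of Scheduler.scheduleDirect(), starting with createWorker().
- It obtains a CachedWorkerPool through pool.get(). pool is a member of IoScheduler that is initialized in the constructor. It is an AtomicReference, which guarantees that reads and updates of the reference it holds are atomic; in other words, swapping the pool is thread-safe.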
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
- By default it holds NONE, which is initialized in the static block:
static {
...
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
- The start() call that follows replaces it with a new value, and this new CachedWorkerPool is not shut down:
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
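The NONE-then-start() handshake can be reproduced with a bare AtomicReference. The following is a sketch under assumptions: Pool is a hypothetical class that carries only a shutdown flag, and PoolSwapSketch is not an RxJava type. It shows that a second start() does not replace a live pool and that shutdown() swaps the sentinel back in.

```java
import java.util.concurrent.atomic.AtomicReference;

public class PoolSwapSketch {
    // Hypothetical pool that only remembers whether it has been shut down.
    static final class Pool {
        volatile boolean shutDown;
        void shutdown() { shutDown = true; }
    }

    // Sentinel meaning "no live pool"; it is already shut down.
    static final Pool NONE = new Pool();
    static {
        NONE.shutdown();
    }

    final AtomicReference<Pool> pool = new AtomicReference<>(NONE);

    // Installs a fresh pool only if the sentinel is still in place.
    void start() {
        Pool update = new Pool();
        if (!pool.compareAndSet(NONE, update)) {
            // Lost the race or already started: discard the spare pool.
            update.shutdown();
        }
    }

    // Swaps the sentinel back in and shuts down whatever pool was live.
    void shutdown() {
        for (;;) {
            Pool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }

    public static void main(String[] args) {
        PoolSwapSketch scheduler = new PoolSwapSketch();
        scheduler.start();
        System.out.println("live after start: " + !scheduler.pool.get().shutDown);
        scheduler.shutdown();
        System.out.println("back to NONE: " + (scheduler.pool.get() == NONE));
    }
}
```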
- We'll analyze what CachedWorkerPool is later. Back in Scheduler.scheduleDirect(), the next step wraps the created Worker w and the incoming Runnable run into a DisposeTask object task, then calls the schedule method of w, which here is EventLoopWorker's schedule method.
- DisposeTask is simply a proxy for the incoming Runnable run: its run() method calls the run method of that Runnable.
static final class DisposeTask implements Disposable,
Runnable, SchedulerRunnableIntrospection {
final Runnable decoratedRun;
...
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
...
}
...
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
...
}
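Putting createWorker(), DisposeTask and schedule() together, the scheduleDirect flow can be sketched as below. This is an illustration only: Worker here is a hypothetical class backed by a single-thread executor, and the assumption that the finally block ends up releasing the worker reflects my reading of RxJava 2 rather than anything shown in the elided parts of the listing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ScheduleDirectSketch {
    // Hypothetical worker: owns one thread and can be disposed exactly once.
    static final class Worker {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicBoolean disposed = new AtomicBoolean();

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            if (disposed.compareAndSet(false, true)) {
                executor.shutdown();
            }
        }
    }

    // Proxy that runs the delegate and then disposes the worker (assumed behavior).
    static final class DisposeTask implements Runnable {
        final Runnable delegate;
        final Worker worker;

        DisposeTask(Runnable delegate, Worker worker) {
            this.delegate = delegate;
            this.worker = worker;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } finally {
                worker.dispose();
            }
        }
    }

    // One-shot scheduling: create a worker, wrap the task, hand it over.
    static Worker scheduleDirect(Runnable run) {
        Worker w = new Worker();
        w.schedule(new DisposeTask(run, w));
        return w;
    }

    // Returns true if the task ran and the worker was disposed afterwards.
    static boolean runAndCheck() {
        AtomicBoolean ran = new AtomicBoolean();
        Worker w = scheduleDirect(() -> ran.set(true));
        try {
            w.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get() && w.disposed.get();
    }

    public static void main(String[] args) {
        System.out.println("ran and disposed: " + runAndCheck());
    }
}
```

The point of the proxy is that the caller of scheduleDirect never has to remember to clean up: the one-shot task releases its own worker when it finishes.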
- EventLoopWorker.schedule calls threadWorker.scheduleActual(action, delayTime, unit, tasks):
public Disposable schedule(@NonNull Runnable action, long delayTime,
@NonNull TimeUnit unit) {
...
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
- threadWorker is initialized in EventLoopWorker's constructor:
EventLoopWorker(CachedWorkerPool pool) {
...
this.threadWorker = pool.get();
}
- As you can see, it comes from the CachedWorkerPool. Here is CachedWorkerPool's get() method:
static final class CachedWorkerPool implements Runnable {
...
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
...
}
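- This code first checks whether a previously cached ThreadWorker can be taken from the queue. If not, it creates a new ThreadWorker and registers it in allWorkers; the worker only goes back into the cache queue when it is released. Next, let's look at the implementation of ThreadWorker.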
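As a side note, the reuse-or-create behavior of get(), together with release() and eviction, can be modelled in a few lines. This is a sketch assuming a hypothetical CachedWorker type and an explicit now parameter (so the example stays deterministic); the real pool reads System.nanoTime() and also tracks workers in a CompositeDisposable, which is left out here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerCacheSketch {
    // Hypothetical worker: just an id plus the time at which it expires.
    static final class CachedWorker {
        final int id;
        volatile long expirationTime;
        CachedWorker(int id) { this.id = id; }
    }

    private final ConcurrentLinkedQueue<CachedWorker> idle = new ConcurrentLinkedQueue<>();
    private final AtomicInteger created = new AtomicInteger();
    private final long keepAlive;

    WorkerCacheSketch(long keepAlive) {
        this.keepAlive = keepAlive;
    }

    // Reuse an idle worker if one is queued, otherwise create a new one.
    CachedWorker get() {
        CachedWorker w = idle.poll();
        return w != null ? w : new CachedWorker(created.incrementAndGet());
    }

    // Refresh the expiration time, then put the worker back in the idle queue.
    void release(CachedWorker w, long now) {
        w.expirationTime = now + keepAlive;
        idle.offer(w);
    }

    // Drop idle workers whose expiration time has passed; stop at the first live one.
    void evictExpired(long now) {
        for (CachedWorker w : idle) {
            if (w.expirationTime <= now) {
                idle.remove(w);
            } else {
                break;
            }
        }
    }

    public static void main(String[] args) {
        WorkerCacheSketch cache = new WorkerCacheSketch(100);
        CachedWorker a = cache.get();
        cache.release(a, 0);
        System.out.println("reused: " + (cache.get() == a));
    }
}
```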
ThreadWorker
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
- It adds nothing special beyond the expiration time, so most of its implementation lives in NewThreadWorker. As discussed above, EventLoopWorker.schedule calls threadWorker.scheduleActual(action, delayTime, unit, tasks), which resolves to the scheduleActual method of NewThreadWorker. Here is its implementation:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// Wrap the incoming run argument in another layer
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// Execute run via the thread pool
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
- Here the incoming Runnable is wrapped once more and then handed to the thread pool via submit or schedule. That Runnable ultimately wraps the SubscribeTask we analyzed earlier. Remember what SubscribeTask's run method does?
source.subscribe(parent);
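- So the subscription, and with it everything in the map operator, runs inside the thread pool.
- observeOn() works on the same principle, except that the library gets the main thread's Looper and uses a Handler to switch back to the UI thread. Interested readers can trace that source themselves.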
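To make the observeOn() idea concrete without Android classes, here is a minimal sketch in which a single-thread executor named "sketch-main" stands in for the main thread's Looper and Handler. That substitution is an assumption made for illustration, and Observer, observeOn and the thread names are hypothetical, not RxJava or RxAndroid code. The difference from subscribeOn is that each emitted value is re-posted to the target thread, rather than the subscribe call being moved.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ObserveOnSketch {
    // Hypothetical minimal observer with a single callback.
    interface Observer<T> {
        void onNext(T value);
    }

    // Returns an observer that re-posts every value to the target executor.
    static <T> Observer<T> observeOn(Observer<T> downstream, Executor target) {
        return value -> target.execute(() -> downstream.onNext(value));
    }

    // Emits from a background thread and returns the thread the value arrived on.
    static String deliveryThread() {
        // Stand-in for the main thread's Looper/Handler pair (assumption).
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        AtomicReference<String> seen = new AtomicReference<>();
        Observer<String> downstream = v -> seen.set(Thread.currentThread().getName());
        Observer<String> ui = observeOn(downstream, fakeMain);

        Thread producer = new Thread(() -> ui.onNext("hello"), "sketch-io-1");
        producer.start();
        try {
            producer.join();
            fakeMain.shutdown();
            fakeMain.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered on: " + deliveryThread());
    }
}
```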