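- 1. The thread-scheduling examples use only a few operators, so this source-code walkthrough covers only those operators;
- 2. For thread scheduling, the following APIs need to be analyzed:
Schedulers.newThread();
AndroidSchedulers.mainThread()
Schedulers.io();
- 3. How do newThread and io work with thread pools?
- 4. demo1 covers Schedulers.newThread(), demo2 covers Schedulers.io()
- 5. Keep one thing in mind: if you get lost, jump to the flowchart at the end and use it to make sense of the fragment you are reading;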
demo1 :
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onNext(Integer value) {
LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
}
});
- Output:
04-24 21:05:57.418 3141-3141/ Note01->onSubscribe()->ThreadName:main
04-24 21:05:57.418 3141-3241/ Note01->subscribe()->ThreadName:RxNewThreadScheduler-1
04-24 21:05:57.418 3141-3141/ Note01->onNext()->ThreadName:main
04-24 21:05:57.418 3141-3141/ Note01->onComplete()->ThreadName:main
1. Schedulers.newThread:
1.1 Schedulers.newThread
public final class Schedulers {
static final Scheduler NEW_THREAD;
static {
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
public static Scheduler newThread() {
return NEW_THREAD;
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
}
public final class NewThreadScheduler extends Scheduler {
public static NewThreadScheduler instance() {
return INSTANCE;
}
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
}
- This mainly builds the Scheduler instance, which actually points to NewThreadScheduler. Label this Scheduler as Scheduler_1(NewThreadScheduler);
1.2 Observable.subscribeOn:
public abstract class Observable<T> implements ObservableSource<T> {
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
/**
* 1. ObservableSubscribeOn holds a reference to Observable_1(ObservableCreate) and
* a reference to Scheduler_1(NewThreadScheduler); continue to module <1.4>;
* 2. Label the ObservableSubscribeOn returned here as Observable_2(ObservableSubscribeOn);
*/
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
1.3 AndroidSchedulers.mainThread:
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
- This creates a HandlerScheduler instance that holds a Handler bound to the main thread. Label this HandlerScheduler as Scheduler_2(HandlerScheduler);
1.4 Observable.observeOn:
public abstract class Observable<T> implements ObservableSource<T> {
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
/**
* 1. Label the ObservableObserveOn instance constructed here as Observable_3(ObservableObserveOn);
* `this` here refers to Observable_2(ObservableSubscribeOn);
* 2. Observable_3(ObservableObserveOn) also holds a reference to Scheduler_2(HandlerScheduler);
*/
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
}
}
- Modules 1.1 to 1.4 only initialize instances. The key point is that every call returns a new Observable that holds a reference to the previous Observable, which is central to RxJava's chained calls;
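The way each operator call wraps the previous Observable can be sketched without RxJava. The `Node` class and `walkUpstream` helper below are hypothetical names made up for illustration, not RxJava types; the sketch only assumes that every operator returns a new object holding a reference to its upstream.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainSketch {
    /** Hypothetical stand-in for Observable: each node remembers its upstream. */
    static class Node {
        final String name;
        final Node upstream; // null for the source node

        Node(String name, Node upstream) {
            this.name = name;
            this.upstream = upstream;
        }

        // Each "operator" returns a new node that wraps the current one.
        Node subscribeOn(String scheduler) {
            return new Node("SubscribeOn(" + scheduler + ")", this);
        }

        Node observeOn(String scheduler) {
            return new Node("ObserveOn(" + scheduler + ")", this);
        }
    }

    /** Walks from the last node back to the source, the direction subscribe() travels. */
    static List<String> walkUpstream(Node last) {
        List<String> names = new ArrayList<>();
        for (Node n = last; n != null; n = n.upstream) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Node chain = new Node("Create", null)
                .subscribeOn("newThread")
                .observeOn("mainThread");
        // prints [ObserveOn(mainThread), SubscribeOn(newThread), Create]
        System.out.println(walkUpstream(chain));
    }
}
```

The last operator in the chain is the outermost wrapper, which is why subscribing starts at ObserveOn and works back toward Create.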
1.5 Observable.subscribe:
- As shown in the earlier post RxJava系列_01ObservableEmitter, Observable.subscribe actually triggers the subscribeActual method of the last Observable in the chain;
1.6 Observable_3(ObservableObserveOn).subscribe:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Observer<? super T> actual;
protected final ObservableSource<T> source;
/**
* Observable_3(ObservableObserveOn).subscribe eventually triggers subscribeActual;
*/
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
/**
* 1. The Observer instance is passed in here; label it Observer_1(Observer);
* 2. Observer_1(Observer) and w (the HandlerWorker created by HandlerScheduler) are wrapped in an ObserveOnObserver;
* label this ObserveOnObserver as Observer_2(ObserveOnObserver);
* 3. subscribe then passes Observer_2(ObserveOnObserver) to Observable_2(ObservableSubscribeOn);
*/
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
}
1.7 Observable_2(ObservableSubscribeOn).subscribeActual:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
/**
* 1. Label parent here as Observer_3(SubscribeOnObserver);
* 2. Observer_3(SubscribeOnObserver) holds a reference to Observer_2(ObserveOnObserver);
* 3. Through onSubscribe, Observer_2(ObserveOnObserver) holds a reference to
* Observer_3(SubscribeOnObserver), so the two reference each other;
*/
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
/**
* Observer_2(ObserveOnObserver) receives Observer_3(SubscribeOnObserver) through
* onSubscribe, so the two reference each other; continue to module <1.8>;
*/
s.onSubscribe(parent);
/**
* 1. scheduler here is actually Scheduler_1(NewThreadScheduler); continue to module <1.9>;
*/
scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
/**
* 1. From the analysis in module <1.9>, source.subscribe(parent) runs on a worker thread;
* 2. source here refers to Observable_1(ObservableCreate). Through subscribe,
* Observable_1(ObservableCreate) holds a reference to parent, i.e. Observer_3(SubscribeOnObserver);
* continue to module <1.10>;
*/
source.subscribe(parent);
}
});
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
}
1.8 Observer_2(ObserveOnObserver).onSubscribe:
static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
Disposable s;
/**
* 1. This ObserveOnObserver is Observer_2(ObserveOnObserver), created in module <1.6>;
* 2. From module <1.6>, actual here refers to Observer_1(Observer);
* 3. worker refers to the HandlerWorker created by HandlerScheduler;
*/
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
/**
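* In a recursion-like fashion, each observer calls onSubscribe on the next one until Observer_1.onSubscribe is reached;
* No worker thread has been involved so far, so Observer.onSubscribe(Disposable) runs on the calling thread, which is the main thread in this demo;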
*/
actual.onSubscribe(this);
}
}
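The ordering described in modules 1.7 and 1.8 (onSubscribe delivered synchronously, the upstream subscription deferred to a worker) can be reproduced with plain `java.util.concurrent`. This is a minimal sketch under assumptions: the class name, the `run` helper and the thread name `worker-sim` are invented for illustration, and a single-thread executor stands in for the scheduler's worker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscribeOnSketch {
    /** Records which thread ran each step and returns the record. */
    static Map<String, String> run() throws InterruptedException {
        Map<String, String> threads = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);
        // Assumption: a daemon single-thread executor plays the role of the worker.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker-sim");
            t.setDaemon(true);
            return t;
        });

        // Step 1: onSubscribe is delivered synchronously, before anything is scheduled.
        threads.put("onSubscribe", Thread.currentThread().getName());

        // Step 2: only the upstream subscription is handed to the worker.
        worker.execute(() -> {
            threads.put("subscribe", Thread.currentThread().getName());
            done.countDown();
        });

        done.await(5, TimeUnit.SECONDS);
        worker.shutdown();
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

This matches the demo1 log, where onSubscribe reports the main thread while the subscribe body reports a scheduler thread.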
1.9 NewThreadScheduler.scheduleDirect:
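// Simplified: scheduleDirect is declared in the abstract Scheduler base class; NewThreadScheduler overrides createWorker
public abstract class NewThreadScheduler {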
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
- A new thread pool is created, so run executes on a worker thread;
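The shape of scheduleDirect (create a worker, run the task on it, dispose the worker afterwards) can be sketched with the JDK alone. The `Worker` class, the thread-name prefix `SketchNewThread-` and the `runAndGetThreadName` helper are hypothetical; the sketch assumes one single-threaded scheduled executor per worker, which is how this walkthrough reads NewThreadWorker, and is not a copy of RxJava's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class NewThreadSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Hypothetical worker: owns one single-threaded executor. */
    static final class Worker {
        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "SketchNewThread-" + COUNTER.incrementAndGet());
            t.setDaemon(true);
            return t;
        });

        void schedule(Runnable task) {
            executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        }

        void dispose() {
            executor.shutdown();
        }
    }

    /** Same shape as scheduleDirect: new worker, run the task, then dispose the worker. */
    static Worker scheduleDirect(Runnable task) {
        final Worker w = new Worker();
        w.schedule(() -> {
            try {
                task.run();
            } finally {
                w.dispose();
            }
        });
        return w;
    }

    /** Runs one task through scheduleDirect and returns the name of the thread that ran it. */
    static String runAndGetThreadName() throws InterruptedException {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduleDirect(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        done.await(5, TimeUnit.SECONDS);
        return name.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("task 1: " + runAndGetThreadName());
        System.out.println("task 2: " + runAndGetThreadName());
    }
}
```

Each call builds its own executor, so the two tasks report different thread names, neither of which is the caller's thread.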
1.10 Observable_1(ObservableCreate).subscribe:
public final class ObservableCreate<T> extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
/**
* 1. observer here is actually Observer_3(SubscribeOnObserver);
* 2. source is actually the ObservableOnSubscribe; through subscribe it holds a reference to the CreateEmitter;
*/
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}
- At this point the observer's subscription is complete. Next, the observable emits a few events through CreateEmitter to see what happens;
1.11 CreateEmitter.onNext:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
/**
* Triggers onNext on Observer_3(SubscribeOnObserver); continue to module <1.12>;
*/
observer.onNext(t);
}
}
}
1.12 Observer_3(SubscribeOnObserver).onNext:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
@Override
public void onNext(T t) {
/**
* Observer_3(SubscribeOnObserver) is initialized in module <1.7>, where Observer_2(ObserveOnObserver)
* is passed in, so actual refers to Observer_2(ObserveOnObserver);
*/
actual.onNext(t);
}
}
1.13 Observer_2(ObserveOnObserver).onNext:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
/**
* Observer_2(ObserveOnObserver) is initialized in module <1.6>, where the HandlerWorker is passed in
* so that worker refers to it; this call then triggers Observer_2(ObserveOnObserver)'s run method;
*/
worker.schedule(this);
}
}
/**
* From the HandlerWorker source, HandlerWorker.schedule posts run to the main thread;
*/
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
/**
* actual refers to Observer_1(Observer);
*/
actual.onNext(v);
}
}
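The `getAndIncrement() == 0` check in schedule() is a queue-drain guard: only the caller that moves the counter from 0 to 1 posts a drain task, and that task keeps draining until no further work was signalled. A minimal sketch follows, assuming a single-thread executor named `main-sim` in place of HandlerWorker and an explicit queue for the values; `Relay` and `emitAndCollect` are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ObserveOnSketch {
    /** Hypothetical relay: a value queue plus a work-in-progress counter. */
    static final class Relay extends AtomicInteger implements Runnable {
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        final ExecutorService worker;   // stands in for HandlerWorker
        final List<String> received;    // what the downstream observer saw
        final CountDownLatch done;

        Relay(ExecutorService worker, List<String> received, CountDownLatch done) {
            this.worker = worker;
            this.received = received;
            this.done = done;
        }

        void onNext(int value) {
            queue.offer(value);
            schedule();
        }

        void schedule() {
            // Only the call that moves the counter from 0 to 1 posts a drain task.
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                Integer v;
                while ((v = queue.poll()) != null) {
                    received.add(v + "@" + Thread.currentThread().getName());
                    done.countDown();
                }
                // Subtract the signals already handled; stop when none arrived meanwhile.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    /** Emits 1..count from the calling thread and returns what the relay delivered. */
    static List<String> emitAndCollect(int count) throws InterruptedException {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "main-sim");
            t.setDaemon(true);
            return t;
        });
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        Relay relay = new Relay(worker, received, done);
        for (int i = 1; i <= count; i++) {
            relay.onNext(i);
        }
        done.await(5, TimeUnit.SECONDS);
        worker.shutdown();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [1@main-sim, 2@main-sim, 3@main-sim]
        System.out.println(emitAndCollect(3));
    }
}
```

The producer never blocks on the consumer thread; it only enqueues and, at most, posts one drain task.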
Flowchart for demo1:
- 1. The flow above shows RxJava's general pattern. Observable<N> holds a reference to Observable<N-1>. In Observable<N>.subscribe, Observer1 is passed in and Observer2 is created holding a reference to Observer1; Observer2 is then passed to Observable<N-1>. This repeats recursively until Observable<1> holds a reference to Observer<N>, at which point Observer<N> is passed to CreateEmitter. When CreateEmitter.onNext fires, the onNext calls propagate in the order Observer<N> ---> Observer<1>;
- 2. As the summary in 1 shows, the indices of Observable and Observer are held in reverse order;
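The reverse ordering can be seen in a thread-free sketch: subscription travels from the last operator up to the source, and each value then travels back down through the wrapped observers. The `Source` and `Observer` interfaces and the `operator` helper below are simplified, hypothetical stand-ins for the RxJava types, and the operators here only log and pass values through.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapSketch {
    interface Observer { void onNext(int value); }
    interface Source { void subscribe(Observer downstream); }

    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical pass-through operator: holds its upstream, wraps the observer on subscribe. */
    static Source operator(String name, Source upstream) {
        return downstream -> {
            LOG.add("subscribe:" + name);
            // The new observer holds a reference to the one it was given.
            upstream.subscribe(value -> {
                LOG.add("onNext:" + name);
                downstream.onNext(value);
            });
        };
    }

    /** Builds create -> subscribeOn -> observeOn, subscribes, and returns the call log. */
    static List<String> run() {
        LOG.clear();
        Source create = observer -> {
            LOG.add("subscribe:create");
            observer.onNext(1);
        };
        Source chain = operator("observeOn", operator("subscribeOn", create));
        chain.subscribe(value -> LOG.add("onNext:final(" + value + ")"));
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        // prints [subscribe:observeOn, subscribe:subscribeOn, subscribe:create,
        //         onNext:subscribeOn, onNext:observeOn, onNext:final(1)]
        System.out.println(run());
    }
}
```

The observer created last (inside subscribeOn) is the first to see the value, and the observer supplied by the caller is the last.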
demo2:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onNext(Object value) {
LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
}
});
About demo2: only Schedulers.io is analyzed here, since everything else was already covered in demo1. In demo1, Schedulers.newThread creates a new thread pool every time;