需要注意的分析點
1.裝飾模式 對 上一步observable.create生成的對象 進行包裝
2.NewThreadScheduler
3.RxThreadFactory--ThreadFactory
4.CachedWorkerPool---Runnable
5.createWorker----NewThreadWorker.
6.DisposeTask--Runnable
- w.schedule(task, delay, unit)在線程中執(zhí)行的scheduleActual(action, delayTime, unit, null);
一我纪、RxJava的線程調(diào)度
在RxJava中问拘,要指定上游事件觸發(fā)的線需要通過subscribeOn方法傳入schedulers.
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
二、subscribleOn包裝事件
1.onAssembly是一個hock方法献丑,如果在subscribeOn前沒有使用其他操作符轉(zhuǎn)換悟民,那么就會返回一個new ObservableSubscribeOn<T>(this, scheduler)對象厕氨。
2.ObservableSubscribeOn<T>(this, scheduler)进每。this表示的是Observable.create對象創(chuàng)建的ObservableCreate對象
而scheduler就是我們的Schedulers.newThread()對象
3.ObservableCreate和ObservableSubscribeOn對象都繼承了Observable對象,這是典型的裝飾模式命斧。目的是對剛創(chuàng)建的Observable進行包裝進行包裝田晚。
所以subscribleOn返回的是一個ObservableSubscribeOn對象
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
三、ObservableSubscribeOn下發(fā)事件
RxJava的事件流下發(fā)是在發(fā)生訂閱事件后(subscribe)方法冯丙,而真正執(zhí)行下發(fā)的是subscribeActual方法(查看父類的subscribe方法)
1.在subscribeActual方法中肉瓦,首先對下游的observer進行了包裝
2.調(diào)用了下游observer的onSubscribe方法,所以這個方法是在主線程中調(diào)用的
3. SubscribeOnObserver是對下游observer進行包裝
4. SubscribeTask是一個Runnable胃惜,負(fù)責(zé)觸發(fā)上游observable對下游observer元素的訂閱(事件業(yè)務(wù)觸發(fā)及傳遞)泞莉。
5.那么scheduler.scheduleDirect(Runnable)方法一定是負(fù)責(zé)開啟線程的類,通過上面的代碼知道scheduler是NewThreadScheduler
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//上游的onSubscribe方法仍然在主線程中
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
/**
* 創(chuàng)建一個線程船殉,負(fù)責(zé)執(zhí)行subscribe方法鲫趁,subscribe方法內(nèi)部通常
* 調(diào)用了onNext,onComplete等方法利虫,這樣上游的所有方法都會在一個新的線程中執(zhí)行
*/
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
四挨厚、Schedule的使用
1.NewThreadScheduler首先創(chuàng)建了一個RxThreadFactory的線程工廠
2.將ThreadFactory交給了Worker堡僻,Worker是實際執(zhí)行線程的地方
3.NewThreadWorker內(nèi)部通過ScheduledExecutorService來管理線程,ScheduledExecutorService是一個可以將周期性任務(wù)通過線程池來執(zhí)行的類疫剃,所以scheduleDirect傳入0毫秒钉疫,表示立即執(zhí)行
4.schedule調(diào)用worker的方法來執(zhí)行線程
//表示立即執(zhí)行
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
父類的方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//調(diào)用子類的方法創(chuàng)建worker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//對runnable進行包裝
DisposeTask task = new DisposeTask(decoratedRun, w);
//執(zhí)行線程
w.schedule(task, delay, unit);
return task;
}
final ThreadFactory threadFactory;
//創(chuàng)建線程時巢价,給線程追加的前綴
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
RxThreadFactory:負(fù)責(zé)給創(chuàng)建的線程追加前綴,以及通過AtomicLong來管理創(chuàng)建的線程個數(shù)
/**
* A ThreadFactory that counts how many threads have been created and given a prefix,
* sets the created Thread's name to {@code prefix-count}.
*/
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
.....
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
....
}
NewThreadWorker內(nèi)部有一個ScheduledExecutorService來管理線程任務(wù),可以延時壤躲,立即或是周期性執(zhí)行
1.內(nèi)部有ScheduledExecutorService執(zhí)行周期性任務(wù)
2.scheduleActual是真正執(zhí)行線程的方法,這過程中也對runnable進行了一定的封裝
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
....
//
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 執(zhí)行任務(wù)
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
....
}
總結(jié)
subscribeOn方法會對上游創(chuàng)建的Obserable對象進行一次包裝碉克,當(dāng)完成對下游事件的訂閱時,會觸發(fā)它的subscribeActual方法漏麦,而這個發(fā)放內(nèi)部會啟動一個線程去觸發(fā)obserable的subscribe方法客税。這樣上游的所有事件都發(fā)生在指定的線程中了