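I have always felt that multithreading is a weak spot for Android developers: whenever multithreading comes up, the conversation seems to begin and end with Handler.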
Example:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
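// Log first, then emit, so the order matches the output below
LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
e.onNext(1);
// Without this call onComplete() would never appear in the output
e.onComplete();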
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onNext(Integer value) {
LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
}
});
- Output:
onSubscribe()->ThreadName:main
subscribe()->ThreadName:RxNewThreadScheduler-2
onNext()->ThreadName:RxNewThreadScheduler-2
onComplete()->ThreadName:RxNewThreadScheduler-2
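- The output shows that onSubscribe() runs on the main thread, while subscribe(), onNext(), and onComplete() are all invoked on a worker thread.
This raises a few questions:
- 1. How is the thread created?
- 2. When switching from the worker thread back to the main thread, how do the two threads communicate?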
public final class Schedulers {
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static final Scheduler NEW_THREAD;
static {
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
}
public final class NewThreadScheduler extends Scheduler {
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
}
- 1. The Scheduler passed to .subscribeOn(Schedulers.newThread()) actually refers to NewThreadScheduler;
- 2. Its internals are analyzed further below, together with the onXXX family of calls.
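The NewThreadHolder class in the excerpt above looks like the initialization-on-demand holder idiom: the instance is only built when the holder class is first touched. Below is a minimal plain-Java sketch of that idiom. LazyHolderDemo, FakeScheduler, and CREATED are hypothetical names for illustration and are not part of RxJava.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Schedulers; not RxJava code.
final class LazyHolderDemo {
    // Counts how many times the scheduler stand-in is constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    static final class FakeScheduler {
        FakeScheduler() {
            CREATED.incrementAndGet();
        }
    }

    // The JVM initializes this nested class only when DEFAULT is first read.
    static final class Holder {
        static final FakeScheduler DEFAULT = new FakeScheduler();
    }

    static FakeScheduler newThread() {
        return Holder.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println("created before first use: " + CREATED.get());
        FakeScheduler a = newThread();
        FakeScheduler b = newThread();
        System.out.println("same instance: " + (a == b));
        System.out.println("created after use: " + CREATED.get());
    }
}
```

Every call to newThread() hands back the same object, which is consistent with the observation that Schedulers.newThread() always resolves to the single NewThreadScheduler instance.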
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
- 1. An ObservableSubscribeOn object is created and returned as the new Observable;
- 2. this, i.e. the ObservableCreate, is stored in the ObservableSource field (source) of AbstractObservableWithUpstream;
- 3. ObservableSubscribeOn holds a reference to the NewThreadScheduler internally.
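The three points above can be sketched without RxJava. This is only an illustration of the wrapping: MiniObservable, MiniCreate, and MiniSubscribeOn are hypothetical miniatures, and the scheduler is reduced to a plain String.

```java
// Hypothetical miniature of the operator chain; not RxJava code.
final class OperatorChainDemo {
    // Every operator call returns a new wrapper around the current object.
    static class MiniObservable {
        MiniObservable subscribeOn(String schedulerName) {
            return new MiniSubscribeOn(this, schedulerName);
        }
    }

    // Plays the role of ObservableCreate.
    static final class MiniCreate extends MiniObservable { }

    // Plays the role of ObservableSubscribeOn: keeps the upstream and the scheduler.
    static final class MiniSubscribeOn extends MiniObservable {
        final MiniObservable source;
        final String schedulerName;

        MiniSubscribeOn(MiniObservable source, String schedulerName) {
            this.source = source;
            this.schedulerName = schedulerName;
        }
    }

    public static void main(String[] args) {
        MiniObservable create = new MiniCreate();
        MiniObservable chained = create.subscribeOn("newThread");
        MiniSubscribeOn wrapper = (MiniSubscribeOn) chained;
        System.out.println("wraps the original: " + (wrapper.source == create));
        System.out.println("scheduler: " + wrapper.schedulerName);
    }
}
```

The object the caller ends up subscribing to is the wrapper, not the original source, which is why the subscribe() call that follows lands in ObservableSubscribeOn.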
.subscribe(new Observer<Integer>() {...});
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
- subscribeActual is implemented by the subclass ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
}
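The ordering in subscribeActual can be reproduced with nothing but java.util.concurrent. This is a rough sketch, not RxJava itself: the executor stands in for the Scheduler, and SubscribeOrderDemo and the thread name demo-worker are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the call order inside subscribeActual; not RxJava code.
final class SubscribeOrderDemo {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));

        // Step 1: like s.onSubscribe(parent), this runs on the calling thread.
        log.add("onSubscribe:" + Thread.currentThread().getName());

        // Step 2: like scheduler.scheduleDirect(...), the upstream subscription
        // is handed to the worker thread.
        executor.execute(() -> log.add("subscribe:" + Thread.currentThread().getName()));

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The first entry carries the name of whichever thread called run(), and the second always carries demo-worker, mirroring the main / RxNewThreadScheduler split in the log at the top of the post.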
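- The Disposable passed to onSubscribe refers to the SubscribeOnObserver. The code shows that no thread has been created yet when s.onSubscribe(parent) is called, so it runs on the thread that called subscribe() (here the main thread), which matches the output at the start;
- Next, look at how the code below creates the worker thread: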
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
public abstract class Scheduler {
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = run;
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
public abstract Worker createWorker();
}
- As mentioned earlier, the Scheduler here is implemented by NewThreadScheduler:
public final class NewThreadScheduler extends Scheduler {
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
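Taken together, scheduleDirect and createWorker amount to: build a fresh worker for the task, run the task on it, and dispose the worker in a finally block. Here is a minimal sketch of that life cycle. It assumes the worker is backed by a single-thread ScheduledExecutorService, and OneShotWorkerDemo, Result, and runOnFreshThread are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a one-shot worker; not RxJava code.
final class OneShotWorkerDemo {
    static final class Result {
        final String threadName;
        final boolean workerShutDown;

        Result(String threadName, boolean workerShutDown) {
            this.threadName = threadName;
            this.workerShutDown = workerShutDown;
        }
    }

    static Result runOnFreshThread(String threadName) {
        // Like createWorker(): a new executor, and so a new thread, per call.
        ScheduledExecutorService worker =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, threadName));
        AtomicReference<String> seen = new AtomicReference<>();

        // Like w.schedule(...): run the task, then release the worker.
        Future<?> done = worker.schedule(() -> {
            try {
                seen.set(Thread.currentThread().getName());
            } finally {
                worker.shutdown(); // plays the role of w.dispose()
            }
        }, 0L, TimeUnit.NANOSECONDS);

        try {
            done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new Result(seen.get(), worker.isShutdown());
    }

    public static void main(String[] args) {
        Result r = runOnFreshThread("demo-worker-1");
        System.out.println("ran on: " + r.threadName);
        System.out.println("worker shut down: " + r.workerShutDown);
    }
}
```

Because the shutdown sits in a finally block, the worker is released even if the task throws, which is the same guarantee the try/finally in scheduleDirect gives.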
- Now look at how the thread itself is created:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public final class NewThreadScheduler extends Scheduler {
private static final RxThreadFactory THREAD_FACTORY;
static {
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
Thread t = new Thread(r, nameBuilder.toString());
t.setPriority(priority);
t.setDaemon(true);
return t;
}
}
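- The factory creates the thread internally and gives it a name made of the prefix and an incrementing counter, which is where names such as RxNewThreadScheduler-2 in the log come from;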
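The naming scheme is easy to try in isolation. The sketch below follows the same shape as the RxThreadFactory excerpt (a factory that extends AtomicLong and uses it as the counter). NamedThreadFactoryDemo, NamedThreadFactory, and the DemoScheduler prefix are hypothetical.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a counting thread factory; not RxJava code.
final class NamedThreadFactoryDemo {
    // The factory is its own counter, so no separate field is needed.
    static final class NamedThreadFactory extends AtomicLong implements ThreadFactory {
        private static final long serialVersionUID = 1L;
        final String prefix;
        final int priority;

        NamedThreadFactory(String prefix, int priority) {
            this.prefix = prefix;
            this.priority = priority;
        }

        @Override
        public Thread newThread(Runnable r) {
            // incrementAndGet() makes the first thread "-1", the second "-2", and so on.
            Thread t = new Thread(r, prefix + "-" + incrementAndGet());
            t.setPriority(priority);
            t.setDaemon(true);
            return t;
        }
    }

    public static void main(String[] args) {
        NamedThreadFactory factory =
                new NamedThreadFactory("DemoScheduler", Thread.NORM_PRIORITY);
        Runnable noop = () -> { };
        System.out.println(factory.newThread(noop).getName()); // DemoScheduler-1
        System.out.println(factory.newThread(noop).getName()); // DemoScheduler-2
    }
}
```

The threads are marked as daemon threads, so on a plain JVM they do not keep the process alive once the main thread has finished.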
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@Override
public Disposable schedule(final Runnable run) {
return schedule(run, 0, null);
}
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
ScheduledRunnable sr = new ScheduledRunnable(run, parent);
Future<?> f = executor.submit((Callable<Object>)sr);
sr.setFuture(f);
return sr;
}
}
executor.submit() -> the call() method of sr is executed:
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
final Runnable actual;
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(2);
this.actual = actual;
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
try {
try {
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
}
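The trick of implementing both Runnable and Callable can also be tried on its own. In this sketch DualTask, DualTaskDemo, and the thread name demo-worker are hypothetical. The cast in submit(...) is needed because ExecutorService.submit is overloaded for Runnable and Callable, and an object that implements both would otherwise be ambiguous.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a task that is both Runnable and Callable; not RxJava code.
final class DualTaskDemo {
    static final class DualTask implements Runnable, Callable<Object> {
        final Runnable actual;

        DualTask(Runnable actual) {
            this.actual = actual;
        }

        @Override
        public Object call() {
            run(); // call() only delegates to run()
            return null;
        }

        @Override
        public void run() {
            try {
                actual.run();
            } catch (Throwable e) {
                // Nowhere to rethrow on a pool thread, so just report it.
                System.err.println("task failed: " + e);
            }
        }
    }

    static String runAndReportThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            DualTask task = new DualTask(() -> seen.set(Thread.currentThread().getName()));
            // Explicit cast picks the Callable overload of submit().
            Future<Object> f = executor.submit((Callable<Object>) task);
            f.get(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("actual ran on: " + runAndReportThread());
    }
}
```

The wrapped Runnable executes on the executor's thread, not on the thread that called submit(), which is the whole point of handing the subscription to the scheduler.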
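- Here actual is the Runnable handed to w.schedule() in scheduleDirect, which in turn runs the Runnable we created with new in ObservableSubscribeOn, so source.subscribe(parent) executes on the new thread;
- The next post will try to analyze switching between the main thread and worker threads. After these two posts I will come back to the Atomic family, the Executor family, and the adapter, proxy, and decorator patterns.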