目錄
- 一、觀察者Observer創(chuàng)建過(guò)程
- 二、被觀察者Observable創(chuàng)建過(guò)程
- 三屡谐、subscribe訂閱過(guò)程
- 四、map操作符
- 五剃根、線程切換原理
簡(jiǎn)單示例1:
private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
特別注意:上面示例代碼中的mDisposable最后必須要釋放掉,不然會(huì)出現(xiàn)內(nèi)存泄漏
一前方、觀察者Observer創(chuàng)建過(guò)程
首先對(duì)觀察者Observer源碼開(kāi)始進(jìn)行簡(jiǎn)單分析下:
Observer.java
public interface Observer<T> {
//表示一執(zhí)行subscribe訂閱就會(huì)執(zhí)行該函數(shù)狈醉,這個(gè)函數(shù)跟當(dāng)前調(diào)用.subscribe()一定執(zhí)行在主線程中
void onSubscribe(@NonNull Disposable d);
// 表示拿到上一個(gè)流程的數(shù)據(jù)
void onNext(@NonNull T t);
// 表示拿到上一個(gè)流程的錯(cuò)誤數(shù)據(jù)
void onError(@NonNull Throwable e);
// 表示事件流程結(jié)束
void onComplete();
}
具體的對(duì)象創(chuàng)建是在上面示例代碼1中的new Observer<String>()
操作,這個(gè)稱(chēng)這個(gè)為自定義觀察者惠险。
二苗傅、被觀察者Observable創(chuàng)建過(guò)程
分析完觀察者Observer的創(chuàng)建,現(xiàn)在來(lái)分析下被觀察者Observable的創(chuàng)建流程班巩,
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
將new ObservableOnSubscribe()過(guò)程可以理解為是自定義source的過(guò)程渣慕。
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
執(zhí)行Observable.create()代碼流程
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); //校驗(yàn)是否為null
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
其中,RxJavaPlugins.onAssembly()采用了hook技術(shù)抱慌,如果沒(méi)有重寫(xiě)RxJavaPlugins.setOnObservableAssembly()
方法逊桦,這個(gè)可以不要考慮。
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source; // 自定義source
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
/**
* Serializes calls to onNext, onError and onComplete.
*
* @param <T> the value type
*/
static final class SerializedEmitter<T>
extends AtomicInteger
implements ObservableEmitter<T> {
private static final long serialVersionUID = 4883307006032401862L;
final ObservableEmitter<T> emitter;
final AtomicThrowable error;
final SpscLinkedArrayQueue<T> queue;
volatile boolean done;
SerializedEmitter(ObservableEmitter<T> emitter) {
this.emitter = emitter;
this.error = new AtomicThrowable();
this.queue = new SpscLinkedArrayQueue<T>(16);
}
@Override
public void onNext(T t) {
if (emitter.isDisposed() || done) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() == 0 && compareAndSet(0, 1)) {
emitter.onNext(t);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<T> q = queue;
synchronized (q) {
q.offer(t);
}
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (emitter.isDisposed() || done) {
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
return true;
}
return false;
}
@Override
public void onComplete() {
if (emitter.isDisposed() || done) {
return;
}
done = true;
drain();
}
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
ObservableEmitter<T> e = emitter;
SpscLinkedArrayQueue<T> q = queue;
AtomicThrowable error = this.error;
int missed = 1;
for (;;) {
for (;;) {
if (e.isDisposed()) {
q.clear();
return;
}
if (error.get() != null) {
q.clear();
e.onError(error.terminate());
return;
}
boolean d = done;
T v = q.poll();
boolean empty = v == null;
if (d && empty) {
e.onComplete();
return;
}
if (empty) {
break;
}
e.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void setDisposable(Disposable s) {
emitter.setDisposable(s);
}
@Override
public void setCancellable(Cancellable c) {
emitter.setCancellable(c);
}
@Override
public boolean isDisposed() {
return emitter.isDisposed();
}
@Override
public ObservableEmitter<T> serialize() {
return this;
}
}
}
這里將ObservableCreate的源碼全部放在這抑进,作為一個(gè)埋點(diǎn)
其實(shí)强经,Observable.create()方法主要功能就是創(chuàng)建了一個(gè)ObservableCreate對(duì)象,并將自定義的source傳給ObservableCreate寺渗。該方法最終返回的是ObserverableCreate對(duì)象匿情。
三兰迫、subscribe訂閱過(guò)程
分析執(zhí)行subscribe()訂閱流程,并將自定義觀察者作為參數(shù)傳入码秉。
Observable.java
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校驗(yàn),判定observer是否為null
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
首先會(huì)執(zhí)行一些功能校驗(yàn)鸡号,最后執(zhí)行到subscribeActual()方法中转砖。
Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);
subscribeActual()是一個(gè)抽象類(lèi),從而最終調(diào)用的是ObservableCreate的subscribeActual()方法中鲸伴。
ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) { // observer為自定義觀察者
// 自定義一個(gè)CreateEmitter發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 執(zhí)行該方法就會(huì)執(zhí)行自定義觀察者的onSubscribe()方法中
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual()方法里面會(huì)執(zhí)行如下三個(gè)操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer);
--> 首先會(huì)創(chuàng)建一個(gè)CreateEmitter發(fā)射器府蔗,并將自定義觀察者傳入該發(fā)射器中
2)observer.onSubscribe(parent);
–> 執(zhí)行自定義觀察者的onSubscribe()方法,所以該方法也是最先執(zhí)行調(diào)用汞窗,并且一定在主線程中
3)source.subscribe(parent);
-->執(zhí)行自定義source的subscribe()訂閱操作,從而跳轉(zhuǎn)到示例代碼1中ObservableOnSubscribe的subscribe()方法姓赤,并將CreateEmitter發(fā)射器作為參數(shù)傳入進(jìn)去
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
執(zhí)行e.onNext("test")
就會(huì)跳轉(zhuǎn)到CreateEmitter發(fā)射器中的onNext()
方法
ObservableCreate.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t); //執(zhí)行該流程,observer為自定義觀察者
}
}
...
}
該observer為上面流程中自定義的CreateEmitter發(fā)射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);
傳入進(jìn)來(lái)的自定義觀察者對(duì)象仲吏,執(zhí)行observer.onNext(t)
該語(yǔ)句就調(diào)到示例代碼1中的
@Override
public void onNext(String s) {
}
Observable與Observer訂閱的過(guò)程時(shí)序圖如下:
[圖片上傳失敗...(image-e9a070-1677294388046)]
在標(biāo)準(zhǔn)的觀察者設(shè)計(jì)模式中不铆,是一個(gè)“被觀察者”,多個(gè)“觀察者”裹唆,并且需要“被觀察者”發(fā)出改變通知后誓斥,所以的“觀察者”才能觀察到
??在RxJava觀察者設(shè)計(jì)模式中,是多個(gè)“被觀察者”许帐,一個(gè)“觀察者”劳坑,并且需要 起點(diǎn)(被觀察者) 和 終點(diǎn)(觀察者) 在“訂閱”一次后,才發(fā)出改變通知成畦,終點(diǎn)(觀察者)才能觀察到
圖1:RxJava簡(jiǎn)單訂閱過(guò)程:
[圖片上傳失敗...(image-f0f708-1677294388046)]
四距芬、map操作符
加入map操作符之后的簡(jiǎn)單示例代碼2:
private Disposable mDisposable;
// 創(chuàng)建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定義source
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
// ObservableCreate.map
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
// ObservableMap.subscribe
.subscribe(new Observer<String>() { //自定義觀察者
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
這個(gè)示例代碼2寫(xiě)法采用裝飾模型
圖2:加入map操作符之后的流程:
[圖片上傳失敗...(image-40fc9-1677294388046)]
從①~⑥流程簡(jiǎn)稱(chēng)為封包裹,⑦ ~⑨流程簡(jiǎn)稱(chēng)為拆包裹
其實(shí)圖1與圖2的區(qū)別不大循帐,主要就是多了一個(gè)ObservableMap封包裹的流程框仔,其他流程都類(lèi)似。針對(duì)這個(gè)區(qū)別進(jìn)行代碼流程闡述下:
??從示例代碼2中執(zhí)行map()
操作進(jìn)行分析:
Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
進(jìn)行創(chuàng)建ObservableMap對(duì)象
ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); //source指ObservableCreate
this.function = function; // 自定義的Function方法
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); //這里面的t為下一層包裹即圖2中的自定義觀察者,source指上一層ObservableCreate
}
...
}
這里需要注意拄养,在ObservableMap()構(gòu)造函數(shù)中存和,參數(shù)source指從上一層傳過(guò)來(lái)的ObservableCreate對(duì)象,參數(shù)function指示例代碼2中的new Function()方法衷旅。
.map(new Function<String, String>()
執(zhí)行示例代碼2中的.subscribe()
其實(shí)就是執(zhí)行到了ObservableMap類(lèi)的subscribeActual()
方法捐腿,在這個(gè)方法中會(huì)對(duì)MapObserver進(jìn)行封裝一層包裹,并將下一層的包裹即自定義觀察者也就是參數(shù)t
傳入柿顶。
MapObserver為ObservableMap的內(nèi)部類(lèi)茄袖。
ObservableMap.java
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual); // actual為自定義觀察者
this.mapper = mapper;
}
...
}
在執(zhí)行圖2的第⑧步流程時(shí),就會(huì)調(diào)用執(zhí)行包裹1的onNext()方法嘁锯,即MapObserver類(lèi)的onNext();
ObservableMap.java
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 代碼1
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 代碼2
actual.onNext(v);
}
1:代碼1
??執(zhí)行mapper.apply(t)流程的時(shí)候宪祥,其實(shí)就是調(diào)用了示例代碼2中的apply()方法聂薪。
Function.java
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
@Override
public String apply(String s) throws Exception {
return s;
}
2:代碼2
?? actual.onNext(v);中的actual是在ObservableMap構(gòu)造函數(shù)傳過(guò)來(lái)的,actual對(duì)應(yīng)圖2中的自定義觀察者對(duì)象蝗羊,也就是對(duì)應(yīng)圖2中的第9步流程藏澳。
五、線程切換原理
subscribeOn:給上面代碼分配線程
observeOn:給下面代碼分配線程
Scheduler分類(lèi):
調(diào)度器類(lèi)型 | 效果 |
---|---|
Schedulers.computation() | 用于計(jì)算任務(wù)耀找,如事件循環(huán)或回調(diào)處理翔悠,不要用于IO操作(IO操作使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量 |
Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
Schedulers.immediate() | 在當(dāng)前線程立即開(kāi)始執(zhí)行任務(wù) |
Schedulers.io() | 用于IO密集型任務(wù) |
Schedulers.newThread() | 為每個(gè)任務(wù)創(chuàng)建一個(gè)新任務(wù) |
Schedulers.trampoline() | 當(dāng)其他排隊(duì)的任務(wù)完成后野芒,在當(dāng)前線程排隊(duì)開(kāi)始執(zhí)行 |
AndroidSchedulers.mainThread() | 用于Android的UI更新操作 |
1. 異步線程流程
示例代碼3:
private Disposable mDisposable;
//創(chuàng)建ObserverableCreate對(duì)象
Observable.create(
// 自定義source
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
)
// TODO 第二步
//ObservableCreate.subscribeOn()
.subscribeOn(
// TODO 第一步
Schedulers.io() // 給上面的代碼分配異步線程
)
// TODO 第三步
// ObservableSubscribeOn.subscribe()
.subscribe(
// 自定義觀察者
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
Log.d("abc", "onNext:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
示例代碼3只是簡(jiǎn)單的在示例代碼1上添加了一行異步線程的操作 .subscribeOn(Schedulers.io())
蓄愁,從第一步該語(yǔ)句進(jìn)行分析:
Schedules.java
static final Scheduler IO;
...
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
...
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
RxJavaPlugins.initIoScheduler(...);
這條語(yǔ)句也采用了hook機(jī)制,繼續(xù)分析new IOTask()
流程
Schedules.java
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
DEFAULT賦值如下:
Schedules.java
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
IoScheduler.java
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
IoScheduler.java
private final ScheduledExecutorService evictorService; //線程池
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); // 創(chuàng)建線程池
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
第一步總結(jié):執(zhí)行Schedulers.io()
最終返回的是Scheduler狞悲,也就是IOScheduler對(duì)象撮抓。通過(guò)new IOScheduler 創(chuàng)建了一個(gè)線程池,然后通過(guò)subscribeOn()來(lái)觸發(fā)摇锋。
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
此時(shí)new 了一個(gè)ObservableSubscribeOn對(duì)象丹拯,并將IoScheduler
對(duì)象傳進(jìn)去
ObservableSubscribeOn.java
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
第二步總結(jié):就是創(chuàng)建了一個(gè)ObservableSubscribeOn對(duì)象,并將IoScheduler傳入到該類(lèi)中荸恕。
??執(zhí)行第三步的.subscribe()
訂閱流程也就執(zhí)行到了ObservableSubscribeOn.subscribeActual()
這個(gè)方法中咽笼。
ObservableSubscribeOn.java
// s為自定義觀察者
public void subscribeActual(final Observer<? super T> s) {
// 代碼1
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //定義一個(gè)包裹SubscribeOnObserver
// 代碼2
s.onSubscribe(parent);
// 代碼3
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1:代碼1
??final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
定義一個(gè)包裹SubscribeOnObserver,并將自定義觀察者s
作為參數(shù)傳入
2:代碼2
??執(zhí)行s.onSubscribe(parent);
對(duì)應(yīng)的執(zhí)行到了示例代碼3中的語(yǔ)句塊
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
3:代碼3
??首先先分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
語(yǔ)句中的new SubscribeTask(parent)
代碼流程
ObservableSubscribeOn.java
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask 就是一個(gè)線程任務(wù)
source.subscribe(parent);
這個(gè)語(yǔ)句塊中的source就是指上一層的對(duì)象戚炫,在示例代碼3中指ObservableCreate剑刑,parent指包裹SubscribeOnObserver。
之后繼續(xù)分析parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
語(yǔ)句中的scheduler.scheduleDirect(...)
代碼流程
Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
Scheduler.java
// run是指SubscribeTask任務(wù)
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 代碼1
final Worker w = createWorker();
// 代碼2
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // hook機(jī)制
// 代碼3
DisposeTask task = new DisposeTask(decoratedRun, w);
// 代碼4
w.schedule(task, delay, unit);
return task;
}
1:代碼1
??執(zhí)行final Worker w = createWorker();
双肤,createWorker()是一個(gè)抽象方法施掏,其實(shí)調(diào)用到了IoScheduler類(lèi)的createWorker()
IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
...
}
該語(yǔ)句塊最后返回的是EventLoopWorker對(duì)象。
2:代碼2
??其實(shí)代碼2語(yǔ)句就是將Runable進(jìn)行封裝了下茅糜,最后還是Runnable
3:代碼3
??將Runnable又包裝了一層 為DisposeTask
Scheduler.java
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
...
}
4:代碼4
??執(zhí)行 w.schedule(task, delay, unit);
就會(huì)執(zhí)行到EventLoopWorker類(lèi)的schedule()方法
IoScheduler.java
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
NewThreadWorker.java
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr); //線程池的執(zhí)行七芭,executor是ScheduleExecutorService線程池
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
scheduler.scheduleDirect(new SubscribeTask(parent))
這句代碼的最終目的就是將 SubscribeTask 任務(wù)
交給線程池去執(zhí)行。
執(zhí)行executor.submit()
該語(yǔ)句就會(huì)觸發(fā)SubscribeTask任務(wù)的Run()
方法執(zhí)行蔑赘,該SubscribeTask任務(wù)就處于異步線程中狸驳。
ObservableSubscribeOn.java
public void run() {
source.subscribe(parent); //處于異步線程中執(zhí)行
}
source.subscribe(parent);
這個(gè)語(yǔ)句塊中的source就是指上一層的對(duì)象,在示例代碼3中指ObservableCreate缩赛,parent指包裹SubscribeOnObserver耙箍。
圖3:subscribeOn異步流程:
[圖片上傳失敗...(image-584a9b-1677294388046)]
說(shuō)明:步驟④是將SubscribeTask任務(wù)加入到線程池中執(zhí)行,則后續(xù)步驟⑤~⑩都是在異步線程中執(zhí)行
subscribeOn()切換線程時(shí)序圖:
[圖片上傳失敗...(image-32486d-1677294388046)]
2. 主線程流程
示例代碼4:
private Disposable mDisposable;
//創(chuàng)建ObserverableCreate對(duì)象
Observable.create(
// 自定義source
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
)
// TODO 第二步
//ObservableCreate.observeOn()
.observeOn(
// TODO 第一步
AndroidSchedulers.mainThread() // 給上面的代碼分配主線程
)
// TODO 第三步
// ObservableObserveOn.subscribe()
.subscribe(
// 自定義觀察者
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
Log.d("abc", "onNext:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
先來(lái)分析下第一步AndroidSchedulers.mainThread()
流程:
AndroidSchedulers.java
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
HandlerScheduler.java
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
...
}
AndroidSchedulers.mainThread()
流程就是創(chuàng)建了一個(gè)HandlerScheduler對(duì)象酥馍。
執(zhí)行第二步.observeOn(...)
Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
創(chuàng)建一個(gè)ObservableObserveOn對(duì)象辩昆,其中scheduler就是HandlerScheduler對(duì)象。
ObservableObserveOn.java
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
.observeOn(...)
流程就是創(chuàng)建一個(gè)ObservableObserveOn對(duì)象旨袒,并將HandlerScheduler對(duì)象傳入汁针。
之后執(zhí)行第三步.subscribe()
术辐,對(duì)應(yīng)執(zhí)行的是ObservableObserveOn.subscribeActual()
ObservableObserveOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
// 代碼1
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 代碼2
s.onSubscribe(parent);
// 代碼3
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
其實(shí),參數(shù)s為自定義的觀察者施无,這個(gè)地方跟異步線程的流程是一樣的辉词,
代碼1:封裝了一層SubscribeOnObserver包裹
代碼2:執(zhí)行自定義觀察者中的onSubscribe()方法流程
代碼3:scheduler.scheduleDirect(new SubscribeTask(parent))
這行代碼的功能就是將SubscribeTask任務(wù)交給主線程執(zhí)行。
HandlerScheduler.java
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //將run運(yùn)行在主線程中
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
...
}
圖4:observeOn主線程流程:
[圖片上傳失敗...(image-5ac86f-1677294388046)]
observeOn()時(shí)序圖:
[圖片上傳失敗...(image-e9a7a-1677294388046)]