概要
RxJava
最神秘的部分莫過于此,我為了編寫這篇文章也是一遍一遍的查看源碼尋找它的運行原理秩霍,同樣的也查閱了很多的相關(guān)資料,但是慚愧的是并未找到實質(zhì)性有用的資料蚁阳,很多都是避開這話題避而不談铃绒,在此我們詳細的對它進行剖析。
不得不說這個框架編寫的真的很棒螺捐,相關(guān)類也是錯綜復(fù)雜颠悬,不過作為程序員我們必須拿出積極的態(tài)度來學(xué)習它的實現(xiàn),以此提高自身的價值定血;好了廢話不多說我們馬上開始赔癌。
源碼剖析
我們先來看一下外部的實現(xiàn)調(diào)用
public void onMineTask() {
//聲明一個ObservableCreate類型的 被觀察者對象
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("ONE");
e.onNext("TWO");
e.onNext("THREE");
e.onNext("FOUR");
}
});
//聲明一個Observer類型的觀察者對象
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
Log.e("回調(diào)執(zhí)行了onSubscribe函數(shù)->", "觀察者已成功訂閱");
}
@Override
public void onNext(String value) {
if (disposable.isDisposed())
onComplete();
value = mine_result.getText().toString() + "\n" + value;
mine_result.setText(value);
Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);
}
@Override
public void onError(Throwable e) {
Log.e("回調(diào)執(zhí)行了onError函數(shù)->", e.getMessage());
}
@Override
public void onComplete() {
Log.e("回調(diào)執(zhí)行了onComplete函數(shù)->", "本次結(jié)束!");
}
};
//指定被觀察者和觀察者運行的線程
//指定 被觀察者線程 ObservableSubscribeOn類型
mObservable.subscribeOn(Schedulers.io())
//指定 觀察者線程 ObservableObserveOn類型
.observeOn(AndroidSchedulers.mainThread())
//事件發(fā)布
.subscribe(observer);
}
我們通過上述的代碼可以看出mObservable
它是ObservableCreate
類型,通過他調(diào)取了[1]subscribeOn(Schedulers.io())
來指定被觀察者的執(zhí)行線程澜沟,并返回一個Observable
類型的返回值灾票,當然這個Observable
類型是個抽象類型;
而獲取到Observable
類型的返回值之后茫虽,我們有調(diào)用了[2]observeOn(AndroidSchedulers.mainThread())
函數(shù)方法刊苍,實現(xiàn)了對觀察者執(zhí)行線程的指定,此時我們得到返回值依舊是Observable
類型濒析;
最終我們依舊使用Observable
類型的返回值正什,通過調(diào)取[3]subscribe(observer)
函數(shù)實現(xiàn)主題發(fā)布,并攜帶著observer
觀察者實例号杏。
那么我們接下來婴氮,就按序講解他們的執(zhí)行的過程:
[ 1 ]
subscribeOn(Schedulers.io())
函數(shù)方法,源碼解析
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
//此處查Null操作
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//這里是一個鉤子函數(shù)馒索,在無擴展性特殊操作情況下莹妒,
//原封不動的返回一個ObservableSubscribeOn類型的值,
//而由于它的基類是 Observable類型绰上,所以我們直接將其基類類型
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我們來看RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))
函數(shù)方法旨怠,它其實是一個鉤子函數(shù),在無擴展性特殊操作情況下蜈块,會原封不動的返回一個ObservableSubscribeOn
類型的值鉴腻,而由于它的基類是 Observable
類型,所以我們直接將其基類類型返回百揭。
而我們重點來看這方法中的參數(shù)new ObservableSubscribeOn<T>(this, scheduler)
爽哎,它是聲明了一個ObservableSubscribeOn
類型對象,并傳入了一個Observable
類型的被觀察者器一,但由于Observable
實現(xiàn)了ObservableSource
的接口课锌,而我們也只需要ObservableSource
這部分內(nèi)容,所以最終聲明ObservableSubscribeOn
類型實例,我們傳入的是一個ObservableSource
類型參數(shù)和一個指定運行線程的Scheduler
類型參數(shù).
在ObservableSubscribeOn
類中我們對subscribeActual(final Observer<? super T> observer)
抽象方法進行了實現(xiàn).但目前還未進行調(diào)用
[ 2 ]
observeOn(AndroidSchedulers.mainThread())
函數(shù)方法,源碼解析
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//調(diào)用其自身的多態(tài)方法
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略...
//鉤子函數(shù)渺贤,若無擴展性特殊邏輯實現(xiàn)依舊返回一個`ObservableObserveOn`類型的值雏胃,
//由于`Observable`是其基類,所以這里直接將其基類類型返回值
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
此方法與[ 1 ]中的方法一樣這里不再贅述志鞍,我們的重點是其參數(shù)new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
瞭亮,它返回是一個ObservableObserveOn
類型的值,但由于它的基類是Observable
類型固棚,所以此處直接按基類類型返回统翩,它攜帶參數(shù)包括
ObservableSource
類型的this
;Scheduler
類型的執(zhí)行線程此洲;boolean
類型的延遲錯誤厂汗,指示onError通知是否不能在另一側(cè)的onNext通知之前切換;int
類型的緩沖區(qū)大小.
ObservableObserveOn
類同樣對`subscribeActual(final Observer<? super T> observer)抽象方法進行了實現(xiàn).但目前還未進行調(diào)用.
[ 3 ]
subscribe(observer)
函數(shù)實現(xiàn)主題發(fā)布,攜帶著觀察者參數(shù)呜师。源碼解析:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
//省略...
//調(diào)取 subscribeActual抽象方法
subscribeActual(observer);
}
//省略...
}
我們來看subscribeActual(observer)
這個函數(shù)面徽,我們的被觀察者的ObservableSubscribeOn
和觀察者的ObservableObserveOn
分別都重寫了此函數(shù),那么此處的subscribeActual(observer)
將進入哪個類呢匣掸?
我們將外部的調(diào)用鏈式表達分解一下來看:
Observable observableSubscribeOn1 = mObservable.subscribeOn(Schedulers.io()); //第一次
//指定 觀察者線程 ObservableObserveOn類型
Observable observeObserveOn2 = observableSubscribeOn1.observeOn(AndroidSchedulers.mainThread());
//發(fā)布執(zhí)行
observeObserveOn2.subscribe(observer);
很清晰的我們能看到,最終對subscribe(observer)
函數(shù)方法發(fā)起調(diào)用的是觀察者的ObservableObserveOn
類,所以我們將會執(zhí)行ObservableObserveOn
類中的subscribeActual(observer)
實現(xiàn)
public ObservableObserveOn(ObservableSource<T> source, Scheduler
@Override
protected void subscribeActual(Observer<? super T> observer) {
//省略...
//創(chuàng)建一個線程氮双,當前創(chuàng)建出來的是一個Handler線程
Scheduler.Worker w = scheduler.createWorker();
//此處我們調(diào)用上一個Observable類型對象的發(fā)布事件
//即 被觀察者對象的發(fā)布事件
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
首先明確一點此處的source
對象是被觀察者的Observable
類型實例碰酝,由于在指定運行線程時我們都傳入一個Observable
類型對象,而傳遞的Observable
都是上一個對象的實例戴差。
此處的參數(shù)new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
是聲明的ObservableObserveOn
類的靜態(tài)內(nèi)部類的實例送爸。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
可以看出它實現(xiàn)了Observer
接口,而我們在source.subscribe(...)
方法中也僅需要這部分內(nèi)容暖释,至此我們又回到了Observable
類中的subscribe(Observer<? super T> observer)
函數(shù)方法袭厂。
public final void subscribe(Observer<? super T> observer) {
//省略...
subscribeActual(observer);
//省略...
而再次調(diào)用subscribeActual(observer)
函數(shù)方法,不會再進入到ObservableObserveOn
類中球匕,而是進入ObservableSubscribeOn
類纹磺,執(zhí)行其subscribeActual(final Observer<? super T> observer)
實現(xiàn)方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
//將觀察者包裝成ObservableObserveOn類型
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//[ 3.1 ]調(diào)用此方法會進入觀察者`ObservableObserveOn`類中的實現(xiàn)方法
//可以看出此處還沒進行被觀察者的線程指定
observer.onSubscribe(parent);
//[ 3.2 ]在此處設(shè)置被觀察者的運行線程
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
[ 3.1 ] 我們來看看
observer.onSubscribe(parent)
,它在ObservableObserveOn
類的內(nèi)部類ObserveOnObserver
的onSubscribe(Disposable d)
做了什么
@Override
public void onSubscribe(Disposable d) {
//省略...
downstream.onSubscribe(this);
}
}
downstream
對象使我們在創(chuàng)建ObserveOnObserver
類型實例時傳入的Observer
類型實例亮曹,即觀察者實例橄杨,而它調(diào)取onSubscribe(this)
方法后直接回調(diào)至我們的外部實現(xiàn)中所創(chuàng)建的觀察者對象的onSubscribe(Disposable d)
中,通知觀察者訂閱成功照卦。
[ 3.2 ] 接下來分析一下
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))
方法是如何指定被觀察者運行線程的,
先來分析他參數(shù)實現(xiàn)scheduler.scheduleDirect(new SubscribeTask(parent))
,
[ 3.2.1 ] 來看
new SubscribeTask(parent)
做了什么
//實現(xiàn)Runnable 接口
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//調(diào)取事件發(fā)布事件,而此處的source是`ObservableCreate`類型
source.subscribe(parent);
}
}
此處實現(xiàn)了Runnable
接口式矫,并在重寫的run()
方法中調(diào)取了source.subscribe(parent)
,首先明確此時的source
對象它的類型是誰?答案是ObservableCreate
役耕,因為在創(chuàng)建ObservableSubscribeOn
類型實例時傳入的ObservableSource
類型參數(shù)是上一個對象采转,所以在執(zhí)行subscribe(parent)
方法后我們會通過subscribeActual(observer)
方法函數(shù),進入到ObservableCreate
類中對subscribeActual(observer)
方法實現(xiàn)瞬痘;而在此處我們還未進行有效的調(diào)用.
[ 3.2.2 ] 分析完
SubscribeTask
后我們接著分析他外層scheduler.scheduleDirect(new SubscribeTask(parent))
方法做了什么
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建一個Worker線程,
final Worker w = createWorker();
//鉤子函數(shù),若無擴展性特殊處理則返回參數(shù)本身
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//聲明一個處理任務(wù)故慈,將Runnable和Work封裝成DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//調(diào)取woker對象的schedule方法
w.schedule(task, delay, unit);
return task;
}
<3.2.2.1> 我們先調(diào)用了createWorker()
函數(shù)創(chuàng)建了一個EventLoopWorker
類型對象板熊,利用它實現(xiàn)線程調(diào)度;而EventLoopWorker
繼承于Scheduler.Worker
抽象類
static final class EventLoopWorker extends Scheduler.Worker {
//省略...
//結(jié)束操作
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// 釋放線程池操作
pool.release(threadWorker);
}
}
//是否結(jié)束
@Override
public boolean isDisposed() {
return once.get();
}
//重寫進程調(diào)度
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//執(zhí)行線程調(diào)度執(zhí)行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
<3.2.2.2> 然后聲明了一個DisposeTask
類型對象惯悠,將Runnable
對象和線程調(diào)度器包裝在一起邻邮,而DisposeTask
類實現(xiàn)了Runnable
接口,重寫了run()
函數(shù)方法克婶,
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
//省略...
@Override
public void run() {
runner = Thread.currentThread();
try {
//執(zhí)行Runnable
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
//省略...
}
<3.2.2.3> 完成上述預(yù)備工作后筒严,我們開始調(diào)用w.schedule(task, delay, unit)
方法進行任務(wù)執(zhí)行
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//判定任務(wù)是否成功被訂閱
if (tasks.isDisposed()) {
// 若沒有被訂閱,則不作任何處理
return EmptyDisposable.INSTANCE;
}
//進行線程調(diào)度
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
<3.2.2.4> 調(diào)用 scheduleActual()
進行線程任務(wù)調(diào)度
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
// 省略...
try {
// 線程提交
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
線程被提交后,會交給Executor
來執(zhí)行情萤,調(diào)用Runnable
接口的run
方法鸭蛙,那么這就簡單了.
小結(jié)
在上述操作中我們成功的將[被觀察者]提交給了指定的線程去執(zhí)行,那么接下來就是線程向上執(zhí)行調(diào)取被觀察者的
run()
的流程了筋岛,別頭大娶视,這個框架的復(fù)雜度就是那么繞~~~,想要成長我們就得邁過這道坎,畢竟成長的就是這么痛苦U鲈住肪获!
這里我建議一下,閱讀這樣復(fù)雜的框架的時候我們最好在Debug
模式下,跟隨著框架流程進行學(xué)習····好了休息一下吧柒傻!我們馬上開始下半場的學(xué)習孝赫。
[ 3.3 ]我們在<3.2.2.4>()中執(zhí)行的
scheduleActual()
中將Runnable
封裝成了ScheduledRunnable
類型,那么首先執(zhí)行進入它的里面,看看它的run()
方法做了什么詭譎的操作.
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
//獲取的上層的Runnable對象,執(zhí)行其run方法
actual.run();
} catch (Throwable e) {
//若發(fā)生異常,則直接調(diào)用onError通知觀察者
RxJavaPlugins.onError(e);
}
//省略....
}
可以看到ScheduledRunnable
類中的run()
并未做什么特殊操作,而是執(zhí)行了上層的Runnable
類型對象的run()
方法红符,那么它的上層是誰呢青柄?
[ 3.4 ] 目光轉(zhuǎn)向[ 3.2.2 ]的源碼,我們將其中把
Runnable
和Work
類型對象關(guān)聯(lián)了起來预侯,并封裝成了一個DisposeTask
類型對象致开,那么廢話不多說我們直接去看看其中的源碼
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
[ 3.5 ]依舊是繼續(xù)向上調(diào)取,我最終回到了ObservableSubscribeOn
類中內(nèi)部類SubscribeOnObserver
中的內(nèi)部類的SubscribeTask
的run()
方法萎馅,饒了這么大一圈最終還是回到了最初始調(diào)取的地方才有實質(zhì)的處理操作_!! 來看看吧
final class SubscribeTask implements Runnable {
// 省略...
@Override
public void run() {
//調(diào)取被觀察者的 subscribe方法
source.subscribe(parent);
}
}
[ 3.6 ] 此時的
source
就是外部第一次聲明的Observable
類型對象的實例了双戳,即ObservableCreate
類,那么我們將再次回到Observable
類中執(zhí)行其subscribe(Observer<? super T> observer)
函數(shù)糜芳,老套路來看吧拣技!
public final void subscribe(Observer<? super T> observer) {
//省略...
subscribeActual(observer);
//省略...
}
[ 3.7 ]我們又一次要執(zhí)行
subscribeActual(observer)
方法了,那么這次我們要進入哪個類呢耍目?猜對了就是ObservableCreate
類
@Override
protected void subscribeActual(Observer<? super T> observer) {
//將觀察者封裝成CreateEmitter類型
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回調(diào)觀察者的接口膏斤,告訴觀察者訂閱成功,
//并將新封裝的觀察者傳遞過去
observer.onSubscribe(parent);
try {
//回調(diào)外部被觀察者的接口,告訴被觀察者主題已被訂閱,
//可進行接下來相關(guān)操作了,并將最新封裝的觀察者對象傳入
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
[ 3.8 ] 終于出去了=_=!! 我們首先通知觀察者成功訂閱主題邪驮,然后再告訴被觀察者被成功訂閱,可以新進行接下來的主題發(fā)布了!!
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//[ 4 ]發(fā)布下一個
e.onNext("ONE");
e.onNext("TWO");
e.onNext("THREE");
e.onNext("FOUR");
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("回調(diào)執(zhí)行了onSubscribe函數(shù)->", "觀察者已成功訂閱");
}
@Override
public void onNext(String value) {
Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);
}
@Override
public void onError(Throwable e) {
Log.e("回調(diào)執(zhí)行了onError函數(shù)->", e.getMessage());
}
@Override
public void onComplete() {
Log.e("回調(diào)執(zhí)行了onComplete函數(shù)->", "本次結(jié)束!");
}
};
[ 3.8.1 ]調(diào)取
ObservableEmitter
類型的觀察者的onNext(Object value)
方法
@Override
public void onNext(T t) {
//省略...
if (!isDisposed()) {
//調(diào)取未進行包裝的觀察者的onNext方法函數(shù)
observer.onNext(t);
}
}
[ 3.8.2 ]這里的并沒有進行任何處理莫辨,而是調(diào)取了上層的
SubscribeOnObserver
類型的觀察者的onNext(Object value)
@Override
public void onNext(T t) {
downstream.onNext(t);
}
[ 3.8.3 ] 這里依舊是繼續(xù)向上調(diào)取,通過調(diào)取上層的
ObserveOnObserver
類型的觀察者的onNext(Object value)
方法
@Override
public void onNext(T t) {
//省略
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//進行線程調(diào)度,并將觀察者傳入
worker.schedule(this);
}
}
[ 3.8.4 ]此處進行觀察者執(zhí)行線程的調(diào)度,由于我們在外部指定觀察者運行于
UI
線程沮榜,所以此處我們HandlerScheduler
類的schedule(Runnable run, long delay, TimeUnit unit)
方法中
public abstract class Scheduler {
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
}
}
final class HandlerScheduler extends Scheduler {
private static final class HandlerWorker extends Worker {
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//省略...
//將handle和run關(guān)聯(lián)起來包裝成新對象
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
if (async) {
message.setAsynchronous(true);
}
//handler發(fā)送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
//省略...
return scheduled;
}
}
}
[ 3.8.5 ] 我們在這里將
Runnable
包裝成Message
同過Handler
進行發(fā)送盘榨,這將進入到Hanlder
類中的runWithScissors(final Runnable r, long timeout)
方法
public final boolean runWithScissors(final Runnable r, long timeout) {
//省略
if (Looper.myLooper() == mLooper) {
//執(zhí)行Runnable接口的run()方法
r.run();
return true;
}
//省略...
}
[ 3.8.6 ] 通過判定我們當前運行的線程是否跟
Handler
所在的線程是否一致 ,如果一致直接運行Runnable
的run()
方法蟆融,即我們再次回到了HandlerScheduler
類中的內(nèi)部類HandlerWorker
下草巡,執(zhí)行其run()
方法
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
[ 3.8.7 ]沒有意外,這沒有進行任何處理型酥,繼續(xù)向上傳遞進入
ObservableObserveOn
類的run()
中
@Override
public void run() {
//省略...
drainNormal();
}
void drainNormal() {
//省略...
a.onNext(v);
//省略...
}
[ 3.8.8 ] 可以看到在此處我們回調(diào)了外部聲明觀察者接口的
onNext(Object value)
方法山憨,進而執(zhí)行我們自定義的業(yè)務(wù)!C趾怼郁竟!
@Override
public void onNext(String value) {
if (disposable.isDisposed())
onComplete();
value = mine_result.getText().toString() + "\n" + value;
mine_result.setText(value);
Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);
}
小結(jié)
至此完成了線程切換的全部過程,不得不說這個框架的復(fù)雜程度尤其之高由境,為了寫這篇文章也是累的自己一頭的汗=_=!! 一開始分析源碼的時候真是一個頭兩個大棚亩,主要是其中的類粘連性太強了,特別容易把自己搞得暈頭轉(zhuǎn)向虏杰,不過分析完之后腦中也就豁然開朗了讥蟆。
由于篇幅太長,我們將整體流程大致的再進行一下梳理纺阔,之后我會將流程圖奉上9パ!
流程梳理
- 4.使用最新的[被觀察者]調(diào)取
subscribe(observer)
方法進行主題發(fā)布掏婶,并將觀察者作為參數(shù)傳入啃奴。- 5.調(diào)用新的被觀察者2 對象的
subscribeActual(Observer<? super T> observer)
方法,創(chuàng)建一個執(zhí)行線程,調(diào)取新的被觀察者1的subscribe(Observer<? super T> observer)
方法雄妥,比創(chuàng)建一個ObserveOnObserver
類型對象傳入- 6.調(diào)用新的被觀察者1 對象的
subscribeActual(Observer<? super T> observer)
方法最蕾,創(chuàng)建一個執(zhí)行線程,在線程的run()
方法中調(diào)取被觀察者 0的subscribe(Observer<? super T> observer)
方法老厌,聲明一個SubscribeOnObserver
類型的對象傳入- 7.調(diào)用被觀察者0 對象的
subscribeActual(Observer<? super T> observer)
方法瘟则,通過調(diào)取被觀察者的subscribe(ObservableEmitter e)
方法,回調(diào)到了我們的外部實現(xiàn).- 8.在外部的
subscribe(...)
方法枝秤,調(diào)取觀察者的onNext(Object value)
方法醋拧,而此時的觀察者經(jīng)過一套流程下來也是被進行的層層包裝,最終回調(diào)到了我們的外部實現(xiàn).
這篇文章篇幅我有點長,希望有興趣的同學(xué)慢慢讀丹壕,之后我把流程圖梳理完畢后會如數(shù)奉上~~~
此文章只代表個人理解觀點庆械,如有差錯希望積極之處,我們共同交流!!!