前言
接觸RxJava也有將近2年了导街,雖然能夠熟練使用披泪,可是對里面的運行流程一直都是未知半解,雖然中間也有看過網(wǎng)上的博客搬瑰,但總是看了又忘記款票。直到最近才下定決心,寫一個關(guān)于RxJava2的系列專題跌捆,好好學(xué)習(xí)一下里面的源碼流程徽职,并以文章的形式記錄下來(主要是防止自己過段時間又忘記了,誰叫我記性差呢佩厚,咳咳...)姆钉。希望我能堅持寫完,由于水平有限抄瓦,如有錯誤之處還請指正潮瓶。
好了,話不多說钙姊,讓我們先從一段代碼開始
Observer<Integer> observer = new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe:");
disposable = d;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getMessage());
e.printStackTrace();
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete:");
}
};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(observer);
以上便是RxJava的教學(xué)課程中一開始就教大家寫的代碼(嗯毯辅,我個人認為是這樣的。)輸出結(jié)果如下:
可以看到煞额,onSubscribe是最先被調(diào)用的思恐;其次依次執(zhí)行onNext發(fā)送數(shù)1,2膊毁,3胀莹;最后執(zhí)行onComplete結(jié)束數(shù)據(jù)發(fā)送。
好了婚温,代碼執(zhí)行完了描焰,那它的運行流程是什么樣子的呢?我們先從事件源開始栅螟,也就是Observable.create()方法荆秦,如下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到篱竭,create方法接收一個ObservableOnSubscribe<T>接口類型的參數(shù)
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
該接口只有一個方法subscribe,咦步绸?好像和文章一開始的代碼最后面的subscribe方法一模一樣啊掺逼,不過我們先不管這些,繼續(xù)往下看靡努,該方法接收一個ObservableEmitter<T>接口類型的參數(shù):
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter繼承自接口Emitter:
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter里面的三個方法還是蠻熟悉的坪圾,剛好和observer中的onNext,onComplete和onError一一對應(yīng)惑朦,他們之間是怎么建立聯(lián)系的呢兽泄?好像還漏了個onSubscribe,這個方法又是什么時候調(diào)用的呢漾月?我們回過頭來繼續(xù)看Observable.create方法病梢。
讓我們再看一眼Observable.create方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
上面說到該方法接收類型為ObservableOnSubscribe<T>接口的參數(shù),然后將其傳入到ObservableCreate<T>類的構(gòu)造方法中梁肿,這個RxJavaPlugins.onAssembly又是什么鬼蜓陌?我們看一眼這個方法:
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
根據(jù)這個方法的描述,它返回一個鉤子(hook)吩蔑,也就是將接收的參數(shù)做了一下處理再將這個參數(shù)返回給調(diào)用者钮热。RxjavaPlugins類的其他方法也是類似的處理過程≈蚍遥總之我們只要明白這個方法把傳進去的參數(shù)最終又返回給了調(diào)用者就行了隧期。
我們再回到Observable.create方法中,可以看到赘娄,create方法將傳進去的ObservableOnSubscribe<T>又傳遞給了ObservableCreate<T>類的構(gòu)造方法仆潮,最后通過RxJavaPlugins.onAssembly將這個構(gòu)造方法生成的ObservableCreate對象返回給調(diào)用者,這個ObservableCreate也是繼承自O(shè)bservable類遣臼。于是一個具體的Observable對象就誕生了性置,其具體對象類型為ObservableCreate類,接下來我們就來介紹這個類:
在介紹ObservableCreate類之前揍堰,我們先來梳理下思路:
1鹏浅、Observable.create方法生成一個Observable對象,也就是被觀察者
2屏歹、該方法需要接收一個ObservableOnSubscribe<T>類型的參數(shù)
3隐砸、將接收到的ObservableOnSubscribe<T>類型的參數(shù)傳遞到ObservableCreate<T>類的構(gòu)造方法中,生成ObservableCreate對象西采,并通過RxJavaPlugins.onAssembly方法這個對象返回凰萨。
這樣继控,一個具體的被觀察者對象就誕生了械馆,他就是ObservableCreate對象胖眷。
看樣子我們的事件訂閱就是在這個ObservableCreate類中完成的,那我們就來看看它做了哪些工作霹崎。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//......代碼省略
}
//......代碼省略
}
//......代碼省略
}
我們看到該類有個source成員變量珊搀,類型是ObservableOnSubscribe<T>,正是Observable.create方法傳入的參數(shù)尾菇。我們重點來看subscribeActual(Observer<? super T> observer)這個方法:
subscribeActual方法分析
- 1境析、subscribeActual接收一個Observer類型的參數(shù)(觀察者),看樣子好像是我們在訂閱的時候傳入的observer派诬,到底是不是我們后面再看
- 2劳淆、將傳入的observer參數(shù)包裝成一個CreateEmitter。
- 3默赂、observer調(diào)用自己的onSubscribe方法沛鸵,這個方法的參數(shù)正式上面包裝observer的CreateEmitter。
- 4缆八、source.subscribe(parent)真正的訂閱發(fā)生的地方曲掰。這個source就是Observable.create方法中傳入的 ObservableOnSubscribe對象,parent則是步驟2中包裝observer的CreateEmitter奈辰。
因此這里執(zhí)行的subscribe方法正是Observable.create方法中所傳入ObservableOnSubscribe接口里面的subscribe方法栏妖。這樣一來我們就明白了在文章開始的代碼中,subscribe方法中我們調(diào)用emitter.onNext奖恰,emitter.onComplete吊趾,這個emitter實際上就是步驟2中包裝observer后生成的CreateEmitter對象,CreateEmitter類實現(xiàn)了ObservableEmitter接口房官。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(observer);
- 5趾徽、這里在source.subscribe(parent)中進行了異常捕獲,如果subscribe拋出了異常翰守,則調(diào)用parent.onError(ex);
這樣看來孵奶,我們的觀察者observer和CreateEmitter之間有著很大的聯(lián)系,我們來分析下CreateEmitter這個類:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//......代碼省略
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//......代碼省略
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//......代碼省略
}
從上面代碼中我們看到蜡峰,這個類繼承自ActimicReference了袁,這樣它便是原子級的,同時實現(xiàn)了ObservableEmitter<T>和Disposable接口(關(guān)于Disposable我們在下一章討論)湿颅,而ObservableEmitter又繼承自Emitter接口载绿。因此我們重點看CreateEmitter所實現(xiàn)的Emitter中的三個方法,即分別是onNext油航,onComplete和OnError崭庸。首先我們看它的onNext方法
@Override
public void onNext(T t) {
//......代碼省略
if (!isDisposed()) {
observer.onNext(t);
}
}
簡要介紹下isDisposed()這個方法,該方法為true時表示訂閱被中斷,為false時正好相反怕享。在這里面當!isDisposed()為true時表示訂閱未被中斷执赡,此時執(zhí)行observer.onNext(t);這個observer正是通過CreateEmitter構(gòu)造方法傳遞進來的,也就是subscribeActual方法所接收的觀察者對象函筋。因此observer和CreateEmitter之間的onNext方法就是通過這種方式建立的聯(lián)系沙合,onComplete和onError同理。
至此跌帐,observer和CreateEmitter之間的關(guān)聯(lián)就分析完了首懈。還有一個問題,就是上面的步驟1中留下來的問題谨敛,就是這個subscribeActual方法所接收的observer是不是我們訂閱時傳進去的observer觀察者究履?我們就返回到最初的訂閱代碼:
這個subscribe執(zhí)行了哪些操作呢?我們點進去看看:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
從上面的方法中我們可以很明顯的看出subscribeActual(observer)中接收的observer正式來自于我們訂閱時傳入的觀察者脸狸,在Observable類中subscribeActual方法是一個抽象方法挎袜,需要其子類去實現(xiàn),在上面的思路梳理中我們得出的結(jié)論是Observable.create生成的具體Observable對象是其子類ObservableCreate肥惭;然后ObservableCreate調(diào)用subscribe方法完成訂閱盯仪,因此此處執(zhí)行的subscribeActual(observer);正是ObservableCreate內(nèi)部的subscribeActual方法。
到這里我們的事件訂閱流程就分析完了蜜葱,最后我們再來總結(jié)一下:
結(jié)論
- 1全景、Observable.create()方法接收一個ObservableOnSubscribe接口類型的對象(source),并將這個接收的對象作為參數(shù)傳遞到ObservableCreate類的構(gòu)造方法中生成一個Observable子類對象ObservableCreate對象(new ObservableCreate(source))牵囤,最后將其返回作為事件源(被觀察者)爸黄。
- 2、然后調(diào)用subscribe(observer)方法揭鳞,實際上執(zhí)行的是observableCreate.subscribe(observer)炕贵。在這個方法中調(diào)用subscribeActual(observer);其參數(shù)正是我們前面自己寫的observer對象(觀察者)。而在subscribeActual(observer)方法內(nèi)部野崇,首先將傳入的observer包裝為CreateEmitter對象(parent)称开,然后執(zhí)行observer.onSubscribe(parent)。至此觀察者和被觀察者之間正式建立訂閱關(guān)系乓梨。
-
3鳖轰、最后執(zhí)行方法source.subscribe(parent),這個方法實際上是ObservableOnSubscribe接口中的方法扶镀,也就是我們自己手寫的ObservableOnSubscribe實現(xiàn)類中的方法:
image.png
在上圖中蕴侣,emitter就是傳入的parent,也就是結(jié)論2中將外部傳進來的observer包裝起來的CreateEmitter對象臭觉。因此昆雀,當我們調(diào)用emitter.onNext,onComplete,onError等方法時辱志,實際上調(diào)用的是CreateEmitter內(nèi)部的onNext,onComplete,onError方法;在CreateEmitter內(nèi)部的onNext,onComplete,onError方法中狞膘,又調(diào)用了observer.onNext,onComplete,onError方法荸频,這個observer正是外部傳進來的觀察者對象,如下圖所示:
image.png
好了客冈,整個的時間訂閱流程終于分析完了,當然了有事件訂閱自然就有取消訂閱稳强,下一章RxJava2筆記(二场仲、事件取消流程)我們將分析事件是如何取消訂閱的。