前言
最近由于項(xiàng)目需要自己搭建了網(wǎng)絡(luò)框架,采用時下非常流行的Rxjava2 + Retrofit搭建澳淑, Rxjava現(xiàn)在已經(jīng)發(fā)展到Rxjava2臭胜,之前一直都只是再用Rxjava还最,但從來沒有了解下Rxjava的內(nèi)部實(shí)現(xiàn)芭逝,未來知其然并且知其所以然,今天我將一步步來分析Rxjava2的源碼舆驶,Rxjava2分Observable和Flowable兩種(無被壓和有被壓)蔬浙,我們今天先從簡單的無背壓的observable來分析。如有不對的地方贞远,望大牛指教&輕拍畴博。源碼基于rxjava:2.1.1。
簡單的例子
先來段最簡單的代碼蓝仲,直觀的了解下整個Rxjava運(yùn)行的完整流程俱病。
private void doSomeWork() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onComplete();
}
});
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("lx", " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String str) {
Log.i("lx", " onNext : " + str);
}
@Override
public void onError(Throwable e) {
Log.i("lx", " onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.i("lx", " onComplete");
}
};
observable.subscribe(observer);
}
上面代碼之所以將observable和observer單獨(dú)聲明,最后再調(diào)用observable.subscribe(observer);
是為了分步來分析:
- 被觀察者 Observable 如何生產(chǎn)事件的
- 被觀察者 Observable 何時生產(chǎn)事件的
- 觀察者Observer是何時接收到上游事件的
- Observable 與Observer是如何關(guān)聯(lián)在一起的
Observable
Observable是數(shù)據(jù)的上游袱结,即事件生產(chǎn)者
首先來分析事件是如何生成的亮隙,直接看代碼 Observable.create()
方法。
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // ObservableOnSubscribe 是個接口垢夹,只包含subscribe方法溢吻,是事件生產(chǎn)的源頭。
ObjectHelper.requireNonNull(source, "source is null"); // 判空
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));這句代碼果元。繼續(xù)跟蹤進(jìn)去
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
看注釋促王,原來這個方法是個hook function。 通過調(diào)試得知靜態(tài)對象onObservableAssembly默認(rèn)為null而晒, 所以此方法直接返回傳入的參數(shù)source蝇狼。
onObservableAssembly可以通過靜態(tài)方法RxJavaPlugins. setOnObservableAssembly ()設(shè)置全局的Hook函數(shù), 有興趣的同學(xué)可以自己去試試倡怎。 這里暫且不談迅耘,我們繼續(xù)返回代碼贱枣。
現(xiàn)在我們明白了:
Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
...
...
})
相當(dāng)于:
Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() {
...
...
}))
好了,至此我們明白了颤专,事件的源就是new ObservableCreate()
對象纽哥,將ObservableOnSubscribe
作為參數(shù)傳遞給ObservableCreate
的構(gòu)造函數(shù)。
事件是由接口ObservableOnSubscribe
的subscribe方法上產(chǎn)的栖秕,至于何時生產(chǎn)事件春塌,稍后再分析。
Observer
Observer 是數(shù)據(jù)的下游累魔,即事件消費(fèi)者
Observer是個interface,包含 :
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
上游發(fā)送的事件就是再這幾個方法中被消費(fèi)的够滑。上游何時發(fā)送事件垦写、如何發(fā)送,稍后再表彰触。
subscribe
重點(diǎn)來了梯投,接下來最重要的方法來了:observable.subscribe(observer);
從這個方法的名字就知道,subscribe是訂閱况毅,是將觀察者(observer)與被觀察者(observable)連接起來的方法分蓖。只有subscribe方法執(zhí)行后,上游產(chǎn)生的事件才能被下游接收并處理尔许。其實(shí)自然的方式應(yīng)該是observer訂閱(subscribe) observable, 但這樣會打斷rxjava的鏈?zhǔn)浇Y(jié)構(gòu)么鹤。所以采用相反的方式。
接下來看源碼味廊,只列出關(guān)鍵代碼
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
......
observer = RxJavaPlugins.onSubscribe(this, observer); // hook 蒸甜,默認(rèn)直接返回observer
......
subscribeActual(observer); // 這個才是真正實(shí)現(xiàn)訂閱的方法。
......
}
// subscribeActual 是抽象方法余佛,所以需要到實(shí)現(xiàn)類中去看具體實(shí)現(xiàn)柠新,也就是說實(shí)現(xiàn)是在上文中提到的ObservableCreate中
protected abstract void subscribeActual(Observer<? super T> observer);
接下來我們來看ObservableCreate.java:
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source; // 事件源,生產(chǎn)事件的接口辉巡,由我們自己實(shí)現(xiàn)
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發(fā)射器
observer.onSubscribe(parent); //直接回調(diào)了觀察者的onSubscribe
try {
// 調(diào)用了事件源subscribe方法生產(chǎn)事件恨憎,同時將發(fā)射器傳給事件源。
// 現(xiàn)在我們明白了郊楣,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行
后才執(zhí)行的憔恳。 換言之,事件流是在訂閱后才產(chǎn)生的净蚤。
//而observable被創(chuàng)建出來時并不生產(chǎn)事件喇嘱,同時也不發(fā)射事件。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
現(xiàn)在我們明白了塞栅,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行后才執(zhí)行的者铜。 換言之腔丧,事件流是在訂閱后才產(chǎn)生的。而observable被創(chuàng)建出來時并不生產(chǎn)事件作烟,同時也不發(fā)射事件愉粤。
接下來我們再來看看事件是如何被發(fā)射出去,同時observer是如何接收到發(fā)射的事件的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
CreateEmitter 實(shí)現(xiàn)了ObservableEmitter接口拿撩,同時ObservableEmitter接口又繼承了Emitter接口衣厘。
CreateEmitter 還實(shí)現(xiàn)了Disposable接口,這個disposable接口是用來判斷是否中斷事件發(fā)射的压恒。
從名稱上就能看出影暴,這個是發(fā)射器,故名思議是用來發(fā)射事件的探赫,正是它將上游產(chǎn)生的事件發(fā)射到下游的型宙。
Emitter是事件源與下游的橋梁。
CreateEmitter 主要包括方法:
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
public void dispose() ;
public boolean isDisposed();
是不是跟observer的方法很像伦吠?
我們來看看CreateEmitter中這幾個方法的具體實(shí)現(xiàn):
只列出關(guān)鍵代碼
public void onNext(T t) {
if (!isDisposed()) { // 判斷事件是否需要被丟棄
observer.onNext(t); // 調(diào)用Emitter的onNext妆兑,它會直接調(diào)用observer的onNext
}
}
public void onError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t); // 調(diào)用Emitter的onError,它會直接調(diào)用observer的onError
} finally {
dispose(); // 當(dāng)onError被觸發(fā)時毛仪,執(zhí)行dispose(), 后續(xù)onNext搁嗓,onError, onComplete就不會繼
續(xù)發(fā)射事件了
}
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete(); // 調(diào)用Emitter的onComplete箱靴,它會直接調(diào)用observer的onComplete
} finally {
dispose(); // 當(dāng)onComplete被觸發(fā)時腺逛,也會執(zhí)行dispose(), 后續(xù)onNext,onError衡怀, onComplete
同樣不會繼續(xù)發(fā)射事件了
}
}
}
CreateEmitter 的onError和onComplete方法任何一個執(zhí)行完都會執(zhí)行dispose()中斷事件發(fā)射屉来,所以observer中的onError和onComplete也只能有一個被執(zhí)行。
現(xiàn)在終于明白了狈癞,事件是如何被發(fā)射給下游的茄靠。
當(dāng)訂閱成功后,數(shù)據(jù)源ObservableOnSubscribe開始生產(chǎn)事件蝶桶,調(diào)用Emitter的onNext慨绳,onComplete向下游發(fā)射事件,Emitter包含了observer的引用真竖,又調(diào)用了observer onNext脐雪,onComplete,這樣下游observer就接收到了上游發(fā)射的數(shù)據(jù)恢共。
總結(jié)
Rxjava的流程大概是:
- Observable.create 創(chuàng)建事件源战秋,但并不生產(chǎn)也不發(fā)射事件。
- 實(shí)現(xiàn)observer接口讨韭,但此時沒有也無法接受到任何發(fā)射來的事件脂信。
- 訂閱 observable.subscribe(observer)癣蟋, 此時會調(diào)用具體Observable的實(shí)現(xiàn)類中的subscribeActual方法,
此時會才會真正觸發(fā)事件源生產(chǎn)事件狰闪,事件源生產(chǎn)出來的事件通過Emitter的onNext疯搅,onError,onComplete發(fā)射給observer對應(yīng)的方法由下游observer消費(fèi)掉埋泵。從而完成整個事件流的處理幔欧。
PS: observer中的onSubscribe在訂閱時即被調(diào)用,并傳回了Disposable丽声, observer中可以利用Disposable來隨時中斷事件流的發(fā)射礁蔗。
今天所列舉的例子是最簡單的一個事件處理流程,沒有使用線程調(diào)度雁社,Rxjava最強(qiáng)大的就是異步時對線程的調(diào)度和隨時切換觀察者線程浴井。至于這部分的源碼且聽下回講解。