最近又在看網(wǎng)易公開課里面的美國名校的畢業(yè)演講名党,雖然以前看過区丑,但現(xiàn)在重新看的話嘱蛋,依然能學(xué)到不少東西叫潦,能把某個話題講的很細(xì),很深爸邢,我現(xiàn)在特別喜歡Michelle樊卓,作為第一夫人卻沒有第一夫人的架子,聽她的演講杠河,能感受到那種能量密度特別大碌尔,她演講到人要想成為什么樣的人時,我特別震撼券敌,她認(rèn)為如果想要成為什么樣的人的話唾戚,必須要有適應(yīng)力和責(zé)任心。而國內(nèi)的演講都是講怎么成功待诅,怎么成才叹坦,怎么愛國,且都談的特別的大卑雁,泛泛而談募书,我認(rèn)為,這也是現(xiàn)在的年輕人特別浮躁的原因之一吧序厉。對于我目前而言要做的是锐膜,腳踏實地的做事情,好好孝敬父母弛房。
本篇文章主要是明白Rxjava的流程 ,本文基于Rxjava2.2.8 的源碼
1.分析案例
分析下經(jīng)常用的這個操作符到底做了哪些事情
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hellp");
}
}).map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Exception {
return s + " world";
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("verse", "onSubscribe");
}
@Override
public void onNext(Object o) {
String str = (String) o;
Log.d("verse", str);
}
@Override
public void onError(Throwable e) {
Log.d("verse", "onError");
}
@Override
public void onComplete() {
Log.d("wangyang", "onComplete");
}
});
上述代碼很簡單而柑,在此不再多說
2.源碼分析create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
當(dāng)調(diào)用Observable.create(ObservableOnSubscribe)時文捶,初始化了ObservableCreate對象,并引用了ObservableOnSubscribe對象
3.源碼分析map()
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
實際上是ObservableCreate.map(Function)媒咳,初始化了ObservableMap對象粹排,并引用了ObservableCreate對象和Function對象
4.源碼分析subscribe()
其實調(diào)用的是ObservableMap.subscribe()
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
剔除不是關(guān)鍵的代碼后,其實只有一行代碼
subscribeActual(observer);
查看該代碼發(fā)現(xiàn)是抽象方法涩澡,該實現(xiàn)方法是ObservableMap.subscribeActual(observer)
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
將實現(xiàn)的Observer和Function封裝在MapObserver中顽耳,而source是引用上游的對象
即ObservableCreate.subscribe(Observer),還是這段代碼
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
又繼續(xù)調(diào)用ObservableCreate.subscribeActual(observer)
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
又將observer封裝在CreateEmitter中妙同,并調(diào)用observer.onSubscribe(parent)射富,而這個observer就是MapObserver,查看MapObserver.onSubscribe()
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
而這里的downstream就是我們在subscribe中實現(xiàn)的內(nèi)部內(nèi)粥帚,看源碼知道了原來onSubscribe()方法啥事情也沒干胰耗,只是調(diào)用了該方法,一般在這個方法里做一些初始化的操作芒涡,繼續(xù)往下看柴灯,會調(diào)用
source.subscribe(parent);
source就是在 create()所實現(xiàn)的類卖漫,并傳遞CreateEmitter,當(dāng)調(diào)用onNext()時赠群,就是調(diào)用的CreateEmitter.onNext()羊始,查看該方法
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
剔除非關(guān)鍵性代碼后,只有一行代碼
observer.onNext(t);
而這個Observer是MapObserver查描,查看MapObserver.onNext()
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
剔除非關(guān)鍵性代碼后店枣,只有兩行代碼
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
downstream.onNext(v);
mapper是實現(xiàn)的Function,轉(zhuǎn)化為相應(yīng)的類型后叹誉,會調(diào)用
downstream.onNext(v);
而downstrean是在subscribe中所實現(xiàn)的內(nèi)部內(nèi)鸯两,最后會調(diào)用我們實現(xiàn)的匿名內(nèi)部內(nèi)Observer.onNext()
自此,該案例分析完畢
5.總結(jié)
當(dāng)調(diào)用操作符時长豁,會創(chuàng)建對應(yīng)的對象钧唐,并引用上游的對象,如調(diào)用操作符map()時匠襟,會創(chuàng)建ObservableMap對象钝侠,并持有上游的ObservableCreate對象,這樣一環(huán)扣著一環(huán)酸舍,直至調(diào)用subscribe時帅韧,會將subscribe(Observer)里面的參數(shù)和操作符的內(nèi)部內(nèi)封裝在Observable類里面的靜態(tài)內(nèi)部內(nèi)里面,由于引用了上游的Observable啃勉,再調(diào)用上游的subscribe的方法忽舟,依次類推,直至ObservableCreate.subscribe(Observer)方法里淮阐,在這里叮阅,會先執(zhí)行一個onSubscribe()方法,一般是在這里面做一個執(zhí)行前的操作泣特,再會執(zhí)行ObservableOnSubscribe.subscribe(ObservableEmitter)方法浩姥,在這個方法里可以執(zhí)行ObservableEmitter.onNext(),而當(dāng)前的實例是在ObservableCreate的內(nèi)部內(nèi)的onNext()方法里實現(xiàn)的状您,在這個onNext里勒叠,又會調(diào)用observer.onNext()方法,這里的observer是下游的Observable的內(nèi)部內(nèi)膏孟,在下游的內(nèi)部內(nèi)的onNext()執(zhí)行需要的操作后又繼續(xù)傳遞給下下游的Observable.onNext()眯分,依次類推,直至最后一個onNext()骆莹,也就是subscribe(Observer)方法的參數(shù)的Observer.onNext()
一圖勝千言颗搂,我在網(wǎng)上找了Rxjava2的流程圖