1 上游怎么傳遞onComplete到下游
2 上游怎么傳遞onError到下游
3 它倆是互斥的嗎
4 多次調(diào)用的結(jié)果是什么
5 onNext onSubscribe順便分析一下
示例代碼
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
emitter.onNext("msg");
String data = null;
data.length();
// emitter.onComplete();
// emitter.onError(new IllegalArgumentException("1"));
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "" + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
先具體分析一下流程
Observable.create.subscribe鏈?zhǔn)秸{(diào)用
Observable.create返回一個(gè)Observable,實(shí)現(xiàn)是ObservableCreate
Observable.subcribe如下粉洼,會(huì)觸發(fā)subcribeActual方法
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
subscribeActual是抽象的方法,所以針對(duì)操作符create需要看ObservableCreate的
subscribeActual
分析ObservableCreate的subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
先調(diào)用觀察者onSubscribe,然后調(diào)用source的subscribe,也就是如下例子中new ObservableOnSubscribe,接著觸發(fā)emitter.onNext,接著觸發(fā)觀察者的onNext戒突,這也是上游的subscribe怎么通過onNext觸發(fā)到下游的onNext
梳理調(diào)用流程
1 ObservableCreate.subcribe
2 Observable.subscibeActual
3 ObservableOnSubscribe.subscribe
4 CreateEmitter.onNext
5 observer.onNext(t);
所以CreateEmitter就把上游和下游關(guān)聯(lián)起來,
當(dāng)上游調(diào)用subcribeActual就會(huì)出發(fā)CreateEmitter然后調(diào)用下游onNext
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("msg");
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
并且此操作會(huì)捕捉異常,如果出現(xiàn)異常交給parent.onError(ex),而onError方法觸發(fā)tryOnError方法
public boolean tryOnError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
這是程序運(yùn)行拋出異常,也就是調(diào)用顯式的onError:
如果第二次onError顾瞻,就會(huì)判斷當(dāng)前訂閱關(guān)系解除,然后交給RxJavaPlugins.onError(t);如果設(shè)置RxJava統(tǒng)一異常捕捉則交給此處理否則
直接拋出異常德绿;一般為閃退荷荤;
subscribe(Consumer)分析
內(nèi)部轉(zhuǎn)成觀察者Observer
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
在onNext觸發(fā)accept方法
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
onComplete調(diào)用也是判斷沒有解除訂閱就finally解除訂閱
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
所以調(diào)用一次complete就會(huì)解除訂閱退渗,多次complete就會(huì)失效,因?yàn)檫B接已經(jīng)解除關(guān)系了蕴纳。
onNext
onNext只是判斷有沒有解除上下游關(guān)系会油,并沒有finally里面解除關(guān)聯(lián),所以可以多次調(diào)用
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
onSubcribe
這個(gè)觀察者觸發(fā)條件只是subcribe觸發(fā)古毛,開發(fā)者無法調(diào)用CreateEmitter也沒有onSubscribe方法
總結(jié):onError調(diào)用一次則解除上下游關(guān)聯(lián)翻翩,如果subscribe內(nèi)部出現(xiàn)異常會(huì)自動(dòng)捕捉傳給下層觀察者的onError;由于調(diào)用一次就會(huì)解除訂閱,第二次調(diào)用交給默認(rèn)Rxjava處理稻薇,默認(rèn)拋出異常出現(xiàn)閃退
onNext則不會(huì)自動(dòng)解除訂閱
onSubcribe內(nèi)部觸發(fā)
onComplete也是第一次觸發(fā)就不會(huì)
先onError后onComplete則由于前一次解除訂閱則后續(xù)onComplete判斷訂閱解除也就不再觸發(fā)
先onComplete后onError則前一次解除訂閱嫂冻,后續(xù)再onError默認(rèn)會(huì)閃退