之前在學(xué)習(xí)Rxjava時(shí),寫demo莽红。發(fā)現(xiàn)在FlatMap運(yùn)算中返弹,即使上游發(fā)送了onComplete凹联,在下游也無法執(zhí)行onComplete:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// Log.i(TAG, "!!!!onNext 1 before");
// e.onNext(1);
// Log.i(TAG, "!!!!onNext 1 after");
Log.i(TAG, "!!!!onNext 2 before");
e.onNext(2);
Log.i(TAG, "!!!!onNext 2 after");
// 位置1:如果注釋以下代碼成艘,在位置10出將無法觸發(fā)
Log.i(TAG, "!!!!onComplete before");
e.onComplete();
Log.i(TAG, "!!!!onComplete after");
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
Log.i(TAG, "@@@@flatMap " + integer);
Observable<String> ob = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> e) throws Exception {
String value = "####flatMap: Observable2,create: " + integer;
Log.i(TAG,value);
Log.i(TAG, "####flatMap: Observable2, next " + integer + " before");
e.onNext(value);
Log.i(TAG, "####flatMap: Observable2, next " + integer + " after");
if(integer == 2){
// 位置2:如果注釋以下代碼赏半,在位置10出將無法觸發(fā)
// Log.i(TAG, "####flatMap: Observable2, onComplete 2 before");
// e.onComplete();
// Log.i(TAG, "####flatMap: Observable2, onComplete 2 after");
}
}
});
return ob;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "XXXXsubscribe:" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "TTTTerror");
}
}, new Action() {
@Override
public void run() throws Exception {
//位置10:觸發(fā)onComplete .如果注釋位置1和位置2中任何一處,都將無法觸發(fā)這里
Log.i(TAG, "VVVVcomplete");
}
}
);
Log.i(TAG, "VVVVcomplete"); 沒有執(zhí)行
為什么淆两?断箫??
分析Flatmap運(yùn)算符源碼琼腔,主要是ObservableFlatMap 這個(gè)類:
關(guān)鍵類:MergeObserver,InnerObserver
關(guān)鍵方法:mergeObserver.onNext(),merge.onComplete(),mergeObserver.drain(),merge.insertInnser(),merge.removeInner();
innerObserver.onComplete(),
mergeObserver.drain()關(guān)鍵代碼:
-
當(dāng)innerObserver中觸發(fā)了onComplete踱葛,將移除innerObserver
-
當(dāng)mergeObserver.onComplete或onError被觸發(fā)丹莲,done置為true,并且沒有innerObserver時(shí)尸诽,才能觸發(fā)child.onComplete()