map操作符
被觀察者數(shù)據(jù)源泛型剩瓶,當發(fā)射器的數(shù)據(jù)類型和觀察者數(shù)據(jù)類型不同時曹傀,通過map操作符轉換,可以將上游發(fā)射的類型轉換成任意對象類型形用。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
String newStr = s + "_";
Log.d(TAG, "int apply s " + newStr);
return newStr;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String response) throws Exception {
Log.d(TAG, "Observer : " + response);
}
});
發(fā)射數(shù)據(jù)類型是Integer類,通過map操作符证杭,將類型轉換成String類田度。Function是一個類型轉換接口,F(xiàn)unction<T, R>解愤,將T轉換R镇饺,解決被觀察者和觀察者數(shù)據(jù)類型不匹配問題。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
返回一個被觀察者ObservableMap送讲,封裝原始被觀察者ObservableCreate和轉換接口Function奸笤,調(diào)用ObservableMap的subscribe注冊方法。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
創(chuàng)建觀察者MapObserver哼鬓,封裝自定義觀察者和轉換接口Function监右。source源即內(nèi)部ObservableCreate,調(diào)用它的subscribe方法异希。
ObservableCreate的#subscribeActual方法健盒,創(chuàng)建CreateEmitter數(shù)據(jù)發(fā)射器,通知觀察者已經(jīng)注冊称簿。
調(diào)用數(shù)據(jù)源source(ObservableOnSubscribe)的subscribe方法扣癣,將發(fā)射器暴漏給外部。外部通過發(fā)射器發(fā)射數(shù)據(jù)憨降,如onNext方法父虑。
發(fā)射器CreateEmitter持有觀察者MapObserver,當onNext事件發(fā)射后券册,通知觀察者MapObserver的onNext方法频轿,傳參發(fā)射的數(shù)據(jù)類型Integer類。
public void onNext(T t) {
...
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
根據(jù)MapObserver內(nèi)部轉換接口Function烁焙,apply方法航邢,將T類型轉換成U類型,再調(diào)用自己定義觀察者Observer的onNext方法骄蝇,入?yún)?shù)據(jù)類型轉換成String膳殷。
發(fā)射器onNext方法和觀察者accept方法按照通知順序執(zhí)行。
flatMap操作符
flatMap操作符和map類似,F(xiàn)unction接口實現(xiàn)類型轉換赚窃,轉換的對象是一個被觀察者ObservableSource册招。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer s) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//將上游Integer類型數(shù)據(jù),在新發(fā)射器中改造發(fā)射勒极。
String newStr = s + "_gc1";
String newStr2 = s + "_gc2";
Log.d(TAG, "int apply s " + newStr);
Log.d(TAG, "int apply s " + newStr2);
e.onNext(newStr);
e.onNext(newStr2);
}
});
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String response) throws Exception {
Log.d(TAG, "Observer : " + response);
}
});
將上游發(fā)射器每個Integer類型的數(shù)據(jù)轉換成Observable類型是掰,再由每個轉換的被觀察者發(fā)射目標類型數(shù)據(jù)。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
返回一個被觀察者ObservableFlatMap辱匿。封裝原始被觀察者ObservableCreate和轉換接口Function键痛,調(diào)用ObservableFlatMap的subscribe注冊方法。
public void subscribeActual(Observer<? super U> t) {
...
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
創(chuàng)建觀察者MergeObserver匾七,封裝自定義觀察者和轉換接口Function絮短,source源即內(nèi)部ObservableCreate,調(diào)用它的subscribe方法昨忆。當發(fā)射器onNext方法發(fā)射時丁频,調(diào)用發(fā)射器內(nèi)部MergeObserver的onNext方法。
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
return;
}
//調(diào)用的是新建ObservableSource的注冊方法邑贴。
subscribeInner(p);
}
通過Function接口方法席里,將Integer類型轉換成ObservableSource類型,轉換對象是一個被觀察者痢缎,外部創(chuàng)建胁勺,ObservableCreate類型,將Integer類型的數(shù)據(jù)暴露在新被觀察者的數(shù)據(jù)源發(fā)射器中独旷,處理轉換成新發(fā)射器支持String類型,subscribeInner方法寥裂,新被觀察者訂閱嵌洼。
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
...
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
調(diào)用Observable的subscribe方法,訂閱InnerObserver觀察者封恰,外部調(diào)用發(fā)射器onNext方法麻养,可以獲取apply方法中上層發(fā)射的Integer數(shù)據(jù),按照String類型诺舔,觸發(fā)兩個onNext方法再次發(fā)射數(shù)據(jù)鳖昌,兩次調(diào)用觀察者InnerObserver的onNext方法,每次低飒,調(diào)用它引用MergeObserver的onNext方法许昨,最終,通知到外部觀察者褥赊。
flatMap最初的onNext順序糕档,在Function轉換成新Observable后,根據(jù)收到的數(shù)據(jù)拌喉,包裝重新發(fā)射一批新數(shù)據(jù)速那。在觀察者到的onNext順序不一定是按照最初的onNext順序調(diào)用的俐银。
上面發(fā)送的1,2端仰,3捶惜,在觀察者中看到的不一定是1,2荔烧,3的排序吱七,加一個延遲就能看到,即1_gc1茴晋,1_gc2陪捷,3_gc1,3_gc2诺擅,2_gc1市袖,2_gc2。
總結
flatMap不保證數(shù)據(jù)發(fā)射流的通知順序烁涌。
concatMap和flatMap功能相同苍碟,可以保證按照發(fā)射順序通知。
任重而道遠