這邊文章主要記錄使用Rxjava過程中對map方法以及flatmap方法的源碼理解主到,自認(rèn)為也是RxJava的一個精髓所在淆珊。
有關(guān)RxJava的詳細(xì)使用蛉签,網(wǎng)絡(luò)已經(jīng)有很多資料悯嗓。這里推薦[匠心寫作]
的一篇文章
下面進(jìn)入正題拒迅,先看下map方法
map方法解析:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
OperatorMap是Operator接口的實現(xiàn)類骚秦,來看一下Operator接口
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
而Operator又繼承了Func1,這個接口有一個只有一個方法R call(T t);
看下OperatorMap的call方法實現(xiàn)
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
然后看lift函數(shù)
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
簡單點來說呢璧微,做了三件事
1.創(chuàng)建新的Observable作箍,負(fù)責(zé)接手原Observable發(fā)出的事件
2.hook.onLift(operator).call(o)
會執(zhí)行OperatorMap的call方法,返回一個新的Subscriber。
3.onSubscribe.call(st)
新的Subscriber會傳給原Observable前硫。在原Observable發(fā)送事件是會調(diào)用新Subscriber的onNext方法胞得,會先執(zhí)行transformer.call(t)
即map(Func1)方法中參數(shù)Func1的call方法,然后執(zhí)行原Subscriber的onNext方法屹电。
faltmap方法解析
還是老樣子阶剑,先貼一下faltmap的源碼
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
這里會執(zhí)行 merge(map(func))
這里其實就是merge了剛才的map方法嘛。
好了嗤详,map方法我就不說了个扰,可以看下前面的解釋,這里提示一點 這里的map方法返回的是Observable<Observable>
葱色,然后看下merge方法
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
這里會執(zhí)行到 source.lift(OperatorMerge.<T>instance(false))
其實就是之前map方法里面介紹的lift方法递宅。這里的source類型是Observable<Observable>
可以理解成一個新Observable接收所有原Observable發(fā)出的事件,組成一個新的Observable。然后執(zhí)行l(wèi)ift的方法办龄,從之前map方法的分析知道烘绽,這里會先去執(zhí)行Operator中的call方法,然后執(zhí)行原Subscriber中的call方法俐填。
這里Operator為OperatorMerge安接,看下這個類中的call方法
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
這里經(jīng)過一些操作,最終會走到MergeProducer中的request方法中
public void request(long n) {
if (n > 0) {
if (get() == Long.MAX_VALUE) {
return;
}
BackpressureUtils.getAndAddRequest(this, n);
subscriber.emit();
} else
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required");
}
}
這里注意這句subscriber.emit()
會發(fā)送所有的事件英融。這樣就解釋通了
1.接收所有原Observable的事件盏檐,組成新的Observable
2.新Observable發(fā)送所有事件
3.原Subscriber接收到新事件后進(jìn)行處理
總結(jié):
不管是map還是flatmap,其實都是運用了轉(zhuǎn)換的思想
- 截斷原事件分發(fā)流程驶悟。
- 增加中間處理操作(map 增加了一個call方法回調(diào)胡野,flatmap增加了一次事件收集再發(fā)送)。
- 回到原事件分發(fā)流程處理事件痕鳍。