本文的分析基于RxJava1.1.5版本,map的主要作用是用來將一個對象轉換成另外一個對象缩焦,它的實現(xiàn)基于了RxJava中非常重要的lift()方法
1丸凭、下面先寫一個簡單的例子
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
}).map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.valueOf(s);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("TAG","---onCompleted()----");
}
@Override
public void onError(Throwable e) {
Log.e("TAG","---onError()----");
}
@Override
public void onNext(Integer integer) {
Log.e("TAG", "值為:" + integer);
}
});
以上是最簡單的RxJava的一個試用,因為本文是分析map的過程绵疲,所以并沒有編寫線程調度器
2勤婚、下面是具體的分析過程:
首先:進入到map()方法的實現(xiàn)
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
這里傳入func1對象作為參數(shù)摹量,返回值為lift()方法的返回值,其中在調用liftf()方法的時候馒胆,會創(chuàng)建一個OperatorMap對象作為參數(shù)傳入缨称,而OperatorMap繼承自Operator接口,我的理解是祝迂,這里主要是為了將func1對象作為參數(shù)睦尽,然后在OperatorMap對象中創(chuàng)建新的Subscriber對象(新的觀察者對象),這里新的Subscriber對象稱為subscriber_new
接著進入到liftf()方法中
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
在lift()方法中型雳,會返回新的Observable對象(新的被觀察者對象当凡,這里先稱為observable_new)山害,同時會將原來的Observable對象中的onSubscribe和subscriber_new作為參數(shù)創(chuàng)建OnSubscribeLift對象,在我理解中沿量,這個對象就是新的OnSubscribe對象浪慌,這里先稱為onsubscribe_new
通過上面兩步的分析可以知道,map最終的返回值是在lift()方法中創(chuàng)建的新的Observable對象朴则,即observable_new對象权纤。
接著,我們的程序會到了主線上乌妒,程序將會調用subscribe()方法汹想,這個方法用于訂閱被觀察者,那么撤蚊,這里觀察者將不會是舊的Observable對象古掏,而是新的Observable對象,即observable_new對象侦啸,那么我們進入到該方法內部看看槽唾,下面是subscribe()方法實現(xiàn)
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
在該方法內部會將舊的Subscriber對象和舊的Observable對象作為參數(shù)去調用Observable.subscribe(subscriber, this)
下面接著看Observable.subscribe(subscriber, this)
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
//異常的處理代碼省略....
}
}
前面的判斷跳過,直接看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);這個調用匹中,它的返回值為OnSubscribe對象夏漱,這個OnSubscribe對象就是新的OnSubscribe對象,即為onsubscribe_new對象顶捷,而調用它的call()方法,即調用的是OnSubscribeLift類中的call()方法屎篱,那么進入到OnSubscribeLift類中的call()方法看看服赎,注意,這里傳入的參數(shù)的舊的subscriber對象
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
首先Subscriber<? super T> st =hook.onLift(operator).call(o);這個操作交播,這里的onLift()方法返回的是傳入的operator對象重虑,那么這個operator對象是什么呢,它就是在之前創(chuàng)建的OperatorMap對象秦士,即使新的偽subscriber對象缺厉,那么也就是說,這個操作調用的call()方法隧土,調用的是OperatorMap類中的call()方法提针,傳入的參數(shù)為舊的subscriber對象,下面進入OperatorMap類中的call()方法中看看
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
在這個方法中曹傀,首先會以舊的subscriber對象和transformer(這個transformer就是func1對象辐脖,在創(chuàng)建OperatorMap對象的時候傳入)為參數(shù)創(chuàng)建新的subscriber對象,之后將該對象添加到subscriptions集合中皆愉,并且返回新的subscriber對象嗜价。那么在OnSubscribeLift類中的這個類中的call()方法中的Subscriber<? super T> st = hook.onLift(operator).call(o);這個操作所獲得的就是新的subscriber對象艇抠,接著回到OnSubscribeLift類中的這個類中的call()方法中,在獲取到新的subscriber對象之后久锥,它會用新的subscriber對象去調用onStart()方法家淤,之后會執(zhí)行parent.call(st);這個方法,那么這個parent是什么呢瑟由?它就是在創(chuàng)建OnSubscribeLift對象是傳入的舊的OnSubscribe對象媒鼓,那么舊的OnSubscribe對象去調用call()方法,并且傳入的參數(shù)為新的subscriber對象错妖,那么自然調用的這個call()方法就是我們在創(chuàng)建初始Observable對象的時候的回調的call()方法绿鸣,即使以下的方法
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
這里的subscriber對象已經(jīng)不是原來的subscriber對象了,而是在OperatorMap類中的call()方法中創(chuàng)建的MapSubscriber對象(即新的subscriber對象)暂氯,那么它調用的onNext()方法潮模,自然調用的就應該是MapSubscriber類中的onNext()方法
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
在這個方法中,主要的作用就是將func1對象中的類型轉換邏輯痴施,作為新的參數(shù)去調用舊的subscriber對象中的onNext()方法擎厢,也就是我們自己創(chuàng)建的subscriber對象中的onNext()回調方法,在這里辣吃,T類型就是代表舊的數(shù)據(jù)類型动遭,而R轉換后的數(shù)據(jù)類型息楔,mapper就是func1對象员辩,它調用的call()方法,就是我們自己在call中寫的類型轉換邏輯鸵赫,而actual就是舊的subscriber對象哩簿,到這里宵蕉,整個流程就已經(jīng)貫通了
最后,這里需要注意的就是舊的對象就是代表初始的對象节榜,而新的對象就代表是調用map()方法的時候創(chuàng)建的新的對象羡玛,還有要注意的就是舊的Observable對象中的回調call()方法中的subscriber對象在經(jīng)過map方法之后已經(jīng)不是舊的subscriber對象了,而是新的subscriber對象