RxJava Map操作原理
最近看了一些RxJava的文章儡毕,被他好多操作符的原理包括線程切換之類的搞得云里霧里〗慌牛現(xiàn)在整理了一份最基礎(chǔ)的Map操作符原理划滋,加強(qiáng)一下理解!0Bā处坪!
本文的原理分析基于RxJava 1.0.14。對(duì)Map操作進(jìn)行了仔細(xì)的分析,閱讀本文前要求知道RxJava觀察者模式的基本原理同窘,即只有觀察者注冊(cè)的時(shí)候被觀察者的事件序列才會(huì)被發(fā)送玄帕。
先來看一個(gè)例子
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("123456");
subscriber.onComplete();
}
}).map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"Integer=" + integer);
}
});
這個(gè)例子先創(chuàng)建了一個(gè)Observable
對(duì)象,它持有一個(gè)OnSubscribe<String>
的對(duì)象想邦,如果沒有map
方法進(jìn)行轉(zhuǎn)換裤纹,那么這個(gè)Observable
對(duì)應(yīng)的Subscriber
應(yīng)該是Subscriber<String>
,并且Subscriber
的回調(diào)方法onNext
中的參數(shù)類型應(yīng)該是String
。通過map
方法的映射,最終我們注冊(cè)的是Subscriber<Integer>
蒸甜。
接下來,我們先介紹一下map方法里做了哪些事吹零。然后我們從發(fā)生注冊(cè)事件開始說起罩抗,因?yàn)镽xJava的所有事件都是觀察者在注冊(cè)的時(shí)候才開始發(fā)送(或者說激活)的拉庵。
那么map
方法究竟是怎么做這個(gè)轉(zhuǎn)換的呢?
可以看到套蒂,map
方法的入?yún)⑹且粋€(gè)Func1
對(duì)象钞支。Func1
類的聲明如下,先不管它泛型的參數(shù)是T
操刀,還是R
烁挟。只要記住它只有一個(gè)call
方法,該方法以左邊的參數(shù)為入?yún)⒐强樱疫叺膮?shù)為返回值撼嗓。即T
為入?yún)ⅲ?code>R為返回值。
public interface Func1<T, R> extends Function {
R call(T t);
}
map
方法的實(shí)現(xiàn)如下欢唾,我們看到它將我們傳入的Func1
對(duì)象封裝成了OperatorMap
對(duì)象且警。最終調(diào)用了lift
方法。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
再來看看礁遣,OperatorMap
是個(gè)什么玩意兒斑芜?
public final class OperatorMap<T, R> implements Operator<R, T>;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Operator
其實(shí)就是Fun1
,它繼承了Fun1
祟霍,也沒做什么擴(kuò)展杏头。OperatorMap
的構(gòu)造方法里把我們new
出來的Fun1
對(duì)象賦值給了transformer
。
接著來看lift
方法里做了什么沸呐。
lift
方法的實(shí)現(xiàn)醇王,代碼位于Observable中。
// 注意:這不是 lift() 的源碼崭添,而是將源碼中與性能寓娩、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼根暑,可以去 RxJava 的 GitHub 倉庫下載力试。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//注意!E畔印畸裳!這個(gè)onSubscribe是原來的Observable對(duì)象持有的onSubscriber
onSubscribe.call(newSubscriber);
}
});
}
可以看到lift
方法返回了一個(gè)Observable
對(duì)象,我們把它叫做Observable$2
淳地,它持有的OnSubscriber
對(duì)象叫做OnSubscriber$2
怖糊,注冊(cè)這個(gè)。我們本來有的那個(gè)Observable
對(duì)象颇象,叫做Observable$1
伍伤,同樣它持有的OnSubscriber對(duì)象叫做OnSubscriber$1
。
這個(gè)lift
方法做了三件事:
1遣钳、利用傳入的OperationMap
創(chuàng)建了一個(gè)新的Subscriber
扰魂。
2、調(diào)用了新的Subscriber
的onStart()
方法蕴茴。這只是一個(gè)可選的準(zhǔn)備方法劝评,可以暫時(shí)忽略
3、調(diào)用了OnSubscriber$1
的call
方法倦淀。
我們來看這個(gè)Subscriber$2
是怎么被創(chuàng)建出來的蒋畜,看我們傳入的OperationMap
的call
方法。為了不被T
和R
混淆撞叽,我們這里可以理解成T
就是String
姻成,R
就是Integer
。因?yàn)槲覀冋{(diào)用map
方法是創(chuàng)建的Fun1
對(duì)象是Fun1<String, Integer>
愿棋。
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
到這里我們基本清楚map
方法里面做了哪些事科展,一是保存了我們傳入的Fun1
對(duì)象,二是創(chuàng)建了一個(gè)Observable$2
對(duì)象初斑,并返回辛润。我們拿到了這個(gè)Observable$2
并為之注冊(cè)了一個(gè)Subscriber
。
現(xiàn)在開始见秤,我們要從注冊(cè)事件發(fā)生開始說起砂竖,看看上面講的各個(gè)方法是什么時(shí)候被觸發(fā)調(diào)用。
要明確一件事情鹃答,我們?cè)谕獠空{(diào)用map
之后只創(chuàng)建了一個(gè)Subscriber
乎澄,并且它的參數(shù)是Integer
。那么當(dāng)我們調(diào)用Observable$2.subscribe(Subscriber)
時(shí)测摔,發(fā)生了什么置济?(這里之所以是Observable$2
解恰,是因?yàn)榱魇秸{(diào)用,注冊(cè)的應(yīng)該是lift方法返回的Observable
)
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
注意浙于!這個(gè)onSubscribe
是我們上文所說的onSubscribe$2
护盈,這個(gè)subscriber
應(yīng)該是Subscriber<Integer>
。onSubscribe.call(subscriber)
就會(huì)調(diào)用到我們講過的lift
方法里做的三件事情羞酗。
這里再重復(fù)一遍腐宋,并轉(zhuǎn)換一種說法。
1檀轨、利用Subscriber<Integer>
創(chuàng)建Subscriber<String>
胸竞,這個(gè)部分的實(shí)現(xiàn)就在OperationMap
里面。
2参萄、調(diào)用Subscriber<String>的onStart()
卫枝。
3、調(diào)用OnSubscriber.call(Subscriber<String>)
讹挎,這個(gè)OnSubscriber
已經(jīng)說過了是OnSubscriber$1
校赤,所以是我們?cè)谕獠縿?chuàng)建的。這個(gè)call
方法里面淤袜,我們調(diào)用了subscriber.onNext("123456")
; subscriber.onComplete()
;
最后的關(guān)鍵點(diǎn)在于我們通過Subscriber<Integer>
創(chuàng)建Subscriber<String>
之后痒谴,調(diào)用到了subscriber<String>.onNext("123456")
和subscriber<String>.onComplete()
(注意這個(gè)subscriber<String>
的寫法是不對(duì)的衰伯,我這邊是為了能看的清邏輯铡羡,所以這樣標(biāo)注)。那么這個(gè)Subscriber<String>
的onNext
與onComplete
里面做了什么呢意鲸? 還記得我們是怎么創(chuàng)建的吧烦周?
在OperationMap
里面,這里再貼一次代碼怎顾。
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
關(guān)鍵的地方就在于o.onNext(transformer.call(t));
這個(gè)o應(yīng)該是我們外部創(chuàng)建的Subscriber<Integer>
读慎,這個(gè)transformer
是我們也是我們外部傳入的new Fun1<String,Integer>
』蔽恚看明白了吧X参!募强!Subscriber<String>
只是個(gè)代理株灸,最終是通過我們定義的Fun1
對(duì)象的規(guī)則把String
轉(zhuǎn)換成Integer
,再調(diào)用我們創(chuàng)建的Subscriber<Integer>
的onNext
方法擎值,其他方法同理慌烧。
最后的最后,盜用一個(gè)拋物線大俠的圖再加上我的一些文字說明再來回憶一下整個(gè)過程鸠儿。
可能語言比較啰嗦屹蚊,但也確實(shí)是自己學(xué)習(xí)和思考的過程厕氨。如有錯(cuò)誤,請(qǐng)大家指正~~~
參考文章