static void map() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer + 1);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("map-accept:" + s);
}
});
}
從create -> map ->subscribe
create()方法調(diào)用后返回ObservableCreate對(duì)象遣妥,在ObservableCreate對(duì)象上調(diào)用map()方法
在map()方法會(huì)創(chuàng)建ObservableMap對(duì)象
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
ObservableMap構(gòu)造函數(shù)里,ObservableCreate作為參數(shù)source保存在ObservableMap中习寸,map()方法調(diào)用后返回ObservableMap對(duì)象步藕,后面調(diào)用subscribe()方法惦界。
在Observable類中,調(diào)用subscribe()方法時(shí)咙冗,內(nèi)部會(huì)調(diào)用subscribeActual()方法
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
放調(diào)用subscribe()方法時(shí)沾歪,需要查看subscribeActual()方法實(shí)現(xiàn),或者子類subscribeActual()方法實(shí)現(xiàn)即可雾消。
從subscribe->map ->create
調(diào)用subscribe()灾搏,傳入觀察者挫望,該subscribe()方法是ObservableMap對(duì)象的方法,之前所說(shuō)狂窑,調(diào)用subscribe()方法媳板,看ObservableMap對(duì)象在subscribeActual()上的實(shí)現(xiàn)
public void subscribeActual(Observer<? super U> t) {
/**
* Observer訂閱時(shí),調(diào)用subscribe 接著調(diào)用subscribeActual泉哈,Observer保存在MapObserver中蛉幸,
* 通過(guò)source.subscribe()將訂閱繼續(xù)向上傳遞,當(dāng)上游發(fā)送數(shù)據(jù)時(shí)丛晦,傳遞到MapObserver奕纫,
* MapObserver在調(diào)用MyObserver,完成數(shù)據(jù)傳遞
*/
source.subscribe(new MapObserver<T, U>(t, function));
}
subscribeActual調(diào)用source.subscribe()方法采呐,原始觀察者包裝在MapObserver類中若锁,訂閱事件繼續(xù)上傳搁骑。從ObservableMap構(gòu)造方法可知斧吐,source為ObservableCreate對(duì)象,所以 source.subscribe()即為ObservableCreate.subscribe()仲器,具體可以查看查看ObservableCreate類中subscribeActual()方法實(shí)現(xiàn)煤率。
ObservableCreate類的方法
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
observer在這里為MapObserver。
subscribeActual內(nèi)部調(diào)用source.subscribe()乏冀,這里source為原始數(shù)據(jù)蝶糯。
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}
emitter調(diào)用onNext()發(fā)送數(shù)據(jù),即通過(guò)CreateEmitter.onNext()發(fā)送辆沦,onNext調(diào)用observer.onNext(t);
這里observer為MapObserver對(duì)象昼捍,數(shù)據(jù)傳遞到MapObserver.onNext()方法。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
//執(zhí)行函數(shù)
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
數(shù)據(jù)傳遞到onNext 肢扯,調(diào)用mapper.apply(t)妒茬,調(diào)用在map()操作符中實(shí)現(xiàn)的函數(shù),原始數(shù)據(jù)經(jīng)過(guò)map操作后返回新的數(shù)據(jù)蔚晨,調(diào)用actual.onNext(v)乍钻,數(shù)據(jù)繼續(xù)往下走,actual對(duì)象為我們最終訂閱者铭腕,第一個(gè)數(shù)據(jù)傳遞完畢银择。