map—— 一對(duì)一轉(zhuǎn)換典奉,一般是轉(zhuǎn)換成結(jié)果悼沿,如string到int的轉(zhuǎn)換∥可以集合進(jìn)行轉(zhuǎn)換翁巍,每次轉(zhuǎn)換都接著執(zhí)行onNext操作。
看個(gè)map的例子
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("hello world");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.hashCode();
}
})
.subscribe(new io.reactivex.Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Integer number) {
Log.d("RXJava2", "hascode: " + number);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
我們多次強(qiáng)調(diào)所有的訂閱者關(guān)系都會(huì)是形如observable.subscribe(observer)休雌。那么從上面的例子中灶壶,肯定可以看到經(jīng)過(guò)調(diào)用map方法之后,仍然返回Observable具體實(shí)例杈曲,當(dāng)然驰凛,參考線程變換一文,經(jīng)過(guò)map之后担扑,肯定是經(jīng)過(guò)裝飾者模式進(jìn)行了一層Observeable組合包裝恰响,經(jīng)過(guò)suscibe方法之后,也進(jìn)行了一層Observer組合包裝涌献。
具體情況不再詳細(xì)分析胚宦,可以詳細(xì)看線程變換一文,原理大體一致。現(xiàn)在主要大概分析下經(jīng)過(guò)map之后间唉,返回的ObservableMap如何執(zhí)行subscribe方法:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
我們可以看到绞灼,直接調(diào)用source.subscribe(new MapObserver<T, U>(t, function))利术,進(jìn)而進(jìn)入到MapObserver的onNexy方法中呈野,主要看下面幾句代碼:
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
執(zhí)行mapper.apply(t),這里mapper即為map方法參數(shù)印叁,獲得值v被冒,本例中即為s.hashCode, 接著執(zhí)行事件發(fā)射,到達(dá)最基層observer轮蜕,打印log昨悼。
flatmap—— 一對(duì)多轉(zhuǎn)換,多對(duì)多轉(zhuǎn)換跃洛,每次轉(zhuǎn)換都會(huì)生成observable率触,而且是全部轉(zhuǎn)換完成才提交給訂閱者處理。
flatmap的作用一般是用于一個(gè)列表做扁平化處理汇竭,比如一個(gè)學(xué)生數(shù)據(jù)結(jié)構(gòu)的表單葱蝗,想對(duì)這個(gè)表單里的單元數(shù)據(jù)做一一處理,那么flatmap就派上場(chǎng)用到了细燎。flatmap的實(shí)現(xiàn)方式仍然和前面分析的大致两曼。不過(guò)既然是從一對(duì)多轉(zhuǎn)換,我們先得出一個(gè)的就是肯定通過(guò)某個(gè)組合包裝的Observable玻驻,在subscribe訂閱的時(shí)候通過(guò)循環(huán)往外發(fā)射數(shù)據(jù)悼凑,而observer觀察者也應(yīng)該通過(guò)循環(huán)接收數(shù)據(jù)。
我們以一個(gè)例子以及進(jìn)入相關(guān)的關(guān)鍵代碼來(lái)分析是否與猜想的一致:
observable.just(studentList).flatMap(new Func1<List, Observable<?>>() {
@Override
public Observable<?> call(List list) {
return Observable.fromInterable(list);
}
}).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
Log.d("flatmap", o.toString());
}
});
上面的例子中,studentList是一個(gè)list列表璧瞬。那么經(jīng)過(guò)flatMap方法
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
T v = ((ScalarCallable)this).call();
return v == null ? empty() : ObservableScalarXMap.scalarXMap(v, mapper);
} else {
return RxJavaPlugins.onAssembly(new ObservableFlatMap(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
我們看到返回一個(gè)ObservableFlatMap實(shí)例户辫。當(dāng)我們調(diào)用observable.subscrbe(obverser)的時(shí)候,調(diào)用到如下方法:
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
主要看MergeObserver的onNext方法
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
而mapper.apply(t)即是例子中返回的Observable.from(list)嗤锉,我們繼續(xù)分析subscribeInner方法:
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
一般走else語(yǔ)句渔欢,進(jìn)入InnerObserver中,走到InnerObserver的onNext方法
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
一般fusionMode為NONE档冬,調(diào)用MergeObserver的tryEmit方法:
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
主要看drainLoop方法, drainLoop比較長(zhǎng)膘茎,大體就是循環(huán)接收發(fā)射來(lái)的數(shù)據(jù),并通知子observer接收數(shù)據(jù)child.onNext();
那么誰(shuí)發(fā)射數(shù)據(jù)到MergeObserver的酷誓,我們上面提到p = Observable.fromInterable(list); 這里就是主要發(fā)射數(shù)據(jù)點(diǎn), 實(shí)際上p為ObservableFromIterable實(shí)例披坏,進(jìn)入具體subscribeActual方法:
public void subscribeActual(Observer<? super T> s) {
Iterator<? extends T> it;
try {
it = source.iterator();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
boolean hasNext;
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
if (!hasNext) {
EmptyDisposable.complete(s);
return;
}
FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
s.onSubscribe(d);
if (!d.fusionMode) {
d.run();
}
}
進(jìn)入FromIterableDisposable的run方法
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
actual.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
actual.onComplete();
}
}
進(jìn)入for循環(huán),發(fā)射數(shù)據(jù)actual.onNext(v)盐数,這個(gè)actual是通過(guò)組合包裝的MergeObserver棒拂,于是驗(yàn)證了我們一開(kāi)始猜測(cè)的是正確的。
scan:將發(fā)射源的結(jié)果作為入?yún)⒌牡谝粋€(gè)參數(shù),入?yún)⒌牡诙€(gè)參數(shù)為第i個(gè)發(fā)射源數(shù)據(jù)帚屉,然后根據(jù)自定義的處理結(jié)果繼續(xù)處理谜诫。
scan通過(guò)看完源碼之后會(huì)特別容易理解,先舉個(gè)例子攻旦。
Observable.just(1, 2, 3, 4,5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;//累加操作喻旷,
}
})
.subscribe(getObserver())
那么上面的例子,observer將依次接收到1牢屋,3且预,6,10烙无,15
上面的例子中我們主要看scan相關(guān)的Observer——ScanObserver锋谐。onNext方法如下:
public void onNext(T t) {
if (!this.done) {
Observer<? super T> a = this.actual;
T v = this.value;
if (v == null) {
this.value = t;
a.onNext(t);
} else {
Object u;
try {
u = ObjectHelper.requireNonNull(this.accumulator.apply(v, t), "The value returned by the accumulator is null");
} catch (Throwable var6) {
Exceptions.throwIfFatal(var6);
this.s.dispose();
this.onError(var6);
return;
}
this.value = u;
a.onNext(u);
}
}
}
從上面可以看出,當(dāng)發(fā)射1的時(shí)候截酷,this.value為空涮拗,因此進(jìn)入if語(yǔ)句中,輸出1迂苛,接著發(fā)送2的時(shí)候三热,進(jìn)入else語(yǔ)句,通過(guò)this.accumulator.apply(v, t)灾部,例子中是進(jìn)行v+t操作康铭,這里t為上一個(gè)發(fā)射源結(jié)果1,因?yàn)?+2赌髓,得出3从藤。后面處理結(jié)果一樣。
reduce:和scan類(lèi)似锁蠕,只不過(guò)scan會(huì)先把結(jié)果提交給訂閱者輸出夷野,而reduce是等所有observable處理完,才把最終結(jié)果提交給訂閱者處理荣倾。
先看個(gè)例子:
Observable.just(1, 2, 3, 4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
})
.subscribe(getObserver())
主要看reduce方法的對(duì)應(yīng)的Observer——ReduceObserver悯搔,onNext方法如下:
public void onNext(T value) {
if (!this.done) {
T v = this.value;
if (v == null) {
this.value = value;
} else {
try {
this.value = ObjectHelper.requireNonNull(this.reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
this.d.dispose();
this.onError(var4);
}
}
}
}
這樣,只是先計(jì)算出來(lái)并保存結(jié)果value上舌仍。然后在onComplete中輸出妒貌。
public void onComplete() {
if (!this.done) {
this.done = true;
T v = this.value;
this.value = null;
if (v != null) {
this.actual.onSuccess(v);
} else {
this.actual.onComplete();
}
}
}