所有代碼的演示都在RxJava2.2.4版本上進(jìn)行的
當(dāng)你傳遞多個(gè)Observable給amb操作符時(shí),該操作符只發(fā)射其中一個(gè)Observable的數(shù)據(jù)和通知:首先發(fā)送通知給amb操作符的的那個(gè)Observable,不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onError或onCompleted通知掘托,amb將忽略和丟棄其它所有Observables的發(fā)射物
1.example
Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(1, 2, 3, 4, 5));
Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(6, 7, 8, 9, 10));
Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
.flatMap(__ -> Observable.just(11, 12, 13, 14, 15));
Observable.ambArray(observable1, observable2, observable3)
.subscribe(next -> Logger.getGlobal().info("OnNext: next: " + next));
2.源碼分析
先看看subscribe方法做了什么
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
很簡(jiǎn)單雁仲,構(gòu)造一個(gè)LambdaObserver躯泰,注意代碼里的 subscribe(ls);
public final void subscribe(Observer<? super T> observer) {
...省略部分代碼
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
...省略部分代碼
}
subscribeActual是一個(gè)抽象方法进肯,實(shí)現(xiàn)由子類負(fù)責(zé)
再來看看ambArray的執(zhí)行
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
int len = sources.length;
if (len == 0) {
return empty();
}
if (len == 1) {
return (Observable<T>)wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new ObservableAmb<T>(sources, null));
}
重點(diǎn)返回一個(gè)ObservableAmb牡彻,也就是subscribe是由該類的實(shí)例執(zhí)行的估灿。上面說執(zhí)行subscribe就會(huì)執(zhí)行subscribeActual方法崇呵,ObservableAmb類也會(huì)重寫了該方法
public void subscribeActual(Observer<? super T> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
count = sources.length;
}
if (count == 0) {
EmptyDisposable.complete(observer);
return;
} else
if (count == 1) {
sources[0].subscribe(observer);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
ac.subscribe(sources);
}
方法里面的sources是誰?就是一開始的observable1 馅袁,observable2域慷,observable3
方法里面的observer是誰?就是subscribe方法產(chǎn)生的LambdaObserver
看看AmbCoordinator類中subscribe方法
public void subscribe(ObservableSource<? extends T>[] sources) {
AmbInnerObserver<T>[] as = observers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new AmbInnerObserver<T>(this, i + 1, downstream);
}
winner.lazySet(0); // release the contents of 'as'
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (winner.get() != 0) {
return;
}
sources[i].subscribe(as[i]);
}
}
downstream是通過AmbCoordinator構(gòu)造方法傳遞進(jìn)來的汗销,也就是ObservableAmb實(shí)例
構(gòu)造了一個(gè)AmbInnerObserver數(shù)組犹褒,在AmbInnerObserver內(nèi)部,數(shù)組的大小與
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
中的count是一致的弛针,在AmbInnerObserver內(nèi)部 叠骑,parent就是AmbCoordinator實(shí)例
在AmbInnerObserver內(nèi)部還關(guān)聯(lián)著AmbInnerObserver的序號(hào)。
所有AmbInnerObserver數(shù)組中的對(duì)象共享downstream
在AmbCoordinator類中削茁,還有一個(gè)win方法
public boolean win(int index) {
int w = winner.get();
if (w == 0) {
//如果原子變量中的值是0宙枷,就會(huì)設(shè)置原子變量為index掉房,也就是上面序號(hào)
if (winner.compareAndSet(0, index)) {
//勝利者拿到后,其他的observer都將失敗慰丛,不會(huì)調(diào)用next卓囚,onError,onComplete
AmbInnerObserver<T>[] a = observers;
int n = a.length;
for (int i = 0; i < n; i++) {
if (i + 1 != index) {
a[i].dispose();
}
}
return true;
}
return false;
}
//已經(jīng)有勝利者诅病,直接拿出勝利者
return w == index;
}
可以看出amb操作符哪亿,誰先調(diào)用onNext誰就是勝利者,失敗者死的悄無聲息O桶省B嗉小!