鏈?zhǔn)秸{(diào)用
1拍冠、Zip
1.1尿这、zip將多個observable并行執(zhí)行,通過function庆杜,轉(zhuǎn)成一個value給下游射众。
1.2、當(dāng)最短的ObservableSource執(zhí)行完成后晃财,最長的ObservableSource剩余部分不再執(zhí)行叨橱,也就是說,較長的Source的onComplete調(diào)用不到。當(dāng)長度相等時罗洗,也會發(fā)生這種情況愉舔,比如zip(new ObservableSource[]{range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)} action2可能調(diào)用不到,比如action1已經(jīng)完成伙菜,而action2將要完成轩缤,還沒有完成。上述圖片[3仇让,null]不會執(zhí)行典奉。
1.3躺翻、zip源碼分析
public final class ObservableZip extends <T, R> extends Observable<R>{
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
count = sources.length;
ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
//ZipCoordinator
class ZipCoordinator{
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<>(this, bufferSize);
}
this.lazySet(0);
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[I]);
}
}
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) { //為了能夠執(zhí)行多次丧叽,避免使用同步代碼塊
for (;;) { //遍歷observer
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) { //先用歷史的判斷
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
if (z.done && !delayError) {
}
}
I++;
}
if (emptyCount != 0) {
break;
}
R v;
try {
v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
}
a.onNext(v);
Arrays.fill(os, null);
}
//避免同步代碼塊
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
}
static final class ZipObserver<T, R> implements Observer<T> {
final ZipCoordinator<T, R> parent;
final SpscLinkedArrayQueue<T> queue;
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
}
}
上述代碼,首先 sources[i].subscribe(s[I])公你,訂閱后踊淳,上游執(zhí)行onNext,就會調(diào)用到ZipObserver中的onNext陕靠,此時把執(zhí)行的結(jié)果在onNext中迂尝,保存在queue中。因為Observer是有序的剪芥,遍歷Observer垄开,拿到每一個observer對應(yīng)的queue中的值。如果為null税肪,則跳出循環(huán)溉躲,每一個observer都取出相同index的值,則向下執(zhí)行apply方法益兄。
1.4锻梳、實現(xiàn)一個并發(fā)執(zhí)行,順序返回結(jié)果的功能
Observable.zip(getSource1(true), getSource2(true), new BiFunction<Integer, Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer integer, Integer integer2) throws Throwable {
Log.d(TAG, "apply: 完成====+++++");
return Observable.fromArray(integer, integer2);
}
}).concatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Observable<Integer> integerObservable) throws Throwable {
return integerObservable;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d(TAG, "accept:--------- " + integer);
}
});
上述concatMap還可以簡寫成concatMap(Functions.identity())净捅。
1.5疑枯、簡寫Zip功能,去掉轉(zhuǎn)換函數(shù)mapper蛔六。
public class ZipRxJava {
private static final String TAG = "RxJavaZip";
private Observable[] mObservables;
private MyObserver[] mObservers;
private Object[] row;
public void rxjavaZip(Observable... sources) {
if (sources == null || sources.length == 0) {
return;
}
mObservables = sources;
mObservers = new MyObserver[sources.length];
row = new Object[sources.length];
for (int i = 0; i < sources.length; i++) {
MyObserver observer = new MyObserver(128);
mObservers[i] = observer;
mObservables[i].subscribe(mObservers[i]);
}
}
private synchronized void drain() {
Log.d(TAG, "drain: 開始begin====== " + Thread.currentThread().getName());
boolean hasAllRunComplete = true;
Object[] objects = row;
for (int i = 0; i < mObservers.length; i++) {
MyObserver observer = mObservers[i];
if (objects[i] == null) {
Object poll = observer.queue.poll();
if (poll == null) {
hasAllRunComplete = false;
break;
} else {
objects[i] = poll;
}
}
Log.d(TAG, "drain: 數(shù)組 " + Arrays.toString(objects));
}
Log.d(TAG, "drain: 數(shù)組f " + Arrays.toString(objects));
if (hasAllRunComplete) {
for (int i = 0; i < row.length; i++) {
Log.d(TAG, "drain: " + row[i]);
}
Arrays.fill(objects, null);
}
}
class MyObserver<T> implements Observer<T> {
final SpscLinkedArrayQueue<T> queue;
public MyObserver(int count) {
queue = new SpscLinkedArrayQueue<>(count);
}
@Override
public void onNext(@NonNull T t) {
queue.offer(t);
drain();
}
}
}
使用
public void testRxjavaZip() {
ZipRxJava rxJavaZip = new ZipRxJava();
rxJavaZip.rxjavaZip(Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()),
Observable.just(4, 5).delay(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread()));
}
2荆永、merge
public static <@NonNull T> Observable<T> merge(@NonNull ObservableSource<? extends T> source1, @NonNull ObservableSource<? extends T> source2) {
return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
}
2.1、fromArray国章、fromIterable
從上面看出fromArray是最上游的Observable具钥,調(diào)用OnNext 會直接調(diào)用MergeObserver的onNext。source中的onNext會調(diào)用到InnerObserver中的onNext()
void run() {
boolean hasNext;
do {
T v;
try {
v = Objects.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
}
downstream.onNext(v);
try {
hasNext = it.hasNext();
} catch (Throwable e) {
}
} while (hasNext);
}
2.2捉腥、flatMap
mapper : 轉(zhuǎn)換函數(shù)氓拼,返回一個ObservableSource對象。
delayErrors : 延遲錯誤。
maxConcurrency :最大并發(fā)數(shù)桃漾,同時可執(zhí)行多少個ObservableSource
bufferSize :緩存多少個ObservableSource對象
flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize)
看下FlatMap源碼
2.2.1坏匪、上游訂閱當(dāng)前的MergeObserver,傳遞OnNext會傳遞到MegeObser的onNext中撬统。
public final class ObservableFlatMap extends Observable{
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
2.2.2适滓、在onNext中使用mapper函數(shù)轉(zhuǎn)化,轉(zhuǎn)化后的是Observable對象恋追,因此必須給emit出去凭迹。所以將改Observable訂閱內(nèi)部的InnerObserver,執(zhí)行內(nèi)部的onNext苦囱,然后調(diào)用下游真正的Observer的OnNext()
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
try {
p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
maxConcurrency 最大并發(fā)數(shù)嗅绸,如果超過了最大并發(fā)數(shù),就存在sources隊列中撕彤,當(dāng)執(zhí)行完一個Observable之后鱼鸠,再從sources隊列中取。
從上圖也可以看出羹铅,merge結(jié)果是無序的蚀狰,但是每一個ObservableSource的結(jié)果是有序的。 當(dāng)超過超過最大并發(fā)數(shù)职员,就會等待前面的source執(zhí)行完麻蹋,再執(zhí)行。
class InnerObserver{
public void onNext(U t) {
parent.tryEmit(t, this);
}
}
class MergeObserver {
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
downstream.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
void drainLoop() {
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
if (n != 0) {
int j = Math.min(n - 1, lastIndex);
sourceLoop:
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
o = q.poll();
} catch (Throwable ex) {
}
if (o == null) {
break;
}
child.onNext(o);
if (checkTerminate()) {
return;
}
}
}
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
removeInner(is);
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
}
if (innerCompleted != 0) {
if (maxConcurrency != Integer.MAX_VALUE) {
subscribeMore(innerCompleted);
innerCompleted = 0;
}
continue;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
上面代碼有這幾點
1)焊切、 外部調(diào)用onNext 直接調(diào)用到了InnerObserver的onNext扮授,將next值直接調(diào)用downstream.onNext(),或者存入到queue中蛛蒙。
2)糙箍、遍歷InnerObserver 從其中的queue中取出里面的值,調(diào)用downStream.onNext()牵祟,observer沒有值就遍歷下一個observer深夯。
3)、是執(zhí)行完成一個source后诺苹,從sources隊列中取出新source咕晋,訂閱InnerObserver。
可以看出收奔,外部的source誰先調(diào)用OnNext掌呜,就先調(diào)用誰的Observer, 然后執(zhí)行下游downStream.onNext()
3坪哄、concat
public static <@NonNull T> Observable<T> concat(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize());
}
3.1 fromIterable這里和2.1是一樣的质蕉,也是上游發(fā)送數(shù)據(jù)的地方势篡。
3.2 訂閱
class ObservableConcatMap {
@Override
public void subscribeActual(Observer<? super U> observer) {
SerializedObserver<U> serial = new SerializedObserver<>(observer);
source.subscribe(new SourceObserver<>(serial, mapper, bufferSize));
}
}
3.3 將Observerable存到queue中,遍歷queue模暗,轉(zhuǎn)化observable對象禁悠,訂閱到
InnerObserver。
static final class SourceObserver<T, U> {
@Override
public void onNext(T t) {
if (fusionMode == QueueDisposable.NONE) {
queue.offer(t);
}
drain();
}
void drain() {
for (; ; ) {
if (!active) {
boolean d = done;
T t;
try {
t = queue.poll();
} catch (Throwable ex) {
}
boolean empty = t == null;
if (!empty) {
ObservableSource<? extends U> o;
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
}
active = true;
o.subscribe(inner);
}
}
}
}
void innerComplete() {
active = false;
drain();
}
}
static final class InnerObserver<U> {
@Override
public void onNext(U t) {
downstream.onNext(t);
}
@Override
public void onComplete() {
parent.innerComplete();
}
}
從上面代碼看出兑宇,當(dāng)一個InnerObserver執(zhí)行完成之后碍侦,將active 設(shè)置為false,然后for循環(huán)中再取下一個Observable對象訂閱隶糕。
4瓷产、總結(jié)
merge是上游調(diào)用OnNext之后,只要任何一個InnerObserver中有數(shù)據(jù)枚驻,就調(diào)用downstream.onNext()濒旦,結(jié)果是無序的。
concat 只有一個source執(zhí)行完成之后测秸,才會執(zhí)行下一個source疤估,結(jié)果是有序的。
zip通過1.4可以實現(xiàn)有序霎冯,但是必須得等待source都執(zhí)行完成才能執(zhí)行。