RxJava操作符Zip彰阴、Merge、Concat

鏈?zhǔn)秸{(diào)用

鏈?zhǔn)秸{(diào)用.png

1拍冠、Zip

Zip

1.1尿这、zip將多個observable并行執(zhí)行,通過function庆杜,轉(zhuǎn)成一個value給下游射众。
1.2、當(dāng)最短的ObservableSource執(zhí)行完成后晃财,最長的ObservableSource剩余部分不再執(zhí)行叨橱,也就是說,較長的Source的onComplete調(diào)用不到。當(dāng)長度相等時罗洗,也會發(fā)生這種情況愉舔,比如zip(new ObservableSource[]{range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)} action2可能調(diào)用不到,比如action1已經(jīng)完成伙菜,而action2將要完成轩缤,還沒有完成。上述圖片[3仇让,null]不會執(zhí)行典奉。
1.3躺翻、zip源碼分析

public final class ObservableZip extends <T, R> extends Observable<R>{

  public void subscribeActual(Observer<? super R> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        count = sources.length;
        ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    }

//ZipCoordinator
class ZipCoordinator{

    public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
            ZipObserver<T, R>[] s = observers;
            int len = s.length;
            for (int i = 0; i < len; i++) {
                s[i] = new ZipObserver<>(this, bufferSize);
            }
       
            this.lazySet(0);
            downstream.onSubscribe(this);
            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[I]);
            }
     }

     public void drain() {

            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row;
            final boolean delayError = this.delayError;

            for (;;) {  //為了能夠執(zhí)行多次丧叽,避免使用同步代碼塊

                for (;;) {  //遍歷observer
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {  //先用歷史的判斷
                            boolean d = z.done;
                            T v = z.queue.poll();
                            boolean empty = v == null;

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {
                                os[i] = v;
                            } else {
                                emptyCount++;
                            }
                        } else {
                            if (z.done && !delayError) {
                   
                            }
                        }
                        I++;
                    }
                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                        v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                     
                    }
                    a.onNext(v);
                    Arrays.fill(os, null);
                }
               //避免同步代碼塊
                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }
   }

 static final class ZipObserver<T, R> implements Observer<T> {

        final ZipCoordinator<T, R> parent;
        final SpscLinkedArrayQueue<T> queue;

        @Override
        public void onNext(T t) {
            queue.offer(t);
            parent.drain();
        }
   }
}

上述代碼,首先 sources[i].subscribe(s[I])公你,訂閱后踊淳,上游執(zhí)行onNext,就會調(diào)用到ZipObserver中的onNext陕靠,此時把執(zhí)行的結(jié)果在onNext中迂尝,保存在queue中。因為Observer是有序的剪芥,遍歷Observer垄开,拿到每一個observer對應(yīng)的queue中的值。如果為null税肪,則跳出循環(huán)溉躲,每一個observer都取出相同index的值,則向下執(zhí)行apply方法益兄。
1.4锻梳、實現(xiàn)一個并發(fā)執(zhí)行,順序返回結(jié)果的功能

 Observable.zip(getSource1(true), getSource2(true), new BiFunction<Integer, Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer integer, Integer integer2) throws Throwable {
                Log.d(TAG, "apply:  完成====+++++");
                return Observable.fromArray(integer, integer2);
            }
        }).concatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Observable<Integer> integerObservable) throws Throwable {
                return integerObservable;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept:--------- " + integer);
            }
        });

上述concatMap還可以簡寫成concatMap(Functions.identity())净捅。
1.5疑枯、簡寫Zip功能,去掉轉(zhuǎn)換函數(shù)mapper蛔六。

public class ZipRxJava {

    private static final String TAG = "RxJavaZip";
    private Observable[] mObservables;
    private MyObserver[] mObservers;
    private Object[] row;

    public void rxjavaZip(Observable... sources) {
        if (sources == null || sources.length == 0) {
            return;
        }
        mObservables = sources;
        mObservers = new MyObserver[sources.length];
        row = new Object[sources.length];
        for (int i = 0; i < sources.length; i++) {
            MyObserver observer = new MyObserver(128);
            mObservers[i] = observer;
            mObservables[i].subscribe(mObservers[i]);
        }
    }

    private synchronized void drain() {
        Log.d(TAG, "drain:  開始begin====== " + Thread.currentThread().getName());
        boolean hasAllRunComplete = true;
        Object[] objects = row;
        for (int i = 0; i < mObservers.length; i++) {
            MyObserver observer = mObservers[i];
            if (objects[i] == null) {
                Object poll = observer.queue.poll();
                if (poll == null) {
                    hasAllRunComplete = false;
                    break;
                } else {
                    objects[i] = poll;
                }
            }
            Log.d(TAG, "drain: 數(shù)組   " + Arrays.toString(objects));
        }
        Log.d(TAG, "drain: 數(shù)組f " + Arrays.toString(objects));
        if (hasAllRunComplete) {
            for (int i = 0; i < row.length; i++) {
                Log.d(TAG, "drain: " + row[i]);
            }
            Arrays.fill(objects, null);
        }
    }

    class MyObserver<T> implements Observer<T> {

        final SpscLinkedArrayQueue<T> queue;

        public MyObserver(int count) {
            queue = new SpscLinkedArrayQueue<>(count);
        }

        @Override
        public void onNext(@NonNull T t) {
            queue.offer(t);
            drain();
        }
    }

}

使用

public void testRxjavaZip() {
        ZipRxJava rxJavaZip = new ZipRxJava();
        rxJavaZip.rxjavaZip(Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()),
                Observable.just(4, 5).delay(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread()));
    }
2荆永、merge
merge.png
  public static <@NonNull T> Observable<T> merge(@NonNull ObservableSource<? extends T> source1, @NonNull ObservableSource<? extends T> source2) {
        return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
    }

2.1、fromArray国章、fromIterable
從上面看出fromArray是最上游的Observable具钥,調(diào)用OnNext 會直接調(diào)用MergeObserver的onNext。source中的onNext會調(diào)用到InnerObserver中的onNext()

  void run() {
            boolean hasNext;
            do {
                T v;
                try {
                    v = Objects.requireNonNull(it.next(), "The iterator returned a null value");
                } catch (Throwable e) {
                }
                downstream.onNext(v);
                try {
                    hasNext = it.hasNext();
                } catch (Throwable e) {
                }
            } while (hasNext);
        }

2.2捉腥、flatMap
mapper : 轉(zhuǎn)換函數(shù)氓拼,返回一個ObservableSource對象。
delayErrors : 延遲錯誤。
maxConcurrency :最大并發(fā)數(shù)桃漾,同時可執(zhí)行多少個ObservableSource
bufferSize :緩存多少個ObservableSource對象

flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize)

看下FlatMap源碼
2.2.1坏匪、上游訂閱當(dāng)前的MergeObserver,傳遞OnNext會傳遞到MegeObser的onNext中撬统。

public final class ObservableFlatMap  extends Observable{

     @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}

2.2.2适滓、在onNext中使用mapper函數(shù)轉(zhuǎn)化,轉(zhuǎn)化后的是Observable對象恋追,因此必須給emit出去凭迹。所以將改Observable訂閱內(nèi)部的InnerObserver,執(zhí)行內(nèi)部的onNext苦囱,然后調(diào)用下游真正的Observer的OnNext()

  @Override
        public void onNext(T t) {
            ObservableSource<? extends U> p;
            try {
                p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
            }
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
            subscribeInner(p);
        }

maxConcurrency 最大并發(fā)數(shù)嗅绸,如果超過了最大并發(fā)數(shù),就存在sources隊列中撕彤,當(dāng)執(zhí)行完一個Observable之后鱼鸠,再從sources隊列中取。


merge.png

從上圖也可以看出羹铅,merge結(jié)果是無序的蚀狰,但是每一個ObservableSource的結(jié)果是有序的。 當(dāng)超過超過最大并發(fā)數(shù)职员,就會等待前面的source執(zhí)行完麻蹋,再執(zhí)行。

class InnerObserver{
        public void onNext(U t) {
            parent.tryEmit(t, this);
       }
}

class MergeObserver {

     void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                downstream.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }


    void drainLoop() {
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {

                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;

                if (n != 0) {
                    int j = Math.min(n - 1, lastIndex);

                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        if (checkTerminate()) {
                            return;
                        }

                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    o = q.poll();
                                } catch (Throwable ex) {
                          
                                }
                                if (o == null) {
                                    break;
                                }

                                child.onNext(o);

                                if (checkTerminate()) {
                                    return;
                                }
                            }
                        }

                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                }

                if (innerCompleted != 0) {
                    if (maxConcurrency != Integer.MAX_VALUE) {
                        subscribeMore(innerCompleted);
                        innerCompleted = 0;
                    }
                    continue;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
}

上面代碼有這幾點
1)焊切、 外部調(diào)用onNext 直接調(diào)用到了InnerObserver的onNext扮授,將next值直接調(diào)用downstream.onNext(),或者存入到queue中蛛蒙。
2)糙箍、遍歷InnerObserver 從其中的queue中取出里面的值,調(diào)用downStream.onNext()牵祟,observer沒有值就遍歷下一個observer深夯。
3)、是執(zhí)行完成一個source后诺苹,從sources隊列中取出新source咕晋,訂閱InnerObserver。
可以看出收奔,外部的source誰先調(diào)用OnNext掌呜,就先調(diào)用誰的Observer, 然后執(zhí)行下游downStream.onNext()

3坪哄、concat

concat.png
 public static <@NonNull T> Observable<T> concat(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
   fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize());
    }

3.1 fromIterable這里和2.1是一樣的质蕉,也是上游發(fā)送數(shù)據(jù)的地方势篡。
3.2 訂閱

class ObservableConcatMap {

 @Override
    public void subscribeActual(Observer<? super U> observer) {
            SerializedObserver<U> serial = new SerializedObserver<>(observer);
            source.subscribe(new SourceObserver<>(serial, mapper, bufferSize));
    }
}

3.3 將Observerable存到queue中,遍歷queue模暗,轉(zhuǎn)化observable對象禁悠,訂閱到
InnerObserver。

static final class SourceObserver<T, U> {

        @Override
        public void onNext(T t) {
            if (fusionMode == QueueDisposable.NONE) {
                queue.offer(t);
            }
            drain();
        }

        void drain() {

            for (; ; ) {
                if (!active) {
                    boolean d = done;
                    T t;
                    try {
                        t = queue.poll();
                    } catch (Throwable ex) {
                    }
                    boolean empty = t == null;
                    if (!empty) {
                        ObservableSource<? extends U> o;
                        try {
                            o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                        }

                        active = true;
                        o.subscribe(inner);
                    }
                }
            }
        }
     
      void innerComplete() {
            active = false;
            drain();
        }

}

 static final class InnerObserver<U>  {
        @Override
        public void onNext(U t) {
            downstream.onNext(t);
        }

            @Override
            public void onComplete() {
                parent.innerComplete();
            }

  }

從上面代碼看出兑宇,當(dāng)一個InnerObserver執(zhí)行完成之后碍侦,將active 設(shè)置為false,然后for循環(huán)中再取下一個Observable對象訂閱隶糕。

4瓷产、總結(jié)

merge是上游調(diào)用OnNext之后,只要任何一個InnerObserver中有數(shù)據(jù)枚驻,就調(diào)用downstream.onNext()濒旦,結(jié)果是無序的。
concat 只有一個source執(zhí)行完成之后测秸,才會執(zhí)行下一個source疤估,結(jié)果是有序的。
zip通過1.4可以實現(xiàn)有序霎冯,但是必須得等待source都執(zhí)行完成才能執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末钞瀑,一起剝皮案震驚了整個濱河市沈撞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌雕什,老刑警劉巖缠俺,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異贷岸,居然都是意外死亡壹士,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進(jìn)店門偿警,熙熙樓的掌柜王于貴愁眉苦臉地迎上來躏救,“玉大人,你說我怎么就攤上這事螟蒸。” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵翼抠,是天一觀的道長店煞。 經(jīng)常有香客問我,道長诵原,這世上最難降的妖魔是什么英妓? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任挽放,我火速辦了婚禮,結(jié)果婚禮上蔓纠,老公的妹妹穿的比我還像新娘骂维。我一直安慰自己,他們只是感情好贺纲,可當(dāng)我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布航闺。 她就那樣靜靜地躺著,像睡著了一般猴誊。 火紅的嫁衣襯著肌膚如雪潦刃。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天懈叹,我揣著相機(jī)與錄音乖杠,去河邊找鬼。 笑死澄成,一個胖子當(dāng)著我的面吹牛胧洒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播墨状,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼卫漫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了肾砂?” 一聲冷哼從身側(cè)響起列赎,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎镐确,沒想到半個月后包吝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡源葫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年诗越,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片息堂。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡嚷狞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出储矩,到底是詐尸還是另有隱情感耙,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布持隧,位于F島的核電站即硼,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏屡拨。R本人自食惡果不足惜只酥,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一褥实、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧裂允,春花似錦损离、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至十饥,卻和暖如春窟勃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背逗堵。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工秉氧, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蜒秤。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓汁咏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親作媚。 傳聞我的和親對象是個殘疾皇子攘滩,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容