Rxjava Amb

所有代碼的演示都在RxJava2.2.4版本上進(jìn)行的

當(dāng)你傳遞多個(gè)Observable給amb操作符時(shí),該操作符只發(fā)射其中一個(gè)Observable的數(shù)據(jù)和通知:首先發(fā)送通知給amb操作符的的那個(gè)Observable,不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onError或onCompleted通知掘托,amb將忽略和丟棄其它所有Observables的發(fā)射物


amb.png

1.example

   Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(1, 2, 3, 4, 5));

        Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(6, 7, 8, 9, 10));

        Observable<Integer> observable3 = Observable.timer(2, TimeUnit.SECONDS)
                .flatMap(__ -> Observable.just(11, 12, 13, 14, 15));

        Observable.ambArray(observable1, observable2, observable3)
                .subscribe(next -> Logger.getGlobal().info("OnNext: next: " + next));

2.源碼分析
先看看subscribe方法做了什么

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
   public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

很簡(jiǎn)單雁仲,構(gòu)造一個(gè)LambdaObserver躯泰,注意代碼里的 subscribe(ls);

   public final void subscribe(Observer<? super T> observer) {
            ...省略部分代碼
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
             ...省略部分代碼
    }

subscribeActual是一個(gè)抽象方法进肯,實(shí)現(xiàn)由子類負(fù)責(zé)
再來看看ambArray的執(zhí)行

   public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        int len = sources.length;
        if (len == 0) {
            return empty();
        }
        if (len == 1) {
            return (Observable<T>)wrap(sources[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableAmb<T>(sources, null));
    }

重點(diǎn)返回一個(gè)ObservableAmb牡彻,也就是subscribe是由該類的實(shí)例執(zhí)行的估灿。上面說執(zhí)行subscribe就會(huì)執(zhí)行subscribeActual方法崇呵,ObservableAmb類也會(huì)重寫了該方法

 public void subscribeActual(Observer<? super T> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new Observable[8];
            try {
                for (ObservableSource<? extends T> p : sourcesIterable) {
                    if (p == null) {
                        EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
                        return;
                    }
                    if (count == sources.length) {
                        ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);
                        sources = b;
                    }
                    sources[count++] = p;
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                EmptyDisposable.error(e, observer);
                return;
            }
        } else {
            count = sources.length;
        }

        if (count == 0) {
            EmptyDisposable.complete(observer);
            return;
        } else
        if (count == 1) {
            sources[0].subscribe(observer);
            return;
        }

        AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
        ac.subscribe(sources);
    }

方法里面的sources是誰?就是一開始的observable1 馅袁,observable2域慷,observable3
方法里面的observer是誰?就是subscribe方法產(chǎn)生的LambdaObserver

看看AmbCoordinator類中subscribe方法

public void subscribe(ObservableSource<? extends T>[] sources) {
           AmbInnerObserver<T>[] as = observers;
           int len = as.length;
           for (int i = 0; i < len; i++) {
               as[i] = new AmbInnerObserver<T>(this, i + 1, downstream);
           }
           winner.lazySet(0); // release the contents of 'as'
           downstream.onSubscribe(this);

           for (int i = 0; i < len; i++) {
               if (winner.get() != 0) {
                   return;
               }

               sources[i].subscribe(as[i]);
           }
       }

downstream是通過AmbCoordinator構(gòu)造方法傳遞進(jìn)來的汗销,也就是ObservableAmb實(shí)例
構(gòu)造了一個(gè)AmbInnerObserver數(shù)組犹褒,在AmbInnerObserver內(nèi)部,數(shù)組的大小與
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
中的count是一致的弛针,在AmbInnerObserver內(nèi)部 叠骑,parent就是AmbCoordinator實(shí)例
在AmbInnerObserver內(nèi)部還關(guān)聯(lián)著AmbInnerObserver的序號(hào)。
所有AmbInnerObserver數(shù)組中的對(duì)象共享downstream
在AmbCoordinator類中削茁,還有一個(gè)win方法

      public boolean win(int index) {
            int w = winner.get();
            if (w == 0) {
               //如果原子變量中的值是0宙枷,就會(huì)設(shè)置原子變量為index掉房,也就是上面序號(hào)
                if (winner.compareAndSet(0, index)) {
                    //勝利者拿到后,其他的observer都將失敗慰丛,不會(huì)調(diào)用next卓囚,onError,onComplete
                    AmbInnerObserver<T>[] a = observers;
                    int n = a.length;
                    for (int i = 0; i < n; i++) {
                        if (i + 1 != index) {
                            a[i].dispose();
                        }
                    }
                    return true;
                }
                return false;
            }
            //已經(jīng)有勝利者诅病,直接拿出勝利者
            return w == index;
        }

可以看出amb操作符哪亿,誰先調(diào)用onNext誰就是勝利者,失敗者死的悄無聲息O桶省B嗉小!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末苏潜,一起剝皮案震驚了整個(gè)濱河市银萍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌恤左,老刑警劉巖贴唇,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異飞袋,居然都是意外死亡戳气,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門巧鸭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瓶您,“玉大人,你說我怎么就攤上這事纲仍⊙礁ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵郑叠,是天一觀的道長(zhǎng)夜赵。 經(jīng)常有香客問我,道長(zhǎng)乡革,這世上最難降的妖魔是什么寇僧? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮沸版,結(jié)果婚禮上嘁傀,老公的妹妹穿的比我還像新娘。我一直安慰自己视粮,他們只是感情好细办,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著馒铃,像睡著了一般蟹腾。 火紅的嫁衣襯著肌膚如雪痕惋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天娃殖,我揣著相機(jī)與錄音值戳,去河邊找鬼。 笑死炉爆,一個(gè)胖子當(dāng)著我的面吹牛堕虹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播芬首,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼赴捞,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了郁稍?” 一聲冷哼從身側(cè)響起赦政,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎耀怜,沒想到半個(gè)月后恢着,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡财破,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年掰派,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片左痢。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡靡羡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出俊性,到底是詐尸還是另有隱情略步,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布磅废,位于F島的核電站纳像,受9級(jí)特大地震影響荆烈,放射性物質(zhì)發(fā)生泄漏拯勉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一憔购、第九天 我趴在偏房一處隱蔽的房頂上張望宫峦。 院中可真熱鬧,春花似錦玫鸟、人聲如沸导绷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽妥曲。三九已至贾费,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間檐盟,已是汗流浹背褂萧。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留葵萎,地道東北人导犹。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像羡忘,于是被迫代替她去往敵國(guó)和親谎痢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一卷雕、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性节猿,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    BrotherChen閱讀 1,620評(píng)論 0 10
  • 一、Retrofit詳解 ·Retrofit的官網(wǎng)地址為 : http://square.github.io/re...
    余生_d630閱讀 1,862評(píng)論 0 5
  • 一漫雕、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性沐批,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    測(cè)天測(cè)地測(cè)空氣閱讀 636評(píng)論 0 1
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性蝎亚,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    無求_95dd閱讀 3,092評(píng)論 0 21
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位九孩,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作发框、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,863評(píng)論 0 10