4-RxJava源碼分析之 --- 操作符

RxJava是一個(gè)生產(chǎn)者和消費(fèi)者模型硕并,有生產(chǎn)者Observable和消費(fèi)者Observer,對于簡單的一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者的情況比較簡單缰雇,但真實(shí)業(yè)務(wù)中生產(chǎn)者可能有多個(gè)拒担,且多個(gè)生產(chǎn)者合作生產(chǎn)的情況可能有多種,比如多個(gè)生產(chǎn)者誰先生產(chǎn)第一個(gè)其他的就取消印荔。而RxJava的操作符就是用于組合多個(gè)生產(chǎn)者的邏輯實(shí)現(xiàn)低葫。

1 - "amb"操作符源碼分析

"amb"操作符的作用是,組合多個(gè)ObservableSource保證誰先生產(chǎn)出數(shù)據(jù)仍律,就只接收到ObservableSource的數(shù)據(jù)嘿悬,操作符示意圖如下:


amb操作符
public abstract class Observable<T> implements ObservableSource<T> {
     public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        // 該操作把多個(gè)ObservableSource轉(zhuǎn)化為一個(gè)ObservableAmb對象
        return RxJavaPlugins.onAssembly(new ObservableAmb<T>(null, sources));
    }
}


public final class ObservableAmb<T> extends Observable<T> {
    final ObservableSource<? extends T>[] sources;
    final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
    
    public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }
    
     public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            // 1 - 這里的邏輯是把sourcesIterable中的對象添加到source數(shù)組中
            ... 
        } else {
            count = sources.length;
        }

        if (count == 0) {
            EmptyDisposable.complete(s);
            return;
        } else
        if (count == 1) {
            sources[0].subscribe(s);
            return;
        }
        
        // 2 - 創(chuàng)建一個(gè)AmbCoordinator對象
        AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
        // 調(diào)用各個(gè)ObservableSource的subscribe()
        ac.subscribe(sources);
    }
    
    // 3 - 處理多個(gè)ObservableSource競爭生產(chǎn)權(quán)的邏輯處理類,通過為沒一個(gè)ObservableSource創(chuàng)建一個(gè)Observer水泉,
    // 當(dāng)對應(yīng)的Observer收到數(shù)據(jù)時(shí)善涨,來競爭是否贏得生產(chǎn)權(quán),如果贏得生產(chǎn)權(quán)就把其他的Observer dispose草则,
    // 保證只有贏得生產(chǎn)權(quán)的ObservableSource來發(fā)送數(shù)據(jù)給原始Observer
     static final class AmbCoordinator<T> implements Disposable {
        final Observer<? super T> actual;
        final AmbInnerObserver<T>[] observers;
        final AtomicInteger winner = new AtomicInteger();

        @SuppressWarnings("unchecked")
        AmbCoordinator(Observer<? super T> actual, int count) {
            this.actual = actual;
            this.observers = new AmbInnerObserver[count];
        }

        // 處理sources中各個(gè)ObservableSource的subscribe邏輯
        public void subscribe(ObservableSource<? extends T>[] sources) {
            AmbInnerObserver<T>[] as = observers;
            int len = as.length;
            for (int i = 0; i < len; i++) {
                // 4 - 把Observer轉(zhuǎn)為AmbInnerObserver钢拧,AmbInnerObserver發(fā)送數(shù)據(jù)時(shí)通知到AmbCoordinator中來處理誰贏得了生產(chǎn)權(quán)
                as[i] = new AmbInnerObserver<T>(this, i + 1, actual);
            }
            winner.lazySet(0); // release the contents of 'as'
            // 5 - 調(diào)用Observer.onSubscribe()
            actual.onSubscribe(this);

            for (int i = 0; i < len; i++) {
                // 6 - 如果已經(jīng)有ObservableSource贏取了生產(chǎn)權(quán),不再處理
                if (winner.get() != 0) {
                    return;
                }
                // 7 -如果還沒哪個(gè)ObservableSource贏得生產(chǎn)權(quán)炕横,就調(diào)用所有ObservableSource.subscribe()
                sources[i].subscribe(as[i]);
            }
        }

        // 8 - 當(dāng)AmbInnerObserver收到數(shù)據(jù)時(shí)調(diào)用源内,代表AmbInnerObserver對應(yīng)的ObservableSource贏得了生產(chǎn)權(quán),
        // 把其他的AmbInnerObserver dispose份殿。保證只有贏得生產(chǎn)權(quán)的ObservableSource生產(chǎn)數(shù)據(jù)膜钓。
        public boolean win(int index) {
            int w = winner.get();
            if (w == 0) {
                // 如果還沒人贏得生產(chǎn)權(quán),就設(shè)置該index卿嘲,表示該index對應(yīng)得Observable贏得生產(chǎn)權(quán)
                if (winner.compareAndSet(0, index)) {
                    AmbInnerObserver<T>[] a = observers;
                    int n = a.length;
                    // 贏得生產(chǎn)權(quán)后把其他得Observer dispose
                    for (int i = 0; i < n; i++) {
                        if (i + 1 != index) {
                            a[i].dispose();
                        }
                    }
                    return true;
                }
                return false;
            }
            return w == index;
        }
        ...   
    }
    
    // 9 - 用于轉(zhuǎn)發(fā)生產(chǎn)者發(fā)送過來的數(shù)據(jù)并轉(zhuǎn)發(fā)給原始的Observer呻此,同時(shí)在收到數(shù)據(jù)時(shí),調(diào)用AmbCoordinator.win(index)來贏得生產(chǎn)權(quán)
    static final class AmbInnerObserver<T> extends AtomicReference<Disposable> implements Observer<T> {
        private static final long serialVersionUID = -1185974347409665484L;
        final AmbCoordinator<T> parent;
        final int index;
        final Observer<? super T> actual;

        boolean won;

        AmbInnerObserver(AmbCoordinator<T> parent, int index, Observer<? super T> actual) {
            this.parent = parent;
            this.index = index;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this, s);
        }

        @Override
        public void onNext(T t) {
            if (won) {
                // 10 - 已經(jīng)贏得來生產(chǎn)權(quán)腔寡,直接轉(zhuǎn)發(fā)數(shù)據(jù)
                actual.onNext(t);
            } else {
                // 11 - 競爭焚鲜,贏得生產(chǎn)權(quán)
                if (parent.win(index)) {
                    // 12 - 贏得生產(chǎn)權(quán)
                    won = true;
                    actual.onNext(t);
                } else {
                    // 13 - 沒有贏得生產(chǎn)權(quán)
                    get().dispose();
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            // 14 - 與onNext()處理一樣
            if (won) {
                actual.onError(t);
            } else {
                if (parent.win(index)) {
                    won = true;
                    actual.onError(t);
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
        }

        @Override
        public void onComplete() {
            // 15 - 與onNext()處理一樣
            if (won) {
                actual.onComplete();
            } else {
                if (parent.win(index)) {
                    won = true;
                    actual.onComplete();
                }
            }
        }

        public void dispose() {
            DisposableHelper.dispose(this);
        }
    }
}

從上面代碼分析可知,"amb"操作符是把多個(gè)ObservableSource先轉(zhuǎn)化為ObservableAmb對象放前,并在ObservableAmb中為每個(gè)ObservableSource創(chuàng)建對應(yīng)的包裝的AmbInnerObserver(可把收到的數(shù)據(jù)轉(zhuǎn)發(fā)給原始Observer)忿磅,并進(jìn)行訂閱操作,當(dāng)某個(gè)AmbInnerObserver先收到數(shù)據(jù)時(shí)凭语,代表ObservableSource贏得生產(chǎn)權(quán)葱她,把其他的AmbInnerObserver dispose,保證只接收贏得生產(chǎn)權(quán)的ObservableSource生產(chǎn)的數(shù)據(jù)似扔。

2 - "concatArray"操作符源碼分析

"concatArray"操作符的作用是吨些,組合多個(gè)ObservableSource按照順序依次生產(chǎn)發(fā)送數(shù)據(jù)搓谆,即多個(gè)ObservableSource一個(gè)生產(chǎn)完數(shù)據(jù)之后再下一個(gè),依次下去知道所有的結(jié)束豪墅,Observer接收所有ObservableSource發(fā)送的數(shù)據(jù)泉手,操作符示意圖如下:


concatArray操作符

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        } else
        if (sources.length == 1) {
            return wrap((ObservableSource<T>)sources[0]);
        }
        // 1 - fromArray(sources)把多個(gè)ObservableSource轉(zhuǎn)為一個(gè)發(fā)送這些ObservableSource的Observable
        // 并把轉(zhuǎn)化后的Observable轉(zhuǎn)為ObservableConcatMap對象,記住此處arrayObservable=fromArray(sources)發(fā)送的是參數(shù)的sources
        // 2 - 下一步看看ObservableConcatMap
        return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
    }
}

public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            int bufferSize, ErrorMode delayErrors) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.bufferSize = Math.max(8, bufferSize);
    }
    @Override
    public void subscribeActual(Observer<? super U> s) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
            return;
        }

        if (delayErrors == ErrorMode.IMMEDIATE) {
            SerializedObserver<U> serial = new SerializedObserver<U>(s);
            // 3 - 這里的source是arrayObservable偶器,即那個(gè)發(fā)送原始sources的Observable
            // 把原始Observer轉(zhuǎn)為SourceObserver斩萌,這種是接收到error就結(jié)束,下面的情況和這差不多就不看了屏轰,
            // 下面看看SourceObserver
            source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
        } else {
            source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
        }
    }
    
    static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
        SourceObserver(Observer<? super U> actual,
                                Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
            this.actual = actual;
            this.mapper = mapper;
            this.bufferSize = bufferSize;
            // 把原始Observer轉(zhuǎn)為InnerObserver
            this.inner = new InnerObserver<U>(actual, this);
        }
        
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY);
                    // 4 - 對于concatArray這里是true颊郎,可以看ObservableFromArray中的requestFusion邏輯
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        // 5 - 這里把QueueDisposable存為queue,這里的QueueDisposable對應(yīng)的在ObservableFromArray中霎苗,即保存了所有原始sources
                        queue = qd;
                        done = true;

                        actual.onSubscribe(this);
                        
                        // 6 - 處理發(fā)送數(shù)據(jù)邏輯
                        drain();
                        return;
                    }
                    
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;

                        actual.onSubscribe(this);

                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }
        
        void drain() {
            if (getAndIncrement() != 0) {
                return;
            }

            for (;;) {
                if (disposed) {
                    queue.clear();
                    return;
                }
                // 7 - 沒有正在處理的ObservableSource
                if (!active) {
                    boolean d = done;

                    T t;

                    try {
                        // 8 - 取出一個(gè)ObservableSource
                        t = queue.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        actual.onError(ex);
                        return;
                    }

                    boolean empty = t == null;

                    if (d && empty) {
                        disposed = true;
                        actual.onComplete();
                        return;
                    }

                    if (!empty) {
                        ObservableSource<? extends U> o;

                        try {
                            o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            dispose();
                            queue.clear();
                            actual.onError(ex);
                            return;
                        }

                        // 9 - 標(biāo)記為true姆吭,for循環(huán)不在處理ObservableSource
                        active = true;
                        // 10 - 處理訂閱原始的ObservableSource
                        o.subscribe(inner);
                    }
                }

                if (decrementAndGet() == 0) {
                    break;
                }
            }
        }
        
        void innerComplete() {
            // 13 - 比較為false,drain()中可以處理下一個(gè)ObservableSource
            active = false;
            // 14 - 調(diào)drain()唁盏,處理下一個(gè)ObservableSource
            drain();
        }
    }
    
    static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {
            ... 
            
            @Override
            public void onError(Throwable t) {
                // 11 - Error内狸,整個(gè)流程結(jié)束
                parent.dispose();
                actual.onError(t);
            }
            @Override
            public void onComplete() {
                // 12 - 一個(gè)成功結(jié)束,調(diào)用SourceObserver.innerComplete()
                parent.innerComplete();
            }

            void dispose() {
                DisposableHelper.dispose(this);
            }
        }
}

從上面的代碼分析可清除了解到升敲,"concatArray"操作符是把多個(gè)ObservableSource轉(zhuǎn)為一個(gè)ObservableFromArray類型的Observable(注意:在ObservableFromArray中把sources放進(jìn)FromArrayDisposable,之后傳遞給Observer.onSubscribe(FromArrayDisposable))轰传,然后又把ObservableFromArray轉(zhuǎn)為ObservableConcatMap類型Observable驴党,在ObservableConcatMap中把原始Observer轉(zhuǎn)為SourceObserver,在SourceObserver中取出FromArrayDisposable中的sources获茬,逐個(gè)ObservableSource進(jìn)行生產(chǎn)數(shù)據(jù)港庄,并把原始的Observer轉(zhuǎn)為InnerObserver配合實(shí)現(xiàn)逐個(gè)ObservableSource生產(chǎn)數(shù)據(jù)。

總結(jié)語

操作符就只分析這兩個(gè)恕曲,因?yàn)閷?shí)在太多了鹏氧,大家感興趣可以自己去看看源碼∨逡ィ總的來說操作符得邏輯就是把原始得一個(gè)或多個(gè)Observable轉(zhuǎn)為另一種Observable來處理相關(guān)邏輯把还,把原始Observer轉(zhuǎn)為另一種Observer出來相關(guān)邏輯。

RxJava源碼分析系列文章主題目錄:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末茸俭,一起剝皮案震驚了整個(gè)濱河市吊履,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌调鬓,老刑警劉巖艇炎,帶你破解...
    沈念sama閱讀 218,607評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異腾窝,居然都是意外死亡缀踪,警方通過查閱死者的電腦和手機(jī)居砖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來驴娃,“玉大人奏候,你說我怎么就攤上這事⊥锌” “怎么了鼻由?”我有些...
    開封第一講書人閱讀 164,960評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長厚棵。 經(jīng)常有香客問我蕉世,道長,這世上最難降的妖魔是什么婆硬? 我笑而不...
    開封第一講書人閱讀 58,750評論 1 294
  • 正文 為了忘掉前任狠轻,我火速辦了婚禮,結(jié)果婚禮上彬犯,老公的妹妹穿的比我還像新娘向楼。我一直安慰自己,他們只是感情好谐区,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評論 6 392
  • 文/花漫 我一把揭開白布湖蜕。 她就那樣靜靜地躺著,像睡著了一般宋列。 火紅的嫁衣襯著肌膚如雪昭抒。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,604評論 1 305
  • 那天炼杖,我揣著相機(jī)與錄音灭返,去河邊找鬼。 笑死坤邪,一個(gè)胖子當(dāng)著我的面吹牛熙含,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播艇纺,決...
    沈念sama閱讀 40,347評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼怎静,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了黔衡?” 一聲冷哼從身側(cè)響起消约,我...
    開封第一講書人閱讀 39,253評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎员帮,沒想到半個(gè)月后或粮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,702評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捞高,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評論 3 336
  • 正文 我和宋清朗相戀三年氯材,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了渣锦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡氢哮,死狀恐怖袋毙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冗尤,我是刑警寧澤听盖,帶...
    沈念sama閱讀 35,734評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站裂七,受9級特大地震影響皆看,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜背零,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評論 3 330
  • 文/蒙蒙 一腰吟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧徙瓶,春花似錦毛雇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至壳繁,卻和暖如春震捣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氮趋。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評論 1 270
  • 我被黑心中介騙來泰國打工伍派, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留江耀,地道東北人剩胁。 一個(gè)月前我還...
    沈念sama閱讀 48,216評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像祥国,于是被迫代替她去往敵國和親昵观。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評論 2 355