RxJava流控機(jī)制之Flowable

背景

對(duì)于生產(chǎn)者和消費(fèi)者模型肛走,存在一個(gè)問(wèn)題就是當(dāng)生產(chǎn)者生產(chǎn)的速度大于消費(fèi)者消費(fèi)速度焚志,并且生產(chǎn)過(guò)程不會(huì)停止映凳,生產(chǎn)者和消費(fèi)者位于不同的線(xiàn)程中胆筒,這是要如何對(duì)待多余出來(lái)的生產(chǎn)內(nèi)容?是丟掉诈豌,是緩沖仆救?
在強(qiáng)大的異步處理框架中,RxJava又是怎么處理的呢矫渔?如果在工作中萬(wàn)一發(fā)生丟包事件怎么辦彤蔽?

使用環(huán)境與本文目的

RxJava版本:2.1.0
默認(rèn)條件:觀(guān)察者和被觀(guān)察者位于main線(xiàn)程中,且使用了默認(rèn)的事件發(fā)射器庙洼。
目的:通過(guò)Flowable顿痪,探究RxJava的流控機(jī)制。

Flowable創(chuàng)建過(guò)程

Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                                for(int i =0 ; i<10;i++){
                                    e.onNext(i);
                                }
                            }
                        }
                , BackpressureStrategy.BUFFER);

在create方法中油够,完成了對(duì)Flowable的構(gòu)建過(guò)程:

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        //在工廠(chǎng)中構(gòu)建出一個(gè)Flowable對(duì)象蚁袭。需要傳入對(duì)向FlowableCreate
        //如果要構(gòu)建Observable,則傳入的是ObservableDefer
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }

FlowableCreate實(shí)際上是Flowable子類(lèi)石咬。當(dāng)調(diào)用Flowable的subscribe方法時(shí)揩悄,實(shí)際上將執(zhí)行FlowableCreate中的subscribeActual(該方法在Flowable是一個(gè)抽象方法,在FlowableCreate中實(shí)現(xiàn))方法:

public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");

            subscribeActual(z);
        }
......

subscribe過(guò)程分析

實(shí)際執(zhí)行的是subscribeActual鬼悠,這個(gè)方法非常重要虏束,該方法的實(shí)現(xiàn)為:

BaseEmitter<T> emitter;
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }

我們可以看到:

  • 它首先會(huì)根據(jù)我們選擇的背壓模式,設(shè)置不同的emitter厦章;如果沒(méi)有設(shè)置镇匀,默認(rèn)將開(kāi)啟帶有緩存的emitter;
  • Subscriber中的onSubscribe在事件沒(méi)有發(fā)射前就執(zhí)行了袜啃;
  • 事件的發(fā)射汗侵,是通過(guò)source.subscribe(emitter)實(shí)現(xiàn)的,而這個(gè)source,實(shí)際上就是我們?cè)跇?gòu)建Flowable時(shí)創(chuàng)建的FlowableOnSubscribe晰韵。
    現(xiàn)在回過(guò)來(lái)我們看看在構(gòu)建時(shí)发乔,F(xiàn)lowableOnSubscribe的內(nèi)容,通常我們會(huì)這么寫(xiě):
new FlowableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                                for(int i =0 ; i<10;i++){
                                    e.onNext(i);
                                }
                            }
                        }

轉(zhuǎn)了一圈,又回到了這里雪猪。FlowableEmitter來(lái)發(fā)射事件栏尚。默認(rèn)的,將使用BufferAsyncEmitter只恨,這是一個(gè)支持背壓處理的Emitter译仗。
該Emitter中,onNext方法是這樣的:

@Override
        public void onNext(T t) {
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            queue.offer(t); //生產(chǎn)
            drain();       //實(shí)際消費(fèi)過(guò)程會(huì)執(zhí)行queue.poll
        }

我們看到官觅,queue就是它維護(hù)的一個(gè)SpscLinkedArrayQueue隊(duì)列(其中使用的大量的原子類(lèi)型處理多線(xiàn)程訪(fǎng)問(wèn)問(wèn)題)纵菌,隊(duì)列容量會(huì)根據(jù)生產(chǎn)消費(fèi)情況自動(dòng)擴(kuò)容。
生產(chǎn)過(guò)程休涤,或者說(shuō)事件發(fā)射過(guò)程咱圆,直接調(diào)用了隊(duì)列的offer方法,進(jìn)行入隊(duì)操作功氨;
消費(fèi)過(guò)程序苏,或者說(shuō)消費(fèi)事件,則是先使用了drain方法捷凄,該方法的本質(zhì)忱详,是執(zhí)行隊(duì)列的poll方法取出事件,然后在onNext()中消費(fèi)纵势。
在 offer 中主要完成生產(chǎn):

//producerLookAhead相當(dāng)于一個(gè)生產(chǎn)者的斥候踱阿,主要用于檢測(cè)邊界
//這里將檢測(cè)管钳,要插入的位置钦铁,是否已經(jīng)越界了
 if (index < producerLookAhead) {
            return writeToQueue(buffer, e, index, offset);
        }
//else這種情況,主要時(shí)考慮到循環(huán)隊(duì)列
 else {
            //producerLookAheadStep實(shí)際上是一個(gè)定值才漆,表示固定步長(zhǎng)
            final int lookAheadStep = producerLookAheadStep;
            // go around the buffer or resize if full (unless we hit max capacity)
            //首先檢查前進(jìn)了固定步長(zhǎng)之后牛曹,是否還有位置用來(lái)插入,注意醇滥,使用calcWrappedOffset方法黎比,
           //包括很多其他用到mask的地方,實(shí)際上是將數(shù)組作為一個(gè)循環(huán)隊(duì)列使用鸳玩。
           //如果前進(jìn)固定步長(zhǎng)之后阅虫,還可以插入,那么不跟,說(shuō)明生產(chǎn)者可用空間還有很多
            int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
            if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
                producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
                return writeToQueue(buffer, e, index, offset);
            }
            //檢查下一插入位是否為空颓帝,如果不為空,則使用 ;
            //反之购城,插入位已經(jīng)滿(mǎn)了吕座,需要?jiǎng)?chuàng)建一個(gè)新的數(shù)組以完成生產(chǎn)者的工作
           else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
                return writeToQueue(buffer, e, index, offset);
            } else {
                //現(xiàn)有數(shù)組容量已經(jīng)滿(mǎn)了,消費(fèi)者速度無(wú)法跟上生產(chǎn)者速度瘪板,
                //需要開(kāi)辟一塊新的空間用于生產(chǎn)吴趴。空間大小和現(xiàn)有數(shù)組大小一致侮攀。
                //這里將完成對(duì)已經(jīng)生產(chǎn)并且尚未消費(fèi)的數(shù)組進(jìn)行保存的工作锣枝;
                //同時(shí),開(kāi)辟一個(gè)新的數(shù)組魏身,用于生產(chǎn)
                resize(buffer, index, offset, e, mask); // add a buffer and link old to new
                return true;
            }
}

在poll中主要完成事件取出惊橱,以在onNext中消費(fèi):

public T poll() {
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad
        boolean isNextBuffer = e == HAS_NEXT;
        if (null != e && !isNextBuffer) {
           //取出發(fā)射的事件,進(jìn)行消費(fèi)
            soElement(buffer, offset, null);// StoreStore
            soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
            return (T) e;
        } else if (isNextBuffer) {
            //如果這個(gè)數(shù)組中所有元素已經(jīng)消費(fèi)完箭昵,同時(shí)生產(chǎn)者已經(jīng)不再這個(gè)數(shù)組中進(jìn)行生產(chǎn)工作税朴;
            //跳轉(zhuǎn)到新的數(shù)組中,完成消費(fèi)工作家制,同時(shí)正林,移除當(dāng)前數(shù)組,即放棄這塊空間颤殴,不再使用
            return newBufferPoll(lvNext(buffer), index, mask);
        }

        return null;
    }

本質(zhì)上將觅廓,生產(chǎn)和消費(fèi)就是在操作這樣一個(gè)隊(duì)列。
現(xiàn)在涵但,可以回過(guò)頭來(lái)杈绸,重新看一看上面的 drain() 方法,看看它具體的消費(fèi)過(guò)程矮瘟,這個(gè)方法很有意思瞳脓。
drain:

void drain() {
            //保證同時(shí)只能有一個(gè)線(xiàn)程操作進(jìn)行下面的循環(huán)
            //注意在該方法的末尾,對(duì)wip進(jìn)行了重置為-1澈侠,打開(kāi)進(jìn)行循環(huán)的權(quán)限
            if (wip.getAndIncrement() != 0) {
                return;
            }

            int missed = 1;
            final Subscriber<? super T> a = actual;
            //無(wú)限隊(duì)列劫侧,本質(zhì)上有很多個(gè)固定長(zhǎng)度的數(shù)組自動(dòng)擴(kuò)展構(gòu)成
            final SpscLinkedArrayQueue<T> q = queue;
            //死循環(huán)
            for (;;) {
                //得到請(qǐng)求的數(shù)量,該值是通過(guò)Subscription.request()設(shè)置的哨啃,
                //而這個(gè)方法烧栋,Subscription參數(shù),實(shí)際上在Subscriber的onSubscribe傳遞進(jìn)去拳球。
                //所以审姓,這也就是為什么,要在subscriber中request(num), num為多少祝峻,就消費(fèi)多少魔吐。
                //這個(gè)請(qǐng)求對(duì)于外界來(lái)說(shuō)次坡,只能通過(guò)subscription設(shè)置
                long r = get();
                long e = 0L;
                //如果請(qǐng)求量為0,不進(jìn)入循環(huán)進(jìn)行消費(fèi)
                while (e != r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }
                    //用來(lái)判斷是否執(zhí)行了onComplete或者onError
                    boolean d = done;

                    T o = q.poll();

                    boolean empty = o == null;
                    //如果事件全部消費(fèi)完画畅,之后執(zhí)行了onCopmlete或者onError
                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            super.onError(ex);
                        } else {
                            super.onComplete();
                        }
                        return;
                    }
                    //如果事件全部消費(fèi)完砸琅,跳出本次循環(huán)
                    //注意,此時(shí)空轉(zhuǎn)了轴踱。如果消費(fèi)者速度大于生產(chǎn)者速度症脂,會(huì)發(fā)生這次空轉(zhuǎn),同時(shí)繼續(xù)循環(huán)過(guò)程
                    if (empty) {
                        break;
                    }
                    //消費(fèi)事件
                    a.onNext(o);
                    //處理完一件事情淫僻,計(jì)數(shù)器加一
                    e++;
                }

                if (e == r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }

                    boolean d = done;

                    boolean empty = q.isEmpty();

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            super.onError(ex);
                        } else {
                            super.onComplete();
                        }
                        return;
                    }
                }
                //上一次request的量已經(jīng)全部完成诱篷,此時(shí)重置請(qǐng)求量
                if (e != 0) {
                    BackpressureHelper.produced(this, e);
                }
                //開(kāi)鎖
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

總結(jié)

上述的分析過(guò)程,實(shí)際上并沒(méi)有設(shè)置觀(guān)察者雳灵、 被觀(guān)察者于不同的線(xiàn)程棕所,且使用默認(rèn)的事件發(fā)射器。緩沖隊(duì)列的空間是無(wú)限大的(一旦當(dāng)前緩沖被使用完悯辙,則開(kāi)辟新的緩沖空間琳省,直到這個(gè)空間的容量達(dá)到了 long 類(lèi)型的最大值,或者內(nèi)存溢出)躲撰。
這種背壓方式针贬,需要觀(guān)察者或者消費(fèi)者主動(dòng)請(qǐng)求要處理的事件的數(shù)量,已達(dá)到流速控制拢蛋。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末桦他,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子谆棱,更是在濱河造成了極大的恐慌快压,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件垃瞧,死亡現(xiàn)場(chǎng)離奇詭異蔫劣,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)皆警,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)拦宣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)截粗,“玉大人信姓,你說(shuō)我怎么就攤上這事〕衤蓿” “怎么了意推?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)珊蟀。 經(jīng)常有香客問(wèn)我菊值,道長(zhǎng)外驱,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任腻窒,我火速辦了婚禮昵宇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘儿子。我一直安慰自己瓦哎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布柔逼。 她就那樣靜靜地躺著蒋譬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪愉适。 梳的紋絲不亂的頭發(fā)上犯助,一...
    開(kāi)封第一講書(shū)人閱讀 50,021評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音维咸,去河邊找鬼剂买。 笑死,一個(gè)胖子當(dāng)著我的面吹牛癌蓖,可吹牛的內(nèi)容都是我干的雷恃。 我是一名探鬼主播,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼费坊,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼倒槐!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起附井,我...
    開(kāi)封第一講書(shū)人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤讨越,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后永毅,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體把跨,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沼死,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了着逐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡意蛀,死狀恐怖耸别,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情县钥,我是刑警寧澤秀姐,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站若贮,受9級(jí)特大地震影響省有,放射性物質(zhì)發(fā)生泄漏痒留。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一蠢沿、第九天 我趴在偏房一處隱蔽的房頂上張望伸头。 院中可真熱鬧,春花似錦舷蟀、人聲如沸熊锭。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)碗殷。三九已至,卻和暖如春速缨,著一層夾襖步出監(jiān)牢的瞬間锌妻,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工旬牲, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留仿粹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓原茅,卻偏偏與公主長(zhǎng)得像吭历,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子擂橘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • 怎么如此平靜, 感覺(jué)像是走錯(cuò)了片場(chǎng).為什么呢, 因?yàn)樯舷掠喂ぷ髟谕粋€(gè)線(xiàn)程呀騷年們! 這個(gè)時(shí)候上游每次調(diào)用emit...
    Young1657閱讀 1,458評(píng)論 2 1
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理晌区,服務(wù)發(fā)現(xiàn),斷路器通贞,智...
    卡卡羅2017閱讀 134,637評(píng)論 18 139
  • 教程一:視頻截圖(Tutorial 01: Making Screencaps) 首先我們需要了解視頻文件的一些基...
    90后的思維閱讀 4,680評(píng)論 0 3
  • ¥開(kāi)啟¥ 【iAPP實(shí)現(xiàn)進(jìn)入界面執(zhí)行逐一顯】 〖2017-08-25 15:22:14〗 《//首先開(kāi)一個(gè)線(xiàn)程朗若,因...
    小菜c閱讀 6,375評(píng)論 0 17
  • 轉(zhuǎn)載自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657閱讀 2,016評(píng)論 1 9