Rx2:小create杖爽,大文章

前言

自從去年8月底《淺談RxJava與2.0的新特性》宙刘,已經(jīng)過(guò)去快一年。筆者也沒(méi)想到此文竟有讀者等筆者填坑快一年了稚叹,不禁汗顏。所以筆者打算寫(xiě)關(guān)于一個(gè) RxJava2 的系列文章,既為填坑叠洗,也為回報(bào)讀者對(duì)我的支持。本文為第一篇旅东。

讀本系列文章灭抑,你可能有如下收獲:

  • 了解其設(shè)計(jì)原理,代碼實(shí)現(xiàn)
  • 掌握操作符的正確使用姿勢(shì)抵代,避免采坑
  • 強(qiáng)化 Rx 編程思想腾节,寫(xiě)出更 Rx 的代碼
  • 跟讀精彩的源碼,強(qiáng)化編程功底

廢話不多說(shuō)荤牍,進(jìn)入正題案腺。

Reactive Streams

之前在《淺談RxJava與2.0的新特性》我們提到過(guò), RxJava2 遵循 Reactive Streams 的編程規(guī)范康吵, 或者更精確的說(shuō)劈榨,是 RxJava2 中的 Flowable 相關(guān)的類。因此我們只分析 Flowable 相關(guān)的實(shí)現(xiàn)與使用晦嵌,剩下的 Observable同辣、 Completable、Single耍铜、 Maybe 這些不會(huì)再提及邑闺,相信讀者朋友們可以舉一反三。

Reactive Streams 中明確規(guī)范了如下4點(diǎn):

  • process a potentially unbounded number of elements
  • in sequence,
  • asynchronously passing elements between components,
  • with mandatory non-blocking backpressure.

后面筆者會(huì)用 RS 代替全稱棕兼。
請(qǐng)跟隨本系列文章慢慢看 Flowable 是出色的完成上述的要求陡舅。

閱讀源碼的正確姿勢(shì)

Rx2 在源碼中加入了一些注解,這些注解對(duì)運(yùn)行沒(méi)有任何實(shí)際作用伴挚,僅僅是用作標(biāo)識(shí)備注靶衍,有助于開(kāi)發(fā)者了解某個(gè)操作符的正確使用姿勢(shì)灾炭,同時(shí)也有利于閱讀源碼時(shí)整理思路。這些注解位于io.reactivex.annotations包名下颅眶,這里著重介紹一個(gè)蜈出。

BackpressureSupport

BackpressureSupport 是用作標(biāo)識(shí)這個(gè)操作符對(duì)背壓的支持類型,有以下幾種:

  • PASS_THROUGH:表示這個(gè)操作符僅僅傳遞背壓類型涛酗,不做任何改變铡原。例如defer 操作符,這個(gè)操作符支持的背壓類型取決于Callable產(chǎn)生的Publisher商叹。
  • FULL:表示這個(gè)操作符支持完全的背壓燕刻,協(xié)調(diào)上下游關(guān)系
  • SPECIAL:表示這個(gè)操作符支持的背壓類型由方法上的文檔說(shuō)明
  • UNBOUNDED_IN:表示這個(gè)操作符會(huì)向上游請(qǐng)求 Long.MAX_VALUE,并協(xié)調(diào)下游
  • ERROR:表示如果下游沒(méi)有足夠數(shù)量的request剖笙,上游發(fā)射了超額的數(shù)據(jù)卵洗,這個(gè)操作符會(huì)拋出一個(gè)MissingBackpressureException
  • NONE:表示不處理背壓

上面這些字面解釋看起來(lái)還是很繞的,尤其是對(duì)于沒(méi)有閱讀過(guò)相關(guān)源碼的讀者弥咪。我們也不必一次性全部弄明白过蹂,后續(xù)會(huì)慢慢講清楚所有。

走進(jìn)create源碼

前文中有提到過(guò)聚至,Rx2 收回了create方法的權(quán)限酷勺,使開(kāi)發(fā)者自定義的create也能夠正確的支持背壓。而實(shí)現(xiàn)的方式就是通過(guò)額外提供一個(gè)BackpressureStrategy參數(shù)晚岭。也因此鸥印,create的方法注解中 BackpressureSupportSPECIAL

FlowableCreate

拋開(kāi)Rx2提供的 plugin 不談坦报,本質(zhì)上就是用 create 傳進(jìn)來(lái)的2個(gè)參數(shù)創(chuàng)建了FlowableCreate這個(gè)類库说。
根據(jù)傳入的BackpressureStrategy生成不同的Emitter對(duì)象。并遵循一致的編程約定片择,先調(diào)用 onSubscribe潜的,隨后將Emitter傳遞給FlowableOnSubscribe用來(lái)發(fā)射數(shù)據(jù)。

@Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
    }
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
    }
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
    }
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
    }
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
    }
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

BackpressureStrategy

Rx2 大量的類通過(guò)繼承AtomicLong來(lái)表示計(jì)算個(gè)數(shù)與發(fā)射個(gè)數(shù)字管,請(qǐng)求一個(gè)+1啰挪,發(fā)射一個(gè)-1。并且在 Rx2 中 Long.MAX_VALUE 有特殊含義嘲叔,表示無(wú)限的數(shù)據(jù)亡呵。即,如果 request(Long.MAX_VALUE)硫戈,即使發(fā)射了數(shù)據(jù)也不會(huì)減少自身的數(shù)值锰什。

這里所有的 Emitter 都繼承了基類BaseEmitter,并提供一些公共方法如setDisposable/setCancellable/requested/serialize等。然后根據(jù)各自的背壓策略汁胆,實(shí)現(xiàn)相應(yīng)的邏輯梭姓,下面分別介紹。

MISSING

MISSING即沒(méi)有背壓嫩码,我們看 onNext 函數(shù)會(huì)發(fā)現(xiàn)誉尖,每調(diào)用一次就會(huì)傳遞給下游的 subscriber.onNext,空指針則onError铸题。

@Override
public void onNext(T t) {
    if (isCancelled()) {
        return;
    }

    if (t != null) {
        actual.onNext(t);
    } else {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }

    for (;;) {
        long r = get();
        if (r == 0L || compareAndSet(r, r - 1)) {
            return;
        }
    }
}

上面的代碼是2.1.2版本的源碼铡恕,筆者認(rèn)為這里有一處 BUG 。即在自減的時(shí)候回挽,沒(méi)有檢查 Long.MAX_VALUE 的情況没咙,導(dǎo)致在request(Long.MAX_VALUE)后,發(fā)射數(shù)據(jù)時(shí)依然會(huì)不斷自減千劈,這是與一致的設(shè)計(jì)思路相悖的。反觀下面 DROP 與 BUFFER 相關(guān)的 Emitter 處理時(shí)牌捷,則直接調(diào)用了BackpressureHelper.produced(this, 1)墙牌,在里面會(huì)有 Long.MAX_VALUE 的判斷。

雖然有點(diǎn)小 BUG暗甥,但是實(shí)際中除了在requested()函數(shù)中會(huì)出錯(cuò)外喜滨,不會(huì)影響正常的執(zhí)行流。且一般開(kāi)發(fā)者也不會(huì)使用Emitter.requested()函數(shù)撤防。

雖然 MISSING 不支持背壓虽风,但是沒(méi)關(guān)系,我們可以通過(guò)操作符來(lái)彌補(bǔ)寄月。



這些操作符結(jié)合使用 MISSING 的 create 方法辜膝,使得原本不支持背壓的 Flowable 支持背壓。

當(dāng)然我們也大可不必這樣麻煩漾肮,既然要使用 buffer厂抖、drop 或者 latest,使用下面的策略即可克懊。除非我們需要那些操作符提供的額外功能忱辅。

ERROR

ERROR則和最開(kāi)始的BackpressureSupport.ERROR表現(xiàn)一致。

下面這塊代碼是NoOverflowBaseAsyncEmitter谭溉,會(huì)被 ERROR 和 DROP 對(duì)應(yīng)的Emitter繼承墙懂。邏輯也很簡(jiǎn)單,即請(qǐng)求數(shù)如果還大于0扮念,則向下發(fā)射并將請(qǐng)求數(shù)減1损搬,否則走onOverflow()方法。

@Override
public final void onNext(T t) {
    if (isCancelled()) {
        return;
    }

    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }

    if (get() != 0) {
        actual.onNext(t);
        BackpressureHelper.produced(this, 1);
    } else {
        onOverflow();
    }
}

而 ERROR 對(duì)應(yīng)的實(shí)現(xiàn)則很簡(jiǎn)單了,不在贅述场躯。

@Override
void onOverflow() {
    onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}

DROP

DROP即直接丟棄超額的數(shù)據(jù)谈为,體現(xiàn)在代碼中就非常簡(jiǎn)單。

@Override
void onOverflow() {
    // nothing to do
}

BUFFER 與 LATEST

這倆之所以放到了一起踢关,是因?yàn)?BUFFER 與 LATEST 本質(zhì)上都是緩存了數(shù)據(jù)伞鲫,細(xì)節(jié)上的區(qū)別就是,BUFFER 是緩存了所有數(shù)據(jù)签舞,而 LATEST 只保留了最近的一個(gè) onNext 數(shù)據(jù)秕脓。

體現(xiàn)在代碼中這兩者最主要的區(qū)別就是一個(gè)用了隊(duì)列來(lái)緩存,一個(gè)用了AtomicReference 來(lái)維持最后一個(gè)未被消費(fèi)的數(shù)據(jù)儒搭。

就挑 BUFFER 來(lái)說(shuō)吠架, onNext 就是將數(shù)據(jù)扔進(jìn)隊(duì)列,而后嘗試消費(fèi)數(shù)據(jù)即調(diào)用drain()搂鲫。onError 與 onComplete 則是將 結(jié)束標(biāo)識(shí)置為 true 傍药,并保留異常,然后依然也是在drain()中消費(fèi)該消息魂仍。
在/onNext/onError/onComplete/onRequested時(shí)拐辽,都會(huì)調(diào)用drain()來(lái)消費(fèi)隊(duì)列中的數(shù)據(jù)。

@Override
public void onNext(T t) {
    if (done || isCancelled()) {
        return;
    }

    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    queue.offer(t);
    drain();
}

drain中做的事就比較復(fù)雜了擦酌,為了保證線程安全俱诸,首先通過(guò)一個(gè)AtomicInteger來(lái)確保只有一個(gè)線程可以進(jìn)入for(;;)循環(huán)。
在 for 循環(huán)中赊舶,不斷的消費(fèi)隊(duì)列中的數(shù)據(jù)睁搭,如果隊(duì)列為空則檢查結(jié)束標(biāo)識(shí)是否為 true,是的話則發(fā)射 onComplete 或者 onError 笼平。

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    final Subscriber<? super T> a = actual;
    final SpscLinkedArrayQueue<T> q = queue;

    for (;;) {
        long r = get();
        long e = 0L;

        while (e != r) {
            if (isCancelled()) {
                q.clear();
                return;
            }

            boolean d = done;

            T o = q.poll();

            boolean empty = o == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    error(ex);
                } else {
                    complete();
                }
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(o);

            e++;
        }

        if (e == r) {
            if (isCancelled()) {
                q.clear();
                return;
            }

            boolean d = done;

            boolean empty = q.isEmpty();

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    error(ex);
                } else {
                    complete();
                }
                return;
            }
        }

        if (e != 0) {
            BackpressureHelper.produced(this, e);
        }

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

這里請(qǐng)大家留意一個(gè)編程的套路园骆,在絕大多數(shù)隊(duì)列消費(fèi)的場(chǎng)景里, Rx2 中都是使用了下面的方式出吹。這也是我們可以積累使用的遇伞。通過(guò)這種方式可以保證for循環(huán)里的代碼是單線程執(zhí)行的,且如果執(zhí)行期間有一次或多次新的調(diào)用drain()捶牢,會(huì)導(dǎo)致重新走一遍包含注釋處的代碼鸠珠,確保數(shù)據(jù)可以正確的消費(fèi)發(fā)射。

void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    for (;;) {
        
        // 消費(fèi)隊(duì)列秋麸, 發(fā)射數(shù)據(jù)

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

小結(jié)

筆者在介紹過(guò)程中已經(jīng)省略了很多細(xì)枝末節(jié)渐排,不免顯得知識(shí)有些分散,結(jié)合源碼閱讀效果更佳灸蟆。沒(méi)想到一個(gè)小小的 create 也包含這么多的玄機(jī)驯耻。

我相信通過(guò)閱讀這篇文章,讀者們寫(xiě) create 的時(shí)候應(yīng)該可以做到結(jié)合實(shí)際場(chǎng)景選擇正確的BackpressureStrategy

有了 create 便從此開(kāi)啟 Rx2 萬(wàn)里長(zhǎng)征第一步可缚。下一篇霎迫,我們將會(huì)介紹 Rx2 的線程調(diào)度相關(guān)的操作符及其實(shí)現(xiàn),敬請(qǐng)期待帘靡。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末知给,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子描姚,更是在濱河造成了極大的恐慌涩赢,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,348評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件轩勘,死亡現(xiàn)場(chǎng)離奇詭異筒扒,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)绊寻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門花墩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人澄步,你說(shuō)我怎么就攤上這事观游。” “怎么了驮俗?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,936評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)允跑。 經(jīng)常有香客問(wèn)我王凑,道長(zhǎng),這世上最難降的妖魔是什么聋丝? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,427評(píng)論 1 283
  • 正文 為了忘掉前任索烹,我火速辦了婚禮,結(jié)果婚禮上弱睦,老公的妹妹穿的比我還像新娘百姓。我一直安慰自己,他們只是感情好况木,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布垒拢。 她就那樣靜靜地躺著,像睡著了一般火惊。 火紅的嫁衣襯著肌膚如雪求类。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,785評(píng)論 1 290
  • 那天屹耐,我揣著相機(jī)與錄音尸疆,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛寿弱,可吹牛的內(nèi)容都是我干的犯眠。 我是一名探鬼主播,決...
    沈念sama閱讀 38,931評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼症革,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼筐咧!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起地沮,我...
    開(kāi)封第一講書(shū)人閱讀 37,696評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嗜浮,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后摩疑,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體危融,經(jīng)...
    沈念sama閱讀 44,141評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評(píng)論 2 327
  • 正文 我和宋清朗相戀三年雷袋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了吉殃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,625評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡楷怒,死狀恐怖蛋勺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鸠删,我是刑警寧澤抱完,帶...
    沈念sama閱讀 34,291評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站刃泡,受9級(jí)特大地震影響巧娱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜烘贴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評(píng)論 3 312
  • 文/蒙蒙 一禁添、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧桨踪,春花似錦老翘、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至纳账,卻和暖如春逛薇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背疏虫。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工永罚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留啤呼,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓呢袱,卻偏偏與公主長(zhǎng)得像官扣,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子羞福,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 怎么如此平靜, 感覺(jué)像是走錯(cuò)了片場(chǎng).為什么呢, 因?yàn)樯舷掠喂ぷ髟谕粋€(gè)線程呀騷年們! 這個(gè)時(shí)候上游每次調(diào)用emit...
    Young1657閱讀 1,455評(píng)論 2 1
  • 轉(zhuǎn)載自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657閱讀 2,016評(píng)論 1 9
  • 背景 對(duì)于生產(chǎn)者和消費(fèi)者模型惕蹄,存在一個(gè)問(wèn)題就是當(dāng)生產(chǎn)者生產(chǎn)的速度大于消費(fèi)者消費(fèi)速度,并且生產(chǎn)過(guò)程不會(huì)停止治专,生產(chǎn)者和...
    風(fēng)雪圍城閱讀 7,622評(píng)論 0 4
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符卖陵。對(duì)于擴(kuò)展包,由于使用率較低张峰,如有需求泪蔫,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 45,628評(píng)論 8 93
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位喘批,與響應(yīng)式編程作為結(jié)合使用的撩荣,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,851評(píng)論 0 10