深入理解 RxJava2:論 Parallel 與并發(fā)(5)

前言

歡迎來(lái)到深入理解 RxJava2 系列第五篇僻他。在上一篇文章中,我們?cè)谝粋€(gè)例子里用到了 parallel 操作符,本篇我們便是要介紹該操作符坤候,并對(duì)比 RxJava 一些常見(jiàn)的并發(fā)手段,詳述 parallel 的優(yōu)越性企蹭。

陳舊的 parallel

出生

Parallel 這個(gè)操作符首次在 RxJava 0.13.4 版本添加進(jìn)去白筹,作為一個(gè)實(shí)驗(yàn)性質(zhì)的 API,并在同一個(gè)版本為Scheduler添加了degreeOfParallelism方法為parallel獨(dú)用谅摄。

public abstract class Scheduler {

    public int degreeOfParallelism() {
        return Runtime.getRuntime().availableProcessors();
    }
    
    ...
}

后來(lái)在 0.18.0 版本重構(gòu)了一次Scheduler徒河,并順帶把degreeOfParallelism簡(jiǎn)化成了parallelism

遺棄

然而這個(gè)操作符當(dāng)時(shí)的實(shí)現(xiàn)送漠,并不是那么恰當(dāng)顽照,和大家預(yù)期的用法不一致。類(lèi)比 Java8 的 Stream API闽寡,開(kāi)發(fā)者們期望的是在調(diào)用parallel后代兵,后續(xù)的操作符都會(huì)并發(fā)執(zhí)行尼酿,然而事實(shí)并不是這樣。

當(dāng)時(shí)的parallel實(shí)現(xiàn)的有點(diǎn)半成品的意味植影,因此在 1.0.0-RC2 時(shí)被移除了裳擎。詳情見(jiàn) Issue :https://github.com/ReactiveX/RxJava/issues/1673

與此同時(shí)Schedulerparallelism便不再有用了,隨即在 1.0.0-RC11 版本被移除何乎。

GroupBy 與 FlatMap

parallel不在的日子里句惯,我們?nèi)绻氩l(fā)的做一些操作,通常都會(huì)利用flatMap

...
.flatMap(new Function<Object, Publisher<?>>() {
    @Override
    public Publisher<?> apply(Object o) throws Exception {
        return Flowable
                .just(o)
                .subscribeOn(Schedulers.computation())
                ...;
    }
})
...
.subscribe();

有些讀者會(huì)疑問(wèn)為什么要這樣寫(xiě)支救,直接用observeOnsubscribeOn不行嗎抢野。顯然不行,我們?cè)?a target="_blank">《深入理解 RxJava2:Scheduler(2)》強(qiáng)調(diào)過(guò)各墨,每個(gè)Worker的任務(wù)都是串行的指孤,因此如果不用flatMap來(lái)生成多個(gè)Flowable,就無(wú)法達(dá)到并行的效果贬堵。

事實(shí)上上面的這種寫(xiě)法吞吐量非常的差恃轩,因此我們還需要借助groupByflatMap來(lái)配合:

Flowable.just("a", "b", "c", "d", "e")
        .groupBy(new Function<String, Integer>() {
            int i = 0;
            final int cpu = Runtime.getRuntime().availableProcessors();

            @Override
            public Integer apply(String s) throws Exception {
                return (i++) % cpu;
            }
        })
        .flatMap(new Function<GroupedFlowable<Integer, String>, Publisher<?>>() {
            @Override
            public Publisher<?> apply(GroupedFlowable<Integer, String> g) throws Exception {
                return g.observeOn(Schedulers.computation())
                        ... // do some job
            }
        })
        ...
        .subscribe();

通過(guò)groupBy將數(shù)據(jù)分組,再將每組的數(shù)據(jù)通過(guò)flatMap調(diào)度至一個(gè)線程來(lái)執(zhí)行黎做。groupByflatMap的組合叉跛,可以任意控制并發(fā)數(shù),由于避免了很多無(wú)用的損耗蒸殿,性能較單獨(dú)的flatMap大大提升筷厘。

然而上面的代碼表述力不太好,而且很多不熟悉這些操作符的開(kāi)發(fā)者寫(xiě)不出類(lèi)似的代碼宏所,簡(jiǎn)單的說(shuō)就是不太好用酥艳。

于是一個(gè)能無(wú)縫的嵌入Flowable調(diào)用鏈的parallel迫在眉睫。

重生

在 RxJava 2.0.5 版本爬骤,parallel終于浴火重生充石。而這次重生后的parallel不再寄托于Flowable,而是自立門(mén)戶霞玄,通過(guò)獨(dú)立的ParallelFlowable來(lái)實(shí)現(xiàn)骤铃。

public abstract class ParallelFlowable<T> {
    public abstract void subscribe(@NonNull Subscriber<? super T>[] subscribers);
    public abstract int parallelism();
}

從類(lèi)的定義可以看出,這個(gè)對(duì)象的訂閱者是Subscriber數(shù)組坷剧,且數(shù)組的長(zhǎng)度必須嚴(yán)格等于parallelism()返回值惰爬。由于subscribe接口的變化,并發(fā)的操作符編寫(xiě)就簡(jiǎn)單很多听隐。

ParallelFlowable也類(lèi)似Flowable內(nèi)置了一些操作符,雖然數(shù)量有限哄啄,但是非常實(shí)用雅任,且可以與Flowable無(wú)縫轉(zhuǎn)換风范。

操作符

Parallel

Flowable中, 可以通過(guò)Parallel操作符將Flowable對(duì)象轉(zhuǎn)變成ParallelFlowable對(duì)象:

public final ParallelFlowable<T> parallel(int parallelism) {
    return ParallelFlowable.from(this, parallelism);
}

從一個(gè)Flowable轉(zhuǎn)變成ParallelFlowable并沒(méi)有線程相關(guān)的操作沪么,從參數(shù)也可看出硼婿,并無(wú)Scheduler的參與。數(shù)據(jù)流的轉(zhuǎn)換也非常簡(jiǎn)單:

Parallel

可見(jiàn)Parallel僅僅是將原本應(yīng)該分發(fā)至一個(gè)Subscriber的數(shù)據(jù)流拆分開(kāi)禽车,“雨露均沾”了而已寇漫。

但是轉(zhuǎn)變成ParallelFlowable后,由于多個(gè)Subscriber的存在殉摔,并發(fā)就非常的簡(jiǎn)單了州胳,我們只需要提供一個(gè)線程操作符即可:

RunOn

RunOnParallelFlowable就像ObserveOnFlowable

public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
    return new ParallelRunOn<T>(this, scheduler, prefetch);
}

public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    return new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize);
}

這兩者參數(shù)幾乎一致,唯一不同的是ObserveOn額外提供了一個(gè)delayError的參數(shù)逸月。

他們的效果是非常相似的栓撞,都是對(duì)下游的onNext / onComplete / onError調(diào)度線程。不過(guò)RunOn對(duì)于下游的每個(gè)Subscriber都會(huì)獨(dú)立創(chuàng)建一個(gè)Worker來(lái)調(diào)度:

RunOn

因此多個(gè)Subscriber是可能并發(fā)的碗硬,這取決于選擇的Scheduler瓤湘。我們?cè)谇拔闹袕?qiáng)調(diào)過(guò),每個(gè)Worker創(chuàng)建的任務(wù)僅與該Worker相關(guān)聯(lián)恩尾,但是這并不意味著每個(gè)Worker對(duì)應(yīng)一個(gè)線程弛说,不同的Scheduler的實(shí)現(xiàn)創(chuàng)建的Worker效果大相徑庭,更多細(xì)節(jié)可查看《深入理解 RxJava2:Scheduler(2)》翰意。

Sequential

顧名思義木人,該操作符就是重新把ParallelFlowable轉(zhuǎn)回Flowable,但是數(shù)據(jù)是循環(huán)發(fā)射的猎物,不保證遵循數(shù)據(jù)原始的發(fā)射順序:

Sequential

其他

以上三個(gè)操作符是最核心也是最常用的虎囚,除此之外,ParallelFlowable還有諸多操作符蔫磨,效果與Flowable中類(lèi)似淘讥,部分可根據(jù)實(shí)際情況與runOn結(jié)合使用,以達(dá)到最佳效果堤如。

  • Map
  • Filter
  • FlatMap
  • doOnXXX / doAfterXXX
  • reduce
  • sorted
  • ...

對(duì)比

GroupBy 與 Parallel

上面我們舉例了通過(guò)groupByflatMap組合實(shí)現(xiàn)的并發(fā)效果蒲列。事實(shí)上,除了從感官上更加好用外搀罢,parallel的并發(fā)效果也是最好的蝗岖。

Benchmark

在 GitHub RxJava 的倉(cāng)庫(kù)中,其實(shí)已經(jīng)內(nèi)置了基于 OpenJDK JMH 的 Benchmark 的代碼榔至,均在 src/jmh 目錄中抵赢,對(duì) JMH 不熟悉的同學(xué)可以自行去了解。

我們這里對(duì)并發(fā)的性能做一次測(cè)試,使用倉(cāng)庫(kù)中的ParallelPerf類(lèi)即可铅鲤,筆者機(jī)器的配置是 3 GHz Intel Core i7 4 核 + 16 GB 1600 MHz DDR3划提,效果如下:

Benchmark

我這里解釋一下參數(shù)的含義:

  • Count:數(shù)據(jù)源數(shù)目
  • Compute: 可以認(rèn)為是 CPU 耗時(shí)的單位,隨著數(shù)值增大而接近線性增長(zhǎng)
  • Parallelism:并發(fā)數(shù)目邢享,這里可以近似地認(rèn)為是線程數(shù)目

另外圖表中表頭帶 error 的字樣是表示 99.9% 的置信區(qū)間鹏往,如
第一行的 GroupBy 置信區(qū)間為:[1539.814 - 41.88, 1539.814 + 41.88]。

根據(jù)圖中的結(jié)果骇塘,可見(jiàn)在Compute較小的情況下伊履,parallelgroupBy是有著絕對(duì)的優(yōu)勢(shì)的,說(shuō)明parallel的性能損耗較小款违。

Compute較大時(shí)唐瀑,操作符內(nèi)部的性能損耗相對(duì)全局的影響較小,因此這兩者性能則差不多奠货。

SchedulerMultiWorkerSupport

不僅如此介褥,runOn操作符在創(chuàng)建Worker時(shí),有特別的優(yōu)化:

public interface SchedulerMultiWorkerSupport {
    void createWorkers(int number, @NonNull WorkerCallback callback);

    interface WorkerCallback {
        void onWorker(int index, @NonNull Scheduler.Worker worker);
    }
}

Scheduler通過(guò)實(shí)現(xiàn)這個(gè)接口递惋,能夠針對(duì)一次創(chuàng)建多個(gè)Worker的情況做優(yōu)化柔滔,目前僅ComputationScheduler支持。具體的源碼不列出來(lái)了萍虽,優(yōu)化后實(shí)際的效果就是盡可能的平均了線程和Worker的負(fù)載睛廊。

換言之,如果我們使用groupBy做并發(fā)時(shí)杉编,對(duì)應(yīng)的分組后的Flowable可能由于其他的操作符也在使用ComputationScheduler導(dǎo)致分下去的Worker對(duì)應(yīng)的線程可能有重合和遺漏超全。

舉個(gè)例子,請(qǐng)看下面的代碼:

Flowable.just(1, 2)
        .groupBy(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer v) throws Exception {
                return v % 2;
            }
        })
        .subscribeOn(Schedulers.io())
        .flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) throws Exception {
                Publisher<Integer> it = g.observeOn(Schedulers.computation()).doOnNext(i -> {
                    System.out.println(Thread.currentThread().getName());
                });
                Thread.sleep(1000);
                return it;
            }
        })
        .subscribe();

輸出:
RxComputationThreadPool-1
RxComputationThreadPool-2

以上的結(jié)果是符合我們期望的邓馒,數(shù)據(jù)根據(jù)模 2 的剩余類(lèi)劃分了兩組嘶朱,每組的數(shù)據(jù)的分發(fā)在不同的線程中,但是我們?cè)谏厦娴拇a后面追加以下的代碼執(zhí)行:

...
Thread.sleep(1500);
int core = Runtime.getRuntime().availableProcessors();
for (int i = 0; i < core - 1; i++) {
    scheduler.createWorker();
}

輸出:
RxComputationThreadPool-1
RxComputationThreadPool-1

為什么發(fā)生這樣的情況呢光酣,首先我們?cè)诿總€(gè)數(shù)據(jù)源observeOn后疏遏,休眠一秒,隨后這個(gè)Flowable會(huì)被立即訂閱救军,觸發(fā)createWorker财异,我們下面的代碼休眠了 1.5 秒,即處于第一個(gè)Flowable被訂閱后觸發(fā)了createWorker唱遭,第二個(gè)Flowable尚未被訂閱時(shí)戳寸,我們又分配了core - 1個(gè)的Worker,因此groupBy分配的下個(gè)Worker的線程又和第一個(gè)分配的相同了拷泽。注意這里我們說(shuō)的是依賴的線程相同疫鹊,但是每個(gè)Worker對(duì)象都是獨(dú)立的袖瞻,具體原因在上面鏈接的系列第二篇中詳細(xì)講述過(guò)。

而在parallelWorker是連續(xù)分配的拆吆,因此不受這種情況的干擾虏辫,有興趣的讀者們可以自己嘗試一番。

結(jié)語(yǔ)

Parallel 在改版后锈拨,確實(shí)是 RxJava2 中并發(fā)的不二選擇。配合內(nèi)置的操作符能夠讓大家收放自如羹唠,不再受并發(fā)的困擾奕枢。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市佩微,隨后出現(xiàn)的幾起案子缝彬,更是在濱河造成了極大的恐慌,老刑警劉巖哺眯,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谷浅,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡奶卓,警方通過(guò)查閱死者的電腦和手機(jī)一疯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)夺姑,“玉大人墩邀,你說(shuō)我怎么就攤上這事≌嫡悖” “怎么了眉睹?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)废膘。 經(jīng)常有香客問(wèn)我竹海,道長(zhǎng),這世上最難降的妖魔是什么丐黄? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任斋配,我火速辦了婚禮,結(jié)果婚禮上孵稽,老公的妹妹穿的比我還像新娘许起。我一直安慰自己,他們只是感情好菩鲜,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布园细。 她就那樣靜靜地躺著,像睡著了一般接校。 火紅的嫁衣襯著肌膚如雪猛频。 梳的紋絲不亂的頭發(fā)上狮崩,一...
    開(kāi)封第一講書(shū)人閱讀 49,749評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音鹿寻,去河邊找鬼睦柴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛毡熏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播痢法,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼财搁!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起尖奔,我...
    開(kāi)封第一講書(shū)人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎提茁,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體茴扁,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稀拐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了丹弱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片德撬。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖躲胳,靈堂內(nèi)的尸體忽然破棺而出蜓洪,到底是詐尸還是另有隱情,我是刑警寧澤坯苹,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布隆檀,位于F島的核電站,受9級(jí)特大地震影響粹湃,放射性物質(zhì)發(fā)生泄漏恐仑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一为鳄、第九天 我趴在偏房一處隱蔽的房頂上張望裳仆。 院中可真熱鬧,春花似錦孤钦、人聲如沸歧斟。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)静袖。三九已至觉鼻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間队橙,已是汗流浹背坠陈。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留捐康,地道東北人畅姊。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像吹由,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子朱嘴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容