前言
歡迎來(lái)到深入理解 RxJava2 系列第五篇僻他。在上一篇文章中,我們?cè)谝粋€(gè)例子里用到了 parallel 操作符,本篇我們便是要介紹該操作符坤候,并對(duì)比 RxJava 一些常見(jiàn)的并發(fā)手段,詳述 parallel 的優(yōu)越性企蹭。
陳舊的 parallel
出生
Parallel 這個(gè)操作符首次在 RxJava 0.13.4 版本添加進(jìn)去白筹,作為一個(gè)實(shí)驗(yàn)性質(zhì)的 API,并在同一個(gè)版本為Scheduler
添加了degreeOfParallelism
方法為parallel
獨(dú)用谅摄。
public abstract class Scheduler {
public int degreeOfParallelism() {
return Runtime.getRuntime().availableProcessors();
}
...
}
后來(lái)在 0.18.0 版本重構(gòu)了一次Scheduler
徒河,并順帶把degreeOfParallelism
簡(jiǎn)化成了parallelism
。
遺棄
然而這個(gè)操作符當(dāng)時(shí)的實(shí)現(xiàn)送漠,并不是那么恰當(dāng)顽照,和大家預(yù)期的用法不一致。類(lèi)比 Java8 的 Stream API闽寡,開(kāi)發(fā)者們期望的是在調(diào)用parallel
后代兵,后續(xù)的操作符都會(huì)并發(fā)執(zhí)行尼酿,然而事實(shí)并不是這樣。
當(dāng)時(shí)的parallel
實(shí)現(xiàn)的有點(diǎn)半成品的意味植影,因此在 1.0.0-RC2 時(shí)被移除了裳擎。詳情見(jiàn) Issue :https://github.com/ReactiveX/RxJava/issues/1673
與此同時(shí)Scheduler
的parallelism
便不再有用了,隨即在 1.0.0-RC11 版本被移除何乎。
GroupBy 與 FlatMap
在parallel
不在的日子里句惯,我們?nèi)绻氩l(fā)的做一些操作,通常都會(huì)利用flatMap
:
...
.flatMap(new Function<Object, Publisher<?>>() {
@Override
public Publisher<?> apply(Object o) throws Exception {
return Flowable
.just(o)
.subscribeOn(Schedulers.computation())
...;
}
})
...
.subscribe();
有些讀者會(huì)疑問(wèn)為什么要這樣寫(xiě)支救,直接用observeOn
與subscribeOn
不行嗎抢野。顯然不行,我們?cè)?a target="_blank">《深入理解 RxJava2:Scheduler(2)》強(qiáng)調(diào)過(guò)各墨,每個(gè)Worker
的任務(wù)都是串行的指孤,因此如果不用flatMap
來(lái)生成多個(gè)Flowable
,就無(wú)法達(dá)到并行的效果贬堵。
事實(shí)上上面的這種寫(xiě)法吞吐量非常的差恃轩,因此我們還需要借助groupBy
和 flatMap
來(lái)配合:
Flowable.just("a", "b", "c", "d", "e")
.groupBy(new Function<String, Integer>() {
int i = 0;
final int cpu = Runtime.getRuntime().availableProcessors();
@Override
public Integer apply(String s) throws Exception {
return (i++) % cpu;
}
})
.flatMap(new Function<GroupedFlowable<Integer, String>, Publisher<?>>() {
@Override
public Publisher<?> apply(GroupedFlowable<Integer, String> g) throws Exception {
return g.observeOn(Schedulers.computation())
... // do some job
}
})
...
.subscribe();
通過(guò)groupBy
將數(shù)據(jù)分組,再將每組的數(shù)據(jù)通過(guò)flatMap
調(diào)度至一個(gè)線程來(lái)執(zhí)行黎做。groupBy
與flatMap
的組合叉跛,可以任意控制并發(fā)數(shù),由于避免了很多無(wú)用的損耗蒸殿,性能較單獨(dú)的flatMap
大大提升筷厘。
然而上面的代碼表述力不太好,而且很多不熟悉這些操作符的開(kāi)發(fā)者寫(xiě)不出類(lèi)似的代碼宏所,簡(jiǎn)單的說(shuō)就是不太好用酥艳。
于是一個(gè)能無(wú)縫的嵌入Flowable
調(diào)用鏈的parallel
迫在眉睫。
重生
在 RxJava 2.0.5 版本爬骤,parallel
終于浴火重生充石。而這次重生后的parallel
不再寄托于Flowable
,而是自立門(mén)戶霞玄,通過(guò)獨(dú)立的ParallelFlowable
來(lái)實(shí)現(xiàn)骤铃。
public abstract class ParallelFlowable<T> {
public abstract void subscribe(@NonNull Subscriber<? super T>[] subscribers);
public abstract int parallelism();
}
從類(lèi)的定義可以看出,這個(gè)對(duì)象的訂閱者是Subscriber
數(shù)組坷剧,且數(shù)組的長(zhǎng)度必須嚴(yán)格等于parallelism()
返回值惰爬。由于subscribe
接口的變化,并發(fā)的操作符編寫(xiě)就簡(jiǎn)單很多听隐。
ParallelFlowable
也類(lèi)似Flowable
內(nèi)置了一些操作符,雖然數(shù)量有限哄啄,但是非常實(shí)用雅任,且可以與Flowable
無(wú)縫轉(zhuǎn)換风范。
操作符
Parallel
在Flowable
中, 可以通過(guò)Parallel
操作符將Flowable
對(duì)象轉(zhuǎn)變成ParallelFlowable
對(duì)象:
public final ParallelFlowable<T> parallel(int parallelism) {
return ParallelFlowable.from(this, parallelism);
}
從一個(gè)Flowable
轉(zhuǎn)變成ParallelFlowable
并沒(méi)有線程相關(guān)的操作沪么,從參數(shù)也可看出硼婿,并無(wú)Scheduler
的參與。數(shù)據(jù)流的轉(zhuǎn)換也非常簡(jiǎn)單:
可見(jiàn)Parallel
僅僅是將原本應(yīng)該分發(fā)至一個(gè)Subscriber
的數(shù)據(jù)流拆分開(kāi)禽车,“雨露均沾”了而已寇漫。
但是轉(zhuǎn)變成ParallelFlowable
后,由于多個(gè)Subscriber
的存在殉摔,并發(fā)就非常的簡(jiǎn)單了州胳,我們只需要提供一個(gè)線程操作符即可:
RunOn
RunOn
于ParallelFlowable
就像ObserveOn
于Flowable
:
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
return new ParallelRunOn<T>(this, scheduler, prefetch);
}
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
這兩者參數(shù)幾乎一致,唯一不同的是ObserveOn
額外提供了一個(gè)delayError
的參數(shù)逸月。
他們的效果是非常相似的栓撞,都是對(duì)下游的onNext / onComplete / onError
調(diào)度線程。不過(guò)RunOn
對(duì)于下游的每個(gè)Subscriber
都會(huì)獨(dú)立創(chuàng)建一個(gè)Worker
來(lái)調(diào)度:
因此多個(gè)Subscriber
是可能并發(fā)的碗硬,這取決于選擇的Scheduler
瓤湘。我們?cè)谇拔闹袕?qiáng)調(diào)過(guò),每個(gè)Worker
創(chuàng)建的任務(wù)僅與該Worker
相關(guān)聯(lián)恩尾,但是這并不意味著每個(gè)Worker
對(duì)應(yīng)一個(gè)線程弛说,不同的Scheduler
的實(shí)現(xiàn)創(chuàng)建的Worker
效果大相徑庭,更多細(xì)節(jié)可查看《深入理解 RxJava2:Scheduler(2)》翰意。
Sequential
顧名思義木人,該操作符就是重新把ParallelFlowable
轉(zhuǎn)回Flowable
,但是數(shù)據(jù)是循環(huán)發(fā)射的猎物,不保證遵循數(shù)據(jù)原始的發(fā)射順序:
其他
以上三個(gè)操作符是最核心也是最常用的虎囚,除此之外,ParallelFlowable
還有諸多操作符蔫磨,效果與Flowable
中類(lèi)似淘讥,部分可根據(jù)實(shí)際情況與runOn
結(jié)合使用,以達(dá)到最佳效果堤如。
- Map
- Filter
- FlatMap
- doOnXXX / doAfterXXX
- reduce
- sorted
- ...
對(duì)比
GroupBy 與 Parallel
上面我們舉例了通過(guò)groupBy
與flatMap
組合實(shí)現(xiàn)的并發(fā)效果蒲列。事實(shí)上,除了從感官上更加好用外搀罢,parallel
的并發(fā)效果也是最好的蝗岖。
Benchmark
在 GitHub RxJava 的倉(cāng)庫(kù)中,其實(shí)已經(jīng)內(nèi)置了基于 OpenJDK JMH 的 Benchmark 的代碼榔至,均在 src/jmh 目錄中抵赢,對(duì) JMH 不熟悉的同學(xué)可以自行去了解。
我們這里對(duì)并發(fā)的性能做一次測(cè)試,使用倉(cāng)庫(kù)中的ParallelPerf
類(lèi)即可铅鲤,筆者機(jī)器的配置是 3 GHz Intel Core i7 4 核 + 16 GB 1600 MHz DDR3划提,效果如下:
我這里解釋一下參數(shù)的含義:
- Count:數(shù)據(jù)源數(shù)目
- Compute: 可以認(rèn)為是 CPU 耗時(shí)的單位,隨著數(shù)值增大而接近線性增長(zhǎng)
- Parallelism:并發(fā)數(shù)目邢享,這里可以近似地認(rèn)為是線程數(shù)目
另外圖表中表頭帶 error 的字樣是表示 99.9% 的置信區(qū)間鹏往,如
第一行的 GroupBy 置信區(qū)間為:[1539.814 - 41.88, 1539.814 + 41.88]。
根據(jù)圖中的結(jié)果骇塘,可見(jiàn)在Compute
較小的情況下伊履,parallel
比groupBy
是有著絕對(duì)的優(yōu)勢(shì)的,說(shuō)明parallel
的性能損耗較小款违。
而Compute
較大時(shí)唐瀑,操作符內(nèi)部的性能損耗相對(duì)全局的影響較小,因此這兩者性能則差不多奠货。
SchedulerMultiWorkerSupport
不僅如此介褥,runOn
操作符在創(chuàng)建Worker
時(shí),有特別的優(yōu)化:
public interface SchedulerMultiWorkerSupport {
void createWorkers(int number, @NonNull WorkerCallback callback);
interface WorkerCallback {
void onWorker(int index, @NonNull Scheduler.Worker worker);
}
}
Scheduler
通過(guò)實(shí)現(xiàn)這個(gè)接口递惋,能夠針對(duì)一次創(chuàng)建多個(gè)Worker
的情況做優(yōu)化柔滔,目前僅ComputationScheduler
支持。具體的源碼不列出來(lái)了萍虽,優(yōu)化后實(shí)際的效果就是盡可能的平均了線程和Worker
的負(fù)載睛廊。
換言之,如果我們使用groupBy
做并發(fā)時(shí)杉编,對(duì)應(yīng)的分組后的Flowable
可能由于其他的操作符也在使用ComputationScheduler
導(dǎo)致分下去的Worker
對(duì)應(yīng)的線程可能有重合和遺漏超全。
舉個(gè)例子,請(qǐng)看下面的代碼:
Flowable.just(1, 2)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer v) throws Exception {
return v % 2;
}
})
.subscribeOn(Schedulers.io())
.flatMap(new Function<GroupedFlowable<Integer, Integer>, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) throws Exception {
Publisher<Integer> it = g.observeOn(Schedulers.computation()).doOnNext(i -> {
System.out.println(Thread.currentThread().getName());
});
Thread.sleep(1000);
return it;
}
})
.subscribe();
輸出:
RxComputationThreadPool-1
RxComputationThreadPool-2
以上的結(jié)果是符合我們期望的邓馒,數(shù)據(jù)根據(jù)模 2 的剩余類(lèi)劃分了兩組嘶朱,每組的數(shù)據(jù)的分發(fā)在不同的線程中,但是我們?cè)谏厦娴拇a后面追加以下的代碼執(zhí)行:
...
Thread.sleep(1500);
int core = Runtime.getRuntime().availableProcessors();
for (int i = 0; i < core - 1; i++) {
scheduler.createWorker();
}
輸出:
RxComputationThreadPool-1
RxComputationThreadPool-1
為什么發(fā)生這樣的情況呢光酣,首先我們?cè)诿總€(gè)數(shù)據(jù)源observeOn
后疏遏,休眠一秒,隨后這個(gè)Flowable
會(huì)被立即訂閱救军,觸發(fā)createWorker
财异,我們下面的代碼休眠了 1.5 秒,即處于第一個(gè)Flowable
被訂閱后觸發(fā)了createWorker
唱遭,第二個(gè)Flowable
尚未被訂閱時(shí)戳寸,我們又分配了core - 1
個(gè)的Worker
,因此groupBy
分配的下個(gè)Worker
的線程又和第一個(gè)分配的相同了拷泽。注意這里我們說(shuō)的是依賴的線程相同疫鹊,但是每個(gè)Worker
對(duì)象都是獨(dú)立的袖瞻,具體原因在上面鏈接的系列第二篇中詳細(xì)講述過(guò)。
而在parallel
中Worker
是連續(xù)分配的拆吆,因此不受這種情況的干擾虏辫,有興趣的讀者們可以自己嘗試一番。
結(jié)語(yǔ)
Parallel 在改版后锈拨,確實(shí)是 RxJava2 中并發(fā)的不二選擇。配合內(nèi)置的操作符能夠讓大家收放自如羹唠,不再受并發(fā)的困擾奕枢。