作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-03】
更新日志
日期 | 更新內(nèi)容 | 備注 |
---|---|---|
2017-11-03 | 添加轉(zhuǎn)載標(biāo)志 | 持續(xù)更新 |
并行與并發(fā)
關(guān)于并發(fā)與并行嫉父,需要弄清楚的是,并行關(guān)注于多個(gè)任務(wù)同時(shí)進(jìn)行眼刃,而并發(fā)則通過(guò)調(diào)度來(lái)不停的切換多個(gè)任務(wù)執(zhí)行绕辖,而實(shí)質(zhì)上多個(gè)任務(wù)不是同時(shí)執(zhí)的。并發(fā)擂红,英文單詞為:Concurrent仪际。并行的英文單詞為:parallel。如果想對(duì)并發(fā)和并行有一個(gè)比較直觀的認(rèn)識(shí)昵骤,可以參考下面這張圖片:
Fork/Join 框架與 Java Stream API
Fork/Join框架屬于并行框架树碱,關(guān)于Fork/Join框架的一些內(nèi)容,可以參考這篇文章:Java Fork/Join并行框架变秦。簡(jiǎn)單來(lái)說(shuō)成榜,F(xiàn)ork/Join框架可以將大的任務(wù)切分為足夠小的任務(wù),然后將小任務(wù)分配給不同的線程來(lái)執(zhí)行蹦玫,而線程之間通過(guò)工作竊取算法來(lái)協(xié)調(diào)資源赎婚,提前昨晚任務(wù)的線程可以去“竊取”其他還沒(méi)有做完任務(wù)的線程的任務(wù)刘绣,而每一個(gè)線程都會(huì)持有一個(gè)雙端隊(duì)列,里面存儲(chǔ)著分配給自己的任務(wù)挣输,F(xiàn)ork/Join框架在實(shí)現(xiàn)上纬凤,為了防止線程之間的競(jìng)爭(zhēng),線程在消費(fèi)分配給自己的任務(wù)時(shí)歧焦,是從隊(duì)列頭取任務(wù)的移斩,而“竊取”線程則從隊(duì)列尾部取任務(wù)肚医。
Fork/Join框架通過(guò)fork方法來(lái)分割大任務(wù)绢馍,通過(guò)使用join來(lái)獲取小任務(wù)的結(jié)果,然后組合成大任務(wù)的結(jié)果肠套。關(guān)于Fork/Join任務(wù)模型舰涌,可以參考下面的圖片:
關(guān)于Java Stream API的相關(guān)內(nèi)容,可以參考該文章:Java Streams API你稚。
Stream在實(shí)現(xiàn)上使用了Fork/Join框架來(lái)實(shí)現(xiàn)并發(fā)瓷耙,所以使用Stream我們可以在不知不覺(jué)間就使得我們的程序跑得飛快,究其原因就是Stream使用了Fork/Join并發(fā)框架來(lái)處理任務(wù)刁赖,當(dāng)然搁痛,你需要顯示的指定Stream為parallel,否則Stream默認(rèn)都是串行流宇弛。比如對(duì)于Collection鸡典,你可以使用parallelStream來(lái)轉(zhuǎn)換為一個(gè)并發(fā)流,或者使用stream方法轉(zhuǎn)換為串行流枪芒,然后使用parallel操作使得串行流變?yōu)椴l(fā)流彻况。本文的重點(diǎn)是剖析Stream是如何使用Fork/Join來(lái)做并發(fā)的。
Stream的并發(fā)實(shí)現(xiàn)細(xì)節(jié)
在了解了Fork/Join并發(fā)框架和Java Stream之后舅踪,首要的問(wèn)題就是:Stream是如何使用Fork/Join框架來(lái)做到并發(fā)的纽甘?其實(shí)對(duì)于使用者來(lái)說(shuō),了解Stream就是通過(guò)Fork/Join框架來(lái)做的就好了抽碌,但是如果想要深入了解一下Fork/Join框架的實(shí)踐悍赢,以及Java Stream的設(shè)計(jì)方法,那么去讀一下實(shí)現(xiàn)的源碼還是很有必要的货徙,下文中的分析僅代表個(gè)人觀點(diǎn)左权!
需要注意的一點(diǎn)是,Java Stream的操作分為兩類破婆,也可以分為三類涮总,具體的細(xì)節(jié)可以參考該文章:Java Streams API。一個(gè)簡(jiǎn)單的判斷一個(gè)操作是否是Terminal操作還是Intermediate操作的方法是祷舀,如果操作返回的是一個(gè)新的Stream瀑梗,那么就是一個(gè)Intermediate操作烹笔,否則就是一個(gè)Terminal操作。
Intermediate:一個(gè)流可以后面跟隨零個(gè)或多個(gè) intermediate 操作抛丽。其目的主要是打開(kāi)流谤职,做出某種程度的數(shù)據(jù)操作,然后返回一個(gè)新的流亿鲜,交給下一個(gè)操作使用允蜈。這類操作都是惰性化的(lazy),就是說(shuō)蒿柳,僅僅調(diào)用到這類方法饶套,并沒(méi)有真正開(kāi)始流的遍歷。
Terminal:一個(gè)流只能有一個(gè) terminal 操作垒探,當(dāng)這個(gè)操作執(zhí)行后妓蛮,流就被使用“光”了,無(wú)法再被操作圾叼。所以這必定是流的最后一個(gè)操作蛤克。Terminal 操作的執(zhí)行,才會(huì)真正開(kāi)始流的遍歷夷蚊,并且會(huì)生成一個(gè)結(jié)果构挤,或者一個(gè) side effect。
-
還有一種操作被稱為 short-circuiting惕鼓。用以指:
- 對(duì)于一個(gè) intermediate 操作筋现,如果它接受的是一個(gè)無(wú)限大(infinite/unbounded)的 Stream,但返回一個(gè) 有限的新 Stream呜笑。
- 對(duì)于一個(gè) terminal 操作夫否,如果它接受的是一個(gè)無(wú)限大的 Stream,但能在有限的時(shí)間計(jì)算出結(jié)果叫胁。
Java Stream對(duì)四種類型的Terminal操作使用了Fork/Join實(shí)現(xiàn)了并發(fā)操作凰慈,下面的圖片展示了這四種操作類型:
我們首先來(lái)走一遍Stream操作的執(zhí)行路徑,下面的代碼是我們想要做的操作流驼鹅,下文會(huì)根據(jù)該代碼示例來(lái)跟蹤Stream的執(zhí)行路徑:
Stream.of(1,2,3,4)
.parallel()
.map(n -> n*2)
.collect(Collectors.toCollection(ArrayList::new));
解釋一下微谓,上面的代碼想要實(shí)現(xiàn)的功能是將(1,2输钩,3豺型,4)這四個(gè)數(shù)字每一個(gè)都變?yōu)槠渥陨淼膬杀叮缓笫占@些元素到一個(gè)ArrayList中返回买乃。這是一個(gè)非常簡(jiǎn)單的功能姻氨,下面是上面的操作流的執(zhí)行路徑:
step 1:
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
step 2:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
step 3:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
...
container = evaluate(ReduceOps.makeRef(collector));
...
}
step 4:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
step 5:
使用Fork/Join框架執(zhí)行操作。
上面的五個(gè)步驟是經(jīng)過(guò)一些省略的剪验,需要注意的一點(diǎn)是肴焊,intermediate類型的操作僅僅將操作加到一個(gè)upstream里面前联,具體的原文描述如下:
Construct a new Stream by appending a stateless intermediate operation to an existing stream.
比如上面我們的操作中的map操作,實(shí)際上只是將操作加到一個(gè)intermediate鏈條上面娶眷,不會(huì)立刻執(zhí)行似嗤。重點(diǎn)是第五步,Stream是如何使用Fork/Join來(lái)實(shí)現(xiàn)并發(fā)的届宠。evaluate這個(gè)方法至關(guān)重要烁落,在方法里面會(huì)分開(kāi)處理,對(duì)于設(shè)置了并發(fā)標(biāo)志的操作流豌注,會(huì)使用Fork/Join來(lái)并發(fā)執(zhí)行操作任務(wù)伤塌,而對(duì)于沒(méi)有打開(kāi)并發(fā)標(biāo)志的操作流,則串行執(zhí)行操作幌羞。
Fork/Join框架的核心方法是一個(gè)叫做compute的方法寸谜,下面分析一個(gè)forEach操作如何通過(guò)Fork/Join框架來(lái)實(shí)現(xiàn)并發(fā),通過(guò)追蹤代碼属桦,可以發(fā)現(xiàn)forEach的并發(fā)版本其實(shí)是一個(gè)交由一個(gè)ForEachTask對(duì)象來(lái)做,而ForEachTask類中實(shí)現(xiàn)了compute方法:
// Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
}
在上面的代碼中將大任務(wù)拆成成了小任務(wù)他爸,那哪里收集了這些小任務(wù)呢聂宾?看下面的代碼:
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
可以看到調(diào)用了invoke方法,而對(duì)invoke的描述如下:
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
不是說(shuō)Fork/Join框架嘛诊笤?那有了fork為什么沒(méi)有join而是invoke呢系谐?下面是對(duì)join方法的描述:
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
根據(jù)join的描述,我們知道還可以使用get方法來(lái)獲取結(jié)果讨跟,但是get方法會(huì)拋出異常而join和invoke方法都不會(huì)拋出異常纪他,而是將異常報(bào)告給ForkJoinTask,讓ForkJoinTask來(lái)拋出異常晾匠。