在Java8中,集合類Collection新增了兩個(gè)流方法渊季,分別是Stream()和ParallelStream()朋蔫。
什么是Stream
在Java8之前,我們通總是通過for循環(huán)或者Iterator迭代來重新排序合并數(shù)據(jù)却汉,又或者過重新定義Collections.sorts的Comparator方法來實(shí)現(xiàn)驯妄,這兩種方式對(duì)于大數(shù)據(jù)量系統(tǒng)耒說,效率并不是很理想合砂。
Java8添加了一個(gè)新的口類Stream,他和我們之前接觸的字節(jié)流概念不太一樣青扔,Java8集合的Stream相當(dāng)于高級(jí)版的Iterator,他可以通過Lambda表達(dá)式對(duì)集合進(jìn)行各種非便利、高效的聚合操作(AggregateOperation)翩伪,或者大批量數(shù)據(jù)操作(Bulk Data Operation).
Stream的聚合操作與數(shù)據(jù)庫SQL的聚臺(tái)操作sorted微猖、filter、map等類似幻工。我們?cè)趹?yīng)用層就可以高效地實(shí)現(xiàn)類似數(shù)據(jù)庫SQL的聚合操作了励两,而在數(shù)據(jù)作方面Stream不僅可以通過串行的方式實(shí)現(xiàn)數(shù)據(jù)操作,還可以通過并行的方式處理大批量數(shù)據(jù)囊颅,提高數(shù)據(jù)的處理效率当悔。
舉例:
過濾分組一所中學(xué)里身高在160cm以上的男女同學(xué)傅瞻,傳統(tǒng)的代碼實(shí)現(xiàn):
Map<String,List<Student>> stuMap = new HashMap<>();
for(Student stu:studentsList){
if(stu.getHeight() > 160){//身高大于160cm
if(stuMap.get(stu.getSex()) == null){//性別沒分類
List<Student> list = new ArrayList<>();
list.add(stu);//把學(xué)生放進(jìn)列表
stuMap.put(stu.getSex(),list);//把表放入map
} else {//該性別分類已存在
stuMap.get(stu.getSex()).add(stu);
}
}
}
再看看Java8中的Stream API進(jìn)行實(shí)現(xiàn):
//串行
Map<String,List<Student>> stuMap = stuList.Stream().filter(Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student ::getSex));
//并行
Map<String,List<Student>> stuMap = stuList.parallelStream().filter(Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student ::getSex));
可以看到,使用Stream結(jié)合Lambda表達(dá)式實(shí)現(xiàn)遍歷篩選功能非常簡單盲憎。
Stream如何優(yōu)化遍歷
Stream是如何做到優(yōu)化迭代的嗅骄?并行又是如何實(shí)現(xiàn)的?我們通過源碼看一下Stream的實(shí)現(xiàn)原理饼疙。
Stream操作分類
Stream的操作分類其實(shí)是實(shí)現(xiàn)高效迭代大數(shù)據(jù)集合的重要原因之一溺森。
官方將Stream中的操作分為兩大類:中間操作(Intermediate operations)和終結(jié)操作(Terminal operations)。中間操作只對(duì)操作進(jìn)行了記景窑眯,即只會(huì)返回一個(gè)流屏积,不會(huì)進(jìn)行計(jì)算操作,而終結(jié)操作是實(shí)現(xiàn)了計(jì)算操作磅甩。
中間操作又可以分為無狀態(tài)(Stateless)與有狀態(tài)(Stateful)操作炊林,前者是指元素的處理不受之前元素的影響,后者是指該操作只有拿到所有元素之后才能繼續(xù)下去卷要。
終結(jié)操作又可以分為短路(Short-circuiting)與非短路(Unshort-circuiting)操作渣聚,前者是指遇到某些符合條件的元素就可以得到最終結(jié)果,后者是指必須處理完所有元素才能得到最終結(jié)果僧叉。操作分類詳情如下圖所示:
我們通常還會(huì)將中間操作稱為懶操作奕枝,也正是由這種懶操作結(jié)合終結(jié)操作、數(shù)據(jù)源構(gòu)成的處理管道(Pipeline)瓶堕,實(shí)現(xiàn)了Stream的高效隘道。
Stream源碼實(shí)現(xiàn)
在了解Stream如何工作之前,我們先來了解下Stream包是由哪些主要結(jié)構(gòu)類組合而成的郎笆,各個(gè)類的職責(zé)參照下圖:
BaseStream和Stream為最頂端的接口類薄声。
BaseStream主要定義了流的基本接口方法,例如spliterator题画、isParallel等,Stream則定義了一些流的常用操作方法德频,例如苍息,map、filter等壹置。
ReferencePipeline是一個(gè)結(jié)構(gòu)類竞思,他通過定義內(nèi)部類組裝了各種操作流。他定義了Head钞护、StatelessOp盖喷、StatefulOp三個(gè)內(nèi)部類,實(shí)現(xiàn)了BaseStream與Stream的接口方法难咕。
Sink接口是定義每個(gè)Stream操作之間關(guān)系的協(xié)議课梳,他包含begin()距辆、end()、
cancellationRequested()暮刃、accpt()四個(gè)方法跨算。
ReferencePipeline最終會(huì)將整個(gè)Stream流操作組裝成一個(gè)調(diào)用鏈,而調(diào)用鏈上的各個(gè)Stream操作的上下關(guān)系就是通過Sink接口協(xié)議來定義實(shí)現(xiàn)的椭懊。
Stream操作疊加
一個(gè)Stream的各個(gè)操作是由處理管道組裝诸蚕,并統(tǒng)一完成數(shù)據(jù)處理的。在JDK每次的中斷操作會(huì)以使用階段(Stage)命名氧猬。
管道結(jié)構(gòu)通常是由ReferencePipeline類實(shí)現(xiàn)的背犯,前面提到過,ReferencePipeline包含了Head盅抚、StatelessOp漠魏、StatefulOp三種內(nèi)部類。
Head類主要來定義數(shù)據(jù)源操作泉哈,在我們初次調(diào)用xxx.stream()方法時(shí)蛉幸,會(huì)初次加載Head對(duì)象,此時(shí)為加載數(shù)據(jù)源操作丛晦。
接著加載的是中間操作奕纫,分別為無狀態(tài)中間操作StatelessOp對(duì)象和有狀態(tài)操作StatefulOp對(duì)象,此時(shí)的Stage并沒有執(zhí)行烫沙,而是通過AbstractPipeline生成了一個(gè)中間操作Stage鏈表匹层。
當(dāng)我們調(diào)用終結(jié)操作時(shí),會(huì)生成一個(gè)最終的Stage,通過這個(gè)個(gè)Stage觸發(fā)之前的中間操作锌蓄,從最后一個(gè)Stage開始升筏,遞歸產(chǎn)生一個(gè)Sink鏈。
如下圖所示:
下面通過一個(gè)例子來感受一下Stream的操作分類是如何實(shí)現(xiàn)高效迭代大數(shù)據(jù)集合的瘸爽。
List<String> names = Arrays.asList("張三","李四","王老五","李三","劉老四","王小二","張四","張五六七");
String maxLenStarWithZ = names.stream()
.filter(name -> name.startsWith("張"))
.mapToInt(String::length)
.max()
.toString();
這個(gè)例子的操作是查出一個(gè)以張開頭您访,長度最長的名字。從經(jīng)驗(yàn)上剪决,我們會(huì)認(rèn)為是這樣的操作流程:
- 首先遍歷一次集合灵汪,得到以‘張’開頭的所有名字;
- 遍歷一次filter得到的集合柑潦,將名字轉(zhuǎn)換成數(shù)字長度享言;
- 最后找到最長的名字并返回。
但渗鬼,實(shí)際情況并非這樣览露,下面我們來逐步分析一下這個(gè)操作是如何執(zhí)行的。
首先譬胎,由于names是ArrayList集合差牛,所以names.stream()方法將會(huì)調(diào)用集合類基礎(chǔ)接口Collection的Stream方法:
default Stream<E> stream(){
return StreamSupport.stream(spliterator(),false);
}
然后命锄,Stream方法會(huì)調(diào)用StreamSupport類的Stream方法,方法中初始化了一個(gè)ReferencePipeline的Head內(nèi)部類對(duì)象:
public static <T> Stream<T> (Spliterator<T> spliterator,boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}
再調(diào)用filter和map方法多糠,這兩個(gè)方法都是無狀態(tài)的中間操作累舷,所以執(zhí)行filter和map作時(shí),并沒有進(jìn)行任何的操作夹孔,而是分別創(chuàng)建了一個(gè)Stage來標(biāo)識(shí)總戶的每一次操作被盈。
而通總情況下Stream的操作又要一個(gè)回調(diào)函數(shù),所以一個(gè)完整的Stage是由數(shù)據(jù)來源搭伤、操作只怎、回調(diào)函數(shù)組成的三元組來表示。如下圖所示怜俐,分別是ReferencePipeline的filter方法和map方法:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
newStatelessOp將令調(diào)用父類AbstractPipeline的構(gòu)造函數(shù)身堡,這個(gè)構(gòu)造函數(shù)將前后的Stage聯(lián)系起來,生成一個(gè)Stage鏈表:
/**
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
因?yàn)樵趧?chuàng)建每一個(gè)Stage時(shí)拍鲤,都會(huì)包含一個(gè)opWrapSink()方法贴谎,該方法會(huì)把一個(gè)操作的具體實(shí)現(xiàn)封裝在Sink類中,Sink采用(處理一>轉(zhuǎn)發(fā))的模式來疊加操作。
當(dāng)執(zhí)行了max方法時(shí)季稳,會(huì)調(diào)用ReferencePipeline的max方法擅这,此時(shí)由于ma×方法是終結(jié)操作,所以會(huì)創(chuàng)建一個(gè)TerminalOp操作,同時(shí)創(chuàng)建一個(gè)ReducingSink,并且將操作封裝在Sink類中景鼠。
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
最調(diào)用AbstractPipeline的wrapSink方法仲翎,該方法會(huì)調(diào)用opWrapSink生成一一Sink鏈表,Sink鏈表中的每一個(gè)Sink都封裝了一個(gè)操作的具體實(shí)現(xiàn)铛漓。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
當(dāng)Sink鏈表生成完成后Stream開執(zhí)行溯香,通過spliterator迭代集合,執(zhí)行Sink鏈表中的具體操作浓恶。
@Override
@SuppressWarnings("unchecked")
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
}
Java8的Spliterator的forEachRemaining會(huì)迭代集合玫坛,每迭代一次,都會(huì)執(zhí)行一次filter操作包晰,如果filter作通過昂秃,就會(huì)觸發(fā)map操作,然后將結(jié)果放入到臨時(shí)數(shù)組object杜窄,再進(jìn)行下一次的迭代。完成中間操作后算途,就會(huì)觸發(fā)終結(jié)操作max塞耕。以上就是Stream的串行處理方式了。
Stream并行處理
要實(shí)現(xiàn)Stream的并行處理嘴瓤,我們只要在例子中的代碼中新增一個(gè)Parallel()方法扫外。
List<String> names = Arrays.asList("張三","李四","王老五","李三","劉老四","王小二","張四","張五六七");
String maxLenStarWithZ = names.stream()
.parallel()
.filter(name -> name.startsWith("張"))
.mapToInt(String::length)
.max()
.toString();
Stream的并行處理在執(zhí)行終結(jié)操作之前莉钙,跟串行處理的實(shí)現(xiàn)是一樣的。而在調(diào)用終結(jié)方法之后筛谚,實(shí)現(xiàn)的方式就有點(diǎn)不太一樣磁玉,會(huì)調(diào)用TerminalOp的evaluateParallel方法進(jìn)行并行處理。
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
送里的并行處理指的是Stream結(jié)合了ForkJoin框架對(duì)Stream處理進(jìn)行了分片驾讲,Splititerator中的estimateSize方法會(huì)估算出分片的數(shù)據(jù)量蚊伞。
ForkJoin框架和估算算法,可以深入源碼分析下該算法的實(shí)現(xiàn)吮铭。
通過預(yù)估的數(shù)據(jù)量獲取最小處理單元的值时迫,如果當(dāng)前分片大小大于最小處理單元的值,就繼續(xù)切分集合每個(gè)分片將會(huì)生成一個(gè)Sink鏈表谓晌,當(dāng)所有的分片操作完成后ForkJoin框架將會(huì)合并分片任何結(jié)果集掠拳。
合理使用Stream
StreamAPI用起來簡潔,還能并行處理纸肉,我們將對(duì)常規(guī)的迭代溺欧、Stream串行迭代以及Stream并行迭代進(jìn)行性能測試對(duì)比,迭代循環(huán)中柏肪,我們將對(duì)數(shù)據(jù)進(jìn)行過濾分組等操作姐刁。分別進(jìn)行以下幾組測試:
測試環(huán)境 | 數(shù)據(jù)長度 | 數(shù)據(jù)類型 | 測試結(jié)果(ms) |
---|---|---|---|
多核CPU | 100 | int數(shù)組 | 常規(guī)<Stream并行<Stream串行 |
多核CPU | 1.00E+8 | int數(shù)組 | Stream并行<常規(guī)<Stream串行 |
多核CPU | 1.00E+8 | 對(duì)象數(shù)組 | Stream并行<常規(guī)<Stream串行 |
單核CPU | 1.00E+8 | 對(duì)象數(shù)組 | 常規(guī)<Stream串行<Stream并行 |
從上述測試可以看出,在多核大批量數(shù)據(jù)的情況下预吆,Stream并行操作的性能要好一些龙填,盲目使用Stream未必可以使系統(tǒng)性能更佳,還是要結(jié)合具體場景進(jìn)行選擇拐叉。
總結(jié)
縱觀Stream的設(shè)計(jì)實(shí)現(xiàn)岩遗,非常值得我們學(xué)習(xí)。從大的設(shè)計(jì)方向上來說凤瘦,Stream將整個(gè)操作分解為了鏈?zhǔn)浇Y(jié)構(gòu)宿礁,不僅簡化了遍歷操作,還為實(shí)現(xiàn)了并行計(jì)算打下了基礎(chǔ)蔬芥。
從小的分類方向上來說梆靖,Stream將遍歷元素的操作和對(duì)元素的計(jì)算分為中間操作和終結(jié)操作,而中間操作又根據(jù)元素之間狀態(tài)有無干擾分為有狀態(tài)和無狀態(tài)操作笔诵,實(shí)現(xiàn)了鏈結(jié)構(gòu)中的不同階段返吻。、
在串行處理作中乎婿,Stream在執(zhí)行每一步中間操作時(shí)测僵,并不會(huì)做實(shí)際的數(shù)據(jù)操作處理,而是將這些中間操作串聯(lián)起來最終由終結(jié)操作觸發(fā),生成一個(gè)數(shù)據(jù)處理鏈表通過Java8的Spliterator迭代器進(jìn)行數(shù)據(jù)處理捍靠;此時(shí)沐旨,執(zhí)行一次迭代,就對(duì)所有的無狀態(tài)的中間操作進(jìn)行數(shù)據(jù)處理榨婆,而對(duì)有狀態(tài)的甲間操作磁携,就要迭代處理完所有的數(shù)據(jù),再進(jìn)行處理作良风;最后就是進(jìn)行終結(jié)操作的數(shù)據(jù)處理谊迄。
在并行處理操作中,Stream對(duì)中間操作基本跟串行處理方式是一樣的拖吼,但在終結(jié)操作中鳞上,Stream將結(jié)合ForkJoin框架對(duì)集合進(jìn)行切片處理,F(xiàn)orkJoin框架將每個(gè)切片的處理結(jié)果Join合并起來吊档。最后就是要注意Stream的使用場景篙议。