stream描述
用于支持元素流上的功能樣式操作的類伺通,例如集合上的map-reduce轉(zhuǎn)換。例如:
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
這里我們使用widgets
a Collection<Widget>
作為流的源荔棉,然后對(duì)流執(zhí)行filter-map-reduce以獲得紅色小部件的權(quán)重之和。(求和是減少 操作的一個(gè)例子。)
此包中引入的關(guān)鍵抽象是流刹前。類Stream
集索,IntStream
屿愚, LongStream
,和DoubleStream
超過(guò)目的和原始流int
务荆,long
和 double
類型妆距。Streams在幾個(gè)方面與集合不同:
- 沒有存儲(chǔ)空間 流不是存儲(chǔ)元素的數(shù)據(jù)結(jié)構(gòu); 相反,它通過(guò)計(jì)算操作管道傳遞來(lái)自諸如數(shù)據(jù)結(jié)構(gòu)函匕,數(shù)組娱据,生成器函數(shù)或I / O通道的源的元素。
- 功能性浦箱。對(duì)流的操作會(huì)產(chǎn)生結(jié)果吸耿,但不會(huì)修改其源祠锣。例如,過(guò)濾
Stream
從集合中獲取的內(nèi)容會(huì)生成一個(gè)Stream
沒有過(guò)濾元素的新元素咽安,而不是從源集合中刪除元素伴网。 - 懶惰尋求。許多流操作(例如過(guò)濾妆棒,映射或重復(fù)刪除)可以懶惰地實(shí)現(xiàn)澡腾,從而暴露出優(yōu)化的機(jī)會(huì)。例如糕珊,“找到
String
具有三個(gè)連續(xù)元音的第一個(gè)”不需要檢查所有輸入字符串动分。流操作分為中間(生成Stream
)操作和終端(產(chǎn)生價(jià)值或副作用)操作。中間操作總是很懶惰红选。 - 可能是無(wú)限的澜公。雖然集合的大小有限,但流不需要喇肋。短路操作坟乾,例如
limit(n)
或findFirst()
可以允許無(wú)限流上的計(jì)算在有限時(shí)間內(nèi)完成。 - 耗材蝶防。流的元素僅在流的生命期間訪問(wèn)過(guò)一次甚侣。像
Iterator
a 一樣,必須生成一個(gè)新流來(lái)重新訪問(wèn)源的相同元素间学。
流可以通過(guò)多種方式獲得殷费。一些例子包括:
- 來(lái)自
Collection
通過(guò)stream()
和parallelStream()
方法; - 從陣列通過(guò)
Arrays.stream(Object[])
; - 來(lái)自流類的靜態(tài)工廠方法,例如
Stream.of(Object[])
低葫,IntStream.range(int, int)
或Stream.iterate(Object, UnaryOperator)
; - 文件的行可以從
BufferedReader.lines()
; - 文件路徑流可以從方法中獲得
Files
; - 隨機(jī)數(shù)的流可以從
Random.ints()
; - 在JDK許多其他流的軸承的方法详羡,包括
BitSet.stream()
,Pattern.splitAsStream(java.lang.CharSequence)
氮采,和JarFile.stream()
殷绍。
使用這些技術(shù),第三方庫(kù)可以提供其他流源 鹊漠。
流操作和管道
流操作分為中間操作和 終端操作主到,并組合成流管道。流管道由源(例如 Collection
躯概,數(shù)組登钥,生成器函數(shù)或I / O通道)組成; 然后是零或更多的中間操作,如 Stream.filter
或Stream.map
; 和終端操作娶靡,如Stream.forEach
或Stream.reduce
牧牢。
中間操作返回一個(gè)新流。他們總是 懶惰 ; 執(zhí)行中間操作,例如 filter()
實(shí)際上不執(zhí)行任何過(guò)濾塔鳍,而是創(chuàng)建一個(gè)新流伯铣,當(dāng)遍歷時(shí),它包含與給定謂詞匹配的初始流的元素轮纫。在執(zhí)行管道的終端操作之前腔寡,不會(huì)開始遍歷管道源。
終端操作(例如Stream.forEach
或 IntStream.sum
)可以遍歷流以產(chǎn)生結(jié)果或副作用掌唾。在執(zhí)行終端操作之后放前,流管道被認(rèn)為已消耗,并且不能再使用; 如果需要再次遍歷同一數(shù)據(jù)源糯彬,則必須返回?cái)?shù)據(jù)源以獲取新流凭语。在幾乎所有情況下,終端操作都很渴望撩扒,在返回之前完成數(shù)據(jù)源的遍歷和管道的處理似扔。只有終端操作iterator()
而 spliterator()
不是; 這些是作為“逃生艙口”提供的,以便在現(xiàn)有操作不足以執(zhí)行任務(wù)時(shí)啟用任意客戶端控制的管道遍歷却舀。
懶惰地處理流可以顯著提高效率; 在諸如上面的filter-map-sum示例的流水線中虫几,過(guò)濾,映射和求和可以融合到數(shù)據(jù)的單個(gè)傳遞中挽拔,具有最小的中間狀態(tài)。懶惰還允許在沒有必要時(shí)避免檢查所有數(shù)據(jù); 對(duì)于諸如“查找超過(guò)1000個(gè)字符的第一個(gè)字符串”之類的操作但校,只需要檢查足夠的字符串以找到具有所需特征的字符串螃诅,而不檢查源中可用的所有字符串。(當(dāng)輸入流是無(wú)限的而不僅僅是大的時(shí)候状囱,這種行為變得更加重要术裸。)
中間操作進(jìn)一步分為無(wú)狀態(tài)操作 和有狀態(tài)操作。無(wú)狀態(tài)操作(例如filter
和)map
在處理新元素時(shí)不保留先前看到的元素的狀態(tài) - 每個(gè)元素都可以獨(dú)立于其他元素的操作進(jìn)行處理亭枷。狀態(tài)操作(例如 distinct
和sorted
)可以在處理新元素時(shí)包含先前看到的元素的狀態(tài)袭艺。
有狀態(tài)操作可能需要在生成結(jié)果之前處理整個(gè)輸入。例如叨粘,在查看流的所有元素之前猾编,不能通過(guò)對(duì)流進(jìn)行排序來(lái)產(chǎn)生任何結(jié)果。因此升敲,在并行計(jì)算下答倡,一些包含有狀態(tài)中間操作的管道可能需要對(duì)數(shù)據(jù)進(jìn)行多次傳遞,或者可能需要緩沖重要數(shù)據(jù)驴党。僅包含無(wú)狀態(tài)中間操作的管道可以在一次通過(guò)中處理瘪撇,無(wú)論是順序還是并行,具有最小的數(shù)據(jù)緩沖。
此外倔既,一些操作被認(rèn)為是短路操作恕曲。如果在呈現(xiàn)無(wú)限輸入時(shí),它可能產(chǎn)生有限流渤涌,則中間操作是短路的码俩。如果在呈現(xiàn)無(wú)限輸入時(shí)它可以在有限時(shí)間內(nèi)終止,則終端操作是短路的歼捏。在流水線中進(jìn)行短路操作是處理無(wú)限流以在有限時(shí)間內(nèi)正常終止的必要但不充分的條件稿存。
并行
具有顯式for-
循環(huán)的處理元素本質(zhì)上是串行的。Streams通過(guò)將計(jì)算重新定義為聚合操作的流水線而不是作為每個(gè)單獨(dú)元素的命令操作來(lái)促進(jìn)并行執(zhí)行瞳秽。所有流操作都可以串行或并行執(zhí)行瓣履。除非明確請(qǐng)求并行性,否則JDK中的流實(shí)現(xiàn)會(huì)創(chuàng)建串行流练俐。例如袖迎,Collection
有方法Collection.stream()
和Collection.parallelStream()
分別生成順序和并行流; 其他流方法,例如IntStream.range(int, int)
產(chǎn)生順序流腺晾,但這些流可以通過(guò)調(diào)用它們的BaseStream.parallel()
方法有效地并行化燕锥。要并行執(zhí)行先前的“窗口小部件權(quán)重總和”查詢,我們會(huì)這樣做:
int sumOfWeights = widgets.
此示例的串行和并行版本之間的唯一區(qū)別是創(chuàng)建初始流悯蝉,使用“ parallelStream()
”而不是“ stream()
”归形。當(dāng)啟動(dòng)終端操作時(shí),根據(jù)調(diào)用它的流的方向鼻由,順序地或并行地執(zhí)行流管道暇榴。可以使用該isParallel()
方法確定流是以串行還是并行方式執(zhí)行蕉世,并且可以使用BaseStream.sequential()
和BaseStream.parallel()
操作來(lái)修改流的方向 蔼紧。當(dāng)啟動(dòng)終端操作時(shí),根據(jù)調(diào)用它的流的模式狠轻,順序地或并行地執(zhí)行流管道奸例。
除了被識(shí)別為明確不確定的操作之外,例如findAny()
向楼,流是順序執(zhí)行還是并行執(zhí)行不應(yīng)該改變計(jì)算結(jié)果查吊。
大多數(shù)流操作接受描述用戶指定行為的參數(shù),這些參數(shù)通常是lambda表達(dá)式蜜自。為了保持正確的行為菩貌,這些行為參數(shù)必須是非干擾的,并且在大多數(shù)情況下必須是無(wú)狀態(tài)的重荠。這些參數(shù)總是函數(shù)接口的實(shí)例箭阶,例如Function
虚茶,并且通常是lambda表達(dá)式或方法引用。
不干涉
Streams使您能夠在各種數(shù)據(jù)源上執(zhí)行可能并行的聚合操作仇参,包括甚至非線程安全的集合嘹叫,例如 ArrayList
。只有在執(zhí)行流管道期間我們能夠防止干擾數(shù)據(jù)源時(shí)诈乒,才有可能實(shí)現(xiàn)這一點(diǎn) 罩扇。除了轉(zhuǎn)義艙口的操作iterator()
和 spliterator()
,被調(diào)用的終端操作時(shí)開始執(zhí)行怕磨,并且終端操作完成時(shí)結(jié)束喂饥。對(duì)于大多數(shù)數(shù)據(jù)源,防止干擾意味著確保數(shù)據(jù)源根本 不被修改在流管道的執(zhí)行期間肠鲫。值得注意的例外是其源是并發(fā)集合的流员帮,這些集合專門用于處理并發(fā)修改。并發(fā)流源是那些Spliterator
報(bào)告 CONCURRENT
特征的源导饲。
因此捞高,源流可能不是并發(fā)的流管道中的行為參數(shù)永遠(yuǎn)不應(yīng)該修改流的數(shù)據(jù)源。如果行為參數(shù)修改或?qū)е滦薷牧鞯臄?shù)據(jù)源渣锦,則該行為參數(shù)會(huì)干擾非并發(fā)數(shù)據(jù)源硝岗。不干涉的需要適用于所有管道,而不僅僅是并行管道袋毙。除非流源是并發(fā)的型檀,否則在執(zhí)行流管道期間修改流的數(shù)據(jù)源可能會(huì)導(dǎo)致異常,錯(cuò)誤答案或不一致的行為娄猫。對(duì)于性能良好的流源贱除,可以在終端操作開始之前修改源,并且這些修改將反映在所覆蓋的元素中媳溺。例如,請(qǐng)考慮以下代碼:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
首先創(chuàng)建一個(gè)包含兩個(gè)字符串的列表:“one”; 和“兩個(gè)”碍讯。然后從該列表創(chuàng)建流悬蔽。接下來(lái),通過(guò)添加第三個(gè)字符串來(lái)修改列表:“three”捉兴。最后蝎困,收集流的元素并將它們連接在一起。由于在終端collect
操作開始之前修改了列表倍啥,因此結(jié)果將是一串“一二三”禾乘。從JDK集合和大多數(shù)其他JDK類返回的所有流都以這種方式表現(xiàn)良好; 對(duì)于其他庫(kù)生成的流,請(qǐng)參閱 低級(jí)流構(gòu)造虽缕,以了解構(gòu)建行為良好的流的要求始藕。
無(wú)國(guó)籍行為
如果流操作的行為參數(shù)是有狀態(tài)的,則流管道結(jié)果可能是不確定的或不正確的。有狀態(tài)lambda(或?qū)崿F(xiàn)適當(dāng)功能接口的其他對(duì)象)的結(jié)果取決于在流管道執(zhí)行期間可能發(fā)生變化的任何狀態(tài)伍派。有狀態(tài)lambda的一個(gè)示例是map()
in中的參數(shù):
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
這里江耀,如果映射操作是并行執(zhí)行的,則由于線程調(diào)度差異诉植,相同輸入的結(jié)果可能因運(yùn)行而不同祥国,而對(duì)于無(wú)狀態(tài)lambda表達(dá)式,結(jié)果將始終相同晾腔。
另請(qǐng)注意舌稀,嘗試從行為參數(shù)訪問(wèn)可變狀態(tài)會(huì)給您在安全性和性能方面做出錯(cuò)誤的選擇; 如果您不同步對(duì)該狀態(tài)的訪問(wèn),則會(huì)出現(xiàn)數(shù)據(jù)爭(zhēng)用灼擂,因此您的代碼已損壞壁查,但如果您同步訪問(wèn)該狀態(tài),則存在爭(zhēng)用的風(fēng)險(xiǎn)會(huì)破壞您希望從中受益的并行性缤至。最好的方法是避免有狀態(tài)的行為參數(shù)完全流動(dòng)操作; 通常有一種方法可以重構(gòu)流管道以避免有狀態(tài)潮罪。
副作用
通常,不鼓勵(lì)行為參數(shù)對(duì)流操作的副作用领斥,因?yàn)樗鼈兺ǔ?huì)導(dǎo)致無(wú)意中違反無(wú)國(guó)籍要求以及其他線程安全危險(xiǎn)嫉到。
如果行為參數(shù)確實(shí)有副作用,除非明確說(shuō)明月洛,否則不能保證 這些副作用對(duì)其他線程的 可見性何恶,也不保證對(duì)同一流管道中“相同”元素的不同操作在同一個(gè)線程中執(zhí)行。此外嚼黔,這些效果的排序可能令人驚訝细层。即使管道被約束以產(chǎn)生與流源的遭遇順序一致的 結(jié)果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray()
必須產(chǎn)生[0, 2, 4, 6, 8]
)唬涧,也不保證將映射器函數(shù)應(yīng)用于各個(gè)元素的順序疫赎,或者什么線程為給定元素執(zhí)行任何行為參數(shù)。
許多可能試圖使用副作用的計(jì)算可以更安全和有效地表達(dá)而沒有副作用碎节,例如使用 減少而不是可變累加器捧搞。但是,諸如println()
用于調(diào)試目的的副作用通常是無(wú)害的狮荔。少量的流操作胎撇,例如 forEach()
和peek()
,只能通過(guò)副作用操作; 這些應(yīng)該小心使用殖氏。
作為如何將不適當(dāng)?shù)厥褂酶弊饔玫牧鞴艿擂D(zhuǎn)換為不使用副作用的流管道的示例晚树,以下代碼在字符串流中搜索與給定正則表達(dá)式匹配的那些,并將匹配放在列表中雅采。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
此代碼不必要地使用副作用爵憎。如果并行執(zhí)行慨亲,非線程安全性ArrayList
將導(dǎo)致不正確的結(jié)果,并且添加所需的同步將導(dǎo)致爭(zhēng)用纲堵,從而破壞并行性的好處巡雨。此外,在這里使用副作用是完全沒有必要的; 在forEach()
可以簡(jiǎn)單地用一個(gè)縮小操作是更安全席函,更有效铐望,并且更適合于并行替換:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
訂購(gòu)
流可能有也可能沒有已定義的遭遇順序。流是否具有遭遇順序取決于源和中間操作茂附。某些流源(例如List
或數(shù)組)本質(zhì)上是有序的正蛙,而其他(例如HashSet
)則不是。一些中間操作(例如sorted()
营曼,可以)在其他無(wú)序流上施加遭遇順序乒验,而其他中間操作可以呈現(xiàn)無(wú)序的有序流,例如BaseStream.unordered()
蒂阱。此外锻全,一些終端操作可以忽略遭遇順序,例如 forEach()
录煤。
如果訂購(gòu)了流鳄厌,則大多數(shù)操作都被約束為對(duì)其遭遇順序中的元素進(jìn)行操作; 如果流的源是List
包含的[1, 2, 3]
,那么執(zhí)行的結(jié)果map(x -> x*2)
必須是[2, 4, 6]
妈踊。但是了嚎,如果源沒有定義的遭遇順序,則值的任何排列[2, 4, 6]
都將是有效結(jié)果廊营。
對(duì)于順序流歪泳,遭遇順序的存在與否不會(huì)影響性能,只影響確定性露筒。如果訂購(gòu)了流呐伞,則在相同的源上重復(fù)執(zhí)行相同的流管道將產(chǎn)生相同的結(jié)果; 如果沒有訂購(gòu),重復(fù)執(zhí)行可能會(huì)產(chǎn)生不同的結(jié)果慎式。
對(duì)于并行流荸哟,放寬排序約束有時(shí)可以實(shí)現(xiàn)更高效的執(zhí)行。如果元素的排序不相關(guān)瞬捕,則可以更有效地實(shí)現(xiàn)某些聚合操作,例如過(guò)濾重復(fù)項(xiàng)(distinct()
)或分組縮減(Collectors.groupingBy()
)舵抹。類似地肪虎,與遇到訂單本質(zhì)上相關(guān)的操作(例如limit()
)可能需要緩沖以確保正確排序,從而破壞并行性的好處惧蛹。如果流具有遭遇順序扇救,但用戶并不特別關(guān)心該遭遇順序刑枝,則使用明確地對(duì)流進(jìn)行排序unordered()
可以改善某些有狀態(tài)或終端操作的并行性能。然而迅腔,大多數(shù)流管道装畅,例如上面的“塊的權(quán)重總和”示例,即使在排序約束下仍然有效地并行化沧烈。
減少操作
甲減少操作(也被稱為倍)取輸入元素的序列掠兄,并通過(guò)組合操作的反復(fù)應(yīng)用,例如找到一組數(shù)字锌雀,或累積元素的總和或最大到一個(gè)列表它們組合成一個(gè)單一的匯總結(jié)果蚂夕。該流的類具有普遍減少操作,所謂的多種形式 reduce()
和collect()
腋逆,以及多個(gè)專業(yè)化還原的形式婿牍,如 sum()
,max()
或count()
惩歉。
當(dāng)然等脂,這樣的操作可以很容易地實(shí)現(xiàn)為簡(jiǎn)單的順序循環(huán),如:
int sum = 0;
for (int x : numbers) {
sum += x;
}
然而撑蚌,有充分理由優(yōu)先考慮減少操作而不是如上所述的變異累積上遥。簡(jiǎn)化不僅“更抽象” - 它作為一個(gè)整體而不是單個(gè)元素在整個(gè)流上運(yùn)行 - 但正確構(gòu)造的reduce操作本質(zhì)上是可并行化的,只要用于處理元素的函數(shù)是關(guān)聯(lián)的和 無(wú)國(guó)籍的锨并。例如露该,給定我們想要找到總和的數(shù)字流,我們可以寫:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
要么:
int sum = numbers.stream().reduce(0, Integer::sum);
這些減少操作可以安全地并行運(yùn)行第煮,幾乎不需要修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
減少并行很好解幼,因?yàn)閷?shí)現(xiàn)可以并行地對(duì)數(shù)據(jù)的子集進(jìn)行操作,然后組合中間結(jié)果以獲得最終的正確答案包警。(即使語(yǔ)言具有“并行for-each”結(jié)構(gòu)撵摆,變異累積方法仍然需要開發(fā)人員為共享累積變量提供線程安全更新sum
,然后所需的同步可能會(huì)消除并行性帶來(lái)的任何性能提升害晦。 )reduce()
相反怒允,使用刪除了并行化還原操作的所有負(fù)擔(dān)雳灾,并且?guī)炜梢蕴峁┯行У牟⑿袑?shí)現(xiàn),而無(wú)需額外的同步。
前面顯示的“小部件”示例顯示了簡(jiǎn)化如何與其他操作結(jié)合使用批量操作替換循環(huán)万哪。如果widgets
是Widget
具有getWeight
方法的對(duì)象集合,我們可以找到最重的小部件:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
在更一般的形式中政钟,reduce
對(duì)類型元素的操作<T>
產(chǎn)生類型 的結(jié)果<U>
需要三個(gè)參數(shù):
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
這里蜈敢,identity元素既是縮減的初始種子值,也是沒有輸入元素的默認(rèn)結(jié)果殴俱。所述累加器 函數(shù)接受部分結(jié)果和下一個(gè)元素政冻,并產(chǎn)生一個(gè)新的部分結(jié)果枚抵。該組合功能結(jié)合了兩個(gè)部分結(jié)果產(chǎn)生一個(gè)新的部分結(jié)果。(組合器在并行縮減中是必需的明场,其中輸入被分區(qū)汽摹,為每個(gè)分區(qū)計(jì)算部分累積,然后組合部分結(jié)果以產(chǎn)生最終結(jié)果苦锨。)
更正式地說(shuō)逼泣,該identity
值必須是組合器函數(shù)的標(biāo)識(shí)。這意味著逆屡,對(duì)所有人來(lái)說(shuō)u
圾旨, combiner.apply(identity, u)
等于u
。此外魏蔗,該 combiner
功能必須是關(guān)聯(lián)的砍的,必須與兼容accumulator
功能:對(duì)所有u
和t
,combiner.apply(u, accumulator.apply(identity, t))
一定要equals()
到accumulator.apply(u, t)
莺治。
三參數(shù)形式是兩參數(shù)形式的概括廓鞠,將映射步驟結(jié)合到累積步驟中。我們可以使用更一般的形式重新構(gòu)建簡(jiǎn)單的權(quán)重總和示例谣旁,如下所示:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight())
Integer::sum);
雖然顯式的map-reduce形式更具可讀性床佳,因此通常應(yīng)該是首選。通過(guò)將映射和縮減組合成單個(gè)函數(shù)榄审,可以優(yōu)化遠(yuǎn)離重要工作的情況提供通用形式砌们。
可變減少
甲可變歸約運(yùn)算累積輸入元件到一個(gè)可變的結(jié)果的容器,如一個(gè)Collection
或StringBuilder
搁进,作為其處理流中的元素浪感。
如果我們想要獲取字符串流并將它們連接成一個(gè)長(zhǎng)字符串,我們可以通過(guò)普通減少來(lái)實(shí)現(xiàn):
String concatenated = strings.reduce("", String::concat)
我們會(huì)得到理想的結(jié)果饼问,甚至可以并行工作影兽。但是,我們可能對(duì)性能不滿意莱革!這樣的實(shí)現(xiàn)將進(jìn)行大量的字符串復(fù)制峻堰,并且運(yùn)行時(shí)間將是字符數(shù)的O(n ^ 2)。一種更高效的方法是將結(jié)果累積到a中StringBuilder
盅视,這是一個(gè)用于累積字符串的可變?nèi)萜骶杳N覀兛梢允褂孟嗤募夹g(shù)來(lái)并行化可變縮減,就像我們使用普通縮減一樣闹击。
調(diào)用可變縮減操作 collect()
桐筏,因?yàn)樗鼘⑺杞Y(jié)果收集到諸如a的結(jié)果容器中Collection
。甲collect
操作要求三個(gè)功能:一個(gè)供應(yīng)商功能構(gòu)建結(jié)果容器,蓄能器功能并入一個(gè)輸入元件到結(jié)果容器和組合功能的新實(shí)例的一個(gè)結(jié)果容器的內(nèi)容物合并到另一個(gè)梅忌。這種形式與普通減少的一般形式非常相似:
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
與此同樣reduce()
,collect
以這種抽象方式表達(dá)的好處是它直接適用于并行化:我們可以并行累積部分結(jié)果然后將它們組合除破,只要累積和組合函數(shù)滿足適當(dāng)?shù)囊蠹纯赡恋@纾獙⒘髦性氐腟tring表示形式收集到一個(gè)中ArrayList
瑰枫,我們可以為每個(gè)表單編寫明顯的順序:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
或者我們可以使用可并行化的收集表單:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
或者踱葛,將映射操作從累加器函數(shù)中拉出來(lái),我們可以更簡(jiǎn)潔地表達(dá)它:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
在這里光坝,我們的供應(yīng)商只是ArrayList constructor
尸诽,累加器將字符串化元素添加到一個(gè) ArrayList
,而組合器只是用于addAll
將字符串從一個(gè)容器復(fù)制到另一個(gè)容器盯另。
collect
供應(yīng)商性含,累加器和組合器三個(gè)方面緊密耦合。我們可以使用a的抽象 Collector
來(lái)捕獲所有這三個(gè)方面鸳惯。上面用于將字符串收集到a中的示例List
可以使用標(biāo)準(zhǔn)重寫Collector
:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
將可變減少包裝到收集器中具有另一個(gè)優(yōu)點(diǎn):可組合性商蕴。該類Collectors
包含許多用于收集器的預(yù)定義工廠,包括將一個(gè)收集器轉(zhuǎn)換為另一個(gè)收集器的組合器芝发。例如绪商,假設(shè)我們有一個(gè)收集器來(lái)計(jì)算員工流的工資總和,如下所示:
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(在?
第二個(gè)類型參數(shù)僅僅表明我們不關(guān)心這個(gè)收集器所使用的中間表示)辅鲸。如果我們想創(chuàng)造一個(gè)收藏家制表按部門工資的總和格郁,我們可以重復(fù)summingSalaries
使用 groupingBy
:
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
與常規(guī)還原操作一樣,collect()
只有滿足適當(dāng)?shù)臈l件才能并行化操作独悴。對(duì)于任何部分累積的結(jié)果例书,將其與空結(jié)果容器組合必須產(chǎn)生等效結(jié)果。也就是說(shuō)绵患,對(duì)于作為p
任何一系列累加器和組合器調(diào)用的結(jié)果的部分累積結(jié)果 雾叭,p
必須等效于 combiner.apply(p, supplier.get())
。
此外落蝙,然而织狐,計(jì)算是分開的,它必須產(chǎn)生等效的結(jié)果筏勒。對(duì)于任何輸入元件t1
和t2
移迫,結(jié)果 r1
和r2
在計(jì)算下面必須是等價(jià)的:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
在這里,等價(jià)通常意味著根據(jù)Object.equals(Object)
管行。但在某些情況下厨埋,可以放寬等同性以解釋順序的差異。
減少捐顷,并發(fā)和排序
通過(guò)一些復(fù)雜的簡(jiǎn)化操作荡陷,例如collect()
產(chǎn)生一個(gè)Map
雨效,例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
實(shí)際上并行執(zhí)行操作可能會(huì)適得其反。這是因?yàn)?code>Map對(duì)于某些Map
實(shí)現(xiàn)废赞,組合步驟(通過(guò)密鑰將一個(gè)合并到另一個(gè))可能是昂貴的徽龟。
但是,假設(shè)在此縮減中使用的結(jié)果容器是可同時(shí)修改的集合 - 例如a ConcurrentHashMap
唉地。在這種情況下据悔,累加器的并行調(diào)用實(shí)際上可以將它們的結(jié)果同時(shí)存入同一個(gè)共享結(jié)果容器中,從而消除了組合器合并不同結(jié)果容器的需要耘沼。這可能會(huì)提升并行執(zhí)行性能极颓。我們稱之為 同時(shí)減少。
Collector
支持并發(fā)縮減的A 標(biāo)記為Collector.Characteristics.CONCURRENT
特征群嗤。然而菠隆,并發(fā)收集也有缺點(diǎn)。如果多個(gè)線程同時(shí)將結(jié)果存入共享容器骚烧,則存儲(chǔ)結(jié)果的順序是不確定的浸赫。因此,只有在對(duì)正在處理的流不重要的情況下赃绊,才能實(shí)現(xiàn)并發(fā)減少既峡。如果Stream.collect(Collector)
實(shí)現(xiàn),實(shí)現(xiàn)將僅執(zhí)行并發(fā)減少
- 流是平行的;
- 收集器具有
Collector.Characteristics.CONCURRENT
特征碧查,并且; - 流是無(wú)序的运敢,或者收集器具有
Collector.Characteristics.UNORDERED
特征。
您可以使用該BaseStream.unordered()
方法確保流無(wú)序 忠售。例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)
并發(fā)等效于何處groupingBy
)传惠。
請(qǐng)注意,如果給定鍵的元素按照它們?cè)谠粗谐霈F(xiàn)的順序出現(xiàn)很重要稻扬,那么我們就不能使用并發(fā)縮減卦方,因?yàn)榕判蚴遣l(fā)插入的犧牲品之一。然后泰佳,我們將被限制為實(shí)現(xiàn)順序縮減或基于合并的并行縮減盼砍。
關(guān)聯(lián)性
如果滿足以下條件,則 運(yùn)算符或函數(shù)op
是關(guān)聯(lián)的:
(a op b) op c == a op (b op c)
如果我們將其擴(kuò)展為四個(gè)術(shù)語(yǔ)逝她,可以看出這對(duì)并行評(píng)估的重要性:
a op b op c op d == (a op b) op (c op d)
所以我們可以(a op b)
并行評(píng)估(c op d)
浇坐,然后調(diào)用op
結(jié)果。
關(guān)聯(lián)操作的示例包括數(shù)字加法黔宛,最小值和最大值以及字符串連接近刘。
低級(jí)流建設(shè)
到目前為止,所有流示例都使用了類似 Collection.stream()
或Arrays.stream(Object[])
獲取流的方法。這些流式方法是如何實(shí)現(xiàn)的觉渴?
該類StreamSupport
有許多用于創(chuàng)建流的低級(jí)方法介劫,所有方法都使用某種形式的 Spliterator
。分裂器是一個(gè)并行的模擬器 Iterator
; 它描述了一個(gè)(可能是無(wú)限的)元素集合疆拘,支持順序前進(jìn)蜕猫,批量遍歷,并將輸入的某些部分分成另一個(gè)可以并行處理的分裂器哎迄。在最低級(jí)別,所有流都由分裂器驅(qū)動(dòng)隆圆。
在實(shí)現(xiàn)spliterator時(shí)有許多實(shí)現(xiàn)選擇漱挚,幾乎所有這些都是使用該spliterator在實(shí)現(xiàn)的簡(jiǎn)單性和流的運(yùn)行時(shí)性能之間進(jìn)行權(quán)衡。創(chuàng)建spliterator的最簡(jiǎn)單但性能最差的方法是使用迭代器創(chuàng)建一個(gè) Spliterators.spliteratorUnknownSize(java.util.Iterator, int)
渺氧。雖然這樣的分裂器可以工作旨涝,但它可能會(huì)提供較差的并行性能,因?yàn)槲覀儊G失了大小調(diào)整信息(基礎(chǔ)數(shù)據(jù)集有多大)侣背,以及被限制為簡(jiǎn)單的分裂算法白华。
更高質(zhì)量的分裂器將提供平衡且已知大小的分割,準(zhǔn)確的大小調(diào)整信息贩耐,以及characteristics
可由實(shí)現(xiàn)用于優(yōu)化執(zhí)行的許多其他 分裂器或數(shù)據(jù)弧腥。
可變數(shù)據(jù)源的Spliterators還有一個(gè)挑戰(zhàn); 綁定到數(shù)據(jù)的時(shí)間,因?yàn)閿?shù)據(jù)可能在創(chuàng)建分裂器的時(shí)間和流管道的執(zhí)行時(shí)間之間發(fā)生變化潮太。理想情況下管搪,流的分裂器會(huì)報(bào)告IMMUTABLE
或的特征 CONCURRENT
; 如果不是它應(yīng)該是 遲到的。如果某個(gè)源無(wú)法直接提供推薦的分裂器铡买,它可能使用a間接提供分裂器Supplier
更鲁,并通過(guò)Supplier
-accepting版本 構(gòu)造流 stream()
。僅在流管道的終端操作開始之后才從供應(yīng)商獲得分裂器奇钞。
這些要求顯著減少了流源突變和流管道執(zhí)行之間潛在干擾的范圍澡为。基于具有所需特征的分裂器的流或使用基于供應(yīng)商的工廠形式的流不受在終端操作開始之前對(duì)數(shù)據(jù)源的修改的影響(假設(shè)流操作的行為參數(shù)滿足非操作的要求標(biāo)準(zhǔn))干涉和無(wú)國(guó)籍狀態(tài))