Stream API
經(jīng)過前面 4 篇內(nèi)容的學(xué)習(xí),我們已經(jīng)掌握了 Stream 大部分的知識,本節(jié)我們針對之前 Stream 未涉及的內(nèi)容及周邊知識點做個補(bǔ)充嫂伞。
Fork/Join 框架
fork/join 框架是 Java 7 中引入的新特性 移层,它是一個工具,通過 「 分而治之 」 的方法嘗試將所有可用的處理器內(nèi)核使用起來幫助加速并行處理拂盯。
在實際使用過程中,這種 「 分而治之 」的方法意味著框架首先要 fork 记靡,遞歸地將任務(wù)分解為較小的獨立子任務(wù)谈竿,直到它們足夠簡單以便異步執(zhí)行。然后摸吠,join 部分開始工作空凸,將所有子任務(wù)的結(jié)果遞歸地連接成單個結(jié)果,或者在返回 void 的任務(wù)的情況下寸痢,程序只是等待每個子任務(wù)執(zhí)行完畢呀洲。
為了提供有效的并行執(zhí)行,fork/join 框架使用了一個名為 ForkJoinPool 的線程池啼止,用于管理 ForkJoinWorkerThread 類型的工作線程道逗。
Fork/Join 優(yōu)點
Fork/Join 架構(gòu)使用了一種名為工作竊取( work-stealing )算法來平衡線程的工作負(fù)載献烦。
簡單來說滓窍,工作竊取算法就是空閑的線程試圖從繁忙線程的隊列中竊取工作。
默認(rèn)情況下巩那,每個工作線程從其自己的雙端隊列中獲取任務(wù)吏夯。但如果自己的雙端隊列中的任務(wù)已經(jīng)執(zhí)行完畢,雙端隊列為空時即横,工作線程就會從另一個忙線程的雙端隊列尾部或全局入口隊列中獲取任務(wù)噪生,因為這是最大概率可能找到工作的地方。
這種方法最大限度地減少了線程競爭任務(wù)的可能性东囚。它還減少了工作線程尋找任務(wù)的次數(shù)跺嗽,因為它首先在最大可用的工作塊上工作。
Fork/Join 使用
ForkJoinTask 是 ForkJoinPool 線程之中執(zhí)行的任務(wù)的基本類型页藻。我們?nèi)粘J褂脮r桨嫁,一般不直接使用 ForkJoinTask ,而是擴(kuò)展它的兩個子類中的任意一個
- 任務(wù)不返回結(jié)果 ( 返回 void ) 的 RecursiveAction
- 返回值的任務(wù)的 RecursiveTask <V>
這兩個類都有一個抽象方法 compute() 惕橙,用于定義任務(wù)的邏輯。
我們所要做的钉跷,就是繼承任意一個類弥鹦,然后實現(xiàn) compute() 方法,步驟如下:
- 創(chuàng)建一個表示工作總量的對象
- 選擇合適的閾值
- 定義分割工作的方法
- 定義執(zhí)行工作的方法
如下是使用 Fork/Join 方式實現(xiàn)的1至1000006587的 Fork/Join 方式累加,我們和單線程的循環(huán)累加做了下對比彬坏,在 Intel i5-4460 的 PC 機(jī)器下朦促,單線程執(zhí)行使用了 650 ms,使用了 Fork/Join 方式執(zhí)行 210 ms栓始,優(yōu)化效果挺明顯务冕。
public class NumberAddTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_0000;
private final int begin;
private final int end;
public NumberAddTask(int begin, int end) {
super();
this.begin = begin;
this.end = end;
}
@Override
protected Long compute() {
if (end - begin <= THRESHOLD) {
long sum = 0;
for(int i = begin; i <= end; i++) {
sum += i;
}
return sum;
}
int mid = (begin + end) /2;
NumberAddTask t1 = new NumberAddTask(begin, mid);
NumberAddTask t2 = new NumberAddTask(mid + 1, end);
ForkJoinTask.invokeAll(t1, t2);
return t1.join() + t2.join();
}
}
// 1至1000006587的Fork/Join方式累加
@Test
public void testAddForkJoin() {
long begin = System.currentTimeMillis();
int n = 10_0000_6587;
ForkJoinPool pool = ForkJoinPool.commonPool();
log.info("1 + 2 + ... {} = {}", n, pool.invoke(new NumberAddTask(1, n)));
long end = System.currentTimeMillis();
log.info("ForkJoin方式執(zhí)行時間:{}ms", end - begin);
}
// 1至1000006587的單線程累加
@Test
public void testAddFunction() {
long begin = System.currentTimeMillis();
int n = 10_0000_6587;
long sum = 0;
for(int i = 1; i <= n; i++ ) {
sum += i;
}
log.info("1 + 2 + ... {} = {}", n, sum);
long end = System.currentTimeMillis();
log.info("函數(shù)方式執(zhí)行時間:{}ms", end - begin);
}
Fork/Join 使用場景
我使用 Java 8 官方 Api 中 RecursiveTask 的示例,創(chuàng)建了一個計算斐波那契數(shù)列的 Fork/Join 實現(xiàn)幻赚,雖然官方也提到了這是愚蠢的實現(xiàn)斐波那契數(shù)列方法禀忆,甚至效果還不如單線程的遞歸計算,但是這也說明了 Fork/Join 并非萬能的落恼。
@Test
public void testForkJoin() {
// 執(zhí)行f(40) = 102334155使用3411ms
// 執(zhí)行f(80) 2個多小時箩退,無法計算出結(jié)果
long begin = System.currentTimeMillis();
int n = 40;
ForkJoinPool pool = ForkJoinPool.commonPool();
log.info("ForkJoinPool初始化時間:{}ms", System.currentTimeMillis() - begin);
log.info("斐波那契數(shù)列f({}) = {}", n, pool.invoke(new FibonacciTask(n)));
long end = System.currentTimeMillis();
log.info("ForkJoin方式執(zhí)行時間:{}ms", end - begin);
}
// 不用遞歸計算斐波那契數(shù)列反而更快
@Test
public void testFibonacci() {
// 執(zhí)行f(50000) 使用 110ms
// 輸出 f(50000) = 17438開頭的10450位長的整數(shù)
long begin = System.currentTimeMillis();
int n = 50000;
log.info("斐波那契數(shù)列f({}) = {}", n, FibonacciUtil.fibonacci(n));
long end = System.currentTimeMillis();
log.info("函數(shù)方式執(zhí)行時間:{}ms", end - begin);
}
以上代碼見 StreamOtherTest 。
Fork/Join 最大的優(yōu)點是提供了工作竊取算法佳谦,可以在多核CPU處理器上加速并行處理戴涝,他并非多線程開發(fā)替代品。
那么他們之間有什么區(qū)別呢钻蔑?
Fork/Join框架是從jdk7中新特性,它同ThreadPoolExecutor一樣啥刻,也實現(xiàn)了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執(zhí)行的任務(wù)咪笑,而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入可帽,如果沒有向構(gòu)造函數(shù)中傳入希望的線程數(shù)量,那么當(dāng)前計算機(jī)可用的CPU數(shù)量會被設(shè)置為線程數(shù)量作為默認(rèn)值蒲肋。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題蘑拯。典型的應(yīng)用比如快速排序算法。這里的要點在于兜粘,F(xiàn)orkJoinPool需要使用相對少的線程來處理大量的任務(wù)申窘。比如要對1000萬個數(shù)據(jù)進(jìn)行排序,那么會將這個任務(wù)分割成兩個500萬的排序任務(wù)和一個針對這兩組500萬數(shù)據(jù)的合并任務(wù)孔轴。以此類推剃法,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設(shè)置一個閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時路鹰,停止這樣的分割處理贷洲。比如,當(dāng)元素的數(shù)量小于10時晋柱,會停止分割优构,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M(jìn)行排序。那么到最后雁竞,所有的任務(wù)加起來會有大概2000000+個钦椭。問題的關(guān)鍵在于拧额,對于一個任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后彪腔,它才能夠被執(zhí)行侥锦。
所以當(dāng)使用ThreadPoolExecutor時,使用分治法會存在問題德挣,因為ThreadPoolExecutor中的線程無法像任務(wù)隊列中再添加一個任務(wù)并且在等待該任務(wù)完成之后再繼續(xù)執(zhí)行恭垦。而使用ForkJoinPool時,就能夠讓其中的線程創(chuàng)建新的任務(wù)格嗅,并掛起當(dāng)前的任務(wù)番挺,此時線程就能夠從隊列中選擇子任務(wù)執(zhí)行。
那么使用ThreadPoolExecutor或者ForkJoinPool吗浩,會有什么差異呢建芙?
首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù)懂扼,比如使用4個線程來完成超過200萬個任務(wù)禁荸。但是,使用ThreadPoolExecutor時阀湿,是不可能完成的赶熟,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬個具有父子關(guān)系的任務(wù)時陷嘴,也需要200萬個線程映砖,顯然這是不可行的。
在實踐中灾挨,ThreadPoolExecutor通常用于同時(并行)處理許多獨立請求(又稱為事務(wù))邑退,F(xiàn)ork/Join通常用于加速一項連貫的工作任務(wù)。
parallelStream 并行化
parallelStream 其實就是一個并行執(zhí)行的流.它通過默認(rèn)的 ForkJoinPool 劳澄,可以提高你的多線程任務(wù)的速度地技。parallelStream 具有并行處理能力,處理的過程會分而治之秒拔,也就是將一個大任務(wù)切分成多個小任務(wù)莫矗,這表示每個任務(wù)都是一個操作,可以并行處理砂缩。
parallelStream 的使用
使用方式:
- 創(chuàng)建時返回并行流:如 Collection<T>.parallelStream()
- 過程中轉(zhuǎn)換為并行流:如 Stream<T>.parallel()
- 如果需要作谚,轉(zhuǎn)換為順序流:Stream<T>.sequential()
// 并行流時,并非按照1,2,3...500的順序輸出
IntStream.range(1, 500).parallel().forEach(System.out::println);
parallelStream 的陷阱
由于 parallelStream 使用的是 ForkJoinPool 中的 commonPool庵芭,該方法默認(rèn)創(chuàng)建程序運行時所在計算機(jī)處理器內(nèi)核數(shù)量的線程妹懒,當(dāng)同時存在多個工作并行執(zhí)行時,F(xiàn)orkJoinPool 中的線程將被消耗完双吆,而當(dāng)有的worker因為執(zhí)行耗時操作眨唬,將導(dǎo)致其他工作也被阻塞滔悉,而此時我們也不清楚哪個任務(wù)導(dǎo)致了阻塞。這就是 parallelStream 的陷阱单绑。
parallelStream 是無法預(yù)測的,而且想要正確地使用它有些棘手曹宴。幾乎任何 parallelStream 的使用都會影響程序中其他部分的性能搂橙,而且是一種無法預(yù)測的方式。但是在調(diào)用stream.parallel() 或者 parallelStream() 時候在我的代碼里之前我仍然會重新審視一遍他給我的程序究竟會帶來什么問題笛坦,他能有多大的提升区转,是否有使用他的意義。
那么到底是使用 stream 還是 parallelStream 呢版扩?通過下面3個標(biāo)準(zhǔn)來鑒定
1. 是否需要并行废离?
在回答這個問題之前,你需要弄清楚你要解決的問題是什么礁芦,數(shù)據(jù)量有多大蜻韭,計算的特點是什么?并不是所有的問題都適合使用并發(fā)程序來求解柿扣,比如當(dāng)數(shù)據(jù)量不大時肖方,順序執(zhí)行往往比并行執(zhí)行更快。畢竟未状,準(zhǔn)備線程池和其它相關(guān)資源也是需要時間的俯画。但是,當(dāng)任務(wù)涉及到I/O操作并且任務(wù)之間不互相依賴時司草,那么并行化就是一個不錯的選擇艰垂。通常而言,將這類程序并行化之后埋虹,執(zhí)行速度會提升好幾個等級猜憎。
2. 任務(wù)之間是否是獨立的?是否會引起任何競態(tài)條件吨岭?
如果任務(wù)之間是獨立的拉宗,并且代碼中不涉及到對同一個對象的某個狀態(tài)或者某個變量的更新操作,那么就表明代碼是可以被并行化的辣辫。
3. 結(jié)果是否取決于任務(wù)的調(diào)用順序旦事?
由于在并行環(huán)境中任務(wù)的執(zhí)行順序是不確定的,因此對于依賴于順序的任務(wù)而言急灭,并行化也許不能給出正確的結(jié)果姐浮。
創(chuàng)建流的其他方式
我們在第1篇中記錄了幾種創(chuàng)建流的方式,但還是遺漏了一部分葬馋,再此稍作補(bǔ)充卖鲤。
從I/O通道
方式1:從緩存流中讀取為Stream肾扰,詳見如下代碼:
final String name = "明玉";
// 從網(wǎng)絡(luò)上讀取文字內(nèi)容
new BufferedReader(
new InputStreamReader(
new URL("https://www.txtxzz.com/txt/download/NWJhZjI3YjIzYWQ3N2UwMTZiNDQwYWE3")
// new URL("https://api.apiopen.top/getAllUrl")
.openStream()))
.lines()
.filter(str -> StrUtil.contains(str, name))
.forEach(System.out::println);
方式2:從文件系統(tǒng)獲取下級路徑及文件,詳見如下代碼:
// 獲取文件系統(tǒng)的下級路徑及其文件
Files.walk(FileSystems.getDefault().getPath("D:\\soft"))
.forEach(System.out::println);
方式3:從文件系統(tǒng)獲取文件內(nèi)容蛋逾,詳見如下代碼:
Files.lines(FileSystems.getDefault().getPath("D:\\", "a.txt"))
// .parallel()
.limit(200)
.forEach(System.out::println);
方式4:讀取JarFile內(nèi)的文件集晚,詳見如下代碼:
new JarFile("D:\\J2EE_Tools\\repository\\org\\springframework\\spring-core\\5.2.6.RELEASE\\spring-core-5.2.6.RELEASE.jar")
.stream()
.filter(entry -> StrUtil.contains(entry.getName(), "Method"))
.forEach(System.out::println);
獲取隨機(jī)數(shù)字流
使用類Random的ints、longs区匣、doubles的方法偷拔,根據(jù)傳遞不同的參數(shù),可以產(chǎn)生無限數(shù)字流亏钩、有限數(shù)字流莲绰、以及指定范圍的有限或無限數(shù)字流,示例如下:
double v = new Random()
.doubles(30, 2, 45)
.peek(System.out::println)
.max()
.getAsDouble();
log.info("一串隨機(jī)數(shù)的最大值為:{}", v);
位向量流
將BitSet中位向量為真的轉(zhuǎn)換為Stream姑丑,示例如下:
BitSet bitSet = new BitSet(8);
bitSet.set(1);
bitSet.set(6);
log.info("cardinality值{}", bitSet.cardinality());
bitSet.stream().forEach(System.out::println);
正則分割流
將字符串按照正則表達(dá)式分隔成子串流蛤签,示例如下:
Pattern.compile(":")
.splitAsStream("boo:and:foo")
.map(String::toUpperCase)
.forEach(System.out::println);
Stream 的其他方法
轉(zhuǎn)為無序流
使用 unordered() 方法可將 Stream 隨時轉(zhuǎn)為無序流。
轉(zhuǎn)換為Spliterator
使用 spliterator() 方法可將 Stream 轉(zhuǎn)為 Spliterator栅哀,Spliterator 介紹請看 https://juejin.im/post/5cf2622de51d4550bf1ae7ff震肮。
綜合示例
根據(jù)1962年第1屆百花獎至2018年第34屆百花獎數(shù)據(jù),有以下數(shù)據(jù)留拾,編寫代碼按照獲得最佳男主角的演員次數(shù)排名钙蒙,次數(shù)相同的按照參演年份正序排,并打印他所參演的電影间驮。
序號 | 最佳男主角 | 電影 |
---|---|---|
第1屆1962年 | 崔嵬 | 《紅旗譜》 |
第2屆1963年 | 張良 | 《哥倆好 |
第3屆1980年 | 李仁堂 | 《淚痕》 |
第4屆1981年 | 達(dá)式常 | 《燕歸來》 |
第5屆1982年 | 王心剛 | 《知音》 |
第6屆1983年 | 嚴(yán)順開 | 《阿Q正傳》 |
第7屆1984年 | 楊在葆 | 《血躬厌,總是熱的》 |
第8屆1985年 | 呂曉禾 | 《高山下的花環(huán)》 |
第9屆1986年 | 楊在葆 | 《代理市長》 |
第10屆1987年 | 姜文 | 《芙蓉鎮(zhèn)》 |
第11屆1988年 | 張藝謀 | 《老井》 |
第12屆1989年 | 姜文 | 《春桃》 |
第13屆1990年 | 古月 | 《開國大典》 |
第14屆1991年 | 李雪健 | 《焦裕祿》 |
第15屆1992年 | 王鐵成 | 《周恩來》 |
第16屆1993年 | 古月 | 《毛澤東的故事》 |
第17屆1994年 | 李保田 | 《鳳凰琴》 |
第18屆1995年 | 李仁堂 | 《被告山杠爺》 |
第19屆1996年 | 張國立 | 《混在北京》 |
第20屆1997年 | 高明 | 《孔繁森》 |
第21屆1998年 | 葛優(yōu) | 《甲方乙方》 |
第22屆1999年 | 趙本山 | 《男婦女主任》 |
第23屆2000年 | 潘長江 | 《明天我愛你》 |
第24屆2001年 | 王慶祥 | 《生死抉擇》 |
第25屆2002年 | 葛優(yōu) | 《大腕》 |
第26屆2003年 | 盧奇 | 《鄧小平》 |
第27屆2004年 | 葛優(yōu) | 《手機(jī)》 |
第27屆2004年 | 李幼斌 | 《驚心動魂》 |
第28屆2006年 | 吳軍 | 《張思德》 |
第29屆2008年 | 張涵予 | 《集結(jié)號》 |
第30屆2010年 | 陳坤 | 《畫皮》 |
第31屆2012年 | 文章 | 《失戀33天》 |
第32屆2014年 | 黃曉明 | 《中國合伙人》 |
第33屆2016年 | 馮紹峰 | 《狼圖騰》 |
第34屆2018年 | 吳京 | 《戰(zhàn)狼2》 |
根據(jù)題目要求,創(chuàng)建 HundredFlowersAwards 實體用來存儲上述數(shù)據(jù)竞帽,我們分析題目要求最終需要轉(zhuǎn)換為以演員為主的信息扛施,然后再根據(jù)演員的獲獎次數(shù)及出演年份做排序。
所以創(chuàng)建 ActorInfo 實體屹篓,包含 演員姓名和出演電影的信息疙渣。出演電影也需創(chuàng)建實體 FilmInfo ,包含 出演年份和電影名稱堆巧。
有了如上存儲數(shù)據(jù)實體信息后妄荔,代碼實現(xiàn)邏輯如下:
- 將百花獎的集合數(shù)據(jù)轉(zhuǎn)換為 Stream
- 將該數(shù)據(jù)流轉(zhuǎn)換為Map類型,Map 的 key 為演員名谍肤,Map 的 Value 為演員信息
- 對于重復(fù)出現(xiàn)的演員啦租,我們需要把電影信息追加到該演員出現(xiàn)的電影列表中
- 對于處理完的 Map 數(shù)據(jù),將該 Map 的 values 數(shù)據(jù)再次轉(zhuǎn)換為 Stream
- 將該 Stream 排序即可荒揣。
list.stream()
.collect(Collectors.toMap(HundredFlowersAwards::getActorName, ActorInfo::new, ActorInfo::addFilmInfos))
.values()
.stream()
.sorted(new ActorComparator())
.forEach(System.out::println);
本節(jié)代碼見 StreamOtherTest 篷角。
經(jīng)過幾天的學(xué)習(xí)和總結(jié),以上就是 Java Stream Api 的全部內(nèi)容了系任。從開始認(rèn)識 Stream Api恳蹲,我們逐漸了解了使用 Stream Api 的流程:創(chuàng)建 Stream 虐块、中間操作、終端操作嘉蕾。
我們對創(chuàng)建 Stream 贺奠、中間操作、終端操作的各個 api 方法進(jìn)行了介紹及案例演示错忱,之后我們還單獨抽出一節(jié)講解了 Collector 接口的實現(xiàn)及使用敞嗡。
上述內(nèi)容雖然文字不多,大部分都在代碼中給出了演示航背,希望大家能下載下來代碼并運行,以加深印象棱貌。
以上是前傳部分的學(xué)習(xí)內(nèi)容了玖媚,接下來我們將進(jìn)入到 Reactor 部分的學(xué)習(xí)。
源碼下載:https://github.com/crystalxmumu/spring-web-flux-study-note