1. 什么是流键痛?
Stream是java8中新增加的一個特性,被java猿統(tǒng)稱為流.
Stream 不是集合元素,它不是數(shù)據(jù)結構并不保存數(shù)據(jù)硬纤,它是有關算法和計算的院峡,它更像一個高級版本的 Iterator。原始版本的 Iterator桌硫,用戶只能顯式地一個一個遍歷元素并對其執(zhí)行某些操作沛豌;高級版本的 Stream趋箩,用戶只要給出需要對其包含的元素執(zhí)行什么操作,比如 “過濾掉長度大于 10 的字符串”加派、“獲取每個字符串的首字母”等叫确,Stream 會隱式地在內部進行遍歷,做出相應的數(shù)據(jù)轉換哼丈。
Stream 就如同一個迭代器(Iterator)启妹,單向,不可往復醉旦,數(shù)據(jù)只能遍歷一次,遍歷過一次后即用盡了桨啃,就好比流水從面前流過车胡,一去不復返。
而和迭代器又不同的是照瘾,Stream 可以并行化操作匈棘,迭代器只能命令式地、串行化操作析命。顧名思義主卫,當使用串行方式去遍歷時,每個 item 讀完后再讀下一個 item鹃愤。而使用并行去遍歷時簇搅,數(shù)據(jù)會被分成多個段,其中每一個都在不同的線程中處理软吐,然后將結果一起輸出瘩将。Stream 的并行操作依賴于 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的并行 API 演變歷程基本如下:
1.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
Stream 的另外一大特點是,數(shù)據(jù)源本身可以是無限的姿现。
2. parallelStream是什么
parallelStream其實就是一個并行執(zhí)行的流.它通過默認的ForkJoinPool,可能提高你的多線程任務的速度.
3. parallelStream的作用
Stream具有平行處理能力肠仪,處理的過程會分而治之,也就是將一個大任務切分成多個小任務备典,這表示每個任務都是一個操作异旧,因此像以下的程式片段:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);
你得到的展示順序不一定會是1、2提佣、3吮蛹、4、5镐依、6匹涮、7、8槐壳、9然低,而可能是任意的順序,就forEach()這個操作來講务唐,如果平行處理時雳攘,希望最后順序是按照原來Stream的數(shù)據(jù)順序,那可以調用forEachOrdered()枫笛。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);
注意:如果forEachOrdered()中間有其他如filter()的中介操作吨灭,會試著平行化處理,然后最終forEachOrdered()會以原數(shù)據(jù)順序處理刑巧,因此喧兄,使用forEachOrdered()這類的有序處理,可能會(或完全失去)失去平行化的一些優(yōu)勢,實際上中介操作亦有可能如此啊楚,例如sorted()方法吠冤。
4. parallelStream背后的男人:ForkJoinPool
要想深入的研究parallelStream之前,那么我們必須先了解ForkJoin框架和ForkJoinPool.本文旨在parallelStream,但因為兩種關系甚密,故在此簡單介紹一下ForkJoinPool,如有興趣可以更深入的去了解下ForkJoin
ForkJoin框架是從jdk7中新特性,它同ThreadPoolExecutor一樣,也實現(xiàn)了Executor和ExecutorService接口恭理。它使用了一個無限隊列來保存需要執(zhí)行的任務拯辙,而線程的數(shù)量則是通過構造函數(shù)傳入,如果沒有向構造函數(shù)中傳入希望的線程數(shù)量颜价,那么當前計算機可用的CPU數(shù)量會被設置為線程數(shù)量作為默認值涯保。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序算法周伦。這里的要點在于夕春,F(xiàn)orkJoinPool需要使用相對少的線程來處理大量的任務。比如要對1000萬個數(shù)據(jù)進行排序横辆,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數(shù)據(jù)的合并任務撇他。以此類推茄猫,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設置一個閾值來規(guī)定當數(shù)據(jù)規(guī)模到多少時困肩,停止這樣的分割處理划纽。比如,當元素的數(shù)量小于10時锌畸,會停止分割勇劣,轉而使用插入排序對它們進行排序。那么到最后潭枣,所有的任務加起來會有大概2000000+個比默。問題的關鍵在于,對于一個任務而言盆犁,只有當它所有的子任務完成之后命咐,它才能夠被執(zhí)行。
所以當使用ThreadPoolExecutor時谐岁,使用分治法會存在問題醋奠,因為ThreadPoolExecutor中的線程無法像任務隊列中再添加一個任務并且在等待該任務完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時伊佃,就能夠讓其中的線程創(chuàng)建新的任務窜司,并掛起當前的任務,此時線程就能夠從隊列中選擇子任務執(zhí)行航揉。
那么使用ThreadPoolExecutor或者ForkJoinPool塞祈,會有什么性能的差異呢?
首先帅涂,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關系的任務议薪,比如使用4個線程來完成超過200萬個任務。但是媳友,使用ThreadPoolExecutor時笙蒙,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務庆锦,需要完成200萬個具有父子關系的任務時,也需要200萬個線程轧葛,顯然這是不可行的搂抒。
工作竊取算法
forkjoin最核心的地方就是利用了現(xiàn)代硬件設備多核,在一個操作時候會有空閑的cpu,那么如何利用好這個空閑的cpu就成了提高性能的關鍵,而這里我們要提到的工作竊取(work-stealing)算法就是整個forkjion框架的核心理念,工作竊饶虺丁(work-stealing)算法是指某個線程從其他隊列里竊取任務來執(zhí)行求晶。
那么為什么需要使用工作竊取算法呢?
假如我們需要做一個比較大的任務衷笋,我們可以把這個任務分割為若干互不依賴的子任務芳杏,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務爵赵,線程和隊列一一對應吝秕,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完空幻,而其他線程對應的隊列里還有任務等待處理烁峭。干完活的線程與其等著,不如去幫其他線程干活秕铛,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行约郁。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭但两,通常會使用雙端隊列鬓梅,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行谨湘。
工作竊取算法的優(yōu)點是充分利用線程進行并行計算绽快,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭悲关,比如雙端隊列里只有一個任務時谎僻。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列寓辱。
5. 用看forkjion的眼光來看ParallelStreams
上文中已經(jīng)提到了在Java 8引入了自動并行化的概念艘绍。它能夠讓一部分Java代碼自動地以并行的方式執(zhí)行,也就是我們使用了ForkJoinPool的ParallelStream秫筏。
Java 8為ForkJoinPool添加了一個通用線程池诱鞠,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是ForkJoinPool類型上的一個靜態(tài)元素这敬,它擁有的默認線程數(shù)量等于運行計算機上的處理器數(shù)量航夺。當調用Arrays類上添加的新方法時,自動并行化就會發(fā)生崔涂。比如用來排序一個數(shù)組的并行快速排序阳掐,用來對一個數(shù)組中的元素進行并行遍歷。自動并行化也被運用在Java 8新添加的Stream API中冷蚂。
比如下面的代碼用來遍歷列表中的元素并執(zhí)行需要的操作:
List<UserInfo> userInfoList =
DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);
對于列表中的元素的操作都會以并行的方式執(zhí)行缭保。forEach方法會為每個元素的計算操作創(chuàng)建一個任務,該任務會被前文中提到的ForkJoinPool中的通用線程池處理蝙茶。以上的并行計算邏輯當然也可以使用ThreadPoolExecutor完成艺骂,但是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌隆夯。
對于ForkJoinPool通用線程池的線程數(shù)量钳恕,通常使用默認值就可以了别伏,即運行時計算機的處理器數(shù)量。我這里提供了一個示例的代碼讓你了解jvm所使用的ForkJoinPool的線程數(shù)量, 你可以可以通過設置系統(tǒng)屬性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N為線程數(shù)量),來調整ForkJoinPool的線程數(shù)量,可以嘗試調整成不同的參數(shù)來觀察每次的輸出結果:
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
/**
* @description 這是一個用來讓你更加熟悉parallelStream的原理的實力
* @date 2016年10月11日18:26:55
* @version v1.0
* @author wangguangdong
*/
public class App {
public static void main(String[] args) throws Exception {
System.out.println("Hello World!");
// 構造一個10000個元素的集合
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
// 統(tǒng)計并行執(zhí)行l(wèi)ist的線程
Set<Thread> threadSet = new CopyOnWriteArraySet<>();
// 并行執(zhí)行
list.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println(thread);
// 統(tǒng)計并行執(zhí)行l(wèi)ist的線程
threadSet.add(thread);
});
System.out.println("threadSet一共有" + threadSet.size() + "個線程");
System.out.println("系統(tǒng)一個有"+Runtime.getRuntime().availableProcessors()+"個cpu");
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
list1.add(i);
list2.add(i);
}
Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread threadA = new Thread(() -> {
list1.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println("list1" + thread);
threadSetTwo.add(thread);
});
countDownLatch.countDown();
});
Thread threadB = new Thread(() -> {
list2.parallelStream().forEach(integer -> {
Thread thread = Thread.currentThread();
// System.out.println("list2" + thread);
threadSetTwo.add(thread);
});
countDownLatch.countDown();
});
threadA.start();
threadB.start();
countDownLatch.await();
System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "個線程");
System.out.println("---------------------------");
System.out.println(threadSet);
System.out.println(threadSetTwo);
System.out.println("---------------------------");
threadSetTwo.addAll(threadSet);
System.out.println(threadSetTwo);
System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "個線程");
System.out.println("系統(tǒng)一個有"+Runtime.getRuntime().availableProcessors()+"個cpu");
}
}
出現(xiàn)這種現(xiàn)象的原因是忧额,forEach方法用了一些小把戲厘肮。**它會將執(zhí)行forEach本身的線程也作為線程池中的一個工作線程。因此宙址,即使將ForkJoinPool的通用線程池的線程數(shù)量設置為1轴脐,實際上也會有2個工作線程。因此在使用forEach的時候抡砂,線程數(shù)為1的ForkJoinPool通用線程池和線程數(shù)為2的ThreadPoolExecutor是等價的大咱。
**
所以當ForkJoinPool通用線程池實際需要4個工作線程時,可以將它設置成3注益,那么在運行時可用的工作線程就是4了碴巾。
6. 小結:
- 當需要處理遞歸分治算法時,考慮使用ForkJoinPool丑搔。
- 仔細設置不再進行任務劃分的閾值厦瓢,這個閾值對性能有影響。
- Java 8中的一些特性會使用到ForkJoinPool中的通用線程池啤月。在某些場合下煮仇,需要調整該線程池的默認的線程數(shù)量。
7. ParallelStreams 的陷阱
上文中我們已經(jīng)看到了ParallelStream他強大無比的特性,但這里我們就講告訴你ParallelStreams不是萬金油,而是一把雙刃劍,如果錯誤的使用反倒可能傷人傷己.
以下是一個我們項目里使用 parallel streams 的很常見的情況谎仲。在這個例子中浙垫,我們想同時調用不同地址的api中并且獲得第一個返回的結果。
public static String query(String q, List<String> engines) { Optional<String> result = engines.stream().parallel().map((base) -> {
String url = base + q;
return WS.url(url).get();
}).findAny();
return result.get();
}
可能有很多朋友在jdk7用future配合countDownLatch自己實現(xiàn)的這個功能,但是jdk8的朋友基本都會用上面的實現(xiàn)方式,那么自信深究一下究竟自己用future實現(xiàn)的這個功能和利用jdk8的parallelStream來實現(xiàn)這個功能有什么不同點呢?坑又在哪里呢郑诺?
讓我們細思思考一下整個功能究竟是如何運轉的夹姥。首先我們的集合元素engines 由ParallelStreams并行的去進行map操作(ParallelStreams使用JVM默認的forkJoin框架的線程池由當前線程去執(zhí)行并行操作).
然而,這里需要注意的一地方是我們在調用第三方的api請求是一個響應略慢而且會阻塞操作的一個過程。所以在某時刻所有線程都會調用 get() 方法并且在那里等待結果返回.
再回過頭仔細思考一下這個功能的實現(xiàn)過程是我們一開始想要的嗎辙诞?我們是在同一時間等待所有的結果,而不是遍歷這個列表按順序等待每個回答.然而辙售,由于ForkJoinPool workders的存在,這樣平行的等待相對于使用主線程的等待會產(chǎn)生的一種副作用.
現(xiàn)在ForkJoin pool (關于forkjion的更多實現(xiàn)你可以去搜索引擎中去看一下他的具體實現(xiàn)方式) 的實現(xiàn)是: 它并不會因為產(chǎn)生了新的workers而抵消掉阻塞的workers飞涂。那么在某個時間所有 ForkJoinPool.common() 的線程都會被用光.也就是說旦部,下一次你調用這個查詢方法,就可能會在一個時間與其他的parallel stream同時運行较店,而導致第二個任務的性能大大受損志鹃。或者說泽西,例如你在這個功能里是用來快速返回調用的第三方api的,而在其他的功能里是用于一些簡單的數(shù)據(jù)并行計算的,但是假如你先調用了這個功能,同一時間之后調用計算的函數(shù),那么這里forkjionPool的實現(xiàn)會讓你計算的函數(shù)大打折扣.
不過也不要急著去吐槽ForkJoinPool的實現(xiàn),在不同的情況下你可以給它一個ManagedBlocker實例并且確保它知道在一個阻塞調用中應該什么時候去抵消掉卡住的workers.現(xiàn)在有意思的一點是,在一個parallel stream處理中并不一定是阻塞調用會拖延程序的性能缰趋。任何被用于映射在一個集合上的長時間運行的函數(shù)都會產(chǎn)生同樣的問題.
正如我們上面那個列子的情況分析得知,lambda的執(zhí)行并不是瞬間完成的,所有使用parallel streams的程序都有可能成為阻塞程序的源頭,并且在執(zhí)行過程中程序中的其他部分將無法訪問這些workers,這意味著任何依賴parallel streams的程序在什么別的東西占用著common ForkJoinPool時將會變得不可預知并且暗藏危機.
8. 怎么正確使用parallelStream
如果你正在寫一個其他地方都是單線程的程序并且準確地知道什么時候你應該要使用parallel streams捧杉,這樣的話你可能會覺得這個問題有一點膚淺陕见。然而,我們很多人是在處理web應用味抖、各種不同的框架以及重量級應用服務评甜。一個服務器是怎樣被設計成一個可以支持多種獨立應用的主機的?誰知道呢仔涩,給你一個可以并行的卻不能控制輸入的parallel stream.
很抱歉,請原諒我用的標注[怎么正確使用parallelStream],因為目前為止我也沒有發(fā)現(xiàn)一個好的方式來讓我真正的正確使用parallelStream.下面的網(wǎng)上寫的兩種方式:
一種方式是限制ForkJoinPool提供的并行數(shù)忍坷。可以通過使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=1 來限制線程池的大小為1熔脂。不再從并行化中得到好處可以杜絕錯誤的使用它(其實這個方式還是有點搞笑的,既然這樣搞那我還不如不去使用并行流)佩研。
另一種方式就是,一個被稱為工作區(qū)的可以讓ForkJoinPool平行放置的 parallelStream() 實現(xiàn)霞揉。不幸的是現(xiàn)在的JDK還沒有實現(xiàn)旬薯。
Parallel streams 是無法預測的,而且想要正確地使用它有些棘手适秩。幾乎任何parallel streams的使用都會影響程序中無關部分的性能绊序,而且是一種無法預測的方式。秽荞。但是在調用stream.parallel() 或者parallelStream()時候在我的代碼里之前我仍然會重新審視一遍他給我的程序究竟會帶來什么問題,他能有多大的提升,是否有使用他的意義.
stream or parallelStream骤公?
上面我們也看到了parallelStream所帶來的隱患和好處,那么,在從stream和parallelStream方法中進行選擇時,我們可以考慮以下幾個問題:
- 是否需要并行?
- 任務之間是否是獨立的扬跋?是否會引起任何競態(tài)條件阶捆?
- 結果是否取決于任務的調用順序?
對于問題1胁住,在回答這個問題之前趁猴,你需要弄清楚你要解決的問題是什么,數(shù)據(jù)量有多大彪见,計算的特點是什么儡司?并不是所有的問題都適合使用并發(fā)程序來求解,比如當數(shù)據(jù)量不大時余指,順序執(zhí)行往往比并行執(zhí)行更快捕犬。畢竟,準備線程池和其它相關資源也是需要時間的酵镜。但是碉碉,當任務涉及到I/O操作并且任務之間不互相依賴時,那么并行化就是一個不錯的選擇淮韭。通常而言垢粮,將這類程序并行化之后,執(zhí)行速度會提升好幾個等級靠粪。
對于問題2蜡吧,如果任務之間是獨立的毫蚓,并且代碼中不涉及到對同一個對象的某個狀態(tài)或者某個變量的更新操作,那么就表明代碼是可以被并行化的昔善。
對于問題3元潘,由于在并行環(huán)境中任務的執(zhí)行順序是不確定的,因此對于依賴于順序的任務而言君仆,并行化也許不能給出正確的結果翩概。
參考文章
http://www.ibm.com/developerworks/cn/java/j-lo-java8streamapi/
http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
http://www.openhome.cc/Gossip/Java/ParallelStream.html
http://blog.csdn.net/dm_vincent/article/details/39505977
http://blog.csdn.net/dm_vincent/article/details/40856569