Fork-Join
java下多線程的開發(fā)可以我們自己?jiǎn)⒂枚嗑€程,線程池撒强,還可以使用forkjoin,forkjoin可以讓我們不去了解諸如Thread,Runnable等相關(guān)的知識(shí),只要遵循forkjoin的開發(fā)模式漱办,就可以寫出很好的多線程并發(fā)程序。
Fork-Join使用分治發(fā)的思想婉烟,將一個(gè)難以直接解決的大問題娩井,分割成一些規(guī)模較小的相同問題,以便各個(gè)擊破似袁,分而治之洞辣。常見的歸并排序、二分查找昙衅、快速排序都是扬霜。
Fork-Join原理
Fork-Join框架就是在必要的情況下,將一個(gè)任務(wù)進(jìn)行拆分fork成若干的小任務(wù)再將一個(gè)小任務(wù)運(yùn)算結(jié)果進(jìn)行join匯總而涉。
工作密取
private static class ConsumerAndProducer implements Runnable{
private Random random=new Random();
private final LinkedBlockingDeque<Work> deque;
private final LinkedBlockingDeque<Work> otherWork;
public ConsumerAndProducer(LinkedBlockingDeque<Work> deque,LinkedBlockingDeque<Work> otherWork){
this.deque = deque;
this.otherWork = otherWork;
}
@Override
public void run() {
//dosomethting
if(deque.isEmpty()){
if(!otherWork.isEmpty()){
System.out.println("otherWork is run:");
otherWork.takeLast().run();;
}
}
}
}
即當(dāng)前線程的Task已經(jīng)全部執(zhí)行完畢著瓶,則自動(dòng)取到其他線程的Task池中的Task執(zhí)行。ForkJoinPool中維護(hù)著多個(gè)線程(一般為CPU核數(shù))在不斷地執(zhí)行Task婴谱,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外蟹但,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時(shí)間谭羔,提高CPU利用率华糖。代碼上沒有什么好說的,利用雙端隊(duì)列將繁忙的線程的任務(wù)拿到自己線程去處理瘟裸。
Fork-Join實(shí)戰(zhàn)
我們要使用ForkJoin框架客叉,必須首先創(chuàng)建一個(gè)ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork和join的操作機(jī)制,通常我們不直接繼承ForkjoinTask類兼搏,只需要直接繼承其子類卵慰。
- RecursiveAction,用于沒有返回結(jié)果的任務(wù)
- RecursiveTask佛呻,用于有返回值的任務(wù)
task要通過ForkJoinPool來執(zhí)行裳朋,使用submit 或 invoke 提交,兩者的區(qū)別是:invoke是同步執(zhí)行吓著,調(diào)用之后需要等待任務(wù)完成鲤嫡,才能執(zhí)行后面的代碼;submit是異步執(zhí)行绑莺。
在我們自己實(shí)現(xiàn)的compute方法里暖眼,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)纺裁。如果不足夠小诫肠,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用invokeAll方法時(shí)欺缘,又會(huì)進(jìn)入compute方法栋豫,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割浪南,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果笼才。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。
例如我們要給一個(gè)文件夾中包含子文件夾下的文件某些文件
public class FindDirsFiles extends RecursiveAction {
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files!=null){
for (File file : files) {
if (file.isDirectory()) {
// 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)络凿。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件骡送,檢查。
if (file.getAbsolutePath().endsWith("txt")){
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)絮记。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void main(String [] args){
try {
// 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
/*異步提交*/
pool.execute(task);
/*主線程做自己的業(yè)務(wù)工作*/
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork + i;
}
System.out.println("Main Thread done sth......,otherWork="
+otherWork);
task.join();//阻塞方法
System.out.println("Task end");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
我們首先初始化連接池摔踱,遍歷當(dāng)前文件下的文件夾存放到subTasks集合中,通過invokeAll方法放入ForkJoinPool中怨愤,遍歷每個(gè)FindDirsFiles的join獲得結(jié)果派敷。