Fork/Join
體現(xiàn)了分而治之,大問題分割為相同的小問題视事,小問題之間無關(guān)聯(lián)。動(dòng)態(tài)規(guī)劃庆揩,分割為相同的小問題俐东,小問題之間是有關(guān)聯(lián)的
工作密鹊搿:因?yàn)閯澐值淖尤蝿?wù)的計(jì)算時(shí)間不一樣,先完成的線程會(huì)去后完成的線程中提取任務(wù)執(zhí)行
public class Main {
public static void main(String[] args) throws Exception {
// 創(chuàng)建2000個(gè)隨機(jī)數(shù)組成的數(shù)組:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任務(wù)足夠小,直接計(jì)算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢計(jì)算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任務(wù)太大,一分為二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
ForkJoinPool線程池可以把一個(gè)大任務(wù)分拆成小任務(wù)并行執(zhí)行虏辫,任務(wù)類必須繼承自RecursiveTask或RecursiveAction蚌吸。返回值 Task,無返回值 Action砌庄。
CountDownLatch
//調(diào)用await()方法的線程會(huì)被掛起羹唠,它會(huì)等待直到count值為0才繼續(xù)執(zhí)行
public void await() throws InterruptedException { };
//和await()類似,只不過等待一定的時(shí)間后count值還沒變?yōu)?的話就會(huì)繼續(xù)執(zhí)行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//將count值減1
public void countDown() { };
1.CNT與線程數(shù)不是對(duì)應(yīng)關(guān)系
2.一個(gè)線程在進(jìn)行countdown后可以繼續(xù)運(yùn)行
3.可以在一個(gè)線程中多次扣減
4.await線程可以是多個(gè)