1 callable和future
一般情況启妹,我們實(shí)現(xiàn)多線程都是Thread或者Runnable(后者比較多),但是削祈,這兩種都是沒返回值的翅溺,所以我們需要使用callable(有返回值的多線程)和future(獲得線程的返回值)來實(shí)現(xiàn)了。
public class TestThread {
public static void main(String[] args) {
ThreadCount tc = null;
ExecutorService es = Executors.newCachedThreadPool();//線程池
CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);
for(int i=0;i<4;i++){
tc = new ThreadCount(i+1);
cs.submit(tc);
}
// 添加結(jié)束髓抑,及時(shí)shutdown,不然主線程不會(huì)結(jié)束
es.shutdown();
int total = 0;
for(int i=0;i<4;i++){
try {
total+=cs.take().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(total);
}
}
class ThreadCount implements Callable<Integer>{
private int type;
ThreadCount(int type){
this.type = type;
}
@Override
public Integer call() throws Exception {
if(type==1){
System.out.println("C盤統(tǒng)計(jì)大小");
return 1;
}else if(type==2){
Thread.sleep(20000);
System.out.println("D盤統(tǒng)計(jì)大小");
return 2;
}else if(type==3){
System.out.println("E盤統(tǒng)計(jì)大小");
return 3;
}else if(type==4){
System.out.println("F盤統(tǒng)計(jì)大小");
return 4;
}
return null;
}
}
ps:一個(gè)需要注意的小細(xì)節(jié)优幸,cs.take.get()獲取返回值(這里阻塞吨拍?),是按照完成的順序的网杆,即上面案例返回順序是CEFD
tips:
ExecutorCompletionService 是將 Executor和BlockQueue結(jié)合的jdk類羹饰,其實(shí)現(xiàn)的主要目的是:提交任務(wù)線程,每一個(gè)線程任務(wù)直線完成后碳却,將返回值放在阻塞隊(duì)列中队秩,然后可以通過阻塞隊(duì)列的take()方法返回 對應(yīng)線程的執(zhí)行結(jié)果!昼浦!
2. join阻塞——直接用join把線程5加入進(jìn)去即可
直接用join把線程5加入進(jìn)去即可
public static void main(String[] args) throws InterruptedException
{
Thread t1 = new Thread(new Worker("thread-1"));
Thread t2 = new Thread(new Worker("thread-2"));
Thread t3 = new Thread(new Worker("thread-3"));
Thread t4 = new Thread(new Worker("thread-4"));
Thread t5 = new Thread(new Worker("thread-5"));
t1.start();t2.start();t3.start();t4.start();
t1.join();t2.join();t3.join();t4.join();
t5.start();
t5.join();
}
或者馍资,在t5的run:
t1.start();t2.start();t3.start();t4.start(); t1.join();t2.join();t3.join();t4.join();
t5線程中等待四個(gè)線程返回
3.用java.util.concurrent下的方法解決
用CountDownLatch : 一個(gè)線程(或者多個(gè)), 等待另外N個(gè)線程完成某個(gè)事情之后才能執(zhí)行
CountDownLatch 是計(jì)數(shù)器, 線程完成一個(gè)就記一個(gè), 就像 報(bào)數(shù)一樣, 只不過是遞減的.
盜用別人的一個(gè)例子:
public class CountDownLatchDemo {
final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch=new CountDownLatch(2);//兩個(gè)工人的協(xié)作
Worker worker1=new Worker("zhang san", 5000, latch);
Worker worker2=new Worker("li si", 8000, latch);
worker1.start();//
worker2.start();//
latch.await();//等待所有工人完成工作
System.out.println("all work done at "+sdf.format(new Date()));
}
static class Worker extends Thread{
String workerName;
int workTime;
CountDownLatch latch;
public Worker(String workerName ,int workTime ,CountDownLatch latch){
this.workerName=workerName;
this.workTime=workTime;
this.latch=latch;
}
public void run(){
System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));
doWork();//工作了
System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));
latch.countDown();//工人完成工作关噪,計(jì)數(shù)器減一
}
private void doWork(){
try {
Thread.sleep(workTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier : N個(gè)線程 相互等待鸟蟹,任何一個(gè)線程完成之前,所有的線程都必須等待使兔。
這樣應(yīng)該就清楚一點(diǎn)了建钥,對于CountDownLatch來說,重點(diǎn)是那個(gè) “一個(gè)線程” , 是它在等待虐沥, 而另外那N的線程在把 “某個(gè)事情” 做完之后可以繼續(xù)等待熊经,可以終止。而對于CyclicBarrier來說,重點(diǎn)是那 N個(gè)線程 镐依,他們之間任何一個(gè)沒有完成悉盆,所有的線程都必須等待。
CyclicBarrier更像一個(gè)水閘, 線程執(zhí)行就想水流, 在水閘處都會(huì)堵住, 等到水滿(線程到齊)了, 才開始泄流.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
//創(chuàng)建CyclicBarrier對象馋吗,
//并設(shè)置執(zhí)行完一組5個(gè)線程的并發(fā)任務(wù)后焕盟,再執(zhí)行MainTask任務(wù)
CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
new SubTask("A", cb).start();
new SubTask("B", cb).start();
new SubTask("C", cb).start();
new SubTask("D", cb).start();
new SubTask("E", cb).start();
}
}
/**
* 最后執(zhí)行的任務(wù)
*/
class MainTask implements Runnable {
public void run() {
//根據(jù)jdkdoc里的描述,哪個(gè)線程最后運(yùn)行完宏粤,就執(zhí)行下面的代碼脚翘。
System.out.println("......執(zhí)行最后的任務(wù)了......");
}
}
/**
* 一組并發(fā)任務(wù)
*/
class SubTask extends Thread {
private String name;
private CyclicBarrier cb;
SubTask(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}
public void run() {
System.out.println("[并發(fā)任務(wù)" + name + "] 開始執(zhí)行");
for (int i = 0; i < 999999; i++) ; //模擬耗時(shí)的任務(wù)
System.out.println("[并發(fā)任務(wù)" + name + "] 開始執(zhí)行完畢,通知障礙器");
try {
//每執(zhí)行完一項(xiàng)任務(wù)就通知障礙器
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
結(jié)果:
[并發(fā)任務(wù)A] 開始執(zhí)行
[并發(fā)任務(wù)B] 開始執(zhí)行
[并發(fā)任務(wù)B] 開始執(zhí)行完畢绍哎,通知障礙器
[并發(fā)任務(wù)C] 開始執(zhí)行
[并發(fā)任務(wù)C] 開始執(zhí)行完畢来农,通知障礙器
[并發(fā)任務(wù)A] 開始執(zhí)行完畢,通知障礙器
[并發(fā)任務(wù)D] 開始執(zhí)行
[并發(fā)任務(wù)D] 開始執(zhí)行完畢崇堰,通知障礙器
[并發(fā)任務(wù)E] 開始執(zhí)行
[并發(fā)任務(wù)E] 開始執(zhí)行完畢沃于,通知障礙器
......執(zhí)行最后的任務(wù)了......