本案例參考至《Java7并發(fā)編程實(shí)戰(zhàn)手冊(cè)》
在java并發(fā)編程的過(guò)程中,往往會(huì)遇到這樣的需求:現(xiàn)在有多個(gè)工人漱抓,每個(gè)工人都制作同一件產(chǎn)品,而且相對(duì)于每個(gè)工人來(lái)說(shuō)產(chǎn)品的制作工序都是一樣的。每制作完一道工序惨奕,產(chǎn)品都需要使用大型機(jī)器進(jìn)行再加工揖赴,為了保證經(jīng)濟(jì)效率」菽洌現(xiàn)在要求每一道工序都需要所有的工人完成后,將所有的產(chǎn)品送進(jìn)工廠加工燥滑,加工完畢之后再將產(chǎn)品分發(fā)給所有的工人進(jìn)行下一輪的工序渐北。(也就是說(shuō),每道工序都必須等待所有的人完成之后铭拧,大家才能繼續(xù)下面的工作赃蛛,只要有一個(gè)人沒(méi)完成都要等到這個(gè)人完成之后才能向下執(zhí)行)
在java中要實(shí)現(xiàn)這樣的需求可以使用Phaser并發(fā)階段任務(wù)執(zhí)行機(jī)制
工人類(lèi)(表示工人執(zhí)行工作的各個(gè)步驟)
import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class Worker implements Runnable{
private Phaser phaser;
public Worker(Phaser phaser) {
super();
this.phaser = phaser;
}
@Override
public void run() {
//將每個(gè)工人到達(dá)工廠的信息打印出來(lái)
System.out.printf("%s: Has arrived to do the company. %s---------00000000000000000000\n",Thread.currentThread().getName(),new Date());
//等待所有的線(xiàn)程執(zhí)行到這里,各個(gè)線(xiàn)程才會(huì)開(kāi)始向下執(zhí)行搀菩,不過(guò)在向下執(zhí)行之前會(huì)執(zhí)行phaser的onAdvance()方法
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the frist step.%s******************0\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the frist step. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the second step.%s---------1111111111111111111\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the second step. %s*****************1\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s :Is going to do the thrid step.%s---------222222222222222222\n",Thread.currentThread().getName(),new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Has done the thrid step. %s******************2\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
}
}
自定義的MyPhaser類(lèi)
/**
* MyPhaser:并發(fā)階段任務(wù)中的階段切換
* @author JM
* @date 2017-2-27 下午9:54:15
* @since JDK 1.7
*/
public class MyPhaser extends Phaser {
/**
* 重寫(xiě)onAdvance(int x,int y)方法
* 在Phaser類(lèi)中呕臂,onAdvance(int x,int y)方法在Phaser階段改變的時(shí)候會(huì)自動(dòng)執(zhí)行,
* x表示當(dāng)前的階段數(shù)肪跋,y表示注冊(cè)的參與者數(shù)量
* 如果onAdvance(int x,int y)方法返回false表示phaser在繼續(xù)執(zhí)行歧蒋,返回true表示phaser已經(jīng)完成執(zhí)行并且進(jìn)入了終止態(tài)
*/
@Override
public boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
return workersArrived();
case 1:
return finishFristExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}
/**
* workersArrived:返回false,表明phaser已經(jīng)開(kāi)始執(zhí)行
* @author JM
* 2017-2-27 下午9:53:39
* @return
* boolean
*/
private boolean workersArrived(){
System.out.printf("Phaser: The job are going to start. The workers are ready.\n");
//getRegisteredParties()返回的是注冊(cè)的線(xiàn)程數(shù)
System.out.printf("We have %d workers.\n",getRegisteredParties());
return false;
}
/**
* finishFristExercise:表示完成了第一階段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishFristJob(){
System.out.printf("Phaser: All the workers have finished the first step.\n");
System.out.printf("Phaser: It's time to second step.\n");
return false;
}
/**
* finishFristExercise:表示完成了第二階段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishSecondJob(){
System.out.printf("Phaser: All the workers have finished the second step.\n");
System.out.printf("Phaser: It's time to third step.\n");
return false;
}
/**
* finishFristExercise:表示完成了第二階段的工序
* @author JM
* 2017-2-27 下午10:08:04
* @return
* boolean
*/
private boolean finishJob(){
System.out.printf("Phaser: All the workers have finished the job.\n");
System.out.printf("Phaser: Thank you for your time.\n");
return true;
}
}
測(cè)試類(lèi)
import java.util.concurrent.Phaser;
public class Main {
public static void main(String[] args) {
MyPhaser myPhaser = new MyPhaser();
Woker[] workers= new Worker[5];
for (int i = 0; i < workers.length; i++) {
// 創(chuàng)建五個(gè)工人對(duì)象,并且通過(guò)register()方法將他們注冊(cè)到phaser谜洽。五個(gè)線(xiàn)程萝映,phaser的每個(gè)階段
// (調(diào)用arriveAndAwaitAdvance()方法的地方)都要等待五個(gè)線(xiàn)程執(zhí)行完才能繼續(xù)執(zhí)行下去
students[i] = new Student(myPhaser);
myPhaser.register();
}
Thread threads[] = new Thread[workers.length];
for (int i = 0; i < workers.length; i++) {
threads[i] = new Thread(workers[i], "Workers" + i);
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Main:The phaser has finished:%s.\n",myPhaser.isTerminated());
}
}
MyPhaser中,重寫(xiě)了onAdvance(int phase,int registerParties)方法阐虚。其中phaser表示當(dāng)前階段數(shù)(每個(gè)線(xiàn)程在完成一階段的任務(wù)時(shí)可以調(diào)用arriverAndAwaitAdvance()等待其他所有線(xiàn)程執(zhí)行完這個(gè)階段锌俱,然后再繼續(xù)執(zhí)行下去,這樣每一次調(diào)用arriverAndAwaitAdvance()方法就會(huì)使phaser階段數(shù)加一敌呈,所有的線(xiàn)程都完成之后并不是立即向下執(zhí)行贸宏,而是先要執(zhí)行onAdvance,這個(gè)就是onAdvance方法的意義)磕洪。