使用BlockingQueue模擬生產者與消費者
/**
 * Producer thread that repeatedly offers sequence numbers to a shared queue.
 *
 * <p>Each loop iteration takes the next value from a counter shared by all
 * {@code Producer} instances, tries a non-blocking {@code offer} on the queue,
 * prints whether the offer succeeded, and then sleeps for one second. The loop
 * ends when {@link #stopThread()} clears the run flag or when the thread is
 * interrupted while sleeping.
 */
class Producer extends Thread{
    /** Queue that receives the produced values. */
    private final BlockingQueue<Integer> queue;
    /** Run flag; declared volatile so a write from another thread is visible to the loop. */
    private volatile boolean flag=true;
    /** Sequence generator shared by every Producer instance. */
    private static final AtomicInteger count=new AtomicInteger();

    /**
     * Creates a producer bound to the given queue.
     *
     * @param queue queue the produced integers are offered to
     */
    public Producer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    /**
     * Produces one value per second until the run flag is cleared or the
     * thread is interrupted. A stop message is always printed on exit.
     */
    @Override
    public void run() {
        System.out.println(getName()+"生產(chǎn)者線程啟動(dòng)...");
        try {
            while (flag){
                System.out.println(getName()+"生產(chǎn)者開始生產(chǎn)消息...");
                // While the flag is set, enqueue the next number; the atomic
                // counter keeps the sequence unique across producer threads.
                int i = count.incrementAndGet();
                // offer() does not block: it returns false when the queue is full.
                boolean offer = queue.offer(i);
                if(offer){
                    System.out.println(getName()+"生產(chǎn)者生產(chǎn)生產(chǎn)消息:"+i+"成功");
                }else {
                    System.out.println(getName()+"生產(chǎn)者生產(chǎn)生產(chǎn)消息:"+i+"失敗");
                }
                Thread.sleep(1000);
            }
        }catch (InterruptedException e){
            // Interruption is treated as a request to stop. Restore the
            // interrupt status instead of swallowing it so that any code
            // inspecting this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }finally {
            System.out.println(getName()+"生產(chǎn)者線程停止...");
        }
    }

    /**
     * Asks the producer to stop. The loop observes the cleared flag the next
     * time it evaluates its condition, i.e. after the current sleep finishes.
     */
    public void stopThread(){
        this.flag=false;
    }
}
/**
 * Consumer thread that drains integers from a shared queue.
 *
 * <p>Each loop iteration waits up to two seconds for an element. A received
 * element is reported as a success; a timed-out poll (a {@code null} result)
 * is reported as a failure and ends the loop. Interrupting the thread while
 * it waits also ends the loop.
 */
class Consumer extends Thread{
    /** Queue the values are taken from. */
    private final BlockingQueue<Integer> queue;
    /** Run flag; cleared by this thread once a poll times out. */
    private volatile boolean flag=true;

    /**
     * Creates a consumer bound to the given queue.
     *
     * @param queue queue the integers are polled from
     */
    public Consumer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    /**
     * Polls the queue until a poll times out or the thread is interrupted.
     * A stop message is always printed on exit.
     */
    @Override
    public void run() {
        System.out.println(getName()+"消費(fèi)者線程啟動(dòng)...");
        try {
            while (flag){
                System.out.println(getName()+"消費(fèi)者開始消費(fèi)消息...");
                // While the flag is set, dequeue the next element, waiting
                // at most two seconds for one to become available.
                Integer poll = queue.poll(2, TimeUnit.SECONDS);
                if(poll != null){
                    System.out.println(getName()+"消費(fèi)者獲取消息:"+poll+"成功");
                }else {
                    // Nothing arrived within the timeout: report it and stop.
                    System.out.println(getName()+"消費(fèi)者獲取消息:"+poll+"失敗");
                    this.flag=false;
                }
            }
        }catch (InterruptedException e){
            // Interruption is treated as a request to stop. Restore the
            // interrupt status instead of swallowing it so that any code
            // inspecting this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }finally {
            System.out.println(getName()+"消費(fèi)者線程停止...");
        }
    }
}
/**
 * Demo entry point that connects two producers and one consumer through a
 * single bounded queue.
 */
public class ProduceConsumerThread {
    /** Capacity of the queue shared by the producers and the consumer. */
    private static final int QUEUE_CAPACITY = 10;
    /** Time the producers are left running before being asked to stop, in milliseconds. */
    private static final long RUN_MILLIS = 3 * 1000;

    /**
     * Starts both producers and the consumer, lets them run for three
     * seconds, then asks the producers to stop.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> sharedQueue = new ArrayBlockingQueue<Integer>(QUEUE_CAPACITY);

        // Construct in the same order as they are started: producers first, then the consumer.
        Producer[] producers = { new Producer(sharedQueue), new Producer(sharedQueue) };
        Consumer consumer = new Consumer(sharedQueue);

        for (Producer producer : producers) {
            producer.start();
        }
        consumer.start();

        Thread.sleep(RUN_MILLIS);

        for (Producer producer : producers) {
            producer.stopThread();
        }
    }
}