- 最外層的while是為了執(zhí)行完一輪不終止
wait()和notifyall()
ublic class ShareDataV1 {
public static AtomicInteger atomicInteger = new AtomicInteger();
public volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static final List<Integer> pool = new ArrayList<>();
public void produce() {
// 判斷,干活他托,通知
while (flag) {
// 每隔 1000 毫秒生產(chǎn)一個(gè)商品
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
synchronized (pool) {
//池子滿了笔刹,生產(chǎn)者停止生產(chǎn)
//TODO 判斷
while (pool.size() == MAX_COUNT) {
try {
System.out.println("pool is full, wating...");
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
pool.add(atomicInteger.incrementAndGet());
System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
//通知
pool.notifyAll();
}
}
}
public void consumue() {
// 判斷页畦,干活,通知
while (flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
synchronized (pool) {
//池子空了,消費(fèi)者停止消費(fèi)
while (pool.size() == 0) {
try {
System.out.println("pool is empty, wating...");
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
int temp = pool.get(0);
pool.remove(0);
System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
//通知
pool.notifyAll();
}
}
}
public void stop() {
flag = false;
}
}
lock,condition,signal
public class ShareDataV2 {
public static AtomicInteger atomicInteger = new AtomicInteger();
public volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static final List<Integer> pool = new ArrayList<>();
private Lock lock = new ReentrantLock();
//也可以一個(gè)condition然后signalall
private Condition produce_condition = lock.newCondition();
private Condition consumue_condition = lock.newCondition();
public void produce() {
// 判斷蒋纬,干活萤彩,通知
while (flag){
lock.lock();
try {
Thread.sleep(100);
//池子滿了粪滤,生產(chǎn)者停止生產(chǎn)
while (pool.size() == MAX_COUNT) {
//等待,不能生產(chǎn)
produce_condition.await();
}
//干活
pool.add(atomicInteger.incrementAndGet());
System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
consumue_condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void consumue() {
// 判斷雀扶,干活杖小,通知
while (flag) {
lock.lock();
try {
Thread.sleep(1000);
//池子空了,消費(fèi)者停止消費(fèi)
while (pool.size() == 0) {
//等待愚墓,不能消費(fèi)
System.out.println("pool is empty, wating...");
consumue_condition.await();
}
//干活
int temp = pool.get(0);
pool.remove(0);
System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
produce_condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void stop() {
flag = false;
}
}
阻塞隊(duì)列
public class ShareDataV3 {
private static final int MAX_CAPACITY = 10; //阻塞隊(duì)列容量
private static BlockingQueue<Integer> blockingQueue= new ArrayBlockingQueue<>(MAX_CAPACITY); //阻塞隊(duì)列
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
public void produce() throws InterruptedException {
while (FLAG){
boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
if (retvalue==true){
System.out.println(Thread.currentThread().getName()+"\t 插入隊(duì)列"+ atomicInteger.get()+"成功"+"資源隊(duì)列大小= " + blockingQueue.size());
}else {
System.out.println(Thread.currentThread().getName()+"\t 插入隊(duì)列"+ atomicInteger.get()+"失敗"+"資源隊(duì)列大小= " + blockingQueue.size());
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"FLAG變?yōu)閒lase予权,生產(chǎn)停止");
}
public void consume() throws InterruptedException {
Integer result = null;
while (true){
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null==result){
System.out.println("超過(guò)兩秒沒(méi)有取道數(shù)據(jù),消費(fèi)者即將退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t 消費(fèi)"+ result+"成功"+"\t\t"+"資源隊(duì)列大小= " + blockingQueue.size());
Thread.sleep(1500);
}
}
public void stop() {
this.FLAG = false;
}
}
三個(gè)的統(tǒng)一調(diào)用
public class ProducerConsumer_V1 {
public static void main(String[] args) {
ShareDataV1 shareDataV1 = new ShareDataV1();
new Thread(() -> {
shareDataV1.produce();
}, "AAA").start();
new Thread(() -> {
shareDataV1.consumue();
}, "BBB").start();
new Thread(() -> {
shareDataV1.produce();
}, "CCC").start();
new Thread(() -> {
shareDataV1.consumue();
}, "DDD").start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
shareDataV1.stop();
}
}