Semaphore實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
- acquire方法:獲取信號(hào)量的許可慈省,并把信號(hào)量的值減1
- release方法:釋放一個(gè)許可葵腹,將信號(hào)量的值加1
package thread;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
public class ProducerConsumer4 {
static Semaphore emptySlot = new Semaphore(1);
static Semaphore fullSlot = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
Queue<Integer> queue = new LinkedList<>();
ProducerConsumer4.Producer p1 = new ProducerConsumer4.Producer(queue);
ProducerConsumer4.Consumer c1 = new ProducerConsumer4.Consumer(queue);
p1.start();
c1.start();
p1.join();
c1.join();
}
static class Producer extends Thread {
Queue<Integer> queue;
Producer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
emptySlot.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (ProducerConsumer4.class) {
int tmp = new Random().nextInt();
queue.offer(tmp);
System.out.println("Producing " + tmp);
}
fullSlot.release();
}
}
static class Consumer extends Thread {
Queue<Integer> queue;
Consumer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
fullSlot.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (ProducerConsumer4.class) {
System.out.println("Consuming " + queue.poll());
}
emptySlot.release();
}
}
}
Exchanger實(shí)現(xiàn)生產(chǎn)者消費(fèi)者
- 線(xiàn)程到達(dá)交換點(diǎn)驻子,進(jìn)行交換
package thread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
public class ProducerConsumer5 {
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
ProducerConsumer5.Producer p1 = new ProducerConsumer5.Producer(exchanger);
ProducerConsumer5.Consumer c1 = new ProducerConsumer5.Consumer(exchanger);
p1.start();
c1.start();
p1.join();
c1.join();
}
static class Producer extends Thread {
Exchanger<Integer> exchanger;
Producer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
int tmp = new Random().nextInt();
try {
exchanger.exchange(tmp);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producing " + tmp);
}
}
}
static class Consumer extends Thread {
Exchanger<Integer> exchanger;
Consumer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Consuming " + exchanger.exchange(null));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}