多線程中比較經典的就是生產者消費者模式了,很多復雜的模式也是在這個基礎上演變的哪雕。里面也又很多的小知識點。
生產者消費者代碼
public class AssemblyLine<T> {
//存放生產數(shù)據(jù)的隊列
private final List<T> queue;
//最大生產值
private final int queueMaxSize;
public AssemblyLine(List<T> messageQueue, int max) {
this.queue = messageQueue;
this.queueMaxSize = max;
}
//生產者
public void produceMessage(T message) {
synchronized (queue){
while (queue.size()>=queueMaxSize){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+":produce:"+message);
queue.add(message);
queue.notifyAll();
}
}
//消費者
public void consumeMessage(){
synchronized (queue){
while (queue.isEmpty()){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+":consume:"+ queue.remove(0));
queue.notifyAll();
}
}
執(zhí)行代碼
public class Test {
public static void main(String[] args) {
final List<String> messageQueue=new LinkedList<>();
final int MAX_PRODUCE_SIZE=20;
AssemblyLine<String> assemblyLine = new AssemblyLine<>(messageQueue, MAX_PRODUCE_SIZE);
Stream.of("produce1","produce2","produce3").forEach(s ->
new Thread(()-> {
while (true){
assemblyLine.produceMessage(UUID.randomUUID().toString());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},s).start()
);
Stream.of("consume1","consume2").forEach(s ->
new Thread(()->{
while (true){
assemblyLine.consumeMessage();
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},s).start()
);
}
總結
- 在生產者消費者模型中判斷邏輯代碼中不能使用if必須使用while,因為需要被喚醒后的生產者或者消費者去重新執(zhí)行判斷列吼,否則可能一次生成多次消費,生成過剩的情況發(fā)生寞钥。
- 因為Object.notify()不能指定喚醒某個等待的線程,所以只能使用notifyAll();