關(guān)鍵方法弊攘,
put和take
這倆個(gè)方法在入隊(duì)列的時(shí)候如果隊(duì)列已滿,就會(huì)等待隊(duì)列有了空位置再入,出隊(duì)的時(shí)候如果隊(duì)列為空就會(huì)阻塞等待隊(duì)列有了值再出蹲堂。
package com.ghw.springboot.A0307;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 阻塞隊(duì)列測(cè)試
*/
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 聲明一個(gè)容量為5的緩存隊(duì)列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動(dòng)線程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執(zhí)行3s
Thread.sleep(3 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("啟動(dòng)消費(fèi)者線程搁料!" + Thread.currentThread().getName());
Random r = new Random();
boolean isRunning = true;
try {
while (true) {
System.out.println("隊(duì)列:" + queue + "隊(duì)列大小:" + queue.size());
String data = queue.take();
System.out.println("正在消費(fèi)數(shù)據(jù):" + data);
TimeUnit.MILLISECONDS.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費(fèi)者線程或详!");
}
}
}
class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
String data;
Random r = new Random();
System.out.println("啟動(dòng)生產(chǎn)者線程!" + Thread.currentThread().getName());
try {
while (isRunning) {
TimeUnit.MILLISECONDS.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println(data + "入隊(duì)");
queue.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生產(chǎn)者線程郭计!");
}
}
public void stop() {
isRunning = false;
}
}
運(yùn)行結(jié)果:
啟動(dòng)生產(chǎn)者線程霸琴!pool-1-thread-2
啟動(dòng)消費(fèi)者線程!pool-1-thread-4
隊(duì)列:[]隊(duì)列大小:0
啟動(dòng)生產(chǎn)者線程昭伸!pool-1-thread-1
啟動(dòng)生產(chǎn)者線程梧乘!pool-1-thread-3
data:1入隊(duì)
正在消費(fèi)數(shù)據(jù):data:1
隊(duì)列:[]隊(duì)列大小:0
data:2入隊(duì)
正在消費(fèi)數(shù)據(jù):data:2
data:3入隊(duì)
data:4入隊(duì)
data:5入隊(duì)
隊(duì)列:[data:3, data:4, data:5]隊(duì)列大小:3
正在消費(fèi)數(shù)據(jù):data:3
data:6入隊(duì)
data:7入隊(duì)
data:8入隊(duì)
data:9入隊(duì)
隊(duì)列:[data:4, data:5, data:6, data:7, data:8]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:4
data:10入隊(duì)
data:11入隊(duì)
data:12入隊(duì)
隊(duì)列:[data:5, data:6, data:7, data:8, data:9]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:5
data:13入隊(duì)
隊(duì)列:[data:6, data:7, data:8, data:9, data:10]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:6
隊(duì)列:[data:7, data:8, data:9, data:10, data:11]隊(duì)列大小:5
退出生產(chǎn)者線程!
正在消費(fèi)數(shù)據(jù):data:7
data:14入隊(duì)
隊(duì)列:[data:8, data:9, data:10, data:11, data:12]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:8
退出生產(chǎn)者線程庐杨!
隊(duì)列:[data:9, data:10, data:11, data:12, data:13]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:9
退出生產(chǎn)者線程选调!
隊(duì)列:[data:10, data:11, data:12, data:13, data:14]隊(duì)列大小:5
正在消費(fèi)數(shù)據(jù):data:10
隊(duì)列:[data:11, data:12, data:13, data:14]隊(duì)列大小:4
正在消費(fèi)數(shù)據(jù):data:11
隊(duì)列:[data:12, data:13, data:14]隊(duì)列大小:3
正在消費(fèi)數(shù)據(jù):data:12
隊(duì)列:[data:13, data:14]隊(duì)列大小:2
正在消費(fèi)數(shù)據(jù):data:13
隊(duì)列:[data:14]隊(duì)列大小:1
正在消費(fèi)數(shù)據(jù):data:14
隊(duì)列:[]隊(duì)列大小:0