BlockingQueue介紹
- ArrayBlockingQueue: 一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列胧奔。
- LinkedBlockingQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列勺馆。
- SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列誉己。
- PriorityBlockingQueue: 一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列涕刚。
插入方法 add offer put offer
刪除方法 remove poll take poll
方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add | offer | put | offer |
刪除方法 | remove | poll | take | poll |
BlockingQueue用法
帶著疑問去使用几晤,可能會(huì)更好:
- 隊(duì)列空著的時(shí)候去拿數(shù)據(jù)會(huì)怎么樣宫补?
- 隊(duì)列滿著的時(shí)候去存數(shù)據(jù)會(huì)怎么樣盛撑?
- 前面的方法碎节,能否會(huì)出現(xiàn)阻塞或者非阻塞現(xiàn)象?
隊(duì)列滿了抵卫,存數(shù)據(jù)會(huì)怎么樣狮荔?
隊(duì)列滿了,那么調(diào)用put方法介粘,會(huì)將當(dāng)前線程阻塞殖氏。
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
for (int i = 0 ; i < 15 ; i++) {
final int i1 = i;
Thread t1 = new Thread(() -> {
try {
blockingQueue.put(" msg " + i1);
System.out.println("======================= provider " + i1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
}
Thread t2 = new Thread(() -> {
try {
String msg = blockingQueue.take();
System.out.println("======================= consumer " + msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2.start();
}
隊(duì)列空了,取數(shù)據(jù)會(huì)怎么樣碗短?
隊(duì)列空了受葛,那么調(diào)用take方法,會(huì)將當(dāng)前線程阻塞偎谁。
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
for (int i = 0 ; i < 5 ; i++) {
final int i1 = i;
Thread t1 = new Thread(() -> {
try {
blockingQueue.put(" msg " + i1);
System.out.println("======================= provider " + i1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
}
for (int j = 0 ; j < 10 ; j++) {
Thread t2 = new Thread(() -> {
try {
String msg = blockingQueue.take();
System.out.println("======================= consumer " + msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t2.start();
}
}
poll方法锄贼,在隊(duì)列空了的時(shí)候,返回false却嗡,取出的對(duì)象為空暮现,當(dāng)前線程非阻塞。
offer方法铐望,在隊(duì)列滿了的時(shí)候冈涧,返回false,當(dāng)前線程非阻塞正蛙。
源碼
ArrayBlockingQueue構(gòu)造器初始化參數(shù)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
// 容量參數(shù)小于0督弓,報(bào)非法參數(shù)異常
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化存入元素的數(shù)組
this.items = new Object[capacity];
// 初始化非公平的重入鎖
lock = new ReentrantLock(fair);
// 設(shè)置非空的等待喚醒隊(duì)列
notEmpty = lock.newCondition();
// 設(shè)置非滿的等待喚醒隊(duì)列
notFull = lock.newCondition();
}
put阻塞存入數(shù)據(jù)方法
public void put(E var1) throws InterruptedException {
checkNotNull(var1);
// 重入鎖上鎖
ReentrantLock var2 = this.lock;
var2.lockInterruptibly();
try {
// 當(dāng)隊(duì)列已經(jīng)滿了,那么就將當(dāng)前線程等待
while(this.count == this.items.length) {
this.notFull.await();
}
// 進(jìn)隊(duì)列
this.enqueue(var1);
} finally {
// 重入鎖解鎖
var2.unlock();
}
}
private void enqueue(E var1) {
// 獲取存放對(duì)象的數(shù)組
Object[] var2 = this.items;
// 將元素存入數(shù)組
var2[this.putIndex] = var1;
// 如果當(dāng)前的下標(biāo)已經(jīng)超出了存放對(duì)象數(shù)組的長度
if (++this.putIndex == var2.length) {
this.putIndex = 0;
}
++this.count;
// 喚醒非空等待隊(duì)列中的元素
this.notEmpty.signal();
}
take阻塞拿取數(shù)據(jù)方法
public E take() throws InterruptedException {
ReentrantLock var1 = this.lock;
var1.lockInterruptibly();
Object var2;
try {
// 當(dāng)前元素的個(gè)數(shù)已經(jīng)為0
while(this.count == 0) {
// 當(dāng)前線程等待
this.notEmpty.await();
}
// 出隊(duì)列
var2 = this.dequeue();
} finally {
var1.unlock();
}
return var2;
}
private E dequeue() {
// 獲取存放元素的數(shù)組
Object[] var1 = this.items;
// 通過記錄的取出下標(biāo)索引取出元素
Object var2 = var1[this.takeIndex];
var1[this.takeIndex] = null;
// 存放元素的數(shù)組已經(jīng)到達(dá)的最后乒验,沒有元素了
if (++this.takeIndex == var1.length) {
this.takeIndex = 0;
}
// 記錄元素的個(gè)數(shù)減1
--this.count;
if (this.itrs != null) {
this.itrs.elementDequeued();
}
// 喚醒所有的未滿的等待隊(duì)列中的元素
this.notFull.signal();
return var2;
}