一猴鲫、并發(fā)隊(duì)列
在并發(fā)隊(duì)列上JDK提供了兩套實(shí)現(xiàn)权均,
一個(gè)是以ConcurrentLinkedQueue為代表的高性能隊(duì)列非阻塞具伍,
一個(gè)是以BlockingQueue接口為代表的阻塞隊(duì)列钞脂,無(wú)論哪種都繼承自Queue燥狰。
1棘脐、阻塞隊(duì)列與非阻塞隊(duì)
阻塞隊(duì)列與普通隊(duì)列的區(qū)別在于:
阻塞隊(duì)列:
- 當(dāng)隊(duì)列是空的時(shí),從隊(duì)列中獲取元素的操作將會(huì)被阻塞龙致,試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞蛀缝,直到其他的線程往空的隊(duì)列插入新的元素;
- 當(dāng)隊(duì)列是滿時(shí)目代,往隊(duì)列里添加元素的操作會(huì)被阻塞屈梁。試圖往已滿的阻塞隊(duì)列中添加新元素的線程同樣也會(huì)被阻塞嗤练,直到其他的線程使隊(duì)列重新變得空閑起來(lái),如從隊(duì)列中移除一個(gè)或者多個(gè)元素俘闯,或者完全清空隊(duì)列.
2潭苞、ConcurrentLinkedQeque
ConcurrentLinkedQueue : 是一個(gè)適用于高并發(fā)場(chǎng)景下的隊(duì)列,通過(guò)無(wú)鎖的方式真朗,實(shí)現(xiàn)
了高并發(fā)狀態(tài)下的高性能此疹,通常ConcurrentLinkedQueue性能好于BlockingQueue.它
是一個(gè)基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列。該隊(duì)列的元素遵循先進(jìn)先出的原則遮婶。頭是最先
加入的蝗碎,尾是最近加入的,該隊(duì)列不允許null元素旗扑。
// 非阻塞式隊(duì)列蹦骑,無(wú)界隊(duì)列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
q.offer("張三");
q.offer("李四");
q.offer("王五");
//從頭獲取元素,刪除該元素
System.out.println(q.poll());
//從頭獲取元素,不刪除該元素
System.out.println(q.peek());
//獲取總長(zhǎng)度
System.out.println(q.size());
3、BlockingQueue
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列臀防。這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí)眠菇,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
- 當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用袱衷。
在Java中捎废,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本開(kāi)始提供),由上面介紹的阻塞隊(duì)列的特性可知致燥,阻塞隊(duì)列是線程安全的登疗。
1)、ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)有邊界的阻塞隊(duì)列嫌蚤,它的內(nèi)部實(shí)現(xiàn)是一個(gè)數(shù)組辐益。有邊界的意思是它的容量是有限的,我們必須在其初始化的時(shí)候指定它的容量大小脱吱,容量大小一旦指定就不可改變智政。
ArrayBlockingQueue是以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù),最新插入的對(duì)象是尾部箱蝠,最新移出的對(duì)象是頭部女仰。下面
是一個(gè)初始化和使用ArrayBlockingQueue的例子:
<String> arrays = new ArrayBlockingQueue<String>(3);
arrays.offer("張三");
arrays.offer("李四");
arrays.offer("王五");
arrays.offer("666", 3, TimeUnit.SECONDS); // 隊(duì)列滿了,阻塞3秒后向下執(zhí)行
System.out.println(arrays.poll()); // 張三
System.out.println(arrays.poll()); // 李四
System.out.println(arrays.poll()); // 王五
System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //隊(duì)列為空抡锈,阻塞3秒后結(jié)束
2)疾忍、LinkedBlockingQueue
LinkedBlockingQueue阻塞隊(duì)列大小的配置是可選的,如果我們初始化時(shí)指定一個(gè)大小床三,它就是有邊界的一罩,如果不指定,它就是無(wú)邊界的撇簿。說(shuō)是無(wú)邊界聂渊,其實(shí)是采用了默認(rèn)大小為Integer.MAX_VALUE的容量 差购。它的內(nèi)部實(shí)現(xiàn)是一個(gè)鏈表。
和ArrayBlockingQueue一樣汉嗽,LinkedBlockingQueue 也是以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù)欲逃,最新插入的對(duì)象是尾部,最新移出的對(duì)象是頭部饼暑。下面是一個(gè)初始化和使LinkedBlockingQueue的例子:
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3
3)稳析、PriorityBlockingQueue(有界,快滿時(shí)自動(dòng)擴(kuò)容弓叛,看似無(wú)界)
PriorityBlockingQueue是一個(gè)沒(méi)有邊界的隊(duì)列彰居,它的排序規(guī)則和 java.util.PriorityQueue一樣。需要注意撰筷,PriorityBlockingQueue中允許插入null對(duì)象陈惰。
所有插入PriorityBlockingQueue的對(duì)象必須實(shí)現(xiàn) java.lang.Comparable接口,隊(duì)列優(yōu)先級(jí)的排序規(guī)則就
是按照我們對(duì)這個(gè)接口的實(shí)現(xiàn)來(lái)定義的毕籽。
另外抬闯,我們可以從PriorityBlockingQueue獲得一個(gè)迭代器Iterator,但這個(gè)迭代器并不保證按照優(yōu)先級(jí)順序進(jìn)行迭代关筒。
4)溶握、SynchronousQueue
SynchronousQueue隊(duì)列內(nèi)部?jī)H允許容納一個(gè)元素。當(dāng)一個(gè)線程插入一個(gè)元素后會(huì)被阻塞平委,除非這個(gè)元素被另一個(gè)線程消費(fèi)奈虾。
5)夺谁、使用BlockingQueue模擬生產(chǎn)者與消費(fèi)者
class ProducerThread implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger count = new AtomicInteger();
private volatile boolean FLAG = true;
public ProducerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "生產(chǎn)者開(kāi)始啟動(dòng)....");
while (FLAG) {
String data = count.incrementAndGet() + "";
try {
boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (offer) {
System.out.println(Thread.currentThread().getName() + ",生產(chǎn)隊(duì)列" + data + "成功..");
} else {
System.out.println(Thread.currentThread().getName() + ",生產(chǎn)隊(duì)列" + data + "失敗..");
}
Thread.sleep(1000);
} catch (Exception e) {
}
}
System.out.println(Thread.currentThread().getName() + ",生產(chǎn)者線程停止...");
}
public void stop() {
this.FLAG = false;
}
}
class ConsumerThread implements Runnable {
private volatile boolean FLAG = true;
private BlockingQueue<String> blockingQueue;
public ConsumerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "消費(fèi)者開(kāi)始啟動(dòng)....");
while (FLAG) {
try {
String data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null || data == "") {
FLAG = false;
System.out.println("消費(fèi)者超過(guò)2秒時(shí)間未獲取到消息.");
return;
}
System.out.println("消費(fèi)者獲取到隊(duì)列信息成功,data:" + data);
} catch (Exception e) {
// TODO: handle exception
}
}
}
}
public class Test0008 {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
ProducerThread producerThread = new ProducerThread(blockingQueue);
ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
Thread t1 = new Thread(producerThread);
Thread t2 = new Thread(consumerThread);
t1.start();
t2.start();
//10秒后 停止線程..
try {
Thread.sleep(10*1000);
producerThread.stop();
} catch (Exception e) {
// TODO: handle exception
}
}
}
- ArrayDeque, (數(shù)組雙端隊(duì)列)
- PriorityQueue, (優(yōu)先級(jí)隊(duì)列)
- ConcurrentLinkedQueue, (基于鏈表的并發(fā)隊(duì)列)
- DelayQueue, (延期阻塞隊(duì)列)(阻塞隊(duì)列實(shí)現(xiàn)了BlockingQueue接口)
- ArrayBlockingQueue, 常用(基于數(shù)組的并發(fā)阻塞隊(duì)列)
- LinkedBlockingQueue, 常用(基于鏈表的FIFO阻塞隊(duì)列)
- LinkedBlockingDeque, (基于鏈表的FIFO雙端阻塞隊(duì)列)
- PriorityBlockingQueue,常用 (帶優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列廉赔,)
- SynchronousQueue常用 (并發(fā)同步阻塞隊(duì)列)
個(gè)人博客 蝸牛