Queue:基本上,隊(duì)列就是一個(gè)先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu)
Queue接口與List、Set同一級(jí)別,都是集成了Collection接口粟关。
Queue分為:阻塞隊(duì)列實(shí)現(xiàn)和非阻塞隊(duì)列實(shí)現(xiàn),線程安全和非線程安全之分环戈。
一闷板、非阻塞隊(duì)列(Queue)
不阻塞隊(duì)列: PriorityQueue 和 ConcurrentLinkedQueue
PriorityQueue 類實(shí)質(zhì)上維護(hù)了一個(gè)有序列表。加入到 Queue 中的元素根據(jù)它們的天然排序(通過(guò)其 java.util.Comparable 實(shí)現(xiàn))或者根據(jù)傳遞給構(gòu)造函數(shù)的 java.util.Comparator 實(shí)現(xiàn)來(lái)定位院塞。
ConcurrentLinkedQueue 是基于鏈接節(jié)點(diǎn)的遮晚、線程安全的隊(duì)列。并發(fā)訪問(wèn)不需要同步拦止。因?yàn)樗陉?duì)列的尾部添加元素并從頭部刪除它們县遣,所以只要不需要知道隊(duì)列的大 小,ConcurrentLinkedQueue 對(duì)公共集合的共享訪問(wèn)就可以工作得很好汹族。收集關(guān)于隊(duì)列大小的信息會(huì)很慢萧求,需要遍歷隊(duì)列。
當(dāng)往隊(duì)列中追加10個(gè)對(duì)象顶瞒,耗時(shí)對(duì)比
PriorityQueue total cost=26072ns , each cost=2607ns
ConcurrentLinkedQueue total cost=39504ns , each cost=3950ns
當(dāng)往隊(duì)列里面追加10000個(gè)數(shù)據(jù)時(shí)夸政,耗時(shí)對(duì)比
PriorityQueue total cost=1185094ns , each cost=118ns
ConcurrentLinkedQueue total cost=1157837ns , each cost=115ns
對(duì)比發(fā)現(xiàn)隊(duì)列在一次性追加越多數(shù)量的數(shù)據(jù)似乎速度越快
所有隊(duì)列的性能對(duì)比(代碼最下面)
PriorityQueue add total cost=2263924ns , each cost=226ns
ConcurrentLinkedQueue add total cost=1170872ns , each cost=117ns
LinkedBlockingQueue add total cost=3611376ns , each cost=361ns
ArrayBlockingQueue add total cost=1749199ns , each cost=174ns
PriorityBlockingQueue add total cost=2289602ns , each cost=228ns
DelayQueue add total cost=2733222ns , each cost=273ns
PriorityQueue remove total cost=1747223ns , each cost=174ns
ConcurrentLinkedQueue remove total cost=1502304ns , each cost=150ns
LinkedBlockingQueue remove total cost=1677698ns , each cost=167ns
ArrayBlockingQueue remove total cost=1133345ns , each cost=113ns
PriorityBlockingQueue remove total cost=1420928ns , each cost=142ns
DelayQueue remove total cost=1947109ns , each cost=194ns
二、阻塞隊(duì)列(BlockingQueue)
不是立即從隊(duì)列中添加或者刪除元素榴徐,線程執(zhí)行操作阻塞秒梳,直到有空間或者元素可用。
五個(gè)隊(duì)列所提供的各有不同:
- ArrayBlockingQueue :一個(gè)由數(shù)組支持的有界隊(duì)列箕速。
- LinkedBlockingQueue :一個(gè)由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列。
- PriorityBlockingQueue :一個(gè)由優(yōu)先級(jí)堆支持的無(wú)界優(yōu)先級(jí)隊(duì)列朋譬。
- DelayQueue :一個(gè)由優(yōu)先級(jí)堆支持的盐茎、基于時(shí)間的調(diào)度隊(duì)列。
- SynchronousQueue :一個(gè)利用 BlockingQueue 接口的簡(jiǎn)單聚集(rendezvous)機(jī)制徙赢。
add 增加一個(gè)元索 如果隊(duì)列已滿字柠,則拋出一個(gè)IIIegaISlabEepeplian異常
remove 移除并返回隊(duì)列頭部的元素 如果隊(duì)列為空,則拋出一個(gè)NoSuchElementException異常
** element **返回隊(duì)列頭部的元素 如果隊(duì)列為空狡赐,則拋出一個(gè)NoSuchElementException異常
offer 添加一個(gè)元素并返回true 如果隊(duì)列已滿窑业,則返回false
poll 移除并返問(wèn)隊(duì)列頭部的元素 如果隊(duì)列為空,則返回null
peek 返回隊(duì)列頭部的元素 如果隊(duì)列為空枕屉,則返回null
put 添加一個(gè)元素 如果隊(duì)列滿常柄,則阻塞
take 移除并返回隊(duì)列頭部的元素 如果隊(duì)列為空,則阻塞
** remove、element西潘、offer 卷玉、poll、peek 其實(shí)是屬于Queue接口 **
** 阻塞隊(duì)列的操作可以根據(jù)它們的響應(yīng)方式分為以下三類:aad喷市、removee和element操作在你試圖為一個(gè)已滿的隊(duì)列增加元素或從空隊(duì)列取得元素時(shí) 拋出異常相种。當(dāng)然,在多線程程序中品姓,隊(duì)列在任何時(shí)間都可能變成滿的或空的寝并,所以你可能想使用offer、poll腹备、peek方法衬潦。這些方法在無(wú)法完成任務(wù)時(shí) 只是給出一個(gè)出錯(cuò)示而不會(huì)拋出異常。**
** 注意:poll和peek方法出錯(cuò)進(jìn)返回null馏谨。因此别渔,向隊(duì)列中插入null值是不合法的 **
** 最后,我們有阻塞操作put和take惧互。put方法在隊(duì)列滿時(shí)阻塞哎媚,take方法在隊(duì)列空時(shí)阻塞 **
阻塞隊(duì)列說(shuō)明
LinkedBlockingQueue的容量是沒(méi)有上限的(說(shuō)的不準(zhǔn)確,在不指定時(shí)容量為Integer.MAX_VALUE喊儡,不要然的話在put時(shí)怎么會(huì)受阻呢)拨与,但是也可以選擇指定其最大容量,它是基于鏈表的隊(duì)列艾猜,此隊(duì)列按 FIFO(先進(jìn)先出)排序元素买喧。
ArrayBlockingQueue在構(gòu)造時(shí)需要指定容量, 并可以選擇是否需要公平性匆赃,如果公平參數(shù)被設(shè)置true淤毛,等待時(shí)間最長(zhǎng)的線程會(huì)優(yōu)先得到處理(其實(shí)就是通過(guò)將ReentrantLock設(shè)置為true來(lái) 達(dá)到這種公平性的:即等待時(shí)間最長(zhǎng)的線程會(huì)先操作)。通常算柳,公平性會(huì)使你在性能上付出代價(jià)低淡,只有在的確非常需要的時(shí)候再使用它。它是基于數(shù)組的阻塞循環(huán)隊(duì) 列瞬项,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序蔗蹋。
PriorityBlockingQueue是一個(gè)帶優(yōu)先級(jí)的 隊(duì)列,而不是先進(jìn)先出隊(duì)列囱淋。元素按優(yōu)先級(jí)順序被移除猪杭,該隊(duì)列也沒(méi)有上限(看了一下源碼,PriorityBlockingQueue是對(duì) PriorityQueue的再次包裝妥衣,是基于堆數(shù)據(jù)結(jié)構(gòu)的皂吮,而PriorityQueue是沒(méi)有容量限制的戒傻,與ArrayList一樣,所以在優(yōu)先阻塞 隊(duì)列上put時(shí)是不會(huì)受阻的涮较。雖然此隊(duì)列邏輯上是無(wú)界的稠鼻,但是由于資源被耗盡,所以試圖執(zhí)行添加操作可能會(huì)導(dǎo)致 OutOfMemoryError)狂票,但是如果隊(duì)列為空候齿,那么取元素的操作take就會(huì)阻塞,所以它的檢索操作take是受阻的闺属。另外慌盯,往入該隊(duì)列中的元 素要具有比較能力。
DelayQueue(基于PriorityQueue來(lái)實(shí)現(xiàn)的)是一個(gè)存放Delayed 元素的無(wú)界阻塞隊(duì)列掂器,只有在延遲期滿時(shí)才能從中提取元素亚皂。該隊(duì)列的頭部是延遲期滿后保存時(shí)間最長(zhǎng)的 Delayed 元素。如果延遲都還沒(méi)有期滿国瓮,則隊(duì)列沒(méi)有頭部灭必,并且poll將返回null。當(dāng)一個(gè)元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個(gè)小于或等于零的值時(shí)乃摹,則出現(xiàn)期滿禁漓,poll就以移除這個(gè)元素了。此隊(duì)列不允許使用 null 元素孵睬。
---代碼:開(kāi)始 簡(jiǎn)單的數(shù)據(jù)操作對(duì)比---
/**
* @Description: 非阻塞隊(duì)列對(duì)比,1萬(wàn)次追加數(shù)據(jù)播歼,1萬(wàn)次移除數(shù)據(jù) 1萬(wàn)次生產(chǎn)消費(fèi)同時(shí)處理效率對(duì)比
* @Author: [Bill] on [2019/8/1 15:08]
*/
public class QueueInterTest {
public static void main(String[] args) {
//執(zhí)行1萬(wàn)次追加數(shù)據(jù)
int count = 10000;
Queue priorityQueue = new PriorityQueue();//有序列表
Queue concurrentLinkedQueue = new ConcurrentLinkedQueue();//基于鏈接節(jié)點(diǎn),線程安全
Queue linkedBlockingQueue = new LinkedBlockingQueue();
Queue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要選擇是否需要公平性掰读,所以需要加入的對(duì)象實(shí)現(xiàn)Comparable比較接口
Queue priorityBlockingQueue = new PriorityBlockingQueue();
Queue delayQueue = new DelayQueue();//只有在延遲期滿時(shí)才能從中提取元素秘狞,由于需要實(shí)現(xiàn)延遲需要加入的對(duì)象實(shí)現(xiàn)Delayed接口
List<Queue> queues = Arrays.asList(priorityQueue, concurrentLinkedQueue, linkedBlockingQueue,arrayBlockingQueue, priorityBlockingQueue, delayQueue);
queues.forEach(p -> testAddTemplate(p));
//溢出1萬(wàn)次獲取數(shù)據(jù)
queues.forEach(p -> testRemoveTemplate(p));
}
private static void testAddTemplate(Queue priorityQueue) {
//進(jìn)行GC回收
JvmUtils.restoreJVM();
//進(jìn)行測(cè)試
int count = 10000;
//創(chuàng)建對(duì)象
TestBean testBean = new TestBean();
long start = System.nanoTime();
for (int i = 0; i < count; i++) {
priorityQueue.add(testBean);
}
long nscost = (System.nanoTime()) - start;
System.out.println(priorityQueue.getClass().getSimpleName() + " add total cost=" + nscost + "ns , each cost="
+ (nscost / count) + "ns");
JvmUtils.restoreJVM();
}
private static void testRemoveTemplate(Queue priorityQueue) {
//進(jìn)行GC回收
JvmUtils.restoreJVM();
//進(jìn)行測(cè)試
long start = System.nanoTime();
int size = priorityQueue.size();
for (int i = 0; i < size; i++) {
priorityQueue.remove();
}
long nscost = (System.nanoTime()) - start;
System.out.println(priorityQueue.getClass().getSimpleName() + " remove total cost=" + nscost + "ns , each cost="
+ (nscost / size) + "ns");
JvmUtils.restoreJVM();
}
---代碼:結(jié)束 簡(jiǎn)單的數(shù)據(jù)操作對(duì)比---
阻塞隊(duì)列使用對(duì)比 1萬(wàn)次生產(chǎn)消費(fèi)同時(shí)處理效率對(duì)比。
對(duì)比代碼
package com.evolution.upgrade.collection.queue;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import com.evolution.upgrade.Serialization.TestBean;
import com.evolution.upgrade.collection.queue.impl.BlockingQueueInterImpl;
import com.evolution.upgrade.utils.JvmUtils;
/**
* @Description: 阻塞隊(duì)列使用對(duì)比 1萬(wàn)次生產(chǎn)消費(fèi)同時(shí)處理效率對(duì)比蹈集。
* @Author: [Bill] on [2019/8/1 17:30]
*/
public class BlockingQueueInterTest {
private static final int count = 10000;
public static void main(String[] args) {
BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要選擇是否需要公平性烁试,所以需要加入的對(duì)象實(shí)現(xiàn)Comparable比較接口
BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
BlockingQueue delayQueue = new DelayQueue();//只有在延遲期滿時(shí)才能從中提取元素,由于需要實(shí)現(xiàn)延遲需要加入的對(duì)象實(shí)現(xiàn)Delayed接口
List<BlockingQueue> queues = Arrays.asList(linkedBlockingQueue, linkedBlockingQueue, arrayBlockingQueue,
priorityBlockingQueue, delayQueue);
queues.forEach(p -> testTemplate(p));
System.out.println("==========================");
queues.forEach(p -> testThreadTemplate(p));
}
private static void testThreadTemplate(BlockingQueue blockingQueue) {
BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
//預(yù)處理
for (int i = 0; i < 10; i++) {
try {
blockingQueueInter.produce(new TestBean());
blockingQueueInter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//進(jìn)行GC回收
JvmUtils.restoreJVM();
//多線程拢肆,生產(chǎn)消費(fèi)同時(shí)進(jìn)行
Producer producer = new Producer(blockingQueueInter, new TestBean());
CountDownLatch countDownLatch = new CountDownLatch(count - 1);
Consumer consumer = new Consumer(blockingQueueInter, countDownLatch);
ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer);
service.submit(consumer);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.keepRunning = false;
service.shutdown();
//進(jìn)行GC回收
JvmUtils.restoreJVM();
}
private static void testTemplate(BlockingQueue blockingQueue) {
try {
//進(jìn)行GC回收
JvmUtils.restoreJVM();
BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
//創(chuàng)建對(duì)象
TestBean testBean = new TestBean();
long start = System.nanoTime();
for (int i = 0; i < count; i++) {
blockingQueueInter.produce(testBean);
}
for (int i = 0; i < count; i++) {
blockingQueueInter.consume();
}
long nscost = (System.nanoTime()) - start;
System.out.println(blockingQueue.getClass().getSimpleName() + " add total cost=" + nscost
+ "ns , each cost=" + (nscost / count) + "ns");
JvmUtils.restoreJVM();
} catch (Exception ex) {
}
}
static class Producer implements Runnable {
private final BlockingQueueInter blockingQueueInter;
private final TestBean testBean;
public Producer(BlockingQueueInter blockingQueueInter, TestBean testBean) {
this.blockingQueueInter = blockingQueueInter;
this.testBean = testBean;
}
@Override
public void run() {
for (int i = 0; i < count; i++) {
try {
blockingQueueInter.produce(testBean);
} catch (InterruptedException e) {
}
}
}
}
static class Consumer implements Runnable {
private final BlockingQueueInter blockingQueueInter;
private final CountDownLatch countDownLatch;
AtomicInteger cou = new AtomicInteger(0);
volatile boolean keepRunning = true;
long start = System.nanoTime();
public Consumer(BlockingQueueInter blockingQueueInter, CountDownLatch countDownLatch) {
this.blockingQueueInter = blockingQueueInter;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
while (keepRunning) {
if (cou.intValue() + 1 >= count) {
long nscost = (System.nanoTime()) - start;
System.out.println(blockingQueueInter.getName() + " add total cost=" + nscost
+ "ns , each cost=" + (nscost / count) + "ns");
break;
}
Object consume = blockingQueueInter.consume();
if (Objects.nonNull(consume)) {
cou.getAndIncrement();
countDownLatch.countDown();
}
}
} catch (InterruptedException e) {
} finally {
countDownLatch.countDown();
}
}
}
}
結(jié)果
LinkedBlockingQueue add total cost=6567496ns , each cost=656ns
ArrayBlockingQueue add total cost=11259355ns , each cost=1125ns
PriorityBlockingQueue add total cost=8507130ns , each cost=850ns
DelayQueue add total cost=7681897ns , each cost=768ns
==========================
LinkedBlockingQueue add total cost=6663490ns , each cost=666ns
ArrayBlockingQueue add total cost=4468268ns , each cost=446ns
PriorityBlockingQueue add total cost=8451825ns , each cost=845ns
DelayQueue add total cost=13924673ns , each cost=1392ns