簡介
Queue
锯茄,翻譯成隊列,是一種先進先出(FIFO, First In First Out)的數(shù)據(jù)結構谈火。最先放進去的侈询,取的時候也就最先取出來。最形象的比喻就是我們常見的排隊就是一個隊列堆巧。排隊時妄荔,新來的人進入隊尾,先到的人首先接收服務谍肤。
Queue
大多數(shù)是單向隊列啦租,即只能從一端取數(shù)據(jù),另一端放入數(shù)據(jù)荒揣。就像把羽毛球放入球筒篷角,從一端放入,從另一端取出系任。
Queue體系
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
-
boolean add(E e)
:向queue中添加element恳蹲。成功返回true虐块。不會返回false。當添加失敗(比如queue有大小限制)嘉蕾,則拋出異常IllegalStateException
-
boolean offer(E e)
:跟add類似贺奠。區(qū)別在于add插入失敗會拋出異常,offer會返回false错忱。 -
E remove()
: 返回第一個element儡率,并從queue中刪除。queue為空則拋出異常NoSuchElementException
-
E pool()
: 返回第一個element以清,并從queue中刪除儿普。queue為空則返回null -
E element()
: 返回第一個element,但是不從queue中刪除掷倔。queue為空則拋出異常NoSuchElementException
-
E peak()
:返回第一個element眉孩,但是不從queue中刪除。queue為空則拋出異常
上面的6個方法分別表示了3種操作勒葱,每一種操作都有兩種類型浪汪,拋出異常的類型和有特定返回值的類型。下面用表格來描述上述方法的異同:
方法 | 作用 | 返回值 | 成功 | 失敗 |
---|---|---|---|---|
add(E e) | 插入元素 | boolean | true | IllegalStateException |
offer(E e) | 同add() | boolean | true | false |
remove(E e) | 返回第一個元素错森,并刪除 | E | E | NoSuchElementException |
pool(E e) | 同remove | boolean | E | null |
element() | 返回第一個元素吟宦,不刪除 | E | E | NoSuchElementException |
peak() | 同element | E | E | null |
Queue
接口常見的擴展和實現(xiàn)有:
-
AbstractQueue
: 實現(xiàn)部分方法的抽象類篮洁。做為大多數(shù)具體實現(xiàn)類的基類涩维。 -
BlockingQueue
: 阻塞隊列。在Queue
的基礎上增加了阻塞接口袁波。 -
Deque
: 雙向隊列瓦阐,double ended queue
的縮寫。隊列兩端都可以操作隊列篷牌。
AbstractQueue
AbstractQueue
類似于很多JDK源碼中的Abstract*
類睡蟋,實現(xiàn)了部分通用方法,做為具體實現(xiàn)類的基類枷颊。這里主要講一下PriorityQueue
和ConcurrentLinkedQueue
戳杀。
PriorityQueue
基于堆實現(xiàn)的優(yōu)先隊列。獲取數(shù)據(jù)時是有序的夭苗。默認是升序信卡。
主要特點:
- 無界隊列。隊列是用數(shù)組實現(xiàn)题造,需要指定大小傍菇,數(shù)組大小可以動態(tài)增加,容量無限界赔。
- 要實現(xiàn)插入的元素有序丢习,有兩種方法:
- 插入元素實現(xiàn)了
Comparable
接口 - 在構造函數(shù)中傳入
Comparator
實現(xiàn)牵触,如下
- 插入元素實現(xiàn)了
public class PriorityQueue<E> extends AbstractQueue<E>{
public PriorityQueue(int initialCapacity, Comparator<? super E> comparator);
}
- 非線程安全。如需線程安全的實現(xiàn)咐低,請使用
PriorityBlockingQueue
- 插入的元素不能為null
- 使用
for-each
或iterator
來遍歷優(yōu)先隊列揽思,得到的結果并不保證有序。只有通過不斷調(diào)用poll()/remove()
方法得到的結果才是有序的见擦。如果需要按順序遍歷绰更,請考慮使用 Arrays.sort(pq.toArray())。如下:
PriorityQueue<Integer> test = new PriorityQueue<>();
test.add(10);
test.add(4);
test.add(7);
test.add(2);
test.add(9);
//可保證有序輸出锡宋,輸出 2 4 7 9 10
while (!test.isEmpty()) {
System.out.println(test.poll());
}
//不保證有序輸出儡湾,輸出 2 4 7 10 9
for(Integer e : test){
System.out.println(e);
}
//不保證有序輸出,輸出 2 4 7 10 9
Iterator it = test.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一個無鎖線程安全隊列执俩。
常見的線程安全實現(xiàn)都是通過加鎖(在“線程安全的實現(xiàn)”這一節(jié)就能看到)徐钠,而加鎖的成本是很高的。如果能找到一種方法役首,既不用加鎖尝丐,又能保證線程安全,很大可能能極大提升系統(tǒng)性能衡奥。ConcurrentLinkedQueue
就是使用了這種無鎖方法的一種隊列爹袁。
其實現(xiàn)思想借鑒了很通用,也很重要的CAS操作矮固。CAS簡介
詳情請見無鎖隊列 - ConcurrentLinkedQueue
BlockingQueue
BlockingQueue
名為阻塞隊列失息。
何為阻塞?以從隊列中取數(shù)據(jù)為例档址,當隊列為空時盹兢,Queue
提供方法的表現(xiàn)為:
-
remove()
: 拋出異常NoSuchElementException -
poll()
: 返回null
這種情況下,如果想在隊列不空時獲取數(shù)據(jù)守伸,只能通過循環(huán)不斷調(diào)用remove()
或poll()
绎秒。那么,如果能在隊列為空的時候尼摹,方法不返回见芹,而是等待數(shù)據(jù),直到隊列中有了數(shù)據(jù)蠢涝,才繼續(xù)進行玄呛,就方便很多了。這就是阻塞方法惠赫。
BlockingQueue
在Queue
的基礎上主要擴展了以下幾個阻塞方法:
public interface BlockingQueue<E> extends Queue<E> {
void put(E e) throws InterruptedException;
E take() throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
-
void put(E e)
: 作用同add(E e)
把鉴,當隊列滿時阻塞,直到隊列不滿。 -
boolean take()
: 作用同remove()
庭砍,當隊列為空時阻塞场晶,直到隊列不空。 -
boolean offer(E e, long timeout, TimeUnit unit)
: 作用同offer(E e)
怠缸,只是增加了超時诗轻,允許等一段時間。如果過了超時時間還不能插入成功揭北,則返回false扳炬。 -
E poll(long timeout, TimeUnit unit)
: 作用同poll()
,也是增加了超時搔体。如果過了超時時間還不能獲取數(shù)據(jù)恨樟,則返回null。
異常行為 | 插入數(shù)據(jù) | 取數(shù)據(jù) |
---|---|---|
拋出異常 | add(E e) | remove() |
返回null | offer(E e) | poll() |
阻塞 | put(E e)/offer(E e, long timeout) | take()/poll(long timeout) |
BlockingQueue的具體實現(xiàn)
BlockingQueue
只是個接口疚俱,其常用的具體實現(xiàn)有哪些呢劝术?主要有以下這些:
-
ArrayBlockingQueue
: 基于數(shù)組的隊列,有界隊列呆奕,必須指定大小养晋。 -
LinkedBlockingQueue
: 基于鏈表的隊列,無界隊列梁钾,大小可指定绳泉,可不指定。 -
PriorityBlockingQueue
: 優(yōu)先隊列姆泻,或者說有序隊列零酪。隊列中的元素按照指定規(guī)則排好序。 -
SynchronousQueue
: 沒有任何容量(capacity)的隊列麦射,是一個比較特殊的隊列蛾娶。 -
DelayQueue
: 延遲隊列灯谣。放入的元素必須過了超時時間才能取出潜秋。放入的元素必須實現(xiàn)Delayed
接口
ArrayBlockingQueue
主要特點:
- 有界隊列
- 基于數(shù)組存儲,數(shù)組長度固定胎许,需要在構造函數(shù)中指定
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; //基于數(shù)組
final ReentrantLock lock; //線程同步鎖
private final Condition notEmpty; //條件變量峻呛,用于取數(shù)據(jù)同時隊列為空時阻塞線程
private final Condition notFull; //條件變量,用戶插入數(shù)據(jù)同時隊列滿時阻塞線程
}
LinkedBlockingQueue
主要特點:
- 可以是無界隊列辜窑,也可以是有界隊列钩述。區(qū)別在于是否設定隊列大小。
- 基于鏈表存儲穆碎。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Node<E> head; //隊列頭
private final ReentrantLock takeLock = new ReentrantLock(); //取數(shù)據(jù)線程同步鎖
private final Condition notEmpty = takeLock.newCondition(); //條件變量牙勘,用于取數(shù)據(jù)同時隊列為空時阻塞線程
private final ReentrantLock putLock = new ReentrantLock(); //插入數(shù)據(jù)線程同步鎖
private final Condition notFull = putLock.newCondition(); //條件變量,用戶插入數(shù)據(jù)同時隊列滿時阻塞線程
}
注:
put(E e)
方法只有在隊列滿時才組設,因此方面,如果是無界隊列放钦,put(E e)
永遠不會阻塞。
PriorityBlockingQueue
優(yōu)先隊列PriorityQueue
的線程安全版本恭金,同時提供阻塞方法操禀。
主要特點同PriorityQueue
。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
}
SynchronousQueue
比較特殊的隊列横腿,沒有任何存儲空間颓屑。
舉個簡單的例子:
A是個快遞員,要送快遞給用戶B耿焊,如果使用ArrayBlockingQueue
或者LinkedBlockingQueue
揪惦,會是這樣:
- A把快遞放到快遞柜的箱子里(假設快遞柜有20個箱子)
- 如果有空箱子,可以直接把快遞放到空箱子中罗侯。
- 如果所有的箱子都滿了丹擎,那么A等著,直到B取了任意快遞歇父,空出了箱子蒂培,A再把快遞放到空箱子中。
- 如果所有的箱子都空了榜苫,取快遞的人B會一直等待护戳,直到有快遞投遞到箱子中。
那么如果使用SynchronousQueue
垂睬,情況就不同了
- 首先沒有能臨時存放快遞的柜子和箱子
- A要送快遞媳荒,會一直拿著快遞等,直到B來取驹饺。
- B要取快遞钳枕,也會一直等,直到A來送赏壹。
SynchronousQueue
就是類似這種“手到手”的交付方式鱼炒,不經(jīng)過任何媒介緩存。
主要特點:
- 沒有任何數(shù)據(jù)結構可保存數(shù)據(jù)蝌借,不能調(diào)用peek()方法來看隊列中是否有數(shù)據(jù)元素
- 不能遍歷隊列昔瞧,應為根本沒有存儲任何數(shù)據(jù)
- 調(diào)用
put()
方法會等待,直到有其他線程調(diào)用了take()
方法
DelayQueue
DelayQueue
的繼承結構如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>(); //存儲數(shù)據(jù)
}
由類的聲明可知菩佑,插入到DelayQueue
中的元素必須實現(xiàn)Delayed
接口自晰。Delayed
接口比較簡單,只有一個getDelay(TimeUnit unit)
方法稍坯。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit); //剩余時間
}
同時酬荞,DelayQueue
存儲數(shù)據(jù)既不像ArrayBlockingQueue
使用數(shù)組,也不像LinkedBlockingQueue
使用鏈表,而是使用現(xiàn)成的PriorityQueue
混巧。因此DelayQueue
取數(shù)據(jù)的規(guī)則跟PriorityQueue
類似糟把。
何謂延遲?
延遲主要體現(xiàn)在取數(shù)據(jù)的時候牲剃。通過查看poll()
的源碼可知:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
在取數(shù)的時候遣疯,getDelay()
必須小于等于0才能把數(shù)取出來。通過實現(xiàn)getDelay()
方法凿傅,就能實現(xiàn)過多長時間以后才能取出數(shù)據(jù)這種延遲效果缠犀。
主要特點:
- 無界隊列,因此
put(E e)
不會阻塞 - 調(diào)用取元素的方法:
remove()
聪舒、poll()
辨液、take()
時,只有當元素超時以后才能取到箱残。
BlockingQueue的應用
實現(xiàn)生產(chǎn)者-消費者
生產(chǎn)者-消費者模式在工程之中應用廣泛滔迈。BlockingQueue
可極大簡化實現(xiàn)生產(chǎn)者-消費者的難度。
偽代碼:
//生產(chǎn)者
public class Producer{
private BlockingQueue queue;
Producer(BlockingQueue queue){this.queue = queue}
public void produce(E e){
//生產(chǎn)者調(diào)用put()方法被辑,隊列滿時阻塞等待燎悍。
queue.put(e);
}
}
//消費者
public class Consumer{
private BlockingQueue queue;
Consumer(BlockingQueue queue){this.queue = queue}
public E consume(){
//消費者調(diào)用take()方法,隊列空時阻塞等待
queue.take(e);
}
}
做為線程池的等待隊列
- 線程池
ThreadPoolExecutor
的構造函數(shù)接收BlockingQueue
做為等待隊列 -
Executors.newSingleThreadPool()/Executors.newFixedThreadPool()
默認使用LinkedBlockingQueue
做為等待隊列 -
Executors.newCachedThreadPool()
默認使用SynchronousQueue
做為等待隊列盼理。可以做到一有請求宏怔,就創(chuàng)建新線程奏路。
求Top K大/小的元素
比如有1億個隨機數(shù)字,找出最大的10個數(shù)臊诊。這種類似的求Top K的問題很常見鸽粉。
由于PriorityQueue/PriorityBlockingQueue
底層結構是堆(大頂堆/小頂堆),而解決Top K問題的最好辦法就是使用堆抓艳,因此PriorityQueue/PriorityBlockingQueue
是解決該問題的不二選擇触机。
代碼摘要:
public class FixSizedPriorityQueue<E extends Comparable> {
private PriorityQueue<E> queue;
private int maxSize; // 堆的最大容量
public FixSizedPriorityQueue(int maxSize) {
if (maxSize <= 0)
throw new IllegalArgumentException();
this.maxSize = maxSize;
this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
public int compare(E o1, E o2) {
// 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 并修改 e.compareTo(peek) 比較規(guī)則
return (o2.compareTo(o1));
}
});
}
public void add(E e) {
if (queue.size() < maxSize) { // 未達到最大容量,直接添加
queue.add(e);
} else { // 隊列已滿
E peek = queue.peek();
if (e.compareTo(peek) < 0) { // 將新元素與當前堆頂元素比較壶硅,保留較小的元素
queue.poll();
queue.add(e);
}
}
}
}
延時需求
如:考試時間為120分鐘威兜,30分鐘后才可交卷。這種情況下庐椒,就需要使用到DelayQueue
了。
案例參考
Deque
Deque
發(fā)音為'deck'蚂踊,為雙向隊列约谈。
Queue
默認是單向隊列,數(shù)據(jù)只能從一頭放入,從另一頭取出棱诱,這跟羽毛球的放取原理一樣泼橘。雙向隊列就像放取乒乓球,可以從兩端放入迈勋,也可以從兩端取出炬灭。
Deque
接口的部分組成
public interface Deque<E> extends Queue<E> {
//新增方法
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
E getFirst();
E getLast();
E peekFirst();
E peekLast();
void push(E e);
E pop();
//Queue接口方法
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
從擴展的函數(shù)名字就能看出來,Deque
分別針對插入和取出接口提供了對隊列頭和尾的操作靡菇。
同時重归,從Queue
繼承過來方法的與擴展方法有如下對應關系:
add(E e)
==addLast(E e)
offer(E e)
==offerLast(E e)
remove()
==removeFirst()
poll()
==pollFirst()
element()
==getFirst()
peek()
==peekFirst()
同時,Deque
還擴展出了push(E e)
和pop()
方法厦凤。其中:
push(E e)
等同于addFirst(E e)
pop()
等同于removeFirst()
因此鼻吮,當Deque
只通過push(E e)
和pop()
操作隊列頭時,Deque
就演化成了一個棧Stack
较鼓!
[圖片上傳失敗...(image-f152e6-1551348671199)]
擴展接口或?qū)崿F(xiàn)類
-
ArrayDeque
: 基于數(shù)組存儲的雙向隊列 -
LinkedList
: 基于鏈表存儲的雙向隊列 -
ConcurrentLinkedDeque
: 基于鏈表的線程安全版本 -
BlockingDeque
: 擴展了Deque
接口的阻塞接口椎木,同時繼承了BlockingQueue
接口。 -
LinkedBlockingDeque
: 實現(xiàn)了BlockingDeque
的雙向隊列博烂。
線程安全的實現(xiàn)
引申
Java線程安全...
BlockingQueue
中的方法都是線程安全的香椎,都使用了ReentrantLock
做為鎖。如:
ArrayBlockingQueue
ArrayBlockingQueue
中有公共變量count
來計數(shù)元素個數(shù)禽篱,因此需要一個全局的鎖來保護士鸥。
public ArrayBlockingQueue(int capacity, boolean fair) {
lock = new ReentrantLock(fair);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//do something
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//do something
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue
中計數(shù)元素個數(shù)使用的是AtomicInteger
類型,本身是線程安全的谆级。因此沒有使用全局鎖烤礁,而是針對插入和獲取分別創(chuàng)建了兩個鎖。
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
public boolean offer(E e) {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//do something
} finally {
putLock.unlock();
}
}
public E poll() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//do something
} finally {
takeLock.unlock();
}
}
引申
上面實現(xiàn)線程安全的方式都是通過鎖肥照。而我們知道脚仔,鎖是一個比較重的操作,在高并發(fā)系統(tǒng)中舆绎,能少用就少用鲤脏。因此,在此介紹一個不用鎖就能實現(xiàn)線程安全的隊列:
無鎖隊列 - ConcurrentLinkedQueue...
無鎖并且保證線程安全的思想是使用CAS...
阻塞算法的實現(xiàn)
引申
Java線程通信...
BlockingQueue
的功能是個標準的生產(chǎn)者-消費者模式吕朵。線程間通信使用的是條件變量Condition
猎醇。偽代碼如下:
ReentrantLock lock = new ReentrantLock(fair);
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public boolean put(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果隊列已經(jīng)滿了,那么等待隊列notFull
while(count == container.size){
notFull.await()
}
notEmpty.signal();
//do other things
} finally {
lock.unlock();
}
}
public E take() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果隊列為空努溃,那么等待隊列notEmpty
while (count == 0){
notEmpty.await();
}
notFull.signal();
} finally {
lock.unlock();
}
}
非阻塞型
ConcurrentLinkedQueue: 無鎖線程安全隊列