LinkedBlockingQueue是一個單向鏈表結構的阻塞隊列,繼承了抽象類AbstractQueue继蜡,實現(xiàn)了BlockingQueue和Serializable接口。因為底層是鏈表結構核畴,所以優(yōu)點是存取快域慷,缺點是查詢慢严就。先存先取总寻。
構造方法和成員變量
數(shù)據(jù)單元Node
static class Node<E> {
//數(shù)據(jù)
E item;
//下一個元素
Node<E> next;
//構造方法
Node(E x) { item = x; }
}
成員變量
//隊列的最大容量
private final int capacity;
//這是一個Integer的原子類,記錄了當前隊列總共有多少元素
private final AtomicInteger count = new AtomicInteger();
//隊列的頭部
transient Node<E> head;
//隊列的尾部
private transient Node<E> last;
//出列鎖
private final ReentrantLock takeLock = new ReentrantLock();
//出列鎖的Condition梢为,用于暫停出列線程
private final Condition notEmpty = takeLock.newCondition();
//入列鎖
private final ReentrantLock putLock = new ReentrantLock();
//入列鎖的Condition渐行,用于暫停入列線程
private final Condition notFull = putLock.newCondition();
構造方法
//無參構造,默認隊列長度為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//傳入一個int型整數(shù)作為隊列的最大容量
public LinkedBlockingQueue(int capacity) {
//入?yún)⑿r炛∮?拋出異常
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化頭尾數(shù)據(jù)祟印,在隊列沒有數(shù)據(jù)時頭和尾是同一個空對象
last = head = new Node<E>(null);
}
//傳入一個實現(xiàn)了Collection接口的集合,將集合的數(shù)據(jù)導入到隊列中
public LinkedBlockingQueue(Collection<? extends E> c) {
//首先調用一個參數(shù)的構造方法初始化
this(Integer.MAX_VALUE);
//獲取入列鎖
final ReentrantLock putLock = this.putLock;
//上鎖
putLock.lock(); // Never contended, but necessary for visibility
//遍歷集合將數(shù)據(jù)導入到隊列
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
因為LinkedBlockingQueue底層是一個列表結構粟害,不涉及到擴容蕴忆,所以成員變量和構造方法總體上還是比較簡單的,主要做了初始化了隊列的容量以及鎖和頭尾數(shù)據(jù)悲幅。當時還能通過在構造方法中傳入一個集合來將集合的數(shù)據(jù)導入隊列
隊列最主要的是插入和取出這兩種操作套鹅。根據(jù)這些操作在隊列達到最大容量或者為空的時候,有分為以下幾種類型
操作類型 | Throws Exception | Special Value | Blocked | Timed out |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o,timeout,unit) |
取出 | take() | poll(timeout,unit) |
-
Throws Exception
:如果元素插入失敗汰具,則拋出異常卓鹿,成功則返回true -
Special Value
:返回一個boolean類型的數(shù)據(jù),如果是true則插入成功留荔,如果為false則插入失敗吟孙, -
Blocked
:在取出或插入元素時,如果隊列滿了,或者隊列取空了杰妓,則阻塞當前操作的線程藻治,直到當前隊列中有空閑位置或者當前隊列中有待取元素時繼續(xù)執(zhí)行 -
Timed out
:等待超時,在Blocked的情況下巷挥。如果等待超過了設定的時間桩卵,則線程不再等待,并且都返回一個null
主要方法
因為LinkedBlockingQueue作為一個隊列句各,最主要的必然是入列和出列這些方法吸占,接下來將從這些方法的源碼展開晴叨。其他不常用的方法稍稍提及
1凿宾、add(E e)
往隊列中添加一個元素,如果添加成功則返回true兼蕊,如果添加失敗則拋出異常
public boolean add(E e) {
//調用offer方法初厚,如果添加成功則返回true
if (offer(e))
return true;
else
//否則添加失敗拋出異常
throw new IllegalStateException("Queue full");
}
可以看到add(o)方法主要還是調用了offer(o)方法,對插入失敗做了拋異常的處理
2孙技、offer(E e)
往隊列中添加一條數(shù)據(jù)产禾,如果添加成功則返回true,失敗返回false
public boolean offer(E e) {
//插入數(shù)據(jù)為null牵啦,拋出空指針異常
if (e == null) throw new NullPointerException();
//引用用于記錄隊列大小的原子類
final AtomicInteger count = this.count;
//如果隊列裝滿了亚情,則不能繼續(xù)插入,返回false
if (count.get() == capacity)
return false;
//用于記錄操作前隊列的大小哈雏,預先設置為負數(shù)楞件,
//最后根據(jù)正負判斷是否插入成功
int c = -1;
//new一個新的節(jié)點
Node<E> node = new Node<E>(e);
//引用插入鎖
final ReentrantLock putLock = this.putLock;
//上鎖
putLock.lock();
try {
//如果隊列還沒有裝滿,就往鏈表中插入一條數(shù)據(jù)
if (count.get() < capacity) {
enqueue(node);
/**
隊列大小自增1(注意這里如果不了解原
子類的朋友裳瘪,getAndIncrement是先
取再自增土浸,所以這里的c還是插入前的隊
列大小)c = count.getAndIncrement();
(c+1代表插入后的隊列大小)所以這里
的邏輯是如果當插入后的隊列大小還未
達到最大容量的時候彭羹,去喚醒其他還在
等待插入的線程黄伊。這個問題在多線程中
會遇到,如果沒有這個判斷派殷,在多線程
環(huán)境中會造成線 程假死还最。具體原因在
offer(o,timeout,unit)說完后分析*/
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//釋放鎖
putLock.unlock();
}
/**如果插入前隊列為空,則說明會有取出數(shù)據(jù)的
線程還在等待毡惜,此時隊列中已經插入了一條數(shù)據(jù)
喚醒還在等待取出數(shù)據(jù)的線程可以繼續(xù)從隊列中
*/取出數(shù)據(jù)了
if (c == 0)
signalNotEmpty();
//因為隊列長度預先設置為負數(shù)拓轻,如果插入成功則
//一定是判斷是>=0的數(shù),否則插入失敗
return c >= 0;
}
//往鏈表中添加一條數(shù)據(jù)的方法
private void enqueue(Node<E> node) {
last = last.next = node;
}
//通知還在出列的線程可以繼續(xù)從隊列中拿數(shù)據(jù)了
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
作為一個鏈表虱黄,LinkedBlockingQueue的offer()入列方法還是比較容易理解的悦即。在這里使用到的Lock鎖和原子類因為不是本篇文章的重點。所以如果有不了解的朋友可以去看《Java多線程編程核心技術》這本書。
3辜梳、offer(E e, long timeout, TimeUnit unit)
往隊列中添加一個元素粱甫,如果隊列塞滿了,則等待一段時間作瞄,如果在等待的這段時間內隊列內的元素還是慢的則等待超時茶宵,返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
將傳入的時間轉換為納秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//以上代碼除了時間轉換其他的和offer(E e)一樣
/**這里是用lockInterruptibly()上鎖的原因
是lockInterruptibly()方法在上鎖的時候如果
當前線程中斷則會拋異常,因為這個方法設置的超
時是用線程等待來實現(xiàn)的宗挥,如果在當前線程等待的
過程中突然中斷了乌庶,則能把線程中斷的狀態(tài)響應給
調用方法的地方進行處理。而lock()上鎖則不能
響應線程的中斷狀態(tài)*/
putLock.lockInterruptibly();
try {
//當隊列塞滿的時候
while (count.get() == capacity) {
//如果設插入置的等待超時時間<=0契耿,就相當于不等待瞒大,直接返回插入失敗
if (nanos <= 0)
return false;
/** 線程等待,這里的awaitNanos(nanos)方法執(zhí)行后回返回
一個long類型的<=0的數(shù)搪桂。所以這里的awaitNanos(nanos)
方法實際上最多只會執(zhí)行一次透敌,下一次
會進入上面的if里面return掉,也就
達到了超時的效果踢械,最終超時后會返回
false插入失敗*/
nanos = notFull.awaitNanos(nanos);
}
//往鏈表中插入數(shù)據(jù)
enqueue(new Node<E>(e));
//下面的邏輯跟offer(E e)中的一樣
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
offer(E e, long timeout, TimeUnit unit)和offer(E e)的邏輯差不多酗电,只不過在隊列塞滿的情況下前者做的是等待超時,而后者返回false内列,這也導致了它們兩者選擇上鎖方式上的不同
4撵术、 put(E e)
往隊列中添加一個元素,如果隊列塞滿了话瞧,則線程無限期等
待直到被喚醒
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/**唯一與offer(E e, long timeout,
TimeUnit unit)中不一樣的是在隊列塞滿
的情況下這里執(zhí)行的是線程無限期等待嫩与,直到
被喚醒,其他邏輯都是一樣的*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
put()與其他入列方法最大的不同就是在隊列塞滿的時候線程會無限期等待移稳。直到有其他線程在執(zhí)行出列方法時隊列有了空閑空間蕴纳,就會喚醒put()方法繼續(xù)執(zhí)行入列動作
5、poll()
從隊列中取出第一個元素并返回个粱,同時將這個元素從隊列中刪除古毛。如果取出成功則返回取出的元素,如果失敗則返回null
public E poll() {
//引用原子類
final AtomicInteger count = this.count;
//當隊列中沒有數(shù)據(jù)時都许,啥也取不到稻薇,返回null
if (count.get() == 0)
return null;
//從隊列中取出的第一個元素,預先設置成null
E x = null;
//隊列長度胶征,預先設置成負數(shù)
int c = -1;
//引用“取鎖”
final ReentrantLock takeLock = this.takeLock;
//上鎖
takeLock.lock();
try {
//隊列中還有元素
if (count.get() > 0) {
//執(zhí)行出列方法
x = dequeue();
//獲取隊列長度并自減
c = count.getAndDecrement();
//跟入列中的一樣
if (c > 1)
notEmpty.signal();
}
} finally {
//釋放鎖
takeLock.unlock();
}
//與入列中的一樣
if (c == capacity)
signalNotFull();
//返回取出的元素
return x;
}
//隊列出列
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//通知還在入列的線程可以繼續(xù)往隊列中存數(shù)據(jù)了
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll()方法和offer(E e)是一對的塞椎,一入一出芒篷,一個判斷隊列是否塞滿巫俺,一個判斷隊列是否取完。
6卷雕、poll(long timeout, TimeUnit unit)
從隊列中取出第一個元素,并將該元素從隊列中刪除骂铁,取出成功返回被取出的元素吹零,失敗則返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
這個方法和offer(E e, long timeout, TimeUnit unit)也是一對的,一存一取拉庵,并且在取出第一個元素后將該元素從隊列中刪除灿椅。一個在塞滿是等待超時,一個在取完時等待超時
7钞支、take()
從隊列中取出第一個元素茫蛹,并將該元素從隊列中刪除,如果隊列中的元素取完了烁挟,則線程無限期等待婴洼,直到被其它線程喚醒
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
這個方法與put(E e)是一對的,一入一出信夫,一個在隊列塞滿時無限期等待窃蹋,一個在隊列取完時無限期等待。
8静稻、remove(Object o)
從隊列中刪除指定的元素
public boolean remove(Object o) {
if (o == null) return false;
//入列和出列同時上鎖
fullyLock();
try {
//遍歷鏈表,找到要刪除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//找到了要刪除的元素匈辱, if (o.equals(p.item)) {
//執(zhí)行刪除方法
unlink(p, trail);
//刪除成功振湾,返回true
return true;
}
}
return false;
} finally {
//釋放鎖
fullyUnlock();
}
}
由于LinkedBlockingQueue是鏈表結構,并且還是單向鏈表亡脸,所以remove方法會涉及到從頭到尾的遍歷押搪,效率上來說是很低的
9、其他的一些方法
peek()
獲取隊列中的第一個元素浅碾,但是不刪除
contains(Object o)
判斷隊列中是否包含要查找的元素大州,同樣需要遍歷
toArray()
將隊列轉換成數(shù)組
toArray(T[] a)
將隊列轉換成指定數(shù)據(jù)類型的數(shù)組
clear()
清空隊列
drainTo(Collection<? super E> c)
將隊列轉換成實現(xiàn)Collection接口的集合。
drainTo(Collection<? super E> c, int maxElements)
將隊列的指定長度的元素轉換成實現(xiàn)Collection接口的集合垂谢,從頭開始轉換厦画,
如果指定的長度大于或等于隊列長度,則將隊列所有元素都轉換成集合滥朱。
iterator()
獲取迭代器
spliterator()
獲取分流迭代器(1.8新增)
remainingCapacity()
獲取隊列中的剩余容量
size()
獲取隊列的大小/長度
在offer(E e, long timeout, TimeUnit unit)方法中提到的在入列方法中喚醒入列的等待狀態(tài)是因為這個方法和put()方法中都會出現(xiàn)線程等待根暑。例如此時有三個線程ThreadA 、ThreadB和ThreadC徙邻,隊列最大容量為5排嫌。ThreadA先執(zhí)行offer(.........)方法往隊列中塞6條數(shù)據(jù),ThreadB執(zhí)行put()方法往隊列中塞2條數(shù)據(jù)缰犁,這時候由于隊列最大容量為5淳地,ThreadA中還有一條數(shù)據(jù)再等待插入怖糊,ThreadB()中2條數(shù)據(jù)都在等待插入,這時候ThreadC執(zhí)行take()方法從隊列中取兩條數(shù)據(jù)并刪除颇象,這時候喚醒了ThreadA執(zhí)行最后一條數(shù)據(jù)的插入蓬抄,這時候因為ThreadC刪除了2條數(shù)據(jù),ThreadA再插入一條夯到,最后隊列中總共有4條數(shù)據(jù)嚷缭,還能再插入一條數(shù)據(jù),因為使用的是同一把鎖耍贾,所以這時候就需要喚醒另一個Put()方法所在的線程阅爽。所以這就是上面方法中要加上那個判斷邏輯的原因,否則上面例子put()方法本來還能再插入一條數(shù)據(jù)荐开,現(xiàn)在就不能繼續(xù)插入造成線程假死付翁。
以上就是LinkedBlockingQueue常用的一些方法,以上Lock鎖由于不在本篇文章的主講范圍晃听,所以不做擴展百侧,原子類是一個可以在多線程中進行原子操作的類,例如本篇文章中的鏈表自增能扒,使用原子類在多線程中就不會出現(xiàn)線程安全問題佣渴。但是原子類只能保證單個方法中是原子的,不同方法之間還是非原子的初斑,但是最終的結果還是安全的辛润。