LinkedBlockingDeque
類實現(xiàn)了BlockingDeque
接口懒震。閱讀BlockingDeque
文本以獲取有關(guān)的更多信息。
Deque
來自“雙端隊列” 這個詞嗤详。Deque
是一個隊列个扰,你可以在插入和刪除隊列兩端的元素。
LinkedBlockingDeque
是一個Deque
葱色,如果一個線程試圖從中獲取一個元素递宅,而隊列空的,不管線程從哪一端試圖獲取元素苍狰,都會被阻塞办龄。
以下是實例化和使用LinkedBlockingDeque
的例子:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();
源碼
LinkedBlockingDeque
與LinkedBlockingQueue
的實現(xiàn)大體上類似,區(qū)別在于LinkedBlockingDeque
提供的操作更多淋昭。并且LinkedBlockingQueue
內(nèi)置兩個鎖分別用于put和take操作俐填,而LinkedBlockingDeque
只使用一個鎖控制所有操作。因為隊列能夠同時在頭尾進(jìn)行put和take操作翔忽,所以使用兩個鎖也需要將兩個鎖同時加鎖才能保證操作的同步性英融,不如只使用一個鎖的性能好。
同步節(jié)點相比LinkedBlockingQueue
多了一個prev
字段歇式。
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
增加操作
增加操作相比LinkedBlockingQueue
只能在隊列尾部增加驶悟,它能在隊列的頭尾兩端都進(jìn)行增加操作。
public void addFirst(E e) {
// 復(fù)用offer方法
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
// 構(gòu)造節(jié)點
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到隊列頭部
return linkFirst(node);
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
// 如果隊列已滿材失,返回false
if (count >= capacity)
return false;
// 獲取頭節(jié)點撩银,將自己的 next字段指向頭節(jié)點,然后設(shè)置自己為頭節(jié)點
Node<E> f = first;
node.next = f;
first = node;
// 如果隊列為空豺憔,尾節(jié)點也指向自己
if (last == null)
last = node;
else
f.prev = node;
++count;
// 喚醒等待獲取元素的線程
notEmpty.signal();
return true;
}
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入到隊列尾部
return linkLast(node);
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
// 如果隊列已滿额获,返回false
if (count >= capacity)
return false;
// 將自己設(shè)置為尾節(jié)點
Node<E> l = last;
node.prev = l;
last = node;
// 如果隊列為空,頭節(jié)點也指向自己
if (first == null)
first = node;
else
l.next = node;
++count;
// 喚醒等待獲取元素的線程
notEmpty.signal();
return true;
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列已滿恭应,等待
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列已滿抄邀,等待
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
// 計算超時時間
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果隊列已滿,超時等待
while (!linkFirst(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkLast(node)) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
刪除操作
public E removeFirst() {
// 復(fù)用poll操作
E x = pollFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E removeLast() {
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取頭節(jié)點的值昼榛,并刪除它
return unlinkFirst();
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
// 如果隊列為空境肾,返回null
Node<E> f = first;
if (f == null)
return null;
// 重置頭節(jié)點
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
// 喚醒等待插入的線程
notFull.signal();
return item;
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
// 隊列為空,返回null
if (l == null)
return null;
// 更新尾節(jié)點
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果隊列為空胆屿,等待
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 如果隊列為空奥喻,等待
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkLast()) == null) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
訪問操作
public E getFirst() {
// 復(fù)用peek方法
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E getLast() {
E x = peekLast();
if (x == null) throw new NoSuchElementException();
return x;
}
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列不為空,返回頭元素
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊列不為空非迹,返回尾元素
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}
BlockingQueue 方法
由于BlockingDeque
繼承自BlockingQueue
接口环鲤,所以需要實現(xiàn)BlockingQueue
中的方法,具體只需要復(fù)用前面提到的方法即可憎兽。
public boolean add(E e) {
addLast(e);
return true;
}
public boolean offer(E e) {
return offerLast(e);
}
public void put(E e) throws InterruptedException {
putLast(e);
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}
public E remove() {
return removeFirst();
}
public E poll() {
return pollFirst();
}
public E take() throws InterruptedException {
return takeFirst();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
public E element() {
return getFirst();
}
public E peek() {
return peekFirst();
}
核心要點
- 內(nèi)部使用一個雙向鏈表
- 可以在鏈表兩頭同時進(jìn)行put和take操作冷离,只能使用一個鎖
- 插入線程在執(zhí)行完操作后如果隊列未滿會喚醒其他等待插入的線程吵冒,同時隊列非空還會喚醒等待獲取元素的線程;take線程同理西剥。
- 迭代器與內(nèi)部的雙向鏈表保持弱一致性痹栖,調(diào)用
remove(T)
方法刪除一個元素后,不會解除其對下一個結(jié)點的next引用瞭空,否則迭代器將無法工作揪阿。 - 迭代器的
forEachRemaining(Consumer<? super E> action)
以64個元素為一批進(jìn)行操作 -
forEach(Consumer<? super E> action)
,removeIf
咆畏,removeAll
南捂,retainAll
都是64個元素為一批進(jìn)行操作