LinkedBlockingQueue
類實(shí)現(xiàn)了BlockingQueue
接口邻储。閱讀BlockingQueue
文本以獲取有關(guān)的更多信息蹄皱。
LinkedBlockingQueue
在內(nèi)部將元素存儲在鏈接結(jié)構(gòu)(鏈接節(jié)點(diǎn))中。如果需要菌湃,該鏈接結(jié)構(gòu)可以具有一個上限问拘。如果未指定上限,則使用Integer.MAX_VALUE
作為上限惧所。
LinkedBlockingQueue
內(nèi)部將元素以FIFO(先入先出)次序存儲骤坐。隊列的頭部是已在隊列中的時間最長的元素,隊列的尾部是已在隊列中的時間最短的元素下愈。
以下是如何實(shí)例化和使用LinkedBlockingQueue
:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();
源碼
LinkedBlockingQueue
內(nèi)部使用了一個單向鏈表纽绍,同時它提供了兩個鎖,一個用于獲取并刪除元素势似,一個用于增加元素拌夏。count
字段使用原子變量,避免修改它時需要同時獲取兩個鎖履因。
static class Node<E> {
E item;
/**
* 下面中的一個:
* - 真實(shí)的后繼節(jié)點(diǎn)
* - 這個節(jié)點(diǎn)本身障簿,此時原后繼節(jié)點(diǎn)現(xiàn)在是head.next,即第一個元素
* - null, 意味沒有后繼節(jié)點(diǎn)栅迄,此節(jié)點(diǎn)是隊列最后一個節(jié)點(diǎn)
*/
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
增加操作
注意進(jìn)行增加操作時站故,只對putLock
加鎖,如果還對takeLock
也進(jìn)行加鎖霞篡,那么就會影響性能世蔗。同時,為了彌補(bǔ)此方法帶來的后果朗兵,count
使用原子變量污淋,進(jìn)行CAS更新,防止數(shù)據(jù)不一致余掖。
為了提升性能寸爆,在增加元素成功后礁鲁,如果隊列還沒有滿,那么便喚醒其他因隊列滿而被阻塞的插入線程赁豆。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 注意即使count沒有被鎖保護(hù)仅醇,它依然可以被用作等待條件
// 判定。因為此時count只會被減少(putLock已加鎖)魔种,如果容量
// 改變析二,會被喚醒。count在其他地方的使用也與此相似节预。
// 隊列已滿叶摄,阻塞自己
while (count.get() == capacity) {
notFull.await();
}
// 插入隊列中
enqueue(node);
// CAS更新count值
c = count.getAndIncrement();
// 如果隊列沒滿,喚醒其他等待插入的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果隊列原來是空隊列安拟,喚醒等待提取元素的線程
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 先加鎖蛤吓,才能調(diào)用對應(yīng)Condtion的signal()方法
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 隊列已滿,返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 等待-超時機(jī)制
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
刪除操作
刪除操作與增加操作一樣糠赦。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 當(dāng)隊列為空会傲,阻塞自己
while (count.get() == 0) {
notEmpty.await();
}
// 將頭節(jié)點(diǎn)出隊
x = dequeue();
c = count.getAndDecrement();
// 如果隊列還有元素,喚醒其他等待提取元素的線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果原本隊列是滿的拙泽,喚醒增加線程淌山,因為現(xiàn)在元素已經(jīng)被取出,隊列不滿
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 頭節(jié)點(diǎn)為空奔滑,其中不存儲元素
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 刪除一個指定元素
public boolean remove(Object o) {
if (o == null) return false;
// 將兩個鎖全部加鎖
fullyLock();
try {
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {
if (o.equals(p.item)) {
// 從隊列中移除此節(jié)點(diǎn)
unlink(p, pred);
return true;
}
}
return false;
} finally {
// 釋放全部兩個鎖
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> pred) {
// assert putLock.isHeldByCurrentThread();
// assert takeLock.isHeldByCurrentThread();
// p.next沒有被設(shè)置為null艾岂,為了保證迭代器遍歷到p時繼續(xù)工作,
// 保證弱一致性
p.item = null;
pred.next = p.next;
if (last == p)
last = pred;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
訪問操作
public E peek() {
// 隊列為空朋其,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 返回第一個元素
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
其他操作
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
// 使得next指向自己
h.next = h;
// 解除對元素實(shí)體的引用
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
// 如果原來隊列是滿的王浴,喚醒等待的插入線程
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 獲取當(dāng)前隊列中的元素數(shù)量
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
// 將n個元素加入到指定集合中
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
迭代器
LinkedBlockingQueue
的迭代器與DelayQueue
的不同,DelayQueue
的迭代器與原組件沒有任何的一致性梅猿,而LinkedBlockingQueue
的迭代器與內(nèi)部的鏈表保持了弱一致性氓辣。
注意它的next()
方法,它會跳過內(nèi)容為null的節(jié)點(diǎn)袱蚓,回憶前面刪除操作中的remove(Object)
方法钞啸,他沒有修改節(jié)點(diǎn)的next字段,如果修改了喇潘,迭代器就會無法正常工作体斩,而為了保證一致性,迭代器也需要跳過這個空節(jié)點(diǎn)颖低。
而它的forEachRemaining(Consumer<? super E> action)
方法是分批次進(jìn)行處理的絮吵,每批64個元素,如果數(shù)量小于64忱屑,那就使用此數(shù)量蹬敲。
private class Itr implements Iterator<E> {
private Node<E> next; // 持有nextItem的節(jié)點(diǎn)
private E nextItem; // 下一個進(jìn)行處理的元素
private Node<E> lastRet; // 上一個返回的元素暇昂,即當(dāng)前正在使用的
private Node<E> ancestor; // Helps unlink lastRet on remove()
Itr() {
fullyLock();
try {
// 保存第一個元素
if ((next = head.next) != null)
nextItem = next.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return next != null;
}
public E next() {
Node<E> p;
if ((p = next) == null)
throw new NoSuchElementException();
lastRet = p;
E x = nextItem;
fullyLock();
try {
E e = null;
// 注意此處,遇到空節(jié)點(diǎn)會跳過去訪問下一個節(jié)點(diǎn)
for (p = p.next; p != null && (e = p.item) == null; )
p = succ(p);
next = p;
nextItem = e;
} finally {
fullyUnlock();
}
return x;
}
Node<E> succ(Node<E> p) {
// 正常出隊的元素next字段會指向自己
if (p == (p = p.next))
p = head.next;
return p;
}
public void forEachRemaining(Consumer<? super E> action) {
// A variant of forEachFrom
Objects.requireNonNull(action);
Node<E> p;
if ((p = next) == null) return;
lastRet = p;
next = null;
final int batchSize = 64;
Object[] es = null;
int n, len = 1;
do {
fullyLock();
try {
if (es == null) {
p = p.next;
// 獲取真正存在的元素的數(shù)量伴嗡,如果多于64急波,分批進(jìn)行,一批為64個
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == batchSize)
break;
es = new Object[len];
es[0] = nextItem;
nextItem = null;
n = 1;
} else
n = 0;
// n為1的使用只因為p=p.next瘪校,經(jīng)過此步后p已經(jīng)不是首元素澄暮,
// 而是第二個元素。而后面批次的插入直接從0開始即可
// 將元素放入數(shù)組中
for (; p != null && n < len; p = succ(p))
if ((es[n] = p.item) != null) {
lastRet = p;
n++;
}
} finally {
fullyUnlock();
}
// 分別調(diào)用accept方法
for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") E e = (E) es[i];
action.accept(e);
}
} while (n > 0 && p != null);
}
public void remove() {
// 獲取當(dāng)前元素
Node<E> p = lastRet;
if (p == null)
throw new IllegalStateException();
lastRet = null;
fullyLock();
try {
if (p.item != null) {
if (ancestor == null)
ancestor = head;
// 獲取p的前驅(qū)結(jié)點(diǎn)
ancestor = findPred(p, ancestor);
// 從鏈表中刪除結(jié)點(diǎn)p
unlink(p, ancestor);
}
} finally {
fullyUnlock();
}
}
}
測試:
import org.junit.Test;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest {
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
@Test
public void test() {
queue.offer("1");
queue.offer("2");
queue.offer("3");
queue.offer("4");
Iterator<String> itr = queue.iterator();
queue.remove("3");
itr.forEachRemaining(System.out::println);
}
}
輸出如下:
1
2
4
核心要點(diǎn)
- 內(nèi)部使用一個單向鏈表阱扬,以FIFO順序存儲
- 可以在鏈表兩頭同時進(jìn)行操作赏寇,所以使用兩個鎖分別保護(hù)
- 插入線程在執(zhí)行完操作后如果隊列未滿會喚醒其他等待插入的線程,同時隊列非空還會喚醒等待獲取元素的線程价认;提取線程同理。
- 迭代器與單向鏈表保持弱一致性自娩,調(diào)用
remove(T)
方法刪除一個元素后用踩,不會解除其對下一個結(jié)點(diǎn)的next引用,否則迭代器將無法工作忙迁。 - 迭代器的
forEachRemaining(Consumer<? super E> action)
以64個元素為一批進(jìn)行操作