下面我們來說一下java中的BlockingQueue承绸。先來看一下BlockingQueue都有哪些方法
offer(E e): 將給定的元素設(shè)置到隊(duì)列中屑彻,如果設(shè)置成功返回true, 否則返回false. e的值不能為空亿絮,否則拋出空指針異常卵蛉。
offer(E e, long timeout, TimeUnit unit): 將給定元素在給定的時(shí)間內(nèi)設(shè)置到隊(duì)列中娘纷,如果設(shè)置成功返回true, 否則返回false.
add(E e): 將給定元素設(shè)置到隊(duì)列中回铛,如果設(shè)置成功返回true, 否則拋出異常狗准。如果是往限定了長度的隊(duì)列中設(shè)置值,推薦使用offer()方法茵肃。
put(E e): 將元素設(shè)置到隊(duì)列中腔长,如果隊(duì)列中沒有多余的空間,該方法會(huì)一直阻塞验残,直到隊(duì)列中有多余的空間捞附。
take(): 從隊(duì)列中獲取值,如果隊(duì)列中沒有值,線程會(huì)一直阻塞鸟召,直到隊(duì)列中有值胆绊,并且該方法取得了該值。
poll(long timeout, TimeUnit unit): 在給定的時(shí)間里欧募,從隊(duì)列中獲取值压状,如果沒有取到會(huì)拋出異常。
remove(Object o): 從隊(duì)列中移除指定的值跟继。
contains(Object o): 判斷隊(duì)列中是否擁有該值种冬。
put和take方法是阻塞的。下面我們就來看一下LinkedBlockingQueue是怎么實(shí)現(xiàn)的舔糖。
LinkedBlockingQueue是基于鏈表實(shí)現(xiàn)的娱两,我們來看一下LinkedBlockingQueue都有哪些屬性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
有一個(gè)head,保存首節(jié)點(diǎn)的引用剩盒,一個(gè)last,保存尾節(jié)點(diǎn)的引用慨蛙,還有一個(gè)tabkLock和Condition辽聊,用來在take操作的時(shí)候阻塞線程的,一個(gè)putLock和Condition期贫,用來在put操作的時(shí)候阻塞線程的跟匆。
LinkedBlockingQueue有一個(gè)內(nèi)部類Node,有一個(gè)屬性item通砍,代表元素玛臂,一個(gè)Node元素的next,代表下一個(gè)節(jié)點(diǎn)
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
下面我們來看一下put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
先將值放到Node節(jié)點(diǎn)中封孙,然后上鎖迹冤,如果當(dāng)前隊(duì)列的數(shù)量等于容量,則調(diào)用await方法阻塞等待虎忌,否則進(jìn)行入隊(duì)操作泡徙,然后如果隊(duì)列的數(shù)量小于容量,則喚醒等待的線程繼續(xù)執(zhí)行膜蠢,然后解鎖堪藐,如果隊(duì)列的數(shù)量為0,則喚醒take操作阻塞的線程
再來看一下take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
先上鎖挑围,然后判斷如果隊(duì)列數(shù)量等于0礁竞,則阻塞當(dāng)前線程,否則從隊(duì)列中取出元素杉辙,如果隊(duì)列的數(shù)量大于0模捂,則喚醒被阻塞的線程繼續(xù)執(zhí)行,然后解鎖,如果隊(duì)列的數(shù)量等于容量-1枫绅,則喚醒put操作阻塞的線程
LinkedBlockingQueue就分析到這里了泉孩。