LinkedBolckingQueue源碼學習
LinkedBolckingQueue是JUC包下基于鏈表實現(xiàn)的隊列蒿讥,隊列最大容量是int的最大正值蝶念,實現(xiàn)了BlockingQueue接口,可以阻塞入隊出隊芋绸,可以用于工作隊列媒殉,Executors.newFixedThreaadPool就是用此隊列作為工作隊列的
?
- 首先是靜態(tài)內(nèi)部類節(jié)點Node
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
由此看出這是一個單向鏈表,只持有下一個節(jié)點打引用和當前節(jié)點保存的元素E
?
- 成員變量
int capacity //容量
AtomicInteger count //當前元素數(shù)量
Node<E> head //指向頭節(jié)點
Node<E> last //指向尾節(jié)點
ReentrantLock takeLock //出對鎖
Condition notEmpty = takeLock.newCondition() //由入隊鎖的Condition
ReentrantLock putLock //入隊鎖
Condition notFull = putLock.newCondition()//由出隊鎖打Condtion
- 構造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
構造方法沒有什么好說打摔敛,如果不傳參的話廷蓉,默認是用int打最大值作為隊列最大容量
并且把成員變量last=head設置為null。
?
- 入隊系列方法
首先看一下如何加入隊列打頭部打私有方法舷夺,這個方法將被入隊的方法調(diào)用
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
非常簡單苦酱。。因為容量檢查给猾,并發(fā)檢查都被調(diào)用它的方法處理啦
1.帶阻塞時間打offer方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//不可以入隊null
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);//轉(zhuǎn)換時間單位Condition.awaitNanos()
int c = -1;
//入隊鎖
final ReentrantLock putLock = this.putLock;
//獲取當前的隊列長度
final AtomicInteger count = this.count;
//嘗試獲取可中斷的鎖
putLock.lockInterruptibly();
try {
//隊列滿的情況下疫萤,進入這個循環(huán)
while (count.get() == capacity) {
//等待時間已過,返回入隊失敗
if (nanos <= 0)
return false;
//否則休眠
nanos = notFull.awaitNanos(nanos);
}
//獲取鎖并且隊列未滿敢伸,入隊
enqueue(new Node<E>(e));
//原子增加
c = count.getAndIncrement();
//如果發(fā)現(xiàn)隊列未滿扯饶,喚醒其他在隊列滿打狀態(tài)等待打線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//如果c=0,說嗎在當前元素入隊前隊列為空,
//可能由其他線程在等待出隊元素,喚醒
if (c == 0)
signalNotEmpty();
return true;
}
無阻塞參數(shù)的offer方法更加簡單尾序,就不寫啦钓丰,當成timeout為0的用就是了
2.put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
沒啥好寫的,傻等的主每币。携丁。。
?
- 出隊系列方法
老規(guī)矩兰怠,先看刪除節(jié)點的dequeue方法
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;//頭節(jié)點的元素是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
操作有點復雜梦鉴,首先要了解一個概念,頭節(jié)點是一直是空的揭保,這樣做是為了方便統(tǒng)一簡化在隊列未空時打出隊入隊操作
明白了這一點肥橙,以上方法打邏輯是移除當前打頭節(jié)點,把接下的那個不為空的節(jié)點打item彈出秸侣,然后item置為null存筏,把它設置未頭節(jié)點。
這個過程需要注意把彈出的頭節(jié)點的next設置為自身味榛,幫助gc
1.帶阻塞參數(shù)的poll方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
//出隊鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
//未超時的話一直等待
nanos = notEmpty.awaitNanos(nanos);
}
//出隊
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
不帶阻塞參數(shù)打poll方法更簡單椭坚,隊列為空就直接返回null
2.take方法
//無法出隊就傻等
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
?
- remove方法
public boolean remove(Object o) {
if (o == null) return false;
//全局鎖定,就是入隊出隊一起鎖定
fullyLock();
try {
//循環(huán)遍歷
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//下面看看unlink方法
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
//p是要刪除打那個節(jié)點搏色,而trail是前一個節(jié)點
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
//把trail打next指向p的next
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
//上面說的那個頭節(jié)點之所以要保持未空就是為了刪除方便
?
- 其他方法
1.toArray
全局鎖定藕溅,轉(zhuǎn)換為數(shù)組返回
2.Iterator
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
public boolean hasNext() {
return current != null;
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
/寫入下一次next時候打element值
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
/**
*即使調(diào)用它的next()方法全局鎖定了,但是還可能會出現(xiàn)p節(jié)點已經(jīng)出隊的情況
*這個時候直接從事實上的head節(jié)點從頭開始遍歷
**/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}
?
再來看看LinkedBlockingDeque:
雙端隊列继榆,可以在隊列頭和尾執(zhí)行插入,既是一個隊列汁掠,也是一個棧略吨,具體打代碼細節(jié)和LinkedBlockingQueue相似,只是這個是可以在雙向入隊的隊列考阱。