之前我們已經了解過雙向同步隊列了恒傻,LinkedBlockingQueu有一點不同淑际,使用了兩把鎖畏纲,所以相對也更復雜一點
類結構
我們依然是線看其整個類內部機構:
- final int capacity; // 隊列的容量
- final AtomicInteger count = new AtomicInteger(); // 當前元素的數量
- Node<E> head; // 頭結點
- Node<E> last; // 尾節(jié)點
- ReentrantLock takeLock = new ReentrantLock(); // 出隊鎖
- Condition notEmpty = takeLock.newCondition(); // 出隊條件變量
- ReentrantLock putLock = new ReentrantLock(); // 入隊鎖
- Condition notFull = putLock.newCondition(); // 入隊條件變量
static class Node<E> { // 就是一個單向鏈表 E item; Node<E> next; Node(E x) { item = x; } }
由上可以LinkedBlockingQueue使用了兩把鎖,為什么這么做呢庸追,為何不像雙向隊列那樣只使用一把鎖兩個條件變量呢霍骄?
我們帶著疑問往下看。
方法介紹
和雙向隊列一樣淡溯,只要是隊列读整,離不開offer、poll咱娶、peek這幾個重要方法米间。
入隊
其入隊操作也有兩種put、offer膘侮、add
- offer
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); // 獲取到鎖屈糊,其他入隊操作將被阻塞 try { if (count.get() < capacity) { enqueue(node); // 未滿則加入隊列 c = count.getAndIncrement(); // 并增加隊列數量 if (c + 1 < capacity) notFull.signal(); // 如果還沒滿,繼續(xù)喚醒其他入隊線程 } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); // c==0說明之前隊列時空的琼了,那么入隊之后就不空了逻锐,因此喚醒出隊線程,但為什么是==0,而不是>= 0呢雕薪? return c >= 0; } public boolean add(E e) { // add 方法繼承自父類昧诱,調用的是offer方法 if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
- put
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 響應中斷 try { while (count.get() == capacity) { // 已經滿了,則等待直到可以入隊 notFull.await(); } enqueue(node); // 入隊 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
上面代碼要點:
- 入隊操作使用 putLock鎖所袁,也就是說不不會阻塞出隊操作
- 入隊成功且如果隊列還是未滿盏档,則釋放之前因為隊列滿了而阻塞的入隊線程
- put和offer的區(qū)別之一:put操作是一個阻塞操作,如果隊列滿則會等待直到可以插入燥爷,所以put的返回值是void永不失敗蜈亩,而offer的返回值是boolean,可能因為隊列滿了而失敗
- put和offer的區(qū)別之二:put會響應中斷
- 回到上面那個問題前翎,為什么c==0 的時候才喚醒出隊線程呢稚配?
這是因為如果c=0,則表示在這次入隊之前隊列時空的港华,那么所有的出隊操作都將被阻塞药有。這個時候需要入隊線程來喚醒。那么為什么c>0的時候不喚醒?其實這個問題我們應該拋開之前講過的雙向隊列的思路愤惰,LinkedBlockingDeque是使用一個鎖同時控制讀寫,所以讀寫互相通知赘理,但這里不一樣宦言,用了兩把鎖。再往上看代碼notFull.signal(),發(fā)現了嗎商模,寫線程會通知寫線程奠旺。這就會大大提升性能,讀和寫互補干擾施流,只有在隊列為空响疚,或者隊列滿時才有交流。
可以看出put和offer的不同之處在于瞪醋,put是一定會成功的忿晕,而offer則因為隊列滿了而失敗。
出隊
同入隊方式相對應也有三種方式poll银受、take践盼、remove,remove也是父類的方法宾巍,調用的是子類的poll方法咕幻。
- poll
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) // 如果隊列為空,返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); // 出隊 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); // 通知下一個出隊線程 } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 如果之前滿了顶霞,則釋放一個入隊操作肄程。 return x; }
take()方法與put方法類似,相對poll选浑,會阻塞直到成功蓝厌,不會返回null,且響應中斷鲜侥。代碼這里不再展示
獲取元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
從代碼可以看出邏輯非常簡單褂始,就是takelock來實現并發(fā)阻塞出隊操作。
其他方法
- size
public int size() { return count.get(); // 這是一個原子類描函,所以是一個精確值 }
- contains
public boolean contains(Object o) { if (o == null) return false; fullyLock(); // 讀寫鎖同時鎖住崎苗,所以也是一個精確值 try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } }
將這兩個函數,主要是與ConcurrentLinkedQueue作對比舀寓,ConcurrentLinkedQueue采用的非阻塞的方式實現了入隊出隊的操作胆数,但其size()和contains()方法并沒有實現同步,因此不精確互墓,但是入隊出隊的效率更高必尼。