基礎概念
Queue:基本上肿轨,隊列就是一個滿足先進先出(FIFO)原則的數據結構荡碾。
一般來說適用于緩沖、并發(fā)訪問的場景均践,之前Volley的分析中就大量用到了queue晤锹。
首先講一下Queue接口,再針對常用的子類去分析具體實現彤委”廾看一下Queue提供的方法:
可以看到Queue提供了三組方法,分別是添加、移除车遂、查詢操作封断,前一組方法在出現錯誤時拋出異常,后面一組會有一個返回值舶担。這就完成了對queue的基本定義坡疼。
上圖是Queue接口的繼承關系圖,下面我們看一下BlockingQueue接口提供的方法:
從代碼中可以看到衣陶,BlockingQueue提供的方法都會拋出一個InterruptedException異常柄瑰,完成了對阻塞隊列的定義。
再針對Deque做一個介紹剪况,Dequeue是Queue的子接口教沾,是雙向隊列支持從兩個端點檢索和插入元素,因此Deque既可以支持FIFO形式也可以支持LIFO形式:
有了上面的這些概念之后译断,下面我會把這些集合分成四類進行分析授翻。
沒實現阻塞接口的Queue
內置的不阻塞隊列:PriorityQueue 和 ConcurrentLinkedQueue。
PriorityQueue
先說一下PriorityQueue的特性孙咪,該隊列是用數組實現堪唐,在不指定大小的情況下默認為11,數組大小可以動態(tài)增加该贾,不是線程安全的羔杨,如果要保證線程安全需要使用PriorityBlockingQueue。不允許使用null元素杨蛋,插入方法(offer()兜材、poll()、remove() 逞力、add()方法)時間復雜度為O(log(n))曙寡,remove(Object)和contains(Object)時間復雜度為O(n)。
檢索方法(peek寇荧、element 和 size)時間復雜度為常量举庶。
PriorityQueue實質上是一個最小堆(在沒指定Comparator的情況下),對元素采用的是堆排序揩抡,頭是按指定排序方式的最小元素户侥,堆排序只能保證根是最大(最小)峦嗤,整個堆并不是有序的蕊唐。也不滿足先進先出的定義,具體我們可以在他插入元素或者移除元素的操作中看出來烁设,下面分析一下插入的流程:
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
這個實際上就是堆排序的實現過程替梨,queue[0]為根節(jié)點,queue[1]、queue[2]為左右節(jié)點副瀑,queue[3]弓熏、queue[4]為queue[1]左右節(jié)點,以此類推糠睡。關于堆排序處理的具體分析可以參考這篇博客:https://blog.csdn.net/qq_21492635/article/details/73105580
至于隊列出棧的時候挽鞠,輸出的是有序的隊列,具體可以查看poll方法狈孔,輸出queue[0]后滞谢,siftDownComparable方法會對堆進行重新排序,具體規(guī)則是取隊尾的元素與新的根節(jié)點元素進行比較然后依次的往下面的節(jié)點對比除抛,如果原隊尾節(jié)點小于或等于對比節(jié)點,則替換位置母截,否則繼續(xù)循環(huán)左邊的樹到忽。因為最小堆只保證子節(jié)點比父節(jié)點要小,但是兩個子節(jié)點是沒有必然關系的清寇,而這個siftDownComparable方法能夠保證輸出時的有序喘漏。
上面提到了PriorityQueue是會自動擴容的,這里就講一下PriorityQueue的擴容算法华烟,重點都已經注釋:
private void grow(int minCapacity) {
int oldCapacity = queue.length;
//如果原來的容器大小小于64翩迈,新的容器將會是2*oldCapacity + 2,
//否則盔夜,將會擴容50%负饲,新的大小是1.5*oldCapacity
// Double size if small; else grow by 50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
//這里會校驗一下傳入的參數,如果需要擴容的大小比Integer.MAX_VALUE-8還要大喂链,會直接返回 //Integer.MAX_VALUE否則還是返回Integer.MAX_VALUE-8
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
//這里會對數組進行復制 返回新的數組
queue = Arrays.copyOf(queue, newCapacity);
}
所以從上面擴容的操作來看返十,我們初始化容器的時候應該定義一個合適范圍足夠大的數,這樣可以避免頻繁的進行擴容操作椭微,犧牲空間提高性能洞坑。
ConcurrentLinkedQueue ConcurrentLinkedQueue是一個基于鏈表的無界非阻塞隊列,并且是線程安全的蝇率,它采用的是先進先出的規(guī)則迟杂,當我們增加一個元素時,它會添加到隊列的末尾本慕,當我們取一個元素時排拷,它會返回一個隊列頭部的元素。
這里需要提到一下實現一個線程安全的隊列有兩種實現方式:一種是使用阻塞算法间狂,阻塞隊列就是通過使用加鎖的阻塞算法實現的攻泼;另一種非阻塞的實現方式則可以使用循環(huán)CAS(比較并交換)的方式來實現。而現在講的ConcurrentLinkedQueue就是使用CAS算法實現的,非阻塞線程安全隊列忙菠。
首先從ConcurrentLinkedQueue的構造方法看起何鸡,它有兩個構造方法,一個無參一個參數為集合類牛欢。前者會創(chuàng)建空的head骡男、tail,后者會根據數據構造一個鏈表結構傍睹,具體存放數據的是Node隔盛,包含了一個數據域item和next指針:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
而這其中具體的操作是由sun.misc.Unsafe完成的,這是SUN未開源的類拾稳,能提供相當強大的操作吮炕,能夠直接操作內存,但是又不提供專業(yè)的文檔访得,sun本身就不推薦使用這個類龙亲。一般應用級的代碼都不應該使用它,框架級別的悍抑,用這個的很多鳄炉,在這里我們只是簡單介紹一下。
在講ConcurrentLinkedQueue的插入搜骡、查詢之前拂盯,還需要了解CAS算法,這個是java.util.concurrent包的基礎记靡。
CAS
CAS:Compare and Swap, 翻譯成比較并交換谈竿。
java.util.concurrent包中借助CAS實現了區(qū)別于synchronouse同步鎖的一種樂觀鎖(悲觀鎖:數據被事物處理持保守態(tài)度,在整個數據處理的過程中將數據出于鎖定狀態(tài)摸吠;樂觀鎖:在修改數據提交的時候對數據進行沖突檢測榕订,如果沖突則讓用戶處理,常見的是對數據添加版本號沖突重試這樣的方式實現蜕便,總而言之在對于頻繁讀的情況下使用樂觀鎖劫恒,寫則使用悲觀鎖https://blog.csdn.net/feeltouch/article/details/78330797)。
CAS有3個操作數轿腺,內存值V两嘴,舊的預期值A,要修改的新值B族壳。當且僅當預期值A和內存值V相同時憔辫,將內存值V修改為B,否則什么都不做仿荆。
有了這些概念后我們看一下插入方法:
public boolean offer(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));
//初始化t贰您、p為隊尾元素
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
//如果p為最后一個節(jié)點
// p is last node
//這里實際的操作是通過cas操作坏平,將p的next更新為newNode
if (casNext(p, null, newNode)) {
//成功的CAS操作會讓e成為隊列中的元素
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
//每操作兩次會更新隊尾,具體原因會在下面分析
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
//CPS競爭失敗 重試
}
//以下兩段代碼 在底下具體分析
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
//每兩次操作 更新
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
關于插入操作的CAS操作部分比較好理解锦亦,就是隊尾的next為空時更新舶替,但是另外兩個判斷分支第一次看會有點難以理解。首先要明白一個概念杠园,就是tail是延遲刷新的顾瞪,我們模擬一下offer(1)的操作,只會走到CAS操作抛蚁,這里tail是沒有被更新的陈醒,也就是說就算添加了第一個元素,head和tail還是同一個對象瞧甩。具體情況如圖:
當插入第二個元素時钉跷,此時p還是head所以會走這個代碼分支:
p = (p != t && t != (t = tail)) ? t : q;
如果在單線程情況下這里顯然p==t,但是!=不是原子操作肚逸,它是可以被中斷的尘应。在執(zhí)行“!=”時,會先取到左邊t的值吼虎,再去執(zhí)行t=tail,如果并發(fā)環(huán)境下苍鲜,可能存在獲取到了左邊的t后思灰,tail的值被更新,這樣就會存在t!=t的情況存在混滔。這種情況下洒疚,我們應該更新p為鏈表尾部,如果沒有的話我們列表的尾部就指向p坯屿。這段代碼也是一樣:
p = (t != (t = tail)) ? t : head;
如果出現隊尾被修改油湖,則更新p,如果沒有則返回head领跛,從頭部開始查找乏德。至于何時p==q,這就是碰到了哨兵節(jié)點(next指向自己)的情況吠昭,主要表示要刪除的節(jié)點或者空節(jié)點喊括,當遇到哨兵節(jié)點時,無法通過next取得后續(xù)的節(jié)點矢棚,這個時候很可能直接返回head郑什,讓鏈表從頭部開始遍歷。也有可能返回t蒲肋,這樣就進行了一次“打賭”蘑拯,使用新的tail作為鏈表尾部(即t)钝满,這樣避免了重新查找tail的開銷。
具體情況我們可以設想一下申窘,實際上開始循環(huán)時弯蚜,p和tail內存指向是同一個對象,并發(fā)環(huán)境下偶洋,在運行
Node<E> q = p.next;
這段代碼之前熟吏,如果其他線程完成了offer操作,且tail已經被更新玄窝,那這個時候是能拿到p.next的牵寺,如果執(zhí)行到
else if (p == q)
這個判斷時,tail又被其他線程又完成了offer操作恩脂,這個時候tail可能會與q是同一個對象帽氓。
下面說下poll方法,poll方法的作用就是刪除頭結點俩块,返回被刪除的值黎休。
poll的實現原理其實跟offer比較類似,具體細節(jié)參照offer理解
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//如果頭結點的item不為空 且 CAS操作成功 則返回item
if (item != null && casItem(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
總結一下玉凯,ConcurrentLinkedQueue的線程安全是通過其插入势腮、刪除時采取CAS操作來保證的。不會出現同一個tail結點的next指針被多個同時插入的結點所搶奪的情況出現漫仆。
實現阻塞接口的Queue
由于實現了阻塞接口的隊列核心在于ReentrantLock捎拯,所以我們現需要對ReentrantLock進行了解。
ReentrantLock
首先看一下ReentrantLock類有哪些東西:
ReentrantLock中定義了兩種鎖盲厌,NonfairSync署照、FairSync,一個是非公平鎖吗浩,一個是公平鎖建芙。而兩種鎖的區(qū)別在于:
1、公平鎖能保證:老的線程排隊使用鎖懂扼,新線程仍然排隊使用鎖禁荸。
2、非公平鎖保證:老的線程排隊使用鎖阀湿,但是無法保證新線程搶占已經在排隊的線程的鎖屡限。
具體實現如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
通過閱讀代碼可以明顯發(fā)現其中不同,lock時炕倘,非公平鎖會先去嘗試直接CAS操作修改狀態(tài)搶鎖失敗后在nonfairTryAcquire()方法中還會再次嘗試一次钧大。而公平鎖會去優(yōu)先通過hasQueuedPredecessors()方法判斷隊列中是否有等待的隊列(是否有前驅節(jié)點),當沒有前驅節(jié)點的時候才會通過CAS修改同步狀態(tài)變量罩旋。至于另一個if分支啊央,其實是實現一個重入鎖機制眶诈,在鎖狀態(tài)為0時,重新嘗試獲取鎖瓜饥。如果已經被占用逝撬,那么做一次是否當前線程為占用鎖的線程的判斷,如果是一樣的那么進行計數乓土,當然在鎖的relase過程中會進行遞減宪潮,保證鎖的正常釋放。
AQS中的方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這里的addWaiter()方法實際上就是構造一個node趣苏,把當前線程關聯起來狡相,添加到隊尾。這里的重點在于acquireQueued():
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
這個方法實現了Node節(jié)點線程的自旋食磕,主要是檢查當前節(jié)點是不是head的next節(jié)點尽棕,如果是的話就嘗試獲取鎖并返回。
其實需要里的是shouldParkAfterFailedAcquire()方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
現在簡要的分析一下shouldParkAfterFailedAcquire流程彬伦,首先判斷node節(jié)點的前驅節(jié)點pre當前狀態(tài)滔悉,如果為SIGNAL,則代表已經park了(park是通過LockSupport類對線程進行阻塞)单绑。如果狀態(tài)值大于0則代表此節(jié)點的線程已經cancel回官,這個時候會進入循環(huán),不斷的把節(jié)點前移搂橙,丟棄掉cancel狀態(tài)的node歉提。如果node狀態(tài)值=<0,則將該node狀態(tài)值置為SIGNAL份氧。
由于調用這個方法的是一個死循環(huán),所以這個方法最終會返回true弯屈,如果在這個循環(huán)過程中一直沒有獲取鎖成功蜗帜,之后就會調用parkAndCheckInterrupt對線程進行阻塞,然后返回線程的阻塞狀態(tài)资厉。
這樣設計就避免了在爭奪鎖時候不斷的循環(huán)檢查浪費資源厅缺。
下面看一下unlock方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease會獲取當前的state減去1,這里我們上面也講過了ReentrantLock是有重入機制的宴偿,所以當state - 1直到等于0的時候湘捎,才去釋放鎖。然后這個時候獲取head窄刘,不為空且狀態(tài)不為0時調用unparkSuccessor:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里會把頭節(jié)點的狀態(tài)置為0(這個時候非公平鎖在有新進程的情況下是有機會獲取到鎖的)窥妇,然后獲取next節(jié)點判斷其合法性,如果s是正常則執(zhí)行unpark操作娩践,對應之前我們的park,使線程繼續(xù)執(zhí)行幌蚊。如果next節(jié)點不合規(guī)烟瞧,則從隊列的尾部向前遍歷,找到合規(guī)的節(jié)點置為s沮焕。
Condition
由于阻塞隊列中Lock是和Condition一起使用完成讀寫過程的,所以一并講一下Condition拉宗。
我們知道在Object類中提供了wait峦树、notify方法用于線程的阻塞、喚醒旦事。而Condition是更加靈活可定時魁巩,可輪詢,可中斷族檬,公平隊列歪赢,及非塊結構的鎖。具體的用法大家在源碼中也可以看到单料,這里不再贅述埋凯。
至此ReentrantLock部分已經講完,ReentrantLock具有公平鎖和非公平鎖兩種模式扫尖,公平鎖是按照FIFO的模式進行鎖的競爭白对,而非公平鎖是無序競爭,剛釋放鎖的時候新線程是能較快獲取到鎖的换怖,但是隊列中的線程只能等待甩恼。
但是通常情況下掛起的線程重新開始與它真正開始運行,二者之間會產生嚴重的延時沉颂。因此非公平鎖就可以利用這段時間完成操作条摸。這是非公平鎖在某些時候比公平鎖性能要好的原因之一。所以非公平鎖的吞吐量是大于公平鎖的铸屉,這也是為什么JDK將非公平鎖作為默認的實現钉蒲。
PriorityBlockingQueue
PriorityBlockingQueue這個跟我們上面所講的PriorityQueue是比較類似的,之前在分析Volley的文章中也有提到過彻坛,所以這里不再做詳解顷啼。
最大的不同之處在于有一個ReentrantLock,在對數據進行操作時會有加鎖操作昌屉,完成后釋放钙蒙。所以理解PriorityBlockingQueue的關鍵在于理解ReentrantLock。其中使用Condition间驮,明確的指定喚醒讀線程躬厌。
ArrayBlockingQueue
這個邏輯比較簡單,內部維護的是一個數組竞帽,在初始化隊列的時候需要傳入容器的size烤咧,這里也不贅述了偏陪。
Dequeue
LinkedList
待更新
LinkedBlockingDeque
待更新
參考:
https://www.cnblogs.com/gnivor/p/4841191.html
http://www.cnblogs.com/Joe-Go/p/9757394.html
http://www.cnblogs.com/zhimingyang/p/5702752.html
https://www.cnblogs.com/xrq730/p/4979021.html
https://www.cnblogs.com/yulinfeng/p/6899316.html