JDK中蒙揣,提供了一個ConcurrentLinkedQueue類來實現(xiàn)高并發(fā)的隊列靶溜。這個隊列使用鏈表作為其數(shù)據(jù)結(jié)構(gòu)。這個類算是高并發(fā)環(huán)境中性能最好的隊列懒震。高性能是因為其內(nèi)部復雜的實現(xiàn)罩息。
CAS
CAS有3個操作數(shù),內(nèi)存值V个扰,舊的預期值A(chǔ)瓷炮,要修改的新值B。當且僅當預期值A(chǔ)和內(nèi)存值V相同時递宅,將內(nèi)存值V修改為B娘香,否則什么都不做苍狰。
ConcurrentLinkedQueue是一種線程安全的隊列。他是使用非阻塞算法(CAS)來實現(xiàn)線程安全的茅主。ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列舞痰,它采用先進先出的規(guī)則對節(jié)點進行排序,當我們添加一個元素的時候诀姚,它會添加到隊列的尾部响牛;當我們獲取一個元素時,它會返回隊列頭部的元素赫段。
ConcurrentLinkedQueue結(jié)構(gòu)
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable
ConcurrentLinkedQueue由head節(jié)點和tail節(jié)點組成呀打,每個節(jié)點(Node)由節(jié)點元素(item)和指向下一個節(jié)點(next)的引用組成,節(jié)點與節(jié)點之間就是通過這個next關(guān)聯(lián)起來糯笙,從而組成一張鏈表結(jié)構(gòu)的隊列贬丛。默認情況下head節(jié)點存儲的元素為空,tail節(jié)點等于head節(jié)點
private static class Node<E> {
private volatile E item; // 聲明為 volatile 型
private volatile Node<E> next; // 聲明為 volatile 型
Node(E item) { // 創(chuàng)建新節(jié)點
lazySetItem(item); // 惰性設(shè)置 item 域的值
}
E getItem() {
return item;
}
boolean casItem(E cmp, E val) { // 使用 CAS 指令設(shè)置 item 域的值
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void setItem(E val) { // 使用“volatile 寫”的方式给涕,設(shè)置 item 域的值
item = val;
}
void lazySetItem(E val) { //惰性設(shè)置 item 域的值
UNSAFE.putOrderedObject(this, itemOffset, val);
}
void lazySetNext(Node<E> val) { // 惰性設(shè)置 next 域的值
UNSAFE.putOrderedObject(this, nextOffset, val);
}
Node<E> getNext() {
return next;
}
//CAS 設(shè)置 next 域的值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE= // 域更新器
sun.misc.Unsafe.getUnsafe();
private static final long nextOffset= //next 域的偏移量
objectFieldOffset(UNSAFE, "next", Node.class);
private static final long itemOffset= //item 域的偏移量
objectFieldOffset(UNSAFE, "item", Node.class);
}
其實這是一個靜態(tài)內(nèi)部類豺憔,item是來表示元素的。next表示當前Node的下一個元素够庙。
對Node進行操作時恭应,使用了CAS操作:
//表示設(shè)置當前Node的item值。第一個參數(shù)為期望值耘眨,第二個參數(shù)為設(shè)置目標值昼榛。當當前值等于期望值時(就是沒有被其他人改過),就會將目標設(shè)置為val剔难。
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//和上個方法類似胆屿,是用來設(shè)置next字段。
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
ConcurrentLinkedQueue內(nèi)部有兩個重要的字段:head和tail偶宫,分別表示鏈表的頭部和尾部非迹,當然都得是Node類型。對于head來說读宙,它永遠不會為null彻秆,并且通過head和succ()后繼方法一定能完整地遍歷整個鏈表。對于tail來說结闸,它自然表示隊列的末尾唇兑。
以下是構(gòu)造函數(shù):
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
但是ConcurrentLinkedQueue的內(nèi)部實現(xiàn)非常復雜,它允許在運行時鏈表處于多個不同的狀態(tài)桦锄。以tail為例扎附,一般來說我們期望tail總是作為鏈表的末尾,但實際上tail的更新并不是及時的结耀,可能會產(chǎn)生拖延現(xiàn)象留夜。就是說匙铡,有可能指向的并不是真正的尾巴,真正的可能在后面一個或者多個碍粥。
再來看看Node類的結(jié)構(gòu):
private static class Node<E> {
volatile E item;
volatile Node<E> next;
.......
}
Node節(jié)點主要包含了兩個域:一個是數(shù)據(jù)域item鳖眼,另一個是next指針,用于指向下一個節(jié)點從而構(gòu)成鏈式隊列嚼摩。并且都是用volatile進行修飾的钦讳,以保證內(nèi)存可見性
另外ConcurrentLinkedQueue含有這樣兩個成員變量:
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
說明ConcurrentLinkedQueue通過持有頭尾指針進行管理隊列。當我們調(diào)用無參構(gòu)造器時枕面,其源碼為:
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
head和tail指針會指向一個item域為null的節(jié)點,此時ConcurrentLinkedQueue狀態(tài)如下圖所示:
如圖愿卒,head和tail指向同一個節(jié)點Node0,該節(jié)點item域為null,next域為null潮秘。
操作Node的幾個CAS操作
在隊列進行出隊入隊的時候免不了對節(jié)點需要進行操作琼开,在多線程就很容易出現(xiàn)線程安全的問題。
可以看出在處理器指令集能夠支持CMPXCHG指令后枕荞,在java源碼中涉及到并發(fā)處理都會使用CAS操作,那么在ConcurrentLinkedQueue對Node的CAS操作有上面提到的這幾個:
//更改Node中的數(shù)據(jù)域item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更改Node中的指針域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更改Node中的指針域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
可以看出這些方法實際上是通過調(diào)用UNSAFE實例的方法柜候,UNSAFE為sun.misc.Unsafe類,該類是hotspot底層方法躏精,目前為止了解即可改橘,知道CAS的操作歸根結(jié)底是由該類提供就好。
節(jié)點類型說明
有效節(jié)點:從 head 向后遍歷可達的節(jié)點當中玉控,item 域不為 null 的節(jié)點。
無效節(jié)點:從 head 向后遍歷可達的節(jié)點當中狮惜,item 域為 null 的節(jié)點高诺。
已刪除節(jié)點:從 head 向后遍歷不可達的節(jié)點。
哨兵節(jié)點:鏈接到自身的節(jié)點(哨兵節(jié)點同時也是以刪除節(jié)點)碾篡。
頭節(jié)點:隊列中的第一個有效節(jié)點(如果有的話)虱而。
尾節(jié)點:隊列中 next 域為 null 的節(jié)點(可以是無效節(jié)點)。
在后面的源代碼分析中开泽,我們將會看到隊列有時會處于不一致狀態(tài)牡拇。為此,ConcurrentLinkedQueue 使用三個不變式 ( 基本不變式穆律,head 的不變式和 tail 的不變式 )惠呼,來約束隊列中方法的執(zhí)行。通過這三個不變式來維護非阻塞算法的正確性
基本不變式
在執(zhí)行方法之前和之后峦耘,隊列必須要保持的不變式:
- 當入隊插入新節(jié)點之后剔蹋,隊列中有一個 next 域為 null 的(最后)節(jié)點。
- 從 head 開始遍歷隊列辅髓,可以訪問所有 item 域不為 null 的節(jié)點泣崩。
head 的不變式和可變式
在執(zhí)行方法之前和之后少梁,head 必須保持的不變式:
- 所有“活著”的節(jié)點(指未刪除節(jié)點),都能從 head 通過調(diào)用 succ() 方法遍歷可達矫付。
- head 不能為 null凯沪。
- head 節(jié)點的 next 域不能引用到自身。
在執(zhí)行方法之前和之后买优,head 的可變式: - head 節(jié)點的 item 域可能為 null妨马,也可能不為 null。
- 允許 tail 滯后(lag behind)于 head而叼,也就是說:從 head 開始遍歷隊列身笤,不一定能到達 tail。
tail 的不變式和可變式
在執(zhí)行方法之前和之后葵陵,tail 必須保持的不變式:
- 通過 tail 調(diào)用 succ() 方法液荸,最后節(jié)點總是可達的。
- tail 不能為 null脱篙。
在執(zhí)行方法之前和之后娇钱,tail 的可變式: - tail 節(jié)點的 item 域可能為 null,也可能不為 null绊困。
- 允許 tail 滯后于 head文搂,也就是說:從 head 開始遍歷隊列,不一定能到達 tail秤朗。
- tail 節(jié)點的 next 域可以引用到自身煤蹭。
入隊列過程
入隊列就是將入隊節(jié)點添加到隊列的尾部,在 ConcurrentLinkedQueue 中取视,插入新節(jié)點時硝皂,不用考慮尾節(jié)點是否為有效節(jié)點,直接把新節(jié)點插入到尾節(jié)點的后面即可作谭。由于 tail 可以指向任意節(jié)點礁鲁,所以入隊時必須先通過 tail 找到尾節(jié)點鸳君,然后才能執(zhí)行插入操作出革。如果插入不成功(說明其他線程已經(jīng)搶先插入了一個新的節(jié)點)就繼續(xù)向后推進谓着。重復上述迭代過程,直到插入成功為止锐秦。
/**
* Inserts the specified element at the tail of this queue.
* 在隊列后插入一個元素
* As the queue is unbounded, this method will never return {@code false}.
* 因為隊列是無限大的咪奖,這個方法永遠不會回false
*
* @return {@code true} (as specified by {@link Queue#offer})
* 返回true,
* @throws NullPointerException if the specified element is null
* 如果某元素為空則拋出空指針異常
*/
public boolean offer(E e) {
checkNotNull(e);//檢查e是否為空
final Node<E> newNode = new Node<E>(e);//創(chuàng)建新節(jié)點
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) { //1
// p is last node. p是最后一個節(jié)點
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
//每兩次农猬,更新一下tail
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next CAS競爭失敗赡艰,再次嘗試。
}
else if (p == q)//2
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
//遇到哨兵節(jié)點斤葱,從head開始遍歷慷垮。
//但如果tail被修改揖闸,則使用tail(因為可能被修改正確了)
p = (t != (t = tail)) ? t : head;
else //3
// Check for tail updates after two hops.取下一個節(jié)點或者最后一個節(jié)點
p = (p != t && t != (t = tail)) ? t : q;
}
}
這個方法沒有任何鎖操作。線程安全完全由CAS操作和隊列的算法來保證料身。整個算法的核心是for循環(huán)汤纸,這個循環(huán)沒有出口,知道嘗試成功芹血,這也符合CAS操作的流程贮泞。
當?shù)谝淮渭尤朐氐臅r候,由于隊列為空幔烛,因此p.next為null啃擦。程序進入1處,程序?qū)的next節(jié)點賦值為newNode饿悬,也就是將新的元素加入到隊列中令蛉。此時p==t成立,也就是不會執(zhí)行casTail()代碼更新tail末尾狡恬。如果casNext()成功,程序直接返回珠叔,如果失敗,則再進行一次循環(huán)嘗試弟劲,直到成功祷安。因此,增加一個元素后兔乞,tail并不會被更新汇鞭。
當程序嘗試加入第二個元素時,由于此時t還在head的位置上庸追,因此p.next指向?qū)嶋H的第一個元素虱咧,因此1處的q!=null,這表示q不是最后的節(jié)點锚国。由于往隊列中增加元素需要最后一個節(jié)點,因此玄坦,循環(huán)開始中查找最后一個節(jié)點血筑。于是,程序進入3處煎楣,獲得最后一個節(jié)點豺总。此時,p實際上是指向鏈表中的第一個元素择懂,它的next是null喻喳,故在第二個循環(huán)的時候,進入1處困曙,p更新自己的next表伦,讓它指向新加入的節(jié)點谦去。如果成功,由于此時p蹦哼!=t成功鳄哭,則會更新t所在位置,將t移到鏈表最后纲熏。
2處處理了p==q的情況妆丘。這種情況是由于遇到了哨兵節(jié)點導致的。所謂哨兵節(jié)點局劲,就是next指向自己的節(jié)點勺拣。這種節(jié)點在隊列中的存在價值不大,主要表示要刪除的節(jié)點或者空節(jié)點鱼填。當遇到哨兵節(jié)點的時候药有,由于無法通過next取得后續(xù)的節(jié)點,因此很可能直接返回head剔氏,期望通過從鏈表頭部開始遍歷塑猖,進一步查找到鏈表末尾。但一旦發(fā)生在執(zhí)行過程中谈跛,tail被其他線程修改的情況羊苟,則進行一次“打賭”,使用新的tail作為鏈表末尾感憾,這樣就避免了重新查找tail的開銷蜡励。
- 添加元素1。隊列更新head節(jié)點的next節(jié)點為元素1節(jié)點阻桅。又因為tail節(jié)點默認情況下等于head節(jié)點凉倚,所以它們的next節(jié)點都指向元素1節(jié)點。
- 添加元素2嫂沉。隊列首先設(shè)置元素1節(jié)點的next節(jié)點為元素2節(jié)點稽寒,然后更新tail節(jié)點指向元素2節(jié)點。
- 添加元素3趟章,設(shè)置tail節(jié)點的next節(jié)點為元素3節(jié)點杏糙。
- 添加元素4,設(shè)置元素3的next節(jié)點為元素4節(jié)點蚓土,然后將tail節(jié)點指向元素4節(jié)點宏侍。
入隊主要做兩件事情,第一是將入隊節(jié)點設(shè)置成當前隊列尾節(jié)點的下一個節(jié)點蜀漆。第二是更新tail節(jié)點谅河,在入隊列前如果tail節(jié)點的next節(jié)點不為空,則將入隊節(jié)點設(shè)置成tail節(jié)點,如果tail節(jié)點的next節(jié)點為空绷耍,則將入隊節(jié)點設(shè)置成tail的next節(jié)點吐限,所以tail節(jié)點不總是尾節(jié)點。
上面的分析從單線程入隊的角度來理解入隊過程锨天,但是多個線程同時進行入隊情況就變得更加復雜毯盈,因為可能會出現(xiàn)其他線程插隊的情況。如果有一個線程正在入隊病袄,那么它必須先獲取尾節(jié)點搂赋,然后設(shè)置尾節(jié)點的下一個節(jié)點為入隊節(jié)點,但這時可能有另外一個線程插隊了益缠,那么隊列的尾節(jié)點就會發(fā)生變化脑奠,這時當前線程要暫停入隊操作,然后重新獲取尾節(jié)點幅慌。讓我們再看看源碼來詳細分析下它是如何使用CAS方式來入隊的(JDK1.8):
public boolean offer(E e) {
checkNotNull(e);
//創(chuàng)建入隊節(jié)點
final Node<E> newNode = new Node<E>(e);
//t為tail節(jié)點宋欺,p為尾節(jié)點,默認相等胰伍,采用失敗即重試的方式齿诞,直到入隊成功
for (Node<E> t = tail, p = t;;) {
//獲得p的下一個節(jié)點
Node<E> q = p.next;
// 如果下一個節(jié)點是null,也就是p節(jié)點就是尾節(jié)點
if (q == null) {
//將入隊節(jié)點newNode設(shè)置為當前隊列尾節(jié)點p的next節(jié)點
if (p.casNext(null, newNode)) {
//判斷tail節(jié)點是不是尾節(jié)點,也可以理解為如果插入結(jié)點后tail節(jié)點和p節(jié)點距離達到兩個結(jié)點
if (p != t)
//如果tail不是尾節(jié)點則將入隊節(jié)點設(shè)置為tail骂租。
// 如果失敗了祷杈,那么說明有其他線程已經(jīng)把tail移動過
casTail(t, newNode);
return true;
}
}
// 如果p節(jié)點等于p的next節(jié)點,則說明p節(jié)點和q節(jié)點都為空渗饮,表示隊列剛初始化但汞,所以返回 head節(jié)點
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
//p有next節(jié)點,表示p的next節(jié)點是尾節(jié)點互站,則需要重新更新p后將它指向next節(jié)點
p = (p != t && t != (t = tail)) ? t : q;
}
}
從源代碼我們看出入隊過程中主要做了三件事情私蕾,第一是定位出尾節(jié)點;第二個是使用CAS指令將入隊節(jié)點設(shè)置成尾節(jié)點的next節(jié)點胡桃,如果不成功則重試踩叭;第三是重新定位tail節(jié)點。
從第一個if判斷就來判定p有沒有next節(jié)點如果沒有則p是尾節(jié)點則將入隊節(jié)點設(shè)置為p的next節(jié)點翠胰,同時如果tail節(jié)點不是尾節(jié)點則將入隊節(jié)點設(shè)置為tail節(jié)點懊纳。如果p有next節(jié)點則p的next節(jié)點是尾節(jié)點,需要重新更新p后將它指向next節(jié)點亡容。還有一種情況p等于p的next節(jié)點說明p節(jié)點和p的next節(jié)點都為空,表示這個隊列剛初始化冤今,正準備添加數(shù)據(jù)闺兢,所以需要返回head節(jié)點。
出隊列
出隊列的就是從隊列里返回一個節(jié)點元素,并清空該節(jié)點對元素的引用屋谭。讓我們通過每個節(jié)點出隊的快照來觀察下head節(jié)點的變化脚囊。
從上圖可知,并不是每次出隊時都更新head節(jié)點桐磁,當head節(jié)點里有元素時悔耘,直接彈出head節(jié)點里的元素,而不會更新head節(jié)點我擂。只有當head節(jié)點里沒有元素時衬以,出隊操作才會更新head節(jié)點。讓我們再通過源碼來深入分析下出隊過程(JDK1.8):
public E poll() {
// 設(shè)置起始點
restartFromHead:
for (;;) {
//p表示head結(jié)點校摩,需要出隊的節(jié)點
for (Node<E> h = head, p = h, q;;) {
//獲取p節(jié)點的元素
E item = p.item;
//如果p節(jié)點的元素不為空看峻,使用CAS設(shè)置p節(jié)點引用的元素為null
if (item != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
//如果p節(jié)點不是head節(jié)點則更新head節(jié)點,也可以理解為刪除該結(jié)點后檢查head是否與頭結(jié)點相差兩個結(jié)點衙吩,如果是則更新head節(jié)點
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//如果p節(jié)點的下一個節(jié)點為null互妓,則說明這個隊列為空,更新head結(jié)點
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//結(jié)點出隊失敗坤塞,重新跳到restartFromHead來進行出隊
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
首先獲取head節(jié)點的元素冯勉,并判斷head節(jié)點元素是否為空,如果為空摹芙,表示另外一個線程已經(jīng)進行了一次出隊操作將該節(jié)點的元素取走灼狰,如果不為空,則使用CAS的方式將head節(jié)點的引用設(shè)置成null瘫辩,如果CAS成功伏嗜,則直接返回head節(jié)點的元素,如果CAS不成功伐厌,表示另外一個線程已經(jīng)進行了一次出隊操作更新了head節(jié)點承绸,導致元素發(fā)生了變化,需要重新獲取head節(jié)點挣轨。如果p節(jié)點的下一個節(jié)點為null军熏,則說明這個隊列為空(此時隊列沒有元素,只有一個偽結(jié)點p)卷扮,則更新head節(jié)點荡澎。
隊列判空
有些人在判斷隊列是否為空時喜歡用queue.size()==0,讓我們來看看size方法:
public int size()
{
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
可以看到這樣在隊列在結(jié)點較多時會依次遍歷所有結(jié)點晤锹,這樣的性能會有較大影響摩幔,因而可以考慮empty函數(shù),它只要判斷第一個結(jié)點(注意不一定是head指向的結(jié)點)鞭铆。
public boolean isEmpty() {
return first() == null;
}
HOPS的設(shè)計
通過上面對offer和poll方法的分析或衡,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時機為:
tail更新觸發(fā)時機:當tail指向的節(jié)點的下一個節(jié)點不為null的時候,會執(zhí)行定位隊列真正的隊尾節(jié)點的操作封断,找到隊尾節(jié)點后完成插入之后才會通過casTail進行tail更新斯辰;當tail指向的節(jié)點的下一個節(jié)點為null的時候,只插入節(jié)點不更新tail坡疼。
head更新觸發(fā)時機當head指向的節(jié)點的item域為null的時候彬呻,會執(zhí)行定位隊列真正的隊頭節(jié)點的操作,找到隊頭節(jié)點后完成刪除之后才會通過updateHead進行head更新柄瑰;當head指向的節(jié)點的item域不為null的時候闸氮,只刪除節(jié)點不更新head。
并且在更新操作時狱意,源碼中會有注釋為:hop two nodes at a time湖苞。所以這種延遲更新的策略就被叫做HOPS的大概原因是這個(猜的 :)),從上面更新時的狀態(tài)圖可以看出详囤,head和tail的更新是“跳著的”即中間總是間隔了一個财骨。那么這樣設(shè)計的意圖是什么呢?
如果讓tail永遠作為隊列的隊尾節(jié)點藏姐,實現(xiàn)的代碼量會更少隆箩,而且邏輯更易懂。但是羔杨,這樣做有一個缺點捌臊,如果大量的入隊操作,每次都要執(zhí)行CAS進行tail的更新兜材,匯總起來對性能也會是大大的損耗理澎。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率曙寡,所以doug lea大師每間隔1次(tail和隊尾節(jié)點的距離為1)進行才利用CAS更新tail糠爬。對head的更新也是同樣的道理,雖然举庶,這樣設(shè)計會多出在循環(huán)中定位隊尾節(jié)點执隧,但總體來說讀的操作效率要遠遠高于寫的性能,因此户侥,多出來的在循環(huán)中定位尾節(jié)點的操作的性能損耗相對而言是很小的镀琉。