ConcurrentLinkedQueue是一個(gè)基于鏈表結(jié)構(gòu)的無(wú)界隊(duì)列,提供了Queue的基本特性FIFO,出入規(guī)則是:從head出魁衙,從tail進(jìn)报腔。非阻塞特性使其在高并發(fā)環(huán)境依然能有出色的性能。
ConcurrentLinkedQueue的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu):
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
剖淀。纯蛾。。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
纵隔。翻诉。。
}
下面是ConcurrentLinkedQueue的一些需要注意的點(diǎn):
1.不允許存儲(chǔ)null值,否則拋出NPE
2.Iterator遍歷時(shí)數(shù)據(jù)的弱一致性
3.size方法沒(méi)有絕對(duì)的實(shí)時(shí)性
4.沒(méi)有fast-fail機(jī)制捌刮,不會(huì)拋出ConcurrentModificationException
5.head,tail節(jié)點(diǎn)的延遲更新處理
6.無(wú)鎖化設(shè)計(jì)的ABA問(wèn)題
1.不允許存儲(chǔ)null值,否則拋出NPE
每次offer操作時(shí)都需要進(jìn)行checkNotNull操作,若item為null,直接跑NPE.
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
2.Iterator遍歷時(shí)數(shù)據(jù)的弱一致性 3.size方法沒(méi)有絕對(duì)的實(shí)時(shí)性 4.沒(méi)有fast-fail機(jī)制
前兩個(gè)問(wèn)題都是由于ConcurrentLinkedQueue的異步特性造成的,在遍歷時(shí)無(wú)法保證隊(duì)列不會(huì)被其他線程修改.并且ConcurrentLinkedQueue沒(méi)有fast-fail機(jī)制碰煌,即在遍歷隊(duì)列時(shí),隊(duì)列被別的線程修改并不會(huì)拋出ConcurrentModificationException绅作。所以當(dāng)前線程對(duì)于其他線程做的修改不能及時(shí)感知到芦圾。除了Iterator遍歷/size()方法,其他所有批量操作/全局操作的方法都存在這個(gè)問(wèn)題(如:toArray/addAll等)
5.head,tail節(jié)點(diǎn)的延遲更新處理
這點(diǎn)是最初接觸ConcurrentLinkedQueue時(shí)比較困惑的地方棚蓄,head和tail節(jié)點(diǎn)并不是實(shí)時(shí)更新的堕扶,也就是說(shuō)每次offer/poll操作并不一定改變head/tail的值碍脏,這種處理方式可以減少入隊(duì)出隊(duì)時(shí)的cas操作梭依,對(duì)性能是個(gè)很大的提升。下面結(jié)合offer和poll操作來(lái)具體分析下:
執(zhí)行操作:初始化
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
隊(duì)列狀態(tài)
執(zhí)行操作:queue.offer("aaa");
隊(duì)列狀態(tài)
執(zhí)行操作:queue.offer("bbb");
隊(duì)列狀態(tài)
執(zhí)行操作:queue.offer("ccc");
隊(duì)列狀態(tài)
執(zhí)行操作:queue.offer("ddd");
隊(duì)列狀態(tài)
下面根據(jù)offer源碼具體分析:
offer操作源碼:
public boolean offer(E e) {
checkNotNull(e);//NPE檢查
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
(1) if (q == null) {
// p是最后一個(gè)節(jié)點(diǎn),cas更新
if (p.casNext(null, newNode)) {
//cas更新成功
if (p != t)
casTail(t, newNode); //cas更新tail
return true;
}
// cas操作失敗,重復(fù)操作
}
(2) else if (p == q)
p = (t != (t = tail)) ? t : head;
(3) else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
offer操作可以分為兩個(gè)步驟:1.確定尾節(jié)點(diǎn) 2. 做cas更新
從上面的狀態(tài)圖中可以看到典尾,tail指向的節(jié)點(diǎn)并不一定是隊(duì)列的尾節(jié)點(diǎn)役拴,所以要做元素入隊(duì)操作的第一要?jiǎng)?wù)是確定尾節(jié)點(diǎn).
重點(diǎn)分析下為什么q 會(huì)出現(xiàn)以上三種情況:
- q == null
當(dāng)前的隊(duì)列狀態(tài)1
執(zhí)行操作:queue.offer("ccc");
當(dāng)tail指向隊(duì)列的最后一個(gè)節(jié)點(diǎn)bbb的時(shí)候,tail.next為null钾埂,此時(shí)q 為 null,執(zhí)行cas更新節(jié)點(diǎn)的操作河闰,p.casNext(null, newNode),將p的next值設(shè)置為newNode褥紫,等同于p.next = newNode;
隊(duì)列的狀態(tài)2如下:
問(wèn)題:為什么需要做下面這個(gè)判斷姜性?這個(gè)cas操作失敗了會(huì)有什么后果?
if (p != t)
casTail(t, newNode);
下面繼續(xù)分析
在上面隊(duì)列狀態(tài)基礎(chǔ)上執(zhí)行操作:queue.offer("ddd");
此時(shí) p = tail,q = p.next; p指向bbb髓考,q指向ccc部念。狀態(tài)如下:
這種情況下 q != null 且 p != q,所以執(zhí)行(3)操作:
p = (p != t && t != (t = tail)) ? t : q;
這步操作后 p 指向了 節(jié)點(diǎn)ccc。即p指向了尾節(jié)點(diǎn)。尾節(jié)點(diǎn)確定了儡炼,那么下次循環(huán)即可做cas更新操作了妓湘。
得到最新?tīng)顟B(tài)3:
在隊(duì)列狀態(tài)2【即執(zhí)行queue.offer("ccc")后】的基礎(chǔ)上做如下操作:
在并發(fā)環(huán)境中,當(dāng)線程A執(zhí)行操作queue.offer("ddd");之前乌询,同時(shí)另一個(gè)線程執(zhí)行完以下三次poll操作:
queue.poll();
queue.poll();
queue.poll();
此時(shí)線程A看到的隊(duì)列的狀態(tài)如下:
所以當(dāng) p == q時(shí)榜贴,表示當(dāng)前節(jié)點(diǎn)已經(jīng)不再鏈表中(已被移除)。所以這種情況下需要將p重新指向head或tail妹田。
p = (t != (t = tail)) ? t : head;
至此唬党,offer操作中的三種情況分析完畢。
下面分析poll操作秆麸。
當(dāng)前狀態(tài)
執(zhí)行操作:queue.poll();
狀態(tài)如下:
繼續(xù)執(zhí)行操作:queue.poll();
狀態(tài)如下:
繼續(xù)執(zhí)行操作:queue.poll(); 狀態(tài)如下
可見(jiàn)head的更新和tail類似初嘹,都是采用了延遲更新的優(yōu)化方式。
poll源碼如下:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 成功將item置為null
if (p != h) // 更新head
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
poll操作和offer類似沮趣,先獲取head屯烦,然后cas(依情況決定是否更新head),一般hop為2房铭。
6.無(wú)鎖化設(shè)計(jì)的ABA問(wèn)題
在用cas做lock-free操作時(shí)有一個(gè)經(jīng)典的ABA問(wèn)題驻龟。那么ConcurrentLinkedQueue需要考慮這個(gè)問(wèn)題嗎?
ConcurrentLinkedQueue中不存在ABA問(wèn)題缸匪,這主要依賴于Java語(yǔ)言的垃圾回收機(jī)制翁狐。當(dāng)一個(gè)節(jié)點(diǎn)被poll或remove后,即被gc凌蔬,該節(jié)點(diǎn)會(huì)被垃圾回收器回收露懒。
使用場(chǎng)景
- Tomcat存儲(chǔ)等待請(qǐng)求
protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests =
new ConcurrentLinkedQueue<SocketWrapper<Socket>>();
ConcurrentLinkedDeque
JDK還提供了一個(gè)雙端隊(duì)列的實(shí)現(xiàn)版本。并發(fā)操作特性和ConcurrentLinkedQueue相似砂心,提供了Deque接口特征懈词。
參考: jdk1.7 java.util.concurrent.ConcurrentLinkedQueue源碼