Queue
J.U.C中分為阻塞隊里和非阻塞隊列。
阻塞隊列在滿時進(jìn)行入列操作會被阻塞,空時進(jìn)行出列操作會被阻塞,很適合并發(fā)編程中最常見的生產(chǎn)者-消費(fèi)者模式戏蔑。
非阻塞隊使用CAS無鎖算法避免鎖競爭,相比同步方式實現(xiàn)的隊列鲁纠,提高了吞吐量辛臊。
阻塞隊列:
- ArrayBlockingQueue基于數(shù)組實現(xiàn)的有界阻塞隊列。
- LinkedBlockingQueue基于鏈表實現(xiàn)的有界阻塞隊列房交。
- PriorityBlockingQueue基于數(shù)組實現(xiàn)的彻舰,支持優(yōu)先級排序的無界阻塞隊列。
- LinkedBlockingDeque基于鏈表實現(xiàn)的雙端阻塞隊列候味。
- SynchronousQueue不存儲元素的阻塞隊列刃唤。
- LinkedTransferQueue基于鏈表實現(xiàn)的無界阻塞隊列。
非阻塞隊列:
- ConcurrentLinkedQueue基于鏈表實現(xiàn)的無界非阻塞隊列白群。
- ConcurrentLinkedDeque基于鏈表實現(xiàn)的無界非阻塞雙端隊列尚胞。
隊列的入列、出列方法及處理方式(阻塞和超時只適用于阻塞隊列):
方法\處理方式 | 異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
入列方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出列方法 | remove() | poll() | take | poll(time, unit) |
查看方法 | element() | peek() | 無 | 無 |
ArrayBlockingQueue
基于數(shù)組實現(xiàn)的有界阻塞隊列:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 數(shù)組容器 */
final Object[] items;
/** 出列下標(biāo) */
int takeIndex;
/** 入列下標(biāo) */
int putIndex;
/** 元素個數(shù) */
int count;
/** 重入鎖 */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {}
public ArrayBlockingQueue(int capacity, boolean fair) {}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {}
... ...
}
ArrayBlockingQueue沒有默認(rèn)長度帜慢,初始化的時候必須指定笼裳。
fair參是用來設(shè)置重入鎖lock的公平性,重入鎖默認(rèn)是非公平鎖所以不能保證線程公平的訪問隊列粱玲」恚可以通過fair將重入鎖設(shè)置為公平鎖,但是會降低部分吞吐量抽减。
生產(chǎn)者線程和消費(fèi)者線程線程的協(xié)調(diào)工作是由兩個Condition完成的允青。
阻塞入列:
public void put(E e) throws InterruptedException {
checkNotNull(e);//元素為空拋異常
final ReentrantLock lock = this.lock;
//加鎖,鎖響應(yīng)中斷
lock.lockInterruptibly();
try {
//隊列已滿卵沉,入列線程在notFull上等待
while (count == items.length){
notFull.await();
}
//插入元素
insert(e);
} finally {
lock.unlock();//釋放鎖
}
}
private void insert(E x) {
//加入到數(shù)組
items[putIndex] = x;
//inc() putIndex下標(biāo)等于capacity如果等于隊列長度返回0
putIndex = inc(putIndex);
++count;//元素數(shù)量遞增
//喚醒在notEmpty上等待的出列線程
notEmpty.signal();
}
阻塞出列:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加鎖颠锉,響應(yīng)中斷
try {
//隊列已空法牲,出列線程在notEmpty上等待
while (count == 0){
notEmpty.await();
}
//出列
return extract();
} finally {
lock.unlock();//解鎖
}
}
private E extract() {
//數(shù)組
final Object[] items = this.items;
//元素類型泛型轉(zhuǎn)換
E x = this.<E>cast(items[takeIndex]);
//置空下標(biāo)為takeIndex的元素
items[takeIndex] = null;
//inc() putIndex下標(biāo)等于capacity如果等于隊列長度返回0
takeIndex = inc(takeIndex);
--count;//元素個數(shù)遞減
//喚醒在notFull上等待的入列線程
notFull.signal();
return x;
}
當(dāng)隊列滿時,入列的線程會阻塞在notFull上琼掠,當(dāng)有出列操作時喚醒notFull上等待的線程拒垃,隊列空時出列線程會阻塞在notEmpty上,當(dāng)有入列操作時喚醒在notEmpty上等待的線程瓷蛙,典型的生產(chǎn)者消費(fèi)者邏輯悼瓮,關(guān)鍵點在于線程的協(xié)調(diào)。
LinkedBlockingQueue
基于單向鏈表實現(xiàn)的有界阻塞隊列:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable{
/** 內(nèi)部類 節(jié)點 */
static class Node<E> {}
/** 容量 */
private final int capacity;
/** 元素數(shù)量 計數(shù)器 */
private final AtomicInteger count = new AtomicInteger(0);
/** 頭節(jié)點 */
private transient Node<E> head;
/** 尾節(jié)點 */
private transient Node<E> last;
/** 出列鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** takeLock->condition */
private final Condition notEmpty = takeLock.newCondition();
/** 入列鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** putLock->condition */
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {}
public LinkedBlockingQueue(Collection<? extends E> c) {}
... ...
}
LinkedBlockingQueue在初始化時可以不指定長度速挑,默認(rèn)長為整數(shù)的最大值 2147483647 谤牡。
使用了兩把鎖對對出列和入列進(jìn)行了鎖分離副硅,takeLock出列鎖姥宝、putLock入列鎖。
LinkedBlockingQueue沒有公平性設(shè)置恐疲,只能使用非公平鎖腊满。
阻塞入列:
public void put(E e) throws InterruptedException {
if (e == null)//入列元素不能為空
throw new NullPointerException();
int c = -1;//計數(shù)
Node<E> node = new Node(e);//構(gòu)造節(jié)點
//入列鎖
final ReentrantLock putLock = this.putLock;
//元素數(shù)量
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//加鎖,響應(yīng)中斷
try {
//隊列已滿,入列線程在notFull上等待
while (count.get() == capacity) {
notFull.await();
}
//入列培己,在尾節(jié)點后鏈入node
enqueue(node);
//獲取元素數(shù)量 后加1
c = count.getAndIncrement();
//如果隊列還沒滿
//喚醒在notFull等待的入列線程碳蛋,表示可繼續(xù)入列
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();//解鎖
}
//原本為空的隊列,即使加入一個元素
//喚醒在notEmpty上等待的出列線程
if (c == 0)
signalNotEmpty();
}
//鏈入尾節(jié)點
private void enqueue(Node<E> node) {
last = last.next = node;
}
//喚醒在notEmpty上等待的出列線程
private void signalNotEmpty() {
//出列鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//喚醒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
阻塞出列:
public E take() throws InterruptedException {
E x;
int c = -1;
//元素數(shù)量
final AtomicInteger count = this.count;
//出列鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//加鎖省咨,響應(yīng)中斷
try {
//隊列為空肃弟,出列線程在notEmpty上等待
while (count.get() == 0) {
notEmpty.await();
}
//出列
x = dequeue();
//獲取元素數(shù)量 后減1
c = count.getAndDecrement();
//出列之后,隊列還沒空零蓉,表示可繼續(xù)出列
//喚醒在notEmpty等待的出列線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//隊列有一個空位笤受,喚醒入列線程
if (c == capacity)
signalNotFull();
return x;
}
//喚醒入列線程
private void signalNotFull() {
//入列鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
當(dāng)隊列滿時,入列的線程會阻塞在notFull上敌蜂,當(dāng)有出列操作時喚醒notFull上等待的線程箩兽,隊列空時出列線程會阻塞在notEmpty上,當(dāng)有入列操作時喚醒在notEmpty上等待的線程章喉,入列和出列使用了兩把鎖汗贫,喚醒notFull時要在putLock監(jiān)視范圍,喚醒notEmpty要做takeLock的監(jiān)視范圍秸脱。
ArrayBlockingQueue和LinkedBlockingQueue的差異
- ArrayBlockingQueue使用循環(huán)數(shù)組必須指定容量落包,LinkedBlockingQueue使用鏈表可以不指定容量,能預(yù)判隊列容量使用ArrayBlockingQueue可以更有效的利用內(nèi)存摊唇。LinkedBlockingQueue如果沒有指定容量妥色,過快大批量的入列有可能會導(dǎo)致內(nèi)存溢出。
- ArrayBlockingQueue可以設(shè)置為公平鎖遏片,使得線程能夠公平地訪問隊列嘹害。
- LinkedBlockingQueue使用鎖分離撮竿,入列和出列使用不同的鎖,之間互不干擾笔呀,減少了鎖爭用的次數(shù)幢踏,吞吐量比ArrayBlockingQueue更高。
PriorityBlockingQueue
基于數(shù)組實現(xiàn)的無界阻塞隊列许师,因為是無界隊列當(dāng)數(shù)組長度不夠時會自動擴(kuò)容所以put方法不會阻塞房蝉,但是隊列空時進(jìn)行take會阻塞。
PriorityBlockingQueue不再是FIFO微渠,而是根據(jù)元素的排序來確定元素出列的優(yōu)先級搭幻,元素必須實現(xiàn)Comparable接口。
LinkedBlockingDeque
基于鏈表實現(xiàn)的組成的雙向阻塞隊列逞盆,同時支持FIFO和FILO兩種操作方式檀蹋。
SynchronousQueue
SynchronousQueue是一個沒有容器的隊列,所謂沒有容器就是指它不能存儲任何元素云芦。不像ArrayBlockingQueue或LinkedBlockingQueue如果隊列沒有滿俯逾,生產(chǎn)線程入列之后就返回了,而SynchronousQueue不同舅逸,因為它沒有緩沖存儲區(qū)所以生產(chǎn)者線程入列之后會一直阻塞桌肴,直到有消費(fèi)線程取走數(shù)據(jù)。
就像一手交錢一手交貨的過程琉历,賣方拿著貨物不松手坠七,直到買房把錢給他,買方也是一樣的拿著錢不松手旗笔,直到賣方把貨物給他彪置。
所以SynchronousQueue從線程的角度看是一個配對的過程一個生成線程必須匹配一個消費(fèi)線程,一個消費(fèi)線程必須匹配一個生成線程换团,從數(shù)據(jù)的角度看是一個數(shù)據(jù)傳遞的過程生成線程將數(shù)據(jù)傳遞給消費(fèi)線程悉稠。
SynchronousQueue:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable{
/** Transferer */
abstract static class Transferer {
abstract Object transfer(Object e, boolean timed, long nanos);
}
/** Transferer子類 棧 */
static final class TransferStack extends Transferer {}
/** Transferer子類 隊列 */
static final class TransferQueue extends Transferer {}
/** transferer實例 */
private transient volatile Transferer transferer;
/** 默認(rèn)構(gòu)造 */
public SynchronousQueue() {
this(false);
}
/** fair 公平性參數(shù) */
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
... ...
}
SynchronousQueue可以設(shè)置公平性策略,默認(rèn)是非公平隊列艘包。
Transferer是核心設(shè)置的猛,實現(xiàn)線程數(shù)據(jù)傳遞的基礎(chǔ),公平性隊列用TransferQueue新入列的節(jié)點會在隊尾或者和隊頭節(jié)點批量想虎,非公平隊列用TransferStack新入列的節(jié)點會在棧頂進(jìn)行匹配卦尊。因為沒有緩沖存儲所以容器類的常用方法size()、contains(Object o)舌厨、remove(Object o)等對其來說根本沒用岂却。
TransferStack:
static final class TransferStack extends Transferer {
/** 消費(fèi)端 consumer */
static final int REQUEST = 0;
/** 生成端 producer */
static final int DATA = 1;
/** 匹配 */
static final int FULFILLING = 2;
/** true 匹配中 */
static boolean isFulfilling(int m) {
return (m & FULFILLING) != 0;
}
/** TransferStacks節(jié)點 */
static final class SNode {}
/** 棧頂節(jié)點 */
volatile SNode head;
/** CAS設(shè)置棧頂 */
boolean casHead(SNode h, SNode nh) {}
/** 構(gòu)造節(jié)點 */
static SNode snode(SNode s, Object e, SNode next, int mode) {}
/** 交換方法 */
Object transfer(Object e, boolean timed, long nanos) {}
/** 線程等待 */
SNode awaitFulfill(SNode s, boolean timed, long nanos){}
/** 是否自旋 */
boolean shouldSpin(SNode s){}
/** 將節(jié)點從棧清除 */
void clean(SNode s){}
//Unsafe 相關(guān)初始化 ... ..
}
SNode :
static final class SNode {
volatile SNode next; //后繼節(jié)點
volatile SNode match; //匹配節(jié)點
volatile Thread waiter; //等待線程
Object item; //生產(chǎn)端:data;消費(fèi)端:null
int mode;//模式:DATA/REQUEST/FULFILLING
SNode(Object item) {
this.item = item;
}
/** CAS設(shè)置后繼節(jié)點 */
boolean casNext(SNode cmp, SNode val) {
return cmp == next
&& UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/** 當(dāng)前節(jié)點和s節(jié)點匹配,匹配成功喚醒當(dāng)前節(jié)點等待的線程 */
boolean tryMatch(SNode s) {
//當(dāng)前節(jié)點的match設(shè)置為s
if (match == null
&&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { //waiter不為空 喚醒。
waiter = null;
LockSupport.unpark(w);
}
return true;
}
//如果match == s則說明已經(jīng)匹配成功
return match == s;
}
//取消 將match設(shè)置為自身
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
//是否已取消
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics ... ...
}
TransferStack的transfer方法:
/** Puts 和 takes 數(shù)據(jù)交換 */
Object transfer(Object e, boolean timed, long nanos) {
SNode s = null;
// 0消費(fèi)端躏哩,1生產(chǎn)端
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;// 頭節(jié)點
// 棧為空,當(dāng)前線程進(jìn)入等待
// 或者棧不為空署浩,但是棧頂元素模式與當(dāng)前線程模式相同
// 即同為生成著或消費(fèi)者,比如線程put線程
// 當(dāng)前線程進(jìn)入等待
if (h == null || h.mode == mode) {
if (timed && nanos <= 0) { // 不等待
//h不為空并被取消
if (h != null && h.isCancelled())
//出棧
casHead(h, h.next);
else
return null;
// 壓棧 更新棧頂為s
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 進(jìn)入等待扫尺,等待一個互補(bǔ)的節(jié)點進(jìn)行匹配
SNode m = awaitFulfill(s, timed, nanos);
// 取消的時候?qū)atch設(shè)置成了this
// 所以m==s即被取消筋栋,清除,返回正驻。
if (m == s) {
clean(s);
return null;
}
//已經(jīng)完成了批量
if ((h = head) != null && h.next == s) {
casHead(h, s.next);
}
// 如果是消費(fèi)者則返回生成值的值
// 如果是生產(chǎn)者返回自身的值
return (mode == REQUEST) ? m.item : s.item;
}
// 棧頂和當(dāng)前節(jié)點互補(bǔ)即模式不同弊攘,進(jìn)入匹配邏輯
} else if (!isFulfilling(h.mode)) {
if (h.isCancelled()) {// 已取消,出棧姑曙,置換棧頂為h.next
casHead(h, h.next);
//構(gòu)造當(dāng)前“正在匹配"狀態(tài)的節(jié)點s
} else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
for (;;) { // 循環(huán)直到找到一個可以匹配的節(jié)點
SNode m = s.next; // m即與s匹配的節(jié)點
//m==null說明棧s之后無元素襟交,可能被其他線程匹配了。
//s出棧伤靠,s置空捣域,進(jìn)行最外層的循環(huán).
if (m == null) {
casHead(s, null);
s = null;
break;
}
//mn為后備的棧頂
//匹配成功,將s和m同時出棧醋界,mn為棧頂
SNode mn = m.next;
if (m.tryMatch(s)) {
//匹配成功竟宋,mn設(shè)置為棧頂
casHead(s, mn);
// 如果是消費(fèi)者則返回生成值的值
// 如果是生產(chǎn)者返回自身的值
return (mode == REQUEST) ? m.item : s.item;
} else
// 設(shè)置匹配失敗提完,則說明m已經(jīng)被其他節(jié)點匹配了
s.casNext(m, mn); // help unlink
}
}
} else { // 非棧頂匹配形纺,邏輯與棧頂匹配一致
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
// 等待
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0;
// 當(dāng)前線程
Thread w = Thread.currentThread();
// 頭節(jié)點
SNode h = head;
// 自旋次數(shù)
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) {// 當(dāng)前線程中斷
s.tryCancel();//取消節(jié)點
}
SNode m = s.match;
if (m != null) {//匹配成功,返回匹配的節(jié)點
return m;
}
// 超時
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel();//取消
continue;
}
}
// 自旋隶症,直到spins==0缺亮,進(jìn)入等待景用,自旋的目的是為了減少線程掛起的次數(shù)
// 如果線程掛起前,匹配線程來了脂新,則線程不需要掛起
if (spins > 0) {
spins = shouldSpin(s) ? (spins - 1) : 0;
}
// 設(shè)置節(jié)點的等待線程
else if (s.waiter == null) {
s.waiter = w; // establish waiter so can park next iter
}
// 掛起操作
else if (!timed) {
LockSupport.park(this);
} else if (nanos > spinForTimeoutThreshold) {
LockSupport.parkNanos(this, nanos);
}
}
}
TransferStack的transfer大致邏輯是:線程A進(jìn)行put(A),此時棧為空粗梭,將節(jié)點A入棧争便,線程A掛起,棧頂為節(jié)點A断医。線程B進(jìn)行put(B),和節(jié)點A模式一樣滞乙,同為DATA,將節(jié)點B入棧線程B掛起,棧頂為節(jié)點B鉴嗤。線程C進(jìn)行take(),和棧頂B模式互補(bǔ)斩启,將節(jié)點C的狀態(tài)設(shè)置為FULFILLING入棧,開始進(jìn)行匹配操作醉锅,匹配成則線程B被喚醒兔簇、節(jié)點B和節(jié)點C出棧,并返回節(jié)點B的值。
如果節(jié)點B和節(jié)點C正在匹配中垄琐,即棧頂節(jié)點的狀態(tài)為ULFILLING边酒,線程D進(jìn)行take(),那么線程D將幫助節(jié)點B和節(jié)點C完成匹配和出棧,自己在留在下一輪循環(huán)中匹配狸窘。
線程A是先入棧的反而后匹配甚纲,所以TransferStack的匹配過程是非公平的。TransferQueue則是在隊尾入列朦前,從隊列頭匹配介杆,能保證先入列的線程可以盡早的得到匹配,阻塞和匹配邏輯和上述差不多韭寸,只是入列過程不一樣春哨,不再贅述。
SynchronousQueue不能使用在緩沖場景恩伺,但是非常適合用在傳遞場景赴背,由于其阻塞過程沒有鎖的競爭吞吐量高于ArrayBlockingQueue和LinkedBlockingQueue。
LinkedTransferQueue
LinkedTransferQueue基于鏈表的無界阻塞隊列晶渠。同時它還實現(xiàn)了TransferQueue接口凰荚,這是一個在JDK1.7中新增的接口,接口中的transfer系列方法會使生產(chǎn)者一直阻塞直到所添加到隊列的元素被某一個消費(fèi)者所消費(fèi)褒脯。和SynchronousQueue中實現(xiàn)的TransferQue意思差不多便瑟。
與SynchronousQueue相比LinkedTransferQueue的應(yīng)用更廣泛,可以使用put/take方法用作緩沖番川,還可以使用transfer方法用作傳遞到涂,可以看做是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集颁督。
ConcurrentLinkedQueue
使用單向鏈表構(gòu)造的無界非阻塞隊列:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
//內(nèi)部類 節(jié)點
private static class Node<E> {
volatile E item;
volatile Node<E> next;
... ...
}
//頭節(jié)點
private transient volatile Node<E> head;
//尾節(jié)點
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c){}
... ...
}
入列操作:
public boolean offer(E e) {
checkNotNull(e);// 檢查e是否為空践啄,為空直接拋異常
// 構(gòu)造新節(jié)點
final Node<E> newNode = new Node<E>(e);
// 循環(huán),移動p節(jié)點沉御、確保CAS操作成功
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;// p的后繼節(jié)點
if (q == null) {// q為空屿讽,說明p為尾節(jié)點
// CAS更新p的next節(jié)點為新入節(jié)點
if (p.casNext(null, newNode)) {
// 當(dāng)p為尾節(jié)點時只進(jìn)行了p.casNext()操作,
// 并沒有移動尾節(jié)點吠裆。p和t中間至少隔了一個節(jié)點伐谈。
if (p != t) {
// CAS 更新尾節(jié)點
casTail(t, newNode);
}
return true;
}
} else if (p == q) {// 尾節(jié)點被出列
p = (t != (t = tail)) ? t : head;
} else {// p節(jié)點后移
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
出列操作:
public E poll() {
restartFromHead: for (;;) {//循環(huán)體,移動p節(jié)點硫痰、確保CAS成功
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 頭節(jié)點的內(nèi)容不為空衩婚,將其置空
if (item != null && p.casItem(item, null)) {
// 出列時,進(jìn)行了p.casItem()但并沒有移動頭節(jié)點
// p節(jié)點和h節(jié)點中間至少隔了一個節(jié)點
if (p != h) {
// 設(shè)置頭節(jié)點
updateHead(h, ((q = p.next)
!= null) ? q : p);
}
return item;
} else if ((q = p.next) == null) {//空隊為空
updateHead(h, p);
return null;
} else if (p == q) {//從隊列頭重新開始
continue restartFromHead;
} else {// p后移
p = q;
}
}
}
}
入列操作offer(E e)只做了兩件事情效斑,第一是將新節(jié)點鏈到隊列尾部非春,第二是定位尾節(jié)點將其指向新入列的節(jié)點,這兩個操作都是使用CAS方式,出列poll()操作也類似奇昙。入列和出列都只需要動用一個節(jié)點护侮,并且是無鎖的,所以ConcurrentLinkedQueue在并發(fā)環(huán)境下出列和入列效率極高储耐。
獲取長度的方法羊初,需要遍歷整個鏈表,效率極低什湘,所以慎用长赞。如果想實時獲取列表長度,不妨使用一個AtomicInteger在入列和出列時記錄下闽撤,好過整表遍歷得哆。
public int size() {
int count = 0;
//從頭節(jié)點開始遍歷,p!=null說明p還沒到隊列尾
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
ConcurrentLinkedDeque
使用雙向鏈表構(gòu)造的無界非阻塞雙端隊列哟旗,雙端隊列中的元素可以從兩端彈出贩据,入列和出列操作可以在表的兩端進(jìn)行,支持FIFO和FILO兩種出列模式闸餐。
盡管看起來比隊列更靈活饱亮,但實際上在應(yīng)用中遠(yuǎn)不及隊列有用。
碼字不易舍沙,轉(zhuǎn)載請保留原文連接http://www.reibang.com/p/0120984dea81