在并發(fā)編程中鸭限,有時(shí)候需要使用線程安全的隊(duì)列锯仪。如果要實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列有兩 種方式:一種是使用阻塞算法于毙,另一種是使用非阻塞算法。使用阻塞算法的隊(duì)列可以用一個(gè)鎖 (入隊(duì)和出隊(duì)用同一把鎖)或兩個(gè)鎖(入隊(duì)和出隊(duì)用不同的鎖)等方式來實(shí)現(xiàn)搬泥。非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來實(shí)現(xiàn)桑寨。
阻塞隊(duì)列
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作支持阻塞的插入和移除方法忿檩。
- 支持阻塞的插入方法:意思是當(dāng)隊(duì)列滿時(shí)尉尾,隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿燥透。
- 支持阻塞的移除方法:意思是在隊(duì)列為空時(shí)沙咏,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
應(yīng)用場(chǎng)景
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景辨图,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是 從隊(duì)列里取元素的線程肢藐。阻塞隊(duì)列就是生產(chǎn)者用來存放元素故河、消費(fèi)者用來獲取元素的容器。
插入和移除操作的4中處理方式
- 拋出異常:當(dāng)隊(duì)列滿時(shí)吆豹,如果再往隊(duì)列里插入元素忧勿,會(huì)拋出IllegalStateException("Queue full")異常。當(dāng)隊(duì)列空時(shí)瞻讽,從隊(duì)列里獲取元素會(huì)拋出NoSuchElementException異常鸳吸。
- 返回特殊值:當(dāng)往隊(duì)列插入元素時(shí),會(huì)返回元素是否插入成功速勇,成功返回true晌砾。如果是移除方法,則是從隊(duì)列里取出一個(gè)元素烦磁,如果沒有則返回null养匈。
- 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素都伪,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程呕乎,直到隊(duì)列可用或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí)陨晶,如果消費(fèi)者線程從隊(duì)列里take元素猬仁,隊(duì)列會(huì)阻塞消費(fèi)者線程,直到隊(duì)列不為空先誉。
- 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí)湿刽,如果生產(chǎn)者線程往隊(duì)列里插入元素,隊(duì)列會(huì)阻塞生產(chǎn)者線程 一段時(shí)間褐耳,如果超過了指定的時(shí)間诈闺,生產(chǎn)者線程就會(huì)退出。
注意: 如果是無界阻塞隊(duì)列铃芦,隊(duì)列不可能會(huì)出現(xiàn)滿的情況雅镊,所以使用put或offer方法永 遠(yuǎn)不會(huì)被阻塞,而且使用offer方法時(shí)刃滓,該方法永遠(yuǎn)返回true仁烹。
Java里的阻塞隊(duì)列
- ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。
- LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列注盈。
- PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列晃危。
- DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列叙赚。
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列老客。
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列僚饭。
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列胧砰,在初始化時(shí)需要指定隊(duì)列的長(zhǎng)度鳍鸵。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序。默認(rèn)情況下不保證線程公平的訪問隊(duì)列尉间,但是在初始化的隊(duì)列的時(shí)候指定阻塞隊(duì)列的公平性偿乖,如:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
。它使用ReentrantLock
來實(shí)現(xiàn)隊(duì)列的線程安全哲嘲。
核心屬性
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
核心方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列贪薪。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為 Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序眠副。出隊(duì)和入隊(duì)使用兩把鎖來實(shí)現(xiàn)画切。
核心屬性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無界阻塞隊(duì)列。默認(rèn)情況下元素采取自然順序升序排列囱怕。也可以自定義類實(shí)現(xiàn)compareTo()方法來指定元素排序規(guī)則霍弹,或者初始化 PriorityBlockingQueue時(shí),指定構(gòu)造參數(shù)Comparator來對(duì)元素進(jìn)行排序娃弓。需要注意的是不能保證 同優(yōu)先級(jí)元素的順序典格。底層使用數(shù)組實(shí)現(xiàn),默認(rèn)初始容量是11台丛,最大值是Integer.MAX_VALUE - 8
耍缴。容量不夠時(shí)會(huì)進(jìn)行擴(kuò)容
核心方法
// 入隊(duì)
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
// 擴(kuò)容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
DelayQueue
DelayQueue是一個(gè)支持延時(shí)獲取元素的無界阻塞隊(duì)列。隊(duì)列使用PriorityQueue來實(shí)現(xiàn)挽霉。隊(duì) 列中的元素必須實(shí)現(xiàn)Delayed接口和Comparable<Delayed>接口私恬,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。 只有在延遲期滿時(shí)才能從隊(duì)列中提取元素炼吴。
應(yīng)用場(chǎng)景
- 緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期本鸣,使用一個(gè)線程循環(huán)查詢 DelayQueue,一旦能從DelayQueue中獲取元素時(shí)硅蹦,表示緩存有效期到了荣德。
- 定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從 DelayQueue中獲取到任務(wù)就開始執(zhí)行童芹,比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的涮瞻。
如何實(shí)現(xiàn)Delayed接口
DelayQueue隊(duì)列的元素必須實(shí)現(xiàn)Delayed接口。我們可以參考ScheduledThreadPoolExecutor 里ScheduledFutureTask類的實(shí)現(xiàn)假褪,一共有三步署咽。
第一步:在對(duì)象創(chuàng)建的時(shí)候,初始化基本數(shù)據(jù)。使用time記錄當(dāng)前對(duì)象延遲到什么時(shí)候可 以使用宁否,使用sequenceNumber來標(biāo)識(shí)元素在隊(duì)列中的先后順序窒升。代碼如下:
private static final AtomicLong sequencer = new AtomicLong(0);
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
ScheduledFutureTask(Runnable r, V result, long ns, long period){
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
第二步:實(shí)現(xiàn)getDelay方法,該方法返回當(dāng)前元素還需要延時(shí)多長(zhǎng)時(shí)間慕匠,單位是納秒饱须,代碼 如下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
注意當(dāng)time小于當(dāng)前時(shí)間時(shí),getDelay會(huì)返回負(fù)數(shù)台谊,這時(shí)才可以出隊(duì)蓉媳。
第三步:實(shí)現(xiàn)compareTo方法來指定元素的順序。例如锅铅,讓延時(shí)時(shí)間最長(zhǎng)的放在隊(duì)列的末 尾酪呻。實(shí)現(xiàn)代碼如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
ScheduledThreadPoolExecutor.ScheduledFutureTask<?> x = (ScheduledThreadPoolExecutor.ScheduledFutureTask<?>)other;
// 過期時(shí)間小的排前面,大的排后面盐须,如果一樣就使用sequenceNumber 來排序号杠。
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 快要過期的排在前面
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
如何實(shí)現(xiàn)延時(shí)阻塞隊(duì)列
延時(shí)阻塞隊(duì)列的實(shí)現(xiàn)很簡(jiǎn)單,當(dāng)消費(fèi)者從隊(duì)列里獲取元素時(shí)丰歌,如果元素沒有達(dá)到延時(shí)時(shí) 間姨蟋,就阻塞當(dāng)前線程。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
// 隊(duì)列為NULL立帖,阻塞線程直到超時(shí)
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 等待時(shí)間小于第一個(gè)元素的過期時(shí)間
if (nanos < delay || leader != null)
// 阻塞線程直到超時(shí)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待時(shí)間大于第一個(gè)元素的過期時(shí)間眼溶,阻塞線程直到第一個(gè)元素過期
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 喚醒其他阻塞線程
available.signal();
lock.unlock();
}
}
SynchronousQueue
SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每一個(gè)put操作必須等待一個(gè)take操作晓勇, 否則不能繼續(xù)添加元素堂飞。 它支持公平訪問隊(duì)列。默認(rèn)情況下線程采用非公平性策略訪問隊(duì)列绑咱。
SynchronousQueue可以看成是一個(gè)傳球手绰筛,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi) 者線程。隊(duì)列本身并不存儲(chǔ)任何元素描融,非常適合傳遞性場(chǎng)景铝噩。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue
LinkedTransferQueue是一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊(duì)列窿克。相對(duì)于其他阻 塞隊(duì)列骏庸,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法 時(shí))年叮,transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者具被。如果沒有消費(fèi)者在等 待接收元素,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn)只损,并等到該元素被消費(fèi)者消費(fèi)了才返 回一姿。
LinkedBlockingDeque
LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的是可以 從隊(duì)列的兩端插入和移出元素。雙向隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口叮叹,在多線程同時(shí)入隊(duì) 時(shí)艾栋,也就減少了一半的競(jìng)爭(zhēng)。相比其他的阻塞隊(duì)列衬横,LinkedBlockingDeque多了addFirst裹粤、 addLast终蒂、offerFirst蜂林、offerLast、peekFirst和peekLast等方法拇泣,以First單詞結(jié)尾的方法噪叙,表示插入、 獲让瓜琛(peek)或移除雙端隊(duì)列的第一個(gè)元素睁蕾。以Last單詞結(jié)尾的方法,表示插入债朵、獲取或移除雙 端隊(duì)列的最后一個(gè)元素子眶。另外,插入方法add等同于addLast序芦,移除方法remove等效于 removeFirst臭杰。但是take方法卻等同于takeFirst,不知道是不是JDK的bug谚中,使用時(shí)還是用帶有First 和Last后綴的方法更清楚渴杆。
在初始化LinkedBlockingDeque時(shí)可以設(shè)置容量防止其過度膨脹。另外宪塔,雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中磁奖。
參考
《java并發(fā)編程的藝術(shù)》
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
為監(jiān)控而生的多級(jí)緩存框架 layering-cache這是我開源的一個(gè)多級(jí)緩存框架的實(shí)現(xiàn),如果有興趣可以看一下