????在并發(fā)編程中姐帚,有時候需要使用線程安全的隊列苏揣。如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法氯夷,另一種是使用非阻塞算法边器。
????使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現(xiàn)。非阻塞的實現(xiàn)方式則可以使用循環(huán)CAS的方式來實現(xiàn)筛严。
阻塞隊列:
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列醉旦。這兩個附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是當(dāng)隊列滿時脑漫,隊列會阻塞插入元素的線程髓抑,直到隊列不滿。
2)支持阻塞的移除方法:意思是在隊列為空時优幸,獲取元素的線程會等待隊列變?yōu)榉强铡?br>
阻塞隊列常用于生產(chǎn)者和消費者的場景吨拍,生產(chǎn)者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程网杆。阻塞隊列就是生產(chǎn)者用來存放元素羹饰、消費者用來獲取元素的容器。
下表是阻塞隊列的部分方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:是指當(dāng)阻塞隊列滿時候碳却,再往隊列里插入元素队秩,會拋出IllegalStateException(“Queue full”)異常。當(dāng)隊列為空時昼浦,從隊列里獲取元素時會拋出NoSuchElementException異常 馍资。
返回特殊值:插入方法會返回是否成功,成功則返回true关噪。移除方法鸟蟹,則是從隊列里拿出一個元素,如果沒有則返回null
一直阻塞:當(dāng)阻塞隊列滿時使兔,如果生產(chǎn)者線程往隊列里put元素建钥,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù)虐沥,或者響應(yīng)中斷退出熊经。當(dāng)隊列空時,消費者線程試圖從隊列里take元素欲险,隊列也會阻塞消費者線程镐依,直到隊列可用。
超時退出:當(dāng)阻塞隊列滿時天试,隊列會阻塞生產(chǎn)者線程一段時間槐壳,如果超過一定的時間,生產(chǎn)者線程就會退出秋秤。
JDK7提供了7個阻塞隊列宏粤。分別是:
ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列灼卢。
PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列绍哎。
DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列鞋真。
LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列崇堰。
LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
簡單介紹下其中三個隊列:
SynchronousQueue
????SynchronousQueue是無界的涩咖,是一種無緩沖的等待隊列海诲,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加檩互;可以認(rèn)為SynchronousQueue是一個緩存值為1的阻塞隊列特幔,但是 isEmpty()方法永遠(yuǎn)返回是true,remainingCapacity() 方法永遠(yuǎn)返回是0闸昨,remove()和removeAll() 方法永遠(yuǎn)返回是false蚯斯,iterator()方法永遠(yuǎn)返回空,peek()方法永遠(yuǎn)返回null饵较。
????聲明一個SynchronousQueue有兩種不同的方式拍嵌,它們之間有著不太一樣的行為。公平模式和非公平模式的區(qū)別:如果采用公平模式:SynchronousQueue會采用公平鎖循诉,并配合一個FIFO隊列來阻塞多余的生產(chǎn)者和消費者横辆,從而體系整體的公平策略;但如果是非公平模式(SynchronousQueue默認(rèn)):SynchronousQueue采用非公平鎖茄猫,同時配合一個LIFO隊列來管理多余的生產(chǎn)者和消費者狈蚤,而后一種模式,如果生產(chǎn)者和消費者的處理速度有差距募疮,則很容易出現(xiàn)饑渴的情況炫惩,即可能有某些生產(chǎn)者或者是消費者的數(shù)據(jù)永遠(yuǎn)都得不到處理。
LinkedBlockingQueue
????LinkedBlockingQueue默認(rèn)大小是Integer.MAX_VALUE阿浓,可以設(shè)定大小他嚷,可以理解為一個緩存的有界等待隊列。
????基于鏈表的阻塞隊列芭毙,內(nèi)部維持著一個數(shù)據(jù)緩沖隊列(該隊列由鏈表構(gòu)成)筋蓖。當(dāng)生產(chǎn)者往隊列中放入一個數(shù)據(jù)時,隊列會從生產(chǎn)者手中獲取數(shù)據(jù)退敦,并緩存在隊列內(nèi)部粘咖,而生產(chǎn)者立即返回;只有當(dāng)隊列緩沖區(qū)達(dá)到最大值緩存容量時(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值)侈百,才會阻塞生產(chǎn)者隊列瓮下,直到消費者從隊列中消費掉一份數(shù)據(jù)翰铡,生產(chǎn)者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理讽坏。
????LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù)锭魔,還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù)路呜,以此來提高整個隊列的并發(fā)性能迷捧。
ArrayListBlockingQueue
????ArrayListBlockingQueue是有界的,是一個有界緩存的等待隊列胀葱。
????基于數(shù)組的阻塞隊列漠秋,同LinkedBlockingQueue類似,內(nèi)部維持著一個定長數(shù)據(jù)緩沖隊列(該隊列由數(shù)組構(gòu)成)抵屿。ArrayBlockingQueue內(nèi)部還保存著兩個整形變量庆锦,分別標(biāo)識著隊列的頭部和尾部在數(shù)組中的位置。
????ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費者獲取數(shù)據(jù)轧葛,都是共用同一個鎖對象肥荔,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue朝群;按照實現(xiàn)原理來分析燕耿,ArrayBlockingQueue完全可以采用分離鎖,從而實現(xiàn)生產(chǎn)者和消費者操作的完全并行運行姜胖。Doug Lea之所以沒這樣去做誉帅,也許是因為ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨立的鎖機(jī)制右莱,除了給代碼帶來額外的復(fù)雜性外蚜锨,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于慢蜓,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例亚再,而后者則會生成一個額外的Node對象。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中晨抡,其對于GC的影響還是存在一定的區(qū)別氛悬。
非阻塞隊列:
ConcurrentLinkedQueue
????ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列,它采用先進(jìn)先出的規(guī)則對節(jié)點進(jìn)行排序耘柱,當(dāng)我們添加一個元素的時候如捅,它會添加到隊列的尾部;當(dāng)我們獲取一個元素時调煎,它會返回隊列頭部的元素镜遣。
結(jié)構(gòu)如下:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private transient volatile Node<E> head;//頭指針
private transient volatile Node<E> tail;//尾指針
public ConcurrentLinkedQueue() {//初始化,head=tail=(一個空的頭結(jié)點)
head = tail = new Node<E>(null);
}
private static class Node<E> {
volatile E item;
volatile Node<E> next;//內(nèi)部是使用單向鏈表實現(xiàn)
......
}
......
}
入隊和出隊操作均利用CAS(compare and set)更新士袄,這樣允許多個線程并發(fā)執(zhí)行悲关,并且不會因為加鎖而阻塞線程谎僻,使得并發(fā)性能更好。
參考文章:https://blog.csdn.net/danengbinggan33/article/details/73105838