1.什么是阻塞隊(duì)列
阻塞隊(duì)列--BlockingQueue,它是一個(gè)接口潘酗,
public interface BlockingQueue<E> extends Queue<E>
BlcokingQueue繼承了Queue接口然眼,是隊(duì)列的一種,Queue和BlockingQueue都是在Java5中加入的琅攘,BlockingQueue是線程安全的,我們?cè)诤芏鄨?chǎng)景下都可以利用線程安全的隊(duì)列來(lái)優(yōu)雅地解決我們業(yè)務(wù)自身的線程安全問(wèn)題松邪。
主要并發(fā)隊(duì)列關(guān)系圖
上圖展示了Queue最主要的實(shí)現(xiàn)類乎澄,可以看出Java提供的線程安全的隊(duì)列(也成為并發(fā)隊(duì)列)主要分為阻塞隊(duì)列和非阻塞隊(duì)列倆大類。
- 阻塞隊(duì)列的典型例子就是BlockingQueue接口的實(shí)現(xiàn)類测摔,主要有6種實(shí)現(xiàn):ArrayBlcokingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue和LinkedTransherQueue,它們各自有不同不同的特點(diǎn)。
- 非阻塞并發(fā)隊(duì)列最典型的就是ConcurrentLinkedQueue解恰,這個(gè)類不會(huì)讓線程阻塞锋八,利用CAS保證了線程安全。
- 從上圖還可以看到Deque护盈,就是雙端隊(duì)列挟纱,雙端隊(duì)列從頭和尾都能添加和刪除元素,而普通的Queue只能從一端進(jìn)入腐宋,另一端出去紊服。這是Queue和Deque最主要的區(qū)別檀轨,其它方面基本都差不多。
2.阻塞隊(duì)列的特點(diǎn)
阻塞隊(duì)列的特點(diǎn)就是阻塞兩個(gè)字欺嗤,哈哈哈哈参萄。像是廢話。阻塞功能使得生產(chǎn)者和消費(fèi)者兩端的能力得以平衡煎饼,當(dāng)有任何一端速度過(guò)快時(shí)讹挎,阻塞隊(duì)列便會(huì)把過(guò)快的速度降下來(lái)。最重要的兩個(gè)方法:take()和put()吆玖;
take()
take方法的功能是獲取并移除隊(duì)列的頭節(jié)點(diǎn)筒溃,通常在隊(duì)列里有數(shù)據(jù)的時(shí)候是可以正常移除的,當(dāng)隊(duì)列里無(wú)數(shù)據(jù)沾乘,則阻塞怜奖,直到隊(duì)列里面有數(shù)據(jù)。一旦有數(shù)據(jù)了翅阵,就立刻解除阻塞狀態(tài)歪玲,并且獲取到數(shù)據(jù) put()
put方法插入元素時(shí),如果隊(duì)列沒(méi)有滿可以正常插入怎顾,如果隊(duì)列滿了读慎,則阻塞,直到隊(duì)列里面有了空閑空間槐雾,就會(huì)消除阻塞狀態(tài)夭委,并把數(shù)據(jù)添加進(jìn)去。
阻塞隊(duì)列容量問(wèn)題
阻塞隊(duì)列分為兩種有界和無(wú)界募强。無(wú)界隊(duì)列意味著里面可以容納非常多的元素株灸,例如LinkedBlcokingQueue的上限是Integer.MAX_VALUE。學(xué)過(guò)Java的人都知道這個(gè)數(shù)其實(shí)挺大的擎值。但有界隊(duì)列例如ArrayBlockingQueue如果隊(duì)列滿了慌烧,也不會(huì)擴(kuò)容。
3.阻塞隊(duì)列常用方法
在阻塞隊(duì)列中有很多方法鸠儿,而且都非常相似屹蚊,這里我把常用的8個(gè)方法總結(jié)了一下以添加、刪除為主进每。主要分為三類:
- 拋出異常:add汹粤、remove、element
- 返回結(jié)果但是不拋出異常:offer田晚、poll嘱兼、peek、
- 阻塞:take贤徒、put芹壕、
3.1 拋出異常:add汇四、remove、element
add方法是往隊(duì)列里面添加一個(gè)元素踢涌,如果隊(duì)列滿了通孽,就會(huì)拋出異常來(lái)提示我們隊(duì)列已滿。
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
blockingQueue.add(1);
blockingQueue.add(1);
blockingQueue.add(1);
}
這里我們指定隊(duì)列容量為2斯嚎,并且嘗試加入3個(gè)值利虫。超過(guò)了容量上限就會(huì)報(bào)IllegalStateException的錯(cuò)誤。
remove方法是刪除元素堡僻,如果我們隊(duì)列為空的時(shí)候又進(jìn)行了刪除操作糠惫,同樣會(huì)報(bào)NoSuchElementException。
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
blockingQueue.add(1);
blockingQueue.add(1);
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
}
這里我們指定容量為2钉疫,并且添加兩個(gè)元素硼讽,然后刪除三個(gè)元素。結(jié)果如下
element方法是返回隊(duì)列的頭節(jié)點(diǎn)牲阁,但是不會(huì)刪除這個(gè)元素固阁。當(dāng)隊(duì)列為空時(shí)同樣會(huì)報(bào)NoSuchElementException的錯(cuò)誤
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
blockingQueue.element();
}
3.2返回結(jié)果但是不拋出異常offer、poll城菊、peek
offer方法用來(lái)插入一個(gè)元素备燃,如果插入成功會(huì)返回true,如果隊(duì)列滿了凌唬,再插入元素不會(huì)拋出異常但是會(huì)返回false并齐。
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(1));
}
poll方法和remove方法是對(duì)應(yīng)的都是刪除元素,都會(huì)返回刪除的元素客税,但是當(dāng)隊(duì)列為空時(shí)則會(huì)返回null
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
blockingQueue.offer(1);
blockingQueue.offer(1);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
peek方法和element方法對(duì)應(yīng)况褪,返回隊(duì)列的頭節(jié)點(diǎn)但并不刪除,如果隊(duì)列為空則直接返回null
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.peek());
}
帶超時(shí)時(shí)間的offer和poll
offer(E e, long timeout, TimeUnit unit)
它有三個(gè)參數(shù)更耻,分別是元素测垛、超時(shí)時(shí)長(zhǎng)和時(shí)間單位。通常情況下秧均,這個(gè)方法會(huì)插入成功并且返回true食侮;如果隊(duì)列滿了導(dǎo)致插入不成功,在調(diào)用帶超時(shí)時(shí)間重載方法的offer的時(shí)候目胡,則會(huì)等待指定的超時(shí)時(shí)間锯七,如果到了時(shí)間依然沒(méi)有插入成功,則返回false讶隐。
E poll(long timeout, TimeUnit unit)
這個(gè)帶參數(shù)的poll和上面的offer類似。如果能夠移除久又,便會(huì)立即返回這個(gè)節(jié)點(diǎn)的內(nèi)容巫延;如果超過(guò)了我們定義的超時(shí)時(shí)間依然沒(méi)有元素可以移除效五,便會(huì)返回null作為提示。
3.2 阻塞put和take
put方法的作用是插入元素炉峰,通常在隊(duì)列沒(méi)有滿的時(shí)候是正常插入畏妖。如果隊(duì)列滿了無(wú)法繼續(xù)插入,這時(shí)它不會(huì)立刻返回false和拋出異常疼阔,而是讓插入的線程進(jìn)入阻塞狀態(tài)戒劫,直到隊(duì)列里面有空閑空間了。此時(shí)隊(duì)列就會(huì)讓之前的線程接觸阻塞狀態(tài)婆廊,并把剛才那個(gè)元素添加進(jìn)去迅细。 take方法的作用是獲取并移除隊(duì)列的頭節(jié)點(diǎn)。通常隊(duì)列里面有元素會(huì)正常取出數(shù)據(jù)并移除淘邻;但是如果執(zhí)行take的時(shí)候隊(duì)列里無(wú)數(shù)據(jù)茵典,則阻塞,直到隊(duì)列里面有數(shù)據(jù)以后宾舅,就會(huì)立即解除阻塞狀態(tài)统阿,并且取到數(shù)據(jù)。 搞了個(gè)總結(jié)可以看一下
4.常見(jiàn)的阻塞隊(duì)列
4.1 ArrayBlockingQueue
ArrayBlockingQueue是最典型的有界隊(duì)列筹我,其內(nèi)部是用數(shù)組存儲(chǔ)元素的扶平,利用Reentrant實(shí)現(xiàn)線程安全
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
我們?cè)趧?chuàng)建它的時(shí)候就需要指定它的容量,之后也不可以在擴(kuò)容了蔬蕊,在構(gòu)造函數(shù)中我們同樣可以指定是否是公平的结澄。如果ArrayBlockingQueue被設(shè)置為非公平的,那么就存在插隊(duì)的可能袁串;如果設(shè)置為公平的概而,那么等待了最長(zhǎng)時(shí)間的線程會(huì)優(yōu)先被處理,其它線程不允許插隊(duì)囱修。
4.2 LinkedBlockingQueue
LinkedBlockingQueue內(nèi)部使用鏈表實(shí)現(xiàn)的赎瑰,如果我們不指定它的初始容量,那么它的默認(rèn)容量就為整形的最大值Integer.MAX_VALUE,由于這個(gè)數(shù)特別特別的大破镰,所以它也被稱為無(wú)界隊(duì)列餐曼。
4.3 SynchronousQueue
SynchronousQueue最大的不同之處在于,它的容量不同鲜漩,所以沒(méi)有地方來(lái)暫存元素源譬,導(dǎo)致每次取數(shù)據(jù)都要先阻塞,直到有數(shù)據(jù)放入孕似;同理踩娘,每次放數(shù)據(jù)的時(shí)候也會(huì)阻塞,直到有消費(fèi)者來(lái)取喉祭。SynchronousQueue的容量不是1而是0养渴,因?yàn)镾ynchronousQueue不需要去持有元素雷绢,它做的就是直接傳遞。
4.4 PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列理卑,可以通過(guò)自定義類實(shí)現(xiàn)compareTo()方法來(lái)制定元素排序規(guī)則翘紊,或者初始化時(shí)通過(guò)構(gòu)造器參數(shù)Comparator來(lái)制定排序規(guī)則。同時(shí)藐唠,插入隊(duì)列的對(duì)象必須是可比較大小的帆疟,也就是Comparable的,否則就會(huì)拋出ClasscastException異常宇立。
它的take方法在隊(duì)列為空時(shí)會(huì)阻塞踪宠,但是正因?yàn)樗菬o(wú)界隊(duì)列,而且會(huì)自動(dòng)擴(kuò)容泄伪,所以它的隊(duì)列永遠(yuǎn)不會(huì)滿殴蓬,所以它的put()方法永遠(yuǎn)不會(huì)阻塞,添加操作始終都會(huì)成功蟋滴。
4.5 DelayQueue
Delay這個(gè)隊(duì)列比較特殊染厅,具有延遲的功能,我們可以設(shè)定在隊(duì)列中的任務(wù)延遲多久之后執(zhí)行津函。它是無(wú)界隊(duì)列肖粮,但是放入的元素必須實(shí)現(xiàn)Delayed接口,而Delayed接口又繼承了Comparable接口尔苦,所以自然就擁有了比較和排序的能力.
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
可以看出這個(gè)Delayed接口繼承自Comparable涩馆,里面需要涉嫌getDelay.這里的getDelay方法返回的是還剩下多長(zhǎng)的延遲時(shí)間才會(huì)被執(zhí)行。如果返回0或者負(fù)數(shù)則代表任務(wù)已過(guò)期允坚。元素會(huì)根據(jù)延遲時(shí)間的長(zhǎng)短被放到隊(duì)列的不同位置魂那,越靠近隊(duì)列頭代表越早過(guò)期。
5 阻塞和非阻塞隊(duì)列的并發(fā)安全原理是什么
5.1阻塞隊(duì)列-以ArrayBlockingQueue為例
先來(lái)看一下ArrayBlockingQueue的幾個(gè)重要屬性
//用于存放元素的數(shù)組
final Object[] items;
//下一次讀取的位置
int takeIndex;
// 下一次寫(xiě)入的位置
int putIndex;
// 隊(duì)列中元素的數(shù)量
int count;
/*
* 用于控制并發(fā)的工具類
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
ArrayBlockingQueue實(shí)現(xiàn)并發(fā)同步的原理就是利用ReentrantLock和它的倆個(gè)Condition稠项,讀操作和寫(xiě)操作都需要先獲取到ReentrantLock獨(dú)占鎖才能進(jìn)行下一步操作涯雅。進(jìn)行讀操作時(shí)如果隊(duì)列為空,線程就會(huì)進(jìn)入到讀線程專屬的notEmpty的Condition的隊(duì)列中去排隊(duì)展运,等待寫(xiě)線程寫(xiě)入新的元素活逆;同理隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫(xiě)線程專屬的notFull隊(duì)列中去排隊(duì)拗胜,等待讀線程將隊(duì)列元素移除并騰出空間蔗候。
看一下ArrayBlockingQueue的put方法。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
在put方法中埂软,首先用checkNotNull方法檢查插入的元素是不是null锈遥,如果不是null,我們會(huì)用Reentrantlock上鎖,并且使用的上鎖方法是lock.lockInterruptibly()所灸。這個(gè)方法在獲取鎖的同時(shí)是可以相應(yīng)中斷的儿礼,這也正是我們的阻塞隊(duì)列調(diào)用put方法時(shí),在嘗試獲取鎖但還沒(méi)拿到鎖的期間可以響應(yīng)中斷的底層原因庆寺。緊接著在while循環(huán)中,它會(huì)檢查當(dāng)前隊(duì)列是不是已經(jīng)滿了诉字,也就是count的長(zhǎng)度是否等于數(shù)組長(zhǎng)度懦尝。如果等于代表已經(jīng)滿了,于是我們便會(huì)進(jìn)行等待壤圃,直到有空余的時(shí)候陵霉,我們才會(huì)執(zhí)行下一步操作,調(diào)用enqueue方法讓元素進(jìn)入隊(duì)列伍绳,最后用unlock方法解鎖踊挠。
5.2 非阻塞隊(duì)列ConcurrentLinkedQueue
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
通過(guò)源碼我們可以清晰的看出,非阻塞隊(duì)列ConcurrentLinkedQueue它的主要流程就是在判空以后冲杀,會(huì)進(jìn)入一個(gè)大循環(huán)中效床,p.casNext()方法,這個(gè)方法正是利用了CAS來(lái)操作的权谁,這個(gè)死循環(huán)去配合CAS其實(shí)就是我們平時(shí)說(shuō)的樂(lè)觀鎖的思想剩檀。其實(shí)非阻塞隊(duì)列ConcurrentLickedQueue使用CAS非阻塞算法+不停重試的實(shí)際來(lái)實(shí)現(xiàn)線程安全的。
6 線程池對(duì)于阻塞隊(duì)列的選擇
- FixedThreadPool選取的是LinkedBlcokingQueue(同理SingleThreadExecutor) 首先我們知道LinkedBlockingQueu默認(rèn)是無(wú)限長(zhǎng)的旺芽,而FixedThreadPool的線程數(shù)是固定的沪猴,當(dāng)核心線程數(shù)都在被使用時(shí),這個(gè)時(shí)候如果進(jìn)來(lái)新的任務(wù)會(huì)被放進(jìn)阻塞隊(duì)列中采章。由于隊(duì)列是沒(méi)有容量上限的运嗜,隊(duì)列永遠(yuǎn)不會(huì)被填滿,這樣就保證了線程池FixedThreadPool和SingleThreadExecutor悯舟,不會(huì)拒絕新任務(wù)的提交担租,也不會(huì)丟失數(shù)據(jù)。
- CachedThreadPool選取的是SynchronousQueue 首先CachedThreadPool的線程最大數(shù)量是無(wú)限的图谷,也就意味著它的線程數(shù)不會(huì)受限制翩活,那么它就不需要額外的空間來(lái)存儲(chǔ)那些Task,因?yàn)槊總€(gè)任務(wù)都可以通過(guò)新建線程來(lái)處理便贵。SynchronousQueue會(huì)直接把任務(wù)交給線程菠镇,不保存它們,效率更好承璃。
- ScheduledThreadPool選取的是延遲隊(duì)列
- 對(duì)于ScneduledThreadPool而言利耍,它使用的是DelayedWorkQueue,延遲隊(duì)列的特點(diǎn)是:不是先進(jìn)先出,而是會(huì)按照延遲時(shí)間的長(zhǎng)短來(lái)排序隘梨,下一個(gè)即將執(zhí)行的任務(wù)會(huì)排到隊(duì)列的最前面程癌。選擇使用延遲隊(duì)列的原因是,ScheduledThreadPool處理的是基于時(shí)間而執(zhí)行的Task轴猎,而延遲隊(duì)列有能力把Task按照?qǐng)?zhí)行時(shí)間的