阻塞隊(duì)列 BlockingQueue
BlockingQueue用法
-
BlockingQueue 通常用于一個(gè)線(xiàn)程生產(chǎn)對(duì)象婚惫,而另外一個(gè)線(xiàn)程消費(fèi)這些對(duì)象的場(chǎng)景氛赐。下圖是
對(duì)這個(gè)原理的闡述: 一個(gè)線(xiàn)程將會(huì)持續(xù)生產(chǎn)新對(duì)象并將其插入到隊(duì)列之中魂爪,直到隊(duì)列達(dá)到它所能容納的臨界點(diǎn)。
也就是說(shuō)艰管,它是有限的滓侍。如果該阻塞隊(duì)列到達(dá)了其臨界點(diǎn),負(fù)責(zé)生產(chǎn)的線(xiàn)程將會(huì)在往里邊插
入新對(duì)象時(shí)發(fā)生阻塞牲芋。它會(huì)一直處于阻塞之中撩笆,直到負(fù)責(zé)消費(fèi)的線(xiàn)程從隊(duì)列中拿走一個(gè)對(duì)象。
負(fù)責(zé)消費(fèi)的線(xiàn)程將會(huì)一直從該阻塞隊(duì)列中拿出對(duì)象街图。如果消費(fèi)線(xiàn)程嘗試去從一個(gè)空的隊(duì)列中
提取對(duì)象的話(huà)浇衬,這個(gè)消費(fèi)線(xiàn)程將會(huì)處于阻塞之中,直到一個(gè)生產(chǎn)線(xiàn)程把一個(gè)對(duì)象丟進(jìn)隊(duì)列餐济。
BlockingQueue 的方法
- BlockingQueue 具有 4 組不同的方法用于插入耘擂、移除以及對(duì)隊(duì)列中的元素進(jìn)行檢查。如果請(qǐng)求的操作不能得到立即執(zhí)行的話(huà)絮姆,每個(gè)方法的表現(xiàn)也不同醉冤。這些方法如下:
/ | 拋異常 | 特定值 | 阻塞 | 超時(shí) |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, TimeUnit) |
移出 | remove() | poll() | take() | poll(timeout, timeunit) |
檢查 | element() | peek() |
- 四組不同的行為方式解釋?zhuān)?/li>
- 拋異常:如果試圖的操作無(wú)法立即執(zhí)行,拋一個(gè)異常篙悯。
- 特定值:如果試圖的操作無(wú)法立即執(zhí)行蚁阳,返回一個(gè)特定的值(常常是 true / false)。
- 阻塞:如果試圖的操作無(wú)法立即執(zhí)行鸽照,該方法調(diào)用將會(huì)發(fā)生阻塞螺捐,直到能夠執(zhí)行。
- 超時(shí):如果試圖的操作無(wú)法立即執(zhí)行矮燎,該方法調(diào)用將會(huì)發(fā)生阻塞定血,直到能夠執(zhí)行,但等待時(shí)間不會(huì)超過(guò)給定值诞外。返回一個(gè)特定值以告知該操作是否成功(典型的是 true / false)澜沟。
- 無(wú)法向一個(gè) BlockingQueue 中插入 null。如果你試圖插入 null峡谊,BlockingQueue 將會(huì)拋出一個(gè) NullPointerException茫虽。
方法詳解(只分析 ArrayBlockingQueue 這一種實(shí)現(xiàn),其他的類(lèi)似)
-
插入數(shù)據(jù)方法:add(o)既们、offer(o)濒析、put(o)、offer(o, timeout, TimeUnit)
-
void put(E e) throws InterruptedException;
:隊(duì)列的容量已滿(mǎn)時(shí)贤壁,線(xiàn)程會(huì)一直阻塞public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 當(dāng)隊(duì)列的容量已滿(mǎn)時(shí)悼枢,線(xiàn)程會(huì)一直阻塞著 notFull.await(); insert(e); // 會(huì)在插入方法中調(diào)用 notEmpty.signal(); 喚醒阻塞的take方法線(xiàn)程 } finally { lock.unlock(); } }
-
boolean offer(E e);
試著往隊(duì)列中插入值,不管隊(duì)列滿(mǎn)沒(méi)滿(mǎn)脾拆,都會(huì)返回結(jié)果public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) // 當(dāng)隊(duì)列已滿(mǎn)時(shí)馒索,直接返回false return false; else { insert(e); return true; } } finally { lock.unlock(); } }
-
boolean add(E e);
add方法實(shí)際上是AbstractQueue
中的方法莹妒,里面的實(shí)現(xiàn)是先調(diào)用上述的 offer 方法,當(dāng) offer 方法返回為 true 是绰上,直接返回true旨怠, 如果是返回 false 是,直接拋出異常IllegalStateException Queue full
public boolean add(E e) { if (offer(e)) // 直接調(diào)用 offer 方法 return true; else // 如果 offer 返回為 false蜈块, 那么會(huì)拋出 Queue full 異常 throw new IllegalStateException("Queue full"); }
-
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
跟offer(o)
方法相似鉴腻,會(huì)先判斷 隊(duì)列是否已滿(mǎn),如果隊(duì)列已滿(mǎn)百揭,會(huì)循環(huán)等待傳入的時(shí)間爽哎,當(dāng)超過(guò)設(shè)置的時(shí)間后,還是無(wú)空閑位置器一。會(huì)直接返回一個(gè)false课锌。public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { // 輪詢(xún)判斷隊(duì)列是否已滿(mǎn) if (nanos <= 0) // 如果隊(duì)列滿(mǎn)了,判斷設(shè)置的等待時(shí)間是否超時(shí)了祈秕,超時(shí)了直接返回false return false; nanos = notFull.awaitNanos(nanos); // 沒(méi)超時(shí)渺贤,調(diào)用計(jì)算時(shí)間方法, 進(jìn)入下一個(gè)循環(huán)中繼續(xù)判斷 } insert(e); // 在輪詢(xún)等待的時(shí)間內(nèi)请毛,有空閑就立刻插入數(shù)據(jù) return true; } finally { lock.unlock(); } }
-
-
移出數(shù)據(jù)的方法:remove()志鞍、 remove(o)、poll()方仿、take()固棚、 poll(timeout, TimeUnit)
-
E poll();
獲取隊(duì)列頭的數(shù)據(jù),并刪除該數(shù)據(jù)(將該位置數(shù)據(jù)置為null)仙蚜。如果隊(duì)列已經(jīng)空了玻孟,那么直接返回 nullpublic E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); // 判斷隊(duì)列是否為空,為空直接返回null } finally { lock.unlock(); } } // 如果隊(duì)列不為空鳍征,調(diào)用extract() 方法, 這個(gè)方法調(diào)用必須獲取鎖 /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); // 獲取當(dāng)前位置的元素(隊(duì)列頭) items[takeIndex] = null; // 然后把該位置置為null takeIndex = inc(takeIndex); // 把索引向前推進(jìn) --count; // 元素容量計(jì)數(shù)也減少 notFull.signal(); // 通知其他線(xiàn)程面徽,可以進(jìn)行put操作了 return x; // 返回結(jié)果 }
-
E poll(long timeout, TimeUnit unit) throws InterruptedException;
判斷隊(duì)列是否為空艳丛,如果為空等待設(shè)置的時(shí)間,等待時(shí)間超過(guò)設(shè)置的時(shí)間趟紊。直接返回null氮双。public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { // 判斷隊(duì)列是否為空 if (nanos <= 0) // 如果為空,判斷是否等待時(shí)間已耗盡 return null; // 耗盡霎匈,直接返回null nanos = notEmpty.awaitNanos(nanos); // 沒(méi)耗盡戴差,調(diào)用計(jì)算時(shí)間方法進(jìn)入下一個(gè)循環(huán) } return extract(); // 隊(duì)列不為空,直接調(diào)用獲取數(shù)據(jù)的方法 } finally { lock.unlock(); } }
-
E take() throws InterruptedException;
take 方法铛嘱,如果隊(duì)列為空暖释,會(huì)進(jìn)入阻塞狀態(tài)public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //當(dāng)隊(duì)列中沒(méi)有元素時(shí)袭厂,會(huì)進(jìn)入阻塞狀態(tài)等待喚醒 notEmpty.await(); return extract(); } finally { lock.unlock(); } }
-
E remove();
實(shí)際是調(diào)用 poll() 方法獲取隊(duì)列頭對(duì)象并刪除,如果返回的值不為null球匕,那么久直接返回纹磺。如果為null 就拋出NoSuchElementException
public E remove() { E x = poll(); // 移出隊(duì)列頭對(duì)象 if (x != null) return x; else throw new NoSuchElementException(); // 如果返回的值是null 直接拋出異常信息 }
boolean remove(Object o);
該方法不管有沒(méi)有獲取到隊(duì)列頭對(duì)象,都會(huì)返回一個(gè)結(jié)果值亮曹。不會(huì)拋出異常信息橄杨。public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { //遍歷隊(duì)列 if (o.equals(items[i])) { // 判斷對(duì)象是否相等 removeAt(i); return true; } } return false; // 如果沒(méi)有相等的對(duì)象返回false } finally { lock.unlock(); } } void removeAt(int i) { final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { // 如果需要移出的對(duì)象就是當(dāng)前的隊(duì)列頭,直接置為null且索引后移 items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i); // 需要移出元素索引的后一個(gè)為 nexti if (nexti != putIndex) { // 當(dāng)下一個(gè)索引不等于下一個(gè)添加元素的位置, 首次移出時(shí)ArrayBlockingQueue 的 putIndex 為0照卦, 隊(duì)列頭式矫。移除后為移出的索引位置 items[i] = items[nexti]; // 后一個(gè)元素覆蓋前一個(gè)元素 i = nexti; // 繼續(xù)向后推進(jìn) } else { items[i] = null; // 把最后一個(gè)元素置為null putIndex = i; // 最新添加元素的位置 putIndex 為 i break; } } } --count; notFull.signal(); }
-
-
檢查數(shù)據(jù)的方法: element()、peek()
-
E peek();
返回隊(duì)列頭的元素役耕,但是不刪除采转。如果隊(duì)列為空返回nullpublic E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex); // 返回隊(duì)列頭位置元素 } finally { lock.unlock(); } } final E itemAt(int i) { return this.<E>cast(items[i]); // 直接返回隊(duì)列頭元素,不做刪除操作 }
-
E element();
返回隊(duì)列頭元素蹄葱,如果為 null 拋出NoSuchElementException
異常, 調(diào)用的實(shí)際上是peek()
方法氏义。public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); }
-
-
其他方法: contains(o)、remainingCapacity()图云、drainTo(Collection<? super E> c)惯悠、drainTo(Collection<? super E> c, int maxElements);
public boolean contains(Object o);
使用迭代器遍歷元素判斷是否包含Object,返回true or falseint remainingCapacity();
返回剩余容量大小竣况,如果是ArrayBlockingQueue 那么就是數(shù)組容量的大小減去已經(jīng)存在元素的個(gè)數(shù)值克婶。int drainTo(Collection<? super E> c);
把隊(duì)列全部轉(zhuǎn)換成 Collection 類(lèi),并且清空隊(duì)列自身丹泉。-
int drainTo(Collection<? super E> c, int maxElements);
從隊(duì)列頭部開(kāi)始情萤,轉(zhuǎn)換并清空 maxElements 個(gè)元素成 Collection 類(lèi)?
待后續(xù)的其他隊(duì)列實(shí)現(xiàn)類(lèi)粗略講解