Java中線程安全的容器主要包括兩類:
-
Vector
搬瑰、Hashtable
侣颂,以及封裝器類Collections.synchronizedList
和Collections.synchronizedMap
胰苏; - Java 5.0引入的
java.util.concurrent
包径缅,其中包含并發(fā)隊(duì)列猪腕、并發(fā)HashMap以及寫(xiě)入時(shí)復(fù)制容器配阵。
依筆者看馏颂,早期使用的同步容器主要有兩方面的問(wèn)題:1)通過(guò)對(duì)方法添加synchronized關(guān)鍵字實(shí)現(xiàn)同步,這種粗粒度的加鎖操作在synchronized關(guān)鍵字本身未充分優(yōu)化之前棋傍,效率偏低救拉;2)同步容器雖然是線程安全的,但在某些外部復(fù)合操作(例:若沒(méi)有則添加)時(shí)瘫拣,依然需要客戶端加鎖保證數(shù)據(jù)安全亿絮。因此,從Java 5.0以后拂铡,并發(fā)編程偏向于使用java.util.concurrent
包(作者:Doug Lea)中的容器類壹无,本文也將著重介紹該包中的容器類,主要包括:
- 阻塞隊(duì)列
- ConcurrentHashMap
- 寫(xiě)入時(shí)復(fù)制容器
一感帅、阻塞隊(duì)列
在并發(fā)環(huán)境下斗锭,阻塞隊(duì)列是常用的數(shù)據(jù)結(jié)構(gòu),它能確保數(shù)據(jù)高效安全的傳輸失球,為快速搭建高質(zhì)量的多線程應(yīng)用帶來(lái)極大的便利岖是,比如MQ的原理就是基于阻塞隊(duì)列的。java.util.concurrent
中包含豐富的隊(duì)列實(shí)現(xiàn)实苞,它們之間的關(guān)系如下圖所示:
- BlockingQueue豺撑、Deque(雙向隊(duì)列)繼承自Queue接口;
- BlockingDeque同時(shí)繼承自BlockingQueue黔牵、Deque接口聪轿,提供阻塞的雙向隊(duì)列屬性;
- LinkedBlockingQueue和LinkedBlockingDeque分別實(shí)現(xiàn)了BlockingQueue和BlockingDeque接口猾浦;
- DelayQueue實(shí)現(xiàn)了BlockingQueue接口陆错,提供任務(wù)延遲功能;
- TransferQueue是Java 7引入的金赦,用于替代BlockingQueue音瓷,LinkedTransferQueue是其實(shí)現(xiàn)類。
下面對(duì)這些隊(duì)列進(jìn)行詳細(xì)的介紹:
1.1 BlockingQueue與BlockingDeque
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列夹抗。這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí)绳慎,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
- 當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景杏愤,生產(chǎn)者是往隊(duì)列里添加元素的線程靡砌,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器珊楼,而消費(fèi)者也只從容器里拿元素乏奥。
阻塞隊(duì)列提供了四種處理方法:
方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 拋出異常:是指當(dāng)阻塞隊(duì)列滿時(shí)候,再往隊(duì)列里插入元素亥曹,會(huì)拋出IllegalStateException("Queue full")異常邓了。當(dāng)隊(duì)列為空時(shí),從隊(duì)列里獲取元素時(shí)會(huì)拋出NoSuchElementException異常 媳瞪。
- 返回特殊值:插入方法會(huì)返回是否成功骗炉,成功則返回true。移除方法蛇受,則是從隊(duì)列里拿出一個(gè)元素句葵,如果沒(méi)有則返回null
- 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素兢仰,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程乍丈,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出把将。當(dāng)隊(duì)列空時(shí)轻专,消費(fèi)者線程試圖從隊(duì)列里take元素,隊(duì)列也會(huì)阻塞消費(fèi)者線程察蹲,直到隊(duì)列可用请垛。
- 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間洽议,如果超過(guò)一定的時(shí)間宗收,生產(chǎn)者線程就會(huì)退出。
BlockingDeque在BlockingQueue的基礎(chǔ)上亚兄,增加了支持雙向隊(duì)列的屬性混稽。如下圖所示,相比于BlockingQueue的插入和移除方法审胚,變?yōu)?code>XxxFirst匈勋,XxxLast
方法,分別對(duì)應(yīng)隊(duì)列的兩端菲盾,既可以在頭部添加或移除颓影,也可以在尾部添加或移除各淀。
1.2 LinkedBlockingQueue與LinkedBlockingDeque
LinkedBlockingQueue
是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列懒鉴。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE
,按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。
首先看下LinkedBlockingQueue
中核心的域:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
-
LinkedBlockingQueue
和LinkedList
類似临谱,通過(guò)靜態(tài)內(nèi)部類Node<E>
進(jìn)行元素的存儲(chǔ)璃俗; -
capacity
表示阻塞隊(duì)列所能存儲(chǔ)的最大容量,在創(chuàng)建時(shí)可以手動(dòng)指定最大容量悉默,默認(rèn)的最大容量為Integer.MAX_VALUE
城豁; -
count
表示當(dāng)前隊(duì)列中的元素?cái)?shù)量,LinkedBlockingQueue
的入隊(duì)列和出隊(duì)列使用了兩個(gè)不同的lock對(duì)象抄课,因此無(wú)論是在入隊(duì)列還是出隊(duì)列唱星,都會(huì)涉及對(duì)元素?cái)?shù)量的并發(fā)修改,因此這里使用了一個(gè)原子操作類來(lái)解決對(duì)同一個(gè)變量進(jìn)行并發(fā)修改的線程安全問(wèn)題跟磨。 -
head
和last
分別表示鏈表的頭部和尾部间聊; -
takeLock
表示元素出隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行take
抵拘、poll
等操作時(shí)線程獲劝チ瘛;notEmpty
當(dāng)隊(duì)列為空時(shí)僵蛛,通過(guò)該Condition
讓獲取元素的線程處于等待狀態(tài)尚蝌; -
putLock
表示元素入隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行put
充尉、offer
等操作時(shí)獲绕浴;notFull
當(dāng)隊(duì)列容量達(dá)到capacity
時(shí)驼侠,通過(guò)該Condition
讓加入元素的線程處于等待狀態(tài)热凹。
其次,LinkedBlockingQueue
有三個(gè)構(gòu)造方法泪电,分別如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
默認(rèn)構(gòu)造函數(shù)直接調(diào)用LinkedBlockingQueue(int capacity)
般妙,LinkedBlockingQueue(int capacity)
會(huì)初始化首尾節(jié)點(diǎn),并置位null相速。LinkedBlockingQueue(Collection<? extends E> c)
在初始化隊(duì)列的同時(shí)碟渺,將一個(gè)集合的全部元素加入隊(duì)列。
最后分析下put
和take
的過(guò)程突诬,這里重點(diǎn)關(guān)注:LinkedBlockingQueue
如何實(shí)現(xiàn)添加/移除并行的苫拍?
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
之所以把put
和take
放在一起,是因?yàn)樗鼈兪且粚?duì)互逆的過(guò)程:
-
put
在插入元素前首先獲得putLock
和當(dāng)前隊(duì)列的元素?cái)?shù)量旺隙,take
在去除元素前首先獲得takeLock
和當(dāng)前隊(duì)列的元素?cái)?shù)量绒极; -
put
時(shí)需要判斷當(dāng)前隊(duì)列是否已滿,已滿時(shí)當(dāng)前線程進(jìn)行等待蔬捷,take
時(shí)需要判斷隊(duì)列是否已空垄提,隊(duì)列為空時(shí)當(dāng)前線程進(jìn)行等待榔袋; -
put
調(diào)用enqueue
在隊(duì)尾插入元素,并修改尾指針铡俐,take
調(diào)用dequeue
將head
指向原來(lái)first
的位置凰兑,并將first的數(shù)據(jù)域置位null,實(shí)現(xiàn)刪除原first
指針审丘,并產(chǎn)生新的head
吏够,同時(shí),切斷原head
節(jié)點(diǎn)的引用滩报,便于垃圾回收锅知。
private void enqueue(Node<E> node) {
last = last.next = node;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
- 最后,
put
根據(jù)count
決定是否觸發(fā)隊(duì)列未滿和隊(duì)列空脓钾;take
根據(jù)count
決定是否觸發(fā)隊(duì)列未空和隊(duì)列滿喉镰。
回到剛才的問(wèn)題:LinkedBlockingQueue
如何實(shí)現(xiàn)添加/移除并行的?
LinkedBlockingQueue
在入隊(duì)列和出隊(duì)列時(shí)使用的是不同的Lock惭笑,這也意味著它們之間的操作不會(huì)存在互斥侣姆。在多個(gè)CPU的情況下,可以做到在同一時(shí)刻既消費(fèi)沉噩、又生產(chǎn)捺宗,做到并行處理。
同樣的川蒙,LinkedBlockingDeque
在LinkedBlockingQueue
的基礎(chǔ)上蚜厉,增加了雙向操作的屬性。繼續(xù)以put
和take
為例畜眨,LinkedBlockingDeque
增加了putFirst
/putLast
昼牛、takeFirst
/takeLast
方法,分別用于在隊(duì)列頭康聂、尾進(jìn)行添加和刪除贰健。與LinkedBlockingQueue
不同的是,LinkedBlockingDeque
的入隊(duì)列和出隊(duì)列不再使用不同的Lock恬汁。
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
其中伶椿,lock表示讀寫(xiě)的主鎖,notEmpty和notFull依然表示相應(yīng)的控制線程狀態(tài)條件量氓侧。以putFirst
和takeFirst
為例:
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
putFirst
不支持插入null元素脊另,首先新建一個(gè)Node
對(duì)象,然后調(diào)用ReentrantLock
的lock
方法獲取鎖约巷,插入操作通過(guò)boolean linkFirst(Node<E> node)
實(shí)現(xiàn)偎痛,如果當(dāng)前隊(duì)列頭已滿,那么該線程等待(linkFirst
方法在寫(xiě)入元素成功后會(huì)釋放該鎖信號(hào))独郎,最后踩麦,在finally塊中釋放鎖(ReentrantLock
的使用)枚赡。
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
與putFirst
類似,takeFirst
首先獲取鎖靖榕,然后在try中解除尾元素對(duì)象的引用,如果unlinkFirst
為空顽铸,表示隊(duì)列為空茁计,沒(méi)有元素可刪,那么該線程等待谓松。同樣星压,最后在finally塊中釋放鎖。
那么問(wèn)題來(lái)了鬼譬,LinkedBlockingDeque
為什么不使用LinkedBlockingQueue
讀寫(xiě)鎖分離的方式呢娜膘?LinkedBlockingDeque
與LinkedBlockingQueue
的使用場(chǎng)景有什么區(qū)別呢?
1.3 DelayQueue
DelayQueue
主要用于實(shí)現(xiàn)延時(shí)任務(wù)优质,比如:等待一段時(shí)間之后關(guān)閉連接竣贪,緩存對(duì)象過(guò)期刪除,任務(wù)超時(shí)處理等等巩螃,這些任務(wù)的共同特點(diǎn)是等待一段時(shí)間之后執(zhí)行(類似于TimerTask)演怎。DelayQueue
的實(shí)現(xiàn)包括三個(gè)核心特征:
- 延時(shí)任務(wù):
DelayQueue
的泛型類需要繼承自Delayed
接口,而Delayed
接口繼承自Comparable<Delayed>
避乏,用于隊(duì)列中優(yōu)先排序的比較爷耀; - 優(yōu)先隊(duì)列:
DelayQueue
的實(shí)現(xiàn)采用了優(yōu)先隊(duì)列PriorityQueue
,即延遲時(shí)間越短的任務(wù)越優(yōu)先(回憶下優(yōu)先隊(duì)列中二叉堆的實(shí)現(xiàn))拍皮。 - 阻塞隊(duì)列:支持并發(fā)讀寫(xiě)歹叮,采用
ReentrantLock
來(lái)實(shí)現(xiàn)讀寫(xiě)的鎖操作。
因此铆帽,DelayQueue
= Delayed
+ PriorityQueue
+ BlockingQueue
咆耿。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
接下來(lái)看下DelayQueue
的讀寫(xiě)操作如何實(shí)現(xiàn)延時(shí)任務(wù)的?
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
首先執(zhí)行加鎖操作爹橱,然后往優(yōu)先隊(duì)列中插入元素e票灰,優(yōu)先隊(duì)列會(huì)調(diào)用泛型E的compareTo
方法進(jìn)行比較(具體關(guān)于二叉堆的操作,這里不再贅述宅荤,請(qǐng)參考數(shù)據(jù)結(jié)構(gòu)部分相關(guān)分析)屑迂,將延遲時(shí)間最短的任務(wù)添加到隊(duì)頭。最后檢查下元素是否為隊(duì)頭冯键,如果是隊(duì)頭的話惹盼,設(shè)置leader為空,喚醒所有等待的隊(duì)列惫确,釋放鎖手报。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
- 首先執(zhí)行加鎖操作蚯舱,然后取出優(yōu)先隊(duì)列的隊(duì)頭,如果對(duì)頭為空掩蛤,則該線程阻塞枉昏;
- 獲得對(duì)頭元素的延遲時(shí)間,如果延遲時(shí)間小于等于0揍鸟,說(shuō)明該元素已經(jīng)到了可以使用的時(shí)間兄裂,調(diào)用poll方法彈出該元素;
- 在延遲時(shí)間大于0時(shí)阳藻,首先釋放元素first的引用(避免內(nèi)存泄露)晰奖,其次判斷如果leader線程不為空,則該線程阻塞(表示已有線程在等待)腥泥。否則匾南,把當(dāng)前線程賦值給leader元素,然后阻塞delay的時(shí)間蛔外,即等待隊(duì)頭到達(dá)延遲時(shí)間蛆楞,在finally塊中釋放leader元素的引用。循環(huán)后夹厌,取出對(duì)頭元素臊岸,退出for循環(huán)。
- 最后尊流,如果leader為空并且優(yōu)先級(jí)隊(duì)列不為空的情況下(判斷還有沒(méi)有其他后續(xù)節(jié)點(diǎn))帅戒,調(diào)用signal通知其他的線程,并執(zhí)行解鎖操作崖技。
1.4 TransferQueue與LinkedTransferQueue
TransferQueue
是一個(gè)繼承了BlockingQueue
的接口逻住,并且增加若干新的方法。LinkedTransferQueue
是TransferQueue
接口的實(shí)現(xiàn)類迎献,其定義為一個(gè)無(wú)界的隊(duì)列瞎访,具有先進(jìn)先出(FIFO)的特性。
TransferQueue
接口主要包含以下方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
boolean tryTransfer(E e);
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
}
- transfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程吁恍,即立刻移交之扒秸;否則,會(huì)插入當(dāng)前元素e到隊(duì)列尾部冀瓦,并且等待進(jìn)入阻塞狀態(tài)伴奥,到有消費(fèi)者線程取走該元素。
- tryTransfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程(使用take()或者poll()函數(shù))翼闽,使用該方法會(huì)即刻轉(zhuǎn)移/傳輸對(duì)象元素e拾徙;若不存在,則返回false感局,并且不進(jìn)入隊(duì)列尼啡。這是一個(gè)不阻塞的操作暂衡。
- tryTransfer(E e, long timeout, TimeUnit unit):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,會(huì)立即傳輸給它崖瞭;否則將插入元素e到隊(duì)列尾部狂巢,并且等待被消費(fèi)者線程獲取消費(fèi)掉;若在指定的時(shí)間內(nèi)元素e無(wú)法被消費(fèi)者線程獲取书聚,則返回false唧领,同時(shí)該元素被移除。
- hasWaitingConsumer():判斷是否存在消費(fèi)者線程寺惫。
- getWaitingConsumerCount():獲取所有等待獲取元素的消費(fèi)線程數(shù)量疹吃。
LinkedTransferQueue
實(shí)現(xiàn)了上述方法蹦疑,較之于LinkedBlockingQueue
在隊(duì)列滿時(shí)西雀,入隊(duì)操作會(huì)被阻塞的特性,LinkedTransferQueue
在隊(duì)列不滿時(shí)也可以阻塞歉摧,只要沒(méi)有消費(fèi)者使用元素艇肴。下面來(lái)看下LinkedTransferQueue
的入隊(duì)和和出隊(duì)操作:transfer
和take
方法。
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
LinkedTransferQueue
入隊(duì)和和出隊(duì)都使用了一個(gè)關(guān)鍵方法:
private E xfer(E e, boolean haveData, int how, long nanos) {}
其中叁温,E
表示被操作的元素再悼,haveData
為true
表示添加數(shù)據(jù),false
表示移除數(shù)據(jù)膝但;how
有四種取值:NOW
, ASYNC
, SYNC
, 或者TIMED
冲九,分別表示執(zhí)行的時(shí)機(jī);nanos
表示how
為TIMED
時(shí)的時(shí)間限制跟束。
(xfer
方法具體流程較為復(fù)雜莺奸,這里不再展開(kāi)。另外冀宴,LinkedTransferQueue
采用了CAS非阻塞同步機(jī)制灭贷,后面會(huì)具體講到)