目錄
最近在閱讀Spark源碼的過程中孽糖,又重新接觸到了一些Java并發(fā)方面的知識纵装,于是就見縫插針地將它們記錄下來右蕊,當做復(fù)習(xí)與備忘足丢。
阻塞隊列簡介
阻塞隊列的定義
根據(jù)Doug Lea在JavaDoc中的解釋,所謂阻塞隊列脓钾,就是在普通隊列的基礎(chǔ)之上,支持以下兩種操作的隊列:
- 當某線程從隊列獲取元素時桩警,如果隊列為空可训,就等待(阻塞)直至隊列中有元素;
- 當某線程向隊列插入元素時捶枢,如果隊列已滿握截,就等待(阻塞)直至隊列中有空間。
也就是說烂叔,阻塞隊列是自帶同步機制的隊列谨胞。它最常用來解決線程同步中經(jīng)典的生產(chǎn)者-消費者問題,前面講過的Spark Core異步事件總線中蒜鸡,就采用阻塞隊列作為事件存儲胯努。
Java中的阻塞隊列
Java中阻塞隊列的基類是j.u.c.BlockingQueue接口牢裳,它繼承自Queue接口,并且定義了額外的方法實現(xiàn)同步:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
上述put()與offer()方法用于向隊列插入元素叶沛,take()與poll()方法則是從隊列獲取元素蒲讯。不同的是,put()與take()方法在插入/獲取時灰署,如果必須等待判帮,就會一直阻塞下去;而offer()與poll()方法可以指定阻塞的時間長度溉箕。
以BlockingQueue接口為中心的繼承關(guān)系如下圖所示脊另。
平時開發(fā)中比較常用的阻塞隊列是基于數(shù)組實現(xiàn)的ArrayBlockingQueue,與基于單鏈表實現(xiàn)的LinkedBlockingQueue约巷。本文選擇后者來深入看一下阻塞隊列的實現(xiàn)細節(jié),因為它的性能在多數(shù)情況下更優(yōu)旱捧,可以自行寫benchmark程序來測測独郎。
LinkedBlockingQueue
LinkedBlockingQueue(以下簡稱LBQ)是基于單鏈表實現(xiàn)的,先進先出(FIFO)的有界阻塞隊列枚赡。
單鏈表定義
LBQ的單鏈表結(jié)點數(shù)據(jù)結(jié)構(gòu)定義在靜態(tài)內(nèi)部類Node中氓癌。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
在類的內(nèi)部還定義了單鏈表的頭結(jié)點與尾結(jié)點。
transient Node<E> head;
private transient Node<E> last;
head始終指向鏈表的第一個結(jié)點贫橙,該結(jié)點是哨兵結(jié)點贪婉,不存儲數(shù)據(jù),只標記鏈表的開始卢肃,即head.item == null
疲迂。這樣可以避免只有一個結(jié)點時造成混亂。
tail始終指向鏈表的最后一個結(jié)點莫湘,該結(jié)點是有數(shù)據(jù)的尤蒿,并滿足last.next == null
。
LBQ在隊頭獲取及彈出元素幅垮,在隊尾插入元素腰池。
鎖和等待隊列
LBQ采用雙鎖機制保證入隊和出隊可以同時進行,互不干擾忙芒。
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
可見定義有兩個ReentrantLock示弓,takeLock用于控制出隊,putLock用于控制入隊呵萨。另外奏属,還有這兩個鎖分別對應(yīng)的條件變量notEmpty和notFull,分別維護出隊和入隊線程的等待隊列甘桑。ReentrantLock和Condition都是Java AQS機制的重要組成部分拍皮,之后也會細說歹叮。
值得注意的是,在某些方法中需要同時對takeLock與putLock加鎖與解鎖铆帽,所以LBQ內(nèi)部也提供了這樣的方法咆耿。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
這兩個方法總會成對調(diào)用,保證所有需要同時加鎖和解鎖的地方爹橱,其順序都一致并且不可中斷萨螺,也防止了前一個鎖操作成功執(zhí)行,后一個鎖操作被打斷導(dǎo)致死鎖的風(fēng)險愧驱。
另外慰技,LBQ也對條件變量的Condition.signal()方法進行了簡單封裝,分別用來喚醒阻塞的出隊操作線程和入隊操作線程组砚。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
容量和計數(shù)
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
capacity是LBQ的最大容量吻商,可以在構(gòu)造方法中隨同名參數(shù)傳入,默認值是Integer.MAX_VALUE糟红。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
count則是LBQ內(nèi)當前元素的計數(shù)艾帐,由于入隊和出隊動作可以并發(fā)執(zhí)行,所以要用原子類型AtomicInteger保證線程安全盆偿。
入隊操作
由于put()和offer()方法的邏輯基本相同柒爸,所以只看offer()方法就好了。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
在入隊時事扭,首先將putLock加鎖捎稚,然后用衛(wèi)語句count.get() == capacity
判斷隊列是否已滿,若已滿求橄,則進入等待循環(huán)今野。當阻塞的時間超時后,判定入隊操作失敗谈撒,并返回false腥泥。
如果隊列未滿,或者在超時時間未到時有了空間啃匿,就調(diào)用enqueue()方法在隊尾插入元素蛔外,并將計數(shù)器自增。入隊后若還有更多的剩余空間溯乒,則喚醒其他等待的入隊線程夹厌。
最后將putLock解鎖,并檢查由count.getAndIncrement()返回的值是否為0裆悄。如果為0矛纹,表示隊列剛剛由空變?yōu)榉强諣顟B(tài),因此也要喚醒等待的出隊線程光稼。
出隊操作
同理或南,只看poll()方法孩等。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
將講解入隊方法時的描述反著說一遍就行了:
在出隊時,首先將takeLock加鎖采够,然后用衛(wèi)語句count.get() == 0
判斷隊列是否為空肄方,若為空,則進入等待循環(huán)蹬癌。當阻塞的時間超時后权她,判定出隊操作失敗,并返回false逝薪。
如果隊列不為空隅要,或者在超時時間未到時進了新元素,就調(diào)用dequeue()方法彈出隊頭元素董济,并將計數(shù)器自減步清。出隊后若還有更多的剩余元素,則喚醒其他等待的出隊線程虏肾。
最后將takeLock解鎖尼啡,并檢查由count.getAndDecrement()返回的值是否為capacity。如果為capacity询微,表示隊列剛剛由滿變?yōu)椴粷M狀態(tài),因此也要喚醒等待的入隊線程狂巢。
需要操作雙鎖的情況
以remove()方法為例撑毛。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
由于單鏈表刪除結(jié)點涉及到對鏈表的遍歷,以及對前驅(qū)和后繼結(jié)點的斷鏈和補鏈唧领,因此必須將兩個鎖都加上藻雌,禁止一切修改。待刪除成功后才能解鎖斩个,繼續(xù)正常的入隊和出隊操作胯杭。
生產(chǎn)者-消費者問題示例
生產(chǎn)者-消費者問題的解決方法用操作系統(tǒng)理論中的信號量PV(wait-signal)原語描述如下:
semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;
procedure producer() {
while (true) {
item = produce();
wait(empty);
wait(mutex);
buffer.put(item);
signal(mutex);
signal(filled);
}
}
procedure consumer() {
while (true) {
wait(filled);
wait(mutex);
item = buffer.get();
signal(mutex);
signal(empty);
consume(item);
}
}
利用阻塞隊列可以免去自己實現(xiàn)同步機制的麻煩,從而非常方便地實現(xiàn)受啥。一個極簡的示例如下:
public class ProducerConsumerExample {
private static final int BUF_CAPACITY = 16;
public static void main(String[] args) {
BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);
Thread producerThread = new Thread(() -> {
try {
while (true) {
long value = System.currentTimeMillis() % 1000;
blockingQueue.put(value);
Thread.sleep(value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "producer");
Thread consumerThread = new Thread(() -> {
try {
while (true) {
System.out.println(blockingQueue.take());
Thread.sleep(System.currentTimeMillis() % 1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer");
producerThread.start();
consumerThread.start();
}
}
一個凶龈觥(?)問題
在上面的代碼(以及j.u.c包中很多類的代碼)的方法體中滚局,經(jīng)常能看到類似以下的語句:
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
也就是有些類中定義的字段居暖,在方法中使用時會先賦值給一個局部變量。這樣做到底是為了什么藤肢?以目前我所了解到的而言太闺,還沒有特別確切的答案,但可以確定是一個非常微小的優(yōu)化嘁圈,與JVM及緩存有關(guān)省骂。
以下是reference傳送門:
- https://stackoverflow.com/questions/8019831/java-lock-variable-assignment-before-use-why
- https://stackoverflow.com/questions/2785964/in-arrayblockingqueue-why-copy-final-member-field-into-local-final-variable
- http://cs.oswego.edu/pipermail/concurrency-interest/2013-February/010768.html
順便蟀淮,StackOverflow最近(不知道是哪一天)改版成了1998年的樣式,滿滿的懷舊感钞澳。上面concurrency-interest郵件列表中關(guān)于這個問題也是眾說紛紜怠惶,如果仔細爬樓還會發(fā)現(xiàn)Doug Lea本人的回復(fù),不過有些令人費解略贮。