概述
前段時(shí)間在解決請求風(fēng)控服務(wù)器超時(shí)的問題時(shí)夕凝,涉及到到一個(gè)CountDownLunch的并發(fā)工具類,非常實(shí)用,順記自然就去研究了一下相關(guān)的并發(fā)工具類谜喊。
在JDK的并發(fā)包里(java.util.concurrent)提供了這樣幾個(gè)非常有用的并發(fā)工具類來解決并發(fā)編程的流程控制磁携。分別是CountDownLatch轧简、CyclicBarrier和Semaphore。
1. CountDownLatch
1.1 CountDownLatch是什么?
CountDownLatch打多是被用在等待多線程完成嗡载,具體來說就是允許一個(gè)或多個(gè)線程等待其他線程完成操作窑多。
1.2 CountDownLatch原理?
API 文檔有這樣一盒解釋
A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
構(gòu)造函數(shù)
//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}
在 CountDownLunch啟動(dòng)的時(shí)候鼻疮。主線程必須在啟動(dòng)其他線程后立即調(diào)用CountDownLatch.await()方法怯伊。這樣主線程的操作就會在這個(gè)方法上阻塞,直到其他線程完成各自的任務(wù)判沟。
public void countDown()
在每次任務(wù)執(zhí)行完直接調(diào)用耿芹,計(jì)數(shù)器就會減一操作。
public boolean await(long timeout,TimeUnit unit) throws InterruptedException
public void await() throws InterruptedException
這個(gè)方法就是用來堵塞主線程的挪哄,前者是有等待時(shí)間的吧秕,可以自定義,后者是無限等待迹炼,知道其他count 計(jì)數(shù)器為0為止砸彬。
詳細(xì)的 demo 就不在這里粘貼了
如有需要傳送門
1.3 使用場景
超時(shí)機(jī)制
主線程里面設(shè)置好等待時(shí)間,如果發(fā)現(xiàn)在規(guī)定時(shí)間內(nèi)還是沒有返回結(jié)果斯入,那就喚醒主線程砂碉,拋棄。
開始執(zhí)行前等待n個(gè)線程完成各自任務(wù)
例如應(yīng)用程序啟動(dòng)類要確保在處理用戶請求前刻两,所有N個(gè)外部系統(tǒng)已經(jīng)啟動(dòng)和運(yùn)行了增蹭。
死鎖檢測
一個(gè)非常方便的使用場景是,你可以使用n個(gè)線程訪問共享資源磅摹,在每次測試階段的線程數(shù)目是不同的滋迈,并嘗試產(chǎn)生死鎖霎奢。
若有不正之處請多多諒解,并歡迎各位大牛批評指正饼灿。
1.4 深入源碼
這里面我簡單的研究了一下CountDownLunch 源碼幕侠。
底層是由AbstractQueuedSynchronizer提供支持(后面就簡稱 AQS),所以其數(shù)據(jù)結(jié)構(gòu)就是AQS的數(shù)據(jù)結(jié)構(gòu)碍彭,而AQS的核心就是兩個(gè)虛擬隊(duì)列:同步隊(duì)列syncQueue 和條件隊(duì)列conditionQueue(前者數(shù)據(jù)結(jié)構(gòu)是雙向鏈表晤硕,后者是單向鏈表)不同的條件會有不同的條件隊(duì)列。
本省CountDownLunch繼承的是 Object硕旗,比較簡單窗骑,但是存在內(nèi)部類,Sync漆枚,繼承自AbstractQueuedSynchronizer,我簡單理解一下
private static final class Sync extends AbstractQueuedSynchronizer {
// 版本號
private static final long serialVersionUID = 4982264981922014374L;
// 構(gòu)造器
Sync(int count) {
setState(count);
}
// 返回當(dāng)前計(jì)數(shù)
int getCount() {
return getState();
}
// 試圖在共享模式下獲取對象狀態(tài)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 試圖設(shè)置狀態(tài)來反映共享模式下的一個(gè)釋放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 無限循環(huán)
for (;;) {
// 獲取狀態(tài)
int c = getState();
if (c == 0) // 沒有被線程占有
return false;
// 下一個(gè)狀態(tài)
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比較并且設(shè)置成功
return nextc == 0;
}
}
}
1.4.1核心函數(shù)分析
- await函數(shù)
此函數(shù)將會使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待创译,除非線程被中斷。其源碼如下
public void await() throws InterruptedException{
// 轉(zhuǎn)發(fā)到sync對象上
sync.acquireSharedInterruptibly(1);
}
源碼可知墙基,對CountDownLatch對象的await的調(diào)用會轉(zhuǎn)發(fā)為對Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的調(diào)用软族。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
這里先檢測了線程中斷狀態(tài),中斷了則拋出異常残制,接下來調(diào)用tryAcquireShared立砸,tryAcquireShared是Syn的實(shí)現(xiàn)的
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
其實(shí)就是簡單的獲取了同步器的state,判斷是否為0.
接下來是
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關(guān)鍵點(diǎn) 我看到的是
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
執(zhí)行到此處時(shí)初茶,線程會阻塞颗祝,知道有其他線程喚醒此線程,執(zhí)行await之后恼布,上文中的主線程阻塞在這螺戳。
- countDown函數(shù)
此函數(shù)將遞減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零折汞,則釋放所有等待的線程
void countDown() {
sync.releaseShared(1);
}
可以看出 對countDown的調(diào)用轉(zhuǎn)換為對Sync對象的releaseShared(從AQS繼承而來)方法的調(diào)用倔幼。
這里面的具體原理能力有限,有點(diǎn)看不懂爽待,CAS相關(guān)的東西损同。
1.5 小結(jié)
不得不說countdownlatch是一個(gè)很高的線程控制工具,極大的方便了我們開發(fā)鸟款。由于知識能力有限膏燃,上面是自己的一點(diǎn)見識,有什么錯(cuò)誤還望提出何什,便于我及時(shí)改進(jìn)组哩。