JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 四
版本 | 作者 | 內(nèi)容 |
---|---|---|
2018.6.2 | chuIllusions | J.U.C之AQS |
相關(guān)文章
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 一 之 并發(fā)相關(guān)知識(shí)
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 二 之 線程安全性靴拱、安全發(fā)布對(duì)象
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 三 之 線程安全策略
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 五 之 J.U.C組件拓展
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 六 之 線程池
J.U.C 之 AQS
Introduction
AbStractQueuedSynchronizer類呐馆,簡(jiǎn)稱AQS项玛,一個(gè)用來(lái)構(gòu)建鎖和同步器的框架 疹味。從JDK1.5開(kāi)始辟汰,引入了并發(fā)包吕漂,也就是J.U.C惹苗,大大提高了JAVA程序的并發(fā)性能咬展,而AQS則是J.U.C的核心,是并發(fā)類中核心部分隘谣,它提供一個(gè)基于FIFO隊(duì)列增拥,這個(gè)隊(duì)列可以構(gòu)建鎖或其他相關(guān)的同步裝置的基礎(chǔ)框架啄巧。
AQS底層數(shù)據(jù)結(jié)構(gòu):
底層采用雙向鏈表,是隊(duì)列的一種實(shí)現(xiàn)掌栅,因此可以當(dāng)做是一個(gè)隊(duì)列秩仆。其中Sync queue
即同步隊(duì)列,它是雙向鏈表猾封,包括hean
結(jié)點(diǎn)(主要用作后續(xù)的調(diào)度)與tail
結(jié)點(diǎn)澄耍。Condition queue
不是必須的,單向鏈表晌缘,只有在需要使用到condition
的時(shí)候才會(huì)存在這個(gè)單向鏈表齐莲,并且可能存在多個(gè)Condition queue
Design
- 使用Node實(shí)現(xiàn)FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架
- 利用了一個(gè)int類型表示狀態(tài)磷箕。在AQS中选酗,存在一個(gè)
state
成員變量,基于AQS有一個(gè)同步組件ReentrantLock
岳枷,在這個(gè)組件中芒填,state
表示獲取鎖的線程數(shù),假如state == 0
表示無(wú)線程獲取鎖空繁,state == 1
表示已有線程獲取鎖殿衰,state > 1
表示鎖的數(shù)量 - 使用方法是繼承。AQS的設(shè)計(jì)是基于模板方法家厌,使用需要繼承AQS播玖,并覆寫其中的方法。
- 子類通過(guò)繼承并通過(guò)實(shí)現(xiàn)它的方法管理其狀態(tài){acquire() 和 release()}的方法操縱狀態(tài)
- 可以同時(shí)實(shí)現(xiàn)排它鎖和共享鎖模式(獨(dú)占饭于、共享)蜀踏。它的所有子類中,要么實(shí)現(xiàn)并使用它的獨(dú)占功能API掰吕,要么實(shí)現(xiàn)共享鎖的功能果覆,而不會(huì)同時(shí)使用兩套API。即便是它比較有名的子類
ReentrantReadWirteLock
也是通過(guò)兩個(gè)內(nèi)部類讀鎖和寫鎖分別使用兩套API實(shí)現(xiàn)的殖熟。AQS在功能上局待,有獨(dú)占控制和共享控制兩種功能。 - 在LOCK包中的相關(guān)鎖(常用的有ReentrantLock菱属、 ReadWriteLock)都是基于AQS來(lái)構(gòu)建.然而這些鎖都沒(méi)有直接來(lái)繼承AQS,而是定義了一個(gè)Sync類去繼承AQS钳榨,因?yàn)殒i面向的是使用用戶,而同步器面向的則是線程控制,那么在鎖的實(shí)現(xiàn)中聚合同步器而不是直接繼承AQS就可以很好的隔離二者所關(guān)注的事情.
基于以上設(shè)計(jì),AQS具體實(shí)現(xiàn)的大致思路:
AQS內(nèi)部維護(hù)了一個(gè)CLH隊(duì)列來(lái)管理鎖纽门,線程首先會(huì)嘗試獲取鎖薛耻,如果失敗,會(huì)將當(dāng)前線程以及等待狀態(tài)等信息包裝成Node結(jié)點(diǎn)加入同步隊(duì)列(Sync queue)中赏陵。接著不斷循環(huán)嘗試獲取鎖饼齿,條件是當(dāng)前結(jié)點(diǎn)為head直接后繼才會(huì)嘗試饲漾,如果失敗則會(huì)阻塞自己,直到自己被喚醒缕溉;而當(dāng)持有鎖的線程考传,釋放鎖的時(shí)候,會(huì)喚醒隊(duì)列中后繼線程证鸥×爬悖基于這些基礎(chǔ)的設(shè)計(jì)和思路,JDK提供了許多基于AQS的子類枉层。
獨(dú)占式鎖過(guò)程總結(jié):
AQS的模板方法acquire通過(guò)調(diào)用子類自定義實(shí)現(xiàn)的tryAcquire獲取同步狀態(tài)失敗后->將線程構(gòu)造成Node節(jié)點(diǎn)(創(chuàng)建一個(gè)獨(dú)占式節(jié)點(diǎn) )(addWaiter)->將Node節(jié)點(diǎn)添加到同步隊(duì)列對(duì)尾(addWaiter)->節(jié)點(diǎn)以自旋的方法獲取同步狀態(tài)(acquirQueued)镜硕。在節(jié)點(diǎn)自旋獲取同步狀態(tài)時(shí),只有其前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)的時(shí)候才會(huì)嘗試獲取同步狀態(tài)返干,如果該節(jié)點(diǎn)的前驅(qū)不是頭節(jié)點(diǎn)或者該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)單獲取同步狀態(tài)失敗,則判斷當(dāng)前線程需要阻塞血淌,如果需要阻塞則需要被喚醒過(guò)后才返回矩欠。在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài)悠夯,然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)癌淮。
共享式鎖過(guò)程總結(jié):
共享式獲取與獨(dú)占式獲取的最主要區(qū)別在于同一時(shí)刻能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。通過(guò)調(diào)用acquireShared(int arg)方法可以共享式得獲取同步狀態(tài)沦补。
同步器調(diào)用tryAcquireShared(int arg)
方法嘗試獲取同步狀態(tài)乳蓄,其返回值為int類型,當(dāng)返回值大于0時(shí)夕膀,表示能夠獲取同步狀態(tài)虚倒。因此,在共享式獲取的自旋過(guò)程中产舞,成功獲取同步狀態(tài)并且退出自旋的條件就是tryAcquireShared(int arg)
方法返回值大于等于0魂奥。共享式釋放同步狀態(tài)狀態(tài)是通過(guò)調(diào)用releaseShared(int arg)
方法
CountDownLatch
、ReentrantReadWriteLock
易猫、Semaphore
等都是共享式獲取同步狀態(tài)的耻煤。
同步隊(duì)列結(jié)構(gòu)分析
本小節(jié)內(nèi)容引用于AQS實(shí)現(xiàn)分析
同步器中包含了兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭節(jié)點(diǎn)(head)准颓,一個(gè)指向尾節(jié)點(diǎn)(tail),沒(méi)有獲取到鎖的線程哈蝇,加入到隊(duì)列的過(guò)程必須保證線程安全,因此同步器提供了一個(gè)基于CAS的設(shè)置尾節(jié)點(diǎn)的方法CompareAndSetTail(Node expect,Node update)
,它需要傳遞當(dāng)前線程認(rèn)為的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)攘已,只有設(shè)置成功后炮赦,當(dāng)前節(jié)點(diǎn)才能正式與之前的尾節(jié)點(diǎn)建立關(guān)聯(lián)。
同步器將結(jié)點(diǎn)加入到同步隊(duì)列的過(guò)程:
同步隊(duì)列遵循FIFO贯被,首節(jié)點(diǎn)是獲取鎖成功的節(jié)點(diǎn)眼五,首節(jié)點(diǎn)的線程在釋放鎖時(shí)妆艘,將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取到鎖時(shí)看幼,將自己設(shè)置位首節(jié)點(diǎn)批旺,設(shè)置首節(jié)點(diǎn)是由成功獲取鎖的線程來(lái)完成的,由于只有一個(gè)線程能夠成功獲取鎖诵姜,因此設(shè)置首節(jié)點(diǎn)不需要CAS操作汽煮。 過(guò)程如下所示:
同步組件概覽
- CountDownLatch:是閉鎖,通過(guò)一個(gè)計(jì)數(shù)來(lái)保證線程是否需要一直阻塞
- Semaphore:控制同一時(shí)間棚唆,并發(fā)線程的數(shù)目
- CyclicBarrier:和
CountDwonLatch
相似暇赤,能阻塞線程 - ReentrantLock
- Condition:使用時(shí)需要
ReentrantLock
- FutureTask
CountDownLatch
CountDownLatch
是一個(gè)同步輔助類,已在第一篇文章中提到宵凌。內(nèi)容通過(guò)截圖顯示:
構(gòu)造器中的計(jì)數(shù)值(count)實(shí)際上就是閉鎖需要等待的線程數(shù)量鞋囊。這個(gè)值只能被設(shè)置一次,而且CountDownLatch
沒(méi)有提供任何機(jī)制去重新設(shè)置這個(gè)計(jì)數(shù)值瞎惫。
典型的應(yīng)用:并行計(jì)算溜腐,當(dāng)某個(gè)任務(wù)需要處理運(yùn)算量非常大,可以將該運(yùn)算任務(wù)拆分為多個(gè)子任務(wù)瓜喇,等待所有的子任務(wù)完成之后挺益,父任務(wù)再拿到所有子任務(wù)的運(yùn)算結(jié)果進(jìn)行匯總。利用CountDownLatch
可以保證任務(wù)都被處理完才去執(zhí)行最終的結(jié)果運(yùn)算乘寒,過(guò)程中每一個(gè)線程都可以看做是一個(gè)子任務(wù)望众。
案例:
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
//定義線程池
ExecutorService exec = Executors.newCachedThreadPool();
//定義閉鎖實(shí)例
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
//每次放入一個(gè)線程
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
//計(jì)算器完成一次
countDownLatch.countDown();
}
});
}
countDownLatch.await();
//所有子任務(wù)執(zhí)行完后才會(huì)執(zhí)行
log.info("finish");
//線程池不再使用需要關(guān)閉
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
CountDownLatch
還提供在指定時(shí)間內(nèi)完成的條件(超出時(shí)間沒(méi)有完成,完成多少算多少)伞辛,如果等待時(shí)間沒(méi)有完成烂翰,則繼續(xù)執(zhí)行。通過(guò)countDownLatch.await(int timeout,TimeUnit timeUnit);
設(shè)置,第一個(gè)參數(shù)沒(méi)超時(shí)時(shí)間,第二個(gè)參數(shù)為時(shí)間單位
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
//線程未完成浩姥,就可以輸出以下信息
log.info("finish");
//執(zhí)行關(guān)閉線程池嘉抒,內(nèi)部先把所有正在工作的線程完成后,再關(guān)閉
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
//等待時(shí)間
Thread.sleep(100);
log.info("{}", threadNum);
}
}
Semaphore
INTRODUCTION
Semaphore
經(jīng)常用于限制獲取某種資源的線程數(shù)量,其內(nèi)部是基于AQS的共享模式,AQS的狀態(tài)表示許可證的數(shù)量,在許可證數(shù)量不夠時(shí)殿怜,線程將會(huì)被掛起;而一旦有一個(gè)線程釋放一個(gè)資源曙砂,那么就有可能重新喚醒等待隊(duì)列中的線程繼續(xù)執(zhí)行头谜。 已在第一篇文章中提到。內(nèi)容通過(guò)截圖顯示:
應(yīng)用場(chǎng)景
Semaphore可以用于做流量控制鸠澈,特別公用資源有限的應(yīng)用場(chǎng)景柱告,比如數(shù)據(jù)庫(kù)連接截驮。假如有一個(gè)需求,要讀取幾萬(wàn)個(gè)文件的數(shù)據(jù)际度,因?yàn)槎际荌O密集型任務(wù)葵袭,我們可以啟動(dòng)幾十個(gè)線程并發(fā)的讀取,但是如果讀到內(nèi)存后乖菱,還需要存儲(chǔ)到數(shù)據(jù)庫(kù)中坡锡,而數(shù)據(jù)庫(kù)的連接數(shù)只有10個(gè),這時(shí)我們必須控制只有十個(gè)線程同時(shí)獲取數(shù)據(jù)庫(kù)連接保存數(shù)據(jù)窒所,否則會(huì)報(bào)錯(cuò)無(wú)法獲取數(shù)據(jù)庫(kù)連接鹉勒。這個(gè)時(shí)候,我們就可以使用Semaphore來(lái)做流控
案例:
@Slf4j
public class SemaphoreExample1 {
//總共有20個(gè)線程數(shù)
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
//定義信號(hào)量吵取,并且制定每次可用的許可數(shù)量
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 獲取一個(gè)許可
test(threadNum);
semaphore.release(); // 釋放一個(gè)許可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
從上面的輸出結(jié)果禽额,每次輸出一組日志,每一組日志有三條記錄皮官,直到所有線程執(zhí)行完畢绵疲。使用Semaphore
進(jìn)行并發(fā)的控制,使用相當(dāng)容易臣疑,但是效果很明顯。同時(shí)也支持獲取多個(gè)許可徙菠,以下例子即是一次只允許一個(gè)線程執(zhí)行:
@Slf4j
public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); // 獲取多個(gè)許可
test(threadNum);
//也可以分別釋放許可
semaphore.release(3); // 釋放多個(gè)許可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
tryAcquire
嘗試獲取許可讯沈,如果獲取不成功,則放棄操作婿奔,tryAcquire
方法提供幾個(gè)重載
tryAcquire() : boolean
-
tryAcquire(int permits) : boolean
嘗試獲取指定數(shù)量的許可 tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
-
tryAcquire(long timeout,TimeUnit timeUnit) : boolean
嘗試獲取許可的時(shí)候可以等待一段時(shí)間缺狠,在指定時(shí)間內(nèi)未獲取到許可則放棄
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
//如果獲取失敗,則不進(jìn)行操作
//semaphore.tryAcquire(5000,TimeUnit.MILLISECONDS)
if (semaphore.tryAcquire()) { // 嘗試獲取一個(gè)許可
test(threadNum);
semaphore.release(); // 釋放一個(gè)許可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
源碼分析
Semaphore
有兩種模式萍摊,公平模式和非公平模式挤茄。公平模式就是調(diào)用acquire
的順序就是獲取許可證的順序,遵循FIFO
冰木;而非公平模式是搶占式的穷劈,也就是有可能一個(gè)新的獲取線程恰好在一個(gè)許可證釋放時(shí)得到了這個(gè)許可證,而前面還有等待的線程踊沸。
Semaphore
構(gòu)造函數(shù)
public class Semaphore implements java.io.Serializable {
/*
* 只指定許可量歇终,構(gòu)造不公平模式
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/*
* 指定許可量,并指定模式
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//Semaphore內(nèi)部基于AQS的共享模式逼龟,所以實(shí)現(xiàn)都委托給了Sync類评凝。
abstract static class Sync extends AbstractQueuedSynchronizer {}
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
// 可以看到調(diào)用了setState方法,也就是說(shuō)AQS中的資源就是許可證的數(shù)量腺律。
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
// 可以看到調(diào)用了setState方法奕短,也就是說(shuō)AQS中的資源就是許可證的數(shù)量宜肉。
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
CyclicBarrier
Introduction
CyclicBarrier
也是一個(gè)同步輔助類,它允許一組線程相互等待翎碑, 直到到達(dá)某個(gè)公共的屏障點(diǎn)(common barrier point )谬返,也稱之為柵欄點(diǎn)。通過(guò)它可以完成多個(gè)線程之間相互等待杈女,只有當(dāng)每個(gè)線程都準(zhǔn)備就緒后朱浴,才能各自繼續(xù)進(jìn)行后面的操作。它和CountDownLatch
有相似的地方达椰,都是通過(guò)計(jì)數(shù)器實(shí)現(xiàn)翰蠢。當(dāng)某個(gè)線程調(diào)用await()
方法之后,該線程就進(jìn)入等待狀態(tài)啰劲,而且計(jì)數(shù)器是執(zhí)行加一操作梁沧,當(dāng)計(jì)數(shù)器值達(dá)到初始值(設(shè)定的值),因?yàn)檎{(diào)用await()
方法進(jìn)入等待的線程蝇裤,會(huì)被喚醒廷支,繼續(xù)執(zhí)行他們后續(xù)的操作。由于CyclicBarrier
在等待線程釋放之后栓辜,可以進(jìn)行重用恋拍,所以稱之為循環(huán)屏障。它非常適用于一組線程之間必需經(jīng)撑核Γ互相等待的情況施敢。
與CountDownLatch比較
相同點(diǎn):
- 都是同步輔助類。
- 使用計(jì)數(shù)器實(shí)現(xiàn)
不同點(diǎn):
-
CountDownLatch
允許一個(gè)或多個(gè)線程狭莱,等待其他一組線程完成操作僵娃,再繼續(xù)執(zhí)行。 -
CyclicBarrier
允許一組線程相互之間等待腋妙,達(dá)到一個(gè)共同點(diǎn)默怨,再繼續(xù)執(zhí)行。 -
CountDownLatch
不能被復(fù)用 -
CyclicBarrier
適用于更復(fù)雜的業(yè)務(wù)場(chǎng)景骤素,如計(jì)算發(fā)生錯(cuò)誤匙睹,通過(guò)重置計(jì)數(shù)器,并讓線程重新執(zhí)行 -
CyclicBarrier
還提供其他有用的方法济竹,比如getNumberWaiting
方法可以獲得CyclicBarrier
阻塞的線程數(shù)量垃僚。isBroken
方法用來(lái)知道阻塞的線程是否被中斷。
場(chǎng)景比較:
-
CyclicBarrier
: 好比一扇門规辱,默認(rèn)情況下關(guān)閉狀態(tài)谆棺,堵住了線程執(zhí)行的道路,直到所有線程都就位,門才打開(kāi)改淑,讓所有線程一起通過(guò)碍岔。 -
CyclicBarrier
可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場(chǎng)景朵夏。比如我們用一個(gè)Excel保存了用戶所有銀行流水蔼啦,每個(gè)Sheet保存一個(gè)帳戶近一年的每筆銀行流水,現(xiàn)在需要統(tǒng)計(jì)用戶的日均銀行流水仰猖,先用多線程處理每個(gè)sheet里的銀行流水捏肢,都執(zhí)行完之后,得到每個(gè)sheet的日均銀行流水饥侵,最后鸵赫,再用barrierAction
用這些線程的計(jì)算結(jié)果,計(jì)算出整個(gè)Excel的日均銀行流水躏升。 -
CountDownLatch
: 監(jiān)考老師發(fā)下去試卷辩棒,然后坐在講臺(tái)旁邊玩著手機(jī)等待著學(xué)生答題,有的學(xué)生提前交了試卷膨疏,并約起打球了一睁,等到最后一個(gè)學(xué)生交卷了,老師開(kāi)始整理試卷佃却,貼封條
案例
@Slf4j
public class CyclicBarrierExample1 {
//定義屏障者吁,指定數(shù)量為5個(gè)
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
//往線程池中放入線程
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
//如果當(dāng)前線程就緒,則告訴CyclicBarrier 需要等待
barrier.await();
// 當(dāng)達(dá)到指定數(shù)量時(shí)饲帅,繼續(xù)執(zhí)行下面代碼
log.info("{} continue", threadNum);
}
}
await()
支持多個(gè)參數(shù)
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
// 需要異常處理砚偶,否則不能進(jìn)行等待后的代碼
try {
//等待時(shí)間,繼續(xù)執(zhí)行洒闸,但需要進(jìn)行異常的捕獲,才能繼續(xù)執(zhí)行
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
//盡可能捕捉所有的異常類型
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
構(gòu)造函數(shù)
public class CyclicBarrier {
/*
* 除了指定屏障數(shù)外均芽,指定一個(gè)Runnable任務(wù)丘逸,
* 意味著:在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行Runnable任務(wù)掀宋,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//常用的構(gòu)造函數(shù)
public CyclicBarrier(int parties) {
this(parties, null);
}
}
mport java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
//當(dāng)達(dá)到線程屏障數(shù)5時(shí)深纲,執(zhí)行任務(wù)
//每滿足一次屏障數(shù),則執(zhí)行
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
ReentrantLock 與 鎖
JAVA中的鎖主要分兩類:synchronized
關(guān)鍵字 與 J.U.C所提供的鎖劲妙。J.U.C中核心的鎖即是ReentrantLock
湃鹊,本質(zhì)上都是lock與unlock的操作 。
ReentrantLock(可重入鎖)和synchronized區(qū)別
可重入性:
ReentrantLock
字面意思即為再進(jìn)入鎖镣奋,稱為可重入鎖币呵,其實(shí)synchronize
所使用的鎖也是可以重入的,兩者關(guān)于這個(gè)區(qū)別不打侨颈,它們都是同一個(gè)線程進(jìn)入一次余赢,鎖的計(jì)數(shù)器進(jìn)行自增芯义,要等到鎖的計(jì)數(shù)器下降為零時(shí),才能釋放鎖鎖的實(shí)現(xiàn):
synchronized
依賴于JVM實(shí)現(xiàn)無(wú)法了解底層源碼妻柒,而ReentrantLock
基于JDK實(shí)現(xiàn)通過(guò)閱讀源碼了解實(shí)現(xiàn)扛拨,區(qū)別就類似于操作系統(tǒng)控制實(shí)現(xiàn)與用戶使用代碼實(shí)現(xiàn)。性能區(qū)別:在
synchronized
優(yōu)化以前举塔,性能比ReentrantLock
差很多绑警,但自從synchronize
引入了偏向鎖、輕量級(jí)鎖(自選鎖)后 央渣,也就是自循鎖后计盒,兩者性能差不多(JDK1.6以后,為了減少獲得鎖和釋放鎖所帶來(lái)的性能消耗痹屹,提高性能章郁,引入了“輕量級(jí)鎖”和“偏向鎖”)。在兩種場(chǎng)景下都可以使用志衍,官方更推薦使用synchronized
暖庄,因?yàn)閷懛ǜ菀住?code>synchronized的優(yōu)化其實(shí)是借鑒了ReentrantLock
中的CAS技術(shù),都是試圖在用戶態(tài)就把加鎖問(wèn)題解決楼肪,避免進(jìn)入內(nèi)核態(tài)的線程阻塞培廓。-
功能區(qū)別:
- 便利性:
synchronized
更便利,它是由編譯器保證加鎖與釋放春叫。ReentrantLock
是需要手動(dòng)聲明與釋放鎖肩钠,所以為了避免忘記手工釋放鎖造成死鎖,所以最好在finally中聲明釋放鎖暂殖。 - 鎖的細(xì)粒度和靈活度:
ReentrantLock
優(yōu)于synchronized
- 便利性:
-
ReentrantLock獨(dú)有的功能
-
ReentrantLock
可以指定是公平鎖還是非公平鎖价匠,synchronized
只能是非公平鎖。(所謂公平鎖就是先等待的線程先獲得鎖) - 提供了一個(gè)Condition類呛每,可以分組喚醒需要喚醒的線程踩窖。不像是synchronized要么隨機(jī)喚醒一個(gè)線程,要么全部喚醒晨横。
- 提供能夠中斷等待鎖的線程的機(jī)制洋腮,通過(guò)
lock.lockInterruptibly()
實(shí)現(xiàn),這種機(jī)制ReentrantLock
是一種自選鎖手形,通過(guò)循環(huán)調(diào)用CAS操作來(lái)實(shí)現(xiàn)加鎖啥供。性能比較好的原因是避免了進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài)。想進(jìn)辦法避免線程進(jìn)入內(nèi)核阻塞狀態(tài)库糠, 是我們分析和理解鎖設(shè)計(jì)的關(guān)鍵
-
如果滿足ReentrantLock
三個(gè)獨(dú)有的功能伙狐,那么必須使用ReentrantLock
。其他情況下可以根據(jù)性能、業(yè)務(wù)場(chǎng)景等等來(lái)選擇synchronized
還是ReentrantLock
是否要放棄synchronized
synchronized
能做的鳞骤,ReentrantLock
都能做窒百;而ReentrantLock
能做的,而synchronized
卻不一定做得了豫尽。性能方面篙梢,ReentrantLock
不比synchronized
差,那么要放棄使用synchronized
美旧?
- J.U.C包中的鎖定類是用于高級(jí)情況和高級(jí)用戶的工具渤滞,除非說(shuō)你對(duì)Lock的高級(jí)特性有特別清楚的了解以及有明確的需要,或這有明確的證據(jù)表明同步已經(jīng)成為可伸縮性的瓶頸的時(shí)候榴嗅,否則我們還是繼續(xù)使用synchronized
- 相比較這些高級(jí)的鎖定類妄呕,
synchronized
還是有一些優(yōu)勢(shì)的,比如synchronized不可能忘記釋放鎖嗽测。 在退出synchronized
塊時(shí)绪励,JVM會(huì)自動(dòng)釋放鎖,會(huì)很容易忘記要使用finally
釋放鎖唠粥,這對(duì)程序非常有害疏魏。 - 還有當(dāng)JVM使用
synchronized
管理鎖定請(qǐng)求和釋放時(shí),JVM在生成線程轉(zhuǎn)儲(chǔ)時(shí)能夠包括鎖定信息晤愧,這些信息對(duì)調(diào)試非常有價(jià)值大莫,它們可以標(biāo)識(shí)死鎖以及其他異常行為的來(lái)源。 而Lock
類知識(shí)普通的類官份,JVM不知道哪個(gè)線程具有Lock
對(duì)象只厘,而且?guī)缀趺總€(gè)開(kāi)發(fā)人員都是比較熟悉synchronized
案例
@Slf4j
@ThreadSafe
public class LockExample2 {
// 請(qǐng)求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
//聲明鎖的實(shí)例,調(diào)用構(gòu)造方法,默認(rèn)生成一個(gè)不公平的鎖
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
//上鎖
lock.lock();
try {
count++;
} finally {
//解鎖
lock.unlock();
}
}
}
ReentrantLock
提供了許多方法:
-
tryLock()
:僅在調(diào)用時(shí)鎖定未被另一個(gè)線程保持的情況下才獲取鎖定舅巷。 -
tryLock(long timeout, TimeUnit unit)
:如果鎖定在給定的時(shí)間內(nèi)沒(méi)有被另一個(gè)線程保持且當(dāng)前線程沒(méi)有被中斷羔味,則獲取這個(gè)鎖定。 -
lockInterruptbily()
:如果當(dāng)前線程沒(méi)有被中斷的話钠右,那么就獲取鎖定赋元。如果中斷了就拋出異常。 -
isLocked()
:查詢此鎖定是否由任意線程保持 -
isHeldByCurrentThread
:查詢當(dāng)前線程是否保持鎖定狀態(tài)爬舰。 -
isFair
:判斷是不是公平鎖
ReentrantReadWriteLock
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
/** 內(nèi)部類提供的讀鎖 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 內(nèi)部類提供的讀鎖 */
private final ReentrantReadWriteLock.WriteLock writerLock;
}
我們可以看到RenntrantLock
提供了ReadLock
與WriteLock
,在沒(méi)有任何讀寫鎖時(shí)寒瓦,才可以取得寫入鎖情屹。如果進(jìn)行讀取時(shí),可能有另一個(gè)寫入的需求杂腰,為了保持同步垃你,讀取鎖定。
ReentrantReadWriteLock
寫鎖的互斥的,也就是說(shuō)惜颇,讀和讀是不互斥的皆刺,但讀和寫、寫和寫是互斥的凌摄。
在沒(méi)有任何讀寫鎖的時(shí)候才可以取得寫入鎖(悲觀讀取羡蛾,容易寫線程饑餓),也就是說(shuō)如果一直存在讀操作锨亏,那么寫鎖一直在等待沒(méi)有讀的情況出現(xiàn)痴怨,這樣我的寫鎖就永遠(yuǎn)也獲取不到,就會(huì)造成等待獲取寫鎖的線程饑餓器予。所以浪藻,此類不能亂用,在使用是一定要掌握其特性與實(shí)現(xiàn)方式乾翔。
ReentrantReadWriteLock
是Lock
的另一種實(shí)現(xiàn)方式爱葵,我們已經(jīng)知道了ReentrantLock
是一個(gè)排他鎖,同一時(shí)間只允許一個(gè)線程訪問(wèn)反浓,而ReentrantReadWriteLock
允許多個(gè)讀線程同時(shí)訪問(wèn)萌丈,但不允許寫線程和讀線程、寫線程和寫線程同時(shí)訪問(wèn)勾习。相對(duì)于排他鎖浓瞪,提高了并發(fā)性。在實(shí)際應(yīng)用中巧婶,大部分情況下對(duì)共享數(shù)據(jù)(如緩存)的訪問(wèn)都是讀操作遠(yuǎn)多于寫操作乾颁,這時(shí)ReentrantReadWriteLock
能夠提供比排他鎖更好的并發(fā)性和吞吐量。
@Slf4j
//使用場(chǎng)景并不多
public class LockExample3 {
//定義Map
private final Map<String, Data> map = new TreeMap<>();
//聲明讀寫鎖
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//獲得讀寫鎖中的讀鎖
private final Lock readLock = lock.readLock();
//獲得讀寫鎖中的寫鎖
private final Lock writeLock = lock.writeLock();
//獲取
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
//獲取
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
//寫
public Data put(String key, Data value) {
//可能導(dǎo)致線程饑餓艺栈,處于一直等待狀態(tài)
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
class Data {
}
}
StampedLock
在JDK1.8中英岭,新增 StampedLock
,它是ReentrantReadWriteLock
的增強(qiáng)版湿右,是為了解決ReentrantReadWriteLock
的一些不足诅妹。正因?yàn)?code>ReentrantReadWriteLock出現(xiàn)了讀和寫是互斥的情況,需要優(yōu)化毅人,因此就出現(xiàn)了StampedLock
吭狡!
它控制鎖有三種模式(寫、讀丈莺、樂(lè)觀讀)划煮。一個(gè)StempedLock
的狀態(tài)是由版本和模式兩個(gè)部分組成。鎖獲取方法返回一個(gè)數(shù)字作為票據(jù)(stamp)缔俄,他用相應(yīng)的鎖狀態(tài)表示并控制相關(guān)的訪問(wèn)弛秋。數(shù)字0表示沒(méi)有寫鎖被鎖寫訪問(wèn)器躏,在讀鎖上分為悲觀鎖和樂(lè)觀鎖。
樂(lè)觀讀: 如果讀的操作很多寫的很少蟹略,我們可以樂(lè)觀的認(rèn)為讀的操作與寫的操作同時(shí)發(fā)生的情況很少登失,因此不悲觀的使用完全的讀取鎖定。程序可以查看讀取資料之后是否遭到寫入資料的變更挖炬,再采取之后的措施揽浙。
它的思想是讀寫鎖中讀不僅不阻塞讀,同時(shí)也不應(yīng)該阻塞寫茅茂。 在讀的時(shí)候如果發(fā)生了寫捏萍,則應(yīng)當(dāng)重讀而不是在讀的時(shí)候直接阻塞寫。使用StampedLock
就可以實(shí)現(xiàn)一種無(wú)障礙操作空闲,即讀寫之間不會(huì)阻塞對(duì)方令杈,但是寫和寫之間還是阻塞的
在源碼中,提供一個(gè)使用StampedLock
案例
public class LockExample4 {
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看樂(lè)觀讀鎖案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //獲得一個(gè)樂(lè)觀讀鎖
double currentX = x, currentY = y; //將兩個(gè)字段讀入本地局部變量
if (!sl.validate(stamp)) { //檢查發(fā)出樂(lè)觀讀鎖后同時(shí)是否有其他寫鎖發(fā)生碴倾?
stamp = sl.readLock(); //如果沒(méi)有逗噩,我們?cè)俅潍@得一個(gè)讀悲觀鎖
try {
currentX = x; // 將兩個(gè)字段讀入本地局部變量
currentY = y; // 將兩個(gè)字段讀入本地局部變量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲觀讀鎖案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循環(huán),檢查當(dāng)前狀態(tài)是否符合
long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉(zhuǎn)為寫鎖
if (ws != 0L) { //這是確認(rèn)轉(zhuǎn)為寫鎖是否成功
stamp = ws; //如果成功 替換票據(jù)
x = newX; //進(jìn)行狀態(tài)改變
y = newY; //進(jìn)行狀態(tài)改變
break;
} else { //如果不能成功轉(zhuǎn)換為寫鎖
sl.unlockRead(stamp); //我們顯式釋放讀鎖
stamp = sl.writeLock(); //顯式直接進(jìn)行寫鎖 然后再通過(guò)循環(huán)再試
}
}
} finally {
sl.unlock(stamp); //釋放讀鎖或?qū)戞i
}
}
}
}
如何選擇鎖跌榔?
synchronized
是JVM層面的异雁,通過(guò)底層監(jiān)控工具監(jiān)控synchronized
的鎖定,出現(xiàn)異常會(huì)自動(dòng)釋放鎖僧须,JVM實(shí)現(xiàn)自動(dòng)的加鎖與解鎖纲刀。
Lock
是對(duì)象級(jí)的鎖定,要保證鎖一定要被釋放担平。StampedLock
對(duì)吞吐量有巨大的改進(jìn)示绊,特別是在讀線程越來(lái)越多的場(chǎng)景下。
- 當(dāng)只有少量競(jìng)爭(zhēng)者暂论,使用
synchronized
是很明智的選擇 - 競(jìng)爭(zhēng)者不少但是線程增長(zhǎng)的趨勢(shì)是能預(yù)估的面褐,使用
ReetrantLock
- 使用鎖一定要看是否適應(yīng)場(chǎng)景,并不是哪個(gè)高級(jí)用哪個(gè)取胎。
-
synchronized
不會(huì)引發(fā)死鎖展哭,如果Lock
使用不當(dāng)可能造成死鎖
Condition
Condition
是一個(gè)多線程間協(xié)調(diào)通信的工具類,在前面AQS底層數(shù)據(jù)結(jié)果分析時(shí)提到除了AQS自身隊(duì)列之外闻蛀,還有可能存在Condition
隊(duì)列(不存在或者存在一個(gè)以上匪傍,即多個(gè)等待隊(duì)列)。
使得某個(gè)觉痛,或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被帶調(diào)用)時(shí) 役衡,這些等待線程才會(huì)被喚醒,從而重新?tīng)?zhēng)奪鎖秧饮。
Condition
是同步器AbstractQueuedSynchronized
的內(nèi)部類映挂,因?yàn)?code>Condition的操作需要獲取相關(guān)的鎖,所以作為同步器的內(nèi)部類比較合理盗尸。每個(gè)Condition
的關(guān)鍵柑船。
一個(gè) Condition
包含一個(gè)等待隊(duì)列,Condition
擁有首節(jié)點(diǎn)firstWaiter
和尾節(jié)點(diǎn)lastWaiter
泼各。當(dāng)前線程調(diào)用Condition.await()
方法時(shí)鞍时,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊(duì)列扣蜻。
@Slf4j
public class LockExample6 {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
//從ReentrantLock中取得Condition對(duì)象
//此時(shí)在AQS中生成Condition隊(duì)列(可以有多個(gè))
Condition condition = reentrantLock.newCondition();
//線程1
new Thread(() -> {
try {
//加入AQS的等待隊(duì)列里
reentrantLock.lock();
//輸出等待信號(hào)動(dòng)作
log.info("wait signal"); // 1
//線程1沉睡逆巍,從AQS等待隊(duì)列中移除,對(duì)應(yīng)的操作即是鎖的釋放莽使,然后加入Condition隊(duì)列中
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
//線程2
new Thread(() -> {
//因?yàn)榫€程1釋放鎖锐极,這時(shí)得到鎖
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//發(fā)送信號(hào),這時(shí)Condition隊(duì)列中有線程1的結(jié)點(diǎn)芳肌,被取出加入AQS等待隊(duì)列(注意灵再,線程1沒(méi)有被喚醒)
condition.signalAll();
log.info("send signal ~ "); // 3
//釋放鎖會(huì)喚醒AQS隊(duì)列
reentrantLock.unlock();
}).start();
}
/*
* Condition作為一個(gè)條件類,很好的維護(hù)了一個(gè)等待信號(hào)的隊(duì)列亿笤,并在適合的時(shí)候翎迁,將自身隊(duì)列中的
* 結(jié)點(diǎn)加入到AQS等待隊(duì)列中,實(shí)現(xiàn)喚醒操作净薛。使得某個(gè)線程等待某個(gè)條件汪榔,實(shí)際上使用很少
*/
}