J.U.C之AQS-介紹
J.U.C即Java并發(fā)包(java.util.concurrent)挺勿,J.U.C中提供了很多并發(fā)工具。這其中,有很多我們耳熟能詳?shù)牟l(fā)工具烟阐,譬如ReentrangLock、Semaphore紊扬,而它們的實現(xiàn)都用到了一個共同的基類--AbstractQueuedSynchronizer(抽象隊列同步器)蜒茄,簡稱AQS。
AQS是JDK提供的一套用于實現(xiàn)基于FIFO等待隊列的阻塞鎖和相關(guān)的同步器的一個同步框架餐屎,它使用一個int類型的volatile變量(命名為state)來維護同步狀態(tài)檀葛,通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作。
AbstractQueuedSynchronizer中對state的操作是原子的腹缩,且不能被繼承屿聋。所有的同步機制的實現(xiàn)均依賴于對改變量的原子操作。為了實現(xiàn)不同的同步機制藏鹊,我們需要創(chuàng)建一個非共有的(non-public internal)擴展了AQS類的內(nèi)部輔助類來實現(xiàn)相應(yīng)的同步邏輯润讥。
AbstractQueuedSynchronizer并不實現(xiàn)任何同步接口,它提供了一些可以被具體實現(xiàn)類直接調(diào)用的一些原子操作方法來重寫相應(yīng)的同步邏輯盘寡。AQS同時提供了獨占模式(exclusive)和共享模式(shared)兩種不同的同步邏輯楚殿。一般情況下,子類只需要根據(jù)需求實現(xiàn)其中一種模式宴抚,當(dāng)然也有同時實現(xiàn)兩種模式的同步類勒魔,如ReadWriteLock。
使用AQS能簡單且高效地構(gòu)造出應(yīng)用廣泛的大量的同步器菇曲,比如我們提到的ReentrantLock冠绢,Semaphore,其他的諸如ReentrantReadWriteLock常潮,SynchronousQueue弟胀,F(xiàn)utureTask等等皆是基于AQS的。
當(dāng)然,我們自己也能利用AQS非常輕松容易地構(gòu)造出符合我們自己需求的同步器孵户,由此可知AQS是Java并發(fā)包中最為核心的一個基類萧朝。
AbstractQueuedSynchronizer底層數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,屬于隊列的一種實現(xiàn):
- sync queue:同步隊列夏哭,其中head節(jié)點主要負(fù)責(zé)后面的調(diào)度
- Condition queue:單向鏈表检柬,不是必須的,只有程序中使用到Condition的時候才會存在竖配,可能會有多個Condition queue
關(guān)于AQS里的state狀態(tài):
我們提到了AbstractQueuedSynchronizer維護了一個volatile int類型的變量何址,命名為state,用于表示當(dāng)前同步狀態(tài)进胯。volatile雖然不能保證操作的原子性用爪,但是保證了當(dāng)前變量state的可見性。state的訪問方式有三種:
getState()
setState()
compareAndSetState()
這三種操作均是原子操作胁镐,其中compareAndSetState的實現(xiàn)依賴于Unsafe的compareAndSwapInt()方法偎血。
關(guān)于自定義資源共享方式:
AQS支持兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行盯漂,如ReentrantLock)和Share(共享颇玷,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)宠能。這樣方便使用者實現(xiàn)不同類型的同步組件亚隙,獨占式如ReentrantLock磁餐,共享式如Semaphore违崇,CountDownLatch,組合式的如ReentrantReadWriteLock诊霹⌒哐樱總之,AQS為使用提供了底層支撐脾还,如何組裝實現(xiàn)伴箩,使用者可以自由發(fā)揮。
關(guān)于同步器設(shè)計:
同步器的設(shè)計是基于模板方法模式的鄙漏,一般的使用方式是這樣:
- 使用者繼承AbstractQueuedSynchronizer并重寫指定的方法嗤谚。(這些重寫方法很簡單,無非是對于共享資源state的獲取和釋放)
- 將AQS組合在自定義同步組件的實現(xiàn)中怔蚌,并調(diào)用其模板方法巩步,而這些模板方法會調(diào)用使用者重寫的方法。這其實是模板方法模式的一個很經(jīng)典的應(yīng)用桦踊。
不同的自定義同步器爭用共享資源的方式也不同椅野。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在底層實現(xiàn)好了竟闪。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
protected boolean isHeldExclusively() // 該線程是否正在獨占資源离福。只有用到condition才需要去實現(xiàn)它。
protected boolean tryAcquire(int) // 獨占方式炼蛤。嘗試獲取資源妖爷,成功則返回true,失敗則返回false理朋。
protected boolean tryRelease(int) // 獨占方式赠涮。嘗試釋放資源,成功則返回true暗挑,失敗則返回false笋除。
protected int tryAcquireShared(int) // 共享方式。嘗試獲取資源炸裆。負(fù)數(shù)表示失斃;0表示成功烹看,但沒有剩余可用資源国拇;正數(shù)表示成功,且有剩余資源惯殊。
protected boolean tryReleaseShared(int) // 共享方式酱吝。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true土思,否則返回false务热。
如何使用:
首先,我們需要去繼承AbstractQueuedSynchronizer這個類己儒,然后我們根據(jù)我們的需求去重寫相應(yīng)的方法崎岂,比如要實現(xiàn)一個獨占鎖,那就去重寫tryAcquire闪湾,tryRelease方法冲甘,要實現(xiàn)共享鎖,就去重寫tryAcquireShared途样,tryReleaseShared江醇;最后,在我們的組件中調(diào)用AQS中的模板方法就可以了何暇,而這些模板方法是會調(diào)用到我們之前重寫的那些方法的陶夜。也就是說,我們只需要很小的工作量就可以實現(xiàn)自己的同步組件赖晶,重寫的那些方法律适,僅僅是一些簡單的對于共享資源state的獲取和釋放操作辐烂,至于像是獲取資源失敗,線程需要阻塞之類的操作捂贿,自然是AQS幫我們完成了纠修。
具體實現(xiàn)的思路:
- 首先AQS內(nèi)部維護了一個CLH隊列,來管理鎖
- 線程嘗試獲取鎖厂僧,如果獲取失敗扣草,則將等待信息等包裝成一個Node結(jié)點,加入到同步隊列Sync queue里
- 不斷重新嘗試獲取鎖(當(dāng)前結(jié)點為head的直接后繼才會嘗試)颜屠,如果獲取失敗辰妙,則會阻塞自己,直到被喚醒
- 當(dāng)持有鎖的線程釋放鎖的時候甫窟,會喚醒隊列中的后繼線程
設(shè)計思想:
對于使用者來講密浑,我們無需關(guān)心獲取資源失敗,線程排隊粗井,線程阻塞/喚醒等一系列復(fù)雜的實現(xiàn)尔破,這些都在AQS中為我們處理好了。我們只需要負(fù)責(zé)好自己的那個環(huán)節(jié)就好浇衬,也就是獲取/釋放共享資源state的姿勢懒构。很經(jīng)典的模板方法設(shè)計模式的應(yīng)用,AQS為我們定義好頂級邏輯的骨架耘擂,并提取出公用的線程入隊列/出隊列胆剧,阻塞/喚醒等一系列復(fù)雜邏輯的實現(xiàn),將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現(xiàn)即可醉冤。
基于AQS的同步組件:
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
AQS小結(jié):
- 使用Node實現(xiàn)FIFO隊列秩霍,可以用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架
- 利用了一個int類型表示狀態(tài),有一個state的成員變量冤灾,表示獲取鎖的線程數(shù)(0沒有線程獲取鎖前域,1有線程獲取鎖,大于1表示重入鎖的數(shù)量)韵吨,和一個同步組件ReentrantLock。狀態(tài)信息通過procted級別的getState移宅,setState归粉,compareAndSetState進行操作
- 使用方法是繼承,然后復(fù)寫AQS中的方法漏峰,基于模板方法模式
- 子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){acquire和release}的方法操作狀態(tài)
- 可以同時實現(xiàn)排它鎖和共享鎖的模式(獨占糠悼、共享)
CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待浅乔,直到其他線程執(zhí)行完后再執(zhí)行倔喂。例如铝条,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有框架服務(wù)之后執(zhí)行。
CountDownLatch是通過一個計數(shù)器來實現(xiàn)的席噩,計數(shù)器的初始化值為線程的數(shù)量班缰。每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就相應(yīng)得減1悼枢。當(dāng)計數(shù)器到達(dá)0時埠忘,表示所有的線程都已完成任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)馒索。
CountDownLatch的構(gòu)造函數(shù)源碼如下:
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
計數(shù)器count是閉鎖需要等待的線程數(shù)量莹妒,只能被設(shè)置一次,且CountDownLatch沒有提供任何機制去重新設(shè)置計數(shù)器count绰上。
與CountDownLatch的第一次交互是主線程等待其他線程旨怠。主線程必須在啟動其他線程后立即調(diào)用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞蜈块,直到其他線程完成各自的任務(wù)运吓。
其他N個線程必須引用CountDownLatch閉鎖對象,因為它們需要通知CountDownLatch對象疯趟,它們各自完成了任務(wù)拘哨;這種通知機制是通過CountDownLatch.countDown()方法來完成的;每調(diào)用一次信峻,count的值就減1阅虫,因此當(dāng)N個線程都調(diào)用這個方法囱嫩,count的值就等于0,然后主線程就可以通過await()方法,恢復(fù)執(zhí)行自己的任務(wù)猜敢。
注:該計數(shù)器的操作是原子性的
CountDownLatch使用場景:
- 實現(xiàn)最大的并行性:有時我們想同時啟動多個線程,實現(xiàn)最大程度的并行性生棍。例如寺鸥,我們想測試一個單例類。如果我們創(chuàng)建一個初始計數(shù)器為1的CountDownLatch获印,并讓其他所有線程都在這個鎖上等待述雾,只需要調(diào)用一次countDown()方法就可以讓其他所有等待的線程同時恢復(fù)執(zhí)行。
- 開始執(zhí)行前等待N個線程完成各自任務(wù):例如應(yīng)用程序啟動類要確保在處理用戶請求前兼丰,所有N個外部系統(tǒng)都已經(jīng)啟動和運行了玻孟。
- 死鎖檢測:一個非常方便的使用場景是你用N個線程去訪問共享資源,在每個測試階段線程數(shù)量不同鳍征,并嘗試產(chǎn)生死鎖黍翎。
使用示例
1.基本用法:
@Slf4j
public class CountDownLatchExample1 {
private final static int THREAD_COUNT = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("", e);
} finally {
// 為防止出現(xiàn)異常,放在finally更保險一些
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(100);
log.info("{}", threadNum);
TimeUnit.MILLISECONDS.sleep(100);
}
}
2.比如有多個線程完成一個任務(wù)艳丛,但是這個任務(wù)只想給它一個指定的時間匣掸,超過這個任務(wù)就不繼續(xù)等待了趟紊,完成多少算多少:
// 等待指定的時間 參數(shù)1:等待時間,參數(shù)2:時間單位
countDownLatch.await(10, TimeUnit.MILLISECONDS);
關(guān)于CountDownLatch的其他例子可以參考我另一篇文章:
Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量碰酝,它通過協(xié)調(diào)各個線程霎匈,以保證合理的使用公共資源。很多年以來砰粹,我都覺得從字面上很難理解Semaphore所表達(dá)的含義唧躲,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量碱璃,只允許同時有一百輛車在這條路上行使弄痹,其他的都必須在路口等待,所以前一百輛車會看到綠燈嵌器,可以開進這條馬路肛真,后面的車會看到紅燈,不能駛?cè)隭X馬路爽航,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路蚓让,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程讥珍,駛?cè)腭R路就表示線程在執(zhí)行历极,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞衷佃,不能執(zhí)行趟卸。
所以簡單來說,Semaphore主要作用就是可以控制同一時間并發(fā)執(zhí)行的線程數(shù)氏义。Semaphore有兩個構(gòu)造函數(shù)锄列,參數(shù)permits表示許可數(shù),它最后傳遞給了AQS的state值惯悠。線程在運行時首先獲取許可邻邮,如果成功,許可數(shù)就減1克婶,線程運行筒严,當(dāng)線程運行結(jié)束就釋放許可,許可數(shù)就加1鸠补。如果許可數(shù)為0萝风,則獲取失敗,線程位于AQS的等待隊列中紫岩,它會被其它釋放許可的線程喚醒。在創(chuàng)建Semaphore對象的時候還可以指定它的公平性睬塌。一般常用非公平的信號量泉蝌,非公平信號量是指在獲取許可時先嘗試獲取許可歇万,而不必關(guān)心是否已有需要獲取許可的線程位于等待隊列中,如果獲取失敗勋陪,才會入列贪磺。而公平的信號量在獲取許可時首先要查看等待隊列中是否已有線程,如果有則入列诅愚。
使用場景:
Semaphore可以用于做流量控制寒锚,特別公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接违孝。假如有一個需求刹前,要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務(wù)雌桑,我們可以啟動幾十個線程并發(fā)的讀取喇喉,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中校坑,而數(shù)據(jù)庫的連接數(shù)只有10個拣技,這時我們必須控制只有十個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接耍目。這個時候膏斤,我們就可以使用Semaphore來做流控。
使用示例
1.每次獲取1個許可示例:
public class SemaphoreExample1 {
private final static int THREAD_COUNT = 200;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
// 獲取一個許可
semaphore.acquire();
System.out.println(threadNum);
// 釋放一個許可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
exec.shutdown();
}
}
在代碼中邪驮,雖然有200個線程在執(zhí)行莫辨,但是只允許10個并發(fā)的執(zhí)行。Semaphore的構(gòu)造方法Semaphore(int permits)
接收一個整型的數(shù)字耕捞,表示可用的許可證數(shù)量衔掸。所以Semaphore(10)
表示允許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10俺抽。Semaphore的用法也很簡單敞映,首先線程使用Semaphore的acquire()
獲取一個許可證,使用完之后調(diào)用release()
歸還許可證磷斧。還可以用tryAcquire()
方法嘗試獲取許可證振愿。
2.如何希望每次獲取多個許可的話,只需要在acquire()
方法的參數(shù)中進行指定即可弛饭,如下示例:
// 獲取多個許可
semaphore.acquire(3);
System.out.println(threadNum);
// 釋放多個許可
semaphore.release(3);
3.當(dāng)并發(fā)很高冕末,想要超過允許的并發(fā)數(shù)之后,就丟棄不處理的話侣颂,可以使用Semaphore里的tryAcquire()
方法嘗試獲取許可档桃,該方法返回boolean類型的值,我們可以通過判斷這個值來拋棄超過并發(fā)數(shù)的請求憔晒。如下示例:
public class SemaphoreExample3 {
private final static int THREAD_COUNT = 200;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
// 嘗試獲取一個許可藻肄,若沒有獲取到許可的線程就會被拋棄蔑舞,而不是阻塞
if (semaphore.tryAcquire()) {
System.out.println(threadNum);
// 釋放一個許可
semaphore.release();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
exec.shutdown();
}
}
Semaphore中嘗試獲取許可的相關(guān)方法:
我們可以指定嘗試獲取許可的超時時間,例如我設(shè)置超時時間為1秒:
// 嘗試獲取一個許可嘹屯,直到超過一秒
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
System.out.println(threadNum);
// 釋放一個許可
semaphore.release();
}
除此之外攻询,還可以嘗試獲取多個許可,并且指定超時時間:
// 嘗試獲取多個許可州弟,直到超過一秒
if (semaphore.tryAcquire(3, 1, TimeUnit.SECONDS)) {
System.out.println(threadNum);
// 釋放多個許可
semaphore.release(3);
}
Semaphore中其他一些常用的方法:
int availablePermits() // 返回此信號量中當(dāng)前可用的許可證數(shù)钧栖。
int getQueueLength() // 返回正在等待獲取許可證的線程數(shù)。
boolean hasQueuedThreads() // 是否有線程正在等待獲取許可證婆翔。
void reducePermits(int reduction) // 減少reduction個許可證拯杠。是個protected方法。
Collection getQueuedThreads() // 返回所有等待獲取許可證的線程集合浙滤。是個protected方法阴挣。
CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是纺腊,讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞畔咧,直到最后一個線程到達(dá)屏障時,屏障才會開門揖膜,所有被屏障攔截的線程才會繼續(xù)干活誓沸。當(dāng)某個線程調(diào)用了await方法之后,就會進入等待狀態(tài)壹粟,并將計數(shù)器-1拜隧,直到所有線程調(diào)用await方法使計數(shù)器為0,才可以繼續(xù)執(zhí)行趁仙,由于計數(shù)器可以重復(fù)使用洪添,所以我們又叫他循環(huán)屏障。CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)雀费,其參數(shù)表示屏障攔截的線程數(shù)量干奢,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞盏袄。
CyclicBarrier的應(yīng)用場景:
CyclicBarrier可以用于多線程計算數(shù)據(jù)忿峻,最后合并計算結(jié)果的應(yīng)用場景。比如我們用一個Excel保存了用戶所有銀行流水辕羽,每個Sheet保存一個帳戶近一年的每筆銀行流水逛尚,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水刁愿,都執(zhí)行完之后绰寞,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結(jié)果克握,計算出整個Excel的日均銀行流水蕾管。
CyclicBarrier和CountDownLatch的區(qū)別:
- CountDownLatch的計數(shù)器只能使用一次枷踏。而CyclicBarrier的計數(shù)器可以使用reset() 方法重置菩暗。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景,比如如果計算發(fā)生錯誤旭蠕,可以重置計數(shù)器停团,并讓線程們重新執(zhí)行一次。
- CountDownLatch主要用于實現(xiàn)一個或n個線程需要等待其他線程完成某項操作之后掏熬,才能繼續(xù)往下執(zhí)行佑稠,描述的是一個或n個線程等待其他線程的關(guān)系,而CyclicBarrier是多個線程相互等待旗芬,知道滿足條件以后再一起往下執(zhí)行舌胶。描述的是多個線程相互等待的場景
- CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量疮丛。isBroken方法用來知道阻塞的線程是否被中斷幔嫂。
CyclicBarrier方法列表:
使用示例
1.基本使用:
@Slf4j
public class CyclicBarrierExample1 {
// 給定一個值,說明有多少個線程同步等待
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int num = i;
// 延遲1秒誊薄,方便觀察
Thread.sleep(1000);
exec.execute(() -> {
try {
CyclicBarrierExample1.race(num);
} catch (Exception e) {
log.error("", e);
}
});
}
exec.shutdown();
}
private static void race(int num) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", num);
// 阻塞線程
barrier.await();
log.info("{} continue", num);
}
}
以防await無限阻塞進程履恩,我們可以設(shè)置await的超時時間,修改race方法代碼如下:
private static void race(int num) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", num);
try {
// 由于設(shè)置了超時時間后阻塞的線程可能會被中斷呢蔫,拋出BarrierException異常切心,如果想繼續(xù)往下執(zhí)行,需要加上try-catch
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
// isBroken方法用來知道阻塞的線程是否被中斷
log.warn("exception occurred {} {}. isBroken : {}", e.getClass().getName(), e.getMessage(), barrier.isBroken());
}
log.info("{} continue", num);
}
如果希望當(dāng)所有線程到達(dá)屏障后就執(zhí)行一個runnable的話片吊,可以使用CyclicBarrier(int parties, Runnable barrierAction)
構(gòu)造函數(shù)傳遞一個runnable實例绽昏。如下示例:
/**
* 當(dāng)線程全部到達(dá)屏障時,優(yōu)先執(zhí)行這里傳入的runnable
*/
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> log.info("callback is running"));
ReentrantLock
在Java里一共有兩類鎖俏脊,一類是synchornized同步鎖全谤,還有一種是JUC里提供的鎖Lock,Lock是個接口联予,其核心實現(xiàn)類就是ReentrantLock啼县。
synchornized與ReentrantLock的區(qū)別對比如下表:
對比維度 | synchornized | ReentrantLock |
---|---|---|
可重入性(線程進入鎖的時候計數(shù)器就自增1,計數(shù)器下降為0則會釋放鎖) | 可重入 | 可重入 |
鎖的實現(xiàn) | JVM實現(xiàn)沸久,很難操作源碼 | JDK實現(xiàn)季眷,可以觀察其源碼 |
性能 | 在引入偏向鎖、輕量級鎖/自旋鎖后性能大大提升卷胯,官方建議無特殊要求時盡量使用synchornized子刮,并且新版本的一些jdk源碼都由之前的ReentrantLock改成了synchornized | 與優(yōu)化后的synchornized相差不大 |
功能區(qū)別 | 方便簡潔,由編譯器負(fù)責(zé)加鎖和釋放鎖 | 需手工操作鎖的加鎖和釋放 |
鎖粒度 | 粗粒度,不靈活 | 細(xì)粒度挺峡,可靈活控制 |
可否指定公平鎖 | 不可以 | 可以 |
可否放棄鎖 | 不可以 | 可以 |
ReentrantLock實現(xiàn):
- 采用自旋鎖葵孤,循環(huán)調(diào)用CAS操作來實現(xiàn)加鎖,避免了使線程進入內(nèi)核態(tài)的阻塞狀態(tài)橱赠。想盡辦法避免線程進入內(nèi)核態(tài)的阻塞狀態(tài)尤仍,是我們分析和理解鎖設(shè)計的關(guān)鍵鑰匙。
ReentrantLock獨有的功能:
- 可指定是公平鎖還是非公平鎖狭姨,所謂公平鎖就是先等待的線程先獲得鎖
- 提供了一個Condition類宰啦,可以分組喚醒需要喚醒的線程
- 提供能夠中斷等待鎖的線程的機制,
lock.lockInterruptibly()
在ReentrantLock中饼拍,對于公平和非公平的定義是通過對同步器AQS的擴展加以實現(xiàn)的赡模,也就是在tryAcquire的實現(xiàn)上做了語義的控制。
這里提到一個鎖獲取的公平性問題师抄,如果在絕對時間上漓柑,先對鎖進行獲取的請求一定被先滿足,那么這個鎖是公平的叨吮,反之辆布,是不公平的,也就是說等待時間最長的線程最有機會獲取鎖挤安,也可以說鎖的獲取是有序的谚殊。ReentrantLock這個鎖提供了一個構(gòu)造函數(shù),能夠控制這個鎖是否是公平的蛤铜。
而鎖的名字也是說明了這個鎖具備了重復(fù)進入的可能嫩絮,也就是說能夠讓當(dāng)前線程多次的進行對鎖的獲取操作,這樣的最大次數(shù)限制是Integer.MAX_VALUE
围肥,約21億次左右剿干。
事實上公平的鎖機制往往沒有非公平的效率高,因為公平的獲取鎖沒有考慮到操作系統(tǒng)對線程的調(diào)度因素穆刻,這樣造成JVM對于等待中的線程調(diào)度次序和操作系統(tǒng)對線程的調(diào)度之間的不匹配置尔。對于鎖的快速且重復(fù)的獲取過程中,連續(xù)獲取的概率是非常高的氢伟,而公平鎖會壓制這種情況榜轿,雖然公平性得以保障,但是響應(yīng)比卻下降了朵锣,但是并不是任何場景都是以TPS作為唯一指標(biāo)的谬盐,因為公平鎖能夠減少“饑餓”發(fā)生的概率,等待越久的請求越是能夠得到優(yōu)先滿足诚些。
要放棄synchronized飞傀?
從上邊的介紹,看上去ReentrantLock不僅擁有synchronized的所有功能,而且有一些功能synchronized無法實現(xiàn)的特性砸烦。性能方面弃鸦,ReentrantLock也不比synchronized差,那么到底我們要不要放棄使用synchronized呢幢痘?答案是不要這樣做唬格。
J.U.C包中的鎖定類是用于高級情況和高級用戶的工具,除非說你對Lock的高級特性有特別清楚的了解以及有明確的需要雪隧,或這有明確的證據(jù)表明同步已經(jīng)成為可伸縮性的瓶頸的時候西轩,否則我們還是繼續(xù)使用synchronized。相比較這些高級的鎖定類脑沿,synchronized還是有一些優(yōu)勢的,比如synchronized不可能忘記釋放鎖马僻。還有當(dāng)JVM使用synchronized管理鎖定請求和釋放時庄拇,JVM在生成線程轉(zhuǎn)儲時能夠包括鎖定信息,這些信息對調(diào)試非常有價值韭邓,它們可以標(biāo)識死鎖以及其他異常行為的來源措近。
如何選擇鎖:
- 若業(yè)務(wù)邏輯需使用到鎖的高級功能去實現(xiàn),那么就可以選擇ReentrantLock
- 需要細(xì)粒度操作鎖時女淑,選擇ReentrantLock
- 對ReentrantLock的機制很了解瞭郑,有足夠經(jīng)驗?zāi)軌虮苊馑梨i的出現(xiàn)的開發(fā)者,可以選擇ReentrantLock鸭你,不建議對鎖機制不是很熟悉的開發(fā)者使用ReentrantLock
- 對鎖的需求較簡單屈张,使用synchornized
- 初級開發(fā)者建議使用synchornized
使用示例
基本使用:
@Slf4j
public class LockExample2 {
/**
* 請求總數(shù)
*/
public static int clientTotal = 5000;
/**
* 同時并發(fā)執(zhí)行的線程數(shù)量
*/
public static int threadTotal = 200;
/**
* 計數(shù)
*/
private static int count = 0;
/**
* 鎖對象,默認(rèn)是使用非公平鎖袱巨,可以傳入true和false來決定使用公平所還是非公平鎖
*/
private final static Lock LOCK = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(threadTotal);
CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(() -> {
try {
// 從信號量獲取執(zhí)行許可阁谆,若并發(fā)達(dá)到設(shè)定的數(shù)量,那么就不會獲取到許可愉老,將會阻塞當(dāng)前線程场绿,直到能夠獲取到執(zhí)行許可為止
semaphore.acquire();
LockExample2.add();
// 釋放當(dāng)前線程
semaphore.release();
} catch (InterruptedException e) {
log.error("", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count: {}", count);
}
private static void add() {
// 加鎖
LOCK.lock();
try {
count++;
} finally {
// 解鎖
LOCK.unlock();
}
}
}
在ReentrantLock 中,lock()
方法是一個無條件的鎖嫉入,與synchronize意思差不多焰盗,但是另一個方法 tryLock()
方法只有在成功獲取了鎖的情況下才會返回true,如果別的線程當(dāng)前正持有鎖咒林,則會立即返回false熬拒。如果為這個方法加上timeout參數(shù),則會在等待timeout的時間才會返回false或者在獲取到鎖的時候返回true映九。
其他常用方法:
boolean isHeldByCurrentThread(); // 當(dāng)前線程是否保持鎖定
boolean isLocked() // 是否存在任意線程持有鎖資源
void lockInterruptbly() // 如果當(dāng)前線程未被中斷梦湘,則獲取鎖定;如果已中斷,則拋出異常(InterruptedException)
int getHoldCount() // 查詢當(dāng)前線程保持此鎖定的個數(shù)捌议,即調(diào)用lock()方法的次數(shù)
int getQueueLength() // 返回正等待獲取此鎖定的預(yù)估線程數(shù)
int getWaitQueueLength(Condition condition) // 返回與此鎖定相關(guān)的約定condition的線程預(yù)估數(shù)
boolean hasQueuedThread(Thread thread) // 當(dāng)前線程是否在等待獲取鎖資源
boolean hasQueuedThreads() // 是否有線程在等待獲取鎖資源
boolean hasWaiters(Condition condition) // 是否存在指定Condition的線程正在等待鎖資源
boolean isFair() // 是否使用的是公平鎖
Condition
Condition是一個多線程間協(xié)調(diào)通信的工具類哼拔,使得某個,或者某些線程一起等待某個條件(Condition)瓣颅,只有當(dāng)該條件具備( signal 或者 signalAll方法被調(diào)用)時 倦逐,這些等待線程才會被喚醒,從而重新爭奪鎖宫补。
Condition可以非常靈活的操作線程的喚醒檬姥,下面是一個線程等待與喚醒的例子,其中用1粉怕、2健民、3、4序號標(biāo)出了日志輸出順序:
@Slf4j
public class LockExample6 {
public static void main(String[] args) {
// 構(gòu)建ReentrantLock實例
ReentrantLock reentrantLock = new ReentrantLock();
// 從reentrantLock實例里獲取condition實例
Condition condition = reentrantLock.newCondition();
// 線程1
new Thread(() -> {
try {
// 線程1調(diào)用了lock方法贫贝,這時線程1就會加入到了AQS的等待隊里面去
reentrantLock.lock();
log.info("wait signal"); // 1 等待信號
// 調(diào)用await方法后秉犹,線程1就會從AQS隊列里移除,這里其實就已經(jīng)釋放了鎖稚晚,然后線程1會馬上進入到condition隊列里面去崇堵,等待一個信號
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4 得到信號
// 線程1釋放鎖,整個過程執(zhí)行完畢
reentrantLock.unlock();
}).start();
// 線程2
new Thread(() -> {
// 由于線程1中調(diào)用了await釋放了鎖的關(guān)系客燕,所以線程2就會被喚醒獲取到鎖鸳劳,加入到AQS等待隊列中
reentrantLock.lock();
log.info("get lock"); // 2 獲取鎖
try {
// 睡眠3秒
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 調(diào)用signalAll發(fā)送信號的方法,此時condition等待隊列里線程1所在的節(jié)點元素就會被取出也搓,然后重新放到AQS等待隊列里(注意此時線程1還沒有被喚醒)
condition.signalAll();
log.info("send signal ~ "); // 3 發(fā)送信號
// 線程2釋放鎖赏廓,這時候AQS隊列中只剩下線程1,然后AQS會按照從頭到尾的順序喚醒線程还绘,于是線程1開始執(zhí)行
reentrantLock.unlock();
}).start();
}
}
可以看到楚昭,整個協(xié)調(diào)通信的過程是靠線程所在的節(jié)點在AQS的等待隊列和condition的等待隊列中來回移動實現(xiàn)的。condition作為一個條件類很好的維護了一個等待信號的隊列拍顷,并在signal 或者 signalAll方法被調(diào)用后抚太,將等待的線程節(jié)點重新放回AQS的等待隊列中,從而實現(xiàn)喚醒線程的操作昔案。
ReentrantReadWriteLock
ReentrantReadWriteLock是Lock的另一種實現(xiàn)方式尿贫,我們已經(jīng)知道了ReentrantLock是一個排他鎖,同一時間只允許一個線程訪問踏揣,而ReentrantReadWriteLock允許多個讀線程同時訪問庆亡,但不允許寫線程和讀線程、寫線程和寫線程同時訪問捞稿。在沒有任何讀寫鎖的時候才能取得寫入的鎖又谋,可用于實現(xiàn)悲觀讀取拼缝。相對于排他鎖,提高了并發(fā)性彰亥。在實際應(yīng)用中咧七,大部分情況下對共享數(shù)據(jù)(如緩存)的訪問都是讀操作遠(yuǎn)多于寫操作,這時ReentrantReadWriteLock能夠提供比排他鎖更好的并發(fā)性和吞吐量任斋,所以讀寫鎖適用于讀多寫少的情況继阻。但讀多寫少的場景下可能會令寫入線程遭遇饑餓,即寫入線程遲遲無法獲取到鎖資源而處于等待狀態(tài)废酷。
與互斥鎖相比瘟檩,使用讀寫鎖能否提升性能則取決于讀寫操作期間讀取數(shù)據(jù)相對于修改數(shù)據(jù)的頻率,以及數(shù)據(jù)的爭用——即在同一時間試圖對該數(shù)據(jù)執(zhí)行讀取或?qū)懭氩僮鞯木€程數(shù)澈蟆。
讀寫鎖內(nèi)部維護了兩個鎖墨辛,一個用于讀操作,一個用于寫操作丰介。所有 ReadWriteLock實現(xiàn)都必須保證 writeLock操作的內(nèi)存同步效果也要保持與相關(guān) readLock的聯(lián)系背蟆。也就是說,成功獲取讀鎖的線程會看到寫入鎖之前版本所做的所有更新哮幢。
ReentrantReadWriteLock支持以下功能:
1.非公平模式(默認(rèn)):連續(xù)競爭的非公平鎖可能無限期地推遲一個或多個reader或writer線程,但吞吐量通常要高于公平鎖志珍。
2.公平模式:線程利用一個近似到達(dá)順序的策略來爭奪進入橙垢。當(dāng)釋放當(dāng)前保持的鎖時,可以為等待時間最長的單個writer線程分配寫入鎖伦糯,如果有一組等待時間大于所有正在等待的writer線程的reader柜某,將為該組分配讀者鎖。試圖獲得公平寫入鎖的非重入的線程將會阻塞敛纲,除非讀取鎖和寫入鎖都自由(這意味著沒有等待線程)喂击。
3.支持可重入。讀線程在獲取了讀鎖后還可以獲取讀鎖淤翔;寫線程在獲取了寫鎖之后既可以再次獲取寫鎖又可以獲取讀鎖
4.還允許從寫入鎖降級為讀取鎖翰绊,其實現(xiàn)方式是:先獲取寫入鎖,然后獲取讀取鎖旁壮,最后釋放寫入鎖监嗜。但是,從讀取鎖升級到寫入鎖是不允許的
5.讀取鎖和寫入鎖都支持鎖獲取期間的中斷
6.Condition支持抡谐。僅寫入鎖提供了一個 Conditon 實現(xiàn)裁奇;讀取鎖不支持 Conditon ,readLock().newCondition() 會拋出 UnsupportedOperationException麦撵。
7.監(jiān)測:此類支持一些確定是讀取鎖還是寫入鎖的方法刽肠。這些方法設(shè)計用于監(jiān)視系統(tǒng)狀態(tài)溃肪,而不是同步控制。
例如我現(xiàn)在有一個類音五,里面有一個map集合惫撰,我們都知道操作map時都是讀多寫少的,所以我希望在對其讀寫的時候能夠進行一些線程安全的保護放仗,這時我們就可以使用到ReentrantReadWriteLock润绎。示例代碼如下:
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public Data get(String key) {
// 讀鎖
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
// 讀鎖
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
// 在沒有任何讀寫鎖的時候才會進行寫入操作
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
StempedLock
StampedLock是Java8引入的一種新的鎖機制,簡單的理解诞挨,可以認(rèn)為它是讀寫鎖的一個改進版本莉撇,讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全并發(fā)惶傻,但是讀和寫之間依然是沖突的棍郎,讀鎖會完全阻塞寫鎖,它使用的依然是悲觀的鎖策略银室。如果有大量的讀線程涂佃,它也有可能引起寫線程的饑餓。而StampedLock則提供了一種樂觀的讀策略蜈敢,這種樂觀策略的鎖非常類似于無鎖的操作辜荠,使得樂觀鎖完全不會阻塞寫線程。
StempedLock控制鎖有三種形式抓狭,分別是寫伯病,讀,和樂觀讀否过,重點在樂觀鎖午笛。一個StempedLock,狀態(tài)是由版本和模式兩個部分組成苗桂。鎖獲取的方法返回的是一個數(shù)字作為票據(jù)(Stempe)药磺,他用相應(yīng)的鎖狀態(tài)來表示并控制相關(guān)的訪問,數(shù)字0表示沒有寫鎖被授權(quán)訪問煤伟,在讀鎖上分為悲觀讀和樂觀讀癌佩。
所謂的樂觀讀模式,也就是若讀的操作很多持偏,寫的操作很少的情況下驼卖,你可以樂觀地認(rèn)為,寫入與讀取同時發(fā)生幾率很少鸿秆,因此不悲觀地使用完全的讀取鎖定酌畜,程序可以查看讀取資料之后,是否遭到寫入執(zhí)行的變更卿叽,再采取后續(xù)的措施(重新讀取變更信息桥胞,或者拋出異常) 恳守,這一個小小改進,可大幅度提高程序的吞吐量
適用場景:
樂觀讀取模式僅用于短時間讀取操作時經(jīng)常能夠降低競爭和提高吞吐量贩虾。當(dāng)然催烘,它的使用在本質(zhì)上是脆弱的。樂觀讀取的區(qū)域應(yīng)該只包括字段缎罢,并且在validation之后用局部變量持有它們從而在后續(xù)使用伊群。樂觀模式下讀取的字段值很可能是非常不一致的,所以它應(yīng)該只用于那些你熟悉如何展示數(shù)據(jù)策精,從而你可以不斷檢查一致性和調(diào)用方法validate
優(yōu)化點:
1.樂觀讀不阻塞悲觀讀和寫操作舰始,有利于獲得寫鎖
2.隊列頭結(jié)點采用有限次數(shù)SPINS次自旋(增加開銷),增加獲得鎖幾率(因為闖入的線程會競爭鎖)咽袜,有效夠降低上下文切換
3.讀模式的集合通過一個公共節(jié)點被聚集在一起(cowait鏈)丸卷,當(dāng)隊列尾節(jié)點為RMODE,通過CAS方法將該節(jié)點node添加至尾節(jié)點的cowait鏈中,node成為cowait中的頂元素询刹,cowait構(gòu)成了一個LIFO隊列谜嫉。
4.不支持鎖重入,如果只悲觀讀鎖和寫鎖凹联,效率沒有ReentrantReadWriteLock高沐兰。
基本使用示例:
public class LockExample5 {
private final static StampedLock LOCK = new StampedLock();
private static void add() {
// 加寫鎖
long stamp = LOCK.writeLock();
try {
count++;
} finally {
// 解鎖需要傳入加鎖時返回的stamp
LOCK.unlock(stamp);
}
}
}
其實在StempedLock的源碼中,提供了一段示例代碼蔽挠,但沒有相應(yīng)的注釋僧鲁,所以這里對該示例代碼給出一些注釋。如下:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
// 樂觀讀鎖案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //獲得一個樂觀讀鎖
double currentX = x, currentY = y; //將兩個字段讀入本地局部變量
if (!sl.validate(stamp)) { //檢查發(fā)出樂觀讀鎖后同時是否有其他寫鎖發(fā)生象泵?
stamp = sl.readLock(); //如果沒有,我們再次獲得一個讀悲觀鎖
try {
currentX = x; // 將兩個字段讀入本地局部變量
currentY = y; // 將兩個字段讀入本地局部變量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 悲觀讀鎖案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循環(huán)斟叼,檢查當(dāng)前狀態(tài)是否符合
long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉(zhuǎn)為寫鎖
if (ws != 0L) { //這是確認(rèn)轉(zhuǎn)為寫鎖是否成功
stamp = ws; //如果成功 替換票據(jù)
x = newX; //進行狀態(tài)改變
y = newY; //進行狀態(tài)改變
break;
} else { //如果不能成功轉(zhuǎn)換為寫鎖
sl.unlockRead(stamp); //我們顯式釋放讀鎖
stamp = sl.writeLock(); //顯式直接進行寫鎖 然后再通過循環(huán)再試
}
}
} finally {
sl.unlock(stamp); //釋放讀鎖或?qū)戞i
}
}
}
下圖是和ReadWritLock相比偶惠,在一個線程情況下,讀速度是其4倍左右朗涩,寫是1倍:
下圖是六個線程情況下忽孽,讀性能是其幾十倍,寫性能也是近10倍左右:
StampedLock 小結(jié):
StampedLock 對吞吐量有巨大的改進谢床,特別是在讀線程越來越多的場景下兄一。但StampedLock有一個復(fù)雜的API,對于加鎖操作识腿,很容易誤用其他方法出革。StampedLock 可以說是Lock的一個很好的補充,吞吐量以及性能上的提升足以打動很多人了渡讼,但并不是說要替代之前Lock的東西骂束,畢竟它還是有些應(yīng)用場景的耳璧,起碼API比StampedLock容易入手
總結(jié)關(guān)于鎖的幾個類:
- synchronized:JVM實現(xiàn),不但可以通過一些監(jiān)控工具監(jiān)控展箱,而且在出現(xiàn)未知異常的時候JVM也會自動幫我們釋放鎖
- ReentrantLock旨枯、ReentrantRead/WriteLock、StempedLock 他們都是對象層面的鎖定混驰,要想保證鎖一定被釋放攀隔,要放到finally里面,才會更安全一些栖榨。StempedLock對性能有很大的改進昆汹,特別是在讀線程越來越多的情況下。
如何使用:
- 在只有少量競爭者的時候治泥,synchronized是一個很好的鎖的實現(xiàn)
- 競爭者不少筹煮,但是增長量是可以預(yù)估的,ReentrantLock是一個很好的鎖的通用實現(xiàn)(適合使用場景的才是最好的居夹,不是越高級越好)
部分參考:
https://blog.csdn.net/luoyuyou/article/details/30259877
http://www.importnew.com/14941.html
由于篇幅有限败潦,AQS相關(guān)的組件先介紹到這,剩余的會在下一篇文章中介紹: