一政敢、概述
Java并發(fā)編程核心在于java.concurrent.util包,而juc當(dāng)中的大多數(shù)同步器實(shí)現(xiàn)都是圍繞著共同的基礎(chǔ)行為揣非,比如等待隊(duì)列、條件隊(duì)列躲因、獨(dú)占獲取、共享獲取等忌傻,而這個(gè)行為的抽象就是基于AbstractQueuedSynchronizer簡(jiǎn)稱AQS大脉,AQS是定義了一套多線程訪問(wèn)共享資源的同步器框架,是一個(gè)依賴狀態(tài)(state)的同步器水孩。
AQS具備特性:阻塞等待隊(duì)列镰矿、共享/獨(dú)占、公平/非公平俘种、可重入秤标、允許中斷。
注意:像ReentrantLock宙刘、Semaphore苍姜、Barrier等等框架都有一個(gè)內(nèi)部類Sync同步器,Sync繼承了AQS框架悬包。
注意:等待隊(duì)列衙猪、條件隊(duì)列讓JUC有了公平鎖和非公平鎖;獨(dú)占獲取布近、共享獲取讓讓JUC有了獨(dú)占鎖(排他鎖)和共享鎖垫释;狀態(tài)(state)讓JUC有了重入鎖。
應(yīng)用
AQS內(nèi)部維護(hù)屬性volatile int state (32位)撑瞧,state表示資源的可用狀態(tài)棵譬;內(nèi)部定義兩種隊(duì)列:同步等待隊(duì)列、條件等待隊(duì)列预伺;內(nèi)部類Sync繼承AQS订咸,AQS所有調(diào)用都映射到Sync對(duì)應(yīng)的方法琅束。
State三種訪問(wèn)方式
getState()、setState()算谈、compareAndSetState()
AQS定義兩種資源共享方式
Exclusive-獨(dú)占:只有一個(gè)線程能執(zhí)行涩禀,如ReentrantLock。
Share-共享:多個(gè)線程可以同時(shí)執(zhí)行然眼,如Semaphore/CountDownLatch艾船。
二、ReentrantLock
ReentrantLock是基于AQS實(shí)現(xiàn)的獨(dú)占鎖高每,如下圖屿岂,有內(nèi)部類Sync,Sync繼承AQS框架鲸匿,關(guān)鍵變量state記錄上鎖次數(shù)爷怀;exclusiveOwnerThread記錄當(dāng)前占有鎖的線程;CLH變種隊(duì)列BlockingQueue(雙向鏈表阻塞隊(duì)列)也是同步隊(duì)列存儲(chǔ)正在等待的線程带欢。
注意:原生的CLH隊(duì)列里面的線程是自旋的运授,不會(huì)丟掉CPU的使用權(quán)。CLH隊(duì)列變種是基于原生的修改乔煞,里面線程處于阻塞狀態(tài)吁朦。
CLH隊(duì)列的是由AQS框架中的Node對(duì)象構(gòu)建成的雙向鏈表,Node對(duì)象有以下重要屬性:
static final Node SHARED:標(biāo)志以共享節(jié)點(diǎn)入隊(duì)列渡贾。
static final Node EXCLUSIVE:標(biāo)志以獨(dú)占節(jié)點(diǎn)入隊(duì)列逗宜。
static final int CANCELLED:標(biāo)志線程中斷狀態(tài)。
static final int SIGNAL:標(biāo)志線程可喚醒空骚。
static final int CONDITION:標(biāo)志Node為條件隊(duì)列纺讲。
static final int PROPAGATE:標(biāo)志線程喚醒可傳播,共享模式下使用囤屹。
volatile int waitStatus:CANCELLED熬甚、SIGNAL、CONDITION牺丙、PROPAGATE信號(hào)量则涯。
volatile Thread thread:線程。
volatile Node prev:前驅(qū)節(jié)點(diǎn)冲簿,使用在CLH同步隊(duì)列中粟判。
volatile Node next:后繼節(jié)點(diǎn),使用在CLH同步隊(duì)列中峦剔。
Node nextWaiter:后繼節(jié)點(diǎn)档礁,使用在條件隊(duì)列中。
注意1:若為條件隊(duì)列吝沫,那么Node必須為EXCLUSIVE獨(dú)占模式呻澜,不能是共享模式递礼。
注意2:CLH同步隊(duì)列為雙向鏈表,條件隊(duì)列為單向鏈表羹幸。
應(yīng)用
ReentrantLock lock = new ReentrantLock(boolean flag)脊髓;
true:創(chuàng)建公平鎖,CLH隊(duì)列不為空栅受,新來(lái)的線程直接進(jìn)入隊(duì)列等待将硝。
false:創(chuàng)建非公平鎖,CLH隊(duì)列不為空屏镊,新來(lái)的線程直接去爭(zhēng)搶鎖依疼,爭(zhēng)搶不到再去隊(duì)列等待。
爭(zhēng)搶鎖過(guò)程如上圖而芥,若線程搶鎖成功律罢,會(huì)以CAS方式修改AQS框架中volatile int state的值,然后修改exclusiveOwnerThread指向當(dāng)前線程棍丐;若線程搶鎖失敗误辑,創(chuàng)建一個(gè)EXCLUSIVE Node,然后以CAS方式添加到CLH隊(duì)列尾部進(jìn)行阻塞骄酗,其中CAS原理是基于java unsafe魔術(shù)類來(lái)實(shí)現(xiàn)的稀余,AQS線程阻塞也是java unsafe實(shí)現(xiàn),阻塞和解鎖分別是unsafe.park()與unsafe.unpark()趋翻。
線程執(zhí)行完畢,也會(huì)以CAS方式修改volatile int state盒蟆,然后修改設(shè)置exclusiveOwnerThread為空踏烙,最后去喚醒隊(duì)列中的第一個(gè)線程節(jié)點(diǎn),與notify和notifyAll不同历等,不會(huì)隨機(jī)喚醒一個(gè)線程和喚醒所有的線程節(jié)點(diǎn)讨惩。
總結(jié):AQS中,涉及大量的死循環(huán)配合CAS和volatile進(jìn)行操作寒屯,大量的并發(fā)會(huì)觸使CAS和volatile操作失敗荐捻,只有CAS操作返回true才能跳出循環(huán)。
三寡夹、Semaphore
Semaphore 字面意思是信號(hào)量的意思处面,它的作用是控制訪問(wèn)特定資源的線程數(shù)目。Semaphore 在AQS中菩掏,屬于共享鎖魂角,使用場(chǎng)景:資源訪問(wèn),服務(wù)限流智绸。
如上圖野揪,在ReentrantLock中volatile int state為獨(dú)占鎖獨(dú)占的標(biāo)志访忿,但在Semaphore中state為共享鎖的資源總數(shù)。
應(yīng)用
Semaphore 屬于共享鎖斯稳,默認(rèn)創(chuàng)建非公平鎖海铆。調(diào)用Semaphore構(gòu)造方法可以創(chuàng)建一個(gè)共享鎖。
構(gòu)造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
其中permits 表示許可線程的數(shù)量挣惰,fair 表示公平性卧斟,默認(rèn)為false,如果這個(gè)設(shè)為 true 的話通熄,下次執(zhí)行的線程會(huì)是等待最久的線程唆涝。
主要方法
public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)
acquire() 表示阻塞并獲取許可,volatile int state CAS操作減1唇辨。
release() 表示釋放許可廊酣,volatile int state CAS操作加1。
Semaphore semaphore = new Semaphore(5);
for(int i = 0; i<10; i++){
new Thread(new Task(semaphore,"lin"+i)).start();
}
//Task類的run方法
public void run() {
try
{
semaphore.acquire();
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "處理結(jié)束...");
}
catch (Exception ex)
{
System.out.println(ex.getMessage());
}
finally {
semaphore.release();
}
}
四赏枚、CountDownLatch
CountDownLatch這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行亡驰。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行饿幅。
CountDownLatch是通過(guò)一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的凡辱,計(jì)數(shù)器的初始值為線程的數(shù)量。每當(dāng)一個(gè)線程完成了自己的任務(wù)后栗恩,計(jì)數(shù)器的值就會(huì)減1透乾。當(dāng)計(jì)數(shù)器值到達(dá)0時(shí),它表示所有的線程已經(jīng)完成了任務(wù)磕秤,然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)乳乌。
應(yīng)用
CountDownLatch屬于共享鎖,由主線程把任務(wù)分配給多個(gè)其它的線程進(jìn)行執(zhí)行市咆,其它的線程執(zhí)行完畢后汉操,再回到主線程這邊來(lái),這里的任務(wù)可以是不同類型的蒙兰。
構(gòu)造方法
public CountDownLatch(int count)
count為任務(wù)的數(shù)量磷瘤。
主要方法
CountDownLatch.countDown();
CountDownLatch.await()搜变;
countDown():volatile int state CAS操作減1采缚。
await():主線程匯總等待。
static CountDownLatch countDownLatchLock = new CountDownLatch(2);
//主線程
public static void main(String[] args) throws InterruptedException {
new SeeDoctorThread().start();
new QueueGetMedThread().start();
countDownLatchLock.await();
//最后集合匯總痹雅,一起回家
System.out.println("男生女生門口集合仰担,開車一起回家");
}
// 分工合作,不分先后
/**
* 女生上廁所線程
*/
static class SeeDoctorThread extends Thread
{
@Override
public void run() {
try {
Thread.sleep(7*1000);
System.out.println("女生上完廁所!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatchLock.countDown();
}
}
}
/**
* 男生上廁所線程
*/
static class QueueGetMedThread extends Thread
{
@Override
public void run() {
try {
Thread.sleep(3*1000);
System.out.println("男生上完廁所摔蓝!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatchLock.countDown();
}
}
}
注意:CountDownLatch 的使用是一次性的赂苗,用完即止,不可重復(fù)使用贮尉。
五拌滋、CyclicBarrier
柵欄屏障,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞猜谚,直到最后一個(gè)線程到達(dá)屏障時(shí)败砂,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行魏铅。CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)昌犹,其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告CyclicBarrier我已經(jīng)到達(dá)了屏障览芳,然后當(dāng)前線程被阻塞斜姥。
應(yīng)用
CyclicBarrier屬于共享鎖,任務(wù)分配給多個(gè)其它的線程進(jìn)行執(zhí)行沧竟,其它的線程執(zhí)行完畢后铸敏,再回到主線程這邊來(lái),這里的任務(wù)可以是不同類型的悟泵。
構(gòu)造方法
public CyclicBarrier(int parties, Runnable barrierAction)
parties為任務(wù)的數(shù)量杈笔。
主要方法
cyclicBarrier.await()
等待其它線程。
//游戲五人開黑
static CyclicBarrier barrierLock = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("開始游戲...");
}
});
public static void main(String[] args) throws InterruptedException {
//4臺(tái)頂級(jí)配置電腦 1臺(tái)普通配置電腦
for(int i = 0; i < 4; i++)
{
new LoadingGameThread_1().start();
}
new LoadingGameThread_2().start();
}
/**
* 玩家頂級(jí)配置電腦進(jìn)入游戲線程
*/
static class LoadingGameThread_1 extends Thread
{
@Override
public void run() {
try {
Thread.sleep(2*1000);
System.out.println("頂級(jí)配置電腦進(jìn)入游戲成功...");
barrierLock.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
/**
* 玩家普通配置電腦進(jìn)入游戲線程
*/
static class LoadingGameThread_2 extends Thread
{
@Override
public void run() {
try {
Thread.sleep(5*1000);
System.out.println("普通配置電腦進(jìn)入游戲成功...");
barrierLock.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
注意:CyclicBarrier跟CountDownLatch在于它是可以重復(fù)使用的糕非,還有個(gè)區(qū)別是CyclicBarrier不需要匯總蒙具。