0X01概述
提到j(luò)ava里的鎖機(jī)制守谓,通常有兩種一種是jvm的synchronized關(guān)鍵字實(shí)現(xiàn),一種是ReentranLock。前者是jvm實(shí)現(xiàn)沥割,而后者則是java代碼實(shí)現(xiàn),AbstractQueuedSynchronizer則是java并發(fā)包里很多類的實(shí)現(xiàn)基礎(chǔ),比如ReentranLock,CountDownLatch,CyclicBarrier等呀非。
要實(shí)現(xiàn)鎖的控制壁畸,試想一下絮吵,有幾個(gè)地方
- 怎么才叫持有鎖霍骄?怎么才叫未拿到鎖淘邻?鎖是什么東西似将?
- 如何實(shí)現(xiàn)搶占梭伐?未搶到鎖的線程該用什么數(shù)據(jù)結(jié)構(gòu)存儲痹雅,以便其能在鎖被釋放后,按照什么邏輯獲得鎖糊识?
- 線程控制與強(qiáng)鎖之間如何協(xié)調(diào)绩社?才達(dá)到性能最好
- 線程多個(gè)狀態(tài)之間如何控制線程搶占關(guān)系
- 釋放鎖的時(shí)候線程如何競爭?
如果是面試的時(shí)候如何解釋這個(gè)原理赂苗?
先關(guān)注幾個(gè)基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)
- 用一個(gè)FIFO的CLH隊(duì)列存儲線程node
- Node對象是隊(duì)列的節(jié)點(diǎn)對象愉耙,里面封裝了thread、node statu等
- 用一個(gè)volatile的int state控制允許同時(shí)拿鎖的數(shù)量
在關(guān)幾個(gè)關(guān)鍵過程
- 初始化隊(duì)列
- 添加節(jié)點(diǎn)
- 節(jié)點(diǎn)狀態(tài)變更
一個(gè)線程通過子類實(shí)現(xiàn)的acquire方法哑梳,來實(shí)現(xiàn)對鎖的獲取劲阎,比如常見的ReentranLock實(shí)現(xiàn)的state值是1,就是同時(shí)只允許一個(gè)線程獲取鎖鸠真。
對于未獲取鎖的線程開始競爭悯仙,這里舉個(gè)栗子,加入此時(shí)有T2 T3兩個(gè)線程沒拿到鎖吠卷,則他們開始入隊(duì)锡垄,初始化隊(duì)列,設(shè)置頭節(jié)點(diǎn)祭隔,逐個(gè)添加尾節(jié)點(diǎn)货岭。
但這里問題又來了,如何保證多個(gè)線程對隊(duì)列讀寫的安全性疾渴?要知道現(xiàn)在是沒有鎖的概念千贯,因?yàn)檎趯?shí)現(xiàn)鎖的基礎(chǔ)框架。所以java這里使用了樂觀鎖搞坝。提到樂觀鎖搔谴,就不得不提到cas操作,提到cas就要想到volatile保證線程可見性桩撮。
另外一點(diǎn)敦第,樂觀鎖一般是和“重試”操作息息相關(guān)的,cas只能保證一次操作的成功店量,如果一個(gè)線程操作成功了芜果,那其他線程你不能不管不顧,要給他們一個(gè)去處融师,一個(gè)邏輯右钾,比如重試邏輯,重試就代表你的代碼需要重復(fù)執(zhí)行,比如while(true) 比如for(;;)霹粥,但凡死循環(huán)勢必帶一個(gè)終止條件灭将。
AQS里就是用cas、volatile和重試來保證隊(duì)列操作的安全性后控。
T2庙曙、T3現(xiàn)在(順序看搶占順序)添加到隊(duì)列里,T3是隊(duì)尾浩淘,T2是第二個(gè)節(jié)點(diǎn)捌朴,頭節(jié)點(diǎn)是初始化節(jié)點(diǎn)是一個(gè)空的Node節(jié)點(diǎn)(不懂,就是這么設(shè)計(jì)的张抄,只是作為一個(gè)控制節(jié)點(diǎn)存在),T2在隊(duì)尾砂蔽,如果后面有新來的線程就添加到隊(duì)尾。
如果僅僅是放到隊(duì)列里肯定是不夠的署惯,在入隊(duì)后左驾,AQS會用一個(gè)for(;;)去操作節(jié)點(diǎn),也就是俗稱的線程自旋极谊,自旋是說一個(gè)線程暫時(shí)不掛起诡右,而是空轉(zhuǎn)(就是空循環(huán)),每次循環(huán)嘗試去獲取鎖轻猖,看看鎖有沒有被釋放(調(diào)用子類acquire方法)帆吻,如果沒有就去更改Node狀態(tài),從初始化改為可以阻塞狀態(tài)咙边,下一次自旋后猜煮,根據(jù)狀態(tài)讓線程掛起,終止自旋败许。
以上只是概述王带,下面來看看關(guān)鍵類源碼和一些圖。
0X02關(guān)鍵代碼和圖例
2.1Node代碼
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//表示節(jié)點(diǎn)的線程是已被取消的
static final int CANCELLED = 1;
//表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程需要被喚醒
static final int SIGNAL = -1;
//表示線程正在等待某個(gè)條件
static final int CONDITION = -2;
//表示下一個(gè)共享模式的節(jié)點(diǎn)應(yīng)該無條件的傳播下去
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
2.2AQS結(jié)構(gòu)
[圖片上傳失敗...(image-38499c-1536736127795)]
2.3Node節(jié)點(diǎn)狀態(tài)圖
2.4AQS關(guān)鍵代碼
/**
* 以獨(dú)占所ReentrantLock為例市殷,只有l(wèi)ock方法里線程CAS拿鎖失敗的那些線程會進(jìn)入該方法
*/
public final void acquire(int arg) {
logger.info("線程{}開始搶占鎖", Thread.currentThread().getId());
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { //如果tryAcquire嘗試cas獲取鎖失敗了愕撰,則入隊(duì)
//搶占失敗 && 別的node 自旋搶到了鎖
selfInterrupt();
}
}
tryAcquire的子類實(shí)現(xiàn),Sync類ReentranLock里的類:
/**
* 嘗試獲取鎖
*/
final boolean nonfairTryAcquire(int acquires) {
logger.info("nonfairTryAcquire {}嘗試獲取鎖begin", Thread.currentThread().getId());
final Thread current = Thread.currentThread();
int c = getState();
// logger.info("nonfairTryAcquire {}當(dāng)前 stat:{}", Thread.currentThread().getId(), c);
if (c == 0) {//如果當(dāng)前state0被丧,說明上一個(gè)鎖釋放了,進(jìn)入該方法的線程嘗試cas獲取鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
logger.info("nonfairTryAcquire {}嘗試獲取鎖success state = 0 ", Thread.currentThread().getId());
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
logger.info("nonfairTryAcquire nextc is {}", nextc);
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
logger.info("nonfairTryAcquire {}嘗試獲取鎖success state != 0 ", Thread.currentThread().getId());
return true;
}
logger.info("nonfairTryAcquire {}嘗試獲取鎖fail", Thread.currentThread().getId());
return false;
}
添加到隊(duì)尾或初始化隊(duì)列
/**
* Creates and enqueues node for current thread and given mode.
* 添加等待線程绪妹,設(shè)置獨(dú)占還是共享
* 獨(dú)占模式甥桂,mode是一個(gè)null
* mode是node里的nextWaiter
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
logger.info("addWaiter -{}", Thread.currentThread().getId());
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 隊(duì)列不為空,隊(duì)尾不為空的時(shí)候邮旷,把節(jié)點(diǎn)添加到隊(duì)尾黄选,
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
logger.info("addWaiter 隊(duì)尾已經(jīng)有元素,將新元素添加到隊(duì)尾");
try {
logger.info("最新的隊(duì)尾元素{},舊的隊(duì)尾元素{}", node.thread.getId(), node.prev.thread.getId());
} catch (Exception e) {
}
return node;
}
}
//否則办陷,入隊(duì)貌夕,初始化
enq(node);
return node;
}
初始化隊(duì)列,生成頭結(jié)點(diǎn)民镜,添加到隊(duì)尾
private Node enq(final Node node) {
logger.info("enq");
int i = 0;
for (; ; ) {
Node t = tail;
if (t == null) { // //tail節(jié)點(diǎn)為空啡专,初始化隊(duì)列,將頭結(jié)點(diǎn)設(shè)置為new Node(不關(guān)聯(lián)任何線程)
logger.info("tail節(jié)點(diǎn)為空制圈,初始化隊(duì)列们童,將頭結(jié)點(diǎn)設(shè)置為new Node(不關(guān)聯(lián)任何線程)");
if (compareAndSetHead(new Node()))//CAS設(shè)置頭結(jié)點(diǎn),CAS的地方都可能有并發(fā),多個(gè)thread過來鲸鹦,只有一個(gè)設(shè)置成head慧库,其他的在下次循環(huán),進(jìn)入else邏輯
tail = head; //只有一個(gè)節(jié)點(diǎn)的時(shí)候馋嗜,頭尾都是一個(gè)
} else {
logger.info("線程{},自旋弟{}次添加到隊(duì)尾", Thread.currentThread().getId(), ++i);
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS添加到隊(duì)尾齐板,每次成功一個(gè),只有成功設(shè)置了葛菇,才會返回t甘磨,cas失敗繼續(xù)循環(huán),所以重試與CAS幾乎是綁定的
t.next = node;
return t;//死循環(huán)終止條件
}
}
}
}
自旋等待熟呛、狀態(tài)變更宽档、掛起線程
/**
* 把node插入隊(duì)列末尾后,它并不立即掛起該節(jié)點(diǎn)中線程,
* 因?yàn)樵诓迦胨倪^程中,前面的線程可能已經(jīng)執(zhí)行完成,
* 所以它會先進(jìn)行自旋操作acquireQueued(node, arg),
* 嘗試讓該線程重新獲取鎖!當(dāng)條件滿足獲取到了鎖則可以從自旋過程中退出,
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
logger.info("acquireQueued start... {}", Thread.currentThread().getId());
boolean failed = true;
try {
boolean interrupted = false;
int i = 0;
for (; ; ) {
logger.info("acquireQueue-{}-自旋{}次", Thread.currentThread().getId(), ++i);
final Node p = node.predecessor();//拿到前一個(gè)節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {//如果前一個(gè)是頭節(jié)點(diǎn)庵朝,就嘗試一次tryAcquire獲取鎖
setHead(node);//獲取成功就將該節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)吗冤,代表成功獲得鎖
logger.info("acquireQueued setHead {}", Thread.currentThread().getId());
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果沒獲取到鎖,則判斷是否應(yīng)該掛起
//這個(gè)判斷在shouldParkAfterFailedAcquire方法,
//通過它的前驅(qū)節(jié)點(diǎn)的waitStatus來確定
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
狀態(tài)變更:
/**
* 當(dāng)一個(gè)node嘗試獲取鎖失敗的時(shí)候九府,檢查更新node status字段椎瘟。
* 返回true表示需要阻塞(調(diào)用LockSupport)
* //該方法在上一層死循環(huán)里被調(diào)用
* > 0,將前驅(qū)節(jié)點(diǎn)踢出隊(duì)列,返回false
* < 0,也是返回false,不過先將前驅(qū)節(jié)點(diǎn)waitStatus設(shè)置為SIGNAL,使得下次判斷時(shí),將當(dāng)前節(jié)點(diǎn)掛起.
*
* @param pred 前一個(gè)節(jié)點(diǎn)
* @param node 當(dāng)前節(jié)點(diǎn)
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
logger.info("shouldParkAfterFailedAcquire-{}begin..", Thread.currentThread().getId());
int ws = pred.waitStatus;
logger.info("shouldParkAfterFailedAcquire-{} wait status:{}", Thread.currentThread().getId(), ws);
if (ws == Node.SIGNAL)
//SIGNAL,則返回true表示應(yīng)該掛起當(dāng)前線程,掛起該線程,并等待被喚醒,
//被喚醒后進(jìn)行中斷檢測,如果發(fā)現(xiàn)當(dāng)前線程被中斷,那么拋出InterruptedException并退出循環(huán).
return true;
if (ws > 0) { //線程被中斷或超時(shí)被取消侄旬,處于該CANCELLED狀態(tài)的節(jié)點(diǎn)肺蔚,應(yīng)該從隊(duì)列里刪除
//>0,將前驅(qū)節(jié)點(diǎn)踢出隊(duì)列,返回false
do { // 替換操作
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// <=0,非SINNAL,非CANCELLED也是返回false,
// 不過先將前驅(qū)節(jié)點(diǎn)(acquireQueue方法里傳過來的已經(jīng)是前驅(qū)節(jié)點(diǎn))waitStatus CAS置為SIGNAL,使得下次判斷時(shí),將當(dāng)前節(jié)點(diǎn)掛起.
logger.info("shouldParkAfterFailedAcquire-{} 將前驅(qū)節(jié)點(diǎn)waitStatus CAS置為SIGNAL,使得下次判斷時(shí),將當(dāng)前節(jié)點(diǎn)掛起", Thread.currentThread().getId());
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
最后貼一個(gè)喚醒操作
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
logger.info("unparkSuccessor-{} wait status :{}", Thread.currentThread().getId(), ws);
if (ws < 0)//小于0的時(shí)候儡羔,說明線程需要被喚醒
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
debug("unparkSuccessor 身后的節(jié)點(diǎn):", s);
logger.info("unparkSuccessor 此時(shí)隊(duì)列隊(duì)尾node是:{}", tail.thread.getId());
if (s == null || s.waitStatus > 0) {
//node的后繼節(jié)點(diǎn)==null,從末尾開始向前尋找合適的節(jié)點(diǎn),如果找到,則喚醒
//是為何是從tail尾節(jié)點(diǎn)開始宣羊,而不是從node.next開始呢?
// 原因在于node.next仍然可能會存在null或者取消了汰蜘,
// 所以采用tail回溯辦法找第一個(gè)可用的線程仇冯。
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//直到找到初始狀態(tài)或等待喚醒狀態(tài)的node
s = t;
}
//喚醒這個(gè)節(jié)點(diǎn)上的線程,該節(jié)點(diǎn)有可能是后繼節(jié)點(diǎn)族操,有可能是從隊(duì)列末尾找到的節(jié)點(diǎn)
if (s != null) {
debug("unparkSuccessor", s);
logger.info("unparkSuccessor-{}喚醒下一個(gè)節(jié)點(diǎn){}", Thread.currentThread().getId(), s.thread.getId());
LockSupport.unpark(s.thread);
}
}
2.5下圖是未拿到鎖的線程的調(diào)用流程
流程