AQS獨占和共享鎖,ReentantLock為獨占鎖铝宵,ReentantReadWriteLock中readLock()為共享鎖首懈,writeLock()為獨占鎖。
讀鎖與讀鎖可以共享
讀鎖與寫鎖不可以共享
寫鎖與寫鎖不可以共享
AQS內部維護了CLH的FIFO的隊列垂睬,隊列中的節(jié)點為Node類型的元素
圖片轉載自https://www.cnblogs.com/waterystone/p/4920797.html
Node節(jié)點結構:
static final class Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
waitStatus:等待狀態(tài)媳荒,具體類型如下:
CANCELLED:值為1 取消
SIGNAL:值為-1 喚醒后繼節(jié)點
CONDITION:值為-2 表示當前節(jié)點在condition隊列
PROPAGATE:值為-3 共享模式下acquireShared需要向下傳播
0:無狀態(tài),表示當前節(jié)點在隊列中等待獲取鎖
prev:前向節(jié)點
next:后繼節(jié)點
nextWaiter:condition隊列中的下一個等待節(jié)點
thread:當前線程
AQS中會維護head驹饺、tail節(jié)點钳枕,head節(jié)點中實際不存儲線程,只保留隊列頭部節(jié)點的引用
state:
臨界資源赏壹,當未獲取到鎖時鱼炒,state=0,獲取到鎖則會置為state=1
ReentantLock為可重入鎖蝌借,同一線程多次獲取鎖時昔瞧,只會相應增加state的值,釋放鎖時菩佑,會對state遞減自晰,當state=0時則釋放鎖。
CountDownLatch中稍坯,初始化new CountDownLatch(arg),arg則為state的值酬荞,當其他線程調用countDown()時,則減少state瞧哟,state=0時混巧,則立即去喚醒后繼的節(jié)點
ReentantLock中實現了三個內部類:
Sync:繼承AQS
FairSync:繼承Sync. 實現公平鎖
NoFairSync:繼承Sync 實現非公平鎖,ReentrantLock默認實現
現在已非公平鎖為例進行說明:
lock方法:
1.CAS將state狀態(tài)改為1勤揩,成功則將exclusiveOwnerThread設置為當前線程
2.CAS失敗咧党,執(zhí)行acquire(1)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//AQS中
1.嘗試獲取鎖,成功則返回雄可,否則執(zhí)行2
2.執(zhí)行addWaiter方法凿傅,創(chuàng)建獨占節(jié)點加入隊列尾
3.然后自旋的獲取鎖缠犀,獲取成功,則返回中斷標示
4.獲取失敗則找到安全點(前續(xù)節(jié)點為signal -1),則將自己park
5.檢查是否被中斷聪舒,如果是則拋出中斷異常
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock中實現tryAcquire(arg)方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter方法
1.將當前線程封裝成獨占節(jié)點
2.尾節(jié)點不為null辨液,則CAS方法將node節(jié)點添加到隊列尾部,否則執(zhí)行3
3.enq方法箱残,自旋的將node添加到隊列尾部滔迈,并返回node節(jié)點
在添加到對列尾部時,都會先設置node的prev節(jié)點被辑,所以判斷一個節(jié)點是否在隊列中燎悍,如果node節(jié)點的prev==null,則一定不在隊列中盼理。如果prev谈山!=null,也不能說節(jié)點一定在隊列中宏怔,有可能正在自旋的設置pred的next節(jié)點為node節(jié)點
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
1.獲取節(jié)點的前續(xù)節(jié)點p奏路,如果p==head,則去嘗試獲取鎖臊诊。成功后將node設置為head節(jié)點鸽粉,返回中斷標示,否則執(zhí)行2
- shouldParkAfterFailedAcquire方法為了找到當前節(jié)點的安全點抓艳,即前續(xù)節(jié)點的waitStatus=Node.SIgnal(喚醒后續(xù)節(jié)點)触机,然后執(zhí)行3。否則自旋
- 當前線程park玷或,然后返回中斷標示
- 如果線程被中斷過儡首,則將中斷標示設置為true
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt方法判斷中斷標示,為什么要判斷中斷標示偏友?
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
非常簡單椒舵,阻塞當前線程,然后返回線程的中斷狀態(tài)并復位中斷狀態(tài)
在博客【http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)】中有很詳細的講解
···
注意interrupted()方法的作用约谈,該方法是獲取線程的中斷狀態(tài),并復位犁钟,也就是說棱诱,如果當前線程是中斷狀態(tài),則第一次調用該方法獲取的是true涝动,第二次則是false迈勋。而isInterrupted()方法則只是返回線程的中斷狀態(tài),不執(zhí)行復位操作醋粟。
···
如果acquireQueued執(zhí)行完畢靡菇,返回中斷狀態(tài)重归,回到acquire方法中,根據返回的中斷狀態(tài)判斷是否需要執(zhí)行Thread.currentThread().interrupt()厦凤。
為什么要多做這一步呢鼻吮?先判斷中斷狀態(tài),然后復位较鼓,如果之前線程是中斷狀態(tài)椎木,再進行中斷?
這里就要介紹一下park方法了博烂。park方法是Unsafe類中的方法香椎,與之對應的是unpark方法。簡單來說禽篱,當前線程如果執(zhí)行了park方法畜伐,也就是阻塞了當前線程,反之躺率,unpark就是喚醒一個線程玛界。
具體的說明請參考http://blog.csdn.net/hengyunabc/article/details/28126139
park與wait的作用類似,但是對中斷狀態(tài)的處理并不相同肥照。如果當前線程不是中斷的狀態(tài)脚仔,park與wait的效果是一樣的;如果一個線程是中斷的狀態(tài)舆绎,這時執(zhí)行wait方法會報java.lang.IllegalMonitorStateException鲤脏,而執(zhí)行park時并不會報異常,而是直接返回吕朵。
所以猎醇,知道了這一點,就可以知道為什么要進行中斷狀態(tài)的復位了:
如果當前線程是非中斷狀態(tài)努溃,則在執(zhí)行park時被阻塞硫嘶,這是返回中斷狀態(tài)是false;
如果當前線程是中斷狀態(tài)梧税,則park方法不起作用沦疾,會立即返回,然后parkAndCheckInterrupt方法會獲取中斷的狀態(tài)第队,也就是true哮塞,并復位;
再次執(zhí)行循環(huán)的時候凳谦,由于在前一步已經把該線程的中斷狀態(tài)進行了復位忆畅,則再次調用park方法時會阻塞。
所以尸执,這里判斷線程中斷的狀態(tài)實際上是為了不讓循環(huán)一直執(zhí)行家凯,要讓當前線程進入阻塞的狀態(tài)缓醋。想象一下,如果不這樣判斷绊诲,前一個線程在獲取鎖之后執(zhí)行了很耗時的操作送粱,那么豈不是要一直執(zhí)行該死循環(huán)?這樣就造成了CPU使用率飆升驯镊,這是很嚴重的后果
cancelAcquire方法
在acquireQueued方法的finally語句塊中葫督,如果在循環(huán)的過程中出現了異常,則執(zhí)行cancelAcquire方法板惑,用于將該節(jié)點標記為取消狀態(tài)橄镜。該方法代碼如下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//跳過前續(xù)取消節(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
//前續(xù)節(jié)點的后續(xù)節(jié)點
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//將當前節(jié)點設置為取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果當前節(jié)點為尾節(jié)點,則將尾節(jié)點設置為當前節(jié)點的pred冯乘,并將pred的next節(jié)點設置為null洽胶,相當于將node節(jié)點從隊列中移除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//
1.如果當前節(jié)點的前續(xù)節(jié)點不為head
2.前續(xù)節(jié)點ws不為-1則將前續(xù)節(jié)點的ws設置為-1
3.以上2步都為true時,校驗前續(xù)節(jié)點的線程不為null
3點均滿足則pred的next設置為node的后續(xù)節(jié)點裆馒,將node移除
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//
1.如果node的前續(xù)節(jié)點為head或者設置狀態(tài)失敗姊氓,則喚醒后續(xù)節(jié)點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
這里直接unpark后繼節(jié)點的線程,然后將next指向了自己喷好。
這里可能會有疑問翔横,既然要刪除節(jié)點,為什么都沒有對prev進行操作梗搅,而僅僅是修改了next禾唁?【以下摘自參考博客中的內容】
要明確的一點是,這里修改指針的操作都是CAS操作无切,在AQS中所有以compareAndSet開頭的方法都是嘗試更新荡短,并不保證成功,圖中所示的都是執(zhí)行成功的情況哆键。
那么在執(zhí)行cancelAcquire方法時掘托,當前節(jié)點的前繼節(jié)點有可能已經執(zhí)行完并移除隊列了(參見setHead方法),所以在這里只能用CAS來嘗試更新籍嘹,而就算是嘗試更新闪盔,也只能更新next,不能更新prev辱士,因為prev是不確定的锭沟,否則有可能會導致整個隊列的不完整,例如把prev指向一個已經移除隊列的node识补。
什么時候修改prev呢?其實prev是由其他線程來修改的辫红∑就浚回去看下shouldParkAfterFailedAcquire方法祝辣,該方法有這樣一段代碼:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
該段代碼的作用就是通過prev遍歷到第一個不是取消狀態(tài)的node,并修改prev切油。
這里為什么可以更新prev蝙斜?因為shouldParkAfterFailedAcquire方法是在獲取鎖失敗的情況下才能執(zhí)行,因此進入該方法時澎胡,說明已經有線程獲得鎖了孕荠,并且在執(zhí)行該方法時,當前節(jié)點之前的節(jié)點不會變化(因為只有當下一個節(jié)點獲得鎖的時候才會設置head)攻谁,所以這里可以更新prev稚伍,而且不必用CAS來更新。
unlock方法
調用了sync的release(1)方法
public void unlock() {
sync.release(1);
}
AQS中的release方法
1.嘗試釋放鎖戚宦,釋放成功則執(zhí)行2
2.如果頭節(jié)點不為空且不為初始化狀態(tài)个曙,則喚醒后繼節(jié)點
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock重寫了tryRelease方法
1.校驗當前線程是否為獲取鎖時設置的exclusiveOwnerThread
2.如果state==0,則將exclusiveOwnerThread設置為null受楼,返回true垦搬,釋放成功,否則3
3.否則減少重入鎖的次數艳汽,返回false
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要功能就是要喚醒下一個線程猴贰,這里s == null || s.waitStatus > 0判斷后繼節(jié)點是否為空或者是否是取消狀態(tài),然后從隊列尾部向前遍歷找到最前面的一個waitStatus小于0的節(jié)點河狐,至于為什么從尾部開始向前遍歷米绕,回想一下cancelAcquire方法的處理過程,cancelAcquire只是設置了next的變化甚牲,沒有設置prev的變化义郑,在最后有這樣一行代碼:node.next = node,如果這時執(zhí)行了unparkSuccessor方法丈钙,并且向后遍歷的話非驮,就成了死循環(huán)了,所以這時只有prev是穩(wěn)定的雏赦。
ReentrantLock默認實現了非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
也可以設置為公平鎖:
ReentrantLock lock=new ReentrantLock(true);
構造函數:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平鎖劫笙、非公平鎖中實現的tryRelease方法一樣,tryAcquire()方法則不同星岗,非公平鎖實現如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
會在臨界資源為0時判斷當前是否還有前項節(jié)點存在:
1.head!=tail
2.h.next為空
3.h.next !=null時s中線程不是當前線程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在AQS中填大,state可以標示鎖的數量,也可以標示其他狀態(tài)俏橘,state由子類負責去定義允华,AQS只負責維護state,通過state實現對臨界資源的訪問
【參考博客】
http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)