AQS
AbstractQuenedSynchronizer抽象的隊列式同步器。是除了java自帶的synchronized關(guān)鍵字之外的鎖機制。
-
AQS主要結(jié)構(gòu)
AQS就是基于CLH隊列栋操,用volatile修飾共享變量state,線程通過CAS去改變狀態(tài)符,成功則獲取鎖成功呵晚,失敗則進入等待隊列(CLH隊列),等待被喚醒沫屡。
ReentrantLock源碼
-
ReentrantLock也是基于AQS實現(xiàn)的框架他的結(jié)構(gòu)為
主要里面包含著一個sync來控制
- lock方法(沒特殊標(biāo)志的都用公平鎖來講解,ReentrantLock初始化的時候其實是非公平鎖)
public void lock() {
sync.lock();
}
final void lock() {
acquire(1);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
//tryAcquire(arg)嘗試獲取鎖饵隙,如果能獲取成功的話則!tryAcquire(arg)為false,&&后面的邏輯也不會執(zhí)行
//如果獲取失敗的話會執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))添加入進入隊列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法(可重入鎖實現(xiàn)地方)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取當(dāng)前上鎖的次數(shù)
int c = getState();
if (c == 0) {
//判斷他前面是否還有人排隊,如果沒有人的話嘗試獲取鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//可重入鎖實現(xiàn)地方沮脖,判斷當(dāng)前獲取鎖的線程是不是持有線程金矛,如果是的話則加一層鎖
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
進隊列方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法
private Node addWaiter(Node mode) {
//新建一個node
Node node = new Node(Thread.currentThread(), mode);
//尾巴節(jié)點
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果tail節(jié)點為空,說明是第一次入隊勺届,還沒有初始化驶俊,則初始化隊列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
//t置為當(dāng)前tail的指針
Node t = tail;
if (t == null) {
//第一次進來初始化頭尾節(jié)點(初始化頭結(jié)點之后再復(fù)制尾巴節(jié)點)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//第一次循環(huán)完后,將
node.prev = t;
//將tail置為node節(jié)點的指針(雖然上面把tail的指針復(fù)制給了t免姿,但是這邊直接改變了tail的指針饼酿,跟t沒干系,t還是指向原來的tail指針)
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取前置節(jié)點
final Node p = node.predecessor();
//如果他的前置節(jié)點是head節(jié)點胚膊,則再嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果獲取不到鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release to signal it, so it can safely park.
* 節(jié)點已經(jīng)設(shè)置好為 Node.SIGNAL狀態(tài)故俐,可以放心停止線程
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and indicate retry.
* 前置節(jié)點已經(jīng)cancel了,直接剔除
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 將他的waitStatus置為SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
//停止線程
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
- unlock解鎖方法
調(diào)用 sync.release(1);
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//釋放鎖
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒等待線程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//當(dāng)前的鎖持有數(shù)為0時(重入的話C可能不等于0)紊婉,則釋放鎖药版,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//設(shè)置當(dāng)前的鎖狀態(tài)為剩余c
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
//當(dāng)調(diào)用線程的unpark方法后會進入到休眠之前的方法
final boolean acquireQueued(final Node node, int arg) {
...
for (;;) {
//喚醒后會進入該自旋方法
//獲取前置節(jié)點
final Node p = node.predecessor();
//如果他的前置節(jié)點是head節(jié)點,則再嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
}
}
}
private void setHead(Node node) {
//將head修改為該節(jié)點的前置節(jié)點
head = node;
//置空
node.thread = null;
node.prev = null;
}
- 公平鎖與非公平鎖的差別,
1肩榕、非公平鎖在lock調(diào)用的時候會直接嘗試獲取鎖刚陡,不會看隊列前面有沒有等待的惩妇,公平鎖則是要看前面有沒有已經(jīng)在排隊的
非公平鎖他嘗試獲取不到鎖之后也會進入CLH隊列,如果進入到隊列之后就是排隊了筐乳。這個跟公平鎖是一樣的
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//差異1歌殃、非公平鎖一進來就會嘗試獲取鎖,如果沒有獲取到鎖才進入隊列
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//差異2蝙云、公平鎖這邊會判斷前面是否還有節(jié)點氓皱,非公平鎖不會判斷嗎,直接cas嘗試獲取
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}