AQS基本結(jié)構(gòu)
private transient volatile Node head; //頭節(jié)點
private transient volatile Node tail; //尾節(jié)點 喊废, 每個新的結(jié)點進來等限,都插入到最后枉氮,也就形成了一個鏈表
//表示當(dāng)前鎖的狀態(tài) 0 表示沒有被占用 锦援,大于 0 表示有線程持有了當(dāng)前鎖 state 的值大于1 是因為被重入了多次
private volatile int state;
阻塞隊列是從head后的結(jié)點開始的,也即 不包含頭節(jié)點head
哨兵結(jié)點head作用
因為AQS本身實現(xiàn)的目的是要在無鎖操作的情況下實現(xiàn)一個線程安全的同步隊列浦马, 它實現(xiàn)線程安全就是靠這個哨兵節(jié)點把出入隊操作隔離開:
如果沒有哨兵節(jié)點迅栅,那么每次執(zhí)行入隊操作殊校,都需要判斷head是否為空,如果為空則head=new Node如果不為空則
head.next=new Node
,而有哨兵節(jié)點則可以大膽的head.next=new Node
如果沒有哨兵節(jié)點读存,可能存在之前所說的安全性問題为流,當(dāng)只有一個節(jié)點的時候執(zhí)行入隊方法,無法保證tail和head不為空让簿。哪怕執(zhí)行入隊方法之前tail和head還指向一個節(jié)點敬察,可能由于并發(fā)性在具體調(diào)用入隊方法操作tail的時候head和tail共同指向的頭節(jié)點已經(jīng)完成出隊,此時
tail
和head
都為null
尔当,所以入隊方法中的tail.next=new node
會拋空指針異常莲祸,由于線程并發(fā)性的問題,tail始終可能隨時為空的問題不使用哨兵節(jié)點是無法解決的椭迎。
Node結(jié)構(gòu)
node結(jié)點的數(shù)據(jù)結(jié)構(gòu) 構(gòu)成 thread + waitStatus + prev + next
static final class Node {
static final Node SHARED = new Node(); //標識當(dāng)前在共享線程下
static final Node EXCLUSIVE = null; //標識當(dāng)前在獨占線程下
static final int CANCELLED = 1; //線程取消競爭當(dāng)前鎖標志位
static final int SIGNAL = -1; //當(dāng)前node結(jié)點的后繼結(jié)點對應(yīng)的線程需要被喚醒
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev; //前驅(qū)結(jié)點引用
volatile Node next; //后繼線程引用
volatile Thread thread; //當(dāng)前線程
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
線程搶鎖
以ReentrantLock
舉例
公平鎖下
//ReentrantLock 公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
//父類AQS 實現(xiàn)
public final void acquire(int arg) { //此時arg == 1
if (!tryAcquire(arg) && // tryAcquire() 再次嘗試 若嘗試成功則返回true 不需要進隊等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 锐帜!false() && (入隊) addWaiter()將線程包裝成node
selfInterrupt();
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 可重入鎖 體現(xiàn)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //設(shè)置鎖狀態(tài) 直接計數(shù)
return true;
}
return false;
}
}
acquire() 分析
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire() 再次嘗試 若嘗試成功則返回true 不需要進隊等待
// !false() && (入隊) tryAcquire(arg)沒有成功畜号,這個時候需要把當(dāng)前線程掛起缴阎,放到阻塞隊列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire() 分析
//AQS 實現(xiàn)方式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException(); //若子類沒有對該方法進行覆寫 简软, 則拋出異常
}
//FairSync 實現(xiàn)方式
//嘗試獲取鎖 返回值boolean 蛮拔,代表是否獲取到鎖
//返回true:1.沒有線程在等待鎖;2.重入鎖替饿,線程本來就持有鎖语泽,也就可以理所當(dāng)然可以直接獲取
protected final boolean tryAcquire(int acquires) { //對tryAcquire 進行覆寫 且使用 `final` 進行修飾 不可被更改
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 雖然此時此刻鎖是可以用的贸典,但是這是公平鎖视卢,既然是公平,就得講究先來后到廊驼,
// 看看有沒有別人在隊列中等了半天了 hasQueuedPredecessors() 查看隊列中是否有等待的線程
if (!hasQueuedPredecessors() &&
// 如果沒有線程在等待据过,那就用CAS嘗試一下惋砂,成功了就獲取到鎖了,
// 不成功的話绳锅,只能說明一個問題西饵,就在剛剛幾乎同一時刻有個線程搶先了
compareAndSetState(0, acquires)) {
//將當(dāng)前線程設(shè)置為鎖的擁有者 設(shè)置標記位
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 可重入鎖 體現(xiàn)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //設(shè)置鎖狀態(tài) 直接計數(shù)
return true;
}
//獲取鎖失敗 回到上層 判斷acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
return false;
}
addWaiter()和 enq()分析
// 此方法的作用是把線程包裝成node,同時進入到隊列中
// 參數(shù)mode此時是Node.EXCLUSIVE鳞芙,代表獨占模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //根據(jù)當(dāng)前線程 實例化一個node對象 入隊
// Try the fast path of enq; backup to full enq on failure
//將node 結(jié)點加入到鏈表的最后面去 眷柔, 也即 進入阻塞隊列
Node pred = tail;
if (pred != null) { //此時 隊尾為空
node.prev = pred; //設(shè)置隊尾node 為該元素的前驅(qū)
if (compareAndSetTail(pred, node)) { // cas 將自己設(shè)置為隊尾 , 成功后 tail == node
pred.next = node; //建立后繼鏈接
return node;
}
}
enq(node); //此時tail == null 入隊
return node;
}
// 采用自旋的方式入隊
// 調(diào)用enq() 方法的情況有2種 : 1. 等待隊列為空原朝, 2. 或者有線程競爭入隊驯嘱,
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 初始化 隊列為空時
if (compareAndSetHead(new Node())) //構(gòu)建哨兵結(jié)點 初始時 head == null
// 此時head節(jié)點的waitStatus==0, 看new Node()構(gòu)造方法就知道了
// 這個時候有了head,但是tail還是null喳坠,設(shè)置一下鞠评, 暫時將tail 指向head結(jié)點 , 下一次自旋tail就會移動
tail = head;
} else {
node.prev = t; //將當(dāng)前結(jié)點入隊
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()分析
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
此時線程已經(jīng)經(jīng)過addWaiter()被包裝成node結(jié)點壕鹉,且進入了阻塞隊列
final boolean acquireQueued(final Node node, int arg) { // 進行park
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 說明當(dāng)前節(jié)點雖然進到了阻塞隊列剃幌,但是是阻塞隊列的第一個,因為它的前驅(qū)是head
//為何是第一個排隊的元素就有資格進行 嘗試一次 head一般指的是占有鎖的線程晾浴,head后面的才稱為阻塞隊列
// 首先负乡,①它是隊頭,這個是第一個條件脊凰,其次敬鬓,②當(dāng)前的head有可能是剛剛初始化的node,
// enq(node) 方法里面有提到笙各,head是延時初始化的钉答,而且new Node()的時候沒有設(shè)置任何線程
// 也就是說,當(dāng)前的head不屬于任何一個線程杈抢,所以作為隊頭数尿,可以去試一試,
if (p == head && tryAcquire(arg)) { // 第一個排隊的人(哨兵后的第一個結(jié)點) 調(diào)用tryAcquire()自旋一次
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//此時 要么node結(jié)點不是第一個排隊的元素 要么競爭失敗
//shouldParkAfterFailedAcquire 返回為true (即前驅(qū)結(jié)點 waitStutas == -1)正常調(diào)用parkAndCheckInterrupt() 將線程掛起
// 返回為false 此時前驅(qū)結(jié)點有 非-1 cas到 - 1 惶楼,下次自旋的時候就會返回true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // true : 在自旋過程種出現(xiàn)了異常 右蹦, failed 本就被初始化 true
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
當(dāng)前線程沒有搶到鎖,是否需要掛起當(dāng)前線程歼捐?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //pred為前驅(qū)結(jié)點 node為當(dāng)前結(jié)點
int ws = pred.waitStatus; //前驅(qū)結(jié)點的狀態(tài)信息
if (ws == Node.SIGNAL) //waitStatus == -1 標識前驅(qū)結(jié)點正常何陆,需要掛起當(dāng)前線程 ,直接返回true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//waitStatus > 0 說明前驅(qū)結(jié)點取消了排隊 豹储,需要為當(dāng)前結(jié)點順延找到一個 排隊的結(jié)點(也即 waitStatus == - 1)
//進入阻塞隊列排隊的線程會被掛起贷盲,而喚醒的操作是由前驅(qū)節(jié)點完成的。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //此時waitStatus不為 - 1 和 1 可能的值為 0,-2巩剖,-3
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //將前驅(qū)結(jié)點的waitStatus 設(shè)置為 -1 铝穷,便于后續(xù)喚醒node
}
// 這個方法返回 false,那么會在外部自旋一次佳魔, 再走一次 for 循序曙聂,
// 然后再次進來此方法,此時會從第一個分支返回 true
return false;
}
線程解鎖
同樣以ReentrantLock
舉例
release()分析
public void unlock() { // 解鎖方式直接為ReentrantLock 方法
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//AQS 實現(xiàn)方式
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//ReentrantLock 實現(xiàn)方式(Sync)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//是否完全釋放鎖
boolean free = false;
if (c == 0) { //沒有嵌套鎖了鞠鲜,可以釋放了宁脊,否則還不能釋放掉
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor()分析
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //node 為頭節(jié)點
if (ws < 0) //頭節(jié)點waitStatus < 0
compareAndSetWaitStatus(node, ws, 0); // 如果head節(jié)點當(dāng)前waitStatus<0, 將其修改為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//從隊尾往前找,找到waitStatus<=0的所有節(jié)點中排在最前面的 不直接從對頭找 可能后繼結(jié)點取消了排隊 (waitStutas == 1)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //從以后往前找 不必擔(dān)心中間有結(jié)點取消排隊
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒線程
LockSupport.unpark(s.thread);
}
喚醒后
線程本來在acquireQueued()
中的parkAndCheckInterrupt()
被掛起贤姆,繼續(xù)執(zhí)行
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 剛剛線程被掛起在這里了
return Thread.interrupted();
}
回到競爭鎖流程中朦佩,acquireQueued(final Node node, int arg),這個時候庐氮,node的前驅(qū)是head了
AQS獲取鎖和釋放鎖流程圖
總結(jié):
AQS的實現(xiàn)是圍繞這自旋语稠、park--unpark、cas來進行實現(xiàn)的弄砍。
AQS的加鎖和解鎖需要三個部件協(xié)調(diào):
鎖狀態(tài) state :需要知道當(dāng)前鎖是否被占有了
線程的阻塞和解除阻塞 : AQS中采用 LockSupport.park(thread) 來掛起線程仙畦,用 unpark 來喚醒線程
阻塞隊列 : 因為爭搶鎖的線程可能很多,但是只能有一個線程拿到鎖音婶,其他的線程都必須等待慨畸,這個時候就需要一個 queue 來管理這些線程,AQS 用的是一個 FIFO 的隊列衣式,就是一個鏈表寸士,每個 node 都持有后繼節(jié)點的引用