AQS 全稱 AbstractQueuedSynchronizer奴烙,靠著開局一個int state和一個雙端FIFO的Node隊列,實現(xiàn)抽象的隊列式的同步器。AQS定義了一套多線程訪問共享資源的同步器框架爆存,許多同步類實現(xiàn)都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch蝗砾。
Node類如下先较,
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
//隊列前后
volatile Node prev;
volatile Node next;
//節(jié)點對應的線程
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
對外需要重寫的方法,如果沒有重寫悼粮,使用的話闲勺,會拋異常
//獲得鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//釋放鎖
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//當前線程是否持有鎖
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
公開使用的方法如下,這里列出我們重點說明的
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
從上述protected和public的方法扣猫,可以看到菜循,acquire方法需要調用用戶重寫的tryAcquire方法,根據(jù)tryAcquire返回值申尤,決定下一步操作癌幕,release方法同理衙耕。
我們結合Reentrantlock的實現(xiàn)來講解,
Reentrantlock | Sync
代碼如下勺远,刪除不重要的
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock(); //定義統(tǒng)一的抽象lock方法橙喘,等待實現(xiàn)
//定義非公平鎖的獲取
final boolean nonfairTryAcquire(int acquires) {
//當前線程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //沒有任何鎖,非0表示鎖已經被占
if (compareAndSetState(0, acquires)) { //CAS競爭獲得鎖
setExclusiveOwnerThread(current); //競爭后設置鎖的持有線程
return true;//返回
}
}
else if (current == getExclusiveOwnerThread()) { //鎖的可重入設置
int nextc = c + acquires;
if (nextc < 0) //
throw new Error("Maximum lock count exceeded");
setState(nextc); //重入次數(shù)累加
return true;
}
return false;
}
//鎖釋放
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //重入次數(shù)減去
if (Thread.currentThread() != getExclusiveOwnerThread()) //當前鎖持有線程判斷
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果當前重入次數(shù)為0胶逢,鎖釋放厅瞎,返回true,表示已經釋放鎖初坠。
free = true;
setExclusiveOwnerThread(null); //清空持有線程
}
setState(c);
return free; //否則和簸,返回false,沒有釋放鎖
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
從代碼上可以看出來碟刺,sync玩的是aqs的state而已锁保,并沒有涉及到fifo隊列蓄氧,而且姆泻,上述代碼和我們自定義的可重入鎖大同小異飒赃。
Reentrantlock | NonfairSync
代碼如下,
static final class NonfairSync extends Sync {
//實現(xiàn)Sync的lock方法
final void lock() {
if (compareAndSetState(0, 1)) //直接cas一下抄囚,如果獲得到鎖,說明當前沒有鎖橄务,不需阻塞
setExclusiveOwnerThread(Thread.currentThread()); //設置持有線程
else
acquire(1); //調用aqs的acquire幔托,進而調用下面的tryAcquire
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); //調用nonfairTryAcquire
}
}
Sync生成對象sync
Reentrantlock使用sync的acquire和realse方法(來自aqs),實現(xiàn)獲取鎖和釋放鎖蜂挪,
public void lock() {
sync.lock(); //后者是非公平的Sync重挑,或者是公平的Sync
}
public void unlock() {
sync.release(1);
}
所以,重點指向aqs的acquire和release
方法棠涮。再貼一下code谬哀。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && //如果tryAcquire放回true,表示獲得到鎖严肪,退出if史煎,否則表示沒有得到
//鎖,進入阻塞隊列驳糯,執(zhí)行if條件 &&后面的語句
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //包裝Node
Node pred = tail;
if (pred != null) { //插入隊尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //如果執(zhí)行到此篇梭,說明上面快速插入隊列尾部失敗,畢竟是cas方式酝枢,這里enq通過
//無限循環(huán)實現(xiàn)cas方式插入隊列尾部
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法恬偷,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //循環(huán),死磕
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //直到這個時候退出帘睦,
//即袍患,節(jié)點node的前一個節(jié)點是head且如果成功獲得鎖坦康,如果沒有獲得鎖,下次循環(huán)繼續(xù)协怒,繼續(xù)阻塞在隊里中涝焙。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // head后移后,當前節(jié)點從隊列移除孕暇,返回
}
//如果沒有退出for循環(huán)仑撞,干什么?死磕當前節(jié)點的前一個節(jié)點妖滔,
//當前一個節(jié)點都在等待信號signal的時候隧哮,當前節(jié)點如何操作?繼續(xù)等待W帷>谙琛!
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//查看當前節(jié)點的前節(jié)點是否是signal曲秉,順便清空下無效的節(jié)點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //只要前一個節(jié)點是signal采蚀,則返回
return true;
if (ws > 0) { //清理狀態(tài)無用的節(jié)點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//繼續(xù)等待,返回是否中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
release
代碼如下承二,
public final boolean release(int arg) {
if (tryRelease(arg)) {
//如果重寫的tryRelease方法返回true榆鼠,表明釋放鎖了
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
//喚醒successor,即亥鸠,喚醒head的successor后繼者妆够,
return true;
}
//如果在這里,說明沒有釋放鎖负蚊,返回true
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//設置當前Node狀態(tài)為0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 如果當前Node的下一個節(jié)點符合狀態(tài)就直接進行喚醒神妹,
// 否則從隊尾開始進行倒序查找,找到最優(yōu)先的線程進行喚醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//尋找下一個等待線程節(jié)點來喚醒等待線程并通過LockSupport.unpark()喚醒線程
if (s != null)
LockSupport.unpark(s.thread);
}
Reentrantlock | FairSync
公平的鎖家妆,保證了阻塞的線程喚醒后鸵荠,能夠獲得到鎖,而不是被新的線程搶占伤极。和非公平的鎖唯一區(qū)別在ReentrantLock中的部分腰鬼,會根據(jù)情況判斷是否阻塞當前的線程
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors是關鍵,他的作用在于塑荒,對于新來的線程熄赡,
//不讓其搶占鎖,而是直接進入到阻塞隊列中等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 當前線程不為空齿税,并且阻塞的線程不是當前線程(非重入)
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread()); //
}