AbstractQueuedSynchronizer源碼
AbstractQueuedSynchronizer(AQS)內(nèi)部結(jié)構(gòu)
AQS是ReentrantLock、ReentrantReadWriteLock斑唬、CountDownLatch等經(jīng)常使用鎖的父類(lèi)撩炊。AQS既然是父類(lèi)砂竖,一些接口是需要實(shí)現(xiàn)類(lèi)自己去實(shí)現(xiàn)的例如tryAcquire么介,tryRelase罪治;AQS提供共享鎖和排它鎖(獨(dú)占鎖)。
AQS內(nèi)部有個(gè)Node礁蔗,申請(qǐng)鎖的線(xiàn)程會(huì)封裝為一個(gè)Node觉义,申請(qǐng)一個(gè)鎖的所有線(xiàn)程會(huì)組成Node鏈表。
鏈表是雙鏈表結(jié)構(gòu)
非租塞浴井,在并發(fā)下插入和移除操作不會(huì)阻塞(自旋晒骇,CAS)
FIFO
AQS內(nèi)還維護(hù)了state(內(nèi)部同步狀態(tài))、線(xiàn)程的阻塞和解除操作滋饲。
所有對(duì)state操作是原子性的
AQS內(nèi)部使用LockSuppert.park厉碟、unPark操作。內(nèi)部會(huì)在每個(gè)Thread設(shè)置一個(gè)本地狀態(tài)屠缭,讓線(xiàn)程輪訓(xùn)本地狀態(tài)箍鼓,而不是像普通的鎖,大家都是競(jìng)爭(zhēng)同一個(gè)鎖呵曹,造成更大的浪費(fèi)款咖。
Node的雙鏈表結(jié)構(gòu)圖
Node的屬性:
1.prev 指向節(jié)點(diǎn)上一個(gè)節(jié)點(diǎn),如果沒(méi)有則為null
2.next 指向節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)奄喂,為了查找最后一個(gè)節(jié)點(diǎn)方便铐殃,避免單鏈表情況下必須從head開(kāi)始找,如果沒(méi)有則為null
3.waitStatus 線(xiàn)程狀態(tài),CLH結(jié)構(gòu)中使用前一節(jié)點(diǎn)一個(gè)屬性標(biāo)識(shí)當(dāng)前節(jié)點(diǎn)狀態(tài)跨新,方便實(shí)現(xiàn)取消和超時(shí)功能富腊。
? CANCELLED =1,被取消或超時(shí)
? SIGNAL =-1 域帐,只是當(dāng)前節(jié)點(diǎn)的后續(xù)節(jié)點(diǎn)被阻塞了赘被,需要unparking
? CONDITION = -2,當(dāng)前節(jié)點(diǎn)在conditon隊(duì)列中
? PROPAGATE = -3肖揣,共享鎖使用民假,線(xiàn)性廣播喚醒線(xiàn)程。
默認(rèn) 0龙优,初始狀態(tài)
4.nextWaiter:標(biāo)識(shí)condtion隊(duì)列的后續(xù)節(jié)點(diǎn)羊异,此時(shí)prev,next不用彤断,而且節(jié)點(diǎn)waitStauts是CONDITION.
5.thread 對(duì)應(yīng)的線(xiàn)程
源代碼排它鎖(ReentrantLock)
ReentrantLock使用方式 1):
ReentrantLock.lock
ReentrantLock.unlock
ReentrantLock lock過(guò)程
ReentrantLock內(nèi)部有公平鎖FairSync和非公平鎖NonfairSync野舶,先看公平鎖
ReentrantLock:
public void lock() {
sync.lock();
}
//AbstractQueuedSynchronizer:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果沒(méi)有獲取到鎖 就創(chuàng)建Node節(jié)點(diǎn) 封裝當(dāng)前線(xiàn)程,成功后并中斷當(dāng)前線(xiàn)程
selfInterrupt();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 需要unpark
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //大于1的都是取消的宰衙,要找到小與0的
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 需要前驅(qū)節(jié)點(diǎn)(waitStatus)unpark筒愚,也就說(shuō)告訴前節(jié)點(diǎn),你的next節(jié)點(diǎn)需要被通知運(yùn)行
}
return false;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //取出前節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {//如果前節(jié)點(diǎn)是head,則設(shè)置改節(jié)點(diǎn)為頭結(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //并不中斷
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //這里會(huì)有LockSupport操作,
//LockSupport.park unPark,這里會(huì)有一個(gè)許可纹安。park在獲取不到許可就會(huì)組阻塞陆淀,
interrupted = true;
}
} finally {
if (failed) //失敗就會(huì)取消線(xiàn)程
cancelAcquire(node); //取消后還要調(diào)整隊(duì)列
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //新建節(jié)點(diǎn) 并入隊(duì)尾 參數(shù)是模式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { //如果有并發(fā)考余,這里會(huì)失敗,進(jìn)入enq
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { //CAS 操作轧苫,自旋模式 直到設(shè)置成功哦
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//FairSync:
final void lock() {
acquire(1); //基類(lèi)AQS方法
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //這里是判斷是否是空鏈表或者當(dāng)前線(xiàn)程是鏈表頭部
if (!hasQueuedPredecessors() && //判斷是否有pre節(jié)點(diǎn) 不包括head
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);//設(shè)置為當(dāng)前線(xiàn)程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //沒(méi)有獲取到返回false
}
ReentrantLock unlock過(guò)程
//ReentrantLock:
public void unlock() {
sync.release(1);
}
//AQS:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) //如果不是head不為空楚堤,切不為0,則要喚醒后續(xù)節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
//sync:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
?
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { //找到小于0的可執(zhí)行的
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //釋放
}
?
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //同步參數(shù) -1
if (Thread.currentThread() != getExclusiveOwnerThread()) //不等于當(dāng)前線(xiàn)程則會(huì)有錯(cuò)誤
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果沒(méi)有(并發(fā))含懊,則設(shè)置為null身冬,
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//應(yīng)該和重入哦 重入鎖解鎖的時(shí)候不會(huì)喚醒下一個(gè)線(xiàn)程
return free;
}
非公平鎖:
非公平鎖很簡(jiǎn)單,在開(kāi)始是直接去爭(zhēng)奪修改status(0->1)岔乔,沒(méi)有使用鏈表酥筝,誰(shuí)修改status成功誰(shuí)拿到鎖
final void lock() {
if (compareAndSetState(0, 1)) //
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
?
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { //
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock使用方式 2):
Condtion condtion = ReentrantLock.newConditon();
condtion.await 和Object類(lèi)的wait方法等效
condtion.signal 和Object類(lèi)的notify方法等
condtion.signalAll Object類(lèi)的notifyAll方法等效
ReentrantLock condition實(shí)現(xiàn)類(lèi) ConditionObject
Node 鏈表只使用了nextWaiter,組成一個(gè)隊(duì)列,調(diào)用await會(huì)加入node鏈表tail雏门,signal是通知head節(jié)點(diǎn)運(yùn)行嘿歌;signalALl是遍歷鏈表,都去競(jìng)爭(zhēng)啊
//共享鎖 wait