/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic int value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated int
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*/
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
在AQS中維護(hù)著一個(gè)FIFO的同步隊(duì)列,當(dāng)線程獲取同步狀態(tài)失敗后鞭执,則會(huì)加入到這個(gè)CLH同步隊(duì)列的對(duì)尾并一直保持著自旋。在CLH同步隊(duì)列中的線程在自旋時(shí)會(huì)判斷其前驅(qū)節(jié)點(diǎn)是否為首節(jié)點(diǎn)包竹,如果為首節(jié)點(diǎn)則不斷嘗試獲取同步狀態(tài)厉碟,獲取成功則退出CLH同步隊(duì)列。當(dāng)線程執(zhí)行完邏輯后鼎姊,會(huì)釋放同步狀態(tài)骡和,釋放后會(huì)喚醒其后繼節(jié)點(diǎn)。
它維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭(zhēng)用資源被阻塞時(shí)會(huì)進(jìn)入此隊(duì)列)相寇。這里volatile是核心關(guān)鍵詞慰于,具體volatile的語(yǔ)義,在此不述唤衫。state的訪問(wèn)方式有三種:
getState()
setState()
compareAndSetState()
AQS定義兩種資源共享方式:
- Exclusive(獨(dú)占婆赠,只有一個(gè)線程能執(zhí)行,如ReentrantLock)
- Share(共享佳励,多個(gè)線程可同時(shí)執(zhí)行休里,如Semaphore/CountDownLatch)
ReentrantLock的公平和非公平如何實(shí)現(xiàn)的呢?它內(nèi)部有幾個(gè)類
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync
對(duì)于State都是tryAcquire方法
公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以發(fā)現(xiàn)區(qū)別在于公平鎖多了一層hasQueuedPredecessors()
判斷
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
doc 和 代碼都很清楚, 此方法判斷當(dāng)前線程之前是否還有其他線程在(非頭結(jié)點(diǎn))