AQS解析
AQS通過FIFO的等待隊(duì)列障簿,提供了一個用來實(shí)現(xiàn)阻塞鎖和相關(guān)的同步器(信號量盹愚、事件機(jī)制)的框架,它是很多同步器實(shí)現(xiàn)的底層基礎(chǔ)站故,內(nèi)部基于一個int原子變量來代表狀態(tài)皆怕,子類必須實(shí)現(xiàn)protect方法來改變狀態(tài),這些方法定義怎么去獲取鎖和釋放鎖西篓。AQS的其他方法則定義了入隊(duì)以及阻塞的機(jī)制愈腾,子類可以定義其他的狀態(tài)變量,但是只有通過getState污淋,SetState以及compareAndSetState來原子更新的state變量能代表同步狀態(tài)。子類一般定義為非public的內(nèi)部類余掖,AQS提供了獨(dú)占模式和共享模式獲取鎖寸爆,當(dāng)在獨(dú)占模式,嘗試獲取其他線程占用的鎖將會失敗盐欺,共享模式下多線程則可能會成功赁豆。不同模式下的等待線程公用同一個FIFO隊(duì)列,通常冗美,子類只提供其中一種模式的實(shí)現(xiàn)魔种,ReadWriteLock除外。
該類的內(nèi)部定義了一個嵌套的ConditionObject類用來作為Condition的實(shí)現(xiàn)粉洼,用來支撐獨(dú)占模式的實(shí)現(xiàn)
AQS類的繼承關(guān)系:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AbstractOwnableSynchronizer:同步器可能被一個線程獨(dú)占节预,因此抽象出該類保存獨(dú)占的線程。主要屬性和方法:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
Node節(jié)點(diǎn)
AQS最主要的是其中的節(jié)點(diǎn)類Node属韧,用來表示阻塞的等待隊(duì)列中的節(jié)點(diǎn)(下面這幾段話翻譯自java doc安拟,有點(diǎn)拗口)
等待隊(duì)列是CLH鎖隊(duì)列的變種,CLH鎖一般用來作為自旋鎖宵喂,這里用來進(jìn)行阻塞同步糠赦,但是使用的理論都是持有前置節(jié)點(diǎn)的狀態(tài)來進(jìn)行同步等待。每個節(jié)點(diǎn)有一個"status"屬性,用來表示線程是否block拙泽,當(dāng)前置節(jié)點(diǎn)釋放鎖淌山,當(dāng)前節(jié)點(diǎn)會被通知。隊(duì)列里面的每個節(jié)點(diǎn)就像持有單個等待線程的monitor一樣顾瞻。這個status屬性不會控制線程是否獲取到鎖泼疑。當(dāng)一個線程在隊(duì)頭的時候會嘗試獲取鎖,但是排在第一位并不能保證能夠獲取成功朋其,它只是表示有權(quán)利去競爭王浴。所以釋放鎖的線程再去獲取鎖時可能需要等待蹲嚣。
當(dāng)入CLH鎖隊(duì)列時崩哩,會通過原子操作將入隊(duì)節(jié)點(diǎn)排在隊(duì)尾米奸,出隊(duì)的時候锋谐,只需要設(shè)置頭結(jié)點(diǎn)的狀態(tài)(置為空)
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
插入到CLH隊(duì)列中蝎宇,只需要對“尾巴”執(zhí)行一次原子操作技潘,因此存在一個簡單的原子分界點(diǎn)襟齿,即從未排隊(duì)到排隊(duì)铸本。同樣喇潘,出隊(duì)僅涉及更新“頭”体斩。但是,節(jié)點(diǎn)需要花費(fèi)更多的精力來確定其后繼者是誰颖低,因?yàn)橐幚碛捎诔瑫r和中斷而可能導(dǎo)致的取消絮吵。前置鏈接需要處理取消的情況,如果一個節(jié)點(diǎn)已經(jīng)取消了忱屑,它的后繼節(jié)點(diǎn)將會重新鏈接到新的非取消的前置節(jié)點(diǎn)蹬敲。同時這里又使用next鏈接來實(shí)現(xiàn)阻塞機(jī)制,每個節(jié)點(diǎn)的線程ID保存在自己的節(jié)點(diǎn)中莺戒,所以當(dāng)一個前置節(jié)點(diǎn)喚醒后面的節(jié)點(diǎn)伴嗡,通過遍歷next鏈接決定哪個線程被喚醒。
確定后繼者必須避免與新排隊(duì)的節(jié)點(diǎn)競爭从铲,競爭是通過設(shè)置他們前置節(jié)點(diǎn)的next屬性產(chǎn)生瘪校。這里的解決辦法是,如果需要名段,通過向后遍歷檢查隊(duì)列中的節(jié)點(diǎn)的后繼節(jié)點(diǎn)為null(換句話說阱扬,next鏈接是一種優(yōu)化,能夠避免向后掃描)
由于我們必須輪詢其他節(jié)點(diǎn)的取消情況伸辟,因此我們可能沒有注意到已取消的節(jié)點(diǎn)是在我們前面還是后面价认。要解決此問題,必須在取消時自娩,總是unpark后繼節(jié)點(diǎn)用踩,使他們能夠穩(wěn)定在新的前置節(jié)點(diǎn)身上渠退,除非我們可以確定未取消的前置節(jié)點(diǎn)。
在條件上等待的線程使用同樣的節(jié)點(diǎn)脐彩,但是使用額外的鏈接碎乃。
關(guān)于CLH鎖:博客1,博客2惠奸,一個CLH鎖的實(shí)現(xiàn)梅誓,方便我們理解AQS隊(duì)列:
public class ClhSpinLock {
// 前置節(jié)點(diǎn)
private final ThreadLocal<Node> pred;
// 當(dāng)前情況節(jié)點(diǎn)
private final ThreadLocal<Node> node;
// 尾部節(jié)點(diǎn)
private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
public ClhSpinLock() {
// 這兩個節(jié)點(diǎn)是線程內(nèi)部變量,tail是多線程共享變量
this.node = ThreadLocal.withInitial(() -> new Node());
this.pred = ThreadLocal.withInitial(() -> null);
}
public void lock() {
// 當(dāng)前持有鎖的節(jié)點(diǎn)
final Node node = this.node.get();
// 表示當(dāng)前準(zhǔn)備持有鎖
node.locked = true;
// 加鎖都會直接設(shè)置tail節(jié)點(diǎn)佛南,并獲取之前的tail節(jié)點(diǎn)
// 有競爭時梗掰,只有獲取到鎖的才會執(zhí)行完lock,其他都會在while中等待
// 對于同一個線程都是獲取同一個node嗅回,對于不同線程會獲取之前的tail及穗,從而形成一個鏈
Node pred = this.tail.getAndSet(node);
// 每次后來的接單會覆蓋前置節(jié)點(diǎn),形成一個鏈
this.pred.set(pred);
while (pred.locked) {}
}
public void unlock() {
// 獲取當(dāng)前線程的鎖節(jié)點(diǎn)
final Node node = this.node.get();
// 釋放鎖绵载,上面的while會退出從而獲取到鎖
node.locked = false;
// 當(dāng)前節(jié)點(diǎn)設(shè)置為前置節(jié)點(diǎn)埂陆,開始時,pred相當(dāng)于一個虛擬的頭結(jié)點(diǎn)
this.node.set(this.pred.get());
}
private static class Node {
private volatile boolean locked;
}
}
AQS中的等待隊(duì)列Node代碼如下:
static final class Node {
/** 表示節(jié)點(diǎn)在共享模式下等待 */
static final Node SHARED = new Node();
/** 表示節(jié)點(diǎn)在獨(dú)占模式下等待 */
static final Node EXCLUSIVE = null;
/** waitStatus的值娃豹,表示取消 */
static final int CANCELLED = 1;
/** waitStatus的值焚虱,表示后繼節(jié)點(diǎn)需要喚醒 */
static final int SIGNAL = -1;
/** waitStatus的值,表示線程正在條件等待 */
static final int CONDITION = -2;
/**
* waitStatus的值懂版,表示下一個獲取共享鎖需要無條件傳播
*/
static final int PROPAGATE = -3;
/**
* 非負(fù)的數(shù)值表示節(jié)點(diǎn)不需要喚醒鹃栽,在同步節(jié)點(diǎn)中,該值初始化為0
* Status field, taking on only the values:
* SIGNAL: 當(dāng)前節(jié)點(diǎn)獲取鎖后躯畴,后繼節(jié)點(diǎn)將會被被阻塞民鼓,因此當(dāng)當(dāng)前節(jié)點(diǎn)釋放鎖或者取消的時候,當(dāng)前節(jié)點(diǎn)需要喚醒后繼節(jié)點(diǎn)私股,
為了避免競爭摹察,acquire方法必須一開要說明他們需要signal恩掷,然后進(jìn)行原子地acquire倡鲸,然后失敗的時候阻塞
The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: 當(dāng)前節(jié)點(diǎn)超時或者被中斷,節(jié)點(diǎn)進(jìn)入到這個狀態(tài)后不會再改變
This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: 當(dāng)前節(jié)點(diǎn)處于等待條件隊(duì)列中黄娘,它知道狀態(tài)發(fā)生改變峭状,變成0時,才會作為一個同步隊(duì)列節(jié)點(diǎn)逼争,使用這個值和該屬性的其他使用無關(guān)优床,只是為了簡化機(jī)制
This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: 一個釋放的共享狀態(tài)需要傳播到其他節(jié)點(diǎn),只針對頭節(jié)點(diǎn)設(shè)置誓焦,并在doReleaseShared設(shè)置胆敞,來保證傳播的繼續(xù),即使其他操作已經(jīng)介入
A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above 其他
* 非負(fù)值表示節(jié)點(diǎn)不需要喚醒操作,所以大部分代碼不需要檢查特定值移层,直接賦值即可
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
* 對于一般的同步節(jié)點(diǎn)來說初始值是0仍翰,對于條件等待節(jié)點(diǎn)來說是CONDITION,該值的修改使用CAS
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
* 前驅(qū)節(jié)點(diǎn):入隊(duì)的時候賦值观话,出隊(duì)的時候賦值為null
前驅(qū)節(jié)點(diǎn)的鏈接予借,在入隊(duì)的時候賦值,在出隊(duì)的時候賦值為null频蛔。如果一個前驅(qū)節(jié)點(diǎn)取消灵迫,會通過短路操作找到一個非取消的節(jié)點(diǎn),
該節(jié)點(diǎn)一定存在晦溪,因?yàn)轭^節(jié)點(diǎn)永遠(yuǎn)不會取消瀑粥。當(dāng)且僅當(dāng)獲取鎖成功,一個節(jié)點(diǎn)才會變成頭結(jié)點(diǎn)尼变。一個取消的節(jié)點(diǎn)永遠(yuǎn)不會成功的獲取鎖利凑,
并且一個線程只會取消自己,不會取消其他節(jié)點(diǎn)
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
后繼節(jié)點(diǎn)的鏈接嫌术,入隊(duì)的時候賦值哀澈,當(dāng)繞過取消節(jié)點(diǎn)的時候會調(diào)整,當(dāng)出隊(duì)的時候會賦值為null度气。直到關(guān)聯(lián)后割按,入隊(duì)操作才會賦值前置節(jié)點(diǎn)的next屬性,所以當(dāng)看到next屬性是null磷籍,并不代表它是隊(duì)列的最后一個節(jié)點(diǎn)适荣。然后,如果next屬性是null院领,我們可以從尾部向后掃描prev節(jié)點(diǎn)進(jìn)行double-check弛矛。取消節(jié)點(diǎn)的next屬性設(shè)置成該節(jié)點(diǎn)自己,而不是null比然,這是為了方便isOnSyncQueue方法
*/
volatile Node next;
/** 當(dāng)前入隊(duì)節(jié)點(diǎn)的線程
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
下一個在條件上等待的節(jié)點(diǎn),或者是特殊節(jié)點(diǎn)SHARED丈氓。因?yàn)闂l件隊(duì)列僅在獨(dú)占模式下才會訪問,所以我們只需要一個簡單的隊(duì)列來存放等待條件的節(jié)點(diǎn)强法。
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
子類實(shí)現(xiàn)需要重寫的方法
繼承AQS的實(shí)現(xiàn)類需要重寫這幾個方法:
//嘗試在獨(dú)占模式中獲取鎖万俗,這個方法調(diào)用之前需要查詢當(dāng)前對象的狀態(tài)是否在獨(dú)占模式下允許獲取,如果獲取失敗饮怯,該方法可能會將當(dāng)前線程入隊(duì)
protected boolean tryAcquire(int arg)
//在獨(dú)占模式下闰歪,線程釋放鎖,該方法會嘗試設(shè)置同步器的狀態(tài)
protected boolean tryRelease(int arg)
//嘗試在共享模式下獲取鎖
protected int tryAcquireShared(int arg)
//嘗試在共享模式下釋放鎖
protected boolean tryReleaseShared(int arg)
AQS其他字段屬性:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
等待隊(duì)列的頭結(jié)點(diǎn)蓖墅,采用延遲初始化库倘,如果頭存在临扮,waitStatus保證不能是取消狀態(tài)
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
* 尾結(jié)點(diǎn)
*/
private transient volatile Node tail;
/**
* 同步狀態(tài),為0表示沒加鎖
*/
private volatile int state;
// 為了提高性能直接使用本地方法CAS修改相關(guān)字段的值,所以需要Unsafe獲取到每個屬性的內(nèi)存偏移
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
acquire方法
AQS的主要方法:
acquire 獲取允許
release釋放允許
下面講解下acquire 方法教翩,配合ReentrantLock中的實(shí)現(xiàn)講解公条,參考博客
//獲取信號量的過程,tryAcquire邏輯由子類實(shí)現(xiàn),如果獲取失敗迂曲,進(jìn)入隊(duì)列靶橱,在隊(duì)列中獲取,如果中斷了 則中斷
public final void acquire(int arg) { // Method 1
/**如果tryAcquire成功路捧。不會走到后面的acquireQueued关霸,
如果tryAcquire失敗,會走到acquireQueued杰扫,表示在隊(duì)里中去等待獲取鎖*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**這里看下ReentrantLock中的公平鎖的tryAcquire的實(shí)現(xiàn)*/
protected final boolean tryAcquire(int acquires) { // Method 2
final Thread current = Thread.currentThread();
// 調(diào)用的AQS代碼队寇,獲取當(dāng)前狀態(tài)
int c = getState();
// 如果當(dāng)前狀態(tài)是0,
if (c == 0) {
// hasQueuedPredecessors代碼在下面章姓,看下隊(duì)列中是否還有前置節(jié)點(diǎn)
// 如果沒有前置節(jié)點(diǎn)佳遣,就去獲取信號量,獲取的數(shù)量是acquires
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 設(shè)置當(dāng)前線程
setExclusiveOwnerThread(current);
return true;
}
}
// 這里表示有競爭凡伊,但是重入的情況還是可以獲取鎖的
else if (current == getExclusiveOwnerThread()) {
// 如果是重入的情況零渐,直接設(shè)置信號量
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 表示有競爭,或者有前置節(jié)點(diǎn)系忙,需要等待
return false;
}
/**
* 查詢當(dāng)前是否有其他的線程比當(dāng)前線程等待的更久诵盼,當(dāng)前方法的邏輯類似于下面的代碼(效率比JDK的代碼低):
*
* getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()
*
* 注意,由于中斷或者超時產(chǎn)生的取消可能發(fā)生在任何時候银还,所以返回true不能保證有其他線程會比當(dāng)前線程更先獲取到鎖风宁,同樣,當(dāng)方法返回false(判斷了隊(duì)列為空)蛹疯,也可能出現(xiàn)另一個線程在方法調(diào)用之后贏得競爭入隊(duì)(也就是該方法結(jié)果不可信)
* 這個方法主要被公平鎖用來避免競爭戒财,當(dāng)這個方法返回true(非重入情況),公平鎖的tryAcquire方法應(yīng)該直接返回false捺弦,以及tryAcquireShared返回一個負(fù)值饮寞,下面是一個代碼案例:
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* // 重入,增加計(jì)數(shù)
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }
*/
public final boolean hasQueuedPredecessors() { // Method 3
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
// 結(jié)果的正確性依賴頭節(jié)點(diǎn)比尾節(jié)點(diǎn)更早初始化羹呵,以及如果當(dāng)前線程是隊(duì)列中的第一個骂际,head.next是準(zhǔn)確的
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 頭不等于尾疗琉,頭等于尾表示隊(duì)列為空冈欢,在初始化的時候
// 頭的下一個節(jié)點(diǎn)為null或者當(dāng)前線程不能等于頭節(jié)點(diǎn)的線程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// 假設(shè)tryAcquire獲取失敗,也就是有競爭的情況盈简,那后進(jìn)入的線程肯定要等待凑耻,等待需要進(jìn)入等待隊(duì)列太示,需要調(diào)用addWaiter方法
// 創(chuàng)建當(dāng)前線程節(jié)點(diǎn)并入隊(duì)列,返回當(dāng)前線程節(jié)點(diǎn)
private Node addWaiter(Node mode) { // Method 4
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 拿到尾節(jié)點(diǎn)香浩,嘗試在尾部加節(jié)點(diǎn)
Node pred = tail;
if (pred != null) {
// 當(dāng)前節(jié)點(diǎn)的prev鏈接指向pred
node.prev = pred;
//CAS設(shè)置尾結(jié)點(diǎn)类缤,設(shè)置成功返回true,設(shè)置失敗表示入隊(duì)有競爭
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果設(shè)置失敗,獲取的尾結(jié)點(diǎn)已經(jīng)不是尾結(jié)點(diǎn)
// 或者尾節(jié)點(diǎn)為null邻吭,還沒有初始化餐弱,再嘗試入隊(duì)
enq(node);
// 返回新入隊(duì)的節(jié)點(diǎn)
return node;
}
// 循環(huán)入隊(duì)
private Node enq(final Node node) {
//循環(huán)操作
for (;;) {
Node t = tail;
//如果尾結(jié)點(diǎn)為null 表示沒有初始化,開始初始化頭結(jié)點(diǎn)和尾結(jié)點(diǎn)為同一節(jié)點(diǎn)
//這里使用了延遲初始化囱晴,初始化時膏蚓,頭節(jié)點(diǎn)等于尾節(jié)點(diǎn)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 當(dāng)前節(jié)點(diǎn)的pre節(jié)點(diǎn)指向尾節(jié)點(diǎn)
// 注意此處的設(shè)置鏈接和下面的設(shè)置next鏈接不是原子操作
// 因此在找取消節(jié)點(diǎn)的是,是從后向前遍歷畸写,因?yàn)閚ext指針可能沒連上
node.prev = t;
// 設(shè)置新的尾結(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回舊的尾節(jié)點(diǎn)
return t;
}
}
}
}
// 現(xiàn)在節(jié)點(diǎn)已經(jīng)入隊(duì)了
// node是新創(chuàng)建的入隊(duì)的節(jié)點(diǎn)
// 該方法是節(jié)點(diǎn)在隊(duì)列中喚醒然后獲取鎖這一邏輯的實(shí)現(xiàn)
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//循環(huán)入隊(duì)
for (;;) {
final AbstractQueuedSynchronizer.Node p = node.predecessor();
// 如果前置節(jié)點(diǎn)是頭節(jié)點(diǎn)驮瞧,再獲取資源
if (p == head && tryAcquire(arg)) {
// 設(shè)置新的頭結(jié)點(diǎn),相當(dāng)于獲取到鎖后取代新的頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 是否需要park枯芬,如果需要則park
// 如果不需要直接進(jìn)入下一輪循環(huán)
if (shouldParkAfterFailedAcquire(p, node) &&
// 用來阻塞等待(park)论笔,醒來后返回是否中斷過,如果中斷過返回true
// 然后設(shè)置interrupted為true
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// pred是前置節(jié)點(diǎn)千所,node是當(dāng)前節(jié)點(diǎn)
// 判斷是否需要park狂魔,如果不park 繼續(xù)循環(huán)
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
// 獲取當(dāng)前節(jié)點(diǎn)狀態(tài)
int ws = pred.waitStatus;
if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 如果節(jié)點(diǎn)的等待狀態(tài)時SIGNAL,表示已經(jīng)在請求SIGNAL淫痰,所以可以安全的等待
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 表示前置節(jié)點(diǎn)取消了毅臊,跳過前置節(jié)點(diǎn)
*/
do {
// pred = pred.prev 前置節(jié)點(diǎn)設(shè)置成前置節(jié)點(diǎn)的前置節(jié)點(diǎn)
// node.prev = pred 當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)變成前前置節(jié)點(diǎn)
// 也就是跳過前置節(jié)點(diǎn),一直這樣向后遍歷跳過取消的節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 表示等待狀態(tài)是 0 或者PROPAGATE 表明需要喚醒黑界,但還沒有park管嬉,調(diào)用方需要重試來保證在park之前不會獲取到鎖
* 設(shè)置前置節(jié)點(diǎn)為SIGNAL,表示要喚醒后繼節(jié)點(diǎn)
* 正常的情況會走這里朗鸠,表示想要獲取信號量蚯撩,就要把其前置節(jié)點(diǎn)的waiteStatus修改
* 當(dāng)前線程對應(yīng)的節(jié)點(diǎn)進(jìn)行入隊(duì)至隊(duì)尾(掛起之前),那么其前驅(qū)節(jié)點(diǎn)的狀態(tài)就必須為SIGNAL烛占,以便后者取消或釋放時將當(dāng)前節(jié)點(diǎn)喚醒
*/
compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
}
//如果不是第一種情況胎挎,比如第三種,設(shè)置完前置節(jié)點(diǎn)狀態(tài)后 又進(jìn)入acquireQueued繼續(xù)循環(huán)
return false;
}
//判斷線程是否中斷過
private final boolean parkAndCheckInterrupt() {
//阻塞線程
LockSupport.park(this);
return Thread.interrupted();
}
// 如果acquireQueued的過程中被中斷忆家,從而導(dǎo)致失敗犹菇,會取消acquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 線程置為空
node.thread = null;
// 重置pre節(jié)點(diǎn),跳過取消的節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 獲取pre的next節(jié)點(diǎn)芽卿,用來做CAS重新設(shè)置next節(jié)點(diǎn)
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 當(dāng)前節(jié)點(diǎn)狀態(tài)設(shè)置為取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果當(dāng)前節(jié)點(diǎn)是尾部節(jié)點(diǎn)揭芍,尾節(jié)點(diǎn)設(shè)置成前置節(jié)點(diǎn),next值置為空
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
// 如果前置節(jié)點(diǎn)是SIGNAL表示要喚醒后繼節(jié)點(diǎn)
// 并且如果狀態(tài)不是取消,也嘗試設(shè)置SIGNAL標(biāo)記卸例,喚醒后面的節(jié)點(diǎn)(自己能喚醒說明自己的前置狀態(tài)是SIGNAL称杨,自己既然取消了肌毅,SIGNAL狀態(tài)要讓現(xiàn)在的pre繼承)
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 前置節(jié)點(diǎn)的next指針指向當(dāng)前節(jié)點(diǎn)的next指針,跳過當(dāng)前節(jié)點(diǎn)
compareAndSetNext(pred, predNext, next);
} else {
// pred是頭節(jié)點(diǎn)姑原,嘗試喚醒后面的節(jié)點(diǎn)
unparkSuccessor(node);
}
// 指向自己悬而,方便GC,當(dāng)前節(jié)點(diǎn)消亡
node.next = node; // help GC
}
}
release方法
釋放的主要方法:
//獨(dú)占模式下釋放信號
public final boolean release(int arg) {
//如果釋放鎖成功
if (tryRelease(arg)) {
// 判斷當(dāng)前頭節(jié)點(diǎn)
AbstractQueuedSynchronizer.Node h = head;
// 0 表示不是初始化狀態(tài)
if (h != null && h.waitStatus != 0)
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
/**看下ReentrantLock的Sync鎖的實(shí)現(xiàn):
* release返回true表名解鎖成功
*/
protected final boolean tryRelease(int releases) {
// 減去當(dāng)前的信號量
int c = getState() - releases;
// 如果當(dāng)前線程不是獲取鎖的線程锭汛,不能釋放
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果信號量是0
free = true;
setExclusiveOwnerThread(null);
}
// 設(shè)置信號量
setState(c);
return free;
}
// 當(dāng)tryRelease成功后笨奠,需要喚醒后繼節(jié)點(diǎn),這里的node是頭結(jié)點(diǎn)
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 如果狀態(tài)是是負(fù)數(shù)(可能是需要SIGNAL)唤殴,預(yù)期需要SINGAL艰躺,因此嘗試清除狀態(tài)值,
* 如果清楚失敗或者狀態(tài)已經(jīng)被改變都不會影響正確性
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* upark下一個節(jié)點(diǎn)眨八,通常情況及時下一個節(jié)點(diǎn)腺兴,但是可能存在取消或者明顯的就是null情況,從尾部向后遍歷去找一個非取消的節(jié)點(diǎn)
* 從tail往前找一個沒有取消的節(jié)點(diǎn)
* 為什么從尾部找:是因?yàn)閑nq的時候不是原子操作廉侧,在cas設(shè)置尾部節(jié)點(diǎn)的時候页响,指向前面的連接已經(jīng)創(chuàng)建好,但是指向后面的連接沒有
*/
AbstractQueuedSynchronizer.Node s = node.next;
// 拿到頭節(jié)點(diǎn)的下一個節(jié)點(diǎn)段誊,如果是取消狀態(tài)或者下一個節(jié)點(diǎn)是null
// 從尾部向后遍歷找一個非取消的節(jié)點(diǎn)喚醒
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾部向后遍歷闰蚕,如果t不等于當(dāng)前節(jié)點(diǎn)
for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
// 非取消狀態(tài),s更新為當(dāng)前遍歷的節(jié)點(diǎn)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
acquireShared
/** tryAcquireShared:在共享模式下獲取鎖*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// CountDownLatch的實(shí)現(xiàn)
protected int tryAcquireShared(int acquires) {
// 如果獲取的狀態(tài)是0,表示沒有被加鎖索赏,返回1
return (getState() == 0) ? 1 : -1;
}
// tryAcquireShared失敗盼玄,進(jìn)入隊(duì)列
private void doAcquireShared(int arg) {
// 添加shared節(jié)點(diǎn),注意潜腻,這個Node.SHARED是一個靜態(tài)變量埃儿,主要方便isShared方法判斷節(jié)點(diǎn)類型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到前置節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
// 如果前置節(jié)點(diǎn)是頭節(jié)點(diǎn),說明該嘗試獲取鎖了
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果大于等于0表示獲取成功
// 設(shè)置新的頭 并傳播狀態(tài)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 設(shè)置隊(duì)列的頭結(jié)點(diǎn)融涣,檢查是否后繼節(jié)點(diǎn)在共享模式下等待童番,如果是,當(dāng)propagate>0 或者PROPAGATE狀態(tài)已經(jīng)設(shè)置威鹿,則進(jìn)行傳播
// 為什么acquire中會調(diào)用doReleaseShared剃斧,原因在于共享鎖是可以支持多個線程獲取的,所以要通知其他線程來獲取鎖
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 獲取到資源后忽你,設(shè)置新的頭
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 當(dāng)在下列的情況下嘗試去喚醒下一個節(jié)點(diǎn):
// 調(diào)用者需要傳播 propagate>0 或者 傳播已經(jīng)有記錄了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 看下一個節(jié)點(diǎn)是否是共享的幼东,如果是共享的則傳播
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 保證release是傳播的,即使當(dāng)前有其他正在acquire/release的調(diào)用。
// 如果head的后繼節(jié)點(diǎn)需要SIGNAL,嘗試unpark head的后繼節(jié)點(diǎn)筋粗,如果不需要SIGNAL,status設(shè)置為PROPAGATE,來保證傳播繼續(xù).另外炸渡,必須要循環(huán)處理娜亿,避免在我們執(zhí)行的時候有其他節(jié)點(diǎn)添加到隊(duì)列中,和其他的unparkSuccessor使用不同蚌堵,我們需要知道CAS是否重置status失敗买决,如果失敗重新檢查
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 說明有后繼節(jié)點(diǎn)喚醒
// 如果是SIGANL,表示要喚醒后續(xù)節(jié)點(diǎn)吼畏,設(shè)置成0督赤,然后喚醒后繼節(jié)點(diǎn)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 失敗說明有其他節(jié)點(diǎn)競爭
continue; // loop to recheck cases
// unpark后繼節(jié)點(diǎn),其他節(jié)點(diǎn)會獲取資源泻蚊,然后設(shè)置頭節(jié)點(diǎn)躲舌,此時頭結(jié)點(diǎn)會變化
unparkSuccessor(h);
}
// 有新的head產(chǎn)生,新head的ws初始值為0
// 如果compareAndSetWaitStatus失敗性雄,表示有競爭没卸,繼續(xù)執(zhí)行
// 設(shè)置head為PROPAGATE,表示傳遞下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果頭節(jié)點(diǎn)沒有發(fā)生變化秒旋,表示沒有后續(xù)的節(jié)點(diǎn)獲取到資源约计,直接退出
// 其他共享節(jié)點(diǎn)獲取資源,會設(shè)置頭為Shared狀態(tài)迁筛,這樣一直喚醒下去
if (h == head) // loop if head changed
break;
}
}
releaseShared
releaseShared實(shí)際就是調(diào)用的上面的doReleaseShared
Condition使用
synchronized和notify/wait 組合是和lock和condition的await以及signal 組合等價煤蚌。await在AQS的一個內(nèi)部類中實(shí)現(xiàn):
條件隊(duì)列和等待隊(duì)列是兩個不同的隊(duì)列,但是使用的都是Node類细卧,條件隊(duì)列結(jié)束等待會進(jìn)入到等待隊(duì)列中去
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 條件隊(duì)列的第一個節(jié)點(diǎn). */
private transient Node firstWaiter;
/** 最后一個節(jié)點(diǎn). */
private transient Node lastWaiter;
public ConditionObject() { }
// Internal methods
// 添加一個新的條件節(jié)點(diǎn)
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果尾節(jié)點(diǎn)是取消的狀態(tài)尉桩,則清理
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 重新構(gòu)造等待節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 開始的情況
firstWaiter = node;
else
// 構(gòu)造鏈接
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 取消鏈接waiter
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 拿到下一個節(jié)點(diǎn)
Node next = t.nextWaiter;
// 判斷是否取消
if (t.waitStatus != Node.CONDITION) {
// 切斷鏈接
t.nextWaiter = null;
if (trail == null)
// 頭節(jié)點(diǎn)是取消的,頭節(jié)點(diǎn)直接等于下一個節(jié)點(diǎn)
firstWaiter = next;
else
// 重新構(gòu)造鏈接
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 上一次遍歷的節(jié)點(diǎn)
trail = t;
// t遍歷到下一個
t = next;
}
}
}
await方法
//AQS方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到condition隊(duì)列
Node node = addConditionWaiter();
//釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//第一次總是返回false贪庙,因?yàn)樯厦嬉呀?jīng)釋放鎖了
while (!isOnSyncQueue(node)) {
// 如果沒有在同步隊(duì)列魄健,阻塞
LockSupport.park(this);
// 如果中斷退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//醒來之后的操作,會嘗試拿鎖插勤,同時設(shè)置標(biāo)志位
// acquireQueued進(jìn)入隊(duì)列阻塞等待被喚醒
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清楚cancel節(jié)點(diǎn)
unlinkCancelledWaiters();
if (interruptMode != 0)
//根據(jù)interruptMode要不拋異常后者自我中斷
reportInterruptAfterWait(interruptMode);
}
//釋放鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 首先獲取當(dāng)前線程所在的AQS隊(duì)列的狀態(tài)值
int savedState = getState();
// 嘗試去釋放信號量
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 釋放資源
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果釋放成功沽瘦,喚醒后繼節(jié)點(diǎn)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//判斷當(dāng)前線程是否在同步隊(duì)列,一開始是在條件隊(duì)列
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// 第一次走這個邏輯
return false;
if (node.next != null) // If has successor, it must be on queue
// 如果有next鏈接农尖,肯定是在等待隊(duì)列上
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// node.prev是非空的析恋,但是可能還沒有加入隊(duì)列中個,因?yàn)镃AS操作可能失敗
// 所以從尾部向頭遍歷的時候需要確定它已經(jīng)加入到隊(duì)列中
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* 從尾向后查找同步隊(duì)列盛卡,查看節(jié)點(diǎn)是否存在
*/
private boolean findNodeFromTail(AbstractQueuedSynchronizer.Node node) {
AbstractQueuedSynchronizer.Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
//檢查是否中斷助隧,如果中斷了,轉(zhuǎn)移到同步等待隊(duì)列
private int checkInterruptWhileWaiting(Node node) {
// 如果中斷則轉(zhuǎn)換節(jié)點(diǎn),如果轉(zhuǎn)換節(jié)點(diǎn)成功,拋出異常并村,失敗則自我中斷
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 為什么取消要進(jìn)入同步隊(duì)列:取消只是不再等待巍实,可以再去獲取鎖
final boolean transferAfterCancelledWait(Node node) {
// 設(shè)置WS為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 入同步隊(duì)列
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 循環(huán)等待入隊(duì)列,知道入隊(duì)成功
// 走到這說明有競爭力 已經(jīng)在入隊(duì)了
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
unlinkCancelledWaiters方法的示意圖
signal
// 移除等待最長的線程哩牍,從條件隊(duì)列移到同步隊(duì)列中去
public final void signal() {
// 如果獲取鎖的線程不是當(dāng)前線程棚潦,拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 向前遍歷,如果沒有取消膝昆,則喚醒
// 如果遇到取消節(jié)點(diǎn)丸边,則繼續(xù)找下一個節(jié)點(diǎn)處理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 設(shè)置節(jié)點(diǎn)狀態(tài),如果不能設(shè)置荚孵,表示節(jié)點(diǎn)已經(jīng)取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 入隊(duì)妹窖,返回前一個節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// ws>0 表示前置節(jié)點(diǎn)取消了 或者設(shè)置狀態(tài)失敗,即節(jié)點(diǎn)發(fā)生變化
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒當(dāng)前節(jié)點(diǎn)收叶,重新同步狀態(tài)
LockSupport.unpark(node.thread);
return true;
}
ReentrantLock
前面在講解AQS的時候已經(jīng)介紹了ReentrantLock公平鎖的代碼骄呼,下面完整介紹下ReentrantLock
以ReenterantLock為例,該類中定義幾個同步器:
Sync是基類判没,是公平鎖和非公平鎖實(shí)現(xiàn)的基礎(chǔ)
tryAcquire由于公平鎖和非公平鎖實(shí)現(xiàn)不一樣 因此在Sync子類實(shí)現(xiàn)
下圖是非公平鎖NonfairSync的調(diào)用邏輯谒麦,公平鎖類似
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
允許在非公平鎖情況下實(shí)現(xiàn)快速lock
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
* 嘗試獲取信號量,也就是設(shè)置state為非0數(shù)值
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//獲取
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果當(dāng)前線程已經(jīng)獲取了哆致,增加獲取的量
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//表示沒獲取到绕德,已經(jīng)被其他人獲取了
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有當(dāng)前顯示是獲取信號量的線程才能釋放
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//判斷當(dāng)前線程是否獨(dú)占
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
//state為0表示沒加鎖
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
//state的數(shù)值表示獲取鎖的數(shù)量
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
非公平鎖的實(shí)現(xiàn):
static final class NonfairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 嘗試獲取鎖,將state從0更新成1摊阀,如果失敗直接使用acquire
* 獲取鎖的主要邏輯acquire是AQS中的方法耻蛇,其中又會調(diào)用tryAcquire
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//AQS子類關(guān)聯(lián)的實(shí)現(xiàn)方法,可以看到調(diào)用的是上面的nonfairTryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
公平鎖的實(shí)現(xiàn):
static final class FairSync extends ReentrantLock.Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果是鎖的重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//查詢是否有線程比當(dāng)前線程等待的時間更長
//由于并發(fā)的實(shí)時性 該方法返回的結(jié)果不一定是正確的
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
AbstractQueuedSynchronizer.Node t = tail; // Read fields in reverse initialization order
AbstractQueuedSynchronizer.Node h = head;
AbstractQueuedSynchronizer.Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}