AQS

AQS解析

AQS通過FIFO的等待隊(duì)列障簿,提供了一個用來實(shí)現(xiàn)阻塞鎖和相關(guān)的同步器(信號量盹愚、事件機(jī)制)的框架,它是很多同步器實(shí)現(xiàn)的底層基礎(chǔ)站故,內(nèi)部基于一個int原子變量來代表狀態(tài)皆怕,子類必須實(shí)現(xiàn)protect方法來改變狀態(tài),這些方法定義怎么去獲取鎖和釋放鎖西篓。AQS的其他方法則定義了入隊(duì)以及阻塞的機(jī)制愈腾,子類可以定義其他的狀態(tài)變量,但是只有通過getState污淋,SetState以及compareAndSetState來原子更新的state變量能代表同步狀態(tài)。子類一般定義為非public的內(nèi)部類余掖,AQS提供了獨(dú)占模式和共享模式獲取鎖寸爆,當(dāng)在獨(dú)占模式,嘗試獲取其他線程占用的鎖將會失敗盐欺,共享模式下多線程則可能會成功赁豆。不同模式下的等待線程公用同一個FIFO隊(duì)列,通常冗美,子類只提供其中一種模式的實(shí)現(xiàn)魔种,ReadWriteLock除外。
該類的內(nèi)部定義了一個嵌套的ConditionObject類用來作為Condition的實(shí)現(xiàn)粉洼,用來支撐獨(dú)占模式的實(shí)現(xiàn)

AQS類的繼承關(guān)系:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

AbstractOwnableSynchronizer:同步器可能被一個線程獨(dú)占节预,因此抽象出該類保存獨(dú)占的線程。主要屬性和方法:

private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

Node節(jié)點(diǎn)

AQS最主要的是其中的節(jié)點(diǎn)類Node属韧,用來表示阻塞的等待隊(duì)列中的節(jié)點(diǎn)(下面這幾段話翻譯自java doc安拟,有點(diǎn)拗口)

等待隊(duì)列是CLH鎖隊(duì)列的變種,CLH鎖一般用來作為自旋鎖宵喂,這里用來進(jìn)行阻塞同步糠赦,但是使用的理論都是持有前置節(jié)點(diǎn)的狀態(tài)來進(jìn)行同步等待。每個節(jié)點(diǎn)有一個"status"屬性,用來表示線程是否block拙泽,當(dāng)前置節(jié)點(diǎn)釋放鎖淌山,當(dāng)前節(jié)點(diǎn)會被通知。隊(duì)列里面的每個節(jié)點(diǎn)就像持有單個等待線程的monitor一樣顾瞻。這個status屬性不會控制線程是否獲取到鎖泼疑。當(dāng)一個線程在隊(duì)頭的時候會嘗試獲取鎖,但是排在第一位并不能保證能夠獲取成功朋其,它只是表示有權(quán)利去競爭王浴。所以釋放鎖的線程再去獲取鎖時可能需要等待蹲嚣。
當(dāng)入CLH鎖隊(duì)列時崩哩,會通過原子操作將入隊(duì)節(jié)點(diǎn)排在隊(duì)尾米奸,出隊(duì)的時候锋谐,只需要設(shè)置頭結(jié)點(diǎn)的狀態(tài)(置為空)

        +------+  prev +-----+       +-----+
   head |      | <---- |     | <---- |     |  tail
        +------+       +-----+       +-----+

插入到CLH隊(duì)列中蝎宇,只需要對“尾巴”執(zhí)行一次原子操作技潘,因此存在一個簡單的原子分界點(diǎn)襟齿,即從未排隊(duì)到排隊(duì)铸本。同樣喇潘,出隊(duì)僅涉及更新“頭”体斩。但是,節(jié)點(diǎn)需要花費(fèi)更多的精力來確定其后繼者是誰颖低,因?yàn)橐幚碛捎诔瑫r和中斷而可能導(dǎo)致的取消絮吵。前置鏈接需要處理取消的情況,如果一個節(jié)點(diǎn)已經(jīng)取消了忱屑,它的后繼節(jié)點(diǎn)將會重新鏈接到新的非取消的前置節(jié)點(diǎn)蹬敲。同時這里又使用next鏈接來實(shí)現(xiàn)阻塞機(jī)制,每個節(jié)點(diǎn)的線程ID保存在自己的節(jié)點(diǎn)中莺戒,所以當(dāng)一個前置節(jié)點(diǎn)喚醒后面的節(jié)點(diǎn)伴嗡,通過遍歷next鏈接決定哪個線程被喚醒。
確定后繼者必須避免與新排隊(duì)的節(jié)點(diǎn)競爭从铲,競爭是通過設(shè)置他們前置節(jié)點(diǎn)的next屬性產(chǎn)生瘪校。這里的解決辦法是,如果需要名段,通過向后遍歷檢查隊(duì)列中的節(jié)點(diǎn)的后繼節(jié)點(diǎn)為null(換句話說阱扬,next鏈接是一種優(yōu)化,能夠避免向后掃描)
由于我們必須輪詢其他節(jié)點(diǎn)的取消情況伸辟,因此我們可能沒有注意到已取消的節(jié)點(diǎn)是在我們前面還是后面价认。要解決此問題,必須在取消時自娩,總是unpark后繼節(jié)點(diǎn)用踩,使他們能夠穩(wěn)定在新的前置節(jié)點(diǎn)身上渠退,除非我們可以確定未取消的前置節(jié)點(diǎn)。
在條件上等待的線程使用同樣的節(jié)點(diǎn)脐彩,但是使用額外的鏈接碎乃。

關(guān)于CLH鎖:博客1博客2惠奸,一個CLH鎖的實(shí)現(xiàn)梅誓,方便我們理解AQS隊(duì)列:

public class ClhSpinLock {
    // 前置節(jié)點(diǎn)
    private final ThreadLocal<Node> pred;
    // 當(dāng)前情況節(jié)點(diǎn)
    private final ThreadLocal<Node> node;
    // 尾部節(jié)點(diǎn)
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

    public ClhSpinLock() {
        // 這兩個節(jié)點(diǎn)是線程內(nèi)部變量,tail是多線程共享變量
        this.node = ThreadLocal.withInitial(() -> new Node());

        this.pred = ThreadLocal.withInitial(() -> null);
    }

    public void lock() {
        // 當(dāng)前持有鎖的節(jié)點(diǎn)
        final Node node = this.node.get();
        // 表示當(dāng)前準(zhǔn)備持有鎖
        node.locked = true;
        // 加鎖都會直接設(shè)置tail節(jié)點(diǎn)佛南,并獲取之前的tail節(jié)點(diǎn)
        // 有競爭時梗掰,只有獲取到鎖的才會執(zhí)行完lock,其他都會在while中等待
        // 對于同一個線程都是獲取同一個node嗅回,對于不同線程會獲取之前的tail及穗,從而形成一個鏈
        Node pred = this.tail.getAndSet(node);
        // 每次后來的接單會覆蓋前置節(jié)點(diǎn),形成一個鏈
        this.pred.set(pred);
        while (pred.locked) {}
    }

    public void unlock() {
        // 獲取當(dāng)前線程的鎖節(jié)點(diǎn)
        final Node node = this.node.get();
        // 釋放鎖绵载,上面的while會退出從而獲取到鎖
        node.locked = false;
        // 當(dāng)前節(jié)點(diǎn)設(shè)置為前置節(jié)點(diǎn)埂陆,開始時,pred相當(dāng)于一個虛擬的頭結(jié)點(diǎn)
        this.node.set(this.pred.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

AQS中的等待隊(duì)列Node代碼如下:


static final class Node {
  /** 表示節(jié)點(diǎn)在共享模式下等待 */
  static final Node SHARED = new Node();
  /** 表示節(jié)點(diǎn)在獨(dú)占模式下等待 */
  static final Node EXCLUSIVE = null;

  /** waitStatus的值娃豹,表示取消 */
  static final int CANCELLED =  1;
  /** waitStatus的值焚虱,表示后繼節(jié)點(diǎn)需要喚醒 */
  static final int SIGNAL    = -1;
  /** waitStatus的值,表示線程正在條件等待 */
  static final int CONDITION = -2;
  /**
  * waitStatus的值懂版,表示下一個獲取共享鎖需要無條件傳播
  */
  static final int PROPAGATE = -3;

  /**
 * 非負(fù)的數(shù)值表示節(jié)點(diǎn)不需要喚醒鹃栽,在同步節(jié)點(diǎn)中,該值初始化為0
 * Status field, taking on only the values:
 *   SIGNAL:     當(dāng)前節(jié)點(diǎn)獲取鎖后躯畴,后繼節(jié)點(diǎn)將會被被阻塞民鼓,因此當(dāng)當(dāng)前節(jié)點(diǎn)釋放鎖或者取消的時候,當(dāng)前節(jié)點(diǎn)需要喚醒后繼節(jié)點(diǎn)私股,
 為了避免競爭摹察,acquire方法必須一開要說明他們需要signal恩掷,然后進(jìn)行原子地acquire倡鲸,然后失敗的時候阻塞
                 The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  當(dāng)前節(jié)點(diǎn)超時或者被中斷,節(jié)點(diǎn)進(jìn)入到這個狀態(tài)后不會再改變
                 This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  當(dāng)前節(jié)點(diǎn)處于等待條件隊(duì)列中黄娘,它知道狀態(tài)發(fā)生改變峭状,變成0時,才會作為一個同步隊(duì)列節(jié)點(diǎn)逼争,使用這個值和該屬性的其他使用無關(guān)优床,只是為了簡化機(jī)制
                 This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  一個釋放的共享狀態(tài)需要傳播到其他節(jié)點(diǎn),只針對頭節(jié)點(diǎn)設(shè)置誓焦,并在doReleaseShared設(shè)置胆敞,來保證傳播的繼續(xù),即使其他操作已經(jīng)介入
                 A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above 其他
 * 非負(fù)值表示節(jié)點(diǎn)不需要喚醒操作,所以大部分代碼不需要檢查特定值移层,直接賦值即可
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 * 對于一般的同步節(jié)點(diǎn)來說初始值是0仍翰,對于條件等待節(jié)點(diǎn)來說是CONDITION,該值的修改使用CAS
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes.  It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
  volatile int waitStatus;

/**
 * Link to predecessor node that current node/thread relies on
 * for checking waitStatus. Assigned during enqueuing, and nulled
 * out (for sake of GC) only upon dequeuing.  Also, upon
 * cancellation of a predecessor, we short-circuit while
 * finding a non-cancelled one, which will always exist
 * because the head node is never cancelled: A node becomes
 * head only as a result of successful acquire. A
 * cancelled thread never succeeds in acquiring, and a thread only
 * cancels itself, not any other node.
 * 前驅(qū)節(jié)點(diǎn):入隊(duì)的時候賦值观话,出隊(duì)的時候賦值為null
   前驅(qū)節(jié)點(diǎn)的鏈接予借,在入隊(duì)的時候賦值,在出隊(duì)的時候賦值為null频蛔。如果一個前驅(qū)節(jié)點(diǎn)取消灵迫,會通過短路操作找到一個非取消的節(jié)點(diǎn),
   該節(jié)點(diǎn)一定存在晦溪,因?yàn)轭^節(jié)點(diǎn)永遠(yuǎn)不會取消瀑粥。當(dāng)且僅當(dāng)獲取鎖成功,一個節(jié)點(diǎn)才會變成頭結(jié)點(diǎn)尼变。一個取消的節(jié)點(diǎn)永遠(yuǎn)不會成功的獲取鎖利凑,
   并且一個線程只會取消自己,不會取消其他節(jié)點(diǎn)
 */
  volatile Node prev;

/**
 * Link to the successor node that the current node/thread
 * unparks upon release. Assigned during enqueuing, adjusted
 * when bypassing cancelled predecessors, and nulled out (for
 * sake of GC) when dequeued.  The enq operation does not
 * assign next field of a predecessor until after attachment,
 * so seeing a null next field does not necessarily mean that
 * node is at end of queue. However, if a next field appears
 * to be null, we can scan prev's from the tail to
 * double-check.  The next field of cancelled nodes is set to
 * point to the node itself instead of null, to make life
 * easier for isOnSyncQueue.
 后繼節(jié)點(diǎn)的鏈接嫌术,入隊(duì)的時候賦值哀澈,當(dāng)繞過取消節(jié)點(diǎn)的時候會調(diào)整,當(dāng)出隊(duì)的時候會賦值為null度气。直到關(guān)聯(lián)后割按,入隊(duì)操作才會賦值前置節(jié)點(diǎn)的next屬性,所以當(dāng)看到next屬性是null磷籍,并不代表它是隊(duì)列的最后一個節(jié)點(diǎn)适荣。然后,如果next屬性是null院领,我們可以從尾部向后掃描prev節(jié)點(diǎn)進(jìn)行double-check弛矛。取消節(jié)點(diǎn)的next屬性設(shè)置成該節(jié)點(diǎn)自己,而不是null比然,這是為了方便isOnSyncQueue方法
 */
  volatile Node next;

  /** 當(dāng)前入隊(duì)節(jié)點(diǎn)的線程
  * The thread that enqueued this node.  Initialized on
  * construction and nulled out after use.
  */
  volatile Thread thread;

/**
 下一個在條件上等待的節(jié)點(diǎn),或者是特殊節(jié)點(diǎn)SHARED丈氓。因?yàn)闂l件隊(duì)列僅在獨(dú)占模式下才會訪問,所以我們只需要一個簡單的隊(duì)列來存放等待條件的節(jié)點(diǎn)强法。
 * Link to next node waiting on condition, or the special
 * value SHARED.  Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
  Node nextWaiter;

  /**
  * Returns true if node is waiting in shared mode.
  */
  final boolean isShared() {
    return nextWaiter == SHARED;
  }

  final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
      throw new NullPointerException();
    else
      return p;
  }

  Node() {    // Used to establish initial head or SHARED marker
  }

  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
  }

  Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
  }
}

子類實(shí)現(xiàn)需要重寫的方法

繼承AQS的實(shí)現(xiàn)類需要重寫這幾個方法:

//嘗試在獨(dú)占模式中獲取鎖万俗,這個方法調(diào)用之前需要查詢當(dāng)前對象的狀態(tài)是否在獨(dú)占模式下允許獲取,如果獲取失敗饮怯,該方法可能會將當(dāng)前線程入隊(duì)
protected boolean tryAcquire(int arg)
//在獨(dú)占模式下闰歪,線程釋放鎖,該方法會嘗試設(shè)置同步器的狀態(tài)
protected boolean tryRelease(int arg)
//嘗試在共享模式下獲取鎖
protected int tryAcquireShared(int arg)
//嘗試在共享模式下釋放鎖
protected boolean tryReleaseShared(int arg)
image.png

AQS其他字段屬性:

/**
* Head of the wait queue, lazily initialized.  Except for
* initialization, it is modified only via method setHead.  Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
等待隊(duì)列的頭結(jié)點(diǎn)蓖墅,采用延遲初始化库倘,如果頭存在临扮,waitStatus保證不能是取消狀態(tài)
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized.  Modified only via
* method enq to add new wait node.
* 尾結(jié)點(diǎn)
*/
private transient volatile Node tail;

/**
* 同步狀態(tài),為0表示沒加鎖
*/
private volatile int state;
// 為了提高性能直接使用本地方法CAS修改相關(guān)字段的值,所以需要Unsafe獲取到每個屬性的內(nèi)存偏移
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

acquire方法

AQS的主要方法:

acquire 獲取允許

release釋放允許

下面講解下acquire 方法教翩,配合ReentrantLock中的實(shí)現(xiàn)講解公条,參考博客

//獲取信號量的過程,tryAcquire邏輯由子類實(shí)現(xiàn),如果獲取失敗迂曲,進(jìn)入隊(duì)列靶橱,在隊(duì)列中獲取,如果中斷了 則中斷
public final void acquire(int arg) {    // Method 1
  /**如果tryAcquire成功路捧。不會走到后面的acquireQueued关霸,
  如果tryAcquire失敗,會走到acquireQueued杰扫,表示在隊(duì)里中去等待獲取鎖*/
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

/**這里看下ReentrantLock中的公平鎖的tryAcquire的實(shí)現(xiàn)*/
protected final boolean tryAcquire(int acquires) {  // Method 2
    final Thread current = Thread.currentThread();
    // 調(diào)用的AQS代碼队寇,獲取當(dāng)前狀態(tài)
    int c = getState();
    // 如果當(dāng)前狀態(tài)是0,
    if (c == 0) {
        // hasQueuedPredecessors代碼在下面章姓,看下隊(duì)列中是否還有前置節(jié)點(diǎn)
        // 如果沒有前置節(jié)點(diǎn)佳遣,就去獲取信號量,獲取的數(shù)量是acquires
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 設(shè)置當(dāng)前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 這里表示有競爭凡伊,但是重入的情況還是可以獲取鎖的
    else if (current == getExclusiveOwnerThread()) {
        // 如果是重入的情況零渐,直接設(shè)置信號量
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 表示有競爭,或者有前置節(jié)點(diǎn)系忙,需要等待
    return false;
}
/**
 * 查詢當(dāng)前是否有其他的線程比當(dāng)前線程等待的更久诵盼,當(dāng)前方法的邏輯類似于下面的代碼(效率比JDK的代碼低):
 *
 * getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()
 *
 * 注意,由于中斷或者超時產(chǎn)生的取消可能發(fā)生在任何時候银还,所以返回true不能保證有其他線程會比當(dāng)前線程更先獲取到鎖风宁,同樣,當(dāng)方法返回false(判斷了隊(duì)列為空)蛹疯,也可能出現(xiàn)另一個線程在方法調(diào)用之后贏得競爭入隊(duì)(也就是該方法結(jié)果不可信)
 * 這個方法主要被公平鎖用來避免競爭戒财,當(dāng)這個方法返回true(非重入情況),公平鎖的tryAcquire方法應(yīng)該直接返回false捺弦,以及tryAcquireShared返回一個負(fù)值饮寞,下面是一個代碼案例:
 * protected boolean tryAcquire(int arg) {
 *   if (isHeldExclusively()) {
 *     // A reentrant acquire; increment hold count
 *   // 重入,增加計(jì)數(shù)
 *     return true;
 *   } else if (hasQueuedPredecessors()) {
 *     return false;
 *   } else {
 *     // try to acquire normally
 *   }
 * }
 */
public final boolean hasQueuedPredecessors() {  // Method 3
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    // 結(jié)果的正確性依賴頭節(jié)點(diǎn)比尾節(jié)點(diǎn)更早初始化羹呵,以及如果當(dāng)前線程是隊(duì)列中的第一個骂际,head.next是準(zhǔn)確的
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 頭不等于尾疗琉,頭等于尾表示隊(duì)列為空冈欢,在初始化的時候
    // 頭的下一個節(jié)點(diǎn)為null或者當(dāng)前線程不能等于頭節(jié)點(diǎn)的線程
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}


// 假設(shè)tryAcquire獲取失敗,也就是有競爭的情況盈简,那后進(jìn)入的線程肯定要等待凑耻,等待需要進(jìn)入等待隊(duì)列太示,需要調(diào)用addWaiter方法
// 創(chuàng)建當(dāng)前線程節(jié)點(diǎn)并入隊(duì)列,返回當(dāng)前線程節(jié)點(diǎn)
private Node addWaiter(Node mode) { // Method 4
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 拿到尾節(jié)點(diǎn)香浩,嘗試在尾部加節(jié)點(diǎn)
    Node pred = tail;
    if (pred != null) {
        // 當(dāng)前節(jié)點(diǎn)的prev鏈接指向pred
        node.prev = pred;
        //CAS設(shè)置尾結(jié)點(diǎn)类缤,設(shè)置成功返回true,設(shè)置失敗表示入隊(duì)有競爭
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果設(shè)置失敗,獲取的尾結(jié)點(diǎn)已經(jīng)不是尾結(jié)點(diǎn)
    // 或者尾節(jié)點(diǎn)為null邻吭,還沒有初始化餐弱,再嘗試入隊(duì)
    enq(node);
    // 返回新入隊(duì)的節(jié)點(diǎn)
    return node;
}
// 循環(huán)入隊(duì)
private Node enq(final Node node) {
  //循環(huán)操作
  for (;;) {
    Node t = tail;
    //如果尾結(jié)點(diǎn)為null 表示沒有初始化,開始初始化頭結(jié)點(diǎn)和尾結(jié)點(diǎn)為同一節(jié)點(diǎn)
    //這里使用了延遲初始化囱晴,初始化時膏蚓,頭節(jié)點(diǎn)等于尾節(jié)點(diǎn)
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
        // 當(dāng)前節(jié)點(diǎn)的pre節(jié)點(diǎn)指向尾節(jié)點(diǎn)
        // 注意此處的設(shè)置鏈接和下面的設(shè)置next鏈接不是原子操作
        // 因此在找取消節(jié)點(diǎn)的是,是從后向前遍歷畸写,因?yàn)閚ext指針可能沒連上
        node.prev = t;
        // 設(shè)置新的尾結(jié)點(diǎn)
        if (compareAndSetTail(t, node)) {
            t.next = node;
            // 返回舊的尾節(jié)點(diǎn)
            return t;
        }
    }
  }
}
// 現(xiàn)在節(jié)點(diǎn)已經(jīng)入隊(duì)了
// node是新創(chuàng)建的入隊(duì)的節(jié)點(diǎn)
// 該方法是節(jié)點(diǎn)在隊(duì)列中喚醒然后獲取鎖這一邏輯的實(shí)現(xiàn)
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //循環(huán)入隊(duì)
        for (;;) {
            final AbstractQueuedSynchronizer.Node p = node.predecessor();
            // 如果前置節(jié)點(diǎn)是頭節(jié)點(diǎn)驮瞧,再獲取資源
            if (p == head && tryAcquire(arg)) {
                // 設(shè)置新的頭結(jié)點(diǎn),相當(dāng)于獲取到鎖后取代新的頭節(jié)點(diǎn)
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 是否需要park枯芬,如果需要則park
            // 如果不需要直接進(jìn)入下一輪循環(huán)
            if (shouldParkAfterFailedAcquire(p, node) &&
                    // 用來阻塞等待(park)论笔,醒來后返回是否中斷過,如果中斷過返回true
                    // 然后設(shè)置interrupted為true
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// pred是前置節(jié)點(diǎn)千所,node是當(dāng)前節(jié)點(diǎn)
// 判斷是否需要park狂魔,如果不park 繼續(xù)循環(huán)
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
    // 獲取當(dāng)前節(jié)點(diǎn)狀態(tài)
    int ws = pred.waitStatus;
    if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         * 如果節(jié)點(diǎn)的等待狀態(tài)時SIGNAL,表示已經(jīng)在請求SIGNAL淫痰,所以可以安全的等待
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         * 表示前置節(jié)點(diǎn)取消了毅臊,跳過前置節(jié)點(diǎn)
         */
        do {
            // pred = pred.prev 前置節(jié)點(diǎn)設(shè)置成前置節(jié)點(diǎn)的前置節(jié)點(diǎn)
            // node.prev = pred 當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)變成前前置節(jié)點(diǎn)
            // 也就是跳過前置節(jié)點(diǎn),一直這樣向后遍歷跳過取消的節(jié)點(diǎn)
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         * 表示等待狀態(tài)是 0 或者PROPAGATE 表明需要喚醒黑界,但還沒有park管嬉,調(diào)用方需要重試來保證在park之前不會獲取到鎖
         * 設(shè)置前置節(jié)點(diǎn)為SIGNAL,表示要喚醒后繼節(jié)點(diǎn)
         * 正常的情況會走這里朗鸠,表示想要獲取信號量蚯撩,就要把其前置節(jié)點(diǎn)的waiteStatus修改
         * 當(dāng)前線程對應(yīng)的節(jié)點(diǎn)進(jìn)行入隊(duì)至隊(duì)尾(掛起之前),那么其前驅(qū)節(jié)點(diǎn)的狀態(tài)就必須為SIGNAL烛占,以便后者取消或釋放時將當(dāng)前節(jié)點(diǎn)喚醒
         */
        compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL);
    }
    //如果不是第一種情況胎挎,比如第三種,設(shè)置完前置節(jié)點(diǎn)狀態(tài)后 又進(jìn)入acquireQueued繼續(xù)循環(huán)
    return false;
}
//判斷線程是否中斷過
private final boolean parkAndCheckInterrupt() {
  //阻塞線程
  LockSupport.park(this);
  return Thread.interrupted();
}
// 如果acquireQueued的過程中被中斷忆家,從而導(dǎo)致失敗犹菇,會取消acquire
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    // 線程置為空
    node.thread = null;

    // 重置pre節(jié)點(diǎn),跳過取消的節(jié)點(diǎn)
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 獲取pre的next節(jié)點(diǎn)芽卿,用來做CAS重新設(shè)置next節(jié)點(diǎn)
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 當(dāng)前節(jié)點(diǎn)狀態(tài)設(shè)置為取消
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 如果當(dāng)前節(jié)點(diǎn)是尾部節(jié)點(diǎn)揭芍,尾節(jié)點(diǎn)設(shè)置成前置節(jié)點(diǎn),next值置為空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            // 如果前置節(jié)點(diǎn)是SIGNAL表示要喚醒后繼節(jié)點(diǎn)
            // 并且如果狀態(tài)不是取消,也嘗試設(shè)置SIGNAL標(biāo)記卸例,喚醒后面的節(jié)點(diǎn)(自己能喚醒說明自己的前置狀態(tài)是SIGNAL称杨,自己既然取消了肌毅,SIGNAL狀態(tài)要讓現(xiàn)在的pre繼承)
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 前置節(jié)點(diǎn)的next指針指向當(dāng)前節(jié)點(diǎn)的next指針,跳過當(dāng)前節(jié)點(diǎn)
                compareAndSetNext(pred, predNext, next);
        } else {
            // pred是頭節(jié)點(diǎn)姑原,嘗試喚醒后面的節(jié)點(diǎn)
            unparkSuccessor(node);
        }
        // 指向自己悬而,方便GC,當(dāng)前節(jié)點(diǎn)消亡
        node.next = node; // help GC
    }
}

release方法

釋放的主要方法:

//獨(dú)占模式下釋放信號
public final boolean release(int arg) {
    //如果釋放鎖成功
    if (tryRelease(arg)) {
        // 判斷當(dāng)前頭節(jié)點(diǎn)
        AbstractQueuedSynchronizer.Node h = head;
        // 0 表示不是初始化狀態(tài)
        if (h != null && h.waitStatus != 0)
            // 喚醒后繼節(jié)點(diǎn)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
/**看下ReentrantLock的Sync鎖的實(shí)現(xiàn):
* release返回true表名解鎖成功
*/
protected final boolean tryRelease(int releases) {
    // 減去當(dāng)前的信號量
    int c = getState() - releases;
    // 如果當(dāng)前線程不是獲取鎖的線程锭汛,不能釋放
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 如果信號量是0  
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設(shè)置信號量
    setState(c);
    return free;
}
// 當(dāng)tryRelease成功后笨奠,需要喚醒后繼節(jié)點(diǎn),這里的node是頭結(jié)點(diǎn)
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     * 如果狀態(tài)是是負(fù)數(shù)(可能是需要SIGNAL)唤殴,預(yù)期需要SINGAL艰躺,因此嘗試清除狀態(tài)值,
     * 如果清楚失敗或者狀態(tài)已經(jīng)被改變都不會影響正確性
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     * upark下一個節(jié)點(diǎn)眨八,通常情況及時下一個節(jié)點(diǎn)腺兴,但是可能存在取消或者明顯的就是null情況,從尾部向后遍歷去找一個非取消的節(jié)點(diǎn) 
     * 從tail往前找一個沒有取消的節(jié)點(diǎn)
     * 為什么從尾部找:是因?yàn)閑nq的時候不是原子操作廉侧,在cas設(shè)置尾部節(jié)點(diǎn)的時候页响,指向前面的連接已經(jīng)創(chuàng)建好,但是指向后面的連接沒有
     */
    AbstractQueuedSynchronizer.Node s = node.next;
    // 拿到頭節(jié)點(diǎn)的下一個節(jié)點(diǎn)段誊,如果是取消狀態(tài)或者下一個節(jié)點(diǎn)是null
    // 從尾部向后遍歷找一個非取消的節(jié)點(diǎn)喚醒
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾部向后遍歷闰蚕,如果t不等于當(dāng)前節(jié)點(diǎn)
        for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
            // 非取消狀態(tài),s更新為當(dāng)前遍歷的節(jié)點(diǎn)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

acquireShared

共享資源的獲取连舍,參考:博客没陡,博客2

/** tryAcquireShared:在共享模式下獲取鎖*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
// CountDownLatch的實(shí)現(xiàn)
protected int tryAcquireShared(int acquires) {
    // 如果獲取的狀態(tài)是0,表示沒有被加鎖索赏,返回1
    return (getState() == 0) ? 1 : -1;
}
// tryAcquireShared失敗盼玄,進(jìn)入隊(duì)列
private void doAcquireShared(int arg) {
    // 添加shared節(jié)點(diǎn),注意潜腻,這個Node.SHARED是一個靜態(tài)變量埃儿,主要方便isShared方法判斷節(jié)點(diǎn)類型
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 拿到前置節(jié)點(diǎn)
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前置節(jié)點(diǎn)是頭節(jié)點(diǎn),說明該嘗試獲取鎖了
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 如果大于等于0表示獲取成功
                    // 設(shè)置新的頭 并傳播狀態(tài)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// 設(shè)置隊(duì)列的頭結(jié)點(diǎn)融涣,檢查是否后繼節(jié)點(diǎn)在共享模式下等待童番,如果是,當(dāng)propagate>0 或者PROPAGATE狀態(tài)已經(jīng)設(shè)置威鹿,則進(jìn)行傳播
// 為什么acquire中會調(diào)用doReleaseShared剃斧,原因在于共享鎖是可以支持多個線程獲取的,所以要通知其他線程來獲取鎖
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 獲取到資源后忽你,設(shè)置新的頭
    setHead(node);
    /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
    // 當(dāng)在下列的情況下嘗試去喚醒下一個節(jié)點(diǎn):
    // 調(diào)用者需要傳播 propagate>0 或者 傳播已經(jīng)有記錄了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 看下一個節(jié)點(diǎn)是否是共享的幼东,如果是共享的則傳播
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    // 保證release是傳播的,即使當(dāng)前有其他正在acquire/release的調(diào)用。
    // 如果head的后繼節(jié)點(diǎn)需要SIGNAL,嘗試unpark head的后繼節(jié)點(diǎn)筋粗,如果不需要SIGNAL,status設(shè)置為PROPAGATE,來保證傳播繼續(xù).另外炸渡,必須要循環(huán)處理娜亿,避免在我們執(zhí)行的時候有其他節(jié)點(diǎn)添加到隊(duì)列中,和其他的unparkSuccessor使用不同蚌堵,我們需要知道CAS是否重置status失敗买决,如果失敗重新檢查
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {    // 說明有后繼節(jié)點(diǎn)喚醒
                // 如果是SIGANL,表示要喚醒后續(xù)節(jié)點(diǎn)吼畏,設(shè)置成0督赤,然后喚醒后繼節(jié)點(diǎn)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    // 失敗說明有其他節(jié)點(diǎn)競爭
                    continue;            // loop to recheck cases
                // unpark后繼節(jié)點(diǎn),其他節(jié)點(diǎn)會獲取資源泻蚊,然后設(shè)置頭節(jié)點(diǎn)躲舌,此時頭結(jié)點(diǎn)會變化
                unparkSuccessor(h);
            }
            // 有新的head產(chǎn)生,新head的ws初始值為0
            // 如果compareAndSetWaitStatus失敗性雄,表示有競爭没卸,繼續(xù)執(zhí)行
            // 設(shè)置head為PROPAGATE,表示傳遞下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果頭節(jié)點(diǎn)沒有發(fā)生變化秒旋,表示沒有后續(xù)的節(jié)點(diǎn)獲取到資源约计,直接退出
        // 其他共享節(jié)點(diǎn)獲取資源,會設(shè)置頭為Shared狀態(tài)迁筛,這樣一直喚醒下去
        if (h == head)                   // loop if head changed
            break;
    }
}

releaseShared

releaseShared實(shí)際就是調(diào)用的上面的doReleaseShared

Condition使用

synchronized和notify/wait 組合是和lock和condition的await以及signal 組合等價煤蚌。await在AQS的一個內(nèi)部類中實(shí)現(xiàn):

條件隊(duì)列和等待隊(duì)列是兩個不同的隊(duì)列,但是使用的都是Node類细卧,條件隊(duì)列結(jié)束等待會進(jìn)入到等待隊(duì)列中去

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** 條件隊(duì)列的第一個節(jié)點(diǎn). */
    private transient Node firstWaiter;
    /** 最后一個節(jié)點(diǎn). */
    private transient Node lastWaiter;

    
    public ConditionObject() { }

    // Internal methods

    // 添加一個新的條件節(jié)點(diǎn)
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        // 如果尾節(jié)點(diǎn)是取消的狀態(tài)尉桩,則清理
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 重新構(gòu)造等待節(jié)點(diǎn)
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // 開始的情況
            firstWaiter = node;
        else
            // 構(gòu)造鏈接
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    // 取消鏈接waiter
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            // 拿到下一個節(jié)點(diǎn)
            Node next = t.nextWaiter;
            // 判斷是否取消
            if (t.waitStatus != Node.CONDITION) {
                // 切斷鏈接
                t.nextWaiter = null;
                if (trail == null)
                    // 頭節(jié)點(diǎn)是取消的,頭節(jié)點(diǎn)直接等于下一個節(jié)點(diǎn)
                    firstWaiter = next;
                else
                    // 重新構(gòu)造鏈接
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                // 上一次遍歷的節(jié)點(diǎn)
                trail = t;
            // t遍歷到下一個
            t = next;
        }
    }
}

await方法

//AQS方法
public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  //添加到condition隊(duì)列
  Node node = addConditionWaiter(); 
  //釋放鎖
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  //第一次總是返回false贪庙,因?yàn)樯厦嬉呀?jīng)釋放鎖了
  while (!isOnSyncQueue(node)) {
    // 如果沒有在同步隊(duì)列魄健,阻塞
    LockSupport.park(this);
    // 如果中斷退出
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  //醒來之后的操作,會嘗試拿鎖插勤,同時設(shè)置標(biāo)志位
  // acquireQueued進(jìn)入隊(duì)列阻塞等待被喚醒
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    // 清楚cancel節(jié)點(diǎn)
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    //根據(jù)interruptMode要不拋異常后者自我中斷
    reportInterruptAfterWait(interruptMode);
}
//釋放鎖
final int fullyRelease(Node node) {
  boolean failed = true;
  try {
    // 首先獲取當(dāng)前線程所在的AQS隊(duì)列的狀態(tài)值
    int savedState = getState();
    // 嘗試去釋放信號量
    if (release(savedState)) {
      failed = false;
      return savedState;
    } else {
      throw new IllegalMonitorStateException();
    }
  } finally {
    if (failed)
      node.waitStatus = Node.CANCELLED;
  }
}
// 釋放資源
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果釋放成功沽瘦,喚醒后繼節(jié)點(diǎn)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//判斷當(dāng)前線程是否在同步隊(duì)列,一開始是在條件隊(duì)列
final boolean isOnSyncQueue(Node node) {
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    // 第一次走這個邏輯
    return false;
  if (node.next != null) // If has successor, it must be on queue
    // 如果有next鏈接农尖,肯定是在等待隊(duì)列上
    return true;
  /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
    // node.prev是非空的析恋,但是可能還沒有加入隊(duì)列中個,因?yàn)镃AS操作可能失敗
    // 所以從尾部向頭遍歷的時候需要確定它已經(jīng)加入到隊(duì)列中
    return findNodeFromTail(node);
}

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * 從尾向后查找同步隊(duì)列盛卡,查看節(jié)點(diǎn)是否存在
 */
private boolean findNodeFromTail(AbstractQueuedSynchronizer.Node node) {
    AbstractQueuedSynchronizer.Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}
//檢查是否中斷助隧,如果中斷了,轉(zhuǎn)移到同步等待隊(duì)列
private int checkInterruptWhileWaiting(Node node) {
    // 如果中斷則轉(zhuǎn)換節(jié)點(diǎn),如果轉(zhuǎn)換節(jié)點(diǎn)成功,拋出異常并村,失敗則自我中斷
  return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  0;
}
// 為什么取消要進(jìn)入同步隊(duì)列:取消只是不再等待巍实,可以再去獲取鎖
final boolean transferAfterCancelledWait(Node node) {
    // 設(shè)置WS為0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 入同步隊(duì)列
        enq(node);
        return true;
    }
    /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
    // 循環(huán)等待入隊(duì)列,知道入隊(duì)成功
    // 走到這說明有競爭力 已經(jīng)在入隊(duì)了
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

unlinkCancelledWaiters方法的示意圖

image.png
image.png

signal

// 移除等待最長的線程哩牍,從條件隊(duì)列移到同步隊(duì)列中去
public final void signal() {
    // 如果獲取鎖的線程不是當(dāng)前線程棚潦,拋異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 向前遍歷,如果沒有取消膝昆,則喚醒
        // 如果遇到取消節(jié)點(diǎn)丸边,則繼續(xù)找下一個節(jié)點(diǎn)處理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    // 設(shè)置節(jié)點(diǎn)狀態(tài),如果不能設(shè)置荚孵,表示節(jié)點(diǎn)已經(jīng)取消了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
    // 入隊(duì)妹窖,返回前一個節(jié)點(diǎn)
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws>0 表示前置節(jié)點(diǎn)取消了 或者設(shè)置狀態(tài)失敗,即節(jié)點(diǎn)發(fā)生變化
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 喚醒當(dāng)前節(jié)點(diǎn)收叶,重新同步狀態(tài)
        LockSupport.unpark(node.thread);
    return true;
}

ReentrantLock

前面在講解AQS的時候已經(jīng)介紹了ReentrantLock公平鎖的代碼骄呼,下面完整介紹下ReentrantLock

以ReenterantLock為例,該類中定義幾個同步器:

Sync是基類判没,是公平鎖和非公平鎖實(shí)現(xiàn)的基礎(chǔ)

tryAcquire由于公平鎖和非公平鎖實(shí)現(xiàn)不一樣 因此在Sync子類實(shí)現(xiàn)

下圖是非公平鎖NonfairSync的調(diào)用邏輯谒麦,公平鎖類似

image.png
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     允許在非公平鎖情況下實(shí)現(xiàn)快速lock
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     * 嘗試獲取信號量,也就是設(shè)置state為非0數(shù)值
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
      //獲取
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
    //如果當(dāng)前線程已經(jīng)獲取了哆致,增加獲取的量
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
    //表示沒獲取到绕德,已經(jīng)被其他人獲取了
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
    //只有當(dāng)前顯示是獲取信號量的線程才能釋放
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

  //判斷當(dāng)前線程是否獨(dú)占
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
    //state為0表示沒加鎖
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
    //state的數(shù)值表示獲取鎖的數(shù)量
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

非公平鎖的實(shí)現(xiàn):

static final class NonfairSync extends ReentrantLock.Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * 嘗試獲取鎖,將state從0更新成1摊阀,如果失敗直接使用acquire
     * 獲取鎖的主要邏輯acquire是AQS中的方法耻蛇,其中又會調(diào)用tryAcquire
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    //AQS子類關(guān)聯(lián)的實(shí)現(xiàn)方法,可以看到調(diào)用的是上面的nonfairTryAcquire
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

公平鎖的實(shí)現(xiàn):

static final class FairSync extends ReentrantLock.Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
    //如果是鎖的重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
//查詢是否有線程比當(dāng)前線程等待的時間更長
//由于并發(fā)的實(shí)時性 該方法返回的結(jié)果不一定是正確的
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    AbstractQueuedSynchronizer.Node t = tail; // Read fields in reverse initialization order
    AbstractQueuedSynchronizer.Node h = head;
    AbstractQueuedSynchronizer.Node s;
    return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末胞此,一起剝皮案震驚了整個濱河市臣咖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌漱牵,老刑警劉巖夺蛇,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異酣胀,居然都是意外死亡刁赦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門闻镶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甚脉,“玉大人,你說我怎么就攤上這事铆农∥保” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長猴凹。 經(jīng)常有香客問我夷狰,道長,這世上最難降的妖魔是什么郊霎? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任沼头,我火速辦了婚禮,結(jié)果婚禮上歹篓,老公的妹妹穿的比我還像新娘瘫证。我一直安慰自己揉阎,他們只是感情好庄撮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著毙籽,像睡著了一般洞斯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上坑赡,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天烙如,我揣著相機(jī)與錄音,去河邊找鬼毅否。 笑死亚铁,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的螟加。 我是一名探鬼主播徘溢,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼捆探!你這毒婦竟也來了然爆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤黍图,失蹤者是張志新(化名)和其女友劉穎曾雕,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體助被,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡剖张,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了揩环。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片修械。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖检盼,靈堂內(nèi)的尸體忽然破棺而出肯污,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布蹦渣,位于F島的核電站哄芜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏柬唯。R本人自食惡果不足惜认臊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锄奢。 院中可真熱鬧失晴,春花似錦、人聲如沸拘央。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽灰伟。三九已至拆又,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間栏账,已是汗流浹背帖族。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挡爵,地道東北人竖般。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像茶鹃,于是被迫代替她去往敵國和親涣雕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353