多線(xiàn)程知識(shí)梳理(11) - 隊(duì)列同步器實(shí)現(xiàn)原理 & 應(yīng)用

一蚓让、基本概念

隊(duì)列同步器在Java并發(fā)包中的實(shí)現(xiàn)是AbstractQueuedSynchronizer,簡(jiǎn)稱(chēng)為AQS,它是用來(lái)構(gòu)建鎖或者其它同步組件的基礎(chǔ)框架余指。了解其實(shí)現(xiàn)原理有助于:

  • 理解同步組件ReentrantLockReentrantReadWriteLock的原理
  • 理解Condition實(shí)現(xiàn)等待通知/模式的原理
  • 根據(jù)業(yè)務(wù)場(chǎng)景,自定義同步組件(較少用到)

隊(duì)列同步器AQS和同步組件(ReentrantLockReentrantReadWriteLock等)的區(qū)別在于:

  • 同步組件面向使用者,它定義了使用者與鎖交互的接口余舶,隱藏了實(shí)現(xiàn)細(xì)節(jié)。
  • 隊(duì)列同步器面向鎖的實(shí)現(xiàn)者锹淌,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式匿值,屏蔽了同步狀態(tài)管理、線(xiàn)程排隊(duì)赂摆、等待與喚醒等底層操作挟憔。

隊(duì)列同步器基于模板方法模式,它的使用方式為如下:

  • 創(chuàng)建自定義同步組件的實(shí)現(xiàn)類(lèi)烟号。
  • 實(shí)現(xiàn)者繼承AbstractQueuedSynchronizer(一般會(huì)將它作為自定義同步組件的內(nèi)部類(lèi))绊谭,根據(jù)業(yè)務(wù)場(chǎng)景,重寫(xiě)指定的模板方法tryAcquire(int acquires)汪拥、tryRelease(int releases)等达传。
  • 創(chuàng)建同步器子類(lèi)的對(duì)象,作為自定義同步組件的成員變量迫筑,在對(duì)外提供的公共方法中宪赶,調(diào)用隊(duì)列同步器的指定方法。

ReentrantLock為例脯燃,ReentrantLock為自定義的同步組件搂妻,NonfairSyncFairSync就是隊(duì)列同步器的實(shí)現(xiàn)類(lèi)。

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        //...
    }
    
    static final class NonfairSync extends Sync {
        //...
    }
    
    static final class FairSync extends Sync {
        //...
    }
    
    public void lock() {
        sync.lock();
    }

}

二辕棚、同步狀態(tài) & 同步隊(duì)列

在隊(duì)列同步器中有兩個(gè)關(guān)鍵的元素:

  • 同步狀態(tài)
  • 同步隊(duì)列

2.1 同步狀態(tài)

同步狀態(tài)欲主,用一個(gè)int型表示,訪(fǎng)問(wèn)或者修改同步狀態(tài)需要使用指定的方法:

  • getState():獲取當(dāng)前同步狀態(tài)逝嚎。
  • setState(int newState):設(shè)置當(dāng)前同步狀態(tài)扁瓢。
  • compareAndSetState(int except, int update):使用CAS設(shè)置當(dāng)前狀態(tài),該方法可以保證狀態(tài)設(shè)置的原子性懈糯。

2.2 同步隊(duì)列

同步隊(duì)列用來(lái)完成同步狀態(tài)的管理涤妒,當(dāng)前線(xiàn)程獲取同步狀態(tài)失敗時(shí),會(huì)將當(dāng)前線(xiàn)程以及等待狀態(tài)等信息構(gòu)造成一個(gè)結(jié)點(diǎn)赚哗,將其加入同步隊(duì)列她紫,同時(shí)會(huì)阻塞當(dāng)前線(xiàn)程,當(dāng)同步狀態(tài)釋放時(shí)屿储,會(huì)把首結(jié)點(diǎn)的后繼結(jié)點(diǎn)的線(xiàn)程喚醒贿讹,使其再次嘗試獲取同步狀態(tài)。

同步隊(duì)列中的結(jié)點(diǎn)保存了以下的信息:

  • thread:線(xiàn)程引用
  • waitStatus:等待狀態(tài)
    • CANCELLED:由于在同步隊(duì)列中等待的線(xiàn)程等待超時(shí)或者被中斷够掠,需要從同步隊(duì)列中取消等待民褂。
    • SIGNAL:后繼結(jié)點(diǎn)的線(xiàn)程處于等待狀態(tài),而當(dāng)前結(jié)點(diǎn)的線(xiàn)程如果釋放了同步狀態(tài)或取消,將會(huì)通知后繼結(jié)點(diǎn)赊堪,使后繼結(jié)點(diǎn)的線(xiàn)程運(yùn)行面殖。
    • CONDITION:結(jié)點(diǎn)在等待隊(duì)列中,結(jié)點(diǎn)線(xiàn)程等待在Condition上哭廉,當(dāng)其他線(xiàn)程對(duì)Condition調(diào)用了signal方法后脊僚,該結(jié)點(diǎn)會(huì)從等待隊(duì)列移到同步隊(duì)列中。
    • PROPAGATE:表示下一次共享式同步狀態(tài)獲取將會(huì)無(wú)條件傳播下去遵绰。
    • INITIAL:初始狀態(tài)辽幌。
  • prev & next:前驅(qū) & 后繼結(jié)點(diǎn)

同步器包含了兩個(gè)類(lèi)型結(jié)點(diǎn)的引用:頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。設(shè)置的區(qū)別在于:

  • 頭結(jié)點(diǎn)是獲取同步狀態(tài)成功的結(jié)點(diǎn)椿访,頭結(jié)點(diǎn)的線(xiàn)程在釋放同步狀態(tài)時(shí)乌企,將會(huì)喚醒后繼結(jié)點(diǎn),而后繼結(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置成頭結(jié)點(diǎn)成玫。由于頭結(jié)點(diǎn)是通過(guò)獲取同步狀態(tài)成功的線(xiàn)程來(lái)完成的加酵,因此設(shè)置頭結(jié)點(diǎn)的方式不需要CAS來(lái)保證。
  • 如果已經(jīng)有一個(gè)線(xiàn)程獲取了同步狀態(tài)梁剔,那么其他線(xiàn)程無(wú)法獲取同步狀態(tài)虽画,就會(huì)構(gòu)造結(jié)點(diǎn)假如到同步隊(duì)列當(dāng)中,為了保證線(xiàn)程安全荣病,因此需要采用compareAndSetTail(Node expect, Node update)CAS方式來(lái)設(shè)置尾結(jié)點(diǎn)码撰。
同步隊(duì)列

三、同步器提供的模板方法

  • 獨(dú)占式獲取 & 釋放同步狀態(tài)
  • 共享式獲取 & 釋放同步狀態(tài)
  • 獨(dú)占式超時(shí)獲取同步狀態(tài)

3.1 獨(dú)占式同步狀態(tài)獲取與釋放

3.1.1 獲取

通過(guò)調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài)个盆,該方法對(duì)中斷不敏感脖岛。當(dāng)同步狀態(tài)獲取成功后,當(dāng)前線(xiàn)程從acquire(int arg)方法返回颊亮,如果對(duì)于鎖這種并發(fā)組件柴梆,代表著當(dāng)前線(xiàn)程獲取了鎖。其實(shí)現(xiàn)為:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

我們將其工作拆解稱(chēng)為三個(gè)部分:

Step 1 tryAcquire(arg)

調(diào)用 自定義同步器 實(shí)現(xiàn)的tryAcquire(int arg)方法终惑,該方法需要保證線(xiàn)程安全的獲取同步狀態(tài)绍在。

Step 2 addWaiter(Node.EXCLUSIVE), arg)

如果同步狀態(tài)獲取失敗(tryAcquire方法返回false)雹有,則構(gòu)造同步結(jié)點(diǎn)偿渡,并通過(guò)addWaiter(Node node)方法加入到同步隊(duì)列的尾部。為了保證能夠線(xiàn)程安全地添加霸奕,它采用了兩層保護(hù)機(jī)制:

  • CAScompareAndSetTail溜宽。
  • 死循環(huán):enq
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //1.確保節(jié)點(diǎn)能夠線(xiàn)程安全地被添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //2.通過(guò)死循環(huán)來(lái)確保節(jié)點(diǎn)的正確添加质帅,在"死循環(huán)"中只有通過(guò)`CAS`將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后适揉,當(dāng)前線(xiàn)程才能從該方法返回留攒,否則當(dāng)前線(xiàn)程不斷地進(jìn)行嘗試。
        enq(node);
        return node;
    }

Step 3 acquireQueued(Node node, int arg)

調(diào)用acquireQueued(Node node, int arg)方法嫉嘀,使得新構(gòu)造的同步結(jié)點(diǎn)以“死循環(huán)”的方式獲取同步狀態(tài)炼邀。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //1.1 得到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                //1.2 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),只有在這種情況下獲取同步狀態(tài)成功
                if (p == head && tryAcquire(arg)) {
                    //1.3 將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //2 前驅(qū)結(jié)點(diǎn)不是頭結(jié)點(diǎn)吃沪,或者無(wú)法獲取到同步狀態(tài)的情況汤善。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這里需要注意的有以下幾點(diǎn):

  • 只有前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)時(shí),才會(huì)嘗試再次調(diào)用tryAcquire(int arg)來(lái)獲取同步狀態(tài)票彪。這么做是為了維護(hù)同步隊(duì)列的FIFO原則,并且也便于對(duì)過(guò)早通知的處理不狮。
  • 如果當(dāng)前結(jié)點(diǎn)成功獲取到同步狀態(tài)降铸,那么會(huì)通過(guò)setHead方法將其設(shè)為新的頭結(jié)點(diǎn),并斷開(kāi)舊的頭結(jié)點(diǎn)與后繼結(jié)點(diǎn)的關(guān)系(p.next = null)摇零。
  • 如果無(wú)法獲取到同步狀態(tài)推掸,那么會(huì)通過(guò)shouldParkAfterFailedAcquire判斷是否要調(diào)用parkAndCheckInterrupt進(jìn)入阻塞狀態(tài)。waitStatus的默認(rèn)值是0驻仅,因此第一次會(huì)進(jìn)入最后一個(gè)判斷谅畅,并將其waitStatue設(shè)置為Node.SIGNAL,之后進(jìn)入第二次的for循環(huán)噪服,假如其前驅(qū)結(jié)點(diǎn)仍然不是頭結(jié)點(diǎn)或者無(wú)法獲取到同步狀態(tài)毡泻,那么將會(huì)進(jìn)入第一個(gè)判斷,阻塞當(dāng)前線(xiàn)程粘优。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
                pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        //阻塞當(dāng)前線(xiàn)程仇味。
        LockSupport.park(this);
        //如果當(dāng)前線(xiàn)程已經(jīng)被中斷,那么返回 true雹顺。
        return Thread.interrupted();
    }
  • 如果當(dāng)前線(xiàn)程在獲取同步狀態(tài)時(shí)丹墨,已經(jīng)被中斷,即interrupted=true嬉愧,那么會(huì)調(diào)用selfInterrupt方法贩挣。

3.1.2 釋放

當(dāng)前線(xiàn)程獲取同步狀態(tài)并執(zhí)行了相應(yīng)的邏輯后,就需要釋放同步狀態(tài)没酣,并調(diào)用unparkSuccessor來(lái)喚醒后繼結(jié)點(diǎn)王财。

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              unparkSuccessor(h);
          return true;
      }
      return false;
  }
  
  private void unparkSuccessor(Node node) {
      int ws = node.waitStatus;
      if (ws < 0)
          node.compareAndSetWaitStatus(ws, 0);
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node p = tail; p != node && p != null; p = p.prev)
              if (p.waitStatus <= 0)
                  s = p;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
  }

3.1.3 小結(jié)

由以上兩點(diǎn)分析,可以看出獨(dú)占式獲取 & 釋放同步狀態(tài)采用了模板方法的設(shè)計(jì)模式四康,AQS內(nèi)部會(huì)基于當(dāng)前線(xiàn)程創(chuàng)建一個(gè)結(jié)點(diǎn)搪搏,并負(fù)責(zé)管理線(xiàn)程的入隊(duì)和出隊(duì),線(xiàn)程的阻塞和喚醒闪金,我們只需要重寫(xiě)重要的模板方法:

  • protected boolean tryAcquire(int arg):獨(dú)占式獲取同步狀態(tài)疯溺,實(shí)現(xiàn)該方法需要查詢(xún)當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期论颅,然后再進(jìn)行CAS設(shè)置同步狀態(tài)。
  • protected boolean tryRelease(int arg):獨(dú)占式釋放同步狀態(tài)囱嫩。

下面是一個(gè)獨(dú)占式獲取 & 釋放同步狀態(tài)的簡(jiǎn)單實(shí)現(xiàn)示例:

/**
 * @author lizejun
 **/
public class ExclusiveLock implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException{
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    public static void run() {
        final ExclusiveLock lock = new ExclusiveLock();
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread() {

                @Override
                public void run() {
                    lock.lock();
                    System.out.println("begin:" + index);
                    try {
                        System.out.println("run:" + index);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("end:" + index);
                        lock.unlock();
                    }
                }

            }.start();
        }
    }
}

運(yùn)行結(jié)果為如下恃疯,可以看到,處于lock ~ unlock之間的邏輯在同一時(shí)間只能由一個(gè)線(xiàn)程執(zhí)行墨闲。

獨(dú)占鎖實(shí)現(xiàn)示例

假如我們注釋掉lock.lock()lock.unlock()今妄,那么打印的結(jié)果為,這時(shí)不能保證是線(xiàn)程安全的鸳碧。

獨(dú)占鎖關(guān)閉

3.2 共享式獲取 & 釋放同步狀態(tài)

3.2.1 獲取

共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于 同一時(shí)刻能否有多個(gè)線(xiàn)程同時(shí)獲取到同步狀態(tài)盾鳞。

調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài)。鎖的實(shí)現(xiàn)者需要重寫(xiě)tryAcquireShared方法瞻离,如果該方法的返回值大于0腾仅,表示能夠獲取到同步狀態(tài)。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //1.如果前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)套利,那么會(huì)嘗試獲取同步狀態(tài)推励。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //2.獲取同步狀態(tài)成功,設(shè)為頭結(jié)點(diǎn)肉迫。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //3.獲取同步狀態(tài)失敗验辞,那么阻塞當(dāng)前線(xiàn)程。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3.2.2 釋放

通過(guò)調(diào)用releaseShared(int args)方法可以釋放同步狀態(tài)喊衫,該方法在釋放同步狀態(tài)后跌造,將會(huì)喚醒后續(xù)處于阻塞狀態(tài)的結(jié)點(diǎn)。對(duì)于支持多個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn)的并發(fā)組件格侯,它和獨(dú)占式的區(qū)別在于鼻听,tryReleaseShared(int arg)方法必須保證同步狀態(tài)安全釋放,一般是通過(guò)循環(huán)和CAS來(lái)保證的联四,因?yàn)獒尫磐綘顟B(tài)的操作會(huì)同時(shí)來(lái)自多個(gè)線(xiàn)程撑碴。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

3.2.3 小結(jié)

與獨(dú)占式獲取 & 釋放同步狀態(tài)類(lèi)似,在繼承AQS時(shí)主要關(guān)注兩個(gè)重要的模板方法:

  • protected int tryAcquireShared(int arg):共享式獲取同步狀態(tài)朝墩,返回值>=0表示獲取成功醉拓,否則失敗。
  • protected boolean tryReleaseShared(int arg):共享式釋放同步狀態(tài)收苏。

下面是一個(gè)簡(jiǎn)單地共享式獲取 & 釋放同步狀態(tài)的示例:

/**
 * @author lizejun
 **/
public class TwinsLock implements Lock {

    private Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public static void run() {
        final TwinsLock lock = new TwinsLock();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread() {

                @Override
                public void run() {
                    lock.lock();
                    System.out.println("begin:" + index);
                    try {
                        System.out.println("run:" + index);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("end:" + index);
                        lock.unlock();
                    }
                }

            }.start();
        }
    }
}

運(yùn)行結(jié)果為如下亿卤,可以看到在同一時(shí)刻只有兩個(gè)線(xiàn)程能夠獲取到鎖:

共享鎖實(shí)現(xiàn)示例

3.3 獨(dú)占式超時(shí)獲取同步狀態(tài)

通過(guò)調(diào)用同步器的doAcquireNanos(int arg, long nanosTimeout)可以超時(shí)獲取同步狀態(tài)。

  • synchronized:當(dāng)一個(gè)線(xiàn)程獲取不到鎖而被阻塞在synchronized之外時(shí)鹿霸,對(duì)該線(xiàn)程進(jìn)行中斷操作排吴,此時(shí)線(xiàn)程中斷標(biāo)志位會(huì)被修改,但線(xiàn)程仍然會(huì)阻塞在synchronized方法上懦鼠。
  • acquireInterruptibly(int arg):在等待獲取同步狀態(tài)時(shí)钻哩,如果當(dāng)前線(xiàn)程被中斷屹堰,會(huì)立刻返回InterruptedException
  • acquireInterruptibly(int arg, long nanosTimeout):與獨(dú)占式獲取的區(qū)別在于街氢,在獲取同步狀態(tài)失敗后扯键,不是簡(jiǎn)單地進(jìn)入阻塞狀態(tài),而是會(huì)判斷是否超時(shí)珊肃。
    • 如果超時(shí) > 1000ns荣刑,重新計(jì)算超時(shí)時(shí)間,通過(guò)LockSupport.parkNanos等待對(duì)應(yīng)的超時(shí)時(shí)間后返回
    • 如果超時(shí) <= 1000ns伦乔,不進(jìn)入超時(shí)等待厉亏,而是進(jìn)入快速的自旋過(guò)程。
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1.計(jì)算出截止時(shí)間.
        final long deadline = System.nanoTime() + nanosTimeout;
       //2.加入節(jié)點(diǎn)
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //3.取出前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                //4.如果獲取成功則直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //5.如果到了超時(shí)時(shí)間烈和,則直接返回
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //6.如果在自旋過(guò)程中被中斷叶堆,那么拋出異常返回
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (faisled)
                cancelAcquire(node);
        }
    }

四、阻塞和喚醒線(xiàn)程

在前面斥杜,我們多次提到了阻塞和喚醒線(xiàn)程,它的實(shí)現(xiàn)依靠的是LockSupport工具類(lèi)沥匈,它定義了一組的公共靜態(tài)方法蔗喂,提供了最基本的線(xiàn)程阻塞和喚醒功能:

  • park:阻塞當(dāng)前線(xiàn)程,直到調(diào)用unpark(Thread thread)或者當(dāng)前線(xiàn)程被中斷才返回高帖。
  • parkNanos(long nanos):在park基礎(chǔ)上增加了超時(shí)返回缰儿。
  • parkUntil(long deadline):直到deadline時(shí)間才返回。
  • unpark(Thread thread):?jiǎn)拘烟幱谧枞麪顟B(tài)的線(xiàn)程thread散址。

五乖阵、Condition

ConditionObject是同步器AQS的內(nèi)部類(lèi),每個(gè)Condition對(duì)象都包含著一個(gè)等待隊(duì)列预麸,該隊(duì)列是實(shí)現(xiàn)等待/通知功能的關(guān)鍵瞪浸,其用法也很簡(jiǎn)單,就是通過(guò)awaitsignal/signalAll方法來(lái)進(jìn)行等待和通知吏祸。

5.1 等待隊(duì)列

每一個(gè)Condition包含了一個(gè)等待隊(duì)列对蒲,該隊(duì)列結(jié)點(diǎn)的類(lèi)型為AbstractQueuedSynchronizer.Node,與AQS中同步隊(duì)列結(jié)點(diǎn)的類(lèi)型相同贡翘,頭結(jié)點(diǎn)firstWaiter和尾結(jié)點(diǎn)lastWaiter蹈矮。

等待隊(duì)列

這里需要注意的是 一個(gè)AQS對(duì)象只有一個(gè)同步隊(duì)列,但是它可以關(guān)聯(lián)到多個(gè)等待隊(duì)列上鸣驱。

5.2 等待

調(diào)用Conditionawait()方法的前提是當(dāng)前線(xiàn)程已經(jīng)獲取了同步狀態(tài)泛鸟。它會(huì)執(zhí)行下面的操作:

  • 使用當(dāng)前線(xiàn)程創(chuàng)建一個(gè)新的結(jié)點(diǎn)加入等待隊(duì)列。
  • 從同步隊(duì)列中移除當(dāng)前線(xiàn)程踊东,即釋放當(dāng)前線(xiàn)程的同步狀態(tài)北滥,再喚醒其在同步隊(duì)列中的后繼結(jié)點(diǎn)刚操。
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        //1.創(chuàng)建新的結(jié)點(diǎn)加入等待隊(duì)列。
        Node node = addConditionWaiter();
        //2.釋放同步狀態(tài)碑韵。
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        //3.判斷是否在同步隊(duì)列當(dāng)中赡茸,如果不再則阻塞。
        while (!isOnSyncQueue(node)) {
            //3.1.等待被喚醒祝闻,喚醒的方法就是調(diào)用 signal 方法占卧。
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //4.被喚醒后從等待隊(duì)列移到同步隊(duì)列中,繼續(xù)參與同步狀態(tài)的競(jìng)爭(zhēng)联喘。
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

5.3 通知

調(diào)用Conditionsignal()方法的前提條件是當(dāng)前線(xiàn)程已經(jīng)獲取了同步狀態(tài)华蜒,它會(huì)執(zhí)行下面的操作:

  • 調(diào)用同步器的end(Node node)方法,將等待隊(duì)列中的頭結(jié)點(diǎn)(注意是等待隊(duì)列的頭結(jié)點(diǎn)豁遭,而不是當(dāng)前線(xiàn)程關(guān)聯(lián)的結(jié)點(diǎn))線(xiàn)程安全地移動(dòng)到同步隊(duì)列叭喜。
  • 結(jié)點(diǎn)移動(dòng)到同步隊(duì)列后,當(dāng)前線(xiàn)程再使用LockSupport喚醒該結(jié)點(diǎn)的線(xiàn)程蓖谢。
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        //1.加入到同步隊(duì)列中捂蕴。    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            //2.喚醒。
            LockSupport.unpark(node.thread);
        return true;
    }

  • 被喚醒的線(xiàn)程闪幽,從await方法第3步中的while循環(huán)中退出啥辨,進(jìn)而調(diào)用同步器的acquireQueued()方法加入到獲取同步狀態(tài)的競(jìng)爭(zhēng)中(也就是5.2小結(jié)中從3循環(huán)退出后的邏輯)。

signalAll相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)結(jié)點(diǎn)均執(zhí)行一次signal方法盯腌,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中溉知,并喚醒每個(gè)節(jié)點(diǎn)的線(xiàn)程。

5.4 生產(chǎn)者消費(fèi)者模型

生產(chǎn)者消費(fèi)者模型是多線(xiàn)程協(xié)作的經(jīng)典示例腕够。在這個(gè)模型中包含三個(gè)角色级乍。

  • 生產(chǎn)者:向緩沖區(qū)中寫(xiě)入,當(dāng)緩沖區(qū)滿(mǎn)時(shí)等待帚湘;當(dāng)緩沖區(qū)有新的數(shù)據(jù)后玫荣,通知消費(fèi)者。
  • 消費(fèi)者:從緩存區(qū)獲取客们,當(dāng)緩沖區(qū)為空時(shí)等待崇决,并通知生產(chǎn)者。
  • 緩沖區(qū):存儲(chǔ)數(shù)據(jù)底挫,這里我們用LinkedList來(lái)表示恒傻。

下面是實(shí)現(xiàn)的代碼:

/**
 * @author lizejun
 **/
public class Demo {

    private static final int MAX_SIZE = 5;

    private final LinkedList<Integer> factory = new LinkedList<>();

    private Lock lock = new ReentrantLock();
    private Condition consumer = lock.newCondition();
    private Condition producer = lock.newCondition();

    private void producer2() {
        lock.lock();
        try {
            while (factory.size() == MAX_SIZE) {
                try {
                    System.out.println("-- factory is full --");
                    producer.await();
                    Thread.sleep(Math.round(Math.random() * 100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("produce product");
            factory.add(factory.size());
            consumer.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private void consumer2() {
        lock.lock();
        try {
            while (factory.size() == 0) {
                try {
                    System.out.println("-- factory is empty --");
                    consumer.await();
                    Thread.sleep(Math.round(Math.random() * 100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("consume product");
            factory.removeLast();
            producer.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void run2() {
        final Demo demo = new Demo();
        Thread pThread = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    demo.producer2();
                }
            }

        };
        Thread cThread = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    demo.consumer2();
                }
            }
        };
        pThread.start();
        cThread.start();
    }
}

運(yùn)行結(jié)果為:

生產(chǎn)者消費(fèi)者模型

5.5 Condition 提供的其它方法

  • void await() throw InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待隊(duì)列直到被signal通知或中斷。
  • void awaitUninterruptibly():當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知建邓,對(duì)中斷不敏感盈厘。
  • long awaitNanos(long nanosTimeout) throws InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知,返回值表示剩余的時(shí)間官边,如果返回值<=0沸手,那么就是超時(shí)外遇。
  • boolean waitUntil(Data deadline) throws InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知、中斷或者到某個(gè)時(shí)間契吉,如果沒(méi)有到指定時(shí)間就通知跳仿,則返回true
  • void signal():?jiǎn)拘岩粋€(gè)等待在Condition上的線(xiàn)程捐晶,該線(xiàn)程從等待方法返回前必須獲得和Condition相關(guān)的鎖菲语。
  • void signalAll():與signal()的區(qū)別在于,它會(huì)喚醒所有等待在Condition上的線(xiàn)程惑灵。

這里容易混淆的是響應(yīng)中斷的概念山上,根據(jù)await()調(diào)用后所處的階段,可以分為下面三種情況:

  • 調(diào)用awaiXXt()之前已經(jīng)被中斷:
    • await():拋出異常英支。
    • awaitUninterruptibly():不拋出異常佩憾。
  • 調(diào)用完awaitXX()之后,仍然處于等待隊(duì)列中:
    • await():被喚醒干花,并拋出異常妄帘。
    • awaitUninterruptibly():被喚醒,設(shè)置標(biāo)志位interruptedtrue后池凄,繼續(xù)在等待隊(duì)列中等待寄摆。
  • 調(diào)用完awaitXX()之后,并且調(diào)用了signal(將它等待隊(duì)列移動(dòng)到了同步隊(duì)列):
    • await():繼續(xù)在同步隊(duì)列中等待修赞,并不會(huì)拋出異常,只是在返回后Threadinterruptedtrue桑阶。
    • awaitUninterruptibly():和await()的表現(xiàn)是一樣的柏副。

也就是說(shuō)是響應(yīng)中斷是針對(duì) 調(diào)用之前和處于等待隊(duì)列 這兩種情況而言的,當(dāng)它已經(jīng)移到到同步隊(duì)列后蚣录,兩者都不會(huì)響應(yīng)中斷割择。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市萎河,隨后出現(xiàn)的幾起案子荔泳,更是在濱河造成了極大的恐慌,老刑警劉巖虐杯,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件玛歌,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡擎椰,警方通過(guò)查閱死者的電腦和手機(jī)支子,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)达舒,“玉大人值朋,你說(shuō)我怎么就攤上這事叹侄。” “怎么了昨登?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵趾代,是天一觀(guān)的道長(zhǎng)。 經(jīng)常有香客問(wèn)我丰辣,道長(zhǎng)撒强,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任糯俗,我火速辦了婚禮尿褪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘得湘。我一直安慰自己杖玲,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布淘正。 她就那樣靜靜地躺著摆马,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鸿吆。 梳的紋絲不亂的頭發(fā)上囤采,一...
    開(kāi)封第一講書(shū)人閱讀 51,604評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音惩淳,去河邊找鬼蕉毯。 笑死,一個(gè)胖子當(dāng)著我的面吹牛思犁,可吹牛的內(nèi)容都是我干的代虾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼激蹲,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼棉磨!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起学辱,我...
    開(kāi)封第一講書(shū)人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤乘瓤,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后策泣,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體衙傀,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年萨咕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了差油。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蓄喇,靈堂內(nèi)的尸體忽然破棺而出发侵,到底是詐尸還是另有隱情,我是刑警寧澤妆偏,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布刃鳄,位于F島的核電站,受9級(jí)特大地震影響钱骂,放射性物質(zhì)發(fā)生泄漏叔锐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一见秽、第九天 我趴在偏房一處隱蔽的房頂上張望愉烙。 院中可真熱鬧,春花似錦解取、人聲如沸步责。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蔓肯。三九已至,卻和暖如春振乏,著一層夾襖步出監(jiān)牢的瞬間蔗包,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工慧邮, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留调限,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓误澳,卻偏偏與公主長(zhǎng)得像旧噪,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子脓匿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355