多線程與高并發(fā)(二)-- java.util .concurrent同步工具

一芯丧、cas自旋原理

1腔彰、概念

CAS的全稱是Compare-And-Swap叫编,它是CPU并發(fā)原語,原語的執(zhí)行必須是連續(xù)的霹抛,在執(zhí)行過程中不允許被中斷搓逾,也就是說CAS是一條CPU的原子指令,不會(huì)造成所謂的數(shù)據(jù)不一致性問題杯拐,是線程安全的霞篡。CAS并發(fā)原語體現(xiàn)在Java語言中就是sun.misc.Unsafe類的各個(gè)方法,調(diào)用UnSafe類中的CAS方法端逼。從其命名可以發(fā)現(xiàn)朗兵,其本質(zhì)就是比較和替換。

2顶滩、手動(dòng)實(shí)現(xiàn)一個(gè)自旋鎖

private static int num = 0;

public static boolean add(int source, int target) {
    int count = 0;
    while (true) {
        if (num == source) {
            num = target;
            return true;
        } else {
            count++;
            if (count == 10) {
                return false;
            }
        }
    }
}

public static void main(String[] args) {
    //線程?hào)艡谟嘁矗却芯€程準(zhǔn)備完畢后執(zhí)行
    CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            try {
                //內(nèi)部使用ReentrantLock重入鎖
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            boolean flag = add(0, 1);
            if (flag) {
                System.out.println(Thread.currentThread().getName() + "更新成功==================");
            } else {
                System.out.println(Thread.currentThread().getName() + "更新失敗");
            }
        }).start();
    }
}

結(jié)果:只有一條更新成功

Thread-0更新失敗
Thread-7更新失敗
Thread-6更新失敗
Thread-5更新失敗
Thread-4更新失敗
Thread-3更新失敗
Thread-8更新成功==================
Thread-2更新失敗
Thread-1更新失敗
Thread-9更新失敗

3、底層核心

sun.misc.Unsafe是CAS的底層核心類礁鲁,Unsafe類中所有方法都是native修飾的盐欺,也就是說Unsafe類中的方法都直接調(diào)用操作系統(tǒng)底層資源執(zhí)行相應(yīng)任務(wù)赁豆。

以ava.util.concurrent.atomic.AtomicInteger的getAndIncrement方法源碼分析

/**
 * 當(dāng)前值自增1
 **/
public final int getAndIncrement() {
    //valueOffset系統(tǒng)偏移量
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

/**
 * 獲取當(dāng)前值var5,并加var4
 **/
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        //獲取主內(nèi)存當(dāng)前值var5
        var5 = this.getIntVolatile(var1, var2);
        //cas循環(huán)等待替換冗美,var5+var4是替換后的值
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

4魔种、CAS缺點(diǎn)

1)循環(huán)時(shí)間長CPU開銷大
2)只能保證一個(gè)共享變量的原子操作
3)會(huì)引發(fā)ABA問題

5、ABA問題介紹及解決

簡單通過代碼實(shí)現(xiàn)下ABA問題,線程Thread-0先將num修改為了1粉洼,然后又將num修改成了0节预;線程Thread-1則認(rèn)為當(dāng)前num一直沒有經(jīng)過改變,而將其修改成了10属韧。這里我們可以發(fā)現(xiàn)問題安拟,這時(shí)候num雖然值仍然是0,但是其實(shí)已經(jīng)不是最開始那個(gè)0了挫剑,這樣在某些情況下就會(huì)導(dǎo)致問題去扣。

private static int num = 0;

public static boolean add(int source, int target) {
    int count = 0;
    while (true) {
        if (num == source) {
            num = target;
            return true;
        } else {
            count++;
            if (count == 10) {
                return false;
            }
        }
    }
}

public static void main(String[] args) {
    new Thread(() -> {
        //將數(shù)據(jù)更新為1
        if (add(0, 1)) {
            System.out.println(Thread.currentThread().getName() + "更新num為1成功");
        } else {
            System.out.println(Thread.currentThread().getName() + "更新num為1失敗");
        }
        //將數(shù)據(jù)更新為0
        if (add(1, 0)) {
            System.out.println(Thread.currentThread().getName() + "更新num為0成功");
        } else {
            System.out.println(Thread.currentThread().getName() + "更新num為0失敗");
        }

    }).start();

    new Thread(() -> {
        //將數(shù)據(jù)更新為10
        if (add(0, 10)) {
            System.out.println(Thread.currentThread().getName() + "更新num為10成功");
        } else {
            System.out.println(Thread.currentThread().getName() + "更新num為10失敗");
        }

    }).start();
}

結(jié)果:

Thread-0更新num為1成功
Thread-0更新num為0成功
Thread-1更新num為10成功

上面我自行實(shí)現(xiàn)的自旋鎖過程,下面看一個(gè)atomic原子類的實(shí)現(xiàn)樊破。非常簡單

public static void main(String[] args) {
    AtomicInteger atomicInteger = new AtomicInteger(0);

    new Thread(()->{
        atomicInteger.compareAndSet(0,1);
        atomicInteger.compareAndSet(1,0);
    }).start();

    new Thread(()->{
        boolean b = atomicInteger.compareAndSet(0, 10);
        if (b){
            System.out.println("更新為10成功");
        }else{
            System.out.println("更新為10失敗");
        }
    }).start();
}

結(jié)果:

更新為10成功

ABA問題的解決:其實(shí)問題的本質(zhì)原因在于我們的樂觀鎖只比較了值是否相等,可以通過增加其他屬性的比較唆铐,例如時(shí)間戳、版本號(hào)等。這里我們采用AtomicStampedReference類解決該問題耘斩。

/**
 * 構(gòu)造方法
 * @param initialRef 初始值
 * @param initialStamp  初始版本戳
 */
public AtomicStampedReference(V initialRef, int initialStamp) {
    pair = Pair.of(initialRef, initialStamp);
}

/**
 * CAS方法
 * @param expectedReference 初始值
 * @param newReference  替換值
 * @param expectedStamp 初始版本戳
 * @param newStamp  新版本戳
 * @return
 */
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    AtomicStampedReference.Pair<V> current = pair;
    return
            expectedReference == current.reference &&
                    expectedStamp == current.stamp &&
                    ((newReference == current.reference &&
                            newStamp == current.stamp) ||
                            casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
}

實(shí)例:

public static void main(String[] args) {
    AtomicStampedReference atomicStampedReference = new AtomicStampedReference(0, 0);

    new Thread(() -> {
        atomicStampedReference.compareAndSet(0, 1, 0, 1);
        atomicStampedReference.compareAndSet(1, 0, 1, 2);
    }).start();

    new Thread(() -> {
        boolean b = atomicStampedReference.compareAndSet(0, 1, 0, 1);
        if (b) {
            System.out.println("更新為10成功");
        } else {
            System.out.println("更新為10失敗");
        }
    }).start();
}

結(jié)果:

更新為10失敗

二潭兽、ReentrantLock可重入鎖

在上一篇基礎(chǔ)概念中,我們使用ReentrantLock實(shí)現(xiàn)了線程同步問題王浴,代碼如下:

/**
 * 庫存
 */
static class Inventory {

    //初始化ReentrantLock實(shí)例
    Lock lock = new ReentrantLock();

    //庫存數(shù)量
    private int num = 100;

    //增加庫存
    public void add(int n) {
        //加鎖
        lock.lock();
        try {
            num += n;
            System.out.println("增加庫存后的數(shù)量=" + num);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    //減少庫存
    public void sub(int n) {
        //加鎖
        lock.lock();
        try {
            num -= n;
            System.out.println("減少庫存后的數(shù)量=" + num);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
}

public static void main(String[] args) {
    Inventory inventory = new Inventory();
    for (int i = 0; i < 100; i++) {
        new Thread(() -> {
            inventory.add(1);
        }).start();
    }
    for (int i = 0; i < 100; i++) {
        new Thread(() -> {
            inventory.sub(1);
        }).start();
    }
}

1脆炎、代碼內(nèi)部依賴關(guān)系

我們借ReentrantLock看下java內(nèi)鎖的底層結(jié)構(gòu),后續(xù)我們進(jìn)行逐個(gè)節(jié)點(diǎn)的分析


ReentrantLock底層結(jié)構(gòu)

2氓辣、接下來我們來分析下底層原理:

ReentrantLock位于java.util.concurrent.locks包下秒裕,其實(shí)中包含三個(gè)內(nèi)部類。
Syn:繼承AbstractQueuedSynchronizer(AQS)钞啸,用于實(shí)現(xiàn)同步機(jī)制几蜻。
FairSync:公平鎖對象,繼承Syn体斩。
NonfairSync:非公平鎖對象梭稚,繼承Syn。


ReentrantLock

2.1 AbstractQueuedSynchronizer(AQS)

用來構(gòu)建鎖或其他同步組件的框架絮吵,是JDK中實(shí)現(xiàn)并發(fā)編程的核心弧烤,它提供了一個(gè)基于FIFO隊(duì)列,平時(shí)我們工作中經(jīng)常用到的ReentrantLock蹬敲,CountDownLatch等都是基于它來實(shí)現(xiàn)的暇昂。
分析其源碼想幻,有兩個(gè)內(nèi)部類

AbstractQueuedSynchronizer內(nèi)部類

Node:同步隊(duì)列的模型
ConditionObject:等待隊(duì)列的模型

逐一看下其內(nèi)部源碼:
Node源碼:

  static final class Node {
    // 模式,分為共享與獨(dú)占
    // 共享模式
    static final Node SHARED = new Node();
    // 獨(dú)占模式
    static final Node EXCLUSIVE = null;        
    // 結(jié)點(diǎn)狀態(tài)
    // CANCELLED话浇,值為1脏毯,表示當(dāng)前的線程被取消
    // SIGNAL,值為-1幔崖,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行食店,也就是unpark
    // CONDITION,值為-2赏寇,表示當(dāng)前節(jié)點(diǎn)在等待condition吉嫩,也就是在condition隊(duì)列中
    // PROPAGATE,值為-3嗅定,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行
    // 值為0自娩,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;        

    // 結(jié)點(diǎn)狀態(tài)
    volatile int waitStatus;        
    // 前驅(qū)結(jié)點(diǎn)
    volatile Node prev;    
    // 后繼結(jié)點(diǎn)
    volatile Node next;        
    // 結(jié)點(diǎn)所對應(yīng)的線程
    volatile Thread thread;        
    // 下一個(gè)等待者
    Node nextWaiter;
    
    // 結(jié)點(diǎn)是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 獲取前驅(qū)結(jié)點(diǎn)渠退,若前驅(qū)結(jié)點(diǎn)為空忙迁,拋出異常
    final Node predecessor() throws NullPointerException {
        // 保存前驅(qū)結(jié)點(diǎn)
        Node p = prev; 
        if (p == null) // 前驅(qū)結(jié)點(diǎn)為空,拋出異常
            throw new NullPointerException();
        else // 前驅(qū)結(jié)點(diǎn)不為空碎乃,返回
            return p;
    }
    
    // 無參構(gòu)造函數(shù)
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    // 構(gòu)造函數(shù)
     Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 構(gòu)造函數(shù)
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

ConditionObject 源碼
實(shí)現(xiàn)了condition接口姊扔,關(guān)于condition的學(xué)習(xí)請看下一小節(jié):三、Condition條件等待與通知

  // 內(nèi)部類
public class ConditionObject implements Condition, java.io.Serializable {
    // 版本號(hào)
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    // condition隊(duì)列的頭結(jié)點(diǎn)
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    // condition隊(duì)列的尾結(jié)點(diǎn)
    private transient Node lastWaiter;

    /**
     *  構(gòu)造函數(shù)
     */
    public ConditionObject() { }

    /**
     * 添加新的waiter到wait隊(duì)列
     */
    private Node addConditionWaiter() {
        // 保存尾結(jié)點(diǎn)
        Node t = lastWaiter;
       // 尾結(jié)點(diǎn)不為空梅誓,并且尾結(jié)點(diǎn)的狀態(tài)不為CONDITION
        if (t != null && t.waitStatus != Node.CONDITION) { 
            // 清除狀態(tài)不為CONDITION的結(jié)點(diǎn)恰梢,對firstWaiter和lastWaiter重新賦值
            unlinkCancelledWaiters(); 
            // 將最后一個(gè)結(jié)點(diǎn)重新賦值給t
            t = lastWaiter;
        }
        // 新建一個(gè)結(jié)點(diǎn)
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 尾結(jié)點(diǎn)為空
        if (t == null) 
            // 設(shè)置condition隊(duì)列的頭結(jié)點(diǎn)
            firstWaiter = node;
        else 
            // 設(shè)置為節(jié)點(diǎn)的nextWaiter域?yàn)閚ode結(jié)點(diǎn)
            t.nextWaiter = node;
        // 更新condition隊(duì)列的尾結(jié)點(diǎn)
        lastWaiter = node;
        return node;
    }

    /**
     *  轉(zhuǎn)移first節(jié)點(diǎn)到sync隊(duì)列
     */
    private void doSignal(Node first) {
        // 循環(huán)
        do {
            // 將下一個(gè)節(jié)點(diǎn)設(shè)為首節(jié)點(diǎn),如果為空
            if ( (firstWaiter = first.nextWaiter) == null) 
                // 設(shè)置尾結(jié)點(diǎn)為空
                lastWaiter = null;
            // 設(shè)置first結(jié)點(diǎn)的nextWaiter域
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null); // 將結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列失敗并且condition隊(duì)列中的頭結(jié)點(diǎn)不為空梗掰,一直循環(huán)
    }

    /**
     * 轉(zhuǎn)移所有等待隊(duì)列的節(jié)點(diǎn)到同步隊(duì)列
     */
    private void doSignalAll(Node first) {
        // condition隊(duì)列的頭結(jié)點(diǎn)尾結(jié)點(diǎn)都設(shè)置為空
        lastWaiter = firstWaiter = null;
        // 循環(huán)
        do {
            // 獲取first結(jié)點(diǎn)的nextWaiter域結(jié)點(diǎn)
            Node next = first.nextWaiter;
            // 設(shè)置first結(jié)點(diǎn)的nextWaiter域?yàn)榭?            first.nextWaiter = null;
            // 將first結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列
            transferForSignal(first);
            // 重新設(shè)置first
            first = next;
        } while (first != null);
    }

    /**
     * 過濾掉狀態(tài)不為CONDITION的節(jié)點(diǎn)
     * 對firstWaiter和lastWaiter重新賦值
     **/
    private void unlinkCancelledWaiters() {
        // 保存condition隊(duì)列頭結(jié)點(diǎn)
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            // 下一個(gè)結(jié)點(diǎn)
            Node next = t.nextWaiter;
            // t結(jié)點(diǎn)的狀態(tài)不為CONDTION狀態(tài)
            if (t.waitStatus != Node.CONDITION) { 
                // 設(shè)置t節(jié)點(diǎn)的額nextWaiter域?yàn)榭?                t.nextWaiter = null;
                if (trail == null) // trail為空
                    // 重新設(shè)置condition隊(duì)列的頭結(jié)點(diǎn)
                    firstWaiter = next;
                else 
                    // 設(shè)置trail結(jié)點(diǎn)的nextWaiter域?yàn)閚ext結(jié)點(diǎn)
                    trail.nextWaiter = next;
                if (next == null) // next結(jié)點(diǎn)為空
                    // 設(shè)置condition隊(duì)列的尾結(jié)點(diǎn)
                    lastWaiter = trail;
            }
            else // t結(jié)點(diǎn)的狀態(tài)為CONDTION狀態(tài)
                // 設(shè)置trail結(jié)點(diǎn)
                trail = t;
            // 設(shè)置t結(jié)點(diǎn)
            t = next;
        }
    }

    /**
     * 實(shí)現(xiàn)Condition接口的signal方法
     */
    public final void signal() {
        if (!isHeldExclusively()) // 不被當(dāng)前線程獨(dú)占嵌言,拋出異常
            throw new IllegalMonitorStateException();
        // 保存condition隊(duì)列頭結(jié)點(diǎn)
        Node first = firstWaiter;
        if (first != null) // 頭結(jié)點(diǎn)不為空
            // 喚醒一個(gè)等待線程
            doSignal(first);
    }

    /**
     * 實(shí)現(xiàn)Condition的signalAll方法,喚醒所有線程
     */
    public final void signalAll() {
        if (!isHeldExclusively()) // 不被當(dāng)前線程獨(dú)占及穗,拋出異常
            throw new IllegalMonitorStateException();
        // 保存condition隊(duì)列頭結(jié)點(diǎn)
        Node first = firstWaiter;
        if (first != null) // 頭結(jié)點(diǎn)不為空
            // 喚醒所有等待線程
            doSignalAll(first);
    }

    /**
     * 與await()區(qū)別在于摧茴,使用await方法,調(diào)用interrupt()中斷后會(huì)報(bào)錯(cuò)拥坛,而該方法不會(huì)報(bào)錯(cuò)蓬蝶。
     */
    public final void awaitUninterruptibly() {
        // 添加一個(gè)結(jié)點(diǎn)到等待隊(duì)列
        Node node = addConditionWaiter();
        // 獲取釋放的狀態(tài)
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) { // 
            // 阻塞當(dāng)前線程
            LockSupport.park(this);
            if (Thread.interrupted()) // 當(dāng)前線程被中斷
                // 設(shè)置interrupted狀態(tài)
                interrupted = true; 
        }
        if (acquireQueued(node, savedState) || interrupted) // 
            selfInterrupt();
    }

    /**
     *  等待,當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)
     */
    public final void await() throws InterruptedException {
        // 當(dāng)前線程被中斷猜惋,拋出異常
        if (Thread.interrupted()) 
            throw new InterruptedException();
        // 將當(dāng)前線程包裝成Node丸氛,尾插入到等待隊(duì)列中
        Node node = addConditionWaiter();
        // 釋放當(dāng)前線程所占用的lock,在釋放的過程中會(huì)喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 當(dāng)前線程進(jìn)入到等待狀態(tài)
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 檢查結(jié)點(diǎn)等待時(shí)的中斷類型
                break;
        }
        // 自旋等待獲取到同步狀態(tài)(即獲取到lock)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 處理被中斷的情況
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    /**
     * 等待著摔,當(dāng)前線程在接到信號(hào)缓窜、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)
     */
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    /**
     * 等待,當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)
     */
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /**
     * 等待禾锤,當(dāng)前線程在接到信號(hào)私股、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。此方法在行為上等              
     * 效于:awaitNanos(unit.toNanos(time)) > 0
     */
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        // 1. 將當(dāng)前線程包裝成Node恩掷,尾插入到等待隊(duì)列中
        Node node = addConditionWaiter();
        // 2. 釋放當(dāng)前線程所占用的lock倡鲸,在釋放的過程中會(huì)喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

大概了解源碼后我們通過圖看下同步隊(duì)列和等待隊(duì)列的關(guān)系:

同步隊(duì)列與等待隊(duì)列模型

同步隊(duì)列是一個(gè)雙向的鏈表,每個(gè)節(jié)點(diǎn)會(huì)存儲(chǔ)下一個(gè)節(jié)點(diǎn)的信息黄娘,是一種隊(duì)列的實(shí)現(xiàn)峭状。
等待隊(duì)列是一個(gè)單向的鏈表,只有使用到Condition時(shí)才會(huì)存在逼争,并且會(huì)存在多個(gè)优床。
當(dāng)?shù)却?duì)列的線程被喚醒會(huì)被添加到同步隊(duì)列的尾部。

2.2 公平鎖與非公平鎖

二者的區(qū)別主要在于獲取鎖是否和排隊(duì)順序有關(guān)誓焦。當(dāng)鎖唄一個(gè)線程持有胆敞,其他嘗試獲取鎖的線程會(huì)被掛起,加到等待隊(duì)列中杂伟,先被掛起的在隊(duì)列的最前端移层。當(dāng)鎖被釋放,需要通知隊(duì)列中的線程稿壁。作為公平鎖幽钢,會(huì)先喚醒隊(duì)列最前端的線程;而非公平鎖會(huì)喚醒所有線程傅是,通過競爭去獲取鎖,后來的線程有可能獲得鎖蕾羊。

3.3 lock()和unlock()

我們通過本節(jié)的開始時(shí)提供的例子喧笔,代碼跟蹤發(fā)現(xiàn)lock()默認(rèn)走的是非公平鎖:

public ReentrantLock() {
    //初始化默認(rèn)是非公平鎖
    sync = new NonfairSync();
}

可以通過設(shè)置boolean的值設(shè)置是公平鎖還是非公平鎖

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

lock()方法走NonFairLock的lock方法

public void lock() {
    sync.lock();
}

/**
  * 獲取鎖
 */
  final void lock() {
      //CAS嘗試設(shè)置鎖狀態(tài),占用鎖
      if (compareAndSetState(0, 1))
          //修改狀態(tài)成功龟再,設(shè)置當(dāng)前線程為獨(dú)占鎖擁有者
          setExclusiveOwnerThread(Thread.currentThread());
      else
          acquire(1);
   }

只有一個(gè)線程的時(shí)候會(huì)直接獨(dú)占书闸,當(dāng)存在線程競爭的時(shí)候CAS獲取會(huì)返回false,走acquire(1);走到AQS的acquire方法利凑。

public final void acquire(int arg) {
    //走非公平鎖的獲取鎖方法
    if (!tryAcquire(arg) &&
        //鎖獲取失敗并且添加該線程到等待隊(duì)列中
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //中斷當(dāng)前線程
        selfInterrupt();
}

逐步看看上面代碼中的幾個(gè)方法
tryAcquire()走到獲取非公平鎖:

 final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //獲取當(dāng)前狀態(tài)
        int c = getState();
        if (c == 0) {
            //  活躍狀態(tài)浆劲,再次嘗試獲取鎖
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //判斷當(dāng)前線程是否是占用鎖的線程
        else if (current == getExclusiveOwnerThread()) {
            //是當(dāng)前持有鎖的線程,計(jì)數(shù)加1
            //TODO 這里我推測是可重入鎖計(jì)數(shù)的實(shí)現(xiàn)哀澈,后面去驗(yàn)證
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

添加當(dāng)前線程到同步隊(duì)列

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 將尾節(jié)點(diǎn)設(shè)置為當(dāng)前新節(jié)點(diǎn)的前繼節(jié)點(diǎn)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //CAS設(shè)置當(dāng)前節(jié)點(diǎn)為tail
        if (compareAndSetTail(pred, node)) {
            //將當(dāng)前節(jié)點(diǎn)設(shè)置為上一節(jié)點(diǎn)的下一節(jié)點(diǎn)牌借,有點(diǎn)繞
            pred.next = node;
            return node;
        }
    }
    //尾節(jié)點(diǎn)是null
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //尾節(jié)點(diǎn)是null,初始化頭尾節(jié)點(diǎn)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //將node 設(shè)置為tail割按,設(shè)置前后節(jié)點(diǎn)的prev和next
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued():

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
            final Node p = node.predecessor();
            //如果前置節(jié)點(diǎn)是頭并且能重新獲取到鎖膨报,應(yīng)該是防止入隊(duì)列時(shí)頭結(jié)點(diǎn)被釋放
            if (p == head && tryAcquire(arg)) {
                //設(shè)置當(dāng)前節(jié)點(diǎn)為頭
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //返回中斷失敗
                return interrupted;
            }
            //如果前置節(jié)點(diǎn)不是head,也未獲取到鎖,立即執(zhí)行中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

接下來分析unlock()方法:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    //嘗試釋放
    if (tryRelease(arg)) {
        Node h = head;
        //head不是null现柠,不是活躍狀態(tài)
        if (h != null && h.waitStatus != 0)
           //釋放鎖成功
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
        //計(jì)數(shù)減1
        int c = getState() - releases;
        //當(dāng)前線程是否是持有鎖的線程院领,不是則拋出異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            //沒有線程持有鎖
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

而公平鎖獲取比非公平鎖多了一個(gè)判斷

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //此處增加了判斷,是否有前驅(qū)節(jié)點(diǎn)在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
//判斷是否有前驅(qū)節(jié)點(diǎn)在等待
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

三够吩、Condition條件等待與通知

java的Object類型實(shí)現(xiàn)線程等待與通知: 應(yīng)用Object的wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()比然。整體上看是通過對象監(jiān)視器配合完成線程間的等待/通知機(jī)制。

Condition與Lock配合完成等待通知機(jī)制:針對Object類型的等待與通知周循,Condition也提供了對應(yīng)的方式强法。

針對Object的wait(),wait(long timeout),wait(long timeout, int nanos),Condition提供了以下幾個(gè)方法:

void await() throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài)鱼鼓,如果其他線程調(diào)用condition的signal或者signalAll方法并且當(dāng)前線程獲取Lock從await方法返回拟烫,如果在等待狀態(tài)中被中斷會(huì)拋出被中斷異常;
long awaitNanos(long nanosTimeout):當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知迄本,中斷或者超時(shí)硕淑;
boolean await(long time, TimeUnit unit)throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知,支持自定義時(shí)間單位
boolean awaitUntil(Date deadline) throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知嘉赎,中斷或者到了某個(gè)指定時(shí)間

還額外提供個(gè)

void awaitUninterruptibly(); 與await()區(qū)別在于置媳,使用await方法,調(diào)用interrupt()中斷后會(huì)報(bào)錯(cuò)公条,而該方法不會(huì)報(bào)錯(cuò)拇囊。

針對Object的notify(),notifyAll(),Condition提供了以下幾個(gè)方法:

void signal():喚醒一個(gè)等待在condition上的線程靶橱,將該線程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中寥袭,如果在同步隊(duì)列中能夠競爭到Lock則可以從等待方法中返回。
void signalAll():夠喚醒所有等待在condition上的線程关霸,將全部線程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中传黄,如果在同步隊(duì)列中能夠競爭到Lock則可以從等待方法中返回。

以上鎖的方式實(shí)際是在AQS中實(shí)現(xiàn)的队寇,源碼請看上一章節(jié)的AQS分析膘掰。

Condition與Object方式的不同:

Condition能夠支持不響應(yīng)中斷,而通過使用Object方式不支持佳遣;
Condition能夠支持多個(gè)等待隊(duì)列(new 多個(gè)Condition對象)识埋,而Object方式只能支持一個(gè);
Condition能夠支持超時(shí)時(shí)間的設(shè)置零渐,而Object不支持

Condition結(jié)合ReentrantLock的使用

/**
 * 庫存
 */
static class Inventory {

    //初始化ReentrantLock實(shí)例
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //庫存數(shù)量
    private int num = 100;

    //增加庫存
    public void add(int n) throws InterruptedException {
        //加鎖
        lock.lock();
        try {
            //先等待sub的通知
            condition.await();
            num += n;
            System.out.println("增加庫存后的數(shù)量=" + num);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    //減少庫存
    public void sub(int n) throws InterruptedException {
        //加鎖
        lock.lock();
        try {
            num -= n;
            System.out.println("減少庫存后的數(shù)量=" + num);
            //睡1s窒舟,為了看add方法接收通知的效果
            Thread.sleep(1000);
            condition.signal();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
}

結(jié)果分析:按照代碼邏輯,先走增加方法相恃,但是被await方法阻塞了辜纲,1s后執(zhí)行sub方法笨觅,減少數(shù)量后并sleep1s,使用signal方法通知add方法耕腾,最終看到sub先輸出见剩,add后輸出。

減少庫存后的數(shù)量=99
增加庫存后的數(shù)量=100

在代碼中看到扫俺,condition對象實(shí)際是調(diào)用lock的new ConditionObject()方法苍苞,new了一個(gè)ConditionObject對象,ReentrantLock的內(nèi)部Sync繼承了AQS狼纬,而ConditionObject是AQS的一個(gè)內(nèi)部類羹呵,實(shí)現(xiàn)了Condition接口。接口內(nèi)提供了諸多通信機(jī)制的方法疗琉,可見ReentrantLock冈欢、AQS與Condition的緊密關(guān)聯(lián)。相互關(guān)系請見本章節(jié)開頭的圖盈简。

有點(diǎn)結(jié)論可以提出一下凑耻,了解過lock和synchronized之后,發(fā)現(xiàn)兩種鎖前者是基于jvm內(nèi)存模型的柠贤,后者基于代碼實(shí)現(xiàn)香浩,不知道同學(xué)們有沒有相同感受。

四臼勉、Latch門閂

首先我們寫個(gè)例子邻吭,來理解下門栓的含義:

public static void main(String[] args) throws InterruptedException {

    // 使用倒計(jì)數(shù)門閂器 ,迫使主線程進(jìn)入等待 宴霸;設(shè)置門栓的值為10
    CountDownLatch latch = new CountDownLatch(10);
    new Thread(() -> {
        for (int i = 0; i < 10; i++) {
            //門栓值減1
            latch.countDown();
            System.out.println("當(dāng)前門栓值:" + latch.getCount());
        }
    }).start();
    //阻塞主線程囱晴,等門栓值為0,主線程執(zhí)行
    latch.await();
    System.out.println("主線程執(zhí)行瓢谢。速缆。。");
}

結(jié)果:從以下結(jié)果可以看到恩闻,當(dāng)門栓的值降到0之后,主線程執(zhí)行了剧董。

當(dāng)前門栓值:9
當(dāng)前門栓值:8
當(dāng)前門栓值:7
當(dāng)前門栓值:6
當(dāng)前門栓值:5
當(dāng)前門栓值:4
當(dāng)前門栓值:3
當(dāng)前門栓值:2
當(dāng)前門栓值:1
當(dāng)前門栓值:0
主線程執(zhí)行幢尚。。翅楼。

接下來我們分析下原理尉剩,其中有個(gè)內(nèi)部類Sync,同樣繼承了AQS


內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer

結(jié)合上面的例子逐步分析源碼毅臊,首先初始化了一個(gè)CountDownLatch對象:

// 使用倒計(jì)數(shù)門閂器 理茎,迫使主線程進(jìn)入等待 ;設(shè)置門栓的值為10
CountDownLatch latch = new CountDownLatch(10);

//構(gòu)造
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

//同步代碼塊
Sync(int count) {
         //設(shè)置AQS的state計(jì)數(shù)
        setState(count);
}

用await阻塞主線程:

public void await() throws InterruptedException {
    //AQS的獲取中斷共享鎖
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //獲取當(dāng)前值是多少
    if (tryAcquireShared(arg) < 0)
        //獲取共享鎖
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //填加獲取共享鎖類型到同步隊(duì)列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            //獲取前驅(qū)節(jié)點(diǎn)
            final Node p = node.predecessor();
            if (p == head) {
                //前驅(qū)節(jié)點(diǎn)等于head,嘗試獲取共享鎖皂林,就是獲取state的值
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //獲取共享鎖成功朗鸠,設(shè)置當(dāng)前節(jié)點(diǎn)為head,釋放原h(huán)ead共享鎖
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //阻塞和中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()減數(shù)量础倍,釋放共享鎖

public void countDown() {
    //釋放共享鎖
    sync.releaseShared(1);
}
//AQS釋放共享鎖
public final boolean releaseShared(int arg) {
    //獲取state并減1
    if (tryReleaseShared(arg)) {
        //無線循環(huán)并通過CAS釋放所有共享鎖
        doReleaseShared();
        return true;
    }
    return false;
}

五烛占、CyclicBarrier線程?hào)艡?/h1>

先看一個(gè)使用例子

public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(6);

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "準(zhǔn)備就緒");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "到達(dá)");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
    }

    Thread.sleep(1000);
    System.out.println(Thread.currentThread().getName() + "準(zhǔn)備開始");
    cyclicBarrier.await();

}

結(jié)果:5個(gè)線程和main函數(shù)進(jìn)行await,當(dāng)總數(shù)達(dá)到6后沟启,開始執(zhí)行忆家。是不是很簡單。

Thread-1準(zhǔn)備就緒
Thread-4準(zhǔn)備就緒
Thread-0準(zhǔn)備就緒
Thread-2準(zhǔn)備就緒
Thread-3準(zhǔn)備就緒
main準(zhǔn)備開始
Thread-1到達(dá)
Thread-0到達(dá)
Thread-3到達(dá)
Thread-2到達(dá)
Thread-4到達(dá)

看看源碼實(shí)現(xiàn):

//構(gòu)造函數(shù)德迹,parties為線程數(shù)量
public CyclicBarrier(int parties) {
    this(parties, null);
}
//Runnable 參數(shù)芽卿,這個(gè)參數(shù)的意思是最后一個(gè)到達(dá)線程要做的任務(wù)
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

//阻塞方法
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        //引入了Condition等待隊(duì)列,使用await()方法與signalAll()方法胳搞,通過counnt計(jì)數(shù)
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

六卸例、Semaphore信號(hào)量

Semaphore 通常我們叫它信號(hào)量, 可以用來控制同時(shí)訪問特定資源的線程數(shù)量流酬,通過協(xié)調(diào)各個(gè)線程币厕,以保證合理的使用資源。
官方解釋是Semaphore用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目芽腾,他維護(hù)了一個(gè)許可證集合旦装,有多少資源需要限制就維護(hù)多少許可證集合,假如這里有N個(gè)資源摊滔,那就對應(yīng)于N個(gè)許可證阴绢,同一時(shí)刻也只能有N個(gè)線程訪問。一個(gè)線程獲取許可證就調(diào)用acquire方法艰躺,用完了釋放資源就調(diào)用release方法呻袭。

舉個(gè)例子

public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            try {
                semaphore.acquire();
                System.out.println("線程" + Thread.currentThread().getName() + "占用時(shí)間:" + LocalDateTime.now());
                Thread.sleep(2000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

結(jié)果:每次只通過兩個(gè)線程,等待兩秒腺兴。

線程Thread-0占用時(shí)間:2020-08-24T09:45:31.738
線程Thread-1占用時(shí)間:2020-08-24T09:45:31.738
線程Thread-2占用時(shí)間:2020-08-24T09:45:33.740
線程Thread-3占用時(shí)間:2020-08-24T09:45:33.740
線程Thread-4占用時(shí)間:2020-08-24T09:45:35.740
線程Thread-5占用時(shí)間:2020-08-24T09:45:35.740
線程Thread-6占用時(shí)間:2020-08-24T09:45:37.741
線程Thread-7占用時(shí)間:2020-08-24T09:45:37.741
線程Thread-8占用時(shí)間:2020-08-24T09:45:39.741
線程Thread-9占用時(shí)間:2020-08-24T09:45:39.742

針對上面的例子左电,我們看下具體的實(shí)現(xiàn)原理:


內(nèi)部類

實(shí)現(xiàn)了三個(gè)內(nèi)部類,與ReentrantLock是相同的页响,Syn繼承的AQS篓足,公平鎖與非公平鎖分別繼承Sync實(shí)現(xiàn)同步。

初始化方法:默認(rèn)非公平鎖闰蚕,同時(shí)定義下通行證的數(shù)量栈拖。將通行證數(shù)量設(shè)置到AQS的state。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

protected final void setState(int newState) {
    state = newState;
}

獲取鎖方法:semaphore.acquire();

public void acquire() throws InterruptedException {
    //獲取共享可中斷鎖
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //嘗試獲取共享鎖没陡,小于0涩哟,則表示當(dāng)前通行證不足
    if (tryAcquireShared(arg) < 0)
        //通行證數(shù)量不足索赏,創(chuàng)建阻塞隊(duì)列
        doAcquireSharedInterruptibly(arg);
}

跟蹤tryAcquireShared(arg)到底層:

final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            //獲取通行證數(shù)量
            int available = getState();
            //減去需要或取得數(shù)量
            int remaining = available - acquires;
            //獲取后數(shù)量小于0,直接返回獲取后數(shù)量,大于0贴彼,CAS設(shè)置state
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

跟蹤doAcquireSharedInterruptibly(int arg)方法

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //添加共享鎖節(jié)點(diǎn)到同步隊(duì)列的尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
             //獲得當(dāng)前節(jié)點(diǎn)pre節(jié)點(diǎn)
            final Node p = node.predecessor();
            if (p == head) {
                //再次嘗試獲取共享鎖
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                     //獲取共享鎖成功潜腻,設(shè)置當(dāng)前節(jié)點(diǎn)為head,釋放原h(huán)ead共享鎖
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
           //重組雙向鏈表锻弓,清空無效節(jié)點(diǎn)砾赔,掛起當(dāng)前線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

釋放鎖方法semaphore.release(),跟蹤到底層

public final boolean releaseShared(int arg) {
    //釋放鎖
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            //獲取當(dāng)前狀態(tài)
            int current = getState();
            //加上要釋放的值得到最新的值
            int next = current + releases;
            //加完后小于當(dāng)前值青灼,【】拋出異常
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //CAS設(shè)置state
            if (compareAndSetState(current, next))
                return true;
        }
    }

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //是否需要喚醒后繼節(jié)點(diǎn)
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                 //喚醒h.nex節(jié)點(diǎn)線程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

七暴心、Semaphore與Lock的區(qū)別(高頻面試)

最主要的區(qū)別在于,Semaphore可以進(jìn)行死鎖恢復(fù)杂拨。

我們看下Lock的釋放鎖源碼专普,以ReentrantLock為例。如果當(dāng)前線程不是持有鎖的線程弹沽,則拋出IllegalMonitorStateException異常檀夹,也就是說,Lock在unlock前策橘,必須先lock炸渡,持有鎖。

protected final boolean tryRelease(int releases) {
    //計(jì)數(shù)減1
    int c = getState() - releases;
    //當(dāng)前線程是否是持有鎖的線程丽已,不是則拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //沒有線程持有鎖
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

而Semaphore則沒有這個(gè)判斷蚌堵,會(huì)直接將設(shè)置state的值,增加通行證的數(shù)量沛婴。分別舉兩個(gè)例子看下吼畏。

public static void main(String[] args) {

    Lock lock = new ReentrantLock();

    //Semaphore semaphore = new Semaphore(1);

    new Thread(()->{
        lock.unlock();
    }).start();
}

結(jié)果拋出異常:

Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at com.cloud.bssp.thread.SemaphoreAndLock.lambda$main$0(SemaphoreAndLock.java:26)
at java.lang.Thread.run(Thread.java:748)

下面來看下semaphore的例子

public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(1);
    System.out.println("當(dāng)前通行證數(shù)量:" + semaphore.availablePermits());
    new Thread(()->{
        semaphore.release();
    }).start();
    Thread.sleep(1000);
    System.out.println("當(dāng)前通行證數(shù)量:" + semaphore.availablePermits());
}

結(jié)果:發(fā)現(xiàn)在release之后,數(shù)量增加的一個(gè)嘁灯。我們可以利用這個(gè)特性去做死鎖恢復(fù)泻蚊。

簡單模仿下死鎖恢復(fù)的例子,兩個(gè)線程一個(gè)先占用semaphore1丑婿,一個(gè)先占用semaphore2性雄,分別sleep5秒,這時(shí)候沒有釋放羹奉,在去占用另外一個(gè)毅贮,發(fā)現(xiàn)產(chǎn)生了死鎖,線程卡在這里不動(dòng)了尘奏。main方法主線程會(huì)在10秒后去判斷是否釋放鎖,沒有的話由主線程去釋放病蛉,這時(shí)候發(fā)現(xiàn)兩個(gè)線程分別獲取到了鎖炫加。

/**
 * 死鎖恢復(fù)
 */
public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore1 = new Semaphore(1);
    Semaphore semaphore2 = new Semaphore(1);
    new Thread(() -> {
        try {
            semaphore1.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + "獲取semaphore1");
            Thread.sleep(5000);
            semaphore2.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + "獲取semaphore2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        try {
            semaphore2.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + "獲取semaphore2");
            Thread.sleep(5000);
            semaphore1.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + "獲取semaphore1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(10000);
    //主線程等待十秒瑰煎,判斷兩個(gè)線程是否執(zhí)行完畢,是否釋放鎖
    if (semaphore1.availablePermits() != 1) {
        System.out.println("發(fā)生死鎖了俗孝,釋放semaphore1");
        semaphore1.release();
    }
    if (semaphore2.availablePermits() != 1) {
        System.out.println("發(fā)生死鎖了酒甸,釋放semaphore2");
        semaphore2.release();
    }
}

結(jié)果:

線程Thread-0獲取semaphore1
線程Thread-1獲取semaphore2
發(fā)生死鎖了,釋放semaphore1
發(fā)生死鎖了赋铝,釋放semaphore2
線程Thread-1獲取semaphore1
線程Thread-0獲取semaphore2

八插勤、ThreadLocal線程本地變量(高頻面試)

顧名思義,ThreadLocal可以理解為線程本地變量革骨,當(dāng)創(chuàng)建了ThreadLocal變量农尖,那么線程對于ThreadLocal的讀取就是相互隔離的,不會(huì)產(chǎn)生影響良哲。

8.1 使用實(shí)例

先拋個(gè)實(shí)際使用的例子扔在這盛卡,10個(gè)線程分別對ThreadLocal進(jìn)行加1,最終結(jié)果都是101筑凫,每個(gè)線程修改了各自的本地變量滑沧。如果是int類型的,結(jié)果應(yīng)該為110巍实,體現(xiàn)了線程本地變量的特性滓技。

/**
 * 庫存
 */
static class Inventory {

    private ThreadLocal<Integer> num = ThreadLocal.withInitial(() -> 100);

    //增加庫存
    public synchronized void add(int n, String threadName) {
        //增加庫存
        num.set(num.get() + n);
        System.out.println("線程:" + threadName + ",增加庫存后的數(shù)量=" + num.get());
    }
}

public static void main(String[] args) {
    Inventory inventory = new Inventory();
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            inventory.add(1, Thread.currentThread().getName());
        }).start();
    }
}

結(jié)果:

線程:Thread-0,增加庫存后的數(shù)量=101
線程:Thread-2,增加庫存后的數(shù)量=101
線程:Thread-1,增加庫存后的數(shù)量=101
線程:Thread-3,增加庫存后的數(shù)量=101
線程:Thread-7,增加庫存后的數(shù)量=101
線程:Thread-9,增加庫存后的數(shù)量=101
線程:Thread-8,增加庫存后的數(shù)量=101
線程:Thread-6,增加庫存后的數(shù)量=101
線程:Thread-4,增加庫存后的數(shù)量=101
線程:Thread-5,增加庫存后的數(shù)量=101

8.2 源碼解讀

我很難寫出比這篇文章更好的了,所以直接上連接了棚潦,不在寫了令漂,這篇文章絕對是當(dāng)前百度能找到最詳細(xì)的了。
https://www.cnblogs.com/micrari/p/6790229.html

九瓦盛、Phaser 線程階段器(本文只介紹簡單使用)

在jdk1.7中被引入洗显,能夠完成多階段的任務(wù),并且每個(gè)階段可以多線程并發(fā)執(zhí)行原环,但是需要當(dāng)前階段全部完成才能進(jìn)入下一階段挠唆,相比于CyclicBarrier或者CountryDownLatch,功能更加強(qiáng)大和靈活嘱吗。

用法

/**
 * 線程數(shù)玄组,即學(xué)生數(shù)量
 */
private static int PARTIES = 5;

static Phaser p = new Phaser() {

    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        switch (phase) {
            case 0:
                System.out.println("第一題完成");
                return false;
            case 1:
                System.out.println("第二題完成");
                return false;
            case 2:
                System.out.println("第三題完成");
                return false;
            default:
                return true;
        }
    }
};

private static void firstQuestion() {
    System.out.println("線程:" + Thread.currentThread().getName() + "谒麦,第一題");
    p.arriveAndAwaitAdvance();
}

private static void secondQuestion() {
    System.out.println("線程:" + Thread.currentThread().getName() + "绕德,第二題");
    p.arriveAndAwaitAdvance();
}

private static void thirdQuestion() {
    System.out.println("線程:" + Thread.currentThread().getName() + "患膛,第三題");
    p.arriveAndAwaitAdvance();
}

public static void main(String[] args) {
    for (int i = 0; i < PARTIES; i++) {
        new Thread(() -> {
            //線程注冊
            p.register();
            firstQuestion();
            secondQuestion();
            thirdQuestion();
        }).start();
    }
}

結(jié)果:五個(gè)線程分階段完成了每個(gè)題目

線程:Thread-1,第一題
線程:Thread-3踪蹬,第一題
線程:Thread-2,第一題
線程:Thread-0跃捣,第一題
線程:Thread-4漱牵,第一題
第一題完成
線程:Thread-4疚漆,第二題
線程:Thread-3,第二題
線程:Thread-2闻镶,第二題
線程:Thread-1瞬雹,第二題
線程:Thread-0,第二題
第二題完成
線程:Thread-0,第三題
線程:Thread-4挡育,第三題
線程:Thread-1,第三題
線程:Thread-3蒲障,第三題
線程:Thread-2瘫证,第三題
第三題完成

十背捌、Exchanger 線程數(shù)據(jù)交換器 (本文只介紹簡單使用)

Exchanger 是 JDK 1.5 開始提供的一個(gè)用于兩個(gè)工作線程之間交換數(shù)據(jù)的封裝工具類毡庆,當(dāng)?shù)谝粋€(gè)線程調(diào)用了exchange()方法后坑赡,當(dāng)前線程會(huì)進(jìn)入阻塞狀態(tài)毅否,直到第二個(gè)線程也執(zhí)行了exchange()方法螟加,交換數(shù)據(jù),繼續(xù)執(zhí)行甸昏。

使用實(shí)例

/**
 * 初始化string類型的Exchanger
 */
static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) throws InterruptedException {
    
    new Thread(() -> {
        String flag1 = "111";
        System.out.println(Thread.currentThread().getName() + "交換前flag1=" + flag1);
        try {
            //交換數(shù)據(jù),并進(jìn)入阻塞
            flag1 = exchanger.exchange(flag1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "交換后flag1=" + flag1);
    }).start();

    Thread.sleep(1000);
    new Thread(() -> {
        String flag2 = "222";
        System.out.println(Thread.currentThread().getName()+ "交換后flag2=" + flag2);
        try {
            //交換數(shù)據(jù)雌隅,喚醒上一個(gè)線程
            flag2 = exchanger.exchange(flag2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+ "交換前flag2=" + flag2);
    }).start();
}

結(jié)果:在交換過后恰起,flag1和flag2的值發(fā)生了互換检盼。

Thread-0交換前flag1=111
Thread-1交換后flag2=222
Thread-1交換前flag2=111
Thread-0交換后flag1=222
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吨枉,一起剝皮案震驚了整個(gè)濱河市貌亭,隨后出現(xiàn)的幾起案子认臊,更是在濱河造成了極大的恐慌失晴,老刑警劉巖涂屁,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胯陋,死亡現(xiàn)場離奇詭異,居然都是意外死亡义矛,警方通過查閱死者的電腦和手機(jī)凉翻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門制轰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來垃杖,“玉大人,你說我怎么就攤上這事伶棒》粑蓿” “怎么了宛渐?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵窥翩,是天一觀的道長鳍烁。 經(jīng)常有香客問我繁扎,道長梳玫,這世上最難降的妖魔是什么提澎? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任盼忌,我火速辦了婚禮谦纱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘川慌。我一直安慰自己梦重,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著力崇,像睡著了一般赢织。 火紅的嫁衣襯著肌膚如雪于置。 梳的紋絲不亂的頭發(fā)上八毯,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天话速,我揣著相機(jī)與錄音泊交,去河邊找鬼柱查。 笑死唉工,一個(gè)胖子當(dāng)著我的面吹牛淋硝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播竿报,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了侨嘀?” 一聲冷哼從身側(cè)響起咬腕,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對情侶失蹤涨共,失蹤者是張志新(化名)和其女友劉穎举反,沒想到半個(gè)月后火鼻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雕崩,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡粗蔚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年鹏控,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了牧挣。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片醒陆。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡寺晌,死狀恐怖澡刹,靈堂內(nèi)的尸體忽然破棺而出罢浇,到底是詐尸還是另有隱情,我是刑警寧澤赖临,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布兢榨,位于F島的核電站吵聪,受9級(jí)特大地震影響兼雄,放射性物質(zhì)發(fā)生泄漏赦肋。R本人自食惡果不足惜金砍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一恕稠、第九天 我趴在偏房一處隱蔽的房頂上張望鹅巍。 院中可真熱鬧骆捧,春花似錦髓绽、人聲如沸顺呕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至藤滥,卻和暖如春颗味,著一層夾襖步出監(jiān)牢的瞬間浦马,已是汗流浹背晶默。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國打工磺陡, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留币他,地道東北人蝴悉。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓拍冠,卻偏偏與公主長得像庆杜,于是被迫代替她去往敵國和親晃财。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354