第6章 Java并發(fā)包中鎖原理剖析

目錄

LockSupport工具類

LockSupport是創(chuàng)建鎖和其他同步類的基礎(chǔ)。

LockSupport類與每個(gè)使用它的線程都會(huì)關(guān)聯(lián)一個(gè)許可證波闹,默認(rèn)情況下調(diào)用LockSupport類的方法的線程是不持有許可證的悯许。

下面介紹LockSupport類中的幾個(gè)主要函數(shù)术羔。

1. void park()

如果park方法拿到了與LockSupport關(guān)聯(lián)的許可證,則調(diào)用LockSupport.park()時(shí)會(huì)馬上返回滴铅,否則調(diào)用線程會(huì)被禁止參與線程的調(diào)度,也就是會(huì)被阻塞掛起。

如下代碼直接在main函數(shù)里面調(diào)用park方法式散,最終只會(huì)輸出 "begin park!"打颤,然后當(dāng)前線程被掛起暴拄,這時(shí)因?yàn)樵谀J(rèn)情況下調(diào)用線程是不持有許可證的。

public static void main(String[] args) {
    System.out.println("begin park!");
    LockSupport.park();
    System.out.println("end park!");
}

在其他線程調(diào)用unpark(Thread thread)方法并且將當(dāng)前線程作為參數(shù)時(shí)编饺,調(diào)用park方法而被阻塞的線程會(huì)返回乖篷。另外,如果其他線程調(diào)用了阻塞線程的interrupt()方法透且,設(shè)置了中斷標(biāo)志或者線程被虛假喚醒撕蔼,則線程也會(huì)返回豁鲤。所以在調(diào)用park方法時(shí)最好也使用循環(huán)條件判斷方式。

注意:
因調(diào)用park方法而被阻塞的線程被其他線程中斷而返回時(shí)并不會(huì)拋出InterruptedException異常鲸沮。

2. void unpark(Thread thread)

當(dāng)一個(gè)線程調(diào)用unpark時(shí)琳骡,如果參數(shù)thread線程沒(méi)有持有thread與LockSupport類相關(guān)聯(lián)的許可證,則讓thread線程持有讼溺。如果thread因調(diào)用park()而被掛起楣号,則unpark方法會(huì)使其被喚醒。如果thread之前沒(méi)有調(diào)用park怒坯,則調(diào)用unpark方法后再調(diào)用park方法會(huì)立即返回炫狱,代碼如下。

public static void main(String[] args) {
    System.out.println("begin park!");
    LockSupport.unpark(Thread.currentThread());
    LockSupport.park();
    System.out.println("end park!");
}

輸出如下:

begin park!
end park!

下面再來(lái)看一個(gè)例子來(lái)加深對(duì)park和unpark的理解剔猿。

public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("child thread begin park!");
                // 掛起自己
                LockSupport.park();
                System.out.println("child thread unpark!");
            }
        });

        thread.start();

        // 確保調(diào)用unpark前子線程已經(jīng)將自己掛起
        Thread.sleep(1000);

        System.out.println("main thread begin unpark!");

        LockSupport.unpark(thread);
    }

子線程將自己掛起视译,主線程中調(diào)用了unpark方法使得子線程得以繼續(xù)運(yùn)行。

3. void parkNanos(long nanos)

和park方法類似艳馒,如果調(diào)用park方法的線程已經(jīng)拿到了與LockkSupport關(guān)聯(lián)的許可證憎亚,則調(diào)用LockSupport.parkNanos(long nanos)方法會(huì)立即返回。不同之處在于弄慰,如果沒(méi)有拿到許可證第美,則調(diào)用線程會(huì)被掛起nanos時(shí)間后自動(dòng)返回。

抽象同步隊(duì)列AQS概述

AQS——鎖的底層支持

AbstractQueuedSynchronizer抽象同步隊(duì)列簡(jiǎn)稱AQS陆爽,是實(shí)現(xiàn)同步器的基礎(chǔ)組件什往。

以下為AQS的類結(jié)構(gòu)圖:

AQS是一個(gè)FIFO的雙向隊(duì)列,內(nèi)部通過(guò)head和tail兩個(gè)節(jié)點(diǎn)來(lái)對(duì)隊(duì)列進(jìn)行維護(hù)慌闭。

Node是AQS的一個(gè)靜態(tài)內(nèi)部類别威,屬性SHARED和EXCLUSIVE分別代表用來(lái)標(biāo)識(shí)線程是獲取共享資源和獨(dú)占資源時(shí)被阻塞掛起放入AQS隊(duì)列的。thread為Node持有的Thread驴剔;waitStatus用于記錄當(dāng)前線程的狀態(tài)省古,CANCELLED表示線程被取消,SIGNAL表示線程需要喚醒丧失,CONDITION表示線程在條件隊(duì)列里面等待豺妓,PROPAGATE表示釋放共享資源時(shí)需要通知其他節(jié)點(diǎn)。

AQS維護(hù)了一個(gè)單一的狀態(tài)信息state布讹,可以通過(guò)getState琳拭、setState、compareAndSetState函數(shù)修改其值描验。

AQS內(nèi)部類ConditionObject用來(lái)結(jié)合鎖實(shí)現(xiàn)線程同步白嘁。

AQS實(shí)現(xiàn)線程同步的關(guān)鍵是對(duì)state進(jìn)行操作,根據(jù)state是否屬于一個(gè)線程膘流,操作state的方式可分為獨(dú)占方式和共享方式絮缅。

獨(dú)占方式下獲取和釋放資源的方法為:

void acquire(int arg)
void acauireInterruptibly(int arg)
boolean release(int arg)

共享方式下獲取和釋放資源的方法為:

void acauireShared(int arg)
void acauireSharedInterruptibly(int arg)
boolean releaseShared(int arg)

獨(dú)占方式下鲁沥,獲取和釋放資源的流程如下:

當(dāng)一個(gè)線程調(diào)用acquire(int arg)獲取獨(dú)占資源時(shí),會(huì)首先使用tryAcquire方法進(jìn)行嘗試盟蚣,具體就是設(shè)置state的值黍析,成功則世界返回,失敗則將當(dāng)前線程封裝為類型為Node.EXCLUSIVE的Node節(jié)點(diǎn)后插入到AQS阻塞隊(duì)列的尾部屎开,并調(diào)用LockSupport.park(this)掛起自己。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

但一個(gè)線程調(diào)用release(int arg)會(huì)嘗試使用tryRelease操作釋放資源马靠,這里也是改變state的值奄抽,然后調(diào)用LockSupport.unpark(thread)方法激活A(yù)QS隊(duì)列里面被阻塞的一個(gè)線程(thread)。被阻塞的線程使用tryAcquire嘗試甩鳄,看當(dāng)前state的值是否滿足自己的需要逞度,滿足則該線程被激活,繼續(xù)向下運(yùn)行妙啃,否則還是會(huì)被放入AQS隊(duì)列并被掛起档泽。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

注意;
AQS類并沒(méi)有提供tryAcquire和tryRelease方法的實(shí)現(xiàn)揖赴,因?yàn)锳QS是一個(gè)基礎(chǔ)框架馆匿,這兩個(gè)方法需要由子類自己實(shí)現(xiàn)來(lái)實(shí)現(xiàn)自己的特性。

共享方式下燥滑,獲取和釋放資源的流程如下渐北;

當(dāng)線程調(diào)用acquireShared(int arg)獲取共享資源時(shí),首先使用tryAcquireShared嘗試獲取資源并修改state铭拧,成功則直接放回赃蛛,否則將當(dāng)前線程封裝為Node.SHARED類型的節(jié)點(diǎn)插入到AQS阻塞隊(duì)列的尾部,并使用LockSupport.park(this)方法掛起自己搀菩。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

當(dāng)一個(gè)線程調(diào)用releaseShared(int arg)時(shí)會(huì)嘗試使用tryReleasedShared操作釋放資源并修改state呕臂,然后使用LockSupport.unpark(thread)激活A(yù)QS隊(duì)列中的一個(gè)線程(thread)。被激活的線程會(huì)調(diào)用tryReleaseShared查看當(dāng)前state是否滿足自己需求肪跋,滿足則該線程被激活歧蒋,否則繼續(xù)掛起。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

注意:
同上澎嚣,AQS沒(méi)有提供tryAcquiredShared和tryReleaseShared方法的實(shí)現(xiàn)疏尿,這兩個(gè)方法也需要由子類實(shí)現(xiàn)。

AQS——條件變量的支持

以下是使用條件變量的例子:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try{
    System.out.println("begin wait");
    condition.await();
    System.out.println("end wait");
} catch (InterruptedException e) {
    e.printStackTrace();
}finally {
    lock.unlock();
}

lock.lock();
try{
    System.out.println("begin signal");
    condition.signal();
    System.out.println("end signal");
}catch (Exception e){
    e.printStackTrace();
}finally {
    lock.unlock();
}

上述代碼中易桃,condition是由Lock對(duì)象調(diào)用newCondition方法創(chuàng)建的條件變量褥琐,一個(gè)Lock對(duì)象可以創(chuàng)建多個(gè)條件變量。

lock.lock()方法相當(dāng)于進(jìn)入synchronized同步代碼塊晤郑,用于獲取獨(dú)占鎖敌呈;await()方法相當(dāng)于Object.wait()方法贸宏,用于阻塞掛起當(dāng)前線程,當(dāng)其他線程調(diào)用了signal方法(相當(dāng)于Object.notify()方法)時(shí)磕洪,被阻塞的線程才會(huì)從await處返回吭练。

lock.newCondition()作用是new一個(gè)在AQS內(nèi)部類ConditionObject對(duì)象。每個(gè)條件變量?jī)?nèi)部都維護(hù)了一個(gè)條件隊(duì)列析显,用來(lái)存放調(diào)用該條件變量的await方法時(shí)被阻塞的線程鲫咽。

注意:
這個(gè)條件隊(duì)列和AQS隊(duì)列不是一回事。

以下是await的源碼:

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 創(chuàng)建新的node節(jié)點(diǎn)谷异,并插入到條件隊(duì)列末尾    
        Node node = addConditionWaiter();
        // 釋放當(dāng)前線程的鎖
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 調(diào)用park方法阻塞掛起當(dāng)前線程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        ...
    }

首先會(huì)構(gòu)造一個(gè)類型為Node.CONDITION的node節(jié)點(diǎn)分尸,然后將該節(jié)點(diǎn)處插入條件隊(duì)列末尾,之后當(dāng)前線程會(huì)釋放獲取的鎖歹嘹,并被阻塞掛起箩绍。這時(shí)如果有其他線程調(diào)用lock.lock()方法嘗試獲取鎖,就會(huì)有一個(gè)線程獲取到鎖尺上。

再來(lái)看signal源碼:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 將條件隊(duì)列頭元素移動(dòng)到AQS隊(duì)列等待執(zhí)行
        doSignal(first);
}

調(diào)用signal時(shí)材蛛,會(huì)把條件隊(duì)列隊(duì)首元素放入AQS中并激活隊(duì)首元素對(duì)應(yīng)的線程。

基于AQS實(shí)現(xiàn)自定義同步器

下面基于AQS實(shí)現(xiàn)一個(gè)不可重入的獨(dú)占鎖怎抛。自定義AQS重寫一系列函數(shù)卑吭,還需要定義原子變量state的含義。這里定義state為0表示目前鎖沒(méi)有被線程持有抽诉,state為1表示鎖已經(jīng)被某一個(gè)線程持有陨簇。

public class NonReentrantLock implements Lock, Serializable {

    // 內(nèi)部幫助類
    private static class Sync extends AbstractQueuedSynchronizer {

        // 鎖是否被持有
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 嘗試獲取鎖
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 嘗試釋放鎖
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 提供條件變量接口
        Condition newCondition() {
            return new ConditionObject();
        }

    }

    // 創(chuàng)建一個(gè)Sync來(lái)做具體工作
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }


    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }


    @Override
    public void unlock() {
        sync.tryRelease(1);
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }


    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

NonReentrantLock定義了一個(gè)內(nèi)部類Sync用來(lái)實(shí)現(xiàn)具體的鎖的操作,Sync繼承了AQS。由于是獨(dú)占鎖,:Sync只重寫了tryAcquire返劲、tryRelease、isHeldExclusively耙饰。此外,Sync提供了newCondition方法來(lái)支持條件變量纹份。

下面使用自定義的鎖來(lái)實(shí)現(xiàn)簡(jiǎn)單的生產(chǎn)-消費(fèi)模型

final static NonReentrantLock lock = new NonReentrantLock();
final static Condition notFull = lock.newCondition();
final static Condition notEmpty = lock.newCondition();

final static Queue<String> queue = new LinkedBlockingQueue<>();
final static int queueSize = 10;

public static void main(String[] args) {
    Thread producer = new Thread(new Runnable() {
        @Override
        public void run() {
            // 獲取獨(dú)占鎖
            lock.lock();
            try {
                while (true) {
                    // 隊(duì)列滿了則等待
                    while (queue.size() >= queueSize) {
                        notEmpty.await();
                    }
                    queue.add("ele");
                    System.out.println("add...");
                    notFull.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 釋放鎖
                lock.unlock();
            }
        }
    });

    Thread consumer = new Thread(new Runnable() {
        @Override
        public void run() {
            // 獲取獨(dú)占鎖
            lock.lock();
            try {
                while (true) {
                    // 隊(duì)列空則等待
                    while (queue.size() == 0) {
                        notFull.await();
                    }
                    String ele = queue.poll();
                    System.out.println("poll...");
                    notEmpty.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 釋放鎖
                lock.unlock();
            }
        }
    });

    producer.start();
    consumer.start();
}

代碼使用了NonReentrantLock來(lái)創(chuàng)建lock苟跪,并調(diào)用lock.newCondition創(chuàng)建了兩個(gè)條件變量用來(lái)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者線程的同步。

ReentrantLock的原理

類圖結(jié)構(gòu)

構(gòu)造函數(shù)如下:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

可以看到蔓涧,ReentrantLock最終還是依賴AQS件已,并且根據(jù)傳入的參數(shù)來(lái)決定其內(nèi)部是一個(gè)公平鎖還是非公平鎖(默認(rèn)為公平鎖)。

Sync類直接繼承自AQS元暴,它的子類NonfairSync和FairSync分別實(shí)現(xiàn)了獲取鎖的非公平與公平策略篷扩。

AQS的state表示線程獲取鎖的可重入次數(shù)。state為0表示當(dāng)前鎖沒(méi)有被任何線程持有茉盏。當(dāng)一個(gè)線程第一次獲取該所是會(huì)嘗試使用CAS設(shè)置state為1鉴未,成功后記錄該鎖的持有者為當(dāng)前線程枢冤。以后每一次加鎖state就增加1,表示可重入次數(shù)铜秆。當(dāng)該線程釋放該鎖時(shí)淹真,state減1,如果減1后state為0连茧,則當(dāng)前線程釋放該鎖核蘸。

獲取鎖

void lock()

當(dāng)一個(gè)線程調(diào)用該方法時(shí),如果鎖當(dāng)前沒(méi)有被其他線程占有并且當(dāng)前線程之前沒(méi)有獲取過(guò)該鎖梅屉,則當(dāng)前線程會(huì)獲取到該鎖值纱,然后設(shè)置當(dāng)前鎖的擁有者為當(dāng)前線程,并且將state置為1坯汤;如果當(dāng)前線程已經(jīng)獲取過(guò)該鎖,則將state的值增加1搀愧;如果該鎖已經(jīng)被其他線程持有惰聂,則調(diào)用該方法的線程會(huì)被放入AQS隊(duì)列中阻塞掛起等待,

public void lock() {
    sync.lock();
}

ReentrantLock的lock()委托給了sync咱筛,根據(jù)創(chuàng)建ReentrantLock構(gòu)造函數(shù)選擇sync的實(shí)現(xiàn)時(shí)NonfairSync還是FairSync搓幌,這個(gè)鎖是一個(gè)公平鎖或者非公平鎖。

先來(lái)看非公平鎖的情況:

final void lock() {
    // CAS設(shè)置state為1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 調(diào)用AQS的acquire方法
        acquire(1);
}

默認(rèn)state為0迅箩,所以第一個(gè)調(diào)用Lock的吸納成會(huì)通過(guò)CAS設(shè)置狀態(tài)值為1溉愁,CAS成功則表示當(dāng)前線程獲取到了鎖,然后設(shè)置該鎖持有者為當(dāng)前線程饲趋。

如果此時(shí)有其他線程企圖過(guò)去該鎖拐揭,CAS會(huì)失敗,然后會(huì)調(diào)用AQS的acquire方法奕塑。

再貼下acquire的源碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

之前說(shuō)過(guò)堂污,AQS并沒(méi)有提供可用的tryAcquire方法,tryAcquire方法需要子類自己定制龄砰。這里會(huì)調(diào)用ReentrantLock重寫的tryAcquire方法盟猖。下面先看非公平鎖的代碼。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // (1)鎖未被持有
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            // 設(shè)置鎖的持有者為當(dāng)前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // (2)鎖已經(jīng)被某個(gè)線程持有换棚,如果該線程為當(dāng)前線程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
            // 增加重入數(shù)
        setState(nextc);
        return true;
    }
    return false;
}

源碼比較簡(jiǎn)單式镐,分析見(jiàn)注釋。

下面來(lái)看非公平性體現(xiàn)在哪兒固蚤。首先非公平指的是先嘗試獲取鎖的線程并不一定首先獲取該鎖娘汞。

假設(shè)線程A執(zhí)行到代碼(1)發(fā)現(xiàn)線程已經(jīng)被持有然后執(zhí)行到(2)發(fā)現(xiàn)當(dāng)前線程不是鎖持有者,則返回false被放入AQS中進(jìn)行等待颇蜡。假設(shè)這時(shí)線程B也執(zhí)行到了代碼(1)价说,發(fā)現(xiàn)state為0(假設(shè)占有該鎖的其他線程釋放了該鎖)辆亏, 就成功獲取了鎖,而比B先請(qǐng)求鎖的線程A還在等待鳖目,這就是非公平性的體現(xiàn)扮叨。

下面看FairSync重寫的tryAcquire方法。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 公平性策略
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

由代碼可知领迈,公平的tryAcquire方法與非公平的區(qū)別在于增加了一個(gè)hasQueuedPredecessors方法來(lái)判斷是否有線程在當(dāng)前線程前嘗試獲取鎖彻磁。

下面是hasQueuedPredecessors的具體實(shí)現(xiàn)。

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

如果當(dāng)前線程節(jié)點(diǎn)有前驅(qū)節(jié)點(diǎn)則返回true狸捅,否則如果當(dāng)前AQS隊(duì)列為空或者當(dāng)前線程節(jié)點(diǎn)是AQS的第一個(gè)節(jié)點(diǎn)則返回false衷蜓。其中h==t說(shuō)明當(dāng)前隊(duì)列為空,直接返回false尘喝;如果h!=t并且s==null說(shuō)明有一個(gè)元素將要作為AQS的第一個(gè)節(jié)點(diǎn)入隊(duì)(AQS入隊(duì)包含兩步操作:首先創(chuàng)建一個(gè)哨兵頭節(jié)點(diǎn)磁浇,然后將第一個(gè)元素插入哨兵節(jié)點(diǎn)后面),那么返回true朽褪;如果h!=t并且s!=null且s.thread!=Thread.currentThread()說(shuō)明隊(duì)列里面的第一個(gè)元素不是當(dāng)前線程置吓,那么返回true。

void lockInterruptibly()

與lock()方法類似缔赠,不同之處在于衍锚,它對(duì)中斷進(jìn)行相應(yīng),就是當(dāng)前線程在調(diào)用該方法時(shí)嗤堰,如果其他線程調(diào)用了當(dāng)前線程的interrupt方法戴质,則當(dāng)前線程會(huì)拋出inetrruptedException
異常,然后返回踢匣。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 如果當(dāng)前線程被中斷告匠,則直接拋出異常并返回
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

boolean tryLock()

嘗試獲取鎖,如果當(dāng)前該鎖沒(méi)有被其他線程持有符糊,則當(dāng)前線程獲取該鎖并返回true凫海,否則返回false。該方法不會(huì)引起當(dāng)前線程阻塞男娄。

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 非公平策略
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

上述代碼與非公平鎖的tryAcquire方法類似行贪,所以tryLock使用的是非公平策略。

boolean tryLock(long timeout, TimeUnit unit)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

設(shè)置了超時(shí)時(shí)間模闲,如果超時(shí)時(shí)間到了還沒(méi)有獲取到該鎖則返回false建瘫。

釋放鎖

void unlock()

嘗試釋放鎖,如果當(dāng)前線程持有該鎖尸折,則調(diào)用該方法會(huì)讓該線程對(duì)該線程持有的AQS狀態(tài)值減1啰脚,如果減去1后狀態(tài)值為0,則當(dāng)前線程會(huì)釋放該鎖。

public void unlock() {
    sync.release(1);
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果當(dāng)前線程不是該鎖持有者直接拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若state變?yōu)?橄浓,則清空鎖持有線程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 設(shè)置可重入次數(shù)減1
    setState(c);
    return free;
}

讀寫鎖ReentrantReadWriteLock原理

解決線程安全問(wèn)題使用ReentrantLock就可以粒梦,但是ReentrantLock是獨(dú)占鎖,同一時(shí)間只能有一個(gè)線程獲取該鎖荸实,而實(shí)際中會(huì)出現(xiàn)讀多寫少的情況匀们,顯然使用ReentrantLock滿足不了這個(gè)需求,這時(shí)就需要用到ReentrantReadWriteLock准给。ReentrantReadWriteLock采用讀寫分離的策略泄朴,允許多個(gè)線程同時(shí)獲取寫鎖。

類圖結(jié)構(gòu)

讀寫鎖內(nèi)部維護(hù)了一個(gè)ReadLock和一個(gè)WriteLock露氮,它們依賴Sync實(shí)現(xiàn)具體功能祖灰。而Sync繼承自AQS,并且也提供了公平與非公平的實(shí)現(xiàn)畔规。下面只介紹非公平讀寫鎖的實(shí)現(xiàn)局扶。

我們知道AQS中只維護(hù)了一個(gè)state狀態(tài),ReentrantReadWriteLock巧妙地使用state的高16位表示讀狀態(tài)叁扫,也就是獲取到讀鎖的次數(shù)详民;使用低16位表示獲取到寫鎖的線程的可重入次數(shù)。

static final int SHARED_SHIFT   = 16;
// 讀鎖狀態(tài)值65536
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 寫鎖狀態(tài)值65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 寫鎖掩碼陌兑,二進(jìn)制,16個(gè)1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 讀鎖線程數(shù)
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
// 寫鎖可重入數(shù)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

讀寫鎖中的firstReader用來(lái)記錄第一個(gè)獲取到讀鎖的線程由捎,firstReaderHoldCount記錄第一個(gè)和獲取到讀鎖的線程獲取讀鎖的可重入次數(shù)兔综。HoldCounter類型的cachedHoldCounter用來(lái)記錄最后一個(gè)獲取讀鎖的線程獲取讀鎖的可重入次數(shù)。

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

readHolds是ThreadLocal變量狞玛,用來(lái)存放除去第一個(gè)獲取讀鎖線程外的其他線程獲取讀鎖的可重入次數(shù)软驰。ThreadLocalHoldCounter繼承了ThreadLocal,因此initialValue方法返回一個(gè)HoldCounter對(duì)象心肪。

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

寫鎖的獲取與釋放

void lock()

寫鎖與寫鎖锭亏、寫鎖與讀鎖是互斥的,如果當(dāng)前已經(jīng)有線程獲取了讀鎖或?qū)戞i硬鞍,則請(qǐng)求獲取寫鎖的線程會(huì)被阻塞掛起慧瘤。寫鎖是可重入鎖,如果當(dāng)前線程已經(jīng)獲取了該鎖固该,再次獲取只是簡(jiǎn)單地把可重入次數(shù)加1后返回锅减。

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

lock()內(nèi)部調(diào)用了acquire方法,其中tryAcquire是ReentrantReadWriteLock內(nèi)部的Sync類重寫的伐坏。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState(); // 總狀態(tài)
    int w = exclusiveCount(c); // 讀鎖狀態(tài)
    // (1)c!=0說(shuō)明讀鎖或?qū)戞i已經(jīng)被獲取
    if (c != 0) {
        //(2)w==0說(shuō)明已經(jīng)有線程獲取了讀鎖怔匣,w!=0并且當(dāng)前線程不是寫鎖擁有者,則返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // (3)當(dāng)前線程已經(jīng)獲取了寫鎖桦沉,判斷可重入次數(shù)
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // (4)設(shè)置可重入次數(shù)
        setState(c + acquires);
        return true;
    }
    // (5)c==0說(shuō)明鎖還沒(méi)有被獲取每瞒,此處第一次獲取
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

(1)如果當(dāng)前AQS狀態(tài)值不為0金闽,說(shuō)明當(dāng)前已經(jīng)有線程獲取到了讀鎖或?qū)戞i。如果w==0說(shuō)明state的低16位為0剿骨,而state不為0代芜,那么高16位必不為0,說(shuō)明有線程獲取了讀鎖懦砂,所以直接返回false(讀寫互斥蜒犯,保障數(shù)據(jù)一致性)。

(2)如果w!=0說(shuō)明當(dāng)前已經(jīng)有線程獲取了該寫鎖荞膘,再看當(dāng)前線程是不是該鎖的持有者罚随,不是則返回false。

執(zhí)行到(3)說(shuō)明當(dāng)前線程已經(jīng)獲取到了該鎖羽资,所以判斷該線程的可重入次數(shù)是否超過(guò)了最大值淘菩,是則拋出異常,否則執(zhí)行(4)增加可重入次數(shù)屠升。

如果state為0說(shuō)明目前沒(méi)有線程獲取到讀鎖和寫鎖潮改,所以執(zhí)行(5)。對(duì)于writerShouldBlock()腹暖,非公平鎖的實(shí)現(xiàn)為

final boolean writerShouldBlock() {
    return false;
}

說(shuō)明(5)搶占式地執(zhí)行CAS嘗試獲取寫鎖汇在。

公平鎖的實(shí)現(xiàn)為

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

還是使用hasQueuedPredecessors來(lái)判斷當(dāng)前線程節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)。

void lockInterruptibly()

會(huì)對(duì)中斷進(jìn)行相應(yīng)

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

boolean tryLock()

非阻塞方法脏答,嘗試獲取寫鎖糕殉,如果當(dāng)前沒(méi)有其他線程持有讀鎖或?qū)戞i,則當(dāng)前線程獲取寫鎖并返回true殖告,否則返回false阿蝶。如果當(dāng)前線程已經(jīng)持有了該寫鎖則增加state的值并返回true。

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    // 非公平策略
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

此處代碼于tryAcquire方法類似黄绩,只是使用了非公平策略羡洁。

void unlock()

使state減1,如果減1后state為0爽丹,則當(dāng)前線程會(huì)釋放鎖筑煮。

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 激活A(yù)QS隊(duì)列里面的一個(gè)線程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // 檢查是否使鎖持有者調(diào)用的unlock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取可重入值,沒(méi)有考慮高16位习劫,因?yàn)楂@取寫鎖時(shí)讀鎖狀態(tài)值 肯定為咆瘟。
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

讀鎖的獲取與釋放

讀鎖通過(guò)ReadLock來(lái)實(shí)現(xiàn)。

void lock()

獲取的鎖诽里,如果寫鎖沒(méi)有被其他線程持有袒餐,則可以獲取讀鎖,并將state的高16位加1;否則阻塞灸眼。

public void lock() {
    sync.acquireShared(1);
}

// 來(lái)自AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

lock方法調(diào)用了AQS的acquireShared方法卧檐,其內(nèi)部又調(diào)用了Sync重寫的tryAcquireShared方法。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果有其他線程獲取了寫鎖焰宣,返回-1
    // 如果寫鎖被當(dāng)前線程持有霉囚,那么也可以獲取讀鎖,因?yàn)橥粋€(gè)線程同時(shí)最多只能執(zhí)行讀或?qū)懼械囊粋€(gè)操作
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    // 公平策略
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 讀鎖被第一次獲取
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        // 讀鎖被獲取過(guò)匕积,且當(dāng)前線程就是第一次獲取讀鎖的線程    
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 記錄最后一個(gè)獲取讀鎖的線程或記錄其他線程讀鎖的可重入次數(shù)
            HoldCounter rh = cachedHoldCounter;
            // 如果rh為空或者rh不是當(dāng)前線程盈罐,需要通過(guò)get方法創(chuàng)建一個(gè)新的HoldCounter用來(lái)記錄當(dāng)前線程的可重入次數(shù)
            // 并將其設(shè)為cachedHoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            // 運(yùn)行到此處說(shuō)明當(dāng)前線程已經(jīng)被設(shè)為最后一個(gè)獲取讀鎖的線程,rh.count==0說(shuō)明當(dāng)前線程已經(jīng)完全釋放了讀鎖闪唆,
            // 現(xiàn)在又要獲取讀鎖盅粪,需要更新自己對(duì)應(yīng)的HoldCounter
            else if (rh.count == 0)
                readHolds.set(rh);
            // 增加重入數(shù)
            rh.count++;
        }
        return 1;
    }
    // 嘗試一次失敗后自旋獲取
    return fullTryAcquireShared(current);
}

代碼中readerShouldBlock用于決定代碼公平與否。非公平鎖的實(shí)現(xiàn)如下悄蕾。

final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

僅當(dāng)AQS隊(duì)列存在元素且第一個(gè)元素在嘗試獲取寫鎖時(shí)才會(huì)阻塞當(dāng)前線程票顾,否則就算有線程在嘗試獲取讀鎖也不會(huì)讓步(非公平性的體現(xiàn))。

void unlock()

public void unlock() {
    sync.releaseShared(1);
}

具體操作委托給sync帆调。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    ...
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

將state減去一個(gè)單位奠骄,如果結(jié)果為0,則返回true番刊,調(diào)用doReleaseShared方法釋放一個(gè)由于獲取讀鎖而被阻塞的線程含鳞;如果不為0,說(shuō)明仍有線程持有讀鎖芹务,返回false民晒。

案例介紹

下面基于ReentrantLock實(shí)現(xiàn)線程安全的list,適用于讀多寫少的情況

public class ReentrantLockList {

    private ArrayList<String> array = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock;
    private final Lock writeLock;

    public ReentrantLockList() {
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void add(String e) {
        writeLock.lock();
        try{
            array.add(e);
        }finally {
            writeLock.unlock();
        }
    }

    public void remove(String e) {
        writeLock.lock();
        try{
            array.remove(e);
        }finally {
            writeLock.unlock();
        }
    }

    public String get(int index) {
        readLock.lock();
        try{
            return array.get(index);
        }finally {
            readLock.unlock();
        }
    }

    public int size() {
        return array.size();
    }
}

更多

相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锄禽,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子靴姿,更是在濱河造成了極大的恐慌沃但,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件佛吓,死亡現(xiàn)場(chǎng)離奇詭異宵晚,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)维雇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門淤刃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人吱型,你說(shuō)我怎么就攤上這事逸贾。” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵铝侵,是天一觀的道長(zhǎng)灼伤。 經(jīng)常有香客問(wèn)我,道長(zhǎng)咪鲜,這世上最難降的妖魔是什么狐赡? 我笑而不...
    開(kāi)封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮疟丙,結(jié)果婚禮上颖侄,老公的妹妹穿的比我還像新娘。我一直安慰自己享郊,他們只是感情好览祖,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著拂蝎,像睡著了一般穴墅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上温自,一...
    開(kāi)封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天玄货,我揣著相機(jī)與錄音,去河邊找鬼悼泌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛馆里,可吹牛的內(nèi)容都是我干的隘世。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼鸠踪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼丙者!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起营密,我...
    開(kāi)封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤械媒,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后评汰,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體纷捞,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年被去,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了主儡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惨缆,死狀恐怖糜值,靈堂內(nèi)的尸體忽然破棺而出丰捷,到底是詐尸還是另有隱情,我是刑警寧澤臀玄,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布瓢阴,位于F島的核電站,受9級(jí)特大地震影響健无,放射性物質(zhì)發(fā)生泄漏荣恐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一累贤、第九天 我趴在偏房一處隱蔽的房頂上張望叠穆。 院中可真熱鬧,春花似錦臼膏、人聲如沸硼被。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嚷硫。三九已至,卻和暖如春始鱼,著一層夾襖步出監(jiān)牢的瞬間仔掸,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工医清, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留起暮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓会烙,卻偏偏與公主長(zhǎng)得像负懦,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子柏腻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容