目錄
LockSupport工具類
LockSupport是創(chuàng)建鎖和其他同步類的基礎(chǔ)。
LockSupport類與每個(gè)使用它的線程都會(huì)關(guān)聯(lián)一個(gè)許可證波闹,默認(rèn)情況下調(diào)用LockSupport類的方法的線程是不持有許可證的悯许。
下面介紹LockSupport類中的幾個(gè)主要函數(shù)术羔。
1. void park()
如果park方法拿到了與LockSupport關(guān)聯(lián)的許可證,則調(diào)用LockSupport.park()時(shí)會(huì)馬上返回滴铅,否則調(diào)用線程會(huì)被禁止參與線程的調(diào)度,也就是會(huì)被阻塞掛起。
如下代碼直接在main函數(shù)里面調(diào)用park方法式散,最終只會(huì)輸出 "begin park!"打颤,然后當(dāng)前線程被掛起暴拄,這時(shí)因?yàn)樵谀J(rèn)情況下調(diào)用線程是不持有許可證的。
public static void main(String[] args) {
System.out.println("begin park!");
LockSupport.park();
System.out.println("end park!");
}
在其他線程調(diào)用unpark(Thread thread)方法并且將當(dāng)前線程作為參數(shù)時(shí)编饺,調(diào)用park方法而被阻塞的線程會(huì)返回乖篷。另外,如果其他線程調(diào)用了阻塞線程的interrupt()方法透且,設(shè)置了中斷標(biāo)志或者線程被虛假喚醒撕蔼,則線程也會(huì)返回豁鲤。所以在調(diào)用park方法時(shí)最好也使用循環(huán)條件判斷方式。
注意:
因調(diào)用park方法而被阻塞的線程被其他線程中斷而返回時(shí)并不會(huì)拋出InterruptedException異常鲸沮。
2. void unpark(Thread thread)
當(dāng)一個(gè)線程調(diào)用unpark時(shí)琳骡,如果參數(shù)thread線程沒(méi)有持有thread與LockSupport類相關(guān)聯(lián)的許可證,則讓thread線程持有讼溺。如果thread因調(diào)用park()而被掛起楣号,則unpark方法會(huì)使其被喚醒。如果thread之前沒(méi)有調(diào)用park怒坯,則調(diào)用unpark方法后再調(diào)用park方法會(huì)立即返回炫狱,代碼如下。
public static void main(String[] args) {
System.out.println("begin park!");
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("end park!");
}
輸出如下:
begin park!
end park!
下面再來(lái)看一個(gè)例子來(lái)加深對(duì)park和unpark的理解剔猿。
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("child thread begin park!");
// 掛起自己
LockSupport.park();
System.out.println("child thread unpark!");
}
});
thread.start();
// 確保調(diào)用unpark前子線程已經(jīng)將自己掛起
Thread.sleep(1000);
System.out.println("main thread begin unpark!");
LockSupport.unpark(thread);
}
子線程將自己掛起视译,主線程中調(diào)用了unpark方法使得子線程得以繼續(xù)運(yùn)行。
3. void parkNanos(long nanos)
和park方法類似艳馒,如果調(diào)用park方法的線程已經(jīng)拿到了與LockkSupport關(guān)聯(lián)的許可證憎亚,則調(diào)用LockSupport.parkNanos(long nanos)方法會(huì)立即返回。不同之處在于弄慰,如果沒(méi)有拿到許可證第美,則調(diào)用線程會(huì)被掛起nanos時(shí)間后自動(dòng)返回。
抽象同步隊(duì)列AQS概述
AQS——鎖的底層支持
AbstractQueuedSynchronizer抽象同步隊(duì)列簡(jiǎn)稱AQS陆爽,是實(shí)現(xiàn)同步器的基礎(chǔ)組件什往。
以下為AQS的類結(jié)構(gòu)圖:
AQS是一個(gè)FIFO的雙向隊(duì)列,內(nèi)部通過(guò)head和tail兩個(gè)節(jié)點(diǎn)來(lái)對(duì)隊(duì)列進(jìn)行維護(hù)慌闭。
Node是AQS的一個(gè)靜態(tài)內(nèi)部類别威,屬性SHARED和EXCLUSIVE分別代表用來(lái)標(biāo)識(shí)線程是獲取共享資源和獨(dú)占資源時(shí)被阻塞掛起放入AQS隊(duì)列的。thread為Node持有的Thread驴剔;waitStatus用于記錄當(dāng)前線程的狀態(tài)省古,CANCELLED表示線程被取消,SIGNAL表示線程需要喚醒丧失,CONDITION表示線程在條件隊(duì)列里面等待豺妓,PROPAGATE表示釋放共享資源時(shí)需要通知其他節(jié)點(diǎn)。
AQS維護(hù)了一個(gè)單一的狀態(tài)信息state布讹,可以通過(guò)getState琳拭、setState、compareAndSetState函數(shù)修改其值描验。
AQS內(nèi)部類ConditionObject用來(lái)結(jié)合鎖實(shí)現(xiàn)線程同步白嘁。
AQS實(shí)現(xiàn)線程同步的關(guān)鍵是對(duì)state進(jìn)行操作,根據(jù)state是否屬于一個(gè)線程膘流,操作state的方式可分為獨(dú)占方式和共享方式絮缅。
獨(dú)占方式下獲取和釋放資源的方法為:
void acquire(int arg)
void acauireInterruptibly(int arg)
boolean release(int arg)
共享方式下獲取和釋放資源的方法為:
void acauireShared(int arg)
void acauireSharedInterruptibly(int arg)
boolean releaseShared(int arg)
獨(dú)占方式下鲁沥,獲取和釋放資源的流程如下:
當(dāng)一個(gè)線程調(diào)用acquire(int arg)獲取獨(dú)占資源時(shí),會(huì)首先使用tryAcquire方法進(jìn)行嘗試盟蚣,具體就是設(shè)置state的值黍析,成功則世界返回,失敗則將當(dāng)前線程封裝為類型為Node.EXCLUSIVE的Node節(jié)點(diǎn)后插入到AQS阻塞隊(duì)列的尾部屎开,并調(diào)用LockSupport.park(this)掛起自己。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
但一個(gè)線程調(diào)用release(int arg)會(huì)嘗試使用tryRelease操作釋放資源马靠,這里也是改變state的值奄抽,然后調(diào)用LockSupport.unpark(thread)方法激活A(yù)QS隊(duì)列里面被阻塞的一個(gè)線程(thread)。被阻塞的線程使用tryAcquire嘗試甩鳄,看當(dāng)前state的值是否滿足自己的需要逞度,滿足則該線程被激活,繼續(xù)向下運(yùn)行妙啃,否則還是會(huì)被放入AQS隊(duì)列并被掛起档泽。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
注意;
AQS類并沒(méi)有提供tryAcquire和tryRelease方法的實(shí)現(xiàn)揖赴,因?yàn)锳QS是一個(gè)基礎(chǔ)框架馆匿,這兩個(gè)方法需要由子類自己實(shí)現(xiàn)來(lái)實(shí)現(xiàn)自己的特性。
共享方式下燥滑,獲取和釋放資源的流程如下渐北;
當(dāng)線程調(diào)用acquireShared(int arg)獲取共享資源時(shí),首先使用tryAcquireShared嘗試獲取資源并修改state铭拧,成功則直接放回赃蛛,否則將當(dāng)前線程封裝為Node.SHARED類型的節(jié)點(diǎn)插入到AQS阻塞隊(duì)列的尾部,并使用LockSupport.park(this)方法掛起自己搀菩。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
當(dāng)一個(gè)線程調(diào)用releaseShared(int arg)時(shí)會(huì)嘗試使用tryReleasedShared操作釋放資源并修改state呕臂,然后使用LockSupport.unpark(thread)激活A(yù)QS隊(duì)列中的一個(gè)線程(thread)。被激活的線程會(huì)調(diào)用tryReleaseShared查看當(dāng)前state是否滿足自己需求肪跋,滿足則該線程被激活歧蒋,否則繼續(xù)掛起。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
注意:
同上澎嚣,AQS沒(méi)有提供tryAcquiredShared和tryReleaseShared方法的實(shí)現(xiàn)疏尿,這兩個(gè)方法也需要由子類實(shí)現(xiàn)。
AQS——條件變量的支持
以下是使用條件變量的例子:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try{
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
lock.lock();
try{
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
上述代碼中易桃,condition是由Lock對(duì)象調(diào)用newCondition方法創(chuàng)建的條件變量褥琐,一個(gè)Lock對(duì)象可以創(chuàng)建多個(gè)條件變量。
lock.lock()方法相當(dāng)于進(jìn)入synchronized同步代碼塊晤郑,用于獲取獨(dú)占鎖敌呈;await()方法相當(dāng)于Object.wait()方法贸宏,用于阻塞掛起當(dāng)前線程,當(dāng)其他線程調(diào)用了signal方法(相當(dāng)于Object.notify()方法)時(shí)磕洪,被阻塞的線程才會(huì)從await處返回吭练。
lock.newCondition()作用是new一個(gè)在AQS內(nèi)部類ConditionObject對(duì)象。每個(gè)條件變量?jī)?nèi)部都維護(hù)了一個(gè)條件隊(duì)列析显,用來(lái)存放調(diào)用該條件變量的await方法時(shí)被阻塞的線程鲫咽。
注意:
這個(gè)條件隊(duì)列和AQS隊(duì)列不是一回事。
以下是await的源碼:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 創(chuàng)建新的node節(jié)點(diǎn)谷异,并插入到條件隊(duì)列末尾
Node node = addConditionWaiter();
// 釋放當(dāng)前線程的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 調(diào)用park方法阻塞掛起當(dāng)前線程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
...
}
首先會(huì)構(gòu)造一個(gè)類型為Node.CONDITION的node節(jié)點(diǎn)分尸,然后將該節(jié)點(diǎn)處插入條件隊(duì)列末尾,之后當(dāng)前線程會(huì)釋放獲取的鎖歹嘹,并被阻塞掛起箩绍。這時(shí)如果有其他線程調(diào)用lock.lock()方法嘗試獲取鎖,就會(huì)有一個(gè)線程獲取到鎖尺上。
再來(lái)看signal源碼:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 將條件隊(duì)列頭元素移動(dòng)到AQS隊(duì)列等待執(zhí)行
doSignal(first);
}
調(diào)用signal時(shí)材蛛,會(huì)把條件隊(duì)列隊(duì)首元素放入AQS中并激活隊(duì)首元素對(duì)應(yīng)的線程。
基于AQS實(shí)現(xiàn)自定義同步器
下面基于AQS實(shí)現(xiàn)一個(gè)不可重入的獨(dú)占鎖怎抛。自定義AQS重寫一系列函數(shù)卑吭,還需要定義原子變量state的含義。這里定義state為0表示目前鎖沒(méi)有被線程持有抽诉,state為1表示鎖已經(jīng)被某一個(gè)線程持有陨簇。
public class NonReentrantLock implements Lock, Serializable {
// 內(nèi)部幫助類
private static class Sync extends AbstractQueuedSynchronizer {
// 鎖是否被持有
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 嘗試獲取鎖
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 嘗試釋放鎖
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供條件變量接口
Condition newCondition() {
return new ConditionObject();
}
}
// 創(chuàng)建一個(gè)Sync來(lái)做具體工作
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.tryRelease(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
NonReentrantLock定義了一個(gè)內(nèi)部類Sync用來(lái)實(shí)現(xiàn)具體的鎖的操作,Sync繼承了AQS。由于是獨(dú)占鎖,:Sync只重寫了tryAcquire返劲、tryRelease、isHeldExclusively耙饰。此外,Sync提供了newCondition方法來(lái)支持條件變量纹份。
下面使用自定義的鎖來(lái)實(shí)現(xiàn)簡(jiǎn)單的生產(chǎn)-消費(fèi)模型
final static NonReentrantLock lock = new NonReentrantLock();
final static Condition notFull = lock.newCondition();
final static Condition notEmpty = lock.newCondition();
final static Queue<String> queue = new LinkedBlockingQueue<>();
final static int queueSize = 10;
public static void main(String[] args) {
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
// 獲取獨(dú)占鎖
lock.lock();
try {
while (true) {
// 隊(duì)列滿了則等待
while (queue.size() >= queueSize) {
notEmpty.await();
}
queue.add("ele");
System.out.println("add...");
notFull.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
}
}
});
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
// 獲取獨(dú)占鎖
lock.lock();
try {
while (true) {
// 隊(duì)列空則等待
while (queue.size() == 0) {
notFull.await();
}
String ele = queue.poll();
System.out.println("poll...");
notEmpty.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
}
}
});
producer.start();
consumer.start();
}
代碼使用了NonReentrantLock來(lái)創(chuàng)建lock苟跪,并調(diào)用lock.newCondition創(chuàng)建了兩個(gè)條件變量用來(lái)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者線程的同步。
ReentrantLock的原理
類圖結(jié)構(gòu)
構(gòu)造函數(shù)如下:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可以看到蔓涧,ReentrantLock最終還是依賴AQS件已,并且根據(jù)傳入的參數(shù)來(lái)決定其內(nèi)部是一個(gè)公平鎖還是非公平鎖(默認(rèn)為公平鎖)。
Sync類直接繼承自AQS元暴,它的子類NonfairSync和FairSync分別實(shí)現(xiàn)了獲取鎖的非公平與公平策略篷扩。
AQS的state表示線程獲取鎖的可重入次數(shù)。state為0表示當(dāng)前鎖沒(méi)有被任何線程持有茉盏。當(dāng)一個(gè)線程第一次獲取該所是會(huì)嘗試使用CAS設(shè)置state為1鉴未,成功后記錄該鎖的持有者為當(dāng)前線程枢冤。以后每一次加鎖state就增加1,表示可重入次數(shù)铜秆。當(dāng)該線程釋放該鎖時(shí)淹真,state減1,如果減1后state為0连茧,則當(dāng)前線程釋放該鎖核蘸。
獲取鎖
void lock()
當(dāng)一個(gè)線程調(diào)用該方法時(shí),如果鎖當(dāng)前沒(méi)有被其他線程占有并且當(dāng)前線程之前沒(méi)有獲取過(guò)該鎖梅屉,則當(dāng)前線程會(huì)獲取到該鎖值纱,然后設(shè)置當(dāng)前鎖的擁有者為當(dāng)前線程,并且將state置為1坯汤;如果當(dāng)前線程已經(jīng)獲取過(guò)該鎖,則將state的值增加1搀愧;如果該鎖已經(jīng)被其他線程持有惰聂,則調(diào)用該方法的線程會(huì)被放入AQS隊(duì)列中阻塞掛起等待,
public void lock() {
sync.lock();
}
ReentrantLock的lock()委托給了sync咱筛,根據(jù)創(chuàng)建ReentrantLock構(gòu)造函數(shù)選擇sync的實(shí)現(xiàn)時(shí)NonfairSync還是FairSync搓幌,這個(gè)鎖是一個(gè)公平鎖或者非公平鎖。
先來(lái)看非公平鎖的情況:
final void lock() {
// CAS設(shè)置state為1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 調(diào)用AQS的acquire方法
acquire(1);
}
默認(rèn)state為0迅箩,所以第一個(gè)調(diào)用Lock的吸納成會(huì)通過(guò)CAS設(shè)置狀態(tài)值為1溉愁,CAS成功則表示當(dāng)前線程獲取到了鎖,然后設(shè)置該鎖持有者為當(dāng)前線程饲趋。
如果此時(shí)有其他線程企圖過(guò)去該鎖拐揭,CAS會(huì)失敗,然后會(huì)調(diào)用AQS的acquire方法奕塑。
再貼下acquire的源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
之前說(shuō)過(guò)堂污,AQS并沒(méi)有提供可用的tryAcquire方法,tryAcquire方法需要子類自己定制龄砰。這里會(huì)調(diào)用ReentrantLock重寫的tryAcquire方法盟猖。下面先看非公平鎖的代碼。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// (1)鎖未被持有
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 設(shè)置鎖的持有者為當(dāng)前線程
setExclusiveOwnerThread(current);
return true;
}
}
// (2)鎖已經(jīng)被某個(gè)線程持有换棚,如果該線程為當(dāng)前線程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 增加重入數(shù)
setState(nextc);
return true;
}
return false;
}
源碼比較簡(jiǎn)單式镐,分析見(jiàn)注釋。
下面來(lái)看非公平性體現(xiàn)在哪兒固蚤。首先非公平指的是先嘗試獲取鎖的線程并不一定首先獲取該鎖娘汞。
假設(shè)線程A執(zhí)行到代碼(1)發(fā)現(xiàn)線程已經(jīng)被持有然后執(zhí)行到(2)發(fā)現(xiàn)當(dāng)前線程不是鎖持有者,則返回false被放入AQS中進(jìn)行等待颇蜡。假設(shè)這時(shí)線程B也執(zhí)行到了代碼(1)价说,發(fā)現(xiàn)state為0(假設(shè)占有該鎖的其他線程釋放了該鎖)辆亏, 就成功獲取了鎖,而比B先請(qǐng)求鎖的線程A還在等待鳖目,這就是非公平性的體現(xiàn)扮叨。
下面看FairSync重寫的tryAcquire方法。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平性策略
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
由代碼可知领迈,公平的tryAcquire方法與非公平的區(qū)別在于增加了一個(gè)hasQueuedPredecessors方法來(lái)判斷是否有線程在當(dāng)前線程前嘗試獲取鎖彻磁。
下面是hasQueuedPredecessors的具體實(shí)現(xiàn)。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
如果當(dāng)前線程節(jié)點(diǎn)有前驅(qū)節(jié)點(diǎn)則返回true狸捅,否則如果當(dāng)前AQS隊(duì)列為空或者當(dāng)前線程節(jié)點(diǎn)是AQS的第一個(gè)節(jié)點(diǎn)則返回false衷蜓。其中h==t說(shuō)明當(dāng)前隊(duì)列為空,直接返回false尘喝;如果h!=t并且s==null說(shuō)明有一個(gè)元素將要作為AQS的第一個(gè)節(jié)點(diǎn)入隊(duì)(AQS入隊(duì)包含兩步操作:首先創(chuàng)建一個(gè)哨兵頭節(jié)點(diǎn)磁浇,然后將第一個(gè)元素插入哨兵節(jié)點(diǎn)后面),那么返回true朽褪;如果h!=t并且s!=null且s.thread!=Thread.currentThread()說(shuō)明隊(duì)列里面的第一個(gè)元素不是當(dāng)前線程置吓,那么返回true。
void lockInterruptibly()
與lock()方法類似缔赠,不同之處在于衍锚,它對(duì)中斷進(jìn)行相應(yīng),就是當(dāng)前線程在調(diào)用該方法時(shí)嗤堰,如果其他線程調(diào)用了當(dāng)前線程的interrupt方法戴质,則當(dāng)前線程會(huì)拋出inetrruptedException
異常,然后返回踢匣。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果當(dāng)前線程被中斷告匠,則直接拋出異常并返回
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
boolean tryLock()
嘗試獲取鎖,如果當(dāng)前該鎖沒(méi)有被其他線程持有符糊,則當(dāng)前線程獲取該鎖并返回true凫海,否則返回false。該方法不會(huì)引起當(dāng)前線程阻塞男娄。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平策略
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上述代碼與非公平鎖的tryAcquire方法類似行贪,所以tryLock使用的是非公平策略。
boolean tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
設(shè)置了超時(shí)時(shí)間模闲,如果超時(shí)時(shí)間到了還沒(méi)有獲取到該鎖則返回false建瘫。
釋放鎖
void unlock()
嘗試釋放鎖,如果當(dāng)前線程持有該鎖尸折,則調(diào)用該方法會(huì)讓該線程對(duì)該線程持有的AQS狀態(tài)值減1啰脚,如果減去1后狀態(tài)值為0,則當(dāng)前線程會(huì)釋放該鎖。
public void unlock() {
sync.release(1);
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果當(dāng)前線程不是該鎖持有者直接拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 若state變?yōu)?橄浓,則清空鎖持有線程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 設(shè)置可重入次數(shù)減1
setState(c);
return free;
}
讀寫鎖ReentrantReadWriteLock原理
解決線程安全問(wèn)題使用ReentrantLock就可以粒梦,但是ReentrantLock是獨(dú)占鎖,同一時(shí)間只能有一個(gè)線程獲取該鎖荸实,而實(shí)際中會(huì)出現(xiàn)讀多寫少的情況匀们,顯然使用ReentrantLock滿足不了這個(gè)需求,這時(shí)就需要用到ReentrantReadWriteLock准给。ReentrantReadWriteLock采用讀寫分離的策略泄朴,允許多個(gè)線程同時(shí)獲取寫鎖。
類圖結(jié)構(gòu)
讀寫鎖內(nèi)部維護(hù)了一個(gè)ReadLock和一個(gè)WriteLock露氮,它們依賴Sync實(shí)現(xiàn)具體功能祖灰。而Sync繼承自AQS,并且也提供了公平與非公平的實(shí)現(xiàn)畔规。下面只介紹非公平讀寫鎖的實(shí)現(xiàn)局扶。
我們知道AQS中只維護(hù)了一個(gè)state狀態(tài),ReentrantReadWriteLock巧妙地使用state的高16位表示讀狀態(tài)叁扫,也就是獲取到讀鎖的次數(shù)详民;使用低16位表示獲取到寫鎖的線程的可重入次數(shù)。
static final int SHARED_SHIFT = 16;
// 讀鎖狀態(tài)值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 寫鎖狀態(tài)值65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 寫鎖掩碼陌兑,二進(jìn)制,16個(gè)1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 讀鎖線程數(shù)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 寫鎖可重入數(shù)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
讀寫鎖中的firstReader用來(lái)記錄第一個(gè)獲取到讀鎖的線程由捎,firstReaderHoldCount記錄第一個(gè)和獲取到讀鎖的線程獲取讀鎖的可重入次數(shù)兔综。HoldCounter類型的cachedHoldCounter用來(lái)記錄最后一個(gè)獲取讀鎖的線程獲取讀鎖的可重入次數(shù)。
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
readHolds是ThreadLocal變量狞玛,用來(lái)存放除去第一個(gè)獲取讀鎖線程外的其他線程獲取讀鎖的可重入次數(shù)软驰。ThreadLocalHoldCounter繼承了ThreadLocal,因此initialValue方法返回一個(gè)HoldCounter對(duì)象心肪。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
寫鎖的獲取與釋放
void lock()
寫鎖與寫鎖锭亏、寫鎖與讀鎖是互斥的,如果當(dāng)前已經(jīng)有線程獲取了讀鎖或?qū)戞i硬鞍,則請(qǐng)求獲取寫鎖的線程會(huì)被阻塞掛起慧瘤。寫鎖是可重入鎖,如果當(dāng)前線程已經(jīng)獲取了該鎖固该,再次獲取只是簡(jiǎn)單地把可重入次數(shù)加1后返回锅减。
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
lock()內(nèi)部調(diào)用了acquire方法,其中tryAcquire是ReentrantReadWriteLock內(nèi)部的Sync類重寫的伐坏。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); // 總狀態(tài)
int w = exclusiveCount(c); // 讀鎖狀態(tài)
// (1)c!=0說(shuō)明讀鎖或?qū)戞i已經(jīng)被獲取
if (c != 0) {
//(2)w==0說(shuō)明已經(jīng)有線程獲取了讀鎖怔匣,w!=0并且當(dāng)前線程不是寫鎖擁有者,則返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// (3)當(dāng)前線程已經(jīng)獲取了寫鎖桦沉,判斷可重入次數(shù)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// (4)設(shè)置可重入次數(shù)
setState(c + acquires);
return true;
}
// (5)c==0說(shuō)明鎖還沒(méi)有被獲取每瞒,此處第一次獲取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
(1)如果當(dāng)前AQS狀態(tài)值不為0金闽,說(shuō)明當(dāng)前已經(jīng)有線程獲取到了讀鎖或?qū)戞i。如果w==0說(shuō)明state的低16位為0剿骨,而state不為0代芜,那么高16位必不為0,說(shuō)明有線程獲取了讀鎖懦砂,所以直接返回false(讀寫互斥蜒犯,保障數(shù)據(jù)一致性)。
(2)如果w!=0說(shuō)明當(dāng)前已經(jīng)有線程獲取了該寫鎖荞膘,再看當(dāng)前線程是不是該鎖的持有者罚随,不是則返回false。
執(zhí)行到(3)說(shuō)明當(dāng)前線程已經(jīng)獲取到了該鎖羽资,所以判斷該線程的可重入次數(shù)是否超過(guò)了最大值淘菩,是則拋出異常,否則執(zhí)行(4)增加可重入次數(shù)屠升。
如果state為0說(shuō)明目前沒(méi)有線程獲取到讀鎖和寫鎖潮改,所以執(zhí)行(5)。對(duì)于writerShouldBlock()腹暖,非公平鎖的實(shí)現(xiàn)為
final boolean writerShouldBlock() {
return false;
}
說(shuō)明(5)搶占式地執(zhí)行CAS嘗試獲取寫鎖汇在。
公平鎖的實(shí)現(xiàn)為
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
還是使用hasQueuedPredecessors來(lái)判斷當(dāng)前線程節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)。
void lockInterruptibly()
會(huì)對(duì)中斷進(jìn)行相應(yīng)
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
boolean tryLock()
非阻塞方法脏答,嘗試獲取寫鎖糕殉,如果當(dāng)前沒(méi)有其他線程持有讀鎖或?qū)戞i,則當(dāng)前線程獲取寫鎖并返回true殖告,否則返回false阿蝶。如果當(dāng)前線程已經(jīng)持有了該寫鎖則增加state的值并返回true。
public boolean tryLock( ) {
return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 非公平策略
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
此處代碼于tryAcquire方法類似黄绩,只是使用了非公平策略羡洁。
void unlock()
使state減1,如果減1后state為0爽丹,則當(dāng)前線程會(huì)釋放鎖筑煮。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 激活A(yù)QS隊(duì)列里面的一個(gè)線程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 檢查是否使鎖持有者調(diào)用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取可重入值,沒(méi)有考慮高16位习劫,因?yàn)楂@取寫鎖時(shí)讀鎖狀態(tài)值 肯定為咆瘟。
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
讀鎖的獲取與釋放
讀鎖通過(guò)ReadLock來(lái)實(shí)現(xiàn)。
void lock()
獲取的鎖诽里,如果寫鎖沒(méi)有被其他線程持有袒餐,則可以獲取讀鎖,并將state的高16位加1;否則阻塞灸眼。
public void lock() {
sync.acquireShared(1);
}
// 來(lái)自AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
lock方法調(diào)用了AQS的acquireShared方法卧檐,其內(nèi)部又調(diào)用了Sync重寫的tryAcquireShared方法。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果有其他線程獲取了寫鎖焰宣,返回-1
// 如果寫鎖被當(dāng)前線程持有霉囚,那么也可以獲取讀鎖,因?yàn)橥粋€(gè)線程同時(shí)最多只能執(zhí)行讀或?qū)懼械囊粋€(gè)操作
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 公平策略
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 讀鎖被第一次獲取
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 讀鎖被獲取過(guò)匕积,且當(dāng)前線程就是第一次獲取讀鎖的線程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 記錄最后一個(gè)獲取讀鎖的線程或記錄其他線程讀鎖的可重入次數(shù)
HoldCounter rh = cachedHoldCounter;
// 如果rh為空或者rh不是當(dāng)前線程盈罐,需要通過(guò)get方法創(chuàng)建一個(gè)新的HoldCounter用來(lái)記錄當(dāng)前線程的可重入次數(shù)
// 并將其設(shè)為cachedHoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
// 運(yùn)行到此處說(shuō)明當(dāng)前線程已經(jīng)被設(shè)為最后一個(gè)獲取讀鎖的線程,rh.count==0說(shuō)明當(dāng)前線程已經(jīng)完全釋放了讀鎖闪唆,
// 現(xiàn)在又要獲取讀鎖盅粪,需要更新自己對(duì)應(yīng)的HoldCounter
else if (rh.count == 0)
readHolds.set(rh);
// 增加重入數(shù)
rh.count++;
}
return 1;
}
// 嘗試一次失敗后自旋獲取
return fullTryAcquireShared(current);
}
代碼中readerShouldBlock用于決定代碼公平與否。非公平鎖的實(shí)現(xiàn)如下悄蕾。
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
僅當(dāng)AQS隊(duì)列存在元素且第一個(gè)元素在嘗試獲取寫鎖時(shí)才會(huì)阻塞當(dāng)前線程票顾,否則就算有線程在嘗試獲取讀鎖也不會(huì)讓步(非公平性的體現(xiàn))。
void unlock()
public void unlock() {
sync.releaseShared(1);
}
具體操作委托給sync帆调。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
...
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
將state減去一個(gè)單位奠骄,如果結(jié)果為0,則返回true番刊,調(diào)用doReleaseShared方法釋放一個(gè)由于獲取讀鎖而被阻塞的線程含鳞;如果不為0,說(shuō)明仍有線程持有讀鎖芹务,返回false民晒。
案例介紹
下面基于ReentrantLock實(shí)現(xiàn)線程安全的list,適用于讀多寫少的情況
public class ReentrantLockList {
private ArrayList<String> array = new ArrayList<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock;
private final Lock writeLock;
public ReentrantLockList() {
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void add(String e) {
writeLock.lock();
try{
array.add(e);
}finally {
writeLock.unlock();
}
}
public void remove(String e) {
writeLock.lock();
try{
array.remove(e);
}finally {
writeLock.unlock();
}
}
public String get(int index) {
readLock.lock();
try{
return array.get(index);
}finally {
readLock.unlock();
}
}
public int size() {
return array.size();
}
}
更多
相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記