AbstractQueuedSynchronizer
隊(duì)列同步器纫版,簡(jiǎn)稱(chēng)AQS,是一個(gè)用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,他使用了一個(gè)int型的成員變量來(lái)表示同步狀態(tài)客情,內(nèi)部通過(guò)FIFO隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作
使用AQS實(shí)現(xiàn)的鎖加鎖和釋放的過(guò)程(為了簡(jiǎn)單其弊,以不可重入鎖為例說(shuō)明)
lock()方法調(diào)用時(shí),嘗試去獲取鎖膀斋,如果當(dāng)前鎖沒(méi)有被其他線程持有梭伐,則獲取成功,將state+1仰担;如果當(dāng)前鎖已經(jīng)被其他線程持有糊识,則將當(dāng)前線程加入CLH隊(duì)列,此時(shí)還存在兩種情況,1.隊(duì)列為空赂苗,創(chuàng)建一個(gè)節(jié)點(diǎn)作為頭愉耙,并將新加入的節(jié)點(diǎn)加入到隊(duì)列作為尾節(jié)點(diǎn),2.隊(duì)列非空拌滋,直接將新加入的節(jié)點(diǎn)作為尾節(jié)點(diǎn)(設(shè)置為尾節(jié)點(diǎn)過(guò)程中涉及到了CAS的操作朴沿,即如果有其他線程也同時(shí)在做相同的操作,通過(guò)cas方式設(shè)置直到尾節(jié)點(diǎn)設(shè)置成功)鸠真。加入隊(duì)列后判斷一下自己的前置節(jié)點(diǎn)是不是head悯仙,如果是再次嘗試獲取鎖,獲取成功則將此節(jié)點(diǎn)設(shè)置為新的head吠卷,獲取失敗則掛起線程锡垄,等待前置節(jié)點(diǎn)釋放鎖
unlock()方法調(diào)用時(shí),將持有鎖的線程置空祭隔,并將state設(shè)置為0货岭,找到它的next并喚醒它
AQS使用方式和其中的設(shè)計(jì)模式
AQS的主要使用方式是繼承扣汪,子類(lèi)通過(guò)繼承AQS并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài)窿凤,在AQS里由一個(gè)int型的state來(lái)代表這個(gè)狀態(tài),在抽象方法的實(shí)現(xiàn)過(guò)程中免不了要對(duì)同步狀態(tài)進(jìn)行更改送朱,這時(shí)就需要使用同步器提供的3個(gè)方法(getState()搞坝、setState(int newState)和compareAndSetState(int expect,int update))來(lái)進(jìn)行操作搔谴,因?yàn)樗鼈兡軌虮WC狀態(tài)的改變是安全的。
private volatile int state;
在實(shí)現(xiàn)上桩撮,子類(lèi)推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類(lèi)敦第,AQS自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來(lái)供自定義同步組件使用店量,同步器既可以支持獨(dú)占式地獲取同步狀態(tài)芜果,也可以支持共享式地獲取同步狀態(tài),這樣就可以方便實(shí)現(xiàn)不同類(lèi)型的同步組件(ReentrantLock融师、ReentrantReadWriteLock和CountDownLatch等)右钾。
同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器旱爆∫ㄉ洌可以這樣理解二者之間的關(guān)系:
鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個(gè)線程并行訪問(wèn))怀伦,隱藏了實(shí)現(xiàn)細(xì)節(jié)后控;
同步器面向的是鎖的實(shí)現(xiàn)者,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式空镜,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作吴攒。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域张抄。
實(shí)現(xiàn)者需要繼承同步器并重寫(xiě)指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中洼怔,并調(diào)用同步器提供的模板方法署惯,而這些模板方法將會(huì)調(diào)用使用者重寫(xiě)的方法。
AQS基于CLH隊(duì)列鎖實(shí)現(xiàn)
CLH隊(duì)列鎖即Craig, Landin, and Hagersten (CLH) locks镣隶。
CLH隊(duì)列鎖也是一種基于鏈表的可擴(kuò)展极谊、高性能、公平的自旋鎖安岂,申請(qǐng)線程僅僅在本地變量上自旋轻猖,它不斷輪詢(xún)前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋域那。
當(dāng)一個(gè)線程需要獲取鎖時(shí):
1.創(chuàng)建一個(gè)新的QNode咙边,將其中的locked設(shè)置為true表示需要獲取鎖,myPred表示對(duì)其前驅(qū)結(jié)點(diǎn)的引用
2.線程A對(duì)tail域調(diào)用getAndSet方法次员,使自己成為隊(duì)列的尾部败许,同時(shí)獲取一個(gè)指向其前驅(qū)結(jié)點(diǎn)的引用myPred
3.線程B需要獲得鎖,同樣的流程再來(lái)一遍
4.線程就在前驅(qū)結(jié)點(diǎn)的locked字段上旋轉(zhuǎn)淑蔚,直到前驅(qū)結(jié)點(diǎn)釋放鎖(前驅(qū)節(jié)點(diǎn)的鎖值 locked == false)
5.當(dāng)一個(gè)線程需要釋放鎖時(shí)市殷,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false,同時(shí)回收前驅(qū)結(jié)點(diǎn)
如上圖所示刹衫,前驅(qū)結(jié)點(diǎn)釋放鎖醋寝,線程A的myPred所指向的前驅(qū)結(jié)點(diǎn)的locked字段變?yōu)閒alse,線程A就可以獲取到鎖绪妹。
CLH隊(duì)列鎖的優(yōu)點(diǎn)是空間復(fù)雜度低(如果有n個(gè)線程甥桂,L個(gè)鎖,每個(gè)線程每次只獲取一個(gè)鎖邮旷,那么需要的存儲(chǔ)空間是O(L+n)黄选,n個(gè)線程有n個(gè)myNode,L個(gè)鎖有L個(gè)tail)婶肩。CLH隊(duì)列鎖常用在SMP體系結(jié)構(gòu)下办陷。
Java中的AQS是CLH隊(duì)列鎖的一種變體實(shí)現(xiàn)。
AQS中的方法
acquire():獨(dú)占式獲取同步狀態(tài)律歼,如果當(dāng)前線程獲取同步狀態(tài)成功民镜,則由該方法返回,否則將會(huì)進(jìn)入同步隊(duì)列等待险毁,該方法將會(huì)調(diào)用重寫(xiě)的tryAcquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireInterruptibly:與acquire相同制圈,但是該方法會(huì)響應(yīng)中斷们童,當(dāng)線程未獲取到同步狀態(tài)而進(jìn)入同步隊(duì)列中,如果當(dāng)前線程被中斷鲸鹦,則該方法會(huì)拋出InterruptException并返回
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
tryAcquireNanos:在acquireInterruptibly基礎(chǔ)上增加了超時(shí)限制慧库,如果當(dāng)前線程在超時(shí)時(shí)間內(nèi)沒(méi)有獲取到同步鎖狀態(tài),那么就會(huì)返回false,如果獲取到了就返回true
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
acquireShared:共享式的獲取同步狀態(tài)馋嗜,如果當(dāng)前線程為獲取到同步狀態(tài)齐板,將會(huì)進(jìn)入同步隊(duì)列等待,與獨(dú)占式獲取的主要區(qū)別是在同一時(shí)刻可以有過(guò)個(gè)線程獲取到同步狀態(tài)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireSharedInterruptibly:與acquireShared相同葛菇,該方法會(huì)響應(yīng)中斷
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireSharedNanos:在acquireSharedInterruptibly基礎(chǔ)上增加了超時(shí)限制
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
release:獨(dú)占式的釋放同步狀態(tài)甘磨,該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個(gè)節(jié)點(diǎn)包含的線程喚醒
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
releaseShared:共享式的釋放同步狀態(tài)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
getQueuedThreads:獲取等待在同步隊(duì)列上的線程集合
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
其中可以被重寫(xiě)的方法:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
既然AQS是實(shí)現(xiàn)同步器的基礎(chǔ)框架眯停,那么我們?cè)趺从盟麃?lái)實(shí)現(xiàn)一個(gè)我們自己的類(lèi)似ReentrantLock的工具呢济舆?
如下,定義一個(gè)類(lèi)實(shí)現(xiàn)Lock接口(每個(gè)鎖都要實(shí)現(xiàn)的接口)庵朝,然后定義一個(gè)內(nèi)部類(lèi)吗冤,我們這里命名為MySync繼承自AbstractQueuedSynchronizer,然后我們自定義的鎖的所有操作都是交給這個(gè)內(nèi)部類(lèi)MySync實(shí)現(xiàn)的
public class MyLock implements Lock {
MySync sync = new MySync();
@Override
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
static class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalMonitorStateException("already release lock");
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
protected Condition newCondition(){
return new ConditionObject();
}
}
}
測(cè)試類(lèi)
public class TestMyLock {
public void test() {
final Lock lock = new MyLock();
for (int i = 0; i < 4; i++) {
Worker w = new Worker(lock);
w.start();
}
}
public static void main(String[] args) {
TestMyLock lock = new TestMyLock();
lock.test();
}
class Worker extends Thread {
private final Lock lock;
public Worker(Lock lock) {
this.lock = lock;
}
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
SleepTools.second(3);
} finally {
lock.unlock();
}
}
}
}
打印:
Thread-0 ready get lock
Thread-0 already got lock
Thread-0
Thread-1 ready get lock
Thread-2 ready get lock
Thread-3 ready get lock
Thread-0 ready release lock
Thread-0 already released lock
Thread-1 already got lock
Thread-1
Thread-1 ready release lock
Thread-1 already released lock
Thread-2 already got lock
Thread-2
Thread-2 ready release lock
Thread-2 already released lock
Thread-3 already got lock
Thread-3
Thread-3 ready release lock
Thread-3 already released lock
Process finished with exit code 0
我們這個(gè)鎖有個(gè)缺陷九府,他目前是不可重入的椎瘟,接下來(lái)我們實(shí)現(xiàn)一個(gè)自己的可重入鎖。重入鎖和非重入鎖的差別其實(shí)并不大侄旬,關(guān)鍵就在于獲取鎖和釋放鎖的時(shí)候是否有判斷當(dāng)前線程是否是已經(jīng)持有鎖的線程這一步
public class MyReentrantLock implements Lock {
MyReentrantSync reentrantSync = new MyReentrantSync();
@Override
public void lock() {
reentrantSync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
reentrantSync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return reentrantSync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return reentrantSync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
reentrantSync.release(1);
}
@Override
public Condition newCondition() {
return reentrantSync.newCondition();
}
class MyReentrantSync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
if (Thread.currentThread() == getExclusiveOwnerThread()){
setState(getState()+1);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
if (getState() == 0){
throw new IllegalMonitorStateException("already release");
}
setState(getState()-1);
if (getState() == 0){
setExclusiveOwnerThread(null);
}
return true;
}
protected Condition newCondition(){
return new ConditionObject();
}
}
}
測(cè)試類(lèi)
public class TestReenterSelfLock {
static final Lock lock = new MyReentrantLock();
public void reenter(int x){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+":遞歸層級(jí):"+x);
int y = x - 1;
if (y==0) return;
else{
reenter(y);
}
} finally {
lock.unlock();
}
}
public void test() {
for (int i = 0; i < 3; i++) {
Worker w = new Worker();
w.start();
}
}
public static void main(String[] args) {
TestReenterSelfLock testMyLock = new TestReenterSelfLock();
testMyLock.test();
}
class Worker extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
reenter(3);
}
}
}
打印結(jié)果
Thread-0
Thread-1
Thread-2
Thread-0:遞歸層級(jí):3
Thread-0:遞歸層級(jí):2
Thread-0:遞歸層級(jí):1
Thread-2:遞歸層級(jí):3
Thread-2:遞歸層級(jí):2
Thread-2:遞歸層級(jí):1
Thread-1:遞歸層級(jí):3
Thread-1:遞歸層級(jí):2
Thread-1:遞歸層級(jí):1