AbstractQueuedSynchronizer
隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器或AQS),是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個int成員變量表示同步狀態(tài)亭敢,通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作蛉鹿。并發(fā)包的大師(Doug Lea)期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)内斯。
AQS使用方式和其中的設(shè)計模式
AQS的主要使用方式是繼承祈坠,子類通過繼承AQS并實(shí)現(xiàn)它的抽象方法來管理同步狀態(tài)叨咖,在AQS里由一個int型的state來代表這個狀態(tài)莱睁,在抽象方法的實(shí)現(xiàn)過程中免不了要對同步狀態(tài)進(jìn)行更改待讳,這時就需要使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進(jìn)行操作仰剿,因為它們能夠保證狀態(tài)的改變是安全的创淡。
/**
* The synchronization state.
*/
private volatile int state;
在實(shí)現(xiàn)上,子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類南吮,AQS自身沒有實(shí)現(xiàn)任何同步接口琳彩,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來供自定義同步組件使用,同步器既可以支持獨(dú)占式地獲取同步狀態(tài)部凑,也可以支持共享式地獲取同步狀態(tài)露乏,這樣就可以方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)涂邀。
同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵瘟仿,在鎖的實(shí)現(xiàn)中聚合同步器”让悖可以這樣理解二者之間的關(guān)系:
鎖是面向使用者的劳较,它定義了使用者與鎖交互的接口(比如可以允許兩個線程并行訪問),隱藏了實(shí)現(xiàn)細(xì)節(jié)浩聋;
同步器面向的是鎖的實(shí)現(xiàn)者观蜗,它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理衣洁、線程的排隊墓捻、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域坊夫。
實(shí)現(xiàn)者需要繼承同步器并重寫指定的方法砖第,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模板方法环凿,而這些模板方法將會調(diào)用使用者重寫的方法梧兼。
模板方法模式
同步器的設(shè)計基于模板方法模式。模板方法模式的意圖是拷邢,定義一個操作中的算法的骨架,而將一些步驟的實(shí)現(xiàn)延遲到子類中屎慢。模板方法使得子類可以不改變一個算法的結(jié)構(gòu)即可重定義該算法的某些特定步驟瞭稼。我們最常見的就是Spring框架里的各種Template。
實(shí)際例子
我們開了個蛋糕店腻惠,蛋糕店不能只賣一種蛋糕呀环肘,于是我們決定先賣奶油蛋糕,芝士蛋糕和慕斯蛋糕集灌。三種蛋糕在制作方式上一樣悔雹,都包括造型复哆,烘焙和涂抹蛋糕上的東西。所以可以定義一個抽象蛋糕模型
/**
* 類說明:抽象蛋糕模型
*/
public abstract class AbstractCake {
protected abstract void shape();
protected abstract void apply();
protected abstract void brake();
/*模板方法*/
public final void run(){
this.shape();
this.apply();
this.brake();
}
}
然后就可以批量生產(chǎn)三種蛋糕
/**
* 類說明:芝士蛋糕
*/
public class CheeseCake extends AbstractCake {
@Override
protected void shape() {
System.out.println("芝士蛋糕造型");
}
@Override
protected void apply() {
System.out.println("芝士蛋糕涂抹");
}
@Override
protected void brake() {
System.out.println("芝士蛋糕烘焙");
}
}
public class CreamCake extends AbstractCake {
public class MouseCake extends AbstractCake {
AbstractCake cake = new CheeseCake();
AbstractCake cake2 = new CreamCake();
//AbstractCake cake3 = new MouseCake();
cake.run();
這樣一來腌零,不但可以批量生產(chǎn)三種蛋糕梯找,而且如果日后有擴(kuò)展,只需要繼承抽象蛋糕方法就可以了益涧,十分方便锈锤,我們天天生意做得越來越賺錢。突然有一天闲询,我們發(fā)現(xiàn)市面有一種最簡單的小蛋糕銷量很好久免,這種蛋糕就是簡單烘烤成型就可以賣,并不需要涂抹什么食材扭弧,由于制作簡單銷售量大阎姥,這個品種也很賺錢,于是我們也想要生產(chǎn)這種蛋糕鸽捻。但是我們發(fā)現(xiàn)了一個問題呼巴,抽象蛋糕是定義了抽象的涂抹方法的,也就是說擴(kuò)展的這種蛋糕是必須要實(shí)現(xiàn)涂抹方法泊愧。怎么辦伊磺?我們可以將原來的模板修改為帶鉤子的模板。
/*模板方法*/
public final void run(){
this.shape();
if(this.shouldApply()){
this.apply();
}
this.brake();
}
protected boolean shouldApply(){
return true;
}
做小蛋糕的時候通過flag來控制是否涂抹删咱,其余已有的蛋糕制作不需要任何修改可以照常進(jìn)行屑埋。
/**
* 類說明:小蛋糕
*/
public class SmallCake extends AbstractCake {
private boolean flag = false;
public void setFlag(boolean shouldApply){
flag = shouldApply;
}
@Override
protected boolean shouldApply() {
return this.flag;
}
@Override
protected void shape() {
System.out.println("小蛋糕造型");
}
@Override
protected void apply() {
System.out.println("小蛋糕涂抹");
}
@Override
protected void brake() {
System.out.println("小蛋糕烘焙");
}
}
AQS中的方法
模板方法
實(shí)現(xiàn)自定義同步組件時,將會調(diào)用同步器提供的模板方法
這些模板方法同步器提供的模板方法基本上分為3類:獨(dú)占式獲取與釋放同步狀態(tài)痰滋、共享式獲取與釋放摘能、同步狀態(tài)和查詢同步隊列中的等待線程情況。
可重寫的方法
訪問或修改同步狀態(tài)的方法
重寫同步器指定的方法時敲街,需要使用同步器提供的如下3個方法來訪問或修改同步狀態(tài)团搞。
?getState():獲取當(dāng)前同步狀態(tài)。
?setState(int newState):設(shè)置當(dāng)前同步狀態(tài)多艇。
?compareAndSetState(int expect,int update):使用CAS設(shè)置當(dāng)前狀態(tài)逻恐,該方法能夠保證狀態(tài)設(shè)置的原子性。
AQS的基本思想CLH隊列鎖
CLH隊列鎖即Craig, Landin, and Hagersten (CLH) locks峻黍。
CLH隊列鎖也是一種基于鏈表的可擴(kuò)展复隆、高性能、公平的自旋鎖姆涩,申請線程僅僅在本地變量上自旋挽拂,它不斷輪詢前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋骨饿。
當(dāng)一個線程需要獲取鎖時:
-
創(chuàng)建一個的QNode亏栈,將其中的locked設(shè)置為true表示需要獲取鎖台腥,myPred表示對其前驅(qū)結(jié)點(diǎn)的引用
-
線程A對tail域調(diào)用getAndSet方法,使自己成為隊列的尾部绒北,同時獲取一個指向其前驅(qū)結(jié)點(diǎn)的引用myPred
線程B需要獲得鎖黎侈,同樣的流程再來一遍
3.線程就在前驅(qū)結(jié)點(diǎn)的locked字段上旋轉(zhuǎn),直到前驅(qū)結(jié)點(diǎn)釋放鎖(前驅(qū)節(jié)點(diǎn)的鎖值 locked == false)
4.當(dāng)一個線程需要釋放鎖時镇饮,將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false蜓竹,同時回收前驅(qū)結(jié)點(diǎn)
如上圖所示,前驅(qū)結(jié)點(diǎn)釋放鎖储藐,線程A的myPred所指向的前驅(qū)結(jié)點(diǎn)的locked字段變?yōu)閒alse俱济,線程A就可以獲取到鎖。
CLH隊列鎖的優(yōu)點(diǎn)是空間復(fù)雜度低(如果有n個線程钙勃,L個鎖蛛碌,每個線程每次只獲取一個鎖,那么需要的存儲空間是O(L+n)辖源,n個線程有n個myNode蔚携,L個鎖有L個tail)。CLH隊列鎖常用在SMP體系結(jié)構(gòu)下克饶。
Java中的AQS是CLH隊列鎖的一種變體實(shí)現(xiàn)酝蜒。
實(shí)現(xiàn)我們自己獨(dú)占鎖,不可重入demo
SelfLock
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
*類說明:實(shí)現(xiàn)我們自己獨(dú)占鎖,不可重入
*/
public class SelfLock implements Lock {
// 靜態(tài)內(nèi)部類,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
/*判斷處于占用狀態(tài)*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
/*獲得鎖*/
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*釋放鎖*/
@Override
protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
//compareAndSetState(1,0);
return true;
}
// 返回一個Condition矾湃,每個condition都包含了一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
SleepTools
import java.util.concurrent.TimeUnit;
/**
*
*類說明:線程休眠輔助工具類
*/
public class SleepTools {
/**
* 按秒休眠
* @param seconds 秒數(shù)
*/
public static final void second(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
/**
* 按毫秒數(shù)休眠
* @param seconds 毫秒數(shù)
*/
public static final void ms(int seconds) {
try {
TimeUnit.MILLISECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
}
}
TestSelfLock
import java.util.concurrent.locks.Lock;
public class TestMyLock {
public void test() {
final Lock lock = new SelfLock();
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
SleepTools.second(1);
} finally {
lock.unlock();
}
}
}
// 啟動4個子線程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
//w.setDaemon(true);
w.start();
}
// 主線程每隔1秒換行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
//System.out.println();
}
}
public static void main(String[] args) {
TestMyLock testMyLock = new TestMyLock();
testMyLock.test();
}
}
打印結(jié)果為:
Thread-0 ready get lock
Thread-2 ready get lock
Thread-3 ready get lock
Thread-1 ready get lock
Thread-0 already got lock
Thread-0
Thread-0 ready release lock
Thread-0 already released lock
Thread-2 already got lock
Thread-2
Thread-2 ready release lock
Thread-2 already released lock
Thread-3 already got lock
Thread-3
Thread-3 ready release lock
Thread-3 already released lock
Thread-1 already got lock
Thread-1
Thread-1 ready release lock
Thread-1 already released lock
Process finished with exit code 0
ReentrantLock的實(shí)現(xiàn)
鎖的可重入
重進(jìn)入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞亡脑,該特性的實(shí)現(xiàn)需要解決以下兩個問題。
1)線程再次獲取鎖邀跃。鎖需要去識別獲取鎖的線程是否為當(dāng)前占據(jù)鎖的線程霉咨,如果是,則再次成功獲取拍屑。
2)鎖的最終釋放途戒。線程重復(fù)n次獲取了鎖,隨后在第n次釋放該鎖后僵驰,其他線程能夠獲取到該鎖喷斋。鎖的最終釋放要求鎖對于獲取進(jìn)行計數(shù)自增,計數(shù)表示當(dāng)前鎖被重復(fù)獲取的次數(shù)蒜茴,而鎖被釋放時星爪,計數(shù)自減,當(dāng)計數(shù)等于0時表示鎖已經(jīng)成功釋放矮男。
nonfairTryAcquire方法增加了再次獲取同步狀態(tài)的處理邏輯:通過判斷當(dāng)前線程是否為獲取鎖的線程來決定獲取操作是否成功移必,如果是獲取鎖的線程再次請求室谚,則將同步狀態(tài)值進(jìn)行增加并返回true毡鉴,表示獲取同步狀態(tài)成功崔泵。同步狀態(tài)表示鎖被一個線程重復(fù)獲取的次數(shù)。
如果該鎖被獲取了n次猪瞬,那么前(n-1)次tryRelease(int releases)方法必須返回false憎瘸,而只有同步狀態(tài)完全釋放了,才能返回true陈瘦』细剩可以看到,該方法將同步狀態(tài)是否為0作為最終釋放的條件痊项,當(dāng)同步狀態(tài)為0時锅风,將占有線程設(shè)置為null,并返回true鞍泉,表示釋放成功皱埠。
公平和非公平鎖
ReentrantLock的構(gòu)造函數(shù)中,默認(rèn)的無參構(gòu)造函數(shù)將會把Sync對象創(chuàng)建為NonfairSync對象咖驮,這是一個“非公平鎖”边器;而另一個構(gòu)造函數(shù)ReentrantLock(boolean fair)傳入?yún)?shù)為true時將會把Sync對象創(chuàng)建為“公平鎖”FairSync。
nonfairTryAcquire(int acquires)方法托修,對于非公平鎖忘巧,只要CAS設(shè)置同步狀態(tài)成功,則表示當(dāng)前線程獲取了鎖睦刃,而公平鎖則不同砚嘴。tryAcquire方法,該方法與nonfairTryAcquire(int acquires)比較眯勾,唯一不同的位置為判斷條件多了hasQueuedPredecessors()方法枣宫,即加入了同步隊列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)的判斷,如果該方法返回true吃环,則表示有線程比當(dāng)前線程更早地請求獲取鎖也颤,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。
實(shí)現(xiàn)我們自己獨(dú)占鎖,可重入
ReenterSelfLock
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
*類說明:實(shí)現(xiàn)我們自己獨(dú)占鎖,可重入
*/
public class ReenterSelfLock implements Lock {
// 靜態(tài)內(nèi)部類郁轻,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否處于占用狀態(tài)
protected boolean isHeldExclusively() {
return getState() > 0;
}
// 當(dāng)狀態(tài)為0的時候獲取鎖
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}else if(getExclusiveOwnerThread()==Thread.currentThread()){
setState(getState()+1);
return true;
}
return false;
}
// 釋放鎖翅娶,將狀態(tài)設(shè)置為0
protected boolean tryRelease(int releases) {
if(getExclusiveOwnerThread()!=Thread.currentThread()){
throw new IllegalMonitorStateException();
}
if (getState() == 0)
throw new IllegalMonitorStateException();
setState(getState()-1);
if(getState()==0){
setExclusiveOwnerThread(null);
}
return true;
}
// 返回一個Condition,每個condition都包含了一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
TestReenterSelfLock
import java.util.concurrent.locks.Lock;
/**
*類說明:
*/
public class TestReenterSelfLock {
static final Lock lock = new ReenterSelfLock();
public void reenter(int x){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+":遞歸層級:"+x);
int y = x - 1;
if (y==0) return;
else{
reenter(y);
}
} finally {
lock.unlock();
}
}
public void test() {
class Worker extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
SleepTools.second(1);
reenter(3);
}
}
// 啟動3個子線程
for (int i = 0; i < 3; i++) {
Worker w = new Worker();
w.start();
}
// 主線程每隔1秒換行
for (int i = 0; i < 100; i++) {
SleepTools.second(1);
}
}
public static void main(String[] args) {
TestReenterSelfLock testMyLock = new TestReenterSelfLock();
testMyLock.test();
}
}
打印結(jié)果為:
Thread-0
Thread-2
Thread-1
Thread-1 ready get lock
Thread-0 ready get lock
Thread-2 ready get lock
Thread-1 already got lock
Thread-1:遞歸層級:3
Thread-1 ready get lock
Thread-1 already got lock
Thread-1:遞歸層級:2
Thread-1 ready get lock
Thread-1 already got lock
Thread-1:遞歸層級:1
Thread-1 ready release lock
Thread-1 already released lock
Thread-1 ready release lock
Thread-1 already released lock
Thread-1 ready release lock
Thread-1 already released lock
Thread-0 already got lock
Thread-0:遞歸層級:3
Thread-0 ready get lock
Thread-0 already got lock
Thread-0:遞歸層級:2
Thread-0 ready get lock
Thread-0 already got lock
Thread-0:遞歸層級:1
Thread-0 ready release lock
Thread-0 already released lock
Thread-0 ready release lock
Thread-0 already released lock
Thread-0 ready release lock
Thread-0 already released lock
Thread-2 already got lock
Thread-2:遞歸層級:3
Thread-2 ready get lock
Thread-2 already got lock
Thread-2:遞歸層級:2
Thread-2 ready get lock
Thread-2 already got lock
Thread-2:遞歸層級:1
Thread-2 ready release lock
Thread-2 already released lock
Thread-2 ready release lock
Thread-2 already released lock
Thread-2 ready release lock
Thread-2 already released lock
Process finished with exit code 0