說起JAVA并發(fā)編程,就不得不聊聊CAS(Compare And Swap)和AQS了(AbstractQueuedSynchronizer)。
CAS(Compare And Swap)
什么是CAS
CAS(
Compare And Swap
)燎孟,即比較并交換。是解決多線程并行情況下使用鎖造成性能損耗的一種機(jī)制树枫,CAS操作包含三個(gè)操作數(shù)——內(nèi)存位置(V)募舟、預(yù)期原值(A)和新值(B)。如果內(nèi)存位置的值與預(yù)期原值相匹配盹兢,那么處理器會(huì)自動(dòng)將該位置值更新為新值邻梆。否則,處理器不做任何操作绎秒。無論哪種情況浦妄,它都會(huì)在CAS指令之前返回該位置的值。CAS有效地說明了“我認(rèn)為位置V應(yīng)該包含值A(chǔ)见芹;如果包含該值剂娄,則將B放到這個(gè)位置;否則玄呛,不要更改該位置阅懦,只告訴我這個(gè)位置現(xiàn)在的值即可。
在JAVA中把鉴,sun.misc.Unsafe
類提供了硬件級(jí)別的原子操作來實(shí)現(xiàn)這個(gè)CAS故黑。 java.util.concurrent
包下的大量類都使用了這個(gè) Unsafe.java
類的CAS操作儿咱。至于 Unsafe.java
的具體實(shí)現(xiàn)這里就不討論了。
CAS典型應(yīng)用
java.util.concurrent.atomic
包下的類大多是使用CAS操作來實(shí)現(xiàn)的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong
)场晶。下面以 AtomicInteger.java
的部分實(shí)現(xiàn)來大致講解下這些原子類的實(shí)現(xiàn)混埠。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;// 初始int大小
// 省略了部分代碼...
// 帶參數(shù)構(gòu)造函數(shù),可設(shè)置初始int大小
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 不帶參數(shù)構(gòu)造函數(shù),初始int大小為0
public AtomicInteger() {
}
// 獲取當(dāng)前值
public final int get() {
return value;
}
// 設(shè)置值為 newValue
public final void set(int newValue) {
value = newValue;
}
//返回舊值诗轻,并設(shè)置新值為 newValue
public final int getAndSet(int newValue) {
/**
* 這里使用for循環(huán)不斷通過CAS操作來設(shè)置新值
* CAS實(shí)現(xiàn)和加鎖實(shí)現(xiàn)的關(guān)系有點(diǎn)類似樂觀鎖和悲觀鎖的關(guān)系
* */
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}
// 原子的設(shè)置新值為update, expect為期望的當(dāng)前的值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 獲取當(dāng)前值current钳宪,并設(shè)置新值為current+1
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
// 此處省略部分代碼,余下的代碼大致實(shí)現(xiàn)原理都是類似的
}
一般來說在競(jìng)爭(zhēng)不是特別激烈的時(shí)候扳炬,使用該包下的原子操作性能比使用 synchronized
關(guān)鍵字的方式高效的多(查看getAndSet()
吏颖,可知如果資源競(jìng)爭(zhēng)十分激烈的話,這個(gè)for
循環(huán)可能換持續(xù)很久都不能成功跳出恨樟。不過這種情況可能需要考慮降低資源競(jìng)爭(zhēng)才是)半醉。
在較多的場(chǎng)景我們都可能會(huì)使用到這些原子類操作。一個(gè)典型應(yīng)用就是計(jì)數(shù)了劝术,在多線程的情況下需要考慮線程安全問題缩多。通常第一映像可能就是:
public class Counter {
private int count;
public Counter(){}
public int getCount(){
return count;
}
public void increase(){
count++;
}
}
上面這個(gè)類在多線程環(huán)境下會(huì)有線程安全問題,要解決這個(gè)問題最簡(jiǎn)單的方式可能就是通過加鎖的方式,調(diào)整如下:
public class Counter {
private int count;
public Counter(){}
public synchronized int getCount(){
return count;
}
public synchronized void increase(){
count++;
}
}
這類似于悲觀鎖的實(shí)現(xiàn)养晋,我需要獲取這個(gè)資源衬吆,那么我就給他加鎖,別的線程都無法訪問該資源绳泉,直到我操作完后釋放對(duì)該資源的鎖逊抡。我們知道,悲觀鎖的效率是不如樂觀鎖的零酪,上面說了Atomic
下的原子類的實(shí)現(xiàn)是類似樂觀鎖的冒嫡,效率會(huì)比使用 synchronized
關(guān)系字高,推薦使用這種方式四苇,實(shí)現(xiàn)如下:
public class Counter {
private AtomicInteger count = new AtomicInteger();
public Counter(){}
public int getCount(){
return count.get();
}
public void increase(){
count.getAndIncrement();
}
}
AQS(AbstractQueuedSynchronizer)
什么是AQS
AQS(
AbstractQueuedSynchronizer
)灯谣,AQS是JDK下提供的一套用于實(shí)現(xiàn)基于FIFO等待隊(duì)列的阻塞鎖和相關(guān)的同步器的一個(gè)同步框架。這個(gè)抽象類被設(shè)計(jì)為作為一些可用原子int值來表示狀態(tài)的同步器的基類蛔琅。如果你有看過類似CountDownLatch
類的源碼實(shí)現(xiàn)胎许,會(huì)發(fā)現(xiàn)其內(nèi)部有一個(gè)繼承了AbstractQueuedSynchronizer
的內(nèi)部類Sync
÷奘郏可見CountDownLatch
是基于AQS框架來實(shí)現(xiàn)的一個(gè)同步器.類似的同步器在JUC下還有不少辜窑。(eg.Semaphore
)
AQS用法
如上所述,AQS管理一個(gè)關(guān)于狀態(tài)信息的單一整數(shù)寨躁,該整數(shù)可以表現(xiàn)任何狀態(tài)穆碎。比如, Semaphore
用它來表現(xiàn)剩余的許可數(shù)职恳,ReentrantLock
用它來表現(xiàn)擁有它的線程已經(jīng)請(qǐng)求了多少次鎖所禀;FutureTask
用它來表現(xiàn)任務(wù)的狀態(tài)(尚未開始方面、運(yùn)行、完成和取消)
To use this class as the basis of a synchronizer, redefine the
- following methods, as applicable, by inspecting and/or modifying
- the synchronization state using {@link #getState}, {@link
setState} and/or {@link #compareAndSetState}:
- <ul>
- <li> {@link #tryAcquire}
- <li> {@link #tryRelease}
- <li> {@link #tryAcquireShared}
- <li> {@link #tryReleaseShared}
- <li> {@link #isHeldExclusively}
- </ul>
如JDK的文檔中所說色徘,使用AQS來實(shí)現(xiàn)一個(gè)同步器需要覆蓋實(shí)現(xiàn)如下幾個(gè)方法恭金,并且使用getState,setState,compareAndSetState
這幾個(gè)方法來設(shè)置獲取狀態(tài)
1. boolean tryAcquire(int arg)
2. boolean tryRelease(int arg)
3. int tryAcquireShared(int arg)
4. boolean tryReleaseShared(int arg)
5. boolean isHeldExclusively()
以上方法不需要全部實(shí)現(xiàn),根據(jù)獲取的鎖的種類可以選擇實(shí)現(xiàn)不同的方法褂策,支持獨(dú)占(排他)獲取鎖的同步器應(yīng)該實(shí)現(xiàn)tryAcquire横腿、 tryRelease、isHeldExclusively
而支持共享獲取的同步器應(yīng)該實(shí)現(xiàn)tryAcquireShared斤寂、tryReleaseShared耿焊、isHeldExclusively
。下面以 CountDownLatch
舉例說明基于AQS實(shí)現(xiàn)同步器, CountDownLatch
用同步狀態(tài)持有當(dāng)前計(jì)數(shù)遍搞,countDown
方法調(diào)用 release
從而導(dǎo)致計(jì)數(shù)器遞減罗侯;當(dāng)計(jì)數(shù)器為0時(shí),解除所有線程的等待溪猿;await
調(diào)用acquire
歇父,如果計(jì)數(shù)器為0,acquire
會(huì)立即返回再愈,否則阻塞。通常用于某任務(wù)需要等待其他任務(wù)都完成后才能繼續(xù)執(zhí)行的情景护戳。源碼如下:
public class CountDownLatch {
/**
* 基于AQS的內(nèi)部Sync
* 使用AQS的state來表示計(jì)數(shù)count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// 使用AQS的getState()方法設(shè)置狀態(tài)
setState(count);
}
int getCount() {
// 使用AQS的getState()方法獲取狀態(tài)
return getState();
}
// 覆蓋在共享模式下嘗試獲取鎖
protected int tryAcquireShared(int acquires) {
// 這里用狀態(tài)state是否為0來表示是否成功翎冲,為0的時(shí)候可以獲取到返回1,否則不可以返回-1
return (getState() == 0) ? 1 : -1;
}
// 覆蓋在共享模式下嘗試釋放鎖
protected boolean tryReleaseShared(int releases) {
// 在for循環(huán)中Decrement count直至成功;
// 當(dāng)狀態(tài)值即count為0的時(shí)候媳荒,返回false表示 signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
// 使用給定計(jì)數(shù)值構(gòu)造CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 讓當(dāng)前線程阻塞直到計(jì)數(shù)count變?yōu)?抗悍,或者線程被中斷
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 阻塞當(dāng)前線程,除非count變?yōu)?或者等待了timeout的時(shí)間钳枕。當(dāng)count變?yōu)?時(shí)缴渊,返回true
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// count遞減
public void countDown() {
sync.releaseShared(1);
}
// 獲取當(dāng)前count值
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
本文大致就講了這些東西,有些地方說的也不是特別好鱼炒。也有不全的地方衔沼,AQS的東西還是有不少的,建議大家自己去看JUC下的各個(gè)類的實(shí)現(xiàn)昔瞧,配合《JAVA并發(fā)編程實(shí)戰(zhàn)》這本書指蚁,相信是可以看明白的,從而得到更深刻的理解自晰。
作者:vioao
來源:CSDN
原文:https://blog.csdn.net/u010862794/article/details/72892300
版權(quán)聲明:本文為博主原創(chuàng)文章凝化,轉(zhuǎn)載請(qǐng)附上博文鏈接!