1. 簡介
AQS是AbstractQueuedSynchronizer的簡寫伏蚊,即隊列同步器。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架(如ReentrantLock膛虫、ReentrantReadWriteLock草姻、Semaphore等)。
2. 工作原理
AQS通過內(nèi)置的FIFO同步隊列來完成資源獲取線程的排隊工作稍刀,如果當前線程獲取同步狀態(tài)失斄枚馈(鎖)時,AQS則會將當前線程以及等待狀態(tài)等信息構(gòu)造成一個節(jié)點(Node)并將其加入同步隊列账月,同時會阻塞當前線程综膀,當同步狀態(tài)釋放時,則會把節(jié)點中的線程喚醒局齿,使其再次嘗試獲取同步狀態(tài)剧劝。
- 使用Node實現(xiàn)FIFO隊列,可以用于構(gòu)建鎖或者其它同步裝置的基礎(chǔ)框架抓歼。
- 利用了一個int類型表示狀態(tài)担平。
- 使用方法是繼承,子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){ acquire 和release }的方法操縱狀態(tài)锭部。
- 可以同時實現(xiàn)排它鎖和共享鎖模式(獨占、共享)面褐。
3.AQS組件
3.1 CountDownLatch
CountDownLatch是通過一個計數(shù)器來實現(xiàn)的拌禾,計數(shù)器的初始值為線程的數(shù)量。每當一個線程完成了自己的任務(wù)后展哭,計數(shù)器的值就會減1湃窍。當計數(shù)器值到達0時闻蛀,它表示所有的線程已經(jīng)完成了任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)您市。
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
//countDownLatch.await(100,TimeUnit.MILLSECONDS),設(shè)定等候時間
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
3.2 Semaphore
Semaphore負責協(xié)調(diào)各個線程觉痛,以保證它們能夠正確、合理的使用公共資源茵休,也是操作系統(tǒng)中用于控制進程同步互斥的量薪棒。Semaphore是一種計數(shù)信號量,用于管理一組資源榕莺,內(nèi)部是基于AQS的共享模式俐芯。它相當于給線程規(guī)定一個量從而控制允許活動的線程數(shù)。
Semaphore主要方法:
//構(gòu)造方法钉鸯,創(chuàng)建具有給定許可數(shù)的計數(shù)信號量并設(shè)置為非公平信號量
Semaphore(int permits)
//構(gòu)造方法吧史,當fair等于true時,創(chuàng)建具有給定許可數(shù)的計數(shù)信號量并設(shè)置為公平信號量
Semaphore(int permits,boolean fair)
//從此信號量獲取一個許可前線程將一直阻塞唠雕。相當于一輛車占了一個車位
void acquire()
//從此信號量獲取給定數(shù)目許可贸营,在提供這些許可前一直將線程阻塞。比如n=2岩睁,就相當于一輛車占了兩個車位
void acquire(int n)
//嘗試獲取許可钞脂,停車場有車位就進入,沒有就走
tryAcquire()
//釋放一個許可笙僚,將其返回給信號量芳肌。就如同車開走返回一個車位
void release()
//釋放n個許可
void release(int n)
//當前可用的許可數(shù)
int availablePermits()
下面一起看看如何使用Semaphore
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
//semaphore.acquire(3); 獲取多個許可
//semaphore.tryAcquire() 嘗試獲取許可
//semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS) 嘗試等待獲取許可
semaphore.acquire(); // 獲取一個許可
test(threadNum);
semaphore.release(); // 釋放一個許可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
3.3 CyclicBarrier
CyclicBarrier是一個同步的輔助類,允許一組線程相互之間等待肋层,達到一個共同點亿笤,再繼續(xù)執(zhí)行。
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
//此處代碼在barrier滿足條件時優(yōu)先執(zhí)行
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
3.4 ReetrantLock
栋猖,JDK6.0版本之后synchronized 得到了大量的優(yōu)化净薛,二者性能也不分伯仲,但是重入鎖是可以完全替代synchronized關(guān)鍵字的蒲拉。除此之外肃拜, ReetrantLock又自帶一系列高逼格BUFF:可中斷響應(yīng)、鎖申請等待限時雌团、公平鎖燃领。另外可以結(jié)合Condition來使用,使其更是逼格滿滿锦援。
ReentrantLock與Synchronized區(qū)別
- 可重入性(都可重入)
- 鎖的實現(xiàn):Synchronized是依賴于JVM實現(xiàn)的猛蔽,而ReentrantLock是依賴于程序?qū)崿F(xiàn)。
- 性能區(qū)別:在JDK5.0版本之前, ReetrantLock的性能遠遠好于synchronized關(guān)鍵字曼库,但是隨著鎖的不斷優(yōu)化(自旋鎖区岗、輕量級鎖、偏向鎖)毁枯,兩者性能也差不太多慈缔。在兩者都能滿足需求的情況,更推薦使用synchronized种玛,簡單藐鹤。
- 功能區(qū)別:Synchronized的使用便于ReentrantLock,并且它是由編譯器是保證鎖的加鎖和釋放的蒂誉,而ReentrantLock是由我們自己控制的教藻;第二點鎖定粒度與靈活度,明顯ReentrantLock優(yōu)于Synchronized右锨。
- ReentrantLock獨有功能:1括堤,可指定公平鎖或非公平鎖。2绍移,提供了一個Condition類悄窃,可以分組喚醒需要喚醒的線程。3蹂窖,提供能夠中斷等待鎖的線程的機制轧抗,lock.lockInterruptibly()。
Condition的使用:Condition可以非常靈活的操作線程的喚醒瞬测,下面是一個線程等待與喚醒的例子横媚,其中用1234序號標出了日志輸出順序
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();//創(chuàng)建condition
//線程1
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
//線程2
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();//發(fā)送信號
log.info("send signal"); // 3
reentrantLock.unlock();
}).start();
}
3.5 ReentrantReadWriteLock讀寫鎖
在沒有任何讀寫鎖的時候才可以取得寫入鎖(悲觀讀取,容易寫線程饑餓)月趟,也就是說如果一直存在讀操作灯蝴,那么寫鎖一直在等待沒有讀的情況出現(xiàn),這樣我的寫鎖就永遠也獲取不到孝宗,就會造成等待獲取寫鎖的線程饑餓穷躁。 平時使用的場景并不多。
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data {}
}
3.6 StampedLock
StampedLock是Java8引入的一種新的鎖機制因妇,可以認為它是讀寫鎖的一個改進版本问潭,讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全并發(fā)婚被,但是讀和寫之間依然是沖突的狡忙,讀鎖會完全阻塞寫鎖,它使用的依然是悲觀鎖策略址芯。如果有大量的讀線程去枷,他也有可能引起寫線程的饑餓。而StampedLock則提供了一種樂觀的讀策略,這種樂觀策略的鎖非常類似于無鎖的操作删顶,使得樂觀鎖完全不會阻塞寫線程。
@Slf4j
@ThreadSafe
public class LockExample5 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}
3.7 Callable淑廊、Future逗余、FutureTask
線程的創(chuàng)建方式中有兩種,一種是實現(xiàn)Runnable接口季惩,另一種是繼承Thread录粱,但是這兩種方式都有個缺點,那就是在任務(wù)執(zhí)行完成之后無法獲取返回結(jié)果画拾,于是就有了Callable接口啥繁,F(xiàn)uture接口與FutureTask類的配合取得返回的結(jié)果。
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
該接口聲明了一個名稱為call()的方法青抛,同時這個方法可以有返回值V旗闽,也可以拋出異常。
下面看下Future接口定義
public interface Future<V> {
//如果任務(wù)還沒開始蜜另,執(zhí)行cancel(...)方法將返回false适室;如果任務(wù)已經(jīng)啟動,執(zhí)
//行cancel(true)方法將以中斷執(zhí)行此任務(wù)線程的方式來試圖停止任務(wù)举瑰,如果停
//止成功捣辆,返回true;當任務(wù)已經(jīng)啟動此迅,執(zhí)行cancel(false)方法將不會對正在執(zhí)
//行的任務(wù)線程產(chǎn)生影響(讓線程正常執(zhí)行到完成)汽畴,此時返回false;當任務(wù)已經(jīng)/
//完成耸序,執(zhí)行cancel(...)方法將返回false忍些。mayInterruptRunning參數(shù)表示是否中
//斷執(zhí)行中的線
boolean cancel(boolean mayInterruptIfRunning);
//如果任務(wù)完成前被取消,則返回true
boolean isCancelled();
//如果任務(wù)執(zhí)行結(jié)束佑吝,無論是正常結(jié)束或是中途取消還是發(fā)生異常坐昙,都返回true
boolean isDone();
//獲取異步執(zhí)行的結(jié)果,如果沒有結(jié)果可用芋忿,此方法會阻塞直到異步計算完成
V get() throws InterruptedException, ExecutionException;
//獲取異步執(zhí)行結(jié)果炸客,如果沒有結(jié)果可用,此方法會阻塞戈钢,但是會有時間限
//制痹仙,如果阻塞時間超過設(shè)定的timeout時間,該方法將拋出異常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
通過方法我們可知Future提供了3種功能:(1)能夠中斷執(zhí)行中的任務(wù)(2)判斷任務(wù)是否執(zhí)行完成(3)獲取任務(wù)執(zhí)行完成后的結(jié)果殉了。
下面我們再看下FutureTask的定義
public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask除了實現(xiàn)了Future接口外還實現(xiàn)了Runnable接口,
通過上面的介紹开仰,我們對Callable,F(xiàn)uture,F(xiàn)utureTask都有了比較清晰的了解了众弓,那么它們到底有什么用呢恩溅?我們前面說過通過這樣的方式去創(chuàng)建線程的話,最大的好處就是能夠返回結(jié)果谓娃,加入有這樣的場景脚乡,我們現(xiàn)在需要計算一個數(shù)據(jù),而這個數(shù)據(jù)的計算比較耗時滨达,而我們后面的程序也要用到這個數(shù)據(jù)結(jié)果奶稠,那么這個時Callable豈不是最好的選擇?我們可以開設(shè)一個線程去執(zhí)行計算捡遍,而主線程繼續(xù)做其他事锌订,而后面需要使用到這個數(shù)據(jù)時,我們再使用Future獲取不就可以了嗎画株?下面我們就來編寫一個這樣的實例辆飘。
@Slf4j
public class FutureExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
下面我們用FutureTask再次實現(xiàn)類似功能
@Slf4j
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
3.8 Fork/Join框架
Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架, 是一個把大任務(wù)分割成若干個小任務(wù)污秆,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架劈猪。此處我們不做過多的說明,其原理是基于工作竊取算法良拼,指某個線程從其他隊列里竊取任務(wù)來執(zhí)行战得。下面我們以一個demo來介紹如何使用
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任務(wù)足夠小就計算任務(wù)
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
// 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務(wù)庸推,計算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//執(zhí)行一個任務(wù)
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
3.9 Queue
在并發(fā)隊列上JDK提供了兩套實現(xiàn)常侦,一個是以ConcurrentLinkedQueue為代表的高性能隊列,一個是以BlockingQueue接口為代表的阻塞隊列贬媒,無論哪種都集成自Queue聋亡。
ConcurrentLinkedQueue:是一個適用于高并發(fā)場景下的隊列,通過無鎖的方式,實現(xiàn)了高并發(fā)狀態(tài)下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一個基于鏈接節(jié)點的無界線程安全隊列际乘。該隊列的元素遵循先進先出的原則坡倔。
BlockingQueue接口:
- ArrayBlockingQueue:基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部脖含,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象罪塔,其內(nèi)部沒有實現(xiàn)讀寫分離,也就意味著生產(chǎn)和消費不能完全并行养葵,長度是需要定義的征堪,可以指定先進先出或者先進后出,也叫
有界隊列
关拒。 - LinkedBlockingQueue:基于鏈表的阻塞隊列佃蚜,同ArrayBlockingQueue類似庸娱,其內(nèi)部也維持一個數(shù)據(jù)緩沖隊列(由列表構(gòu)成),LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù)谐算,是因為內(nèi)部實現(xiàn)采用分離鎖(讀寫分離兩個鎖),從而實現(xiàn)生產(chǎn)者和消費者操作完全并發(fā)熟尉,是一個無界隊列。
- SynchronousQueue:一種沒有緩沖的隊列氯夷,生產(chǎn)者生產(chǎn)的數(shù)據(jù)直接會被消費者獲取并消費臣樱。
- PriorityBlockingQueue:基于優(yōu)先級的阻塞隊列(優(yōu)先級的判斷通過構(gòu)造函數(shù)傳入的Compator對象來決定,也就是說
傳入隊列的對象必須實現(xiàn)Comparable接口
),在實現(xiàn)PriorityBlockingQueue時腮考,內(nèi)部控制線程同步得鎖采用的是公平鎖,它是一個無界
的隊列玄捕。 - DelayQueue:帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了踩蔚,才能夠從隊列中獲取到該元素,DelayQueue中的元素必須實現(xiàn)Delayed接口,DelayQueue是一個沒有大小限制的隊列枚粘,應(yīng)用場景很多馅闽,比如對緩存超時的數(shù)據(jù)進行移除、任務(wù)超時處理馍迄、空閑鏈接的關(guān)閉等等福也。