AbstractQueuedSynchronizer - AQS
Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.
AQS本質(zhì)是一個(gè)支持FIFO的同步隊(duì)列眷射,使用Node構(gòu)建鎖或其他同步組件的基礎(chǔ)框架。
CountDownLatch
,Semaphore
和ReentrantLock
內(nèi)部就實(shí)現(xiàn)了這種同步隊(duì)列。
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
AQS組件 - CountDownLatch
使用它使用了計(jì)數(shù)器阻塞當(dāng)前線程裕循,直到計(jì)數(shù)器為0除破,只會(huì)出現(xiàn)一次鼎姐。
package com.accat.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
countDownLatch.await(10, TimeUnit.MILLISECONDS); // 也支持指定等待時(shí)間,超時(shí)則繼續(xù)執(zhí)行
AQS組件 - Semaphore
信號(hào)量茁影,規(guī)劃一次性可同時(shí)運(yùn)行的線程個(gè)數(shù)。
四個(gè)隊(duì)伍的隊(duì)員接力跑運(yùn)動(dòng)員(一次可容納四線程)在起跑線上等待接棒(信號(hào)量)阅束。
package com.accat.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 獲取一個(gè)許可
test(threadNum);
semaphore.release(); // 釋放一個(gè)許可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
嘗試做一些操作呼胚,如果沒有及時(shí)操作則丟棄這些操作。
如:接包處理息裸,如果處理時(shí)間超時(shí)蝇更,將處理不及時(shí)的包丟棄掉。
package com.accat.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SemaphoreExample4 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個(gè)許可
test(threadNum);
semaphore.release(); // 釋放一個(gè)許可
}
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
AQS組件 - CyclicBarrier
計(jì)數(shù)器容許重置后再使用呼盆,多個(gè)線程等待其他線程的關(guān)系年扩。
相當(dāng)于多個(gè)運(yùn)動(dòng)員相互等待,等待其他運(yùn)動(dòng)員就位后再一起沖刺访圃,這里既有計(jì)數(shù)器功能厨幻,又有信號(hào)量功能。
package com.accat.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
當(dāng)所有就位的運(yùn)動(dòng)員(線程)準(zhǔn)備時(shí)腿时, 裁判員鳴槍况脆, 運(yùn)動(dòng)員搶跑。
這個(gè)過程相當(dāng)于多個(gè)線程互相等待就位后批糟,需要在之前執(zhí)行其他操作(鳴槍)格了,
之后所有線程同時(shí)執(zhí)行,CyclicBarrier支持這種操作徽鼎。
package com.accat.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
AQS組件 - ReentrantLock
ReentrantLock
和synchronize
區(qū)別
1.可重入鎖盛末,synchronize
依賴JVM實(shí)現(xiàn)弹惦,ReentrantLock
依賴JDK實(shí)現(xiàn),后者能查看源碼悄但。
2.兩者性能相差不大棠隐,官方推薦synchronize
,實(shí)現(xiàn)更加容易檐嚣,不用手動(dòng)釋放鎖助泽。
3.ReentrantLock
可以指定公平鎖和非公平鎖。
4.ReentrantLock
提供一個(gè)Condition
類净嘀,可以分組喚醒需要喚醒的線程
5.ReentrantLock
提供一個(gè)能夠中斷等待鎖的線程的機(jī)制报咳,lock.lockInterruptibly()
ReentrantLock
是一種自旋鎖實(shí)現(xiàn)侠讯,通過CAS機(jī)制不斷嘗試加鎖挖藏,避免線程進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài),想盡辦法讓線程不進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài)是理解鎖設(shè)計(jì)的關(guān)鍵厢漩。
package com.accat.concurrency.example.lock;
import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@ThreadSafe
public class LockExample2 {
// 請(qǐng)求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
private final static Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
AQS組件 - ReentrantReadWriteLock
讓我們想一下膜眠,我們有一個(gè)類
Data
, 其中getData()
,setData()
溜嗜,我們要保證這個(gè)類線程安全宵膨,在其上加鎖,那么我們需要getData()
,setData()
互斥和setData()
,setData()
互斥炸宵,但是如果直接使用synchronize
或者ReentrantLock
的話辟躏,那么getData()
,getData()
也將互斥,這不是我們要的土全。
所以JDK提供了一個(gè)ReentrantLock
的繼承類ReentrantReadWriteLock
捎琐,實(shí)現(xiàn)讀寫鎖分離的實(shí)現(xiàn),避免上述問題裹匙。
package com.mmall.concurrency.example.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
class Data {
}
}
StampedLock
StampedLock
是Java樂觀鎖的一種實(shí)現(xiàn)瑞凑。樂觀鎖是為執(zhí)行操作的對(duì)象附加一個(gè)versionId
,每次操作前獲取versionId
概页,操作完后查看versionId
是否沒被改變籽御,如果是則執(zhí)行更新,不是則放棄更新惰匙,重新操作技掏。
樂觀鎖適用于讀多寫少的場景。
package com.accat.concurrency.example.lock;
import com.accat.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;
@Slf4j
@ThreadSafe
public class LockExample5 {
// 請(qǐng)求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}
AQS組件 - Condition
本質(zhì)上就是加入Sync queue之后项鬼, Sync queue和Condition queue之間Node的相互放置操作哑梳。
package com.accat.concurrency.example.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class LockExample6 {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
try {
reentrantLock.lock(); // 獲取鎖,加入 Sync queue
log.info("wait signal"); // 1
condition.await(); // 將鎖釋放秃臣, 進(jìn)入Condition queue
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock(); // 由于鎖被釋放涧衙,拿到鎖哪工, 進(jìn)入Sync queue
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll(); // 將Condition queue中的Node放回到 Sync queue
log.info("send signal ~ "); // 3
reentrantLock.unlock(); // 釋放鎖, 將Node從Sync queue移除
}).start();
}
}