CountDownLatch
- CountDownLatchExample1
package com.alan.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
{
int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("InterruptedException", e);
} finally {
countDownLatch.countDown();
}
});
}
}
//通過countDown()和await()能保證所有線程執(zhí)行完成后如贷,再調(diào)用log.info("finish")
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}",threadNum);
}
}
- CountDownLatchExample2 限制指定時(shí)間完成
package com.alan.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
{
int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("InterruptedException", e);
} finally {
countDownLatch.countDown();
}
});
}
}
//通過countDown()和await()能保證所有線程執(zhí)行完成后,再調(diào)用log.info("finish")
//設(shè)置超時(shí)時(shí)間10毫秒
countDownLatch.await(10,TimeUnit.MILLISECONDS);
log.info("finish");
//是先讓當(dāng)前線程任務(wù)都執(zhí)行完成后,才進(jìn)行shutdown操作
exec.shutdown();
}
public static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}",threadNum);
}
}
Semaphore 同步組件-信號量
Semaphore是一種在多線程環(huán)境下使用的設(shè)施杠袱,該設(shè)施負(fù)責(zé)協(xié)調(diào)各個(gè)線程尚猿,以保證它們能夠正確、合理的使用公共資源的設(shè)施霞掺,也是操作系統(tǒng)中用于控制進(jìn)程同步互斥的量谊路。
以一個(gè)停車場是運(yùn)作為例。為了簡單起見菩彬,假設(shè)停車場只有三個(gè)車位缠劝,一開始三個(gè)車位都是空的。這時(shí)如果同時(shí)來了五輛車骗灶,看門人允許其中三輛不受阻礙的進(jìn)入惨恭,然后放下車攔,剩下的車則必須在入口等待耙旦,此后來的車也都不得不在入口處等待脱羡。這時(shí),有一輛車離開停車場免都,看門人得知后锉罐,打開車攔,放入一輛绕娘,如果又離開兩輛脓规,則又可以放入兩輛,如此往復(fù)险领。
在這個(gè)停車場系統(tǒng)中侨舆,車位是公共資源,每輛車好比一個(gè)線程绢陌,看門人起的就是信號量的作用挨下。
更進(jìn)一步,信號量的特性如下:信號量是一個(gè)非負(fù)整數(shù)(車位數(shù))脐湾,所有通過它的線程(車輛)都會將該整數(shù)減一(通過它當(dāng)然是為了使用資源)臭笆,當(dāng)該整數(shù)值為零時(shí),所有試圖通過它的線程都將處于等待狀態(tài)沥割。在信號量上我們定義兩種操作: Wait(等待) 和 Release(釋放)耗啦。 當(dāng)一個(gè)線程調(diào)用Wait(等待)操作時(shí),它要么通過然后將信號量減一机杜,要么一直等下去帜讲,直到信號量大于一或超時(shí)。Release(釋放)實(shí)際上是在信號量上執(zhí)行加操作椒拗,對應(yīng)于車輛離開停車場似将,該操作之所以叫做“釋放”是因?yàn)榧硬僮鲗?shí)際上是釋放了由信號量守護(hù)的資源获黔。
應(yīng)用場景:只能訪問有限的資源
1、設(shè)置數(shù)據(jù)庫的連接數(shù)
2在验、設(shè)置數(shù)為1玷氏,將相當(dāng)于單線程運(yùn)行了。單一許可
package com.alan.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 200;
//設(shè)置允許的并發(fā)數(shù)為20
private final static Semaphore semaphore = new Semaphore(20);
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
{
int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); //獲取一個(gè)許可
test(threadNum);
semaphore.release(); //釋放一個(gè)許可
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
});
}
}
exec.shutdown();
}
public static void test(int threadNum) throws InterruptedException {
log.info("{}",threadNum);
Thread.sleep(1000);
}
}
- 多個(gè)許可
package com.alan.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 200;
//設(shè)置允許的并發(fā)數(shù)為20
private final static Semaphore semaphore = new Semaphore(20);
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
{
int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(20);
test(threadNum);
semaphore.release(20);
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
});
}
}
exec.shutdown();
}
public static void test(int threadNum) throws InterruptedException {
log.info("{}",threadNum);
Thread.sleep(1000);
}
}
CyclicBarrier
- CyclicBarrier是一個(gè)同步工具類腋舌,它允許一組線程互相等待盏触,直到到達(dá)某個(gè)公共屏障點(diǎn)。與CountDownLatch不同的是該barrier在釋放等待線程后可以重用块饺,所以稱它為循環(huán)(Cyclic)的屏障(Barrier)赞辩。
- CyclicBarrier支持一個(gè)可選的Runnable命令,在一組線程中的最后一個(gè)線程到達(dá)之后(但在釋放所有線程之前)授艰,該命令只在每個(gè)屏障點(diǎn)運(yùn)行一次辨嗽。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作很有用淮腾。
package com.alan.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor= Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(()->{
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready",threadNum);
barrier.await();
log.info("{} continue",threadNum);
}
}
ReentrantLock 與鎖
可重入性:
從名字上理解糟需,ReenTrantLock的字面意思就是再進(jìn)入的鎖,其實(shí)synchronized關(guān)鍵字所使用的鎖也是可重入的谷朝,兩者關(guān)于這個(gè)的區(qū)別不大洲押。兩者都是同一個(gè)線程沒進(jìn)入一次,鎖的計(jì)數(shù)器都自增1圆凰,所以要等到鎖的計(jì)數(shù)器下降為0時(shí)才能釋放鎖诅诱。鎖的實(shí)現(xiàn):
Synchronized是依賴于JVM實(shí)現(xiàn)的,而ReenTrantLock是JDK實(shí)現(xiàn)的送朱,有什么區(qū)別,說白了就類似于操作系統(tǒng)來控制實(shí)現(xiàn)和用戶自己敲代碼實(shí)現(xiàn)的區(qū)別干旁。前者的實(shí)現(xiàn)是比較難見到的驶沼,后者有直接的源碼可供閱讀。性能的區(qū)別:
在Synchronized優(yōu)化以前争群,synchronized的性能是比ReenTrantLock差很多的回怜,但是自從Synchronized引入了偏向鎖,輕量級鎖(自旋鎖)后换薄,兩者的性能就差不多了玉雾,在兩種方法都可用的情況下,官方甚至建議使用synchronized轻要,其實(shí)synchronized的優(yōu)化我感覺就借鑒了ReenTrantLock中的CAS技術(shù)复旬。都是試圖在用戶態(tài)就把加鎖問題解決,避免進(jìn)入內(nèi)核態(tài)的線程阻塞冲泥。功能區(qū)別:
便利性:很明顯Synchronized的使用比較方便簡潔驹碍,并且由編譯器去保證鎖的加鎖和釋放壁涎,而ReenTrantLock需要手工聲明來加鎖和釋放鎖,為了避免忘記手工釋放鎖造成死鎖志秃,所以最好在finally中聲明釋放鎖怔球。
鎖的細(xì)粒度和靈活度:很明顯ReenTrantLock優(yōu)于Synchronized
- ReenTrantLock獨(dú)有的能力:
1、ReenTrantLock可以指定是公平鎖還是非公平鎖浮还。而synchronized只能是非公平鎖竟坛。所謂的公平鎖就是先等待的線程先獲得鎖。
2钧舌、ReenTrantLock提供了一個(gè)Condition(條件)類担汤,用來實(shí)現(xiàn)分組喚醒需要喚醒的線程們,而不是像synchronized要么隨機(jī)喚醒一個(gè)線程要么喚醒全部線程延刘。
3漫试、ReenTrantLock提供了一種能夠中斷等待鎖的線程的機(jī)制,通過lock.lockInterruptibly()來實(shí)現(xiàn)這個(gè)機(jī)制碘赖。
package com.alan.concurrency.example.lock;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@ThreadSafe
public class LockExample2 {
//請求數(shù)1000
public static int clientTotal = 5000;
//同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
//通過Lock接口實(shí)現(xiàn)
private static Lock lock = new ReentrantLock();
private static void add(){
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
//定義線程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定義信號量,傳入并發(fā)線程數(shù) final修飾不允許重新賦值
final Semaphore semaphore = new Semaphore(threadTotal);
//定義計(jì)數(shù)器閉鎖驾荣。傳入請求總數(shù)
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通過匿名內(nèi)部類方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并發(fā)數(shù)量
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次執(zhí)行計(jì)數(shù)器減掉一個(gè)
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count);
}
}
- ReentrantReadWriteLock
package com.alan.concurrency.example.lock;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//分別定義讀鎖和寫鎖
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys(){
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value){
writeLock.lock();
try {
return map.put(key,value);
} finally {
writeLock.unlock();
}
}
}
- StampedLock
package com.alan.concurrency.example.lock;
import java.util.concurrent.locks.StampedLock;
public class LockExample4 {
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看樂觀讀鎖案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //獲得一個(gè)樂觀讀鎖
double currentX = x, currentY = y; //將兩個(gè)字段讀入本地局部變量
if (!sl.validate(stamp)) { //檢查發(fā)出樂觀讀鎖后同時(shí)是否有其他寫鎖發(fā)生?
stamp = sl.readLock(); //如果沒有普泡,我們再次獲得一個(gè)讀悲觀鎖
try {
currentX = x; // 將兩個(gè)字段讀入本地局部變量
currentY = y; // 將兩個(gè)字段讀入本地局部變量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲觀讀鎖案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循環(huán)播掷,檢查當(dāng)前狀態(tài)是否符合
long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉(zhuǎn)為寫鎖
if (ws != 0L) { //這是確認(rèn)轉(zhuǎn)為寫鎖是否成功
stamp = ws; //如果成功 替換票據(jù)
x = newX; //進(jìn)行狀態(tài)改變
y = newY; //進(jìn)行狀態(tài)改變
break;
} else { //如果不能成功轉(zhuǎn)換為寫鎖
sl.unlockRead(stamp); //我們顯式釋放讀鎖
stamp = sl.writeLock(); //顯式直接進(jìn)行寫鎖 然后再通過循環(huán)再試
}
}
} finally {
sl.unlock(stamp); //釋放讀鎖或?qū)戞i
}
}
}
}
package com.alan.concurrency.example.lock;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;
@Slf4j
@ThreadSafe
public class LockExample5 {
// 請求總數(shù)
public static int clientTotal = 5000;
// 同時(shí)并發(fā)執(zhí)行的線程數(shù)
public static int threadTotal = 200;
public static int count = 0;
private final static StampedLock lock = new StampedLock();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
}