AQS
可以先查看 Java并發(fā) - 讀寫鎖與AQS簡單了解 進(jìn)行簡單的了解
通過維護(hù) state 進(jìn)行加鎖和解鎖(含讀鎖(共享鎖)和寫鎖(獨(dú)占鎖))
/**
* The synchronization state.
*/
private volatile int state;
繼承 AbstractQueuedSynchronizer 后需要實(shí)現(xiàn)的方法
- isHeldExclusively():該線程是否正在獨(dú)占資源氛魁。只有用到condition才需要去實(shí)現(xiàn)它其屏。
- tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true拥知,失敗則返回false。
- tryRelease(int):獨(dú)占方式蟋座。嘗試釋放資源褐澎,成功則返回true,失敗則返回false烈评。
- tyAcquireShared(int):共享方式火俄。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功讲冠,但沒有剩余可用資源;正數(shù)表示成功瓜客,且有剩余資源。
- tryReleaseShared(int):共享方式竿开。嘗試釋放資源谱仪,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false否彩。
1. Semaphore (依賴了 AQS 定義的模板方法)
Semaphore是一個(gè)計(jì)數(shù)信號(hào)量疯攒,常用于限制可以訪問某些資源(物理或邏輯的)線程數(shù)目。
簡單說列荔,是一種用來控制并發(fā)量的共享鎖敬尺。
package concurrent;
import java.util.concurrent.Semaphore;
public class Semaphore_Demo {
static Semaphore sp =new Semaphore(6);
public static void main(String[] args) {
for (int i = 0; i <1000 ; i++) {
new Thread(()->{
try {
sp.acquire(); //就是拿信號(hào)量
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("當(dāng)前信號(hào)量限制....");
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行完了");
sp.release(); //執(zhí)行完了,就釋放信號(hào)量
}).start();
}
}
}
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
當(dāng)前信號(hào)量限制....
.....
以上例子表明贴浙,Semaphore(6) 限制了只能同時(shí)有六個(gè)線程執(zhí)行砂吞,也就起到了限流的作用
- 自己實(shí)現(xiàn)Semaphore的例子
package concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MySemaphore {
private Sync sync;
public MySemaphore(int permits) {
sync = new Sync(permits);
}
public void acquire() {
sync.acquireShared(1);
}
public void release() {
sync.releaseShared(1);
}
class Sync extends AbstractQueuedSynchronizer {
// 信號(hào)量大小
private int permits;
public Sync(int permits) {
this.permits = permits;
}
@Override
protected int tryAcquireShared(int arg) {
//定義自己的方法
int state = getState();
int nextState = state + arg;
if (nextState <= permits) {
if (compareAndSetState(state, nextState))
return 1;
}
return -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
int state = getState();
if (compareAndSetState(state, state - arg)) {
return true;
} else {
return false;
}
}
}
}
執(zhí)行代碼
package concurrent;
public class Semaphore_Demo {
static MySemaphore sp =new MySemaphore(6);
public static void main(String[] args) {
for (int i = 0; i <1000 ; i++) {
new Thread(()->{
sp.acquire(); //就是拿信號(hào)量
System.out.println("當(dāng)前信號(hào)量限制....");
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行完了");
sp.release(); //執(zhí)行完了,就釋放信號(hào)量
}).start();
}
}
}
執(zhí)行結(jié)果
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
當(dāng)前信號(hào)量限制....
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
執(zhí)行完了
2. CountDownLatch (依賴了 AQS 定義的模板方法)
即實(shí)現(xiàn)了一種計(jì)數(shù)器崎溃,可以認(rèn)為是倒數(shù)計(jì)時(shí)蜻直,比方說 從 6 開始依次遞減(此時(shí)只是準(zhǔn)備就緒),然后到 0 的時(shí)候再一起啟動(dòng)。
好比有這樣的一個(gè)跑步比賽概而,比賽開始之前唤殴,所有的選手都需要向裁判示意已經(jīng)準(zhǔn)備就緒,一共有六個(gè)選手到腥,那么每一個(gè)選手向裁判示意準(zhǔn)備就緒朵逝,未準(zhǔn)備就緒的人數(shù)就減1,直到所有的選手都準(zhǔn)備就緒乡范,此時(shí)未準(zhǔn)備就緒的人數(shù)為0配名,那么裁判就會(huì)扣下發(fā)令槍示意比賽開始。
package concurrent;
import java.util.concurrent.CountDownLatch;
public class CountDownLatch_Demo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6); //計(jì)數(shù)為6
for (int i = 0; i <6 ; i++) {
new Thread(()->{
System.out.println("開始準(zhǔn)備.....");
latch.countDown();//計(jì)數(shù)減一
}).start();
Thread.sleep(1000);
}
latch.await(); //每個(gè)線程執(zhí)行一次晋辆,則-1渠脉,在latch為0的時(shí)候開始向下運(yùn)行 這是這些線程都準(zhǔn)備就緒,然后去一起干同一件事
//還有一種方式瓶佳, 將一個(gè)活分為多段芋膘,每個(gè)線程去干一段
// for (int i = 0; i <6 ; i++) {
// new Thread(()->{
// latch.countDown(); // 計(jì)數(shù)減一
// try {
// latch.await(); // 阻塞 -- > 0
// System.out.println("線程:"+Thread.currentThread().getName()+"執(zhí)行完畢");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// }
System.out.println("開始干活....");
}
}
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始干活....
- 自己實(shí)現(xiàn)CountDownLatch的例子
package concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyCountDownLatch {
private Sync sync;
public MyCountDownLatch(int count) {
sync = new Sync(count);
}
public void countDown() {
sync.releaseShared(1);
}
public void await() {
sync.acquireShared(1);
}
class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
return getState() == 0 ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
while (true) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
}
}
執(zhí)行代碼
package concurrent;
import java.util.concurrent.CountDownLatch;
public class CountDownLatch_Demo {
public static void main(String[] args) throws InterruptedException {
MyCountDownLatch latch = new MyCountDownLatch(6); //計(jì)數(shù)為6
for (int i = 0; i <6 ; i++) {
new Thread(()->{
System.out.println("開始準(zhǔn)備.....");
latch.countDown();//計(jì)數(shù)減一
}).start();
Thread.sleep(1000);
}
latch.await(); //每個(gè)線程執(zhí)行一次,則-1霸饲,在latch為0的時(shí)候開始向下運(yùn)行 這是這些線程都準(zhǔn)備就緒为朋,然后去一起干同一件事
//還有一種方式, 將一個(gè)活分為多段厚脉,每個(gè)線程去干一段
// for (int i = 0; i <6 ; i++) {
// new Thread(()->{
// latch.countDown(); // 計(jì)數(shù)減一
// try {
// latch.await(); // 阻塞 -- > 0
// System.out.println("線程:"+Thread.currentThread().getName()+"執(zhí)行完畢");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// }
System.out.println("開始干活....");
}
}
執(zhí)行結(jié)果
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始準(zhǔn)備.....
開始干活....
3. CyclicBarrier (依賴了可重入鎖的Condition和 signalAll)
循環(huán)柵欄习寸,可以循環(huán)利用的屏障。
舉例:排隊(duì)上摩天輪時(shí)傻工,每到齊四個(gè)人霞溪,就可以上同一個(gè)車廂。
package concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrier_Demo {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(4);
for (int i = 0; i < 100; i++) { //假設(shè)有100個(gè)任務(wù)中捆,每次只能有固定數(shù)量的線程去執(zhí)行鸯匹,可以使用這個(gè)
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("任務(wù)開始執(zhí)行");
}).start();
Thread.sleep(500L);
}
}
}
- 自己實(shí)現(xiàn)CyclicBarrier 的例子
package concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyCyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private int count = 0; //批次
private final int parties; //多少線程準(zhǔn)備就緒?
private Object generation = new Object(); // 類似于版本號(hào)泄伪,解決偽喚醒的問題
public MyCyclicBarrier(int parties) {
this.parties = parties;
}
public void await() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Object g = generation;
int index = ++count;
if (index == parties) {
nextGeneration();
return;
}
while (true) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (g != generation) {
return;
}
}
} finally {
lock.unlock();
}
}
public void nextGeneration() {
condition.signalAll();
count = 0;
generation = new Object();
}
}
執(zhí)行代碼
package concurrent;
public class CyclicBarrier_Demo {
public static void main(String[] args) throws InterruptedException {
MyCyclicBarrier barrier = new MyCyclicBarrier(4);
for (int i = 0; i < 100; i++) { //假設(shè)有100個(gè)任務(wù)殴蓬,每次只能有固定數(shù)量的線程去執(zhí)行,可以使用這個(gè)
new Thread(() -> {
barrier.await();
System.out.println("任務(wù)開始執(zhí)行");
}).start();
Thread.sleep(500L);
}
}
}
如果覺得有收獲就點(diǎn)個(gè)贊吧臂容,更多知識(shí)科雳,請(qǐng)點(diǎn)擊關(guān)注查看我的主頁信息哦~