一、ReentrantLock
從jdk發(fā)行1.5版本之后买雾,在原來synchronize的基礎(chǔ)上,增加了重入鎖 ReentrantLock。
首先來看一個(gè)實(shí)例:
class ReentrantLockTestDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2
, 10
, 1
, TimeUnit.HOURS
, new ArrayBlockingQueue<>(4)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
MyRunnable myRunnable = new MyRunnable();
threadPoolExecutor.submit(myRunnable);
threadPoolExecutor.submit(myRunnable);
}
}
未使用ReentrantLock:
static class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
}
}
}
未使用ReentrantLock打印的結(jié)果是沒有順序扛施,雜亂無章的
index=0 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-1
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-1
index=2 thread=pool-1-thread-2
使用ReentrantLock加入鎖:
static class MyRunnable implements Runnable {
@Override
public void run() {
ReentrantLock reentrantLock = new ReentrantLock();
// 加鎖
reentrantLock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
}
} finally {
// 解鎖
reentrantLock.unlock();
}
}
}
打印出的結(jié)果,是有順序的
index=0 thread=pool-1-thread-1
index=1 thread=pool-1-thread-1
index=2 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-2
這就是鎖的作用屹篓,它是互斥的疙渣,當(dāng)一個(gè)線程持有鎖的時(shí)候,其他線程只有等待堆巧,待待線程執(zhí)行結(jié)束妄荔,釋放鎖泼菌,等待的線程再通過競(jìng)爭(zhēng)得到鎖。
二啦租、Condition
通常在開發(fā)并發(fā)程序的時(shí)候哗伯,會(huì)碰到需要停止正在執(zhí)行業(yè)務(wù)A,來執(zhí)行另一個(gè)業(yè)務(wù)B篷角,當(dāng)業(yè)務(wù)B執(zhí)行完成后焊刹,業(yè)務(wù)A繼續(xù)執(zhí)行。就可以通過ReentrantLock和Condtion等待/喚醒來完成這樣的操作恳蹲。在LinkedBlockingQueue的put/take操作中就有使用到虐块。
相較于synchronize的wait()、notify()/notifyAll()則更有針對(duì)性嘉蕾、靈活性贺奠。可以喚醒符合某個(gè)條件線程去執(zhí)行荆针,而notify/notifyAll()則是隨機(jī)通知的敞嗡,具有很大的不可控性。
1航背、使用Condition實(shí)現(xiàn)線程等待和喚醒
class ConditionTestDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3
, 10
, 1
, TimeUnit.HOURS
, new ArrayBlockingQueue<>(4)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
MyService service = new MyService();
// 線程1喉悴、2是符合條件A的
threadPoolExecutor.submit(new RunnableA(service));
threadPoolExecutor.submit(new RunnableA(service));
// 線程3是符合條件B的
threadPoolExecutor.submit(new RunnableB(service));
// 主線程sleep2s后,主動(dòng)喚醒符合條件B的線程玖媚。再由線程B去喚醒符合條件A的兩個(gè)線程箕肃。
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "等待2s 去喚醒符合條件B的所有線程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
service.signalB();
}
static class RunnableA implements Runnable {
private MyService service;
public RunnableA(MyService service) {
this.service = service;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "符合條件A--------" + DateUtils.INSTANCE.getCurrDataStr());
service.awaitA();
}
}
static class RunnableB implements Runnable {
private MyService service;
public RunnableB(MyService service) {
this.service = service;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "符合條件B--------" + DateUtils.INSTANCE.getCurrDataStr());
service.awaitB();
}
}
static class MyService {
ReentrantLock reentrantLock = new ReentrantLock();
Condition threadACondition = reentrantLock.newCondition();
Condition threadBCondition = reentrantLock.newCondition();
/**
* 符合添加A的線程進(jìn)入等待
*/
public void awaitA() {
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "獲取到鎖。被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
threadACondition.await();
System.out.println(Thread.currentThread().getName() + "被喚醒--------" + DateUtils.INSTANCE.getCurrDataStr());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 符合添加B的線程進(jìn)入等待
*/
public void awaitB() {
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "獲取到鎖今魔,被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
threadBCondition.await();
System.out.println(Thread.currentThread().getName() + "被喚醒--------" + DateUtils.INSTANCE.getCurrDataStr());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 喚醒符合條件A的所有線程
*/
public void signalA() {
reentrantLock.lock();
try {
threadACondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void signalB() {
reentrantLock.lock();
try {
// 喚醒符合條件B的所有線程
threadBCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
System.out.println(Thread.currentThread().getName() + "等待2s 再去喚醒符合條件A的所有線程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在喚醒符合條件B的所有線程后勺像,2s再去喚醒符合條件A的所有線程
signalA();
}
}
}
打印結(jié)果:
pool-1-thread-1符合條件A--------------22/04/01 21:37:21
pool-1-thread-3符合條件B--------------22/04/01 21:37:21
pool-1-thread-1獲取到鎖。被要求等待-----22/04/01 21:37:21
pool-1-thread-2符合條件A--------------22/04/01 21:37:21
pool-1-thread-2獲取到鎖错森。被要求等待-----22/04/01 21:37:21
pool-1-thread-3獲取到鎖吟宦,被要求等待-----22/04/01 21:37:21 // 三個(gè)線程一開啟,就被要求等待
main等待2s 去喚醒符合條件B的所有線程-----22/04/01 21:37:23 // 主線程等待2s后涩维,主動(dòng)去喚醒符合條件B的線程
pool-1-thread-3被喚醒----------------22/04/01 21:37:23 // 符合條件B的線程被喚醒
main等待2s 再去喚醒符合條件A的所有線程---22/04/01 21:37:25 // 符合條件B的線程被喚醒后殃姓,再等待2s,去喚醒符合條件A的所有線程
pool-1-thread-1被喚醒----------------22/04/01 21:37:25
pool-1-thread-2被喚醒----------------22/04/01 21:37:25 // 線程1瓦阐、2符合條件A,被同一時(shí)間喚醒
分別實(shí)例化了兩個(gè)Condition對(duì)象蜗侈,都是使用同一個(gè)lock注冊(cè)。注意 conditionA對(duì)象的等待和喚醒只對(duì)使用了conditionA的線程有用睡蟋,同理 conditionB對(duì)象的等待和喚醒只對(duì)使用了conditionB的線程有用踏幻。
2、模擬生產(chǎn)/消費(fèi)者
static class MyService {
private ReentrantLock mReentrantLock = new ReentrantLock();
private Condition mCondition = mReentrantLock.newCondition();
private boolean isFull;
private int index;
public void put() {
mReentrantLock.lock();
try {
// 如果隊(duì)列已滿戳杀,則進(jìn)入等待中
if (isFull) {
System.out.println("隊(duì)列已滿该面,生產(chǎn)者進(jìn)入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
mCondition.await();
}
System.out.println("開始生產(chǎn)夭苗,index=" + index + "需要2s----" +DateUtils.INSTANCE.getCurrDataStr());
// 每隔2s放一個(gè)元素
index++;
Thread.sleep(2000);
// 通知取
isFull = true;
mCondition.signalAll();
System.out.println("結(jié)束生產(chǎn),index=" + index + "喚醒消費(fèi)者進(jìn)行生產(chǎn)----" + DateUtils.INSTANCE.getCurrDataStr());
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
public void take() {
mReentrantLock.lock();
try {
// 如果隊(duì)列已空吆倦,則進(jìn)入等待中
if (!isFull) {
System.out.println("隊(duì)列已空听诸,消費(fèi)者進(jìn)入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
mCondition.await();
}
System.out.println("開始消費(fèi),index=" + index + "需要3s----" + DateUtils.INSTANCE.getCurrDataStr());
index--;
Thread.sleep(3000);
isFull = false;
// 提醒生產(chǎn)者
mCondition.signalAll();
System.out.println("結(jié)束消費(fèi)蚕泽,index=" + index + "喚醒生產(chǎn)者進(jìn)行生產(chǎn)----" + DateUtils.INSTANCE.getCurrDataStr());
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
}
生產(chǎn)者類:
static class PutRunnable implements Runnable {
MyService myService;
public PutRunnable(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.put();
}
}
}
消費(fèi)者類:
static class TakeRunnable implements Runnable {
MyService myService;
public TakeRunnable(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.take();
}
}
}
啟動(dòng)類:
class ConditionDemo {
public static void main(String[] args) {
MyService myService = new MyService();
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.execute(new PutRunnable(myService));
executorService.execute(new TakeRunnable(myService));
}
}
打印結(jié)果:
開始生產(chǎn)晌梨,index=0需要2s----22/04/02 13:21:15
結(jié)束生產(chǎn),index=1喚醒消費(fèi)者進(jìn)行生產(chǎn)----22/04/02 13:21:17
隊(duì)列已滿须妻,生產(chǎn)者進(jìn)入等待中----22/04/02 13:21:17
開始消費(fèi)仔蝌,index=1需要3s----22/04/02 13:21:17
結(jié)束消費(fèi),index=0喚醒生產(chǎn)者進(jìn)行生產(chǎn)----22/04/02 13:21:20
隊(duì)列已空荒吏,消費(fèi)者進(jìn)入等待中----22/04/02 13:21:20
開始生產(chǎn)敛惊,index=0需要2s----22/04/02 13:21:20
結(jié)束生產(chǎn),index=1喚醒消費(fèi)者進(jìn)行生產(chǎn)----22/04/02 13:21:22
隊(duì)列已滿绰更,生產(chǎn)者進(jìn)入等待中----22/04/02 13:21:22
開始消費(fèi)瞧挤,index=1需要3s----22/04/02 13:21:22
結(jié)束消費(fèi),index=0喚醒生產(chǎn)者進(jìn)行生產(chǎn)----22/04/02 13:21:25
隊(duì)列已空儡湾,消費(fèi)者進(jìn)入等待中----22/04/02 13:21:25
開始生產(chǎn)特恬,index=0需要2s----22/04/02 13:21:25
...
3、順序執(zhí)行線程
充分發(fā)掘Condition的靈活性徐钠,可以用它來實(shí)現(xiàn)順序執(zhí)行線程癌刽。
class MyService {
private ReentrantLock mReentrantLock = new ReentrantLock();
// 有三個(gè)線程,所有注冊(cè)三個(gè)Condition
private Condition mConditionA = mReentrantLock.newCondition();
private Condition mConditionB = mReentrantLock.newCondition();
private Condition mConditionC = mReentrantLock.newCondition();
// 通過index控制下一個(gè)執(zhí)行的線程
private int index;
public void actionA() {
mReentrantLock.lock();
try {
// 只有index 不等于 0尝丐,則進(jìn)入等待中
if (index != 0) {
mConditionA.await();
}
System.out.println("A執(zhí)行");
Thread.sleep(1000);
index = 1;
mConditionB.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
public void actionB() {
mReentrantLock.lock();
try {
// 只有index 不等于 1显拜,則進(jìn)入等待中
if (index != 1) {
mConditionB.await();
}
System.out.println("B執(zhí)行");
Thread.sleep(1000);
index = 2;
mConditionC.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
// 只有index==2時(shí),才執(zhí)行下面操作爹袁,否則休眠
public void actionC() {
mReentrantLock.lock();
try {
// 只有index 不等于 2远荠,則進(jìn)入等待中
if (index != 2) {
mConditionC.await();
}
System.out.println("C執(zhí)行");
Thread.sleep(1000);
index = 0;
mConditionA.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
}
業(yè)務(wù)類:
class RunnableA implements Runnable {
MyService myService;
public RunnableA(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionA();
}
}
}
class RunnableB implements Runnable {
MyService myService;
public RunnableB(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionB();
}
}
}
class RunnableC implements Runnable {
MyService myService;
public RunnableC(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionC();
}
}
}
啟動(dòng)類:
class ConditionDemo {
public static void main(String[] args) {
MyService myService = new MyService();
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 這邊故意打亂啟動(dòng)順序
executorService.execute(new RunnableB(myService));
executorService.execute(new RunnableA(myService));
executorService.execute(new RunnableC(myService));
}
}
打印結(jié)果:
A執(zhí)行
B執(zhí)行
C執(zhí)行
A執(zhí)行
B執(zhí)行
C執(zhí)行
...