synchronized有個重要的功能千诬,可以通過object中的wait()和 notify()方法實現(xiàn)生產(chǎn)者/消費者智润。ReentrantLock基于Condition也同樣可以實現(xiàn),而且相對于synchronized的無差別通知仑性,ReentrantLock可以選擇性的通知,減少了很多無用的線程競爭。本文主要就是分析下ReentrantLock是怎么通過Condition實現(xiàn)生產(chǎn)者/消費者模式的馏艾。
public class ProducerCustomerWithLock {
Executor pool = Executors.newFixedThreadPool(5);
private List<String> storeList = new LinkedList<>();//倉庫
//倉庫容量
private int MAX_VALUE = 5;
//倉庫為空
private int MIN_VALUE = 0;
// 線程鎖
private Lock lock = new ReentrantLock();
//倉庫滿了,綁定生產(chǎn)者線程
private Condition full = lock.newCondition();
//倉庫為空奴愉,綁定消費者線程
private Condition empty = lock.newCondition();
//生產(chǎn)者
private class producer implements Runnable {
@Override
public void run() {
while (true) {
produce();
}
}
private void produce() {
System.out.println(Thread.currentThread().getName() + "進(jìn)入倉庫琅摩,準(zhǔn)備生產(chǎn)!");
try {
lock.lock();
if (storeList.size() == MAX_VALUE) {
System.out.println("倉庫已滿,等待消費");
Thread.sleep(1000);
full.await(); //當(dāng)前線程等待,讓其他線程繼續(xù)執(zhí)行,可以看出wait是釋放鎖的
}
if (storeList.size() < MAX_VALUE) {
String product = "產(chǎn)品" + new Random().nextInt();
storeList.add(product);
System.out.println(Thread.currentThread().getName() + "往倉庫中生產(chǎn)了一個產(chǎn)品锭硼!" + product);
}
Thread.sleep(1000);
empty.signalAll();//喚醒消費者線程
} catch (InterruptedException e) {
System.out.println("中斷異常");
// e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
private class consumer implements Runnable {
@Override
public void run() {
while (true) {
consume();
}
}
private void consume() {
System.out.println(Thread.currentThread().getName() + "進(jìn)入倉庫房资,準(zhǔn)備消費!");
try {
lock.lock();
if (storeList.size() == MIN_VALUE) {
System.out.println("倉庫已空,等待生產(chǎn)");
Thread.sleep(1000);
empty.await(); //當(dāng)前線程等待,讓其他線程繼續(xù)執(zhí)行,可以看出wait是釋放鎖的
}
if (storeList.size() > MIN_VALUE) {
System.out.println(Thread.currentThread().getName() + "從倉庫取得產(chǎn)品:" + storeList.remove(0));
}
Thread.sleep(1000);
full.signalAll();//喚醒生產(chǎn)者線程
} catch (InterruptedException e) {
System.out.println("中斷異常");
// e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
//啟動生產(chǎn)者和消費者線程
public void start() {
for (int i = 1; i < 5; i++) {
pool.execute(new producer());
pool.execute(new consumer());
}
}
public static void main(String[] args) {
ProducerCustomerWithLock pc = new ProducerCustomerWithLock();
pc.start();
}
}
打印結(jié)果如下檀头,生產(chǎn)者消費者交替運行:
pool-1-thread-1進(jìn)入倉庫轰异,準(zhǔn)備生產(chǎn)!
pool-1-thread-2進(jìn)入倉庫暑始,準(zhǔn)備消費搭独!
pool-1-thread-3進(jìn)入倉庫,準(zhǔn)備生產(chǎn)廊镜!
pool-1-thread-1往倉庫中生產(chǎn)了一個產(chǎn)品牙肝!產(chǎn)品-885144207
pool-1-thread-4進(jìn)入倉庫,準(zhǔn)備消費嗤朴!
pool-1-thread-5進(jìn)入倉庫配椭,準(zhǔn)備生產(chǎn)!
pool-1-thread-1進(jìn)入倉庫雹姊,準(zhǔn)備生產(chǎn)股缸!
pool-1-thread-2從倉庫取得產(chǎn)品:產(chǎn)品-885144207
pool-1-thread-2進(jìn)入倉庫,準(zhǔn)備消費吱雏!
pool-1-thread-3往倉庫中生產(chǎn)了一個產(chǎn)品乓序!產(chǎn)品-1433422526
pool-1-thread-3進(jìn)入倉庫,準(zhǔn)備生產(chǎn)坎背!
pool-1-thread-4從倉庫取得產(chǎn)品:產(chǎn)品-1433422526
pool-1-thread-4進(jìn)入倉庫替劈,準(zhǔn)備消費!
pool-1-thread-5往倉庫中生產(chǎn)了一個產(chǎn)品得滤!產(chǎn)品-137356193
pool-1-thread-5進(jìn)入倉庫陨献,準(zhǔn)備生產(chǎn)!
pool-1-thread-1往倉庫中生產(chǎn)了一個產(chǎn)品懂更!產(chǎn)品831540660
...
接下來就直接進(jìn)入正題吧眨业,看下ConditionObject.await()方法急膀。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
先判斷線程狀態(tài)是否被中斷,如果被中斷則拋出異常龄捡。然后開始構(gòu)建Condition隊列卓嫂,看下addConditionWaiter()方法。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
先判斷condition尾節(jié)點的waitStatus是否是condition狀態(tài)聘殖。如果不是只能是cancel狀態(tài)晨雳,則把隊列中是cancel狀態(tài)的節(jié)點移除。找到第一個是condition狀態(tài)的尾節(jié)點奸腺。
新建一個Node節(jié)點餐禁,模式是CONDITION,如果之前不存在尾節(jié)點突照。則把當(dāng)前節(jié)點作為頭結(jié)點和尾節(jié)點帮非,否則把當(dāng)前節(jié)點置為尾節(jié)點。然后執(zhí)行fullyRelease(node)方法讹蘑。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先進(jìn)入到await的方法的前提條件是獲取到鎖末盔,所以這一步是釋放鎖并且喚醒AQS隊列頭結(jié)點的后置節(jié)點。這里為什么要釋放鎖呢座慰,很簡單await方法會阻塞當(dāng)前線程陨舱。當(dāng)前線程如果不釋放鎖就會導(dǎo)致后面的線程獲取不到鎖從而阻塞。極大的影響性能還可能造成死鎖角骤。所以await方法是會釋放鎖的隅忿。
再看isOnSyncQueue(node)是判斷當(dāng)前節(jié)點是否在AQS隊列中,如果不在則阻塞當(dāng)前節(jié)點所在線程邦尊。直到signal方法喚醒該線程背桐。那么先看下signal():
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先把當(dāng)前節(jié)點waitStatus狀態(tài)CAS操作為0。如果設(shè)置失敗蝉揍,且該節(jié)點為condition頭結(jié)點链峭,則把該節(jié)點排除出隊列。
如果設(shè)置成功又沾,則把該節(jié)點插入到AQS隊列中弊仪。也就是說Signal()喚醒后不是立即執(zhí)行的。而是進(jìn)入到AQS隊列中排隊杖刷。
如果該節(jié)點被取消了或者已經(jīng)被設(shè)置成了SIGNAL励饵,則取消阻塞該節(jié)點所在線程。其他情況由AQS頭節(jié)點喚醒滑燃。
再回過頭看await方法役听。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
之前睡眠的線程被喚醒了,改節(jié)點已經(jīng)加入到了AQS隊列可以退出循環(huán)了,然后主要就是去獲取鎖并返回線程的中斷狀態(tài)典予。