1.Lock接口
一般來說识窿,一個鎖能夠防止多個線程同時訪問共享資源(但有些鎖可以允許多個線程并發(fā)的訪問共享資源至耻,比如讀寫鎖)庆揩。
Java SE 5之后,并發(fā)包中新增了Lock接口(以及相關(guān)實現(xiàn)類)用來實現(xiàn)鎖功能间景,它提供了與synchronized關(guān)鍵字類似的同步功能,只是在使用時需要顯示地獲取和釋放鎖艺智。
Lock lock = new ReentrantLock();
lock.lock();
try{
} finally {
lock.unlock();
}
不要將獲取鎖的過程寫在try塊中倘要,因為如果在獲取鎖(自定義鎖的實現(xiàn))時發(fā)生了異常,異常拋出的同時十拣,也會導(dǎo)致鎖無故釋放封拧。
2.隊列同步器
隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架夭问。它使用了一個int成員變量標(biāo)識同步狀態(tài)哮缺,通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作。
同步器的主要使用方式是繼承甲喝,子類通過繼承同步器并實現(xiàn)它的抽象方法來管理同步狀態(tài)尝苇。同步器提供3個方法操作同步狀態(tài),getState()埠胖、setState(int newState)糠溜、compareAndSetState(int expect, int update)。
同步器是實現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵直撤,在鎖的實現(xiàn)中聚合同步器非竿,利用同步器實現(xiàn)鎖的語義。
可以這樣理解鎖和同步器的關(guān)系:
- 鎖是面向使用者的谋竖,它定義了使用者與鎖交互的接口红柱,隱藏了實現(xiàn)細(xì)節(jié)承匣。
- 同步器面向的是鎖的實現(xiàn)者,它簡化了鎖的實現(xiàn)方式锤悄,屏蔽了同步狀態(tài)管理韧骗、線程的排隊、等待與喚醒等底層操作零聚。
鎖和同步器很好地隔離了使用者和實現(xiàn)者所需關(guān)注的領(lǐng)域袍暴。
①隊列同步器的接口與示例
同步器提供的模板方法基本上分為3類:
- 獨占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放同步狀態(tài)
- 查詢同步隊列中的等待線程情況
下面通過一個獨占鎖的示例來深入了解一下同步器的工作原理。
獨占鎖就是在同一時刻只能有一個線程獲取到鎖隶症,而其他獲取鎖的線程只能處于同步隊列中等待政模,只有獲取鎖的線程釋放了鎖,后繼的線程才能夠獲取鎖蚂会。
public class Mutex implements Lock {
//靜態(tài)內(nèi)部類淋样,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
//是否處于占用狀態(tài)
protected boolean isHeldExclusively() {
return getState() == 1;
}
//當(dāng)狀態(tài)為0的時候獲取鎖
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//釋放鎖,將狀態(tài)設(shè)置為0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//返回一個Condition胁住,每個condition都包含了一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
//僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
}
Mutex是一個自定義同步組件习蓬,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態(tài)內(nèi)部類措嵌,該內(nèi)部類繼承了同步器并實現(xiàn)了獨占式獲取和釋放同步狀態(tài)躲叼。
②隊列同步器的實現(xiàn)分析
接下來將從實現(xiàn)角度分析同步器是如何完成線程同步的。
1)同步隊列
同步器依賴內(nèi)部的同步隊列(一個FIFO雙向隊列)來完成同步狀態(tài)的管理企巢,當(dāng)前線程獲取同步狀態(tài)失敗時枫慷,同步器會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個節(jié)點Node并將其加入同步隊列,同時會阻塞當(dāng)前線程浪规,當(dāng)同步狀態(tài)釋放時或听,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)笋婿。
當(dāng)一個線程成功地獲取了同步狀態(tài)(或者鎖)誉裆,其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成節(jié)點并加入到隊列中缸濒,而這個加入隊列的過程必須要保證線程安全足丢,因此同步器提供了一個基于CAS的設(shè)置尾節(jié)點的方法:compareAndSetTail(Node expect, Node update)。
同步隊列遵循FIFO庇配,首節(jié)點是獲取同步狀態(tài)成功的節(jié)點斩跌,首節(jié)點的線程在釋放同步狀態(tài)時,會喚醒后繼接口捞慌,而后繼節(jié)點將會在獲取同步狀態(tài)成功時將自己設(shè)置為首節(jié)點耀鸦。
2)獨占式同步狀態(tài)獲取與釋放
通過調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài),該方法對中斷不敏感啸澡,也就是由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊列中袖订,后續(xù)對線程進(jìn)行中斷操作時氮帐,線程不會從同步隊列中移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代碼主要邏輯:
- 首先調(diào)用自定義同步器實現(xiàn)的tryAcquire(int acquires)方法洛姑,該方法保證線程安全的獲取同步狀態(tài)上沐。
- 如果同步狀態(tài)獲取失敗,則構(gòu)造同步節(jié)點(獨占式Node.EXCLUSIVE吏口,同一時刻只能有一個線程成功獲取同步狀態(tài))并通過addWaiter(Node mode)將節(jié)點加入到同步隊列的尾部奄容。
- 最后調(diào)用acquireQueued(final Node node, int arg)使得該節(jié)點以“死循環(huán)”的方式獲取同步狀態(tài)冰更。如果獲取不到則阻塞節(jié)點中的線程产徊,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點的出隊或阻塞線程被中斷來實現(xiàn)。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速嘗試在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
節(jié)點進(jìn)入同步隊列之后蜀细,就進(jìn)入了一個自旋的過程舟铜,每個節(jié)點(或者說每個線程)都在自省地觀察,當(dāng)條件滿足奠衔,獲取到了同步狀態(tài)谆刨,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中(并會阻塞節(jié)點的線程)归斤。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
為什么只有頭節(jié)點才能嘗試獲取同步狀態(tài):
- 頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點痊夭,而頭節(jié)點的線程釋放了同步狀態(tài)之后,將會喚醒其后繼節(jié)點脏里,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點是否是頭節(jié)點她我。
- 維護(hù)同步隊列的FIFO原則。該方法中迫横,節(jié)點自旋獲取同步狀態(tài)的行為如下圖番舆。
acquire(int arg)方法調(diào)用流程:
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯之后,就需要釋放同步狀態(tài)矾踱,使得后續(xù)節(jié)點能夠繼續(xù)獲取同步狀態(tài)恨狈。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
方法執(zhí)行時,會喚醒頭節(jié)點的后繼節(jié)點線程呛讲,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程禾怠。
總結(jié):在獲取同步狀態(tài)時,同步器維護(hù)一個同步隊列贝搁,獲取失敗的線程都會被加入到隊列中并在隊列中進(jìn)行自旋刃宵;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點為頭節(jié)點且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時徘公,同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài)牲证,然后喚醒頭節(jié)點的后繼節(jié)點。
3)共享式同步狀態(tài)獲取與釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)关面。
通過調(diào)用同步器的acquireShared(int arg)方法可以共享地獲取同步狀態(tài)坦袍。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
同步器調(diào)用tryAcquireShared(int arg)嘗試獲取同步狀態(tài)十厢,tryAcquireShared(int arg)返回大于等于0時,表示能夠獲取到同步狀態(tài)捂齐。因此蛮放,在共享式獲取的自旋過程中,成功獲取到同步狀態(tài)并退出自旋的條件就是tryAcquireShared(int arg)返回值大于等于0奠宜“洌可以看到,在doAcquireShared(int arg)的自旋過程中压真,如果當(dāng)前節(jié)點的前驅(qū)為頭節(jié)點時娩嚼,嘗試獲取同步狀態(tài),如果返回值大于等于0滴肿,表示該次獲取同步狀態(tài)成功并從自旋過程中退出岳悟。
與獨占式一樣,共享式獲取也需要釋放同步狀態(tài)泼差,通過調(diào)用releaseShared(int arg)方法可以釋放同步狀態(tài)贵少。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點堆缘。對于能夠支持多個線程同時訪問的并發(fā)組件(比如Semaphore)滔灶,它和獨占式主要區(qū)別在于tryReleaseShared(int arg)必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過循環(huán)和CAS來保證的吼肥,以為釋放同步狀態(tài)的操作會同時來自多個線程录平。
4)超時獲取同步狀態(tài)
acquireInterruptibly(int arg)獲取同步狀態(tài)時,如果當(dāng)前線程被中斷潜沦,會立刻返回萄涯,并拋出InterruptedException
超時獲取同步狀態(tài),調(diào)用同步器的doAcquireNanos(int arg, long nanosTimeout)唆鸡,它是上述方法的增強(qiáng)版涝影。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//當(dāng)已到設(shè)置的超時時間,該線程會從這里返回
LockSupport.parkNanos(this, nanosTimeout);
//當(dāng)nanosTimeout <= spinForTimeoutThreshold時争占,不會使該線程超時等待燃逻,而是進(jìn)入快速的自旋過程。原因在于臂痕,非常短的超時等待無法做到十分精確伯襟。
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5)自定義同步組件——TwinsLock
public class TwinsLock implements Lock {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
//其他接口方法略
}
public class TwinsLockTest {
Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
public void test() {
//啟動10個線程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
//每隔1秒換行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
public static void main(String[] args) {
new TwinsLockTest().test();
}
}
3.重入鎖
重入鎖ReentrantLock,顧名思義握童,就是支持沖重進(jìn)入的鎖姆怪。自定義的Mutex,占有鎖的線程再次調(diào)用tryAcquire方法時返回false,導(dǎo)致該線程被阻塞稽揭。所以Mutex是一個不支持重新進(jìn)入的鎖俺附。
synchronized關(guān)鍵字隱式支持重進(jìn)入,比如一個synchronized修飾的遞歸方法溪掀。
公平鎖:等待時間最長的線程最優(yōu)先獲取鎖事镣。反之則是不公平鎖。
①實現(xiàn)重進(jìn)入
重進(jìn)入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞揪胃。
實現(xiàn)該特性需要解決一下兩個問題:
1)線程再次獲取鎖璃哟。鎖需要去識別獲取鎖的線程是否為當(dāng)前占據(jù)鎖的線程,如果是喊递,則再次成功獲取随闪。
2)鎖的最終釋放。線程重復(fù)n次獲取了鎖册舞,隨后在第n次釋放該鎖后蕴掏,其他線程能夠獲取到該鎖障般。
ReentrantLock通過組合自定義同步器Sync(繼承了AbstractQueuedSynchronizer)來實現(xiàn)鎖的獲取與釋放调鲸,以非公平性(默認(rèn)的)實現(xiàn)為例,獲取同步狀態(tài)的代碼如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
成功獲取鎖的線程再次獲取鎖挽荡,只是增加了同步狀態(tài)值藐石,這也就要求ReentrantLock在釋放同步狀態(tài)時減少同步狀態(tài)值,該方法的代碼如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
②公平與非公平獲取鎖的區(qū)別
公平性與否是針對獲取鎖而言的定拟,如果一個鎖是公平的于微,那么鎖的獲取順序就應(yīng)該符合請求的絕對時間順序,也就是FIFO青自。
公平鎖獲取同步狀態(tài)的方法:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
該方法與nonfairTryAcquire比較株依,唯一不同的位置為判斷條件多了hasQueuedPredecessors方法,即加入了同步隊列中當(dāng)前節(jié)點是否有前驅(qū)節(jié)點的判斷延窜。
下面編寫一個測試來觀察公平和非公平鎖在獲取鎖時的區(qū)別:
public class FairAndUnfairTest {
private static Lock fairLock = new ReentrantLock2(true);
private static Lock unFairLock = new ReentrantLock2(false);
@Test
public void fair() {
testLock(fairLock);
}
@Test
public void unfair() {
testLock(unFairLock);
}
private void testLock(Lock lock) {
//啟動5個Job
for (int i = 0; i < 5; i++) {
Job job = new Job(lock);
job.setName(String.valueOf(i));
job.start();
}
}
private static class Job extends Thread {
private Lock lock;
Job(Lock lock) {
this.lock = lock;
}
public void run() {
//連續(xù)2次打印當(dāng)前的Thread和等待隊列中的Thread
for (int i = 0; i < 2; i++) {
lock.lock();
try {
ReentrantLock2 l = (ReentrantLock2) this.lock;
Collection<Thread> queuedThreads = l.getQueuedThreads();
String collect = queuedThreads.stream().map(Thread::getName).collect(Collectors.joining(","));
System.out.println("Lock by[" + Thread.currentThread().getName() + "].Waiting by [" + collect + "]");
} finally {
lock.unlock();
}
}
}
}
private static class ReentrantLock2 extends ReentrantLock {
ReentrantLock2(boolean fair) {
super(fair);
}
public Collection<Thread> getQueuedThreads() {
ArrayList<Thread> threads = new ArrayList<>(super.getQueuedThreads());
Collections.reverse(threads);
return threads;
}
}
}
公平性鎖每次都是從同步隊列中的第一個節(jié)點獲取到鎖恋腕,非公平性鎖出現(xiàn)了一個線程連續(xù)獲取鎖的情況。
為什么會出現(xiàn)線程連續(xù)獲取鎖的情況呢逆瑞?nonfairTryAcquire方法荠藤,當(dāng)一個線程請求鎖時,只要獲取了同步狀態(tài)即成功獲取鎖获高。在這個前提下哈肖,剛釋放的線程再次獲取同步狀態(tài)的幾率會非常大,使得其他線程只能在同步隊列中等待念秧。
為什么非公平性鎖被設(shè)定成默認(rèn)實現(xiàn)淤井?上表結(jié)果,公平性鎖在測試中進(jìn)行了10次切換,而非公平性鎖只有5次切換币狠,這說明非公平性鎖的開銷更小缎除。
測試:10個線程,每個線程獲取100000次鎖总寻,通過vmstat統(tǒng)計測試運行時系統(tǒng)線程上下文切換的次數(shù)器罐,結(jié)果如下:
公平性鎖保證了鎖的獲取按照FIFO原則,代價是進(jìn)行大量的線程切換渐行。非公平性鎖雖然可能造成線程“饑餓”轰坊,但極少的線程切換,保證了其更大的吞吐量祟印。
4.讀寫鎖
讀寫鎖維護(hù)了一對鎖肴沫,一個讀鎖和一個寫鎖。
當(dāng)寫鎖被獲取到時蕴忆,后續(xù)(非當(dāng)前寫操作線程)的讀寫操作都會被阻塞颤芬,寫鎖釋放之后,所有操作繼續(xù)執(zhí)行套鹅。
一般情況下站蝠,讀寫鎖的性能都會比排它鎖好,因為大多數(shù)場景讀是多于寫的卓鹿。在讀多于寫的情況下菱魔,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量。Java并發(fā)包提供讀寫鎖的實現(xiàn)是ReentrantReadWriteLock吟孙。
①讀寫鎖的接口與示例
示例:
public class Cache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
//獲取一個key對應(yīng)的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
//設(shè)置key對應(yīng)的value澜倦,并返回舊的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
//清空所有的內(nèi)存
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
上述示例中,Cache組合一個非線程安全的HashMap作為緩存的實現(xiàn)杰妓,同事使用讀寫鎖的讀鎖和寫鎖來保證Cache是線程安全的藻治。
②讀寫鎖的實現(xiàn)分析
1)讀寫狀態(tài)的設(shè)計
讀寫鎖的自定義同步器(繼承AQS)需要在同步狀態(tài)(一個整型變量)上維護(hù)多個線程和一個寫線程的狀態(tài),使得該狀態(tài)的設(shè)計成為讀寫鎖實現(xiàn)的關(guān)鍵巷挥。
上圖同步狀態(tài)表示一個線程已經(jīng)獲取了寫鎖桩卵,且重進(jìn)入了兩次,同時也連續(xù)獲取了兩次讀鎖句各。
讀寫鎖通過位運算迅速確定讀和寫各自的狀態(tài)吸占。假設(shè)當(dāng)前同步狀態(tài)為S,寫狀態(tài)等于S&0x0000FFFF(將高16位全部抹去)凿宾,讀狀態(tài)等于S>>>16(無符號補0右移16位)矾屯。當(dāng)寫狀態(tài)增加1時,等于S+1初厚,當(dāng)讀狀態(tài)增加1時件蚕,等于S+(1<<16)孙技,也就是S+0x00010000。
2)寫鎖的獲取與釋放
寫鎖是一個支持重進(jìn)入的排它鎖排作。如果當(dāng)前線程已經(jīng)獲取了寫鎖牵啦,則增加寫狀態(tài)。如果當(dāng)前線程在獲取寫鎖時妄痪,讀鎖已經(jīng)被獲裙(讀狀態(tài)不為0)或者該線程不是已經(jīng)獲取寫鎖的線程,則當(dāng)前線程進(jìn)入等待狀態(tài)衫生。
如果存在讀鎖裳瘪,則寫鎖不能被獲取,原因在于:讀寫鎖要確保寫鎖的操作對讀鎖可見罪针,如果允許讀鎖在已被獲取的情況下對寫鎖的獲取彭羹,那么正在運行的其他讀線程就無法感知到當(dāng)前寫線程的操作。因此泪酱,只有等待其他讀線程都釋放了讀鎖派殷,寫鎖才能被當(dāng)前線程獲取,而寫鎖一旦被獲取墓阀,則其他讀寫線程的后續(xù)訪問均被阻塞毡惜。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//存在讀鎖或者當(dāng)前獲取線程不是已經(jīng)獲取寫鎖的線程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
寫鎖的釋放與ReentrantLock的釋放過程基本類似,每次釋放減少寫狀態(tài)岂津,當(dāng)寫狀態(tài)為0時表示寫鎖已被釋放虱黄,從而等待的讀寫線程能夠繼續(xù)訪問讀寫鎖悦即。
3)讀鎖的獲取與釋放
讀鎖是一個支持重進(jìn)入的共享鎖吮成,它能夠被多個線程同時獲取。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//寫鎖已經(jīng)被獲取辜梳,且獲取的線程不是該線程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//不需要被放到阻塞隊列粱甫、已經(jīng)存在的讀鎖小于最大值、增加讀狀態(tài)成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//記錄線程獲取讀鎖的次數(shù)
if (r == 0) {//之前沒有線程獲取過讀鎖
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一個獲取讀鎖的是該線程
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
//獲取讀鎖的完整版本作瞄,用于處理tryAcquireShared中CAS失敗的茶宵、重入讀鎖在tryAcquireShared中未處理的
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 否則,我們持有獨占鎖宗挥,在這里阻塞會導(dǎo)致死鎖
} else if (readerShouldBlock()) {
// 確保我們不會重新獲得讀鎖
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
讀鎖的每次釋放(線程安全的乌庶,可能有多個讀線程同時釋放讀鎖)均減少狀態(tài),減少的值是(1<<16)契耿。
4)鎖降級
鎖降級指的是寫鎖降級為讀鎖瞒大。如果當(dāng)前線程擁有寫鎖系草,然后將其釋放碍论,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級定页。鎖降級是指把持住(當(dāng)前擁有的)寫鎖酗电,再獲取到讀鎖魄藕,隨后釋放(先前擁有的)寫鎖的過程。
示例:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
//在獲取寫鎖之前必須釋放讀鎖
rwl.readLock().unlock();
//鎖降級從寫鎖獲取到開始
rwl.writeLock().lock();
try {
//重新檢查狀態(tài)撵术,因為另一個線程可能在我們之前已經(jīng)獲取了寫鎖和更改了狀態(tài)
if (!cacheValid) {
//準(zhǔn)備數(shù)據(jù)的流程(略)
data = ...
cacheValid = true;
}
// 在釋放寫鎖之前通過獲取讀鎖來降級
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); //釋放寫鎖背率,仍然保持讀鎖
}
}
try {
//使用數(shù)據(jù)的流程(略)
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
}
5.LockSupport工具
隊列同步器里,當(dāng)需要阻塞或喚醒一個線程的時候嫩与,都會使用LockSupport工具類來完成相應(yīng)工作退渗。
LockSupport定義了一組以park開頭的方法用來阻塞當(dāng)前線程,以及unpark(Thread thread)方法來喚醒一個被阻塞的線程蕴纳。
Java 6中会油,增加了park(Object blocker)、parkNanos(Object blocker, long nanos)古毛、parkUntil(Object blocker, long deadline)翻翩,用于實現(xiàn)阻塞當(dāng)前線程的功能,其中參數(shù)blocker是用來標(biāo)識當(dāng)前線程在等待的對象(以下稱為阻塞對象)稻薇,該對象主要用于問題排查和系統(tǒng)監(jiān)控嫂冻。
6.Condition接口
任意一個Java對象,都擁有一組監(jiān)視器方法(定義在Object上)塞椎,主要包括wait()桨仿、wait(long timeout)、notify()案狠、notifyAll()方法服傍,這些方法與synchronized同步關(guān)鍵字配合,可以實現(xiàn)等待/通知模式骂铁。
Condition接口提供了類似Object的監(jiān)視器方法吹零,與Lock配合可以實現(xiàn)等待/通知模式。
①Condition接口與示例
Condition定義了等待/通知兩種類型的方法拉庵,當(dāng)前線程調(diào)用這些方法時灿椅,需要提前獲取到Condition對象關(guān)聯(lián)的鎖。Condition是依賴Lock對象的钞支,調(diào)用Lock對象的newCondition方法創(chuàng)建茫蛹。
public class ConditionUseCase {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
示例:
public class BoundedQueue<T> {
private Object[] items;
//添加的下標(biāo),刪除的下標(biāo)和數(shù)組單簽數(shù)量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
//添加一個元素烁挟,如果數(shù)組滿婴洼,則添加線程進(jìn)入等待狀態(tài),直到有“空位”
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) //數(shù)組已滿
notFull.await(); //釋放鎖并進(jìn)入等待狀態(tài)信夫。 收到通知之后獲取鎖并返回
items[addIndex] = t;//添加元素到數(shù)組中
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();//通知等待在notEmpty上的線程窃蹋,數(shù)組中已經(jīng)有新元素可以獲取卡啰。
} finally {
lock.unlock();
}
}
//由頭部刪除一個元素,如果數(shù)組空警没,則刪除線程進(jìn)入等待狀態(tài)匈辱,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) //使用while而不用if,目的是防止過早或意外的通知杀迹,只有條件符合才能退出循環(huán)亡脸。
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
②Condition的實現(xiàn)分析
ConditionObject是同步器AbstractQueuedSynchronizer的內(nèi)部類。每個Condition對象都包含著一個隊列(以下稱為等待隊列)树酪,該隊列是Condition對象實現(xiàn)等待/通知功能的關(guān)鍵浅碾。下面提到的Condition不加說明都指的是ConditionObject。
1)等待隊列
等待隊列是一個FIFO的隊列续语,在隊列中的每個節(jié)點都包含了一個線程引用垂谢,該線程就是在Condition對象上等待的線程。節(jié)點的定義復(fù)用了同步器中節(jié)點的定義(AbstractQueuedSynchronizer.Node)疮茄。
調(diào)用Condition.await()方法滥朱,那么該線程將會釋放鎖、構(gòu)造成節(jié)點膠乳等待隊列并進(jìn)入等待狀態(tài)力试。
節(jié)點引用更新的過程并沒有使用CAS保證徙邻,原因在于調(diào)用await()方法的線程必定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的畸裳。
Object的監(jiān)視器模型上缰犁,一個對象擁有一個同步隊列和等待隊列,而并發(fā)包中的Lock(更確切的說是同步器)擁有一個同步隊列和多個等待隊列怖糊,其對應(yīng)關(guān)系如下:
2)等待
調(diào)用Condition.await()方法(或者以await開頭的方法)帅容,會使當(dāng)前線程進(jìn)入等待隊列并釋放鎖,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)蓬抄。當(dāng)從await()方法返回時丰嘉,當(dāng)前線程一定獲取了Condition相關(guān)聯(lián)的鎖。
如果從隊列(同步隊列和等待隊列)的角度看await()方法嚷缭,當(dāng)調(diào)用await方法時,相當(dāng)于同步隊列的首節(jié)點(獲取了鎖的節(jié)點)移動到Condition的等待隊列中耍贾。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//當(dāng)前線程加入等待隊列
Node node = addConditionWaiter();
//釋放同步狀態(tài)阅爽,也就是釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//isOnSyncQueue:節(jié)點已經(jīng)在同步隊列為true,否則為false
LockSupport.park(this);//阻塞線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //acquireQueued:加入到獲取同步狀態(tài)的競爭
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
該方法將當(dāng)前線程構(gòu)造成節(jié)點并加入等待隊列中荐开,然后釋放同步狀態(tài)付翁,喚醒同步狀態(tài)隊列中的后繼節(jié)點,然后當(dāng)前線程會進(jìn)入等待狀態(tài)晃听。
如上圖所示百侧,同步隊列的首節(jié)點并不會直接加入等待隊列砰识,而是通過addConditionWaiter方法把當(dāng)前線程構(gòu)造成一個新的節(jié)點并將其加入等待隊列中。
3)通知
調(diào)用Condition的signal方法佣渴,將會喚醒在等待隊列中等待時間最長的節(jié)點(首節(jié)點)辫狼,在喚醒節(jié)點之前,會將節(jié)點移動到同步隊列中辛润。
public final void signal() {
if (!isHeldExclusively()) //isHeldExclusively:當(dāng)前線程獲取了鎖膨处,返回true
throw new IllegalMonitorStateException();
Node first = firstWaiter;//獲取等待隊列的首節(jié)點
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //將首節(jié)點移動到同步隊列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//喚醒節(jié)點中的線程
return true;
}
Condition的signalAll()方法,相當(dāng)于對等待隊列中的每個節(jié)點均執(zhí)行一次signal()方法砂竖,效果就是將等待隊列中所有節(jié)點全部移動到同步隊列中真椿,并喚醒每個節(jié)點的線程。