java提供了內(nèi)置鎖,即synchronized莲祸,除此以外蹂安,還提供了顯式鎖椭迎,下面我們分別分析其實(shí)現(xiàn)的機(jī)制,并討論如何在這兩者之間進(jìn)行選擇田盈。
AQS
AQS即AbstractQueuedSynchronizer侠碧,一般用于管理同步類中的狀態(tài),它管理了一個(gè)整數(shù)狀態(tài)信息缠黍,可以使用getState setState以及compareAndSetState來獲取或修改狀態(tài)弄兜。比如Semaphore可以用這個(gè)狀態(tài)表示剩余的許可數(shù)目,ReentrantLock可以用它來表示當(dāng)前持有鎖的線程已經(jīng)加鎖的次數(shù)等等瓷式。其獲取和釋放操作的偽代碼如下:
boolean acquire() throws InterruptedException {
while(當(dāng)前狀態(tài)不允許獲取) {
if (需要阻塞獲取請(qǐng)求) {
當(dāng)前線程不在隊(duì)列中替饿,則將其插入等待隊(duì)列
將當(dāng)前調(diào)用線程阻塞
} else {
返回失敗
}
}
獲取成功 修改狀態(tài)
將當(dāng)前線程移出等待隊(duì)列
返回成功
}
void release() {
更新狀態(tài)
if (新的狀態(tài)允許別的線程獲取資源) {
選擇一個(gè)或多個(gè)線程喚醒
}
}
比如一個(gè)簡單的二元閉鎖(代碼來自java并發(fā)編程實(shí)戰(zhàn))
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() { sync.releaseShared(0); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
return (getState()) == 1 ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1);
return true;
}
}
}
AQS state為1時(shí)表示打開,0表示關(guān)閉贸典。await調(diào)用acquireSharedInterruptibly视卢,然后會(huì)去調(diào)用tryAcquireShared,如果state是1則tryAcquireShared返回成功并允許線程通過廊驼,否則線程將進(jìn)入等待線程隊(duì)列中去据过。
signal調(diào)用releaseShared,然后調(diào)用tryReleaseShared妒挎,然后讓所有等待中的線程都再次嘗試請(qǐng)求該同步器绳锅,從而通過閉鎖。另外酝掩,AQS還提供了這些操作的限時(shí)版本鳞芙,從而可以實(shí)現(xiàn)有時(shí)限的等待操作。
acquire
下面我們來看看acquire的代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的代碼是一個(gè)模板方法期虾,先調(diào)用了tryAcquire原朝,如果失敗,則調(diào)用addWaiter將當(dāng)前線程加入等待隊(duì)列中镶苞,然后再使用acquireQueued來嘗試獲取資源喳坠。完成后如果有中斷,則調(diào)用selfInterrupt傳遞中斷狀態(tài)茂蚓。
在addWaiter中壕鹉,使用Node類封裝了當(dāng)前線程,Node的狀態(tài)有:
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
使用prev和next可以形成一個(gè)鏈表:
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
關(guān)于狀態(tài)煌贴,直接摘抄jdk8源碼的注釋:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
SIGNAL 這個(gè)記號(hào)表示當(dāng)前node之后的節(jié)點(diǎn)已經(jīng)或即將被阻塞御板,需要再release或者cancel后喚醒一個(gè)或若干個(gè)后續(xù)節(jié)點(diǎn)
CANCELLED表示這個(gè)線程已經(jīng)cancel掉了
CONDITION 說明這個(gè)node在一個(gè)condition queue上 和sync queue沒什么鳥關(guān)系
PROPAGATE releaseShared需要被所有等待中的node得知锥忿,doReleaseShared中會(huì)使用該狀態(tài)
了解了Node類后牛郑,我們來看看addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 如果目前隊(duì)列非空,則嘗試快速入隊(duì)敬鬓,使用CAS把自己設(shè)置為tail淹朋,并且返回自己對(duì)應(yīng)的node
- 如果快速入隊(duì)失敗了 就調(diào)用enq將當(dāng)前node入隊(duì)
- 首先笙各,如果隊(duì)列為空的話會(huì)在隊(duì)列中添加第一個(gè)dummy的節(jié)點(diǎn),此時(shí)head == tail ==dummy節(jié)點(diǎn) 如果失敗說明已經(jīng)有別的線程設(shè)置過tail了 就再循環(huán)之前的操作
- tail非空础芍,則自旋的將自己入隊(duì)
入隊(duì)以后杈抢,就調(diào)用acquireQueued。
- 獲取該節(jié)點(diǎn)前驅(qū) 如果前驅(qū)是head仑性,說明前面已經(jīng)沒有等待中的節(jié)點(diǎn)了惶楼,就嘗試tryAcquire,成的話講當(dāng)前節(jié)點(diǎn)設(shè)置為head诊杆,返回
- 沒有tryAcquire成功歼捐,則查看是否應(yīng)當(dāng)block當(dāng)前線程 喚醒后檢查thread.interrupted()狀態(tài)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面看看shouldParkAfterFailedAcquire,這個(gè)方法檢查一個(gè)acquire失敗的線程的狀態(tài)晨汹,看看是否需要park(block)它豹储。先看看node的prev的狀態(tài)
- 如果是SIGNAL說明已經(jīng)做好稍后被喚醒的準(zhǔn)備了,返回true表明可以被park淘这。
- 如果大于0說明被cancel了剥扣,那么一直找prev直到找到狀態(tài)小于等于0的節(jié)點(diǎn)
- 如果都不是 那么把pred的狀態(tài)設(shè)置為SIGNAL 為之后的park做準(zhǔn)備,進(jìn)入下一次acquireQueued的循環(huán)中
- 每次喚醒后铝穷,都會(huì)檢查并傳遞中斷狀態(tài) 用acquireQueued返回钠怯,如果有中斷 則拋出中斷異常
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
release
然后看看release的過程,會(huì)調(diào)用子類的tryRelease曙聂,這是一個(gè)模板方法模式呻疹。如果處理成功且head不為空且不是開始時(shí)添加的一個(gè)dummy head,則嘗試調(diào)用unparkSuccessor喚起后繼的線程們筹陵。
如果head的后繼是null或已經(jīng)被cancel了 則從tail開始向前找到一個(gè)等待中的線程刽锤。
如果有可以喚醒的線程,喚醒之朦佩,讓其重新回到acquireQueued的無限循環(huán)中去
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
synchronized
被動(dòng)鎖并思,即synchronized方法或代碼塊的原理與基于AQS的主動(dòng)鎖有所區(qū)別,先來一段代碼:
public class TestSynchronized {
public static void main(String[] args) {
synchronized (TestSynchronized.class) {
}
}
public synchronized void method() {
}
}
javap -c -verbose 以后看到:
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class test/TestSynchronized
2: dup
3: astore_1
4: monitorenter
5: aload_1
6: monitorexit
7: goto 15
10: astore_2
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: return
Exception table:
from to target type
5 7 10 any
10 13 10 any
LineNumberTable:
line 7: 0
line 8: 5
line 9: 15
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 10
locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
public synchronized void method();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 12: 0
可以看到语稠,同步代碼塊前后有monitorenter 和 monitorexit 指令宋彼,而同步方法是在修飾符上添加了ACC_SYNCHRONIZED。
那么仙畦,synchronized到底是靠什么實(shí)現(xiàn)的呢输涕?要了解這些,我們首先要明確兩個(gè)概念:對(duì)象頭和monitor慨畸。
對(duì)象頭
HotSpot的每個(gè)對(duì)象都有一個(gè)頭部莱坎,包括兩部分信息:Mark Word(標(biāo)記字段)和 Klass Pointer(類型指針)。
其中:
- Mark Word 用于存儲(chǔ)對(duì)象自身的運(yùn)行時(shí)數(shù)據(jù)寸士,如哈希碼(HashCode)檐什、GC分代年齡碴卧、鎖狀態(tài)標(biāo)志、線程持有的鎖乃正、偏向線程 ID住册、偏向時(shí)間戳等等。如果對(duì)象是一個(gè)數(shù)組瓮具,對(duì)象頭中還會(huì)有一塊用于記錄數(shù)組長度的數(shù)據(jù)荧飞。在不同的情況下,mark word可以變長的表示不同的含義名党,例如在 32 位的HotSpot 虛擬機(jī)中對(duì)象未被鎖定的狀態(tài)下垢箕,Mark Word 的 32個(gè)Bits 空間中的 25Bits 用于存儲(chǔ)對(duì)象哈希碼(HashCode),4Bits 用于存儲(chǔ)對(duì)象分代年齡兑巾,2Bits 用于存儲(chǔ)鎖標(biāo)志位条获,1Bit固定為0,在其他狀態(tài)(輕量級(jí)鎖定蒋歌、重量級(jí)鎖定帅掘、GC標(biāo)記、可偏向)下對(duì)象的存儲(chǔ)內(nèi)容如下表所示:
存儲(chǔ)內(nèi)容 | 標(biāo)志位 | 狀態(tài) |
---|---|---|
對(duì)象哈希碼堂油、對(duì)象分代年齡 | 01 | 未鎖定 |
指向鎖記錄的指針 | 00 | 輕量級(jí)鎖定 |
指向重量級(jí)鎖的指針 | 10 | 膨脹(重量級(jí)鎖定) |
空修档,不需要記錄信息 | 11 | GC標(biāo)記 |
偏向線程ID、偏向時(shí)間戳府框、對(duì)象分代年齡 | 01 | 可偏向 |
注意偏向鎖吱窝、輕量級(jí)鎖、重量級(jí)鎖等都是jdk 1.6以后引入的哦迫靖。
monitor record
每一個(gè)被鎖住的對(duì)象都與一個(gè)monitor record關(guān)聯(lián)院峡。每一個(gè)線程都有一個(gè)可用monitor record列表,同時(shí)還有一個(gè)全局的可用monitor record列表系宜。每個(gè)monitor record的結(jié)構(gòu)如下所示:
Monitor Record |
---|
Owner |
EntryQ |
RcThis |
Nest |
HashCode |
Candidate |
- Owner:初始時(shí)為NULL表示當(dāng)前沒有任何線程擁有該monitor照激,當(dāng)線程成功擁有該鎖后保存線程唯一標(biāo)識(shí),當(dāng)鎖被釋放時(shí)又設(shè)置為NULL盹牧;
- EntryQ:關(guān)聯(lián)一個(gè)系統(tǒng)互斥鎖(semaphore)俩垃,阻塞所有試圖鎖住monitor失敗的線程。
- RcThis:表示blocked或waiting在該monitor上的所有線程的個(gè)數(shù)汰寓。
- Nest:用來實(shí)現(xiàn)重入鎖的計(jì)數(shù)口柳。
- HashCode:保存從對(duì)象頭拷貝過來的HashCode值(可能還包含GC age)序厉。
- Candidate:0表示沒有需要喚醒的線程罗心,1表示要喚醒一個(gè)繼任線程來競爭鎖刀森。
在 java 虛擬機(jī)中沽翔,線程一旦進(jìn)入到被synchronized修飾的方法或代碼塊時(shí)种远,指定的鎖對(duì)象通過某些操作將對(duì)象頭中的LockWord指向monitor 的起始地址與之關(guān)聯(lián)车胡,同時(shí)monitor 中的Owner存放擁有該鎖的線程的唯一標(biāo)識(shí)拧晕,確保一次只能有一個(gè)線程執(zhí)行該部分的代碼倦春,線程在獲取鎖之前不允許執(zhí)行該部分的代碼睛榄。
偏向鎖
public class TestSynchronized {
private static Object lock = new Object();
public static void main(String[] args) {
method1();
method2();
}
synchronized static void method1() {}
synchronized static void method2() {}
}
偏向所是指若某一鎖被線程獲取后荣茫,便進(jìn)入偏向模式,當(dāng)線程再次請(qǐng)求這個(gè)鎖時(shí)场靴,就無需再進(jìn)行相關(guān)的同步操作了啡莉,從而節(jié)約了操作時(shí)間,如果在此之間有其他的線程進(jìn)行了鎖請(qǐng)求旨剥,則鎖退出偏向模式咧欣。比如上面,聯(lián)系兩個(gè)method都去獲取了關(guān)于TestSynchronized.class的鎖轨帜,就是一種偏向鎖魄咕。
輕量級(jí)鎖
通過膨脹一個(gè)處于01狀態(tài)的對(duì)象的對(duì)象頭,或者是將已處于膨脹狀態(tài)但monitor record中Owner為NULL的monitor record通過CAS置換為當(dāng)前線程蚌父,可以獲取鎖哮兰。
輕量級(jí)鎖會(huì)不斷的自旋來嘗試CAS獲取當(dāng)前的鎖
重量級(jí)鎖
當(dāng)自旋一定次數(shù)以后仍然沒獲取鎖,那么就需要調(diào)用操作系統(tǒng)重量級(jí)的互斥鎖了苟弛,此后喝滞,在鎖被釋放前所有試圖獲取鎖的線程都將被掛起。
synchronized的用處
synchronized主要有兩方面的用途:
- 防止競爭 保證某些代碼同時(shí)只有一個(gè)線程執(zhí)行膏秫,防止由于競爭導(dǎo)致邏輯出錯(cuò)
- 內(nèi)存可見性 即獲取鎖時(shí)右遭,線程會(huì)將本地緩存無效,從主內(nèi)存中獲取最新的數(shù)據(jù)缤削;釋放鎖時(shí)窘哈,會(huì)將本地緩存刷新到主內(nèi)存中,保證其他線程看到最新的數(shù)據(jù)
synchronized和顯示鎖的選擇
從功能上看亭敢,顯示鎖明顯比synchronized更為豐富宵距,可以選擇獲取鎖超時(shí)時(shí)間等,也可以自由的選擇加鎖的區(qū)域和鎖是否是公平鎖等特征吨拗。但是满哪,用synchronized 管理鎖定請(qǐng)求和釋放時(shí),JVM 在生成線程轉(zhuǎn)儲(chǔ)時(shí)能夠包括鎖定信息劝篷。這些對(duì)調(diào)試非常有價(jià)值哨鸭,因?yàn)樗鼈兡軜?biāo)識(shí)死鎖或者其他異常行為的來源。其次娇妓,作為一種內(nèi)置的鎖機(jī)制像鸡,可能會(huì)隨著jdk的升級(jí)而得到優(yōu)化,比如jdk 1.6以后的synchronized就比之前執(zhí)行效率提高了很多。所以只估,在滿足需求的情況下志群,建議優(yōu)先使用synchronized來加鎖。