不知道你有沒有想過统舀,那些去申請鎖的線程都怎樣了?有些可能申請到了鎖卿捎,馬上就能執(zhí)行業(yè)務(wù)代碼。但是如果有一個(gè)鎖被很多個(gè)線程需要径密,那么這些線程是如何被處理的呢午阵?
今天我們走進(jìn)synchronized 重量級鎖,看看那些沒有申請到鎖的線程都怎樣了享扔。
ps: 如果你不想看分析結(jié)果,可以拉到最后底桂,末尾有一張總結(jié)圖,一圖勝千言
之前文章分析過synchroinzed中鎖的優(yōu)化,但是如果存在大量競爭的情況下惧眠,那么最終還是都會變成重量級鎖戚啥。所以我們這里開始直接分析重量級鎖的代碼。
申請鎖
在ObjectMonitor::enter函數(shù)中锉试,有很多判斷和優(yōu)化執(zhí)行的邏輯猫十,但是核心還是通過EnterI函數(shù)實(shí)際進(jìn)入隊(duì)列將將當(dāng)前線程阻塞
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
// CAS嘗試將當(dāng)前線程設(shè)置為持有鎖的線程
if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
// 通過自旋方式調(diào)用tryLock再次嘗試,操作系統(tǒng)認(rèn)為會有一些微妙影響
if (TrySpin(Self) > 0) {
assert(_owner == Self, "invariant");
assert(_succ != Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
...
// 將當(dāng)前線程構(gòu)建成ObjectWaiter
ObjectWaiter node(Self);
Self->_ParkEvent->reset();
node._prev = (ObjectWaiter *) 0xBAD;
node.TState = ObjectWaiter::TS_CXQ;
ObjectWaiter * nxt;
for (;;) {
// 通過CAS方式將ObjectWaiter對象插入CXQ隊(duì)列頭部中
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// 由于cxq改變呆盖,導(dǎo)致CAS失敗拖云,這里進(jìn)行tryLock重試
if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
// 阻塞當(dāng)前線程
for (;;) {
if (TryLock(Self) > 0) break;
assert(_owner != Self, "invariant");
// park self
if (_Responsible == Self) {
Self->_ParkEvent->park((jlong) recheckInterval);
recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) {
recheckInterval = MAX_RECHECK_INTERVAL;
}
} else {
Self->_ParkEvent->park();
}
...
if (TryLock(Self) > 0) break;
++nWakeups;
if (TrySpin(Self) > 0) break;
...
}
...
// Self已經(jīng)獲取到鎖了,需要將它從CXQ或者EntryList中移除
UnlinkAfterAcquire(Self, &node);
...
}
- 在入隊(duì)之前,會調(diào)用tryLock嘗試通過CAS操作將_owner(當(dāng)前ObjectMonitor對象鎖持有的線程指針)字段設(shè)置為Self(指向當(dāng)前執(zhí)行的線程),如果設(shè)置成功应又,表示當(dāng)前線程獲得了鎖宙项,否則沒有。
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if (own != NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {
return 1;
}
return -1;
}
如果tryLock沒有成功株扛,又會再次調(diào)用tryLock(trySpin中調(diào)用了tryLock)去嘗試獲取鎖尤筐,因?yàn)檫@樣可以告訴操作系統(tǒng)我迫切需要這個(gè)資源汇荐,希望能盡量分配給我。不過這種親和力并不是一定能得到保證的協(xié)議盆繁,只是一種積極的操作掀淘。
通過 ObjectWaiter對象將當(dāng)前線程包裹起來,入到 CXQ 隊(duì)列的頭部
阻塞當(dāng)前線程(通過pthread_cond_wait)
當(dāng)線程被喚醒而獲取了鎖油昂,調(diào)用UnlinkAfterAcquire方法將ObjectWaiter從CXQ或者EntryList中移除
核心數(shù)據(jù)結(jié)構(gòu)
ObjectMonitor對象中保存了 sychronized 阻塞的線程的隊(duì)列革娄,以及實(shí)現(xiàn)了不同的隊(duì)列調(diào)度策略,因此我們有必須先來認(rèn)識下這個(gè)對象的一些重要屬性
class ObjectMonitor {
// mark word
volatile markOop _header;
// 指向擁有線程或BasicLock的指針
void * volatile _owner;
// monitor的先前所有者的線程ID
volatile jlong _previous_owner_tid;
// 重入次數(shù)冕碟,第一次為0
volatile intptr_t _recursions;
// 下一個(gè)被喚醒的線程
Thread * volatile _succ;
// 線程在進(jìn)入或者重新進(jìn)入時(shí)被阻塞的列表,由ObjectWaiter組成,相當(dāng)于對線程的一個(gè)封裝對象
ObjectWaiter * volatile _EntryList;
// CXQ隊(duì)列存儲的是enter的時(shí)候因?yàn)殒i已經(jīng)被別的線程阻塞而進(jìn)不來的線程
ObjectWaiter * volatile _cxq;
// 處于wait狀態(tài)(調(diào)用了wait())的線程拦惋,會被加入到waitSet
ObjectWaiter * volatile _WaitSet;
// 省略其他屬性以及方法
}
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };
// 后一個(gè)節(jié)點(diǎn)
ObjectWaiter * volatile _next;
// 前一個(gè)節(jié)點(diǎn)
ObjectWaiter * volatile _prev;
// 線程
Thread* _thread;
// 線程狀態(tài)
volatile TStates TState;
public:
ObjectWaiter(Thread* thread);
};
看到ObjectWaiter中的_next和_prev你就會明白,這是使用了雙向隊(duì)列實(shí)現(xiàn)等待隊(duì)列的的安寺,但是實(shí)際上我們上面的入隊(duì)操作并沒有形成雙向列表厕妖,形成雙向列表是在exit鎖的時(shí)候。
wait
Java Object 類提供了一個(gè)基于 native 實(shí)現(xiàn)的 wait 和 notify 線程間通訊的方式,JDK中wait/notify/notifyAll全部是通過native實(shí)現(xiàn)的挑庶,當(dāng)然到了JVM叹放,它的實(shí)現(xiàn)還是在 src/hotspot/share/runtime/objectMonitor.cpp
中。
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD;
JavaThread *jt = (JavaThread *)THREAD;
...
// 如果線程被中斷挠羔,需要拋出異常
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
THROW(vmSymbols::java_lang_InterruptedException());
return;
}
jt->set_current_waiting_monitor(this);
// 構(gòu)造 ObjectWaiter節(jié)點(diǎn)
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT;
...
// 將ObjectWaiter加入WaitSet的尾部
AddWaiter(&node);
// 讓出鎖
exit(true, Self);
...
// 調(diào)研park()井仰,阻塞當(dāng)前線程
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park();
} else {
ret = Self->_ParkEvent->park(millis);
}
}
...
}
// 將node插入雙向列表_WaitSet的尾部
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet;
ObjectWaiter* tail = head->_prev;
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
上面我把wait的主要方法邏輯列出來了,主要會執(zhí)行以下步驟
- 首先判斷當(dāng)前線程是否被中斷破加,如果被中斷了需要拋出InterruptedException
- 如果沒有被中斷俱恶,則會使用當(dāng)前線程構(gòu)造ObjectWaiter節(jié)點(diǎn),將其插入雙向鏈表WaitSet的尾部
- 調(diào)用exit,讓出鎖(讓出鎖的邏輯會在后面分析)
- 調(diào)用park(實(shí)際上是調(diào)用pthread_cond_wait)阻塞當(dāng)前線程
notify
同樣的notify的邏輯也是在ObjectMonitory.cpp中
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
// waitSet為空范舀,直接返回
if (_WaitSet == NULL) {
TEVENT(Empty-Notify);
return;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
// 喚醒某個(gè)線程
INotify(THREAD);
OM_PERFDATA_OP(Notifications, inc(1));
}
在notify中首先會判斷waitSet是否為空合是,如果為空,表示沒有線程在等待锭环,則直接返回聪全。否則則調(diào)用INotify方法。
notifyAll方法實(shí)際上是循環(huán)調(diào)用INotify
void ObjectMonitor::INotify(Thread * Self) {
// notify之前需要獲取一個(gè)鎖辅辩,保證并發(fā)安全
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
// 移除并返回WaitSet中的第一個(gè)元素,比如之前waitSet中是1 <--> 2 <--> 3,現(xiàn)在是返回1难礼,然后waitSet變成 2<-->3
ObjectWaiter * iterator = DequeueWaiter();
if (iterator != NULL) {
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail (policy == 1)
// or head (policy == 0).
// b. push it onto the front of the _cxq (policy == 2).
// For now we use (b).
// 設(shè)置線程狀態(tài)
iterator->TState = ObjectWaiter::TS_ENTER;
iterator->_notified = 1;
iterator->_notifier_tid = JFR_THREAD_ID(Self);
ObjectWaiter * list = _EntryList;
if (list != NULL) {
assert(list->_prev == NULL, "invariant");
assert(list->TState == ObjectWaiter::TS_ENTER, "invariant");
assert(list != iterator, "invariant");
}
// prepend to cxq
if (list == NULL) {
iterator->_next = iterator->_prev = NULL;
_EntryList = iterator;
} else {
iterator->TState = ObjectWaiter::TS_CXQ;
for (;;) {
// 將需要喚醒的node放到CXQ的頭部
ObjectWaiter * front = _cxq;
iterator->_next = front;
if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
break;
}
}
}
iterator->wait_reenter_begin(this);
}
// notify執(zhí)行完成之后釋放waitSet鎖,注意這里并不是釋放線程持有的鎖
Thread::SpinRelease(&_WaitSetLock);
}
notify的邏輯比較簡單,就是將WaitSet的頭節(jié)點(diǎn)從隊(duì)列中移除玫锋,如果EntryList為空蛾茉,則將出隊(duì)節(jié)點(diǎn)放入到EntryList中,如果EntryList不為空撩鹿,則將節(jié)點(diǎn)插入到CXQ列表的頭節(jié)點(diǎn)谦炬。
需要注意的是,notify并沒有釋放鎖,釋放鎖的邏輯是在exit中
exit
當(dāng)一個(gè)線程獲得對象鎖成功之后,就可以執(zhí)行自定義的同步代碼塊了键思。執(zhí)行完成之后會執(zhí)行到 ObjectMonitor 的 exit 函數(shù)中础爬,釋放當(dāng)前對象鎖,方便下一個(gè)線程來獲取這個(gè)鎖吼鳞。
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
if (THREAD != _owner) {
// 鎖的持有者是當(dāng)前線程
if (THREAD->is_lock_owned((address) _owner)) {
assert(_recursions == 0, "invariant");
_owner = THREAD;
_recursions = 0;
} else {
assert(false, "Non-balanced monitor enter/exit! Likely JNI locking");
return;
}
}
// 重入次數(shù)減去1
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
return;
}
for (;;) {
...
w = _EntryList;
// 如果entryList不為空看蚜,則將
if (w != NULL) {
assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// 執(zhí)行unpark,讓出鎖
ExitEpilog(Self, w);
return;
}
w = _cxq;
...
_EntryList = w;
ObjectWaiter * q = NULL;
ObjectWaiter * p;
// 這里將_cxq或者說_EntryList從單向鏈表變成了一個(gè)雙向鏈表
for (p = w; p != NULL; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
w = _EntryList;
if (w != NULL) {
guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
// 執(zhí)行unpark,讓出鎖
ExitEpilog(Self, w);
return;
}
...
}
...
}
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Wakee->_thread;
ParkEvent * Trigger = Wakee->_event;
Wakee = NULL;
// Drop the lock
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence();
...
// 釋放鎖
Trigger->unpark();
}
exit的邏輯還是比較簡單的
如果當(dāng)前是當(dāng)前線程要讓出鎖,那么則查看其重入次數(shù)是否為0赖条,不為0則將重入次數(shù)減去1失乾,然后直接退出常熙。
如果EntryList不為空纬乍,則將EntryList的頭元素中的線程喚醒
將cxq指針賦值給EntryList,然后通過循環(huán)將cxq鏈表變成雙向鏈表裸卫,然后調(diào)用ExitEpilog將CXQ鏈表的頭結(jié)點(diǎn)喚醒(實(shí)際是通過pthread_cond_signal)
從這里之后,EntryList和CXQ就是同一個(gè)了仿贬,因?yàn)閷XQ賦值給了EntryList了。
需要注意的是這里喚醒的線程會繼續(xù)執(zhí)行文章開頭的EnterI方法墓贿,此時(shí)會將ObjectWaiter從EntryList或者CXQ中移除茧泪。
實(shí)戰(zhàn)演示
上面的源碼均是基于JDK12,JDK8中的代碼關(guān)于exit和notify都還有其他策略(選擇哪個(gè)線程),而從JDK9開始就只保留了默認(rèn)策略了聋袋。
所以下面的Java代碼的運(yùn)行結(jié)果無論是在jdk8還是jdk12,得到的結(jié)果都是一樣的队伟。
Object lock = new Object();
Thread t1 = new Thread(() -> {
System.out.println("Thread 1 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 1 end!!!!!!");
}
});
Thread t2 = new Thread(() -> {
System.out.println("Thread 2 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 2 end!!!!!!");
}
});
Thread t3 = new Thread(() -> {
System.out.println("Thread 3 start!!!!!!");
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
}
System.out.println("Thread 3 end!!!!!!");
}
});
Thread t4 = new Thread(() -> {
System.out.println("Thread 4 start!!!!!!");
synchronized (lock) {
try {
System.in.read();
} catch (Exception e) {
}
lock.notify();
lock.notify();
lock.notify();
System.out.println("Thread 4 end!!!!!!");
}
});
Thread t5 = new Thread(() -> {
System.out.println("Thread 5 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 5 end!!!!!!");
}
});
Thread t6 = new Thread(() -> {
System.out.println("Thread 6 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 6 end!!!!!!");
}
});
Thread t7 = new Thread(() -> {
System.out.println("Thread 7 start!!!!!!");
synchronized (lock) {
System.out.println("Thread 7 end!!!!!!");
}
});
t1.start();
sleep_1_second();
t2.start();
sleep_1_second();
t3.start();
sleep_1_second();
t4.start();
sleep_1_second();
t5.start();
sleep_1_second();
t6.start();
sleep_1_second();
t7.start();
上面的代碼很簡單,我們來分析一下幽勒。
線程1,2,3都調(diào)用了wait,所以會阻塞嗜侮,然后WaitSet的鏈表結(jié)構(gòu)如下:
線程4獲取了鎖,在等待一個(gè)輸入
線程5,6,7也在等待鎖啥容,所以他們也會把阻塞锈颗,所以CXQ鏈表結(jié)構(gòu)如下:
當(dāng)線程4輸入任意內(nèi)容,并回車結(jié)束后(調(diào)用了其中的3個(gè)notify方法咪惠,但還未釋放鎖)
線程4讓出鎖之后击吱,由于EntryList不為空,所以會先喚醒EntryList中的線程1,然后接下來會喚醒CXQ隊(duì)列中的線程(后面你可以認(rèn)為CXQ就是EntryList)
所以最終線程執(zhí)行順序?yàn)?code>4 1 3 2 7 6 5,我們的輸出結(jié)果也能驗(yàn)證我們的結(jié)論
Thread 1 start!!!!!!
Thread 2 start!!!!!!
Thread 3 start!!!!!!
Thread 4 start!!!!!!
Thread 5 start!!!!!!
Thread 6 start!!!!!!
Thread 7 start!!!!!!
think123
Thread 4 end!!!!!!
Thread 1 end!!!!!!
Thread 3 end!!!!!!
Thread 2 end!!!!!!
Thread 7 end!!!!!!
Thread 6 end!!!!!!
Thread 5 end!!!!!!